From 6e71a54b1efe1ddb1805c6cc49e3f91492622a81 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Mon, 19 Mar 2012 19:41:20 -0500 Subject: [PATCH] Fixed issues #337, #341, and #340 * Implemented new ctx API (_new, _destroy, _get, _set) * Removed 'typesafe' macros from zmq.h * Added support for MAX_SOCKETS (was tied into change for #337) * Created new man pages --- doc/Makefile.am | 6 +- doc/zmq.txt | 18 +++- doc/zmq_ctx_destroy.txt | 66 ++++++++++++ doc/zmq_ctx_get.txt | 67 ++++++++++++ doc/zmq_ctx_new.txt | 49 +++++++++ doc/zmq_ctx_set.txt | 75 +++++++++++++ doc/zmq_init.txt | 1 + doc/zmq_term.txt | 1 + include/zmq.h | 89 +++++++--------- src/ctx.cpp | 191 ++++++++++++++++++++++----------- src/ctx.hpp | 29 ++++- src/options.cpp | 5 +- src/options.hpp | 3 + src/pair.cpp | 4 +- src/pair.hpp | 2 +- src/pub.cpp | 6 +- src/pub.hpp | 4 +- src/pull.cpp | 4 +- src/pull.hpp | 2 +- src/push.cpp | 4 +- src/push.hpp | 2 +- src/rep.cpp | 6 +- src/rep.hpp | 2 +- src/req.cpp | 4 +- src/req.hpp | 4 +- src/socket_base.cpp | 27 ++--- src/socket_base.hpp | 6 +- src/sub.cpp | 6 +- src/sub.hpp | 4 +- src/xpub.cpp | 4 +- src/xpub.hpp | 2 +- src/xrep.cpp | 4 +- src/xrep.hpp | 2 +- src/xreq.cpp | 4 +- src/xreq.hpp | 2 +- src/xsub.cpp | 4 +- src/xsub.hpp | 2 +- src/zmq.cpp | 58 +++++++--- tests/test_last_endpoint | 225 +++++++++++++++++++++++++++++++++++++++ 39 files changed, 798 insertions(+), 196 deletions(-) create mode 100644 doc/zmq_ctx_destroy.txt create mode 100644 doc/zmq_ctx_get.txt create mode 100644 doc/zmq_ctx_new.txt create mode 100644 doc/zmq_ctx_set.txt create mode 100755 tests/test_last_endpoint diff --git a/doc/Makefile.am b/doc/Makefile.am index 2f68edf5..eba0f998 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -1,9 +1,11 @@ -MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 zmq_init.3 \ +MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 \ + zmq_ctx_new.3 zmq_ctx_destroy.3 zmq_ctx_get.3 zmq_ctx_set.3 \ + zmq_init.3 zmq_term.3 \ zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \ zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \ zmq_msg_send.3 zmq_msg_recv.3 \ zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \ - zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \ + zmq_strerror.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \ zmq_sendmsg.3 zmq_recvmsg.3 zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7 diff --git a/doc/zmq.txt b/doc/zmq.txt index f5eda96d..4dfe0d1f 100644 --- a/doc/zmq.txt +++ b/doc/zmq.txt @@ -30,9 +30,21 @@ provided by the 0MQ library. Context ~~~~~~~ -Before using any 0MQ library functions the caller must initialise a 0MQ -'context' using _zmq_init()_. The following functions are provided to handle -initialisation and termination of a 'context': +Before using any 0MQ library functions you must create a 0MQ 'context'. When +you exit your application you must destroy the 'context'. These functions let +you work with 'contexts': + +Create a new 0MQ context:: + linkzmq:zmq_ctx_new[3] + +Work with context properties:: + linkzmq:zmq_ctx_set[3] + linkzmq:zmq_ctx_get[3] + +Destroy a 0MQ context:: + linkzmq:zmq_ctx_destroy[3] + +These deprecated functions let you create and destroy 'contexts': Initialise 0MQ context:: linkzmq:zmq_init[3] diff --git a/doc/zmq_ctx_destroy.txt b/doc/zmq_ctx_destroy.txt new file mode 100644 index 00000000..640b6b8a --- /dev/null +++ b/doc/zmq_ctx_destroy.txt @@ -0,0 +1,66 @@ +zmq_ctx_destroy(3) +================== + + +NAME +---- +zmq_ctx_destroy - destroy a 0MQ context + + +SYNOPSIS +-------- +*int zmq_ctx_destroy (void '*context');* + + +DESCRIPTION +----------- +The _zmq_ctx_destroy()_ function shall destroy the 0MQ context 'context'. + +Context termination is performed in the following steps: + +1. Any blocking operations currently in progress on sockets open within + 'context' shall return immediately with an error code of ETERM. With the + exception of _zmq_close()_, any further operations on sockets open within + 'context' shall fail with an error code of ETERM. + +2. After interrupting all blocking calls, _zmq_ctx_destroy()_ shall _block_ until the + following conditions are satisfied: + + * All sockets open within 'context' have been closed with _zmq_close()_. + + * For each socket within 'context', all messages sent by the application + with _zmq_send()_ have either been physically transferred to a network + peer, or the socket's linger period set with the _ZMQ_LINGER_ socket + option has expired. + +For further details regarding socket linger behavior refer to the _ZMQ_LINGER_ +option in linkzmq:zmq_setsockopt[3]. + +This function replaces the deprecated function linkzmq:zmq_term[3]. + + +RETURN VALUE +------------ +The _zmq_ctx_destroy()_ function shall return zero if successful. Otherwise +it shall return `-1` and set 'errno' to one of the values defined below. + + +ERRORS +------ +*EFAULT*:: +The provided 'context' was invalid. +*EINTR*:: +Termination was interrupted by a signal. It can be restarted if needed. + + +SEE ALSO +-------- +linkzmq:zmq[7] +linkzmq:zmq_init[3] +linkzmq:zmq_close[3] +linkzmq:zmq_setsockopt[3] + + +AUTHORS +------- +This 0MQ manual page was written by Pieter Hintjens diff --git a/doc/zmq_ctx_get.txt b/doc/zmq_ctx_get.txt new file mode 100644 index 00000000..a6b3614b --- /dev/null +++ b/doc/zmq_ctx_get.txt @@ -0,0 +1,67 @@ +zmq_ctx_get(3) +============== + + +NAME +---- + +zmq_ctx_get - get context options + + +SYNOPSIS +-------- +*int zmq_ctx_get (void '*context', int 'option_name');* + + +DESCRIPTION +----------- +The _zmq_ctx_get()_ function shall return the option specified by the +'option_name' argument. + +The _zmq_ctx_get()_ function accepts the following option names: + + +ZMQ_IO_THREADS: Get number of I/O threads +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_IO_THREADS' argument returns the size of the 0MQ thread pool +for this context. + +ZMQ_MAX_SOCKETS: Set maximum number of sockets +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_MAX_SOCKETS' argument returns the maximum number of sockets +allowed for this context. + + +RETURN VALUE +------------ +The _zmq_ctx_get()_ function returns a value of 0 or greater if successful. +Otherwise it returns `-1` and sets 'errno' to one of the values defined +below. + + +ERRORS +------ +*EINVAL*:: +The requested option _option_name_ is unknown. + + +EXAMPLE +------- +.Setting a limit on the number of sockets +---- +void *context = zmq_ctx_new (); +zmq_ctx_get (context, ZMQ_MAX_SOCKETS, 256); +int max_sockets = zmq_ctx_get (context, ZMQ_MAX_SOCKETS); +assert (max_sockets == 256); +---- + + +SEE ALSO +-------- +linkzmq:zmq_ctx_set[3] +linkzmq:zmq[7] + + +AUTHORS +------- +This 0MQ manual page was written by Pieter Hintjens diff --git a/doc/zmq_ctx_new.txt b/doc/zmq_ctx_new.txt new file mode 100644 index 00000000..38cb05ef --- /dev/null +++ b/doc/zmq_ctx_new.txt @@ -0,0 +1,49 @@ +zmq_ctx_new(3) +============== + + +NAME +---- +zmq_ctx_new - create new 0MQ context + + +SYNOPSIS +-------- +*void *zmq_ctx_new ();* + + +DESCRIPTION +----------- +The _zmq_ctx_new()_ function creates a new 0MQ 'context'. + +This function replaces the deprecated function linkzmq:zmq_init[3]. + +.Thread safety +A 0MQ 'context' is thread safe and may be shared among as many application +threads as necessary, without any additional locking required on the part of +the caller. + + +RETURN VALUE +------------ +The _zmq_ctx_new()_ function shall return an opaque handle to the newly created +'context' if successful. Otherwise it shall return NULL and set 'errno' to one +of the values defined below. + + +ERRORS +------ +No error values are defined for this function. + + +SEE ALSO +-------- +linkzmq:zmq[7] +linkzmq:zmq_ctx_put[3] +linkzmq:zmq_ctx_get[3] +linkzmq:zmq_ctx_destroy[3] + + +AUTHORS +------- +This 0MQ manual page was written by Pieter Hintjens diff --git a/doc/zmq_ctx_set.txt b/doc/zmq_ctx_set.txt new file mode 100644 index 00000000..88fcc1e0 --- /dev/null +++ b/doc/zmq_ctx_set.txt @@ -0,0 +1,75 @@ +zmq_ctx_set(3) +============== + + +NAME +---- + +zmq_ctx_set - set context options + + +SYNOPSIS +-------- +*int zmq_ctx_set (void '*context', int 'option_name', int 'option_value');* + + +DESCRIPTION +----------- +The _zmq_ctx_set()_ function shall set the option specified by the +'option_name' argument to the value of the 'option_value' argument. + +The _zmq_ctx_set()_ function accepts the following options: + + +ZMQ_IO_THREADS: Set number of I/O threads +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_IO_THREADS' argument specifies the size of the 0MQ thread pool to +handle I/O operations. If your application is using only the 'inproc' +transport for messaging you may set this to zero, otherwise set it to at +least one. This option only applies before creating any sockets on the +context. + +[horizontal] +Default value:: 1 + +ZMQ_MAX_SOCKETS: Set maximum number of sockets +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_MAX_SOCKETS' argument sets the maximum number of sockets allowed +on the context. + +[horizontal] +Default value:: 1024 + + +RETURN VALUE +------------ +The _zmq_ctx_set()_ function returns zero if successful. Otherwise it +returns `-1` and sets 'errno' to one of the values defined below. + + +ERRORS +------ +*EINVAL*:: +The requested option _option_name_ is unknown. + + +EXAMPLE +------- +.Setting a limit on the number of sockets +---- +void *context = zmq_ctx_new (); +zmq_ctx_set (context, ZMQ_MAX_SOCKETS, 256); +int max_sockets = zmq_ctx_get (context, ZMQ_MAX_SOCKETS); +assert (max_sockets == 256); +---- + + +SEE ALSO +-------- +linkzmq:zmq_ctx_get[3] +linkzmq:zmq[7] + + +AUTHORS +------- +This 0MQ manual page was written by Pieter Hintjens diff --git a/doc/zmq_init.txt b/doc/zmq_init.txt index eadf65d0..94795bf7 100644 --- a/doc/zmq_init.txt +++ b/doc/zmq_init.txt @@ -25,6 +25,7 @@ A 0MQ 'context' is thread safe and may be shared among as many application threads as necessary, without any additional locking required on the part of the caller. +This function is deprecated by linkzmq:zmq_ctx_new[3]. RETURN VALUE ------------ diff --git a/doc/zmq_term.txt b/doc/zmq_term.txt index df424ab0..f3604ce5 100644 --- a/doc/zmq_term.txt +++ b/doc/zmq_term.txt @@ -36,6 +36,7 @@ Context termination is performed in the following steps: For further details regarding socket linger behaviour refer to the _ZMQ_LINGER_ option in linkzmq:zmq_setsockopt[3]. +This function is deprecated by linkzmq:zmq_ctx_destroy[3]. RETURN VALUE ------------ diff --git a/include/zmq.h b/include/zmq.h index fc6e4f3e..40d37752 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -67,33 +67,6 @@ extern "C" { #define ZMQ_VERSION \ ZMQ_MAKE_VERSION(ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH) -/* ensure one of ZMQ_TYPE_SAFE/UNSAFE is defined */ -/* Choose default based on version */ - -/* Uncomment to test */ -/* #define ZMQ_EMULATE_TYPE_SAFE */ - -#if !defined(ZMQ_TYPE_SAFE) && !defined(ZMQ_TYPE_UNSAFE) -# if ZMQ_VERSION_MAJOR <= 3 -# if defined ZMQ_EMULATE_TYPE_SAFE -# else -# define ZMQ_TYPE_UNSAFE -# endif -# else -# define ZMQ_TYPE_SAFE -# endif -#elif defined(ZMQ_TYPE_SAFE) && defined(ZMQ_TYPE_UNSAFE) -# error "BOTH ZMQ_TYPE_SAFE and ZMQ_TYPE_UNSAFE are defined!" -#endif - -#ifdef ZMQ_TYPE_UNSAFE -typedef void *zmq_socket_t; -typedef void *zmq_ctx_t; -#else -typedef struct zmq_socket_t { void *data; } zmq_socket_t; -typedef struct zmq_ctx_t { void *data; } zmq_ctx_t; -#endif - /* Run-time API version detection */ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch); @@ -152,6 +125,29 @@ ZMQ_EXPORT int zmq_errno (void); /* Resolves system errors and 0MQ errors to human-readable string. */ ZMQ_EXPORT const char *zmq_strerror (int errnum); +/******************************************************************************/ +/* 0MQ infrastructure (a.k.a. context) initialisation & termination. */ +/******************************************************************************/ + +/* New API */ +// Context options +#define ZMQ_IO_THREADS 1 +#define ZMQ_MAX_SOCKETS 2 + +// Default for new contexts +#define ZMQ_IO_THREADS_DFLT 1 +#define ZMQ_MAX_SOCKETS_DFLT 1024 + +ZMQ_EXPORT void *zmq_ctx_new (void); +ZMQ_EXPORT int zmq_ctx_destroy (void *context); +ZMQ_EXPORT int zmq_ctx_set (void *context, int option, int optval); +ZMQ_EXPORT int zmq_ctx_get (void *context, int option); + +/* Old (legacy) API */ +ZMQ_EXPORT void *zmq_init (int io_threads); +ZMQ_EXPORT int zmq_term (void *context); + + /******************************************************************************/ /* 0MQ message definition. */ /******************************************************************************/ @@ -164,8 +160,8 @@ ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg); ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg, size_t size); ZMQ_EXPORT int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint); -ZMQ_EXPORT int zmq_msg_send (zmq_msg_t *msg, zmq_socket_t s, int flags); -ZMQ_EXPORT int zmq_msg_recv (zmq_msg_t *msg, zmq_socket_t s, int flags); +ZMQ_EXPORT int zmq_msg_send (zmq_msg_t *msg, void *s, int flags); +ZMQ_EXPORT int zmq_msg_recv (zmq_msg_t *msg, void *s, int flags); ZMQ_EXPORT int zmq_msg_close (zmq_msg_t *msg); ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src); ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); @@ -177,13 +173,6 @@ ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int option, void *optval, ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, const void *optval, size_t *optvallen); -/******************************************************************************/ -/* 0MQ infrastructure (a.k.a. context) initialisation & termination. */ -/******************************************************************************/ - -ZMQ_EXPORT zmq_ctx_t zmq_init (int io_threads); -ZMQ_EXPORT int zmq_term (zmq_ctx_t context); - /******************************************************************************/ /* 0MQ socket definition. */ /******************************************************************************/ @@ -239,23 +228,23 @@ ZMQ_EXPORT int zmq_term (zmq_ctx_t context); #define ZMQ_DONTWAIT 1 #define ZMQ_SNDMORE 2 -ZMQ_EXPORT zmq_socket_t zmq_socket (zmq_ctx_t context, int type); -ZMQ_EXPORT int zmq_close (zmq_socket_t s); -ZMQ_EXPORT int zmq_setsockopt (zmq_socket_t s, int option, const void *optval, +ZMQ_EXPORT void *zmq_socket (void *, int type); +ZMQ_EXPORT int zmq_close (void *s); +ZMQ_EXPORT int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen); -ZMQ_EXPORT int zmq_getsockopt (zmq_socket_t s, int option, void *optval, +ZMQ_EXPORT int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen); -ZMQ_EXPORT int zmq_bind (zmq_socket_t s, const char *addr); -ZMQ_EXPORT int zmq_connect (zmq_socket_t s, const char *addr); -ZMQ_EXPORT int zmq_send (zmq_socket_t s, const void *buf, size_t len, int flags); -ZMQ_EXPORT int zmq_recv (zmq_socket_t s, void *buf, size_t len, int flags); +ZMQ_EXPORT int zmq_bind (void *s, const char *addr); +ZMQ_EXPORT int zmq_connect (void *s, const char *addr); +ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags); +ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags); -ZMQ_EXPORT int zmq_sendmsg (zmq_socket_t s, zmq_msg_t *msg, int flags); -ZMQ_EXPORT int zmq_recvmsg (zmq_socket_t s, zmq_msg_t *msg, int flags); +ZMQ_EXPORT int zmq_sendmsg (void *s, zmq_msg_t *msg, int flags); +ZMQ_EXPORT int zmq_recvmsg (void *s, zmq_msg_t *msg, int flags); /* Experimental */ -ZMQ_EXPORT int zmq_sendiov (zmq_socket_t s, struct iovec *iov, size_t count, int flags); -ZMQ_EXPORT int zmq_recviov (zmq_socket_t s, struct iovec *iov, size_t *count, int flags); +ZMQ_EXPORT int zmq_sendiov (void *s, struct iovec *iov, size_t count, int flags); +ZMQ_EXPORT int zmq_recviov (void *s, struct iovec *iov, size_t *count, int flags); /******************************************************************************/ /* I/O multiplexing. */ @@ -267,7 +256,7 @@ ZMQ_EXPORT int zmq_recviov (zmq_socket_t s, struct iovec *iov, size_t *count, in typedef struct { - zmq_socket_t socket; + void *socket; #if defined _WIN32 SOCKET fd; #else @@ -287,7 +276,7 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); #define ZMQ_FORWARDER 2 #define ZMQ_QUEUE 3 -ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket); +ZMQ_EXPORT int zmq_device (int device, void *insocket, void* outsocket); #undef ZMQ_EXPORT diff --git a/src/ctx.cpp b/src/ctx.cpp index b29f8b60..5727b3e7 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -1,6 +1,6 @@ /* + Copyright (c) 2007-2012 iMatix Corporation Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2011 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -37,45 +37,21 @@ #include "err.hpp" #include "msg.hpp" -zmq::ctx_t::ctx_t (uint32_t io_threads_) : - tag (0xbadcafe0), - terminating (false) +zmq::ctx_t::ctx_t () : + tag (0xabadcafe), + starting (true), + terminating (false), + reaper (NULL), + slot_count (0), + slots (NULL), + max_sockets (ZMQ_MAX_SOCKETS_DFLT), + io_thread_count (ZMQ_IO_THREADS_DFLT) { - // Initialise the array of mailboxes. Additional three slots are for - // internal log socket and the zmq_term thread the reaper thread. - slot_count = max_sockets + io_threads_ + 3; - slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); - alloc_assert (slots); - - // Initialise the infrastructure for zmq_term thread. - slots [term_tid] = &term_mailbox; - - // Create the reaper thread. - reaper = new (std::nothrow) reaper_t (this, reaper_tid); - alloc_assert (reaper); - slots [reaper_tid] = reaper->get_mailbox (); - reaper->start (); - - // Create I/O thread objects and launch them. - for (uint32_t i = 2; i != io_threads_ + 2; i++) { - io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); - alloc_assert (io_thread); - io_threads.push_back (io_thread); - slots [i] = io_thread->get_mailbox (); - io_thread->start (); - } - - // In the unused part of the slot array, create a list of empty slots. - for (int32_t i = (int32_t) slot_count - 1; - i >= (int32_t) io_threads_ + 2; i--) { - empty_slots.push_back (i); - slots [i] = NULL; - } } bool zmq::ctx_t::check_tag () { - return tag == 0xbadcafe0; + return tag == 0xabadcafe; } zmq::ctx_t::~ctx_t () @@ -93,12 +69,14 @@ zmq::ctx_t::~ctx_t () delete io_threads [i]; // Deallocate the reaper thread object. - delete reaper; + if (reaper) + delete reaper; // Deallocate the array of mailboxes. No special work is // needed as mailboxes themselves were deallocated with their // corresponding io_thread/socket objects. - free (slots); + if (slots) + free (slots); // Remove the tag, so that the object is considered dead. tag = 0xdeadbeef; @@ -106,44 +84,125 @@ zmq::ctx_t::~ctx_t () int zmq::ctx_t::terminate () { - // Check whether termination was already underway, but interrupted and now - // restarted. - slot_sync.lock (); - bool restarted = terminating; - slot_sync.unlock (); + if (!starting) { - // First attempt to terminate the context. - if (!restarted) { - // First send stop command to sockets so that any blocking calls can be - // interrupted. If there are no sockets we can ask reaper thread to stop. + // Check whether termination was already underway, but interrupted and now + // restarted. slot_sync.lock (); + bool restarted = terminating; terminating = true; - for (sockets_t::size_type i = 0; i != sockets.size (); i++) - sockets [i]->stop (); - if (sockets.empty ()) - reaper->stop (); + slot_sync.unlock (); + + // First attempt to terminate the context. + if (!restarted) { + + // First send stop command to sockets so that any blocking calls + // can be interrupted. If there are no sockets we can ask reaper + // thread to stop. + slot_sync.lock (); + for (sockets_t::size_type i = 0; i != sockets.size (); i++) + sockets [i]->stop (); + if (sockets.empty ()) + reaper->stop (); + slot_sync.unlock (); + } + + // Wait till reaper thread closes all the sockets. + command_t cmd; + int rc = term_mailbox.recv (&cmd, -1); + if (rc == -1 && errno == EINTR) + return -1; + zmq_assert (rc == 0); + zmq_assert (cmd.type == command_t::done); + slot_sync.lock (); + zmq_assert (sockets.empty ()); slot_sync.unlock (); } - // Wait till reaper thread closes all the sockets. - command_t cmd; - int rc = term_mailbox.recv (&cmd, -1); - if (rc == -1 && errno == EINTR) - return -1; - zmq_assert (rc == 0); - zmq_assert (cmd.type == command_t::done); - slot_sync.lock (); - zmq_assert (sockets.empty ()); - slot_sync.unlock (); - // Deallocate the resources. delete this; return 0; } +int zmq::ctx_t::set (int option_, int optval_) +{ + int rc = 0; + if (option_ == ZMQ_MAX_SOCKETS) { + opt_sync.lock (); + max_sockets = optval_; + opt_sync.unlock (); + } + else + if (option_ == ZMQ_IO_THREADS) { + opt_sync.lock (); + io_thread_count = optval_; + opt_sync.unlock (); + } + else { + errno = EINVAL; + rc = -1; + } + return rc; +} + +int zmq::ctx_t::get (int option_) +{ + int rc = 0; + if (option_ == ZMQ_MAX_SOCKETS) + rc = max_sockets; + else + if (option_ == ZMQ_IO_THREADS) + rc = io_thread_count; + else { + errno = EINVAL; + rc = -1; + } + return rc; +} + zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) { + if (unlikely (starting)) { + + starting = false; + + // Initialise the array of mailboxes. Additional three slots are for + // zmq_term thread and reaper thread. + opt_sync.lock (); + int mazmq = max_sockets; + int ios = io_thread_count; + opt_sync.unlock (); + slot_count = mazmq + ios + 2; + slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); + alloc_assert (slots); + + // Initialise the infrastructure for zmq_term thread. + slots [term_tid] = &term_mailbox; + + // Create the reaper thread. + reaper = new (std::nothrow) reaper_t (this, reaper_tid); + alloc_assert (reaper); + slots [reaper_tid] = reaper->get_mailbox (); + reaper->start (); + + // Create I/O thread objects and launch them. + for (int i = 2; i != ios + 2; i++) { + io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); + alloc_assert (io_thread); + io_threads.push_back (io_thread); + slots [i] = io_thread->get_mailbox (); + io_thread->start (); + } + + // In the unused part of the slot array, create a list of empty slots. + for (int32_t i = (int32_t) slot_count - 1; + i >= (int32_t) ios + 2; i--) { + empty_slots.push_back (i); + slots [i] = NULL; + } + } + slot_sync.lock (); // Once zmq_term() was called, we can't create new sockets. @@ -164,8 +223,11 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) uint32_t slot = empty_slots.back (); empty_slots.pop_back (); + // Generate new unique socket ID. + int sid = ((int) max_socket_id.add (1)) + 1; + // Create the socket and register its mailbox. - socket_base_t *s = socket_base_t::create (type_, this, slot); + socket_base_t *s = socket_base_t::create (type_, this, slot, sid); if (!s) { empty_slots.push_back (slot); slot_sync.unlock (); @@ -286,3 +348,8 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) endpoints_sync.unlock (); return *endpoint; } + +// The last used socket ID, or 0 if no socket was used so far. Note that this +// is a global variable. Thus, even sockets created in different contexts have +// unique IDs. +zmq::atomic_counter_t zmq::ctx_t::max_socket_id; diff --git a/src/ctx.hpp b/src/ctx.hpp index 9e345598..f9d02a2f 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -1,6 +1,6 @@ /* + Copyright (c) 2007-2012 iMatix Corporation Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -33,6 +33,7 @@ #include "mutex.hpp" #include "stdint.hpp" #include "options.hpp" +#include "atomic_counter.hpp" namespace zmq { @@ -58,9 +59,8 @@ namespace zmq { public: - // Create the context object. The argument specifies the size - // of I/O thread pool to create. - ctx_t (uint32_t io_threads_); + // Create the context object + ctx_t (); // Returns false if object is not a context. bool check_tag (); @@ -71,6 +71,10 @@ namespace zmq // after the last one is closed. int terminate (); + // Set and set context properties + int set (int option_, int optval_); + int get (int option_); + // Create and destroy a socket. zmq::socket_base_t *create_socket (int type_); void destroy_socket (zmq::socket_base_t *socket_); @@ -113,6 +117,10 @@ namespace zmq typedef std::vector emtpy_slots_t; emtpy_slots_t empty_slots; + // If true, zmq_init has been called but no socket have been created + // yes. Launching of I/O threads is delayed. + bool starting; + // If true, zmq_term was already called. bool terminating; @@ -143,6 +151,18 @@ namespace zmq // Synchronisation of access to the list of inproc endpoints. mutex_t endpoints_sync; + // Maximum socket ID. + static atomic_counter_t max_socket_id; + + // Maximum number of sockets that can be opened at the same time. + int max_sockets; + + // Number of I/O threads to launch. + int io_thread_count; + + // Synchronisation of access to context options. + mutex_t opt_sync; + ctx_t (const ctx_t&); const ctx_t &operator = (const ctx_t&); }; @@ -150,4 +170,3 @@ namespace zmq } #endif - diff --git a/src/options.cpp b/src/options.cpp index 5f1d4440..05a69bf9 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -1,6 +1,6 @@ /* + Copyright (c) 2007-2012 iMatix Corporation Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file @@ -48,7 +48,8 @@ zmq::options_t::options_t () : delay_on_disconnect (true), filter (false), send_identity (false), - recv_identity (false) + recv_identity (false), + socket_id (0) { } diff --git a/src/options.hpp b/src/options.hpp index 2bf793ee..667416ad 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -110,6 +110,9 @@ namespace zmq // Receivers identity from all new connections. bool recv_identity; + + // ID of the socket. + int socket_id; }; } diff --git a/src/pair.cpp b/src/pair.cpp index c066b293..cf35e123 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -24,8 +24,8 @@ #include "pipe.hpp" #include "msg.hpp" -zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_), +zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_), pipe (NULL) { options.type = ZMQ_PAIR; diff --git a/src/pair.hpp b/src/pair.hpp index 1c0859e6..d07052e9 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -38,7 +38,7 @@ namespace zmq { public: - pair_t (zmq::ctx_t *parent_, uint32_t tid_); + pair_t (zmq::ctx_t *parent_, uint32_t tid_, int sid); ~pair_t (); // Overloads of functions from socket_base_t. diff --git a/src/pub.cpp b/src/pub.cpp index e2ad8a68..d92d0d96 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -1,6 +1,6 @@ /* + Copyright (c) 2007-2012 iMatix Corporation Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -22,8 +22,8 @@ #include "pub.hpp" #include "msg.hpp" -zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) : - xpub_t (parent_, tid_) +zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + xpub_t (parent_, tid_, sid_) { options.type = ZMQ_PUB; } diff --git a/src/pub.hpp b/src/pub.hpp index fb481730..bdc6a8b4 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -1,6 +1,6 @@ /* + Copyright (c) 2007-2012 iMatix Corporation Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -36,7 +36,7 @@ namespace zmq { public: - pub_t (zmq::ctx_t *parent_, uint32_t tid_); + pub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); ~pub_t (); // Implementations of virtual functions from socket_base_t. diff --git a/src/pull.cpp b/src/pull.cpp index f6538e6f..f9ab5e32 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -24,8 +24,8 @@ #include "msg.hpp" #include "pipe.hpp" -zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_) +zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_) { options.type = ZMQ_PULL; } diff --git a/src/pull.hpp b/src/pull.hpp index 2bdc5d0d..3fc2393d 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -39,7 +39,7 @@ namespace zmq { public: - pull_t (zmq::ctx_t *parent_, uint32_t tid_); + pull_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); ~pull_t (); protected: diff --git a/src/push.cpp b/src/push.cpp index 80e8f4cf..e961f5c3 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -24,8 +24,8 @@ #include "err.hpp" #include "msg.hpp" -zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_) +zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_) { options.type = ZMQ_PUSH; } diff --git a/src/push.hpp b/src/push.hpp index 1012ce28..3ac2bf0f 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -39,7 +39,7 @@ namespace zmq { public: - push_t (zmq::ctx_t *parent_, uint32_t tid_); + push_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); ~push_t (); protected: diff --git a/src/rep.cpp b/src/rep.cpp index fbc981cd..bb3ca039 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -1,6 +1,6 @@ /* + Copyright (c) 2007-2012 iMatix Corporation Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2011 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -23,8 +23,8 @@ #include "err.hpp" #include "msg.hpp" -zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t tid_) : - xrep_t (parent_, tid_), +zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + xrep_t (parent_, tid_, sid_), sending_reply (false), request_begins (true) { diff --git a/src/rep.hpp b/src/rep.hpp index 04ac5c0b..a92ee6aa 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -36,7 +36,7 @@ namespace zmq { public: - rep_t (zmq::ctx_t *parent_, uint32_t tid_); + rep_t (zmq::ctx_t *parent_, uint32_t tid_, int sid); ~rep_t (); // Overloads of functions from socket_base_t. diff --git a/src/req.cpp b/src/req.cpp index 3d2a6bc8..f7bf11fa 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -27,8 +27,8 @@ #include "random.hpp" #include "likely.hpp" -zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) : - xreq_t (parent_, tid_), +zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + xreq_t (parent_, tid_, sid_), receiving_reply (false), message_begins (true) { diff --git a/src/req.hpp b/src/req.hpp index 0edcb046..22c3c089 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -1,6 +1,6 @@ /* + Copyright (c) 2007-2012 iMatix Corporation Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file @@ -38,7 +38,7 @@ namespace zmq { public: - req_t (zmq::ctx_t *parent_, uint32_t tid_); + req_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); ~req_t (); // Overloads of functions from socket_base_t. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 94906bc3..db0cf5a9 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -75,43 +75,43 @@ bool zmq::socket_base_t::check_tag () } zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, - uint32_t tid_) + uint32_t tid_, int sid_) { socket_base_t *s = NULL; switch (type_) { case ZMQ_PAIR: - s = new (std::nothrow) pair_t (parent_, tid_); + s = new (std::nothrow) pair_t (parent_, tid_, sid_); break; case ZMQ_PUB: - s = new (std::nothrow) pub_t (parent_, tid_); + s = new (std::nothrow) pub_t (parent_, tid_, sid_); break; case ZMQ_SUB: - s = new (std::nothrow) sub_t (parent_, tid_); + s = new (std::nothrow) sub_t (parent_, tid_, sid_); break; case ZMQ_REQ: - s = new (std::nothrow) req_t (parent_, tid_); + s = new (std::nothrow) req_t (parent_, tid_, sid_); break; case ZMQ_REP: - s = new (std::nothrow) rep_t (parent_, tid_); + s = new (std::nothrow) rep_t (parent_, tid_, sid_); break; case ZMQ_XREQ: - s = new (std::nothrow) xreq_t (parent_, tid_); + s = new (std::nothrow) xreq_t (parent_, tid_, sid_); break; case ZMQ_XREP: - s = new (std::nothrow) xrep_t (parent_, tid_); + s = new (std::nothrow) xrep_t (parent_, tid_, sid_); break; case ZMQ_PULL: - s = new (std::nothrow) pull_t (parent_, tid_); + s = new (std::nothrow) pull_t (parent_, tid_, sid_); break; case ZMQ_PUSH: - s = new (std::nothrow) push_t (parent_, tid_); + s = new (std::nothrow) push_t (parent_, tid_, sid_); break; case ZMQ_XPUB: - s = new (std::nothrow) xpub_t (parent_, tid_); + s = new (std::nothrow) xpub_t (parent_, tid_, sid_); break; case ZMQ_XSUB: - s = new (std::nothrow) xsub_t (parent_, tid_); + s = new (std::nothrow) xsub_t (parent_, tid_, sid_); break; default: errno = EINVAL; @@ -121,7 +121,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, return s; } -zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) : +zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : own_t (parent_, tid_), tag (0xbaddecaf), ctx_terminated (false), @@ -130,6 +130,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) : ticks (0), rcvmore (false) { + options.socket_id = sid_; } zmq::socket_base_t::~socket_base_t () diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 602a2098..eed43a6c 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -1,6 +1,6 @@ /* + Copyright (c) 2007-2012 iMatix Corporation Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file @@ -57,7 +57,7 @@ namespace zmq // Create a socket of a specified type. static socket_base_t *create (int type_, zmq::ctx_t *parent_, - uint32_t tid_); + uint32_t tid_, int sid_); // Returns the mailbox associated with this socket. mailbox_t *get_mailbox (); @@ -99,7 +99,7 @@ namespace zmq void unlock(); protected: - socket_base_t (zmq::ctx_t *parent_, uint32_t tid_); + socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); virtual ~socket_base_t (); // Concrete algorithms for the x- methods are to be defined by diff --git a/src/sub.cpp b/src/sub.cpp index 12422dbf..7378bdec 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -1,6 +1,6 @@ /* + Copyright (c) 2007-2012 iMatix Corporation Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -22,8 +22,8 @@ #include "sub.hpp" #include "msg.hpp" -zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) : - xsub_t (parent_, tid_) +zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + xsub_t (parent_, tid_, sid_) { options.type = ZMQ_SUB; diff --git a/src/sub.hpp b/src/sub.hpp index 778786d6..ee66d7df 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -1,6 +1,6 @@ /* + Copyright (c) 2007-2012 iMatix Corporation Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -36,7 +36,7 @@ namespace zmq { public: - sub_t (zmq::ctx_t *parent_, uint32_t tid_); + sub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); ~sub_t (); protected: diff --git a/src/xpub.cpp b/src/xpub.cpp index f0b28922..e7b230e1 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -26,8 +26,8 @@ #include "err.hpp" #include "msg.hpp" -zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_), +zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_), more (false) { options.type = ZMQ_XPUB; diff --git a/src/xpub.hpp b/src/xpub.hpp index 9c8b40f0..92a589a5 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -43,7 +43,7 @@ namespace zmq { public: - xpub_t (zmq::ctx_t *parent_, uint32_t tid_); + xpub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); ~xpub_t (); // Implementations of virtual functions from socket_base_t. diff --git a/src/xrep.cpp b/src/xrep.cpp index c7e82b15..6e9a1147 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -27,8 +27,8 @@ #include "likely.hpp" #include "err.hpp" -zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_), +zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_), prefetched (0), more_in (false), current_out (NULL), diff --git a/src/xrep.hpp b/src/xrep.hpp index 6377c557..d9355811 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -44,7 +44,7 @@ namespace zmq { public: - xrep_t (zmq::ctx_t *parent_, uint32_t tid_); + xrep_t (zmq::ctx_t *parent_, uint32_t tid_, int sid); ~xrep_t (); // Overloads of functions from socket_base_t. diff --git a/src/xreq.cpp b/src/xreq.cpp index 5c30558b..b116610d 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -23,8 +23,8 @@ #include "err.hpp" #include "msg.hpp" -zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_), +zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_), prefetched (false) { options.type = ZMQ_XREQ; diff --git a/src/xreq.hpp b/src/xreq.hpp index 678e50c1..d7ddde06 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -40,7 +40,7 @@ namespace zmq { public: - xreq_t (zmq::ctx_t *parent_, uint32_t tid_); + xreq_t (zmq::ctx_t *parent_, uint32_t tid_, int sid); ~xreq_t (); protected: diff --git a/src/xsub.cpp b/src/xsub.cpp index 1a47dc6f..06c503c1 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -24,8 +24,8 @@ #include "xsub.hpp" #include "err.hpp" -zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_), +zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_), has_message (false), more (false) { diff --git a/src/xsub.hpp b/src/xsub.hpp index 003034d9..6182a3f3 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -39,7 +39,7 @@ namespace zmq { public: - xsub_t (zmq::ctx_t *parent_, uint32_t tid_); + xsub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); ~xsub_t (); protected: diff --git a/src/zmq.cpp b/src/zmq.cpp index a962bcb5..9a6e5e18 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -92,7 +92,6 @@ struct iovec { typedef char check_msg_t_size [sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1]; -// Version. void zmq_version (int *major_, int *minor_, int *patch_) { @@ -101,7 +100,6 @@ void zmq_version (int *major_, int *minor_, int *patch_) *patch_ = ZMQ_VERSION_PATCH; } -// Errors. const char *zmq_strerror (int errnum_) { @@ -113,15 +111,11 @@ int zmq_errno () return errno; } -// Contexts. -static zmq::ctx_t *s_init (int io_threads_) +// New context API + +void *zmq_ctx_new (void) { - if (io_threads_ < 0) { - errno = EINVAL; - return NULL; - } - #if defined ZMQ_HAVE_OPENPGM // Init PGM transport. Ensure threading and timer are enabled. Find PGM @@ -162,22 +156,18 @@ static zmq::ctx_t *s_init (int io_threads_) #endif // Create 0MQ context. - zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_); + zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t; alloc_assert (ctx); return ctx; } -void *zmq_init (int io_threads_) -{ - return (void *) s_init (io_threads_); -} - -int zmq_term (void *ctx_) +int zmq_ctx_destroy (void *ctx_) { if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { errno = EFAULT; return -1; } + int rc = ((zmq::ctx_t*) ctx_)->terminate (); int en = errno; @@ -197,7 +187,41 @@ int zmq_term (void *ctx_) return rc; } -// Sockets. +int zmq_ctx_set (void *ctx_, int option_, int optval_) +{ + if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { + errno = EFAULT; + return -1; + } + return ((zmq::ctx_t*) ctx_)->set (option_, optval_); +} + +int zmq_ctx_get (void *ctx_, int option_) +{ + if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { + errno = EFAULT; + return -1; + } + return ((zmq::ctx_t*) ctx_)->get (option_); +} + + +// Stable/legacy context API + +void *zmq_init (int io_threads_) +{ + void *ctx = zmq_ctx_new (); + zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_); + return ctx; +} + +int zmq_term (void *ctx_) +{ + return zmq_ctx_destroy (ctx_); +} + + +// Sockets void *zmq_socket (void *ctx_, int type_) { diff --git a/tests/test_last_endpoint b/tests/test_last_endpoint new file mode 100755 index 00000000..025322bd --- /dev/null +++ b/tests/test_last_endpoint @@ -0,0 +1,225 @@ +#! /bin/bash + +# test_last_endpoint - temporary wrapper script for .libs/test_last_endpoint +# Generated by libtool (GNU libtool) 2.4 Debian-2.4-2ubuntu1 +# +# The test_last_endpoint program cannot be directly executed until all the libtool +# libraries that it depends on are installed. +# +# This wrapper script should never be moved out of the build directory. +# If it is, it will not operate correctly. + +# Sed substitution that helps us do robust quoting. It backslashifies +# metacharacters that are still active within double-quoted strings. +sed_quote_subst='s/\([`"$\\]\)/\\\1/g' + +# Be Bourne compatible +if test -n "${ZSH_VERSION+set}" && (emulate sh) >/dev/null 2>&1; then + emulate sh + NULLCMD=: + # Zsh 3.x and 4.x performs word splitting on ${1+"$@"}, which + # is contrary to our usage. Disable this feature. + alias -g '${1+"$@"}'='"$@"' + setopt NO_GLOB_SUBST +else + case `(set -o) 2>/dev/null` in *posix*) set -o posix;; esac +fi +BIN_SH=xpg4; export BIN_SH # for Tru64 +DUALCASE=1; export DUALCASE # for MKS sh + +# The HP-UX ksh and POSIX shell print the target directory to stdout +# if CDPATH is set. +(unset CDPATH) >/dev/null 2>&1 && unset CDPATH + +relink_command="(cd /home/ph/work/libzmq_pieterh/tests; { test -z \"\${LIBRARY_PATH+set}\" || unset LIBRARY_PATH || { LIBRARY_PATH=; export LIBRARY_PATH; }; }; { test -z \"\${COMPILER_PATH+set}\" || unset COMPILER_PATH || { COMPILER_PATH=; export COMPILER_PATH; }; }; { test -z \"\${GCC_EXEC_PREFIX+set}\" || unset GCC_EXEC_PREFIX || { GCC_EXEC_PREFIX=; export GCC_EXEC_PREFIX; }; }; { test -z \"\${LD_RUN_PATH+set}\" || unset LD_RUN_PATH || { LD_RUN_PATH=; export LD_RUN_PATH; }; }; LD_LIBRARY_PATH=/usr/local/lib; export LD_LIBRARY_PATH; PATH=/opt/android-toolchain/bin:/usr/lib/lightdm/lightdm:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games; export PATH; g++ -g -O2 -o \$progdir/\$file test_last_endpoint.o ../src/.libs/libzmq.so -lrt -lpthread -Wl,-rpath -Wl,/home/ph/work/libzmq_pieterh/src/.libs)" + +# This environment variable determines our operation mode. +if test "$libtool_install_magic" = "%%%MAGIC variable%%%"; then + # install mode needs the following variables: + generated_by_libtool_version='2.4' + notinst_deplibs=' ../src/libzmq.la' +else + # When we are sourced in execute mode, $file and $ECHO are already set. + if test "$libtool_execute_magic" != "%%%MAGIC variable%%%"; then + file="$0" + +# A function that is used when there is no print builtin or printf. +func_fallback_echo () +{ + eval 'cat <<_LTECHO_EOF +$1 +_LTECHO_EOF' +} + ECHO="printf %s\\n" + fi + +# Very basic option parsing. These options are (a) specific to +# the libtool wrapper, (b) are identical between the wrapper +# /script/ and the wrapper /executable/ which is used only on +# windows platforms, and (c) all begin with the string --lt- +# (application programs are unlikely to have options which match +# this pattern). +# +# There are only two supported options: --lt-debug and +# --lt-dump-script. There is, deliberately, no --lt-help. +# +# The first argument to this parsing function should be the +# script's ../libtool value, followed by no. +lt_option_debug= +func_parse_lt_options () +{ + lt_script_arg0=$0 + shift + for lt_opt + do + case "$lt_opt" in + --lt-debug) lt_option_debug=1 ;; + --lt-dump-script) + lt_dump_D=`$ECHO "X$lt_script_arg0" | /bin/sed -e 's/^X//' -e 's%/[^/]*$%%'` + test "X$lt_dump_D" = "X$lt_script_arg0" && lt_dump_D=. + lt_dump_F=`$ECHO "X$lt_script_arg0" | /bin/sed -e 's/^X//' -e 's%^.*/%%'` + cat "$lt_dump_D/$lt_dump_F" + exit 0 + ;; + --lt-*) + $ECHO "Unrecognized --lt- option: '$lt_opt'" 1>&2 + exit 1 + ;; + esac + done + + # Print the debug banner immediately: + if test -n "$lt_option_debug"; then + echo "test_last_endpoint:test_last_endpoint:${LINENO}: libtool wrapper (GNU libtool) 2.4 Debian-2.4-2ubuntu1" 1>&2 + fi +} + +# Used when --lt-debug. Prints its arguments to stdout +# (redirection is the responsibility of the caller) +func_lt_dump_args () +{ + lt_dump_args_N=1; + for lt_arg + do + $ECHO "test_last_endpoint:test_last_endpoint:${LINENO}: newargv[$lt_dump_args_N]: $lt_arg" + lt_dump_args_N=`expr $lt_dump_args_N + 1` + done +} + +# Core function for launching the target application +func_exec_program_core () +{ + + if test -n "$lt_option_debug"; then + $ECHO "test_last_endpoint:test_last_endpoint:${LINENO}: newargv[0]: $progdir/$program" 1>&2 + func_lt_dump_args ${1+"$@"} 1>&2 + fi + exec "$progdir/$program" ${1+"$@"} + + $ECHO "$0: cannot exec $program $*" 1>&2 + exit 1 +} + +# A function to encapsulate launching the target application +# Strips options in the --lt-* namespace from $@ and +# launches target application with the remaining arguments. +func_exec_program () +{ + for lt_wr_arg + do + case $lt_wr_arg in + --lt-*) ;; + *) set x "$@" "$lt_wr_arg"; shift;; + esac + shift + done + func_exec_program_core ${1+"$@"} +} + + # Parse options + func_parse_lt_options "$0" ${1+"$@"} + + # Find the directory that this script lives in. + thisdir=`$ECHO "$file" | /bin/sed 's%/[^/]*$%%'` + test "x$thisdir" = "x$file" && thisdir=. + + # Follow symbolic links until we get to the real thisdir. + file=`ls -ld "$file" | /bin/sed -n 's/.*-> //p'` + while test -n "$file"; do + destdir=`$ECHO "$file" | /bin/sed 's%/[^/]*$%%'` + + # If there was a directory component, then change thisdir. + if test "x$destdir" != "x$file"; then + case "$destdir" in + [\\/]* | [A-Za-z]:[\\/]*) thisdir="$destdir" ;; + *) thisdir="$thisdir/$destdir" ;; + esac + fi + + file=`$ECHO "$file" | /bin/sed 's%^.*/%%'` + file=`ls -ld "$thisdir/$file" | /bin/sed -n 's/.*-> //p'` + done + + # Usually 'no', except on cygwin/mingw when embedded into + # the cwrapper. + WRAPPER_SCRIPT_BELONGS_IN_OBJDIR=no + if test "$WRAPPER_SCRIPT_BELONGS_IN_OBJDIR" = "yes"; then + # special case for '.' + if test "$thisdir" = "."; then + thisdir=`pwd` + fi + # remove .libs from thisdir + case "$thisdir" in + *[\\/].libs ) thisdir=`$ECHO "$thisdir" | /bin/sed 's%[\\/][^\\/]*$%%'` ;; + .libs ) thisdir=. ;; + esac + fi + + # Try to get the absolute directory name. + absdir=`cd "$thisdir" && pwd` + test -n "$absdir" && thisdir="$absdir" + + program=lt-'test_last_endpoint' + progdir="$thisdir/.libs" + + if test ! -f "$progdir/$program" || + { file=`ls -1dt "$progdir/$program" "$progdir/../$program" 2>/dev/null | /bin/sed 1q`; \ + test "X$file" != "X$progdir/$program"; }; then + + file="$$-$program" + + if test ! -d "$progdir"; then + mkdir "$progdir" + else + rm -f "$progdir/$file" + fi + + # relink executable if necessary + if test -n "$relink_command"; then + if relink_command_output=`eval $relink_command 2>&1`; then : + else + printf %s\n "$relink_command_output" >&2 + rm -f "$progdir/$file" + exit 1 + fi + fi + + mv -f "$progdir/$file" "$progdir/$program" 2>/dev/null || + { rm -f "$progdir/$program"; + mv -f "$progdir/$file" "$progdir/$program"; } + rm -f "$progdir/$file" + fi + + if test -f "$progdir/$program"; then + if test "$libtool_execute_magic" != "%%%MAGIC variable%%%"; then + # Run the actual program with our arguments. + func_exec_program ${1+"$@"} + fi + else + # The program doesn't exist. + $ECHO "$0: error: \`$progdir/$program' does not exist" 1>&2 + $ECHO "This script is just a wrapper for $program." 1>&2 + $ECHO "See the libtool documentation for more information." 1>&2 + exit 1 + fi +fi