diff --git a/doc/zmq_ctx_get.txt b/doc/zmq_ctx_get.txt index 3d2f4839..0a3825c3 100644 --- a/doc/zmq_ctx_get.txt +++ b/doc/zmq_ctx_get.txt @@ -40,6 +40,12 @@ ZMQ_IPV6: Set IPv6 option ~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_IPV6' argument returns the IPv6 option for the context. +ZMQ_BLOCKY: Get blocky setting +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_BLOCKY' argument returns 1 if the context will block on terminate, +zero if the "block forever on context termination" gambit was disabled by +setting ZMQ_BLOCKY to false on all new contexts. + RETURN VALUE ------------ @@ -63,6 +69,10 @@ zmq_ctx_set (context, ZMQ_MAX_SOCKETS, 256); int max_sockets = zmq_ctx_get (context, ZMQ_MAX_SOCKETS); assert (max_sockets == 256); ---- +.Switching off the context deadlock gambit +---- +zmq_ctx_set (ctx, ZMQ_BLOCKY, false); +---- SEE ALSO diff --git a/doc/zmq_ctx_set.txt b/doc/zmq_ctx_set.txt index 706cfcb6..bf848332 100644 --- a/doc/zmq_ctx_set.txt +++ b/doc/zmq_ctx_set.txt @@ -21,6 +21,21 @@ The _zmq_ctx_set()_ function shall set the option specified by the The _zmq_ctx_set()_ function accepts the following options: +ZMQ_BLOCKY: Fix blocky behavior +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +By default the context will block, forever, on a zmq_ctx_term call. The +assumption behind this behavior is that abrupt termination will cause +message loss. Most real applications use some form of handshaking to ensure +applications receive termination messages, and then terminate the context +with 'ZMQ_LINGER' set to zero on all sockets. This setting is an easier way +to get the same result. When 'ZMQ_BLOCKY' is set to false, all new sockets +are given a linger timeout of zero. You must still close all sockets before +calling zmq_term. + +[horizontal] +Default value:: false (old behavior) + + ZMQ_IO_THREADS: Set number of I/O threads ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_IO_THREADS' argument specifies the size of the 0MQ thread pool to diff --git a/include/zmq.h b/include/zmq.h index 5dff076d..199e4cba 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -304,6 +304,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); #define ZMQ_IDENTITY_FD 67 #define ZMQ_SOCKS_PROXY 68 #define ZMQ_XPUB_NODROP 69 +#define ZMQ_BLOCKY 70 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/ctx.cpp b/src/ctx.cpp index 399c2677..2c8fbf13 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -65,6 +65,7 @@ zmq::ctx_t::ctx_t () : slots (NULL), max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)), io_thread_count (ZMQ_IO_THREADS_DFLT), + blocky (true), ipv6 (false), thread_priority (ZMQ_THREAD_PRIORITY_DFLT), thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT) @@ -222,6 +223,12 @@ int zmq::ctx_t::set (int option_, int optval_) thread_sched_policy = optval_; opt_sync.unlock(); } + else + if (option_ == ZMQ_BLOCKY && optval_ >= 0) { + opt_sync.lock (); + blocky = (optval_ != 0); + opt_sync.unlock (); + } else { errno = EINVAL; rc = -1; @@ -243,6 +250,9 @@ int zmq::ctx_t::get (int option_) else if (option_ == ZMQ_IPV6) rc = ipv6; + else + if (option_ == ZMQ_BLOCKY) + rc = blocky; else { errno = EINVAL; rc = -1; diff --git a/src/ctx.hpp b/src/ctx.hpp index b9382175..655da37a 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -187,6 +187,9 @@ namespace zmq // Number of I/O threads to launch. int io_thread_count; + // Does context wait (possibly forever) on termination? + bool blocky; + // Is IPv6 enabled on this context? bool ipv6; diff --git a/src/options.cpp b/src/options.cpp index a6fc287c..5450dbf2 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -35,7 +35,7 @@ zmq::options_t::options_t () : rcvbuf (0), tos (0), type (-1), - linger (30000), + linger (-1), reconnect_ivl (100), reconnect_ivl_max (0), backlog (100), diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 433f53fe..56a3ce20 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -143,6 +143,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : { options.socket_id = sid_; options.ipv6 = (parent_->get (ZMQ_IPV6) != 0); + options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0; } zmq::socket_base_t::~socket_base_t () diff --git a/tests/test_ctx_options.cpp b/tests/test_ctx_options.cpp index 17eccc9e..914dae3e 100644 --- a/tests/test_ctx_options.cpp +++ b/tests/test_ctx_options.cpp @@ -43,15 +43,26 @@ int main (void) assert (zmq_ctx_get (ctx, ZMQ_IPV6) == 1); void *router = zmq_socket (ctx, ZMQ_ROUTER); - int ipv6; + int value; size_t optsize = sizeof (int); - rc = zmq_getsockopt (router, ZMQ_IPV6, &ipv6, &optsize); + rc = zmq_getsockopt (router, ZMQ_IPV6, &value, &optsize); assert (rc == 0); - assert (ipv6); - + assert (value == 1); + rc = zmq_getsockopt (router, ZMQ_LINGER, &value, &optsize); + assert (rc == 0); + assert (value == -1); rc = zmq_close (router); assert (rc == 0); + rc = zmq_ctx_set (ctx, ZMQ_BLOCKY, false); + assert (zmq_ctx_get (ctx, ZMQ_BLOCKY) == 0); + router = zmq_socket (ctx, ZMQ_ROUTER); + rc = zmq_getsockopt (router, ZMQ_LINGER, &value, &optsize); + assert (rc == 0); + assert (value == 0); + rc = zmq_close (router); + assert (rc == 0); + rc = zmq_ctx_term (ctx); assert (rc == 0);