diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 32015e56..68a5ffc6 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -12,8 +12,9 @@ SYNOPSIS -------- *int zmq_setsockopt (void '*socket', int 'option_name', const void '*option_value', size_t 'option_len');* -Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE and -ZMQ_LINGER, only take effect for subsequent socket bind/connects. +Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE, +ZMQ_LINGER and ZMQ_FAIL_UNROUTABLE only take effect for subsequent socket +bind/connects. DESCRIPTION ----------- @@ -348,6 +349,21 @@ Default value:: 1 (true) Applicable socket types:: all, when using TCP transports. +ZMQ_FAIL_UNROUTABLE: Set unroutable message behavior +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Sets the behavior when an unroutable message is encountered in a 'ZMQ_ROUTER' +socket. A value of `0` is the default behavior when the message is silently +dropped, while a value of `1` forces the sending to fail with a 'EHOSTUNREACH' +error code. + +[horizontal] +Option value type:: int +Option value unit:: boolean +Default value:: 0 (false) +Applicable socket types:: ZMQ_ROUTER + + RETURN VALUE ------------ The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it diff --git a/include/zmq.h b/include/zmq.h index cc5db4ee..8bc3d901 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -230,6 +230,7 @@ ZMQ_EXPORT int zmq_term (zmq_ctx_t context); #define ZMQ_SNDTIMEO 28 #define ZMQ_IPV4ONLY 31 #define ZMQ_LAST_ENDPOINT 32 +#define ZMQ_FAIL_UNROUTABLE 33 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/xrep.cpp b/src/xrep.cpp index e5845061..c7e82b15 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -33,7 +33,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : more_in (false), current_out (NULL), more_out (false), - next_peer_id (generate_random ()) + next_peer_id (generate_random ()), + fail_unroutable(false) { options.type = ZMQ_XREP; @@ -77,6 +78,24 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) fq.attach (pipe_); } +int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != ZMQ_FAIL_UNROUTABLE) { + errno = EINVAL; + return -1; + } + + if(sizeof(optvallen_) != sizeof(int)) { + errno = EINVAL; + return -1; + } + + fail_unroutable = *((const int*) optval_); + + return 0; +} + void zmq::xrep_t::xterminated (pipe_t *pipe_) { fq.terminated (pipe_); @@ -118,6 +137,8 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) if (!more_out) { zmq_assert (!current_out); + int retval = 0; + // If we have malformed message (prefix with no subsequent message) // then just silently ignore it. // TODO: The connections should be killed instead. @@ -126,7 +147,8 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) more_out = true; // Find the pipe associated with the identity stored in the prefix. - // If there's no such pipe just silently ignore the message. + // If there's no such pipe just silently ignore the message, unless + // fail_unreachable is set. blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); outpipes_t::iterator it = outpipes.find (identity); @@ -142,6 +164,9 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) } rc = empty.close (); errno_assert (rc == 0); + } else if(fail_unroutable) { + more_out = false; + retval = EHOSTUNREACH; } } @@ -150,7 +175,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) errno_assert (rc == 0); rc = msg_->init (); errno_assert (rc == 0); - return 0; + return retval; } // Check whether this is the last part of the message. diff --git a/src/xrep.hpp b/src/xrep.hpp index 414ce7a4..6377c557 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -49,6 +49,7 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (msg_t *msg_, int flags_); int xrecv (msg_t *msg_, int flags_); bool xhas_in (); @@ -100,6 +101,9 @@ namespace zmq // algorithm. This value is the next ID to use (if not used already). uint32_t next_peer_id; + // If true, fail on unroutable messages instead of silently dropping them. + bool fail_unroutable; + xrep_t (const xrep_t&); const xrep_t &operator = (const xrep_t&); };