From df584a3be04cac3fffb8b8b319d7351c8d9b6345 Mon Sep 17 00:00:00 2001 From: Kobolog Date: Thu, 15 Mar 2012 14:57:38 +0300 Subject: [PATCH] an option to fail on unroutable messages in ROUTER sockets --- include/zmq.h | 1 + src/xrep.cpp | 31 ++++++++++++++++++++++++++++--- src/xrep.hpp | 4 ++++ 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/include/zmq.h b/include/zmq.h index cc5db4ee..a7000fc0 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -230,6 +230,7 @@ ZMQ_EXPORT int zmq_term (zmq_ctx_t context); #define ZMQ_SNDTIMEO 28 #define ZMQ_IPV4ONLY 31 #define ZMQ_LAST_ENDPOINT 32 +#define ZMQ_FAIL_UNROUTEABLE 33 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/xrep.cpp b/src/xrep.cpp index e5845061..eef994fc 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -33,7 +33,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : more_in (false), current_out (NULL), more_out (false), - next_peer_id (generate_random ()) + next_peer_id (generate_random ()), + fail_unrouteable(false) { options.type = ZMQ_XREP; @@ -77,6 +78,24 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) fq.attach (pipe_); } +int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != ZMQ_FAIL_UNROUTABLE) { + errno = EINVAL; + return -1; + } + + if(sizeof(optvallen_) != sizeof(uint64_t)) { + errno = EINVAL; + return -1; + } + + fail_unroutable = *((const uint64_t*) optval_); + + return 0; +} + void zmq::xrep_t::xterminated (pipe_t *pipe_) { fq.terminated (pipe_); @@ -118,6 +137,8 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) if (!more_out) { zmq_assert (!current_out); + int retval = 0; + // If we have malformed message (prefix with no subsequent message) // then just silently ignore it. // TODO: The connections should be killed instead. @@ -126,7 +147,8 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) more_out = true; // Find the pipe associated with the identity stored in the prefix. - // If there's no such pipe just silently ignore the message. + // If there's no such pipe just silently ignore the message, unless + // fail_unreachable is set. blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); outpipes_t::iterator it = outpipes.find (identity); @@ -142,6 +164,9 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) } rc = empty.close (); errno_assert (rc == 0); + } else if(fail_unreachable) { + more_out = false; + retval = EHOSTUNREACH; } } @@ -150,7 +175,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) errno_assert (rc == 0); rc = msg_->init (); errno_assert (rc == 0); - return 0; + return retval; } // Check whether this is the last part of the message. diff --git a/src/xrep.hpp b/src/xrep.hpp index 414ce7a4..06286a26 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -49,6 +49,7 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (msg_t *msg_, int flags_); int xrecv (msg_t *msg_, int flags_); bool xhas_in (); @@ -100,6 +101,9 @@ namespace zmq // algorithm. This value is the next ID to use (if not used already). uint32_t next_peer_id; + // If true, fail on unroutable messages instead of silently dropping them. + bool fail_unrouteable; + xrep_t (const xrep_t&); const xrep_t &operator = (const xrep_t&); };