From 829d0003beb251643e9c7ece33c702a3a18fc017 Mon Sep 17 00:00:00 2001 From: Kobolog Date: Sun, 17 Jun 2012 02:33:43 +0400 Subject: [PATCH] Verbose ROUTER socket behavior patch --- doc/zmq_setsockopt.txt | 16 ++++----- doc/zmq_socket.txt | 10 +++--- include/zmq.h | 2 +- src/router.cpp | 19 +++++----- src/router.hpp | 5 +-- tests/Makefile.am | 4 ++- tests/test_router_behavior.cpp | 64 ++++++++++++++++++++++++++++++++++ 7 files changed, 93 insertions(+), 27 deletions(-) create mode 100644 tests/test_router_behavior.cpp diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 7fa2c49e..9d234059 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -366,18 +366,18 @@ Default value:: 0 (false) Applicable socket types:: all, primarily when using TCP/IPC transports. -ZMQ_FAIL_UNROUTABLE: Set unroutable message behavior -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +ZMQ_ROUTER_BEHAVIOR: Set the ROUTER socket behavior +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Sets the behavior when an unroutable message is encountered in a 'ZMQ_ROUTER' -socket. A value of `0` is the default behavior when the message is silently -dropped, while a value of `1` forces the sending to fail with a 'EHOSTUNREACH' -error code. +Sets the 'ROUTER' socket behavior when an unroutable message is encountered. A value +of `0` is the default when the message is silently discarded, while a value of `1` +forces the sending to fail with an 'EAGAIN' error code, effectively enabling sending +messages in a blocking fashion. [horizontal] Option value type:: int -Option value unit:: boolean -Default value:: 0 (false) +Option value unit:: 0, 1 +Default value:: 0 Applicable socket types:: ZMQ_ROUTER diff --git a/doc/zmq_socket.txt b/doc/zmq_socket.txt index 86333121..659863ef 100644 --- a/doc/zmq_socket.txt +++ b/doc/zmq_socket.txt @@ -147,13 +147,13 @@ message before passing it to the application. Messages received are fair-queued from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall remove the first part of the message and use it to determine the _identity_ of the peer the message shall be routed to. If the peer does not exist anymore -the message shall be silently discarded. +the message shall be silently discarded by default, unless 'ZMQ_ROUTER_BEHAVIOR' +socket option is set to '1'. When a 'ZMQ_ROUTER' socket enters an exceptional state due to having reached the -high water mark for all peers, or if there are no peers at all, then any -messages sent to the socket shall be dropped until the exceptional state ends. -Likewise, any messages routed to a non-existent peer or a peer for which the -individual high water mark has been reached shall also be dropped. +high water mark for all peers, then any messages sent to the socket shall be dropped +until the exceptional state ends. Likewise, any messages routed to a peer for which +the individual high water mark has been reached shall also be dropped. When a 'ZMQ_REQ' socket is connected to a 'ZMQ_ROUTER' socket, in addition to the _identity_ of the originating peer each message received shall contain an empty diff --git a/include/zmq.h b/include/zmq.h index b8813947..2095b335 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -221,7 +221,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_SNDTIMEO 28 #define ZMQ_IPV4ONLY 31 #define ZMQ_LAST_ENDPOINT 32 -#define ZMQ_FAIL_UNROUTABLE 33 +#define ZMQ_ROUTER_BEHAVIOR 33 #define ZMQ_TCP_KEEPALIVE 34 #define ZMQ_TCP_KEEPALIVE_CNT 35 #define ZMQ_TCP_KEEPALIVE_IDLE 36 diff --git a/src/router.cpp b/src/router.cpp index 3a45de6f..81275f65 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -35,7 +35,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : current_out (NULL), more_out (false), next_peer_id (generate_random ()), - fail_unroutable(false) + report_unroutable(false) { options.type = ZMQ_ROUTER; @@ -74,7 +74,7 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) int zmq::router_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { - if (option_ != ZMQ_FAIL_UNROUTABLE) { + if (option_ != ZMQ_ROUTER_BEHAVIOR) { errno = EINVAL; return -1; } @@ -82,7 +82,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, errno = EINVAL; return -1; } - fail_unroutable = *static_cast (optval_); + report_unroutable = *static_cast (optval_); return 0; } @@ -135,8 +135,6 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) if (!more_out) { zmq_assert (!current_out); - int retval = 0; - // If we have malformed message (prefix with no subsequent message) // then just silently ignore it. // TODO: The connections should be killed instead. @@ -146,7 +144,7 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) // Find the pipe associated with the identity stored in the prefix. // If there's no such pipe just silently ignore the message, unless - // fail_unreachable is set. + // report_unreachable is set. blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); outpipes_t::iterator it = outpipes.find (identity); @@ -156,9 +154,10 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) it->second.active = false; current_out = NULL; } - } else if(fail_unroutable) { - errno = EHOSTUNREACH; - retval = -1; + } else if (report_unroutable) { + more_out = false; + errno = EAGAIN; + return -1; } } @@ -166,7 +165,7 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) errno_assert (rc == 0); rc = msg_->init (); errno_assert (rc == 0); - return retval; + return 0; } // Check whether this is the last part of the message. diff --git a/src/router.hpp b/src/router.hpp index 73bdf90c..fba576e5 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -110,8 +110,9 @@ namespace zmq // algorithm. This value is the next ID to use (if not used already). uint32_t next_peer_id; - // If true, fail on unroutable messages instead of silently dropping them. - bool fail_unroutable; + // If true, report EAGAIN to the caller instead of silently dropping + // the message targeting an unknown peer. + bool report_unroutable; router_t (const router_t&); const router_t &operator = (const router_t&); diff --git a/tests/Makefile.am b/tests/Makefile.am index 024867d2..104dff82 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -16,7 +16,8 @@ noinst_PROGRAMS = test_pair_inproc \ test_connect_delay \ test_last_endpoint \ test_term_endpoint \ - test_monitor + test_monitor \ + test_router_behavior if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -39,6 +40,7 @@ test_connect_delay_SOURCES = test_connect_delay.cpp test_last_endpoint_SOURCES = test_last_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp test_monitor_SOURCES = test_monitor.cpp +test_router_behavior_SOURCES = test_router_behavior.cpp if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp diff --git a/tests/test_router_behavior.cpp b/tests/test_router_behavior.cpp new file mode 100644 index 00000000..e507ff7c --- /dev/null +++ b/tests/test_router_behavior.cpp @@ -0,0 +1,64 @@ +/* + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2011 iMatix Corporation + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include +#include +#include "testutil.hpp" + +int main (int argc, char *argv []) +{ + fprintf (stderr, "test_router_behavior running...\n"); + + void *ctx = zmq_init (1); + assert (ctx); + + // Creating the first socket. + void *sa = zmq_socket (ctx, ZMQ_ROUTER); + assert (sa); + + int rc = zmq_bind (sa, "tcp://127.0.0.1:15560"); + assert (rc == 0); + + // Sending a message to an unknown peer with the default behavior. + rc = zmq_send (sa, "UNKNOWN", 7, ZMQ_SNDMORE); + assert (rc == 7); + rc = zmq_send (sa, "DATA", 4, 0); + assert (rc == 4); + + int behavior = 1; + + // Setting the socket behavior to a new mode. + rc = zmq_setsockopt (sa, ZMQ_ROUTER_BEHAVIOR, &behavior, sizeof (behavior)); + assert (rc == 0); + + // Sending a message to an unknown peer with verbose behavior. + rc = zmq_send (sa, "UNKNOWN", 7, ZMQ_SNDMORE); + assert (rc == -1 && errno == EAGAIN); + + rc = zmq_close (sa); + assert (rc == 0); + + rc = zmq_term (ctx); + assert (rc == 0); + + return 0 ; +}