Verbose ROUTER socket behavior patch

This commit is contained in:
Kobolog 2012-06-17 02:33:43 +04:00
parent 21eb8c8fa5
commit 829d0003be
7 changed files with 93 additions and 27 deletions

View File

@ -366,18 +366,18 @@ Default value:: 0 (false)
Applicable socket types:: all, primarily when using TCP/IPC transports.
ZMQ_FAIL_UNROUTABLE: Set unroutable message behavior
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
ZMQ_ROUTER_BEHAVIOR: Set the ROUTER socket behavior
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the behavior when an unroutable message is encountered in a 'ZMQ_ROUTER'
socket. A value of `0` is the default behavior when the message is silently
dropped, while a value of `1` forces the sending to fail with a 'EHOSTUNREACH'
error code.
Sets the 'ROUTER' socket behavior when an unroutable message is encountered. A value
of `0` is the default when the message is silently discarded, while a value of `1`
forces the sending to fail with an 'EAGAIN' error code, effectively enabling sending
messages in a blocking fashion.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Option value unit:: 0, 1
Default value:: 0
Applicable socket types:: ZMQ_ROUTER

View File

@ -147,13 +147,13 @@ message before passing it to the application. Messages received are fair-queued
from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall
remove the first part of the message and use it to determine the _identity_ of
the peer the message shall be routed to. If the peer does not exist anymore
the message shall be silently discarded.
the message shall be silently discarded by default, unless 'ZMQ_ROUTER_BEHAVIOR'
socket option is set to '1'.
When a 'ZMQ_ROUTER' socket enters an exceptional state due to having reached the
high water mark for all peers, or if there are no peers at all, then any
messages sent to the socket shall be dropped until the exceptional state ends.
Likewise, any messages routed to a non-existent peer or a peer for which the
individual high water mark has been reached shall also be dropped.
high water mark for all peers, then any messages sent to the socket shall be dropped
until the exceptional state ends. Likewise, any messages routed to a peer for which
the individual high water mark has been reached shall also be dropped.
When a 'ZMQ_REQ' socket is connected to a 'ZMQ_ROUTER' socket, in addition to the
_identity_ of the originating peer each message received shall contain an empty

View File

@ -221,7 +221,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_SNDTIMEO 28
#define ZMQ_IPV4ONLY 31
#define ZMQ_LAST_ENDPOINT 32
#define ZMQ_FAIL_UNROUTABLE 33
#define ZMQ_ROUTER_BEHAVIOR 33
#define ZMQ_TCP_KEEPALIVE 34
#define ZMQ_TCP_KEEPALIVE_CNT 35
#define ZMQ_TCP_KEEPALIVE_IDLE 36

View File

@ -35,7 +35,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
current_out (NULL),
more_out (false),
next_peer_id (generate_random ()),
fail_unroutable(false)
report_unroutable(false)
{
options.type = ZMQ_ROUTER;
@ -74,7 +74,7 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
int zmq::router_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ != ZMQ_FAIL_UNROUTABLE) {
if (option_ != ZMQ_ROUTER_BEHAVIOR) {
errno = EINVAL;
return -1;
}
@ -82,7 +82,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
errno = EINVAL;
return -1;
}
fail_unroutable = *static_cast <const int*> (optval_);
report_unroutable = *static_cast <const int*> (optval_);
return 0;
}
@ -135,8 +135,6 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
if (!more_out) {
zmq_assert (!current_out);
int retval = 0;
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
// TODO: The connections should be killed instead.
@ -146,7 +144,7 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
// Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe just silently ignore the message, unless
// fail_unreachable is set.
// report_unreachable is set.
blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
outpipes_t::iterator it = outpipes.find (identity);
@ -156,9 +154,10 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
it->second.active = false;
current_out = NULL;
}
} else if(fail_unroutable) {
errno = EHOSTUNREACH;
retval = -1;
} else if (report_unroutable) {
more_out = false;
errno = EAGAIN;
return -1;
}
}
@ -166,7 +165,7 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
errno_assert (rc == 0);
rc = msg_->init ();
errno_assert (rc == 0);
return retval;
return 0;
}
// Check whether this is the last part of the message.

View File

@ -110,8 +110,9 @@ namespace zmq
// algorithm. This value is the next ID to use (if not used already).
uint32_t next_peer_id;
// If true, fail on unroutable messages instead of silently dropping them.
bool fail_unroutable;
// If true, report EAGAIN to the caller instead of silently dropping
// the message targeting an unknown peer.
bool report_unroutable;
router_t (const router_t&);
const router_t &operator = (const router_t&);

View File

@ -16,7 +16,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_connect_delay \
test_last_endpoint \
test_term_endpoint \
test_monitor
test_monitor \
test_router_behavior
if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \
@ -39,6 +40,7 @@ test_connect_delay_SOURCES = test_connect_delay.cpp
test_last_endpoint_SOURCES = test_last_endpoint.cpp
test_term_endpoint_SOURCES = test_term_endpoint.cpp
test_monitor_SOURCES = test_monitor.cpp
test_router_behavior_SOURCES = test_router_behavior.cpp
if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp

View File

@ -0,0 +1,64 @@
/*
Copyright (c) 2010-2011 250bpm s.r.o.
Copyright (c) 2011 iMatix Corporation
Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include "testutil.hpp"
int main (int argc, char *argv [])
{
fprintf (stderr, "test_router_behavior running...\n");
void *ctx = zmq_init (1);
assert (ctx);
// Creating the first socket.
void *sa = zmq_socket (ctx, ZMQ_ROUTER);
assert (sa);
int rc = zmq_bind (sa, "tcp://127.0.0.1:15560");
assert (rc == 0);
// Sending a message to an unknown peer with the default behavior.
rc = zmq_send (sa, "UNKNOWN", 7, ZMQ_SNDMORE);
assert (rc == 7);
rc = zmq_send (sa, "DATA", 4, 0);
assert (rc == 4);
int behavior = 1;
// Setting the socket behavior to a new mode.
rc = zmq_setsockopt (sa, ZMQ_ROUTER_BEHAVIOR, &behavior, sizeof (behavior));
assert (rc == 0);
// Sending a message to an unknown peer with verbose behavior.
rc = zmq_send (sa, "UNKNOWN", 7, ZMQ_SNDMORE);
assert (rc == -1 && errno == EAGAIN);
rc = zmq_close (sa);
assert (rc == 0);
rc = zmq_term (ctx);
assert (rc == 0);
return 0 ;
}