Merge pull request #570 from hintjens/master

Packaging of probe function
This commit is contained in:
Ian Barber 2013-06-06 01:18:37 -07:00
commit 7a43c02aaf
10 changed files with 120 additions and 39 deletions

1
.gitignore vendored
View File

@ -48,6 +48,7 @@ tests/test_disconnect_inproc
tests/test_ctx_options
tests/test_iov
tests/test_security
tests/test_probe_router
src/platform.hpp*
src/stamp-h1
perf/local_lat

View File

@ -13,8 +13,8 @@ SYNOPSIS
*int zmq_setsockopt (void '*socket', int 'option_name', const void '*option_value', size_t 'option_len');*
Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE,
ZMQ_LINGER, ZMQ_ROUTER_MANDATORY and ZMQ_XPUB_VERBOSE only take effect for
subsequent socket bind/connects.
ZMQ_LINGER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, and ZMQ_XPUB_VERBOSE
only take effect for subsequent socket bind/connects.
DESCRIPTION
-----------
@ -392,7 +392,7 @@ Applicable socket types:: all, only for connection-oriented transports.
ZMQ_ROUTER_MANDATORY: accept only routable messages on ROUTER sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'ROUTER' socket behavior when an unroutable message is encountered. A
Sets the ROUTER socket behavior when an unroutable message is encountered. A
value of `0` is the default and discards the message silently when it cannot be
routed. A value of `1` returns an 'EHOSTUNREACH' error code if the message
cannot be routed.
@ -407,7 +407,7 @@ Applicable socket types:: ZMQ_ROUTER
ZMQ_ROUTER_RAW: switch ROUTER socket to raw mode
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the raw mode on the 'ROUTER', when set to 1. When the ROUTER socket is in
Sets the raw mode on the ROUTER, when set to 1. When the ROUTER socket is in
raw mode, and when using the tcp:// transport, it will read and write TCP data
without 0MQ framing. This lets 0MQ applications talk to non-0MQ applications.
When using raw mode, you cannot set explicit identities, and the ZMQ_MSGMORE
@ -421,21 +421,23 @@ Default value:: 0
Applicable socket types:: ZMQ_ROUTER
ZMQ_PROBE: automatically send empty packet to every established connection
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
ZMQ_PROBE_ROUTER: bootstrap connections to ROUTER sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the compatible sockets behavior to automatically send an empty packet
to any new connection made (or accepted) by socket. It could help sockets to
auto discovery them-self. It especially important in 'ROUTER' <-> 'ROUTER' connections
where it solves 'who will write first' problems.
NOTE: Don't set this options for sockets working with ZMQ_REP, ZMQ_REQ sockets.
It will interfere with their strict synchronous logic and framing.
When set to 1, the socket will automatically send an empty message when a
new connection is made or accepted. You may set this on REQ, DEALER, or
ROUTER sockets connected to a ROUTER socket. The application must filter
such empty messages. The ZMQ_PROBE_ROUTER option in effect provides the
ROUTER application with an event signaling the arrival of a new peer.
NOTE: do not set this option on a socket that talks to any other socket
types: the results are undefined.
[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 0
Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER, ZMQ_REP, ZMQ_REQ
Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER, ZMQ_REQ
ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets

View File

@ -147,7 +147,7 @@ message before passing it to the application. Messages received are fair-queued
from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall
remove the first part of the message and use it to determine the _identity_ of
the peer the message shall be routed to. If the peer does not exist anymore
the message shall be silently discarded by default, unless 'ZMQ_ROUTER_BEHAVIOR'
the message shall be silently discarded by default, unless 'ZMQ_ROUTER_MANDATORY'
socket option is set to '1'.
When a 'ZMQ_ROUTER' socket enters the 'mute' state due to having reached the

View File

@ -274,7 +274,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_CURVE_SERVER 47
#define ZMQ_CURVE_PUBLICKEY 48
#define ZMQ_CURVE_SERVERKEY 49
#define ZMQ_PROBE 50
#define ZMQ_PROBE_ROUTER 50
/* Message options */
#define ZMQ_MORE 1

View File

@ -23,7 +23,7 @@
zmq::dealer_t::dealer_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
probe_new_peers(false)
probe_router (false)
{
options.type = ZMQ_DEALER;
}
@ -39,15 +39,13 @@ void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
zmq_assert (pipe_);
if (probe_new_peers) {
int rc;
if (probe_router) {
msg_t probe_msg_;
rc = probe_msg_.init ();
int rc = probe_msg_.init ();
errno_assert (rc == 0);
rc = pipe_->write (&probe_msg_);
zmq_assert (rc);
int ok = pipe_->write (&probe_msg_);
zmq_assert (ok);
pipe_->flush ();
rc = probe_msg_.close ();
@ -65,9 +63,9 @@ int zmq::dealer_t::xsetsockopt (int option_, const void *optval_,
int value = is_int? *((int *) optval_): 0;
switch (option_) {
case ZMQ_PROBE:
case ZMQ_PROBE_ROUTER:
if (is_int && value >= 0) {
probe_new_peers = value;
probe_router = value;
return 0;
}
break;

View File

@ -62,8 +62,8 @@ namespace zmq
fq_t fq;
lb_t lb;
// if true, send an empty message to every connected peer
bool probe_new_peers;
// if true, send an empty message to every connected router peer
bool probe_router;
dealer_t (const dealer_t&);
const dealer_t &operator = (const dealer_t&);

View File

@ -32,9 +32,9 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
current_out (NULL),
more_out (false),
next_peer_id (generate_random ()),
mandatory(false),
raw_sock(false),
probe_new_peers(false)
mandatory (false),
raw_sock (false),
probe_router (false)
{
options.type = ZMQ_ROUTER;
options.recv_identity = true;
@ -91,9 +91,9 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
}
break;
case ZMQ_PROBE:
case ZMQ_PROBE_ROUTER:
if (is_int && value >= 0) {
probe_new_peers = value;
probe_router = value;
return 0;
}
break;
@ -391,19 +391,20 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
zmq_assert (ok);
if (probe_new_peers) {
int rc;
if (probe_router) {
msg_t probe_msg_;
rc = probe_msg_.init ();
int rc = probe_msg_.init ();
errno_assert (rc == 0);
rc = pipe_->write (&probe_msg_);
zmq_assert (rc);
ok = pipe_->write (&probe_msg_);
pipe_->flush ();
rc = probe_msg_.close ();
errno_assert (rc == 0);
// Ignore not probed peers
if (!ok)
return false;
}
return true;

View File

@ -112,8 +112,8 @@ namespace zmq
bool mandatory;
bool raw_sock;
// if true, send an empty message to every connected peer to solve 'who will write first?' auto discovery problem
bool probe_new_peers;
// if true, send an empty message to every connected router peer
bool probe_router;
router_t (const router_t&);
const router_t &operator = (const router_t&);

View File

@ -18,6 +18,7 @@ noinst_PROGRAMS = test_pair_inproc \
test_term_endpoint \
test_monitor \
test_router_mandatory \
test_probe_router \
test_raw_sock \
test_disconnect_inproc \
test_ctx_options \
@ -46,6 +47,7 @@ test_last_endpoint_SOURCES = test_last_endpoint.cpp
test_term_endpoint_SOURCES = test_term_endpoint.cpp
test_monitor_SOURCES = test_monitor.cpp
test_router_mandatory_SOURCES = test_router_mandatory.cpp
test_probe_router_SOURCES = test_probe_router.cpp
test_raw_sock_SOURCES = test_raw_sock.cpp
test_disconnect_inproc_SOURCES = test_disconnect_inproc.cpp
test_ctx_options_SOURCES = test_ctx_options.cpp

View File

@ -0,0 +1,77 @@
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include <stdio.h>
#include <string.h>
#undef NDEBUG
#include <assert.h>
int main (void)
{
void *ctx = zmq_ctx_new ();
assert (ctx);
// Create server and bind to endpoint
void *server = zmq_socket (ctx, ZMQ_ROUTER);
assert (server);
int rc = zmq_bind (server, "tcp://*:5560");
assert (rc == 0);
// Create client and connect to server, doing a probe
void *client = zmq_socket (ctx, ZMQ_DEALER);
// Trying this results in the first recv waiting forever
// void *client = zmq_socket (ctx, ZMQ_ROUTER);
assert (client);
rc = zmq_setsockopt (client, ZMQ_IDENTITY, "X", 1);
assert (rc == 0);
int probe = 1;
rc = zmq_setsockopt (client, ZMQ_PROBE_ROUTER, &probe, sizeof (probe));
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:5560");
assert (rc == 0);
// We expect an identity=X + empty message from client
unsigned char buffer [255];
rc = zmq_recv (server, buffer, 255, 0);
assert (rc == 1);
assert (buffer [0] == 'X');
rc = zmq_recv (server, buffer, 255, 0);
assert (rc == 0);
// Send a message to client now
rc = zmq_send (server, "X", 1, ZMQ_SNDMORE);
assert (rc == 1);
rc = zmq_send (server, "Hello", 5, 0);
assert (rc == 5);
rc = zmq_recv (client, buffer, 255, 0);
assert (rc == 5);
rc = zmq_close (server);
assert (rc == 0);
rc = zmq_close (client);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}