diff --git a/CMakeLists.txt b/CMakeLists.txt index 27c2050c..5e47a998 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -622,6 +622,7 @@ set(tests test_timeo test_many_sockets test_diffserv + test_connect_rid ) if(NOT WIN32) list(APPEND tests diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 2e157f9c..5ad3077d 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -67,6 +67,32 @@ Default value:: 100 Applicable socket types:: all, only for connection-oriented transports. +ZMQ_CONNECT_RID: Assign the next outbound connection id +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_CONNECT_RID' option sets the peer id of the next host connected +via the zmq_connect() call, and immediately readies that connection for +data transfer with the named id. This option applies only to the first +subsequent call to zmq_connect(), calls thereafter use default connection +behavior. + +Typical use is to set this socket option on each zmq_connect() attempt +to a new host. Each connection should be assigned a unique name. Duplicated +names will trigger default connection behavior. + +Useful when connecting ROUTER to ROUTER, or STREAM to STREAM, as it +allows for immediate sending to peers. Outbound id framing requirements +for ROUTER and STREAM sockets apply. + +The peer id should be from 1 to 255 bytes long and MAY NOT start with +binary zero. + +[horizontal] +Option value type:: binary data +Option value unit:: N/A +Default value:: NULL +Applicable socket types:: ZMQ_ROUTER, ZMQ_STREAM + + ZMQ_CONFLATE: Keep only last message ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ If set, a socket shall keep only one message in its inbound/outbound diff --git a/src/router.cpp b/src/router.cpp index 3b7f1ac9..4a9614b5 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -387,13 +387,15 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) msg_t msg; blob_t identity; bool ok; - bool connect_rid_used = false; if (connect_rid.length()) { identity = blob_t ((unsigned char*) connect_rid.c_str (), connect_rid.length()); connect_rid.clear (); - connect_rid_used = true; + outpipes_t::iterator it = outpipes.find (identity); + if (it != outpipes.end ()) { + return false; // duplicate connection + } } else if (options.raw_sock) { // Always assign identity for raw-socket @@ -402,6 +404,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) put_uint32 (buf + 1, next_rid++); identity = blob_t (buf, sizeof buf); } + else if (!options.raw_sock) { // Pick up handshake cases and also case where next identity is set msg.init (); @@ -409,9 +412,6 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) if (!ok) return false; - if (connect_rid_used) // we read but do not use identity from peer - msg.close(); - else if (msg.size () == 0) { // Fall back on the auto-generation unsigned char buf [5]; diff --git a/src/stream.cpp b/src/stream.cpp index ea48bb19..2e0ecba5 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -266,10 +266,13 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) identity = blob_t ((unsigned char*) connect_rid.c_str(), connect_rid.length ()); connect_rid.clear (); + outpipes_t::iterator it = outpipes.find (identity); + if (it != outpipes.end ()) + goto d; } else { - put_uint32 (buffer + 1, next_rid++); - blob_t identity = blob_t (buffer, sizeof buffer); +d: put_uint32 (buffer + 1, next_rid++); + identity = blob_t (buffer, sizeof buffer); memcpy (options.identity, identity.data (), identity.size ()); options.identity_size = identity.size (); }