From 2f36f65f8ff0b1064f708525e1abd61932b744f3 Mon Sep 17 00:00:00 2001 From: Tim M Date: Sun, 19 Jan 2014 15:05:20 -0800 Subject: [PATCH 1/3] Fixed duplicate variable declaration. --- src/stream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream.cpp b/src/stream.cpp index ea48bb19..026a5d8c 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -269,7 +269,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) } else { put_uint32 (buffer + 1, next_rid++); - blob_t identity = blob_t (buffer, sizeof buffer); + identity = blob_t (buffer, sizeof buffer); memcpy (options.identity, identity.data (), identity.size ()); options.identity_size = identity.size (); } From 1d9b76c860b3575bbe956cce461e6c001f497e2f Mon Sep 17 00:00:00 2001 From: Tim M Date: Sun, 19 Jan 2014 15:59:43 -0800 Subject: [PATCH 2/3] Added test for ZMQ_CONNECT_RID --- CMakeLists.txt | 1 + src/router.cpp | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 27c2050c..5e47a998 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -622,6 +622,7 @@ set(tests test_timeo test_many_sockets test_diffserv + test_connect_rid ) if(NOT WIN32) list(APPEND tests diff --git a/src/router.cpp b/src/router.cpp index 3b7f1ac9..a2c17e5a 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -402,6 +402,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) put_uint32 (buf + 1, next_rid++); identity = blob_t (buf, sizeof buf); } + else if (!options.raw_sock) { // Pick up handshake cases and also case where next identity is set msg.init (); @@ -409,9 +410,6 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) if (!ok) return false; - if (connect_rid_used) // we read but do not use identity from peer - msg.close(); - else if (msg.size () == 0) { // Fall back on the auto-generation unsigned char buf [5]; From 3fbc10eba7393b0bfac87d0c700a14162fefc895 Mon Sep 17 00:00:00 2001 From: Tim M Date: Sun, 19 Jan 2014 17:28:13 -0800 Subject: [PATCH 3/3] Updated man entries, and added behavior in case the user duplicates peer ids. In this case the socket reverts to default behavior. --- doc/zmq_setsockopt.txt | 26 ++++++++++++++++++++++++++ src/router.cpp | 6 ++++-- src/stream.cpp | 5 ++++- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 2e157f9c..5ad3077d 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -67,6 +67,32 @@ Default value:: 100 Applicable socket types:: all, only for connection-oriented transports. +ZMQ_CONNECT_RID: Assign the next outbound connection id +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_CONNECT_RID' option sets the peer id of the next host connected +via the zmq_connect() call, and immediately readies that connection for +data transfer with the named id. This option applies only to the first +subsequent call to zmq_connect(), calls thereafter use default connection +behavior. + +Typical use is to set this socket option on each zmq_connect() attempt +to a new host. Each connection should be assigned a unique name. Duplicated +names will trigger default connection behavior. + +Useful when connecting ROUTER to ROUTER, or STREAM to STREAM, as it +allows for immediate sending to peers. Outbound id framing requirements +for ROUTER and STREAM sockets apply. + +The peer id should be from 1 to 255 bytes long and MAY NOT start with +binary zero. + +[horizontal] +Option value type:: binary data +Option value unit:: N/A +Default value:: NULL +Applicable socket types:: ZMQ_ROUTER, ZMQ_STREAM + + ZMQ_CONFLATE: Keep only last message ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ If set, a socket shall keep only one message in its inbound/outbound diff --git a/src/router.cpp b/src/router.cpp index a2c17e5a..4a9614b5 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -387,13 +387,15 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) msg_t msg; blob_t identity; bool ok; - bool connect_rid_used = false; if (connect_rid.length()) { identity = blob_t ((unsigned char*) connect_rid.c_str (), connect_rid.length()); connect_rid.clear (); - connect_rid_used = true; + outpipes_t::iterator it = outpipes.find (identity); + if (it != outpipes.end ()) { + return false; // duplicate connection + } } else if (options.raw_sock) { // Always assign identity for raw-socket diff --git a/src/stream.cpp b/src/stream.cpp index 026a5d8c..2e0ecba5 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -266,9 +266,12 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) identity = blob_t ((unsigned char*) connect_rid.c_str(), connect_rid.length ()); connect_rid.clear (); + outpipes_t::iterator it = outpipes.find (identity); + if (it != outpipes.end ()) + goto d; } else { - put_uint32 (buffer + 1, next_rid++); +d: put_uint32 (buffer + 1, next_rid++); identity = blob_t (buffer, sizeof buffer); memcpy (options.identity, identity.data (), identity.size ()); options.identity_size = identity.size ();