From 50bd28c037ad85af638a97bc869725dbff51c8c0 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Sun, 19 Jan 2014 09:27:57 +0100 Subject: [PATCH] Cleaned up option to force identity on outgoing connection - renamed to ZMQ_CONNECT_RID - fixed whitespace malformating around previous patch - renamamed next_peer_id to next_rid in preparation for larger rename of IDENTITY to ROUTING_ID Note: ZMQ_CONNECT_RID has no test case and no entry in the man page, as yet. --- include/zmq.h | 3 ++- src/router.cpp | 45 ++++++++++++++++++++++++--------------------- src/router.hpp | 4 ++-- src/socket_base.hpp | 4 +++- src/stream.cpp | 20 +++++++++++--------- src/stream.hpp | 4 ++-- 6 files changed, 44 insertions(+), 36 deletions(-) diff --git a/include/zmq.h b/include/zmq.h index 3556e5d3..524d6aa2 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -293,7 +293,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_IPC_FILTER_PID 58 #define ZMQ_IPC_FILTER_UID 59 #define ZMQ_IPC_FILTER_GID 60 -#define ZMQ_NEXT_CONNECT_PEER_ID 61 +#define ZMQ_CONNECT_RID 61 + /* Message options */ #define ZMQ_MORE 1 #define ZMQ_SRCFD 2 diff --git a/src/router.cpp b/src/router.cpp index 2e74a3e5..3b7f1ac9 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -31,7 +31,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : more_in (false), current_out (NULL), more_out (false), - next_peer_id (generate_random ()), + next_rid (generate_random ()), mandatory (false), // raw_sock functionality in ROUTER is deprecated raw_sock (false), @@ -88,12 +88,12 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, int value = is_int? *((int *) optval_): 0; switch (option_) { - case ZMQ_NEXT_CONNECT_PEER_ID: - if(optval_ && optvallen_) { - next_identity.assign((char*)optval_,optvallen_); + case ZMQ_CONNECT_RID: + if (optval_ && optvallen_) { + connect_rid.assign ((char *) optval_, optvallen_); return 0; } - break; + break; case ZMQ_ROUTER_RAW: if (is_int && value >= 0) { raw_sock = (value != 0); @@ -387,33 +387,36 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) msg_t msg; blob_t identity; bool ok; - bool next_identity_used = false; + bool connect_rid_used = false; - if (next_identity.length()) { - identity = blob_t((unsigned char*) next_identity.c_str(), - next_identity.length()); - next_identity.clear(); - next_identity_used = true; + if (connect_rid.length()) { + identity = blob_t ((unsigned char*) connect_rid.c_str (), + connect_rid.length()); + connect_rid.clear (); + connect_rid_used = true; } - else if (options.raw_sock) { // Always assign identity for raw-socket - unsigned char buf [5]; + else + if (options.raw_sock) { // Always assign identity for raw-socket + unsigned char buf [5]; buf [0] = 0; - put_uint32 (buf + 1, next_peer_id++); + put_uint32 (buf + 1, next_rid++); identity = blob_t (buf, sizeof buf); } - if (!options.raw_sock){ // pick up handshake cases and also case where next identity is set + if (!options.raw_sock) { + // Pick up handshake cases and also case where next identity is set msg.init (); ok = pipe_->read (&msg); if (!ok) return false; - if (next_identity_used){ // we read but do not use identity from peer - msg.close(); - } - else if (msg.size () == 0) { + + if (connect_rid_used) // we read but do not use identity from peer + msg.close(); + else + if (msg.size () == 0) { // Fall back on the auto-generation unsigned char buf [5]; buf [0] = 0; - put_uint32 (buf + 1, next_peer_id++); + put_uint32 (buf + 1, next_rid++); identity = blob_t (buf, sizeof buf); msg.close (); } @@ -432,7 +435,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) // existing pipe so we can terminate it asynchronously. unsigned char buf [5]; buf [0] = 0; - put_uint32 (buf + 1, next_peer_id++); + put_uint32 (buf + 1, next_rid++); blob_t new_identity = blob_t (buf, sizeof buf); it->second.pipe->set_identity (new_identity); diff --git a/src/router.hpp b/src/router.hpp index a2bdb2ec..f5b1ff6c 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -104,9 +104,9 @@ namespace zmq // If true, more outgoing message parts are expected. bool more_out; - // Peer ID are generated. It's a simple increment and wrap-over + // Routing IDs are generated. It's a simple increment and wrap-over // algorithm. This value is the next ID to use (if not used already). - uint32_t next_peer_id; + uint32_t next_rid; // If true, report EAGAIN to the caller instead of silently dropping // the message targeting an unknown peer. diff --git a/src/socket_base.hpp b/src/socket_base.hpp index a6880ae9..efb0cced 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -164,8 +164,10 @@ namespace zmq // Monitor socket cleanup void stop_monitor (); + // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types - std::string next_identity; + std::string connect_rid; + private: // Creates new endpoint ID and adds the endpoint to the map. void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe); diff --git a/src/stream.cpp b/src/stream.cpp index 3826eb76..ea48bb19 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -30,7 +30,7 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : identity_sent (false), current_out (NULL), more_out (false), - next_peer_id (generate_random ()) + next_rid (generate_random ()) { options.type = ZMQ_STREAM; options.raw_sock = true; @@ -163,13 +163,14 @@ int zmq::stream_t::xsend (msg_t *msg_) return 0; } + int zmq::stream_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { switch (option_) { - case ZMQ_NEXT_CONNECT_PEER_ID: - if(optval_ && optvallen_) { - next_identity.assign((char*)optval_,optvallen_); + case ZMQ_CONNECT_RID: + if (optval_ && optvallen_) { + connect_rid.assign ((char*) optval_, optvallen_); return 0; } break; @@ -179,6 +180,7 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_, errno = EINVAL; return -1; } + int zmq::stream_t::xrecv (msg_t *msg_) { if (prefetched) { @@ -260,13 +262,13 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) unsigned char buffer [5]; buffer [0] = 0; blob_t identity; - if (next_identity.length()) { - identity = blob_t((unsigned char*) next_identity.c_str(), - next_identity.length()); - next_identity.clear(); + if (connect_rid.length ()) { + identity = blob_t ((unsigned char*) connect_rid.c_str(), + connect_rid.length ()); + connect_rid.clear (); } else { - put_uint32 (buffer + 1, next_peer_id++); + put_uint32 (buffer + 1, next_rid++); blob_t identity = blob_t (buffer, sizeof buffer); memcpy (options.identity, identity.data (), identity.size ()); options.identity_size = identity.size (); diff --git a/src/stream.hpp b/src/stream.hpp index 8734230f..5ea23754 100644 --- a/src/stream.hpp +++ b/src/stream.hpp @@ -84,9 +84,9 @@ namespace zmq // If true, more outgoing message parts are expected. bool more_out; - // Peer ID are generated. It's a simple increment and wrap-over + // Routing IDs are generated. It's a simple increment and wrap-over // algorithm. This value is the next ID to use (if not used already). - uint32_t next_peer_id; + uint32_t next_rid; stream_t (const stream_t&); const stream_t &operator = (const stream_t&);