diff --git a/src/client.cpp b/src/client.cpp index 9ec1a5f9..1c2b099f 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -43,9 +43,12 @@ zmq::client_t::~client_t () { } -void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::client_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { LIBZMQ_UNUSED (subscribe_to_all_); + LIBZMQ_UNUSED (locally_initiated_); zmq_assert (pipe_); diff --git a/src/client.hpp b/src/client.hpp index 162e36dd..a4e85774 100644 --- a/src/client.hpp +++ b/src/client.hpp @@ -49,7 +49,9 @@ class client_t : public socket_base_t protected: // Overrides of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); int xsend (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_); bool xhas_in (); diff --git a/src/dealer.cpp b/src/dealer.cpp index ee9dbf49..3f99b1bd 100644 --- a/src/dealer.cpp +++ b/src/dealer.cpp @@ -44,9 +44,12 @@ zmq::dealer_t::~dealer_t () { } -void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initated_) { LIBZMQ_UNUSED (subscribe_to_all_); + LIBZMQ_UNUSED (locally_initated_); zmq_assert (pipe_); diff --git a/src/dealer.hpp b/src/dealer.hpp index 77239898..d779f6e1 100644 --- a/src/dealer.hpp +++ b/src/dealer.hpp @@ -51,7 +51,9 @@ class dealer_t : public socket_base_t protected: // Overrides of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_); diff --git a/src/dgram.cpp b/src/dgram.cpp index cbed5144..e0b19445 100644 --- a/src/dgram.cpp +++ b/src/dgram.cpp @@ -51,9 +51,12 @@ zmq::dgram_t::~dgram_t () zmq_assert (!_pipe); } -void zmq::dgram_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::dgram_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { LIBZMQ_UNUSED (subscribe_to_all_); + LIBZMQ_UNUSED (locally_initiated_); zmq_assert (pipe_); diff --git a/src/dgram.hpp b/src/dgram.hpp index bc99cd4a..f3a137f5 100644 --- a/src/dgram.hpp +++ b/src/dgram.hpp @@ -48,7 +48,9 @@ class dgram_t : public socket_base_t ~dgram_t (); // Overrides of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); int xsend (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_); bool xhas_in (); diff --git a/src/dish.cpp b/src/dish.cpp index e690abb7..72611dff 100644 --- a/src/dish.cpp +++ b/src/dish.cpp @@ -54,9 +54,12 @@ zmq::dish_t::~dish_t () errno_assert (rc == 0); } -void zmq::dish_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::dish_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { LIBZMQ_UNUSED (subscribe_to_all_); + LIBZMQ_UNUSED (locally_initiated_); zmq_assert (pipe_); _fq.attach (pipe_); diff --git a/src/dish.hpp b/src/dish.hpp index bb966e84..caf1d079 100644 --- a/src/dish.hpp +++ b/src/dish.hpp @@ -52,7 +52,9 @@ class dish_t : public socket_base_t protected: // Overrides of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); int xsend (zmq::msg_t *msg_); bool xhas_out (); int xrecv (zmq::msg_t *msg_); diff --git a/src/gather.cpp b/src/gather.cpp index aec39359..263a5059 100644 --- a/src/gather.cpp +++ b/src/gather.cpp @@ -44,9 +44,12 @@ zmq::gather_t::~gather_t () { } -void zmq::gather_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::gather_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { LIBZMQ_UNUSED (subscribe_to_all_); + LIBZMQ_UNUSED (locally_initiated_); zmq_assert (pipe_); _fq.attach (pipe_); diff --git a/src/gather.hpp b/src/gather.hpp index 2c3c8033..06e3bb7e 100644 --- a/src/gather.hpp +++ b/src/gather.hpp @@ -47,7 +47,9 @@ class gather_t : public socket_base_t protected: // Overrides of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); int xrecv (zmq::msg_t *msg_); bool xhas_in (); const blob_t &get_credential () const; diff --git a/src/pair.cpp b/src/pair.cpp index 25d4c235..fc404837 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -47,9 +47,12 @@ zmq::pair_t::~pair_t () zmq_assert (!_pipe); } -void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::pair_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { LIBZMQ_UNUSED (subscribe_to_all_); + LIBZMQ_UNUSED (locally_initiated_); zmq_assert (pipe_ != NULL); diff --git a/src/pair.hpp b/src/pair.hpp index 63625b3b..918787e4 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -48,7 +48,9 @@ class pair_t : public socket_base_t ~pair_t (); // Overrides of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); int xsend (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_); bool xhas_in (); diff --git a/src/pub.cpp b/src/pub.cpp index 303da068..fe42982e 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -43,7 +43,9 @@ zmq::pub_t::~pub_t () { } -void zmq::pub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::pub_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { zmq_assert (pipe_); @@ -51,7 +53,7 @@ void zmq::pub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) // to receive the delimiter. pipe_->set_nodelay (); - xpub_t::xattach_pipe (pipe_, subscribe_to_all_); + xpub_t::xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_); } int zmq::pub_t::xrecv (class msg_t *) diff --git a/src/pub.hpp b/src/pub.hpp index 8ca28896..fe31b83d 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -46,7 +46,9 @@ class pub_t : public xpub_t ~pub_t (); // Implementations of virtual functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_ = false, + bool locally_initiated_ = false); int xrecv (zmq::msg_t *msg_); bool xhas_in (); diff --git a/src/pull.cpp b/src/pull.cpp index d25249fb..d48f1786 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -44,9 +44,12 @@ zmq::pull_t::~pull_t () { } -void zmq::pull_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::pull_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { LIBZMQ_UNUSED (subscribe_to_all_); + LIBZMQ_UNUSED (locally_initiated_); zmq_assert (pipe_); _fq.attach (pipe_); diff --git a/src/pull.hpp b/src/pull.hpp index 992fa437..b9cba7f5 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -49,7 +49,9 @@ class pull_t : public socket_base_t protected: // Overrides of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); int xrecv (zmq::msg_t *msg_); bool xhas_in (); const blob_t &get_credential () const; diff --git a/src/push.cpp b/src/push.cpp index 6062a631..42184807 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -44,9 +44,12 @@ zmq::push_t::~push_t () { } -void zmq::push_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::push_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { LIBZMQ_UNUSED (subscribe_to_all_); + LIBZMQ_UNUSED (locally_initiated_); // Don't delay pipe termination as there is no one // to receive the delimiter. diff --git a/src/push.hpp b/src/push.hpp index 2dd6401d..0dfa878b 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -49,7 +49,9 @@ class push_t : public socket_base_t protected: // Overrides of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); int xsend (zmq::msg_t *msg_); bool xhas_out (); void xwrite_activated (zmq::pipe_t *pipe_); diff --git a/src/radio.cpp b/src/radio.cpp index 703eb619..e0a78b62 100644 --- a/src/radio.cpp +++ b/src/radio.cpp @@ -47,9 +47,12 @@ zmq::radio_t::~radio_t () { } -void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::radio_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { LIBZMQ_UNUSED (subscribe_to_all_); + LIBZMQ_UNUSED (locally_initiated_); zmq_assert (pipe_); diff --git a/src/radio.hpp b/src/radio.hpp index 9679c9c6..16d7488d 100644 --- a/src/radio.hpp +++ b/src/radio.hpp @@ -52,7 +52,9 @@ class radio_t : public socket_base_t ~radio_t (); // Implementations of virtual functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_ = false, + bool locally_initiated_ = false); int xsend (zmq::msg_t *msg_); bool xhas_out (); int xrecv (zmq::msg_t *msg_); diff --git a/src/router.cpp b/src/router.cpp index ab9dcb7b..627a989e 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -67,7 +67,9 @@ zmq::router_t::~router_t () _prefetched_msg.close (); } -void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::router_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { LIBZMQ_UNUSED (subscribe_to_all_); @@ -88,7 +90,7 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) errno_assert (rc == 0); } - bool routing_id_ok = identify_peer (pipe_); + bool routing_id_ok = identify_peer (pipe_, locally_initiated_); if (routing_id_ok) _fq.attach (pipe_); else @@ -166,7 +168,7 @@ void zmq::router_t::xread_activated (pipe_t *pipe_) if (it == _anonymous_pipes.end ()) _fq.activated (pipe_); else { - bool routing_id_ok = identify_peer (pipe_); + bool routing_id_ok = identify_peer (pipe_, false); if (routing_id_ok) { _anonymous_pipes.erase (it); _fq.attach (pipe_); @@ -440,13 +442,13 @@ int zmq::router_t::get_peer_state (const void *routing_id_, return res; } -bool zmq::router_t::identify_peer (pipe_t *pipe_) +bool zmq::router_t::identify_peer (pipe_t *pipe_, bool locally_initiated_) { msg_t msg; blob_t routing_id; - const std::string connect_routing_id = extract_connect_routing_id (); - if (!connect_routing_id.empty ()) { + if (locally_initiated_ && connect_routing_id_is_set ()) { + const std::string connect_routing_id = extract_connect_routing_id (); routing_id.set ( reinterpret_cast (connect_routing_id.c_str ()), connect_routing_id.length ()); diff --git a/src/router.hpp b/src/router.hpp index 370f4dcc..fa94e2f7 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -52,7 +52,9 @@ class router_t : public routing_socket_base_t ~router_t (); // Overrides of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_); @@ -69,7 +71,7 @@ class router_t : public routing_socket_base_t private: // Receive peer id and update lookup map - bool identify_peer (pipe_t *pipe_); + bool identify_peer (pipe_t *pipe_, bool locally_initiated); // Fair queueing object for inbound pipes. fq_t _fq; diff --git a/src/scatter.cpp b/src/scatter.cpp index cb9507a5..b951b92e 100644 --- a/src/scatter.cpp +++ b/src/scatter.cpp @@ -44,9 +44,12 @@ zmq::scatter_t::~scatter_t () { } -void zmq::scatter_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::scatter_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { LIBZMQ_UNUSED (subscribe_to_all_); + LIBZMQ_UNUSED (locally_initiated_); // Don't delay pipe termination as there is no one // to receive the delimiter. diff --git a/src/scatter.hpp b/src/scatter.hpp index 10b57ed0..78659d48 100644 --- a/src/scatter.hpp +++ b/src/scatter.hpp @@ -49,7 +49,9 @@ class scatter_t : public socket_base_t protected: // Overrides of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); int xsend (zmq::msg_t *msg_); bool xhas_out (); void xwrite_activated (zmq::pipe_t *pipe_); diff --git a/src/server.cpp b/src/server.cpp index 9da4b478..b8c32b9b 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -48,9 +48,12 @@ zmq::server_t::~server_t () zmq_assert (_out_pipes.empty ()); } -void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::server_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { LIBZMQ_UNUSED (subscribe_to_all_); + LIBZMQ_UNUSED (locally_initiated_); zmq_assert (pipe_); diff --git a/src/server.hpp b/src/server.hpp index f77677ed..617c8aff 100644 --- a/src/server.hpp +++ b/src/server.hpp @@ -52,7 +52,9 @@ class server_t : public socket_base_t ~server_t (); // Overrides of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); int xsend (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_); bool xhas_in (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index bf933213..fcead300 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -341,14 +341,16 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return 0; } -void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { // First, register the pipe so that we can terminate it later on. pipe_->set_event_sink (this); _pipes.push_back (pipe_); // Let the derived socket type know about new pipe. - xattach_pipe (pipe_, subscribe_to_all_); + xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_); // If the socket is already being closed, ask any new pipes to terminate // straight away. @@ -553,7 +555,7 @@ int zmq::socket_base_t::bind (const char *addr_) errno_assert (rc == 0); // Attach local end of the pipe to the socket object. - attach_pipe (new_pipes[0], true); + attach_pipe (new_pipes[0], true, true); newpipe = new_pipes[0]; // Attach remote end of the pipe to the session object later on. @@ -773,7 +775,7 @@ int zmq::socket_base_t::connect (const char *addr_) } // Attach local end of the pipe to this socket object. - attach_pipe (new_pipes[0]); + attach_pipe (new_pipes[0], false, true); // Save last endpoint URI _last_endpoint.assign (addr_); @@ -959,7 +961,7 @@ int zmq::socket_base_t::connect (const char *addr_) errno_assert (rc == 0); // Attach local end of the pipe to the socket object. - attach_pipe (new_pipes[0], subscribe_to_all); + attach_pipe (new_pipes[0], subscribe_to_all, true); newpipe = new_pipes[0]; // Attach remote end of the pipe to the session object later on. @@ -1809,6 +1811,11 @@ std::string zmq::routing_socket_base_t::extract_connect_routing_id () return res; } +bool zmq::routing_socket_base_t::connect_routing_id_is_set () +{ + return !_connect_routing_id.empty (); +} + void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id_, pipe_t *pipe_) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index e3b6918d..a34a8ac5 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -150,7 +150,8 @@ class socket_base_t : public own_t, // Concrete algorithms for the x- methods are to be defined by // individual socket types. virtual void xattach_pipe (zmq::pipe_t *pipe_, - bool subscribe_to_all_ = false) = 0; + bool subscribe_to_all_ = false, + bool locally_initiated_ = false) = 0; // The default implementation assumes there are no specific socket // options for the particular socket type. If not so, override this @@ -234,7 +235,9 @@ class socket_base_t : public own_t, int check_protocol (const std::string &protocol_); // Register the pipe with this socket. - void attach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false); + void attach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_ = false, + bool locally_initiated_ = false); // Processes commands sent to this socket (if any). If timeout is -1, // returns only after at least one command was processed. @@ -311,6 +314,7 @@ class routing_socket_base_t : public socket_base_t // own methods std::string extract_connect_routing_id (); + bool connect_routing_id_is_set (); struct out_pipe_t { diff --git a/src/stream.cpp b/src/stream.cpp index 2e888642..da6768ee 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -57,13 +57,15 @@ zmq::stream_t::~stream_t () _prefetched_msg.close (); } -void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::stream_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { LIBZMQ_UNUSED (subscribe_to_all_); zmq_assert (pipe_); - identify_peer (pipe_); + identify_peer (pipe_, locally_initiated_); _fq.attach (pipe_); } @@ -264,14 +266,14 @@ bool zmq::stream_t::xhas_out () return true; } -void zmq::stream_t::identify_peer (pipe_t *pipe_) +void zmq::stream_t::identify_peer (pipe_t *pipe_, bool locally_initiated_) { // Always assign routing id for raw-socket unsigned char buffer[5]; buffer[0] = 0; blob_t routing_id; - const std::string connect_routing_id = extract_connect_routing_id (); - if (!connect_routing_id.empty ()) { + if (locally_initiated_ && connect_routing_id_is_set ()) { + const std::string connect_routing_id = extract_connect_routing_id (); routing_id.set ( reinterpret_cast (connect_routing_id.c_str ()), connect_routing_id.length ()); diff --git a/src/stream.hpp b/src/stream.hpp index 46829764..7fa30625 100644 --- a/src/stream.hpp +++ b/src/stream.hpp @@ -46,7 +46,9 @@ class stream_t : public routing_socket_base_t ~stream_t (); // Overrides of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); int xsend (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_); bool xhas_in (); @@ -57,7 +59,7 @@ class stream_t : public routing_socket_base_t private: // Generate peer's id and update lookup map - void identify_peer (pipe_t *pipe_); + void identify_peer (pipe_t *pipe_, bool locally_initiated_); // Fair queueing object for inbound pipes. fq_t _fq; diff --git a/src/xpub.cpp b/src/xpub.cpp index c63939ff..2652c8c0 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -57,8 +57,12 @@ zmq::xpub_t::~xpub_t () _welcome_msg.close (); } -void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { + LIBZMQ_UNUSED (locally_initiated_); + zmq_assert (pipe_); _dist.attach (pipe_); diff --git a/src/xpub.hpp b/src/xpub.hpp index 757c700b..d8032249 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -51,7 +51,9 @@ class xpub_t : public socket_base_t ~xpub_t (); // Implementations of virtual functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_ = false, + bool locally_initiated_ = false); int xsend (zmq::msg_t *msg_); bool xhas_out (); int xrecv (zmq::msg_t *msg_); diff --git a/src/xsub.cpp b/src/xsub.cpp index a9ff5f81..8f5f7232 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -55,9 +55,12 @@ zmq::xsub_t::~xsub_t () errno_assert (rc == 0); } -void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) { LIBZMQ_UNUSED (subscribe_to_all_); + LIBZMQ_UNUSED (locally_initiated_); zmq_assert (pipe_); _fq.attach (pipe_); diff --git a/src/xsub.hpp b/src/xsub.hpp index cc50224d..ff3dd64f 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -50,7 +50,9 @@ class xsub_t : public socket_base_t protected: // Overrides of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); int xsend (zmq::msg_t *msg_); bool xhas_out (); int xrecv (zmq::msg_t *msg_); diff --git a/tests/test_connect_rid.cpp b/tests/test_connect_rid.cpp index c7601d32..1f4cecdf 100644 --- a/tests/test_connect_rid.cpp +++ b/tests/test_connect_rid.cpp @@ -189,6 +189,106 @@ void test_router_2_router (bool named_) zmq_ctx_destroy (ctx); } +void test_router_2_router_while_receiving () +{ + void *xbind, *zbind, *yconn; + int ret; + char buff[256]; + char msg[] = "hi 1"; + const char *wildcard_bind = "tcp://127.0.0.1:*"; + int zero = 0; + size_t len = MAX_SOCKET_STRING; + char x_endpoint[MAX_SOCKET_STRING]; + char z_endpoint[MAX_SOCKET_STRING]; + void *ctx = zmq_ctx_new (); + + // Create xbind socket. + xbind = zmq_socket (ctx, ZMQ_ROUTER); + assert (xbind); + ret = zmq_setsockopt (xbind, ZMQ_LINGER, &zero, sizeof (zero)); + assert (0 == ret); + ret = zmq_bind (xbind, wildcard_bind); + assert (0 == ret); + ret = zmq_getsockopt (xbind, ZMQ_LAST_ENDPOINT, x_endpoint, &len); + assert (0 == ret); + + // Create zbind socket. + zbind = zmq_socket (ctx, ZMQ_ROUTER); + assert (zbind); + ret = zmq_setsockopt (zbind, ZMQ_LINGER, &zero, sizeof (zero)); + assert (0 == ret); + ret = zmq_bind (zbind, wildcard_bind); + assert (0 == ret); + ret = zmq_getsockopt (zbind, ZMQ_LAST_ENDPOINT, z_endpoint, &len); + assert (0 == ret); + + // Create connection socket. + yconn = zmq_socket (ctx, ZMQ_ROUTER); + assert (yconn); + ret = zmq_setsockopt (yconn, ZMQ_LINGER, &zero, sizeof (zero)); + assert (0 == ret); + + // set identites for each socket + ret = zmq_setsockopt (xbind, ZMQ_ROUTING_ID, "X", 2); + ret = zmq_setsockopt (yconn, ZMQ_ROUTING_ID, "Y", 2); + ret = zmq_setsockopt (zbind, ZMQ_ROUTING_ID, "Z", 2); + + // Connect Y to X using a routing id + ret = zmq_setsockopt (yconn, ZMQ_CONNECT_ROUTING_ID, "X", 2); + assert (0 == ret); + ret = zmq_connect (yconn, x_endpoint); + assert (0 == ret); + + // Send some data from Y to X. + ret = zmq_send (yconn, "X", 2, ZMQ_SNDMORE); + assert (2 == ret); + ret = zmq_send (yconn, msg, 5, 0); + assert (5 == ret); + + // wait for the Y->X message to be received + msleep (SETTLE_TIME); + + // Now X tries to connect to Z and send a message + ret = zmq_setsockopt (xbind, ZMQ_CONNECT_ROUTING_ID, "Z", 2); + assert (0 == ret); + ret = zmq_connect (xbind, z_endpoint); + assert (0 == ret); + + // Try to send some data from X to Z. + ret = zmq_send (xbind, "Z", 2, ZMQ_SNDMORE); + assert (2 == ret); + ret = zmq_send (xbind, msg, 5, 0); + assert (5 == ret); + + // wait for the X->Z message to be received (so that our non-blocking check will actually + // fail if the message is routed to Y) + msleep (SETTLE_TIME); + + // nothing should have been received on the Y socket + ret = zmq_recv (yconn, buff, 256, ZMQ_DONTWAIT); + assert (ret == -1); + assert (zmq_errno () == EAGAIN); + + // the message should have been received on the Z socket + ret = zmq_recv (zbind, buff, 256, 0); + assert (ret && 'X' == buff[0]); + ret = zmq_recv (zbind, buff + 128, 128, 0); + assert (5 == ret && 'h' == buff[128]); + + ret = zmq_unbind (xbind, x_endpoint); + assert (0 == ret); + ret = zmq_unbind (zbind, z_endpoint); + assert (0 == ret); + ret = zmq_close (yconn); + assert (0 == ret); + ret = zmq_close (xbind); + assert (0 == ret); + ret = zmq_close (zbind); + assert (0 == ret); + + zmq_ctx_destroy (ctx); +} + int main (void) { setup_test_environment (); @@ -196,6 +296,7 @@ int main (void) test_stream_2_stream (); test_router_2_router (false); test_router_2_router (true); + test_router_2_router_while_receiving (); return 0; }