diff --git a/src/command.hpp b/src/command.hpp index 83783695..b70b417c 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -47,6 +47,7 @@ namespace zmq own, attach, bind, + detach, activate_read, activate_write, hiccup, @@ -87,6 +88,11 @@ namespace zmq struct { zmq::pipe_t *pipe; } bind; + + // Sent from session to socket to disconnect a pipe + struct { + zmq::pipe_t *pipe; + } detach; // Sent by pipe writer to inform dormant pipe reader that there // are messages in the pipe. diff --git a/src/object.cpp b/src/object.cpp index 80784d7a..37db6f0c 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -92,6 +92,11 @@ void zmq::object_t::process_command (command_t &cmd_) process_seqnum (); break; + case command_t::detach: + process_detach (cmd_.args.detach.pipe); + process_seqnum (); + break; + case command_t::hiccup: process_hiccup (cmd_.args.hiccup.pipe); break; @@ -211,6 +216,15 @@ void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_, send_command (cmd); } +void zmq::object_t::send_detach (own_t *destination_, pipe_t *pipe_) +{ + command_t cmd; + cmd.destination = destination_; + cmd.type = command_t::detach; + cmd.args.detach.pipe = pipe_; + send_command (cmd); +} + void zmq::object_t::send_activate_read (pipe_t *destination_) { command_t cmd; @@ -331,6 +345,11 @@ void zmq::object_t::process_bind (pipe_t *pipe_) zmq_assert (false); } +void zmq::object_t::process_detach (pipe_t *pipe_) +{ + zmq_assert (false); +} + void zmq::object_t::process_activate_read () { zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index 932cea7d..e088c14b 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -78,6 +78,7 @@ namespace zmq zmq::i_engine *engine_, bool inc_seqnum_ = true); void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_ = true); + void send_detach (own_t *destination_, pipe_t *pipe_); void send_activate_read (zmq::pipe_t *destination_); void send_activate_write (zmq::pipe_t *destination_, uint64_t msgs_read_); @@ -99,6 +100,7 @@ namespace zmq virtual void process_own (zmq::own_t *object_); virtual void process_attach (zmq::i_engine *engine_); virtual void process_bind (zmq::pipe_t *pipe_); + virtual void process_detach (zmq::pipe_t *pipe_); virtual void process_activate_read (); virtual void process_activate_write (uint64_t msgs_read_); virtual void process_hiccup (void *pipe_); diff --git a/src/session_base.cpp b/src/session_base.cpp index b77c2452..aa1fc8b9 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -228,7 +228,7 @@ void zmq::session_base_t::clean_pipes () } void zmq::session_base_t::terminated (pipe_t *pipe_) -{ +{ // Drop the reference to the deallocated pipe. zmq_assert (pipe == pipe_); pipe = NULL; @@ -306,9 +306,15 @@ void zmq::session_base_t::process_attach (i_engine *engine_) zmq_assert (!pipe); pipe = pipes [0]; + // Remember the remote end of the pipe if required + if (options.delay_attach_on_connect == 1) + outpipe = pipes [1]; + // Ask socket to plug into the pipe. send_bind (socket, pipes [1]); } + else if (outpipe && (options.delay_attach_on_connect == 1)) + send_bind (socket, outpipe); // Plug in the engine. zmq_assert (!engine); @@ -405,6 +411,11 @@ void zmq::session_base_t::detached () // the socket object to resend all the subscriptions. if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) pipe->hiccup (); + + // For delayed connect situations, hiccup the socket to have it + // pause usage of this pipe + if (outpipe && options.delay_attach_on_connect == 1) + send_detach(socket, outpipe); } void zmq::session_base_t::start_connecting (bool wait_) diff --git a/src/session_base.hpp b/src/session_base.hpp index a9f0496f..488b34f8 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -52,9 +52,6 @@ namespace zmq // To be used once only, when creating the session. void attach_pipe (zmq::pipe_t *pipe_); - - // To be used once only, for delayed connection - void onconnect_attach_pipe (pipe_t *pipe_); // Following functions are the interface exposed towards the engine. virtual int read (msg_t *msg_); @@ -106,7 +103,10 @@ namespace zmq // Pipe connecting the session to its socket. zmq::pipe_t *pipe; - + + // Socket end of pipe, in case of reconnection + zmq::pipe_t *outpipe; + // This flag is true if the remainder of the message being processed // is still in the in pipe. bool incomplete_in; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6cec33f2..45cfc345 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -529,13 +529,13 @@ int zmq::socket_base_t::connect (const char *addr_) session_base_t *session = session_base_t::create (io_thread, true, this, options, paddr); errno_assert (session); - + // PGM does not support subscription forwarding; ask for all data to be // sent to this pipe. bool icanhasall = false; if (protocol == "pgm" || protocol == "epgm") icanhasall = true; - + if (options.delay_attach_on_connect != 1 && icanhasall != true) { // Create a bi-directional pipe. object_t *parents [2] = {this, session}; @@ -547,7 +547,7 @@ int zmq::socket_base_t::connect (const char *addr_) // Attach local end of the pipe to the socket object. attach_pipe (pipes [0], icanhasall); - + // Attach remote end of the pipe to the session object later on. session->attach_pipe (pipes [1]); } @@ -876,6 +876,17 @@ void zmq::socket_base_t::process_destroy () destroyed = true; } +void zmq::socket_base_t::process_detach (pipe_t *pipe_) +{ + // If we are blocking connecting threads, drop this one + if (options.delay_attach_on_connect == 1) { + zmq_assert (pipe_); + pipes.erase (pipe_); + // Let derived sockets know we're ditching this pipe + xterminated (pipe_); + } +} + int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index ab630e35..ced475eb 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -185,6 +185,9 @@ namespace zmq void process_bind (zmq::pipe_t *pipe_); void process_term (int linger_); + // Allow blocking reconnecting pipes + void process_detach (pipe_t *pipe_); + // Socket's mailbox object. mailbox_t mailbox; diff --git a/tests/test_connect_delay.cpp b/tests/test_connect_delay.cpp index 2446d2c9..52f05a75 100644 --- a/tests/test_connect_delay.cpp +++ b/tests/test_connect_delay.cpp @@ -36,25 +36,33 @@ int main (int argc, char *argv []) int seen = 0; void *context = zmq_ctx_new(); + assert (context); void *to = zmq_socket(context, ZMQ_PULL); + assert (to); + val = 0; - zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); - zmq_bind(to, "tcp://*:5555"); + rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); + assert (rc == 0); + rc = zmq_bind(to, "tcp://*:5555"); + assert (rc == 0); // Create a socket pushing to two endpoints - only 1 message should arrive. void *from = zmq_socket (context, ZMQ_PUSH); + assert(from); + val = 0; - zmq_setsockopt(from, ZMQ_LINGER, &val, sizeof(val)); - rc = zmq_connect(from, "tcp://localhost:5556"); + zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val)); + rc = zmq_connect (from, "tcp://localhost:5556"); assert (rc == 0); - rc = zmq_connect(from, "tcp://localhost:5555"); + rc = zmq_connect (from, "tcp://localhost:5555"); assert (rc == 0); for (int i = 0; i < 10; ++i) { std::string message("message "); message += ('0' + i); - zmq_send(from, message.data(), message.size(), 0); + rc = zmq_send (from, message.data(), message.size(), 0); + assert(rc >= 0); } sleep(1); @@ -62,7 +70,7 @@ int main (int argc, char *argv []) for (int i = 0; i < 10; ++i) { memset(&buffer, 0, sizeof(buffer)); - rc = zmq_recv(to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); + rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); if( rc == -1) break; seen++; @@ -81,27 +89,39 @@ int main (int argc, char *argv []) context = zmq_ctx_new(); std::cout << " Rerunning with DELAY_ATTACH_ON_CONNECT\n"; - to = zmq_socket(context, ZMQ_PULL); - zmq_bind(to, "tcp://*:5560"); + to = zmq_socket (context, ZMQ_PULL); + assert (to); + rc = zmq_bind (to, "tcp://*:5560"); + assert(rc == 0); + val = 0; - zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); + rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val)); + assert (rc == 0); // Create a socket pushing to two endpoints - all messages should arrive. from = zmq_socket (context, ZMQ_PUSH); + assert (from); + val = 0; - zmq_setsockopt(from, ZMQ_LINGER, &val, sizeof(val)); - val = 1; - zmq_setsockopt(from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val)); - rc = zmq_connect(from, "tcp://localhost:5561"); + rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val)); assert (rc == 0); - rc = zmq_connect(from, "tcp://localhost:5560"); + + val = 1; + rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val)); + assert (rc == 0); + + rc = zmq_connect (from, "tcp://localhost:5561"); + assert (rc == 0); + + rc = zmq_connect (from, "tcp://localhost:5560"); assert (rc == 0); for (int i = 0; i < 10; ++i) { std::string message("message "); message += ('0' + i); - zmq_send(from, message.data(), message.size(), 0); + rc = zmq_send (from, message.data(), message.size(), 0); + assert (rc >= 0); } sleep(1); @@ -110,13 +130,9 @@ int main (int argc, char *argv []) for (int i = 0; i < 10; ++i) { memset(&buffer, 0, sizeof(buffer)); - rc = zmq_recv(to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); - if( rc == -1) { - break; - } - seen++; + rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); + assert (rc != -1); } - assert (seen == 10); rc = zmq_close (from); assert (rc == 0);