On the advice of Martin Hurton, removed the new command type and just terminated the pipe in a reconnect situation, and notified the socket of the same. This handles the blocking properly, but at the cost of potentially losing in flight messages. However, this is a reasonable trade off given how much simpler it makes the patch.

This commit is contained in:
Ian Barber 2012-06-04 10:27:16 +01:00
parent 1566091bc6
commit c13f1d52ff
6 changed files with 8 additions and 46 deletions

View File

@ -47,7 +47,6 @@ namespace zmq
own, own,
attach, attach,
bind, bind,
detach,
activate_read, activate_read,
activate_write, activate_write,
hiccup, hiccup,
@ -89,11 +88,6 @@ namespace zmq
zmq::pipe_t *pipe; zmq::pipe_t *pipe;
} bind; } bind;
// Sent from session to socket to disconnect a pipe
struct {
zmq::pipe_t *pipe;
} detach;
// Sent by pipe writer to inform dormant pipe reader that there // Sent by pipe writer to inform dormant pipe reader that there
// are messages in the pipe. // are messages in the pipe.
struct { struct {

View File

@ -92,11 +92,6 @@ void zmq::object_t::process_command (command_t &cmd_)
process_seqnum (); process_seqnum ();
break; break;
case command_t::detach:
process_detach (cmd_.args.detach.pipe);
process_seqnum ();
break;
case command_t::hiccup: case command_t::hiccup:
process_hiccup (cmd_.args.hiccup.pipe); process_hiccup (cmd_.args.hiccup.pipe);
break; break;
@ -216,15 +211,6 @@ void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_detach (own_t *destination_, pipe_t *pipe_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::detach;
cmd.args.detach.pipe = pipe_;
send_command (cmd);
}
void zmq::object_t::send_activate_read (pipe_t *destination_) void zmq::object_t::send_activate_read (pipe_t *destination_)
{ {
command_t cmd; command_t cmd;
@ -345,11 +331,6 @@ void zmq::object_t::process_bind (pipe_t *pipe_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_detach (pipe_t *pipe_)
{
zmq_assert (false);
}
void zmq::object_t::process_activate_read () void zmq::object_t::process_activate_read ()
{ {
zmq_assert (false); zmq_assert (false);

View File

@ -78,7 +78,6 @@ namespace zmq
zmq::i_engine *engine_, bool inc_seqnum_ = true); zmq::i_engine *engine_, bool inc_seqnum_ = true);
void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_,
bool inc_seqnum_ = true); bool inc_seqnum_ = true);
void send_detach (own_t *destination_, pipe_t *pipe_);
void send_activate_read (zmq::pipe_t *destination_); void send_activate_read (zmq::pipe_t *destination_);
void send_activate_write (zmq::pipe_t *destination_, void send_activate_write (zmq::pipe_t *destination_,
uint64_t msgs_read_); uint64_t msgs_read_);
@ -100,7 +99,6 @@ namespace zmq
virtual void process_own (zmq::own_t *object_); virtual void process_own (zmq::own_t *object_);
virtual void process_attach (zmq::i_engine *engine_); virtual void process_attach (zmq::i_engine *engine_);
virtual void process_bind (zmq::pipe_t *pipe_); virtual void process_bind (zmq::pipe_t *pipe_);
virtual void process_detach (zmq::pipe_t *pipe_);
virtual void process_activate_read (); virtual void process_activate_read ();
virtual void process_activate_write (uint64_t msgs_read_); virtual void process_activate_write (uint64_t msgs_read_);
virtual void process_hiccup (void *pipe_); virtual void process_hiccup (void *pipe_);

View File

@ -412,10 +412,13 @@ void zmq::session_base_t::detached ()
if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
pipe->hiccup (); pipe->hiccup ();
// For delayed connect situations, hiccup the socket to have it // For delayed connect situations, terminate the pipe
// pause usage of this pipe // and reestablish later on
if (outpipe && options.delay_attach_on_connect == 1) if (pipe && options.delay_attach_on_connect == 1) {
send_detach(socket, outpipe); pipe->terminate (false);
socket->terminated (pipe);
pipe = NULL;
}
} }
void zmq::session_base_t::start_connecting (bool wait_) void zmq::session_base_t::start_connecting (bool wait_)

View File

@ -876,17 +876,6 @@ void zmq::socket_base_t::process_destroy ()
destroyed = true; destroyed = true;
} }
void zmq::socket_base_t::process_detach (pipe_t *pipe_)
{
// If we are blocking connecting threads, drop this one
if (options.delay_attach_on_connect == 1) {
zmq_assert (pipe_);
pipes.erase (pipe_);
// Let derived sockets know we're ditching this pipe
xterminated (pipe_);
}
}
int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {

View File

@ -185,9 +185,6 @@ namespace zmq
void process_bind (zmq::pipe_t *pipe_); void process_bind (zmq::pipe_t *pipe_);
void process_term (int linger_); void process_term (int linger_);
// Allow blocking reconnecting pipes
void process_detach (pipe_t *pipe_);
// Socket's mailbox object. // Socket's mailbox object.
mailbox_t mailbox; mailbox_t mailbox;