mirror of
https://github.com/zeromq/libzmq.git
synced 2025-02-20 22:31:34 +01:00
Allow blocking while connect() is completing
This patch, salvaged from a trainwreck accidental merge earlier, adds a new sockopt, ZMQ_DELAY_ATTACH_ON_CONNECT which prevents a end point being available to push messages to until it has fully connected, making connect work more like bind. This also applies to reconnecting sockets, which may cause message loss of in-queue messages, so it is sensible to use this in conjunction with a low HWM and potentially an alternative acknowledgement path. Notes on most of the individual commits can be found the repository log.
This commit is contained in:
parent
409d5e8fff
commit
e5904e63ce
1
.gitignore
vendored
1
.gitignore
vendored
@ -39,6 +39,7 @@ tests/test_invalid_rep
|
||||
tests/test_msg_flags
|
||||
tests/test_ts_context
|
||||
tests/test_connect_resolve
|
||||
tests/test_connect_delay
|
||||
tests/test_term_endpoint
|
||||
src/platform.hpp*
|
||||
src/stamp-h1
|
||||
|
@ -342,6 +342,21 @@ Default value:: 1 (true)
|
||||
Applicable socket types:: all, when using TCP transports.
|
||||
|
||||
|
||||
ZMQ_DELAY_ATTACH_ON_CONNECT
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Retrieve the state of the attach on connect value. If set to `1`, will delay the
|
||||
attachment of a pipe on connect until the underlying connection has completed.
|
||||
This will cause the socket to block if there are no other connections, but will
|
||||
prevent queues from filling on pipes awaiting connection.
|
||||
|
||||
[horizontal]
|
||||
Option value type:: int
|
||||
Option value unit:: boolean
|
||||
Default value:: 0 (false)
|
||||
Applicable socket types:: all, primarily when using TCP/IPC transports.
|
||||
|
||||
|
||||
ZMQ_FD: Retrieve file descriptor associated with the socket
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
The 'ZMQ_FD' option shall retrieve the file descriptor associated with the
|
||||
|
@ -352,6 +352,20 @@ Default value:: 1 (true)
|
||||
Applicable socket types:: all, when using TCP transports.
|
||||
|
||||
|
||||
ZMQ_DELAY_ATTACH_ON_CONNECT
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
If set to `1`, will delay the attachment of a pipe on connect until the underlying
|
||||
connection has completed. This will cause the socket to block if there are no other
|
||||
connections, but will prevent queues from filling on pipes awaiting connection.
|
||||
|
||||
[horizontal]
|
||||
Option value type:: int
|
||||
Option value unit:: boolean
|
||||
Default value:: 0 (false)
|
||||
Applicable socket types:: all, primarily when using TCP/IPC transports.
|
||||
|
||||
|
||||
ZMQ_FAIL_UNROUTABLE: Set unroutable message behavior
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
|
@ -227,6 +227,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
|
||||
#define ZMQ_TCP_KEEPALIVE_IDLE 36
|
||||
#define ZMQ_TCP_KEEPALIVE_INTVL 37
|
||||
#define ZMQ_TCP_ACCEPT_FILTER 38
|
||||
#define ZMQ_DELAY_ATTACH_ON_CONNECT 39
|
||||
|
||||
/* Message options */
|
||||
#define ZMQ_MORE 1
|
||||
|
@ -41,8 +41,7 @@ zmq::lb_t::~lb_t ()
|
||||
void zmq::lb_t::attach (pipe_t *pipe_)
|
||||
{
|
||||
pipes.push_back (pipe_);
|
||||
pipes.swap (active, pipes.size () - 1);
|
||||
active++;
|
||||
activated (pipe_);
|
||||
}
|
||||
|
||||
void zmq::lb_t::terminated (pipe_t *pipe_)
|
||||
|
@ -44,6 +44,7 @@ zmq::options_t::options_t () :
|
||||
rcvtimeo (-1),
|
||||
sndtimeo (-1),
|
||||
ipv4only (1),
|
||||
delay_attach_on_connect (0),
|
||||
delay_on_close (true),
|
||||
delay_on_disconnect (true),
|
||||
filter (false),
|
||||
@ -218,6 +219,8 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
|
||||
ipv4only = val;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
case ZMQ_TCP_KEEPALIVE:
|
||||
{
|
||||
@ -236,6 +239,21 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
|
||||
return 0;
|
||||
}
|
||||
|
||||
case ZMQ_DELAY_ATTACH_ON_CONNECT:
|
||||
{
|
||||
if (optvallen_ != sizeof (int)) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
int val = *((int*) optval_);
|
||||
if (val != 0 && val != 1) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
delay_attach_on_connect = val;
|
||||
return 0;
|
||||
}
|
||||
|
||||
case ZMQ_TCP_KEEPALIVE_CNT:
|
||||
{
|
||||
if (optvallen_ != sizeof (int)) {
|
||||
@ -483,6 +501,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
|
||||
*((int*) optval_) = ipv4only;
|
||||
*optvallen_ = sizeof (int);
|
||||
return 0;
|
||||
|
||||
case ZMQ_DELAY_ATTACH_ON_CONNECT:
|
||||
if (*optvallen_ < sizeof (int)) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
*((int*) optval_) = delay_attach_on_connect;
|
||||
*optvallen_ = sizeof (int);
|
||||
return 0;
|
||||
|
||||
case ZMQ_TCP_KEEPALIVE:
|
||||
if (*optvallen_ < sizeof (int)) {
|
||||
|
@ -96,6 +96,10 @@ namespace zmq
|
||||
// possible to communicate with IPv6-only hosts. If 0, the socket can
|
||||
// connect to and accept connections from both IPv4 and IPv6 hosts.
|
||||
int ipv4only;
|
||||
|
||||
// If 1, connecting pipes are not attached immediately, meaning a send()
|
||||
// on a socket with only connecting pipes would block
|
||||
int delay_attach_on_connect;
|
||||
|
||||
// If true, session reads all the pending messages from the pipe and
|
||||
// sends them to the network when socket is closed.
|
||||
|
@ -229,20 +229,28 @@ void zmq::session_base_t::clean_pipes ()
|
||||
|
||||
void zmq::session_base_t::terminated (pipe_t *pipe_)
|
||||
{
|
||||
// Drop the reference to the deallocated pipe.
|
||||
zmq_assert (pipe == pipe_);
|
||||
pipe = NULL;
|
||||
// Drop the reference to the deallocated pipe if required.
|
||||
zmq_assert (pipe == pipe_ || incomplete_pipes.size () > 0);
|
||||
|
||||
// If we are waiting for pending messages to be sent, at this point
|
||||
// we are sure that there will be no more messages and we can proceed
|
||||
// with termination safely.
|
||||
if (pending)
|
||||
if (pipe == pipe_)
|
||||
// If this is our current pipe, remove it
|
||||
pipe = NULL;
|
||||
else
|
||||
// Remove the pipe from the detached pipes set
|
||||
incomplete_pipes.erase (pipe_);
|
||||
|
||||
// If we are waiting for pending messages to be sent, at this point
|
||||
// we are sure that there will be no more messages and we can proceed
|
||||
// with termination safely.
|
||||
if (pending && !pipe && incomplete_pipes.size () == 0)
|
||||
proceed_with_term ();
|
||||
}
|
||||
|
||||
void zmq::session_base_t::read_activated (pipe_t *pipe_)
|
||||
{
|
||||
zmq_assert (pipe == pipe_);
|
||||
// Skip activating if we're detaching this pipe
|
||||
if (incomplete_pipes.size () > 0 && pipe_ != pipe)
|
||||
return;
|
||||
|
||||
if (likely (engine != NULL))
|
||||
engine->activate_out ();
|
||||
@ -252,7 +260,9 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_)
|
||||
|
||||
void zmq::session_base_t::write_activated (pipe_t *pipe_)
|
||||
{
|
||||
zmq_assert (pipe == pipe_);
|
||||
// Skip activating if we're detaching this pipe
|
||||
if (incomplete_pipes.size () > 0 && pipe_ != pipe)
|
||||
return;
|
||||
|
||||
if (engine)
|
||||
engine->activate_in ();
|
||||
@ -395,6 +405,16 @@ void zmq::session_base_t::detached ()
|
||||
return;
|
||||
}
|
||||
|
||||
// For delayed connect situations, terminate the pipe
|
||||
// and reestablish later on
|
||||
if (pipe && options.delay_attach_on_connect == 1
|
||||
&& addr->protocol != "pgm" && addr->protocol != "epgm") {
|
||||
pipe->hiccup ();
|
||||
pipe->terminate (false);
|
||||
incomplete_pipes.insert (pipe);
|
||||
pipe = NULL;
|
||||
}
|
||||
|
||||
reset ();
|
||||
|
||||
// Reconnect.
|
||||
|
@ -103,6 +103,9 @@ namespace zmq
|
||||
|
||||
// Pipe connecting the session to its socket.
|
||||
zmq::pipe_t *pipe;
|
||||
|
||||
// This set is added to with pipes we are disconnecting, but haven't yet completed
|
||||
std::set<pipe_t *> incomplete_pipes;
|
||||
|
||||
// This flag is true if the remainder of the message being processed
|
||||
// is still in the in pipe.
|
||||
|
@ -530,27 +530,29 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
options, paddr);
|
||||
errno_assert (session);
|
||||
|
||||
// Create a bi-directional pipe.
|
||||
object_t *parents [2] = {this, session};
|
||||
pipe_t *pipes [2] = {NULL, NULL};
|
||||
int hwms [2] = {options.sndhwm, options.rcvhwm};
|
||||
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
|
||||
rc = pipepair (parents, pipes, hwms, delays);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// PGM does not support subscription forwarding; ask for all data to be
|
||||
// sent to this pipe.
|
||||
bool icanhasall = false;
|
||||
if (protocol == "pgm" || protocol == "epgm")
|
||||
icanhasall = true;
|
||||
|
||||
// Attach local end of the pipe to the socket object.
|
||||
attach_pipe (pipes [0], icanhasall);
|
||||
if (options.delay_attach_on_connect != 1 || icanhasall) {
|
||||
// Create a bi-directional pipe.
|
||||
object_t *parents [2] = {this, session};
|
||||
pipe_t *pipes [2] = {NULL, NULL};
|
||||
int hwms [2] = {options.sndhwm, options.rcvhwm};
|
||||
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
|
||||
rc = pipepair (parents, pipes, hwms, delays);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// Attach remote end of the pipe to the session object later on.
|
||||
session->attach_pipe (pipes [1]);
|
||||
// Attach local end of the pipe to the socket object.
|
||||
attach_pipe (pipes [0], icanhasall);
|
||||
|
||||
// Save last endpoint URI
|
||||
// Attach remote end of the pipe to the session object later on.
|
||||
session->attach_pipe (pipes [1]);
|
||||
}
|
||||
|
||||
// Save last endpoint URI
|
||||
paddr->to_string (options.last_endpoint);
|
||||
|
||||
add_endpoint (addr_, (own_t *) session);
|
||||
@ -968,7 +970,11 @@ void zmq::socket_base_t::write_activated (pipe_t *pipe_)
|
||||
|
||||
void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
|
||||
{
|
||||
xhiccuped (pipe_);
|
||||
if (options.delay_attach_on_connect == 1)
|
||||
pipe_->terminate (false);
|
||||
else
|
||||
// Notify derived sockets of the hiccup
|
||||
xhiccuped (pipe_);
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::terminated (pipe_t *pipe_)
|
||||
|
@ -13,6 +13,7 @@ noinst_PROGRAMS = test_pair_inproc \
|
||||
test_invalid_rep \
|
||||
test_msg_flags \
|
||||
test_connect_resolve \
|
||||
test_connect_delay \
|
||||
test_last_endpoint \
|
||||
test_term_endpoint \
|
||||
test_monitor
|
||||
@ -34,6 +35,7 @@ test_sub_forward_SOURCES = test_sub_forward.cpp
|
||||
test_invalid_rep_SOURCES = test_invalid_rep.cpp
|
||||
test_msg_flags_SOURCES = test_msg_flags.cpp
|
||||
test_connect_resolve_SOURCES = test_connect_resolve.cpp
|
||||
test_connect_delay_SOURCES = test_connect_delay.cpp
|
||||
test_last_endpoint_SOURCES = test_last_endpoint.cpp
|
||||
test_term_endpoint_SOURCES = test_term_endpoint.cpp
|
||||
test_monitor_SOURCES = test_monitor.cpp
|
||||
|
Loading…
x
Reference in New Issue
Block a user