mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-12 10:33:52 +01:00
Merge pull request #373 from ianbarber/master
Add ZMQ_DELAY_ATTACH_ON_CONNECT sockopt
This commit is contained in:
commit
076e081de2
1
.gitignore
vendored
1
.gitignore
vendored
@ -39,6 +39,7 @@ tests/test_invalid_rep
|
||||
tests/test_msg_flags
|
||||
tests/test_ts_context
|
||||
tests/test_connect_resolve
|
||||
tests/test_connect_delay
|
||||
tests/test_term_endpoint
|
||||
src/platform.hpp*
|
||||
src/stamp-h1
|
||||
|
@ -342,6 +342,21 @@ Default value:: 1 (true)
|
||||
Applicable socket types:: all, when using TCP transports.
|
||||
|
||||
|
||||
ZMQ_DELAY_ATTACH_ON_CONNECT
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Retrieve the state of the attach on connect value. If set to `1`, will delay the
|
||||
attachment of a pipe on connect until the underlying connection has completed.
|
||||
This will cause the socket to block if there are no other connections, but will
|
||||
prevent queues from filling on pipes awaiting connection.
|
||||
|
||||
[horizontal]
|
||||
Option value type:: int
|
||||
Option value unit:: boolean
|
||||
Default value:: 0 (false)
|
||||
Applicable socket types:: all, primarily when using TCP/IPC transports.
|
||||
|
||||
|
||||
ZMQ_FD: Retrieve file descriptor associated with the socket
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
The 'ZMQ_FD' option shall retrieve the file descriptor associated with the
|
||||
|
@ -352,6 +352,20 @@ Default value:: 1 (true)
|
||||
Applicable socket types:: all, when using TCP transports.
|
||||
|
||||
|
||||
ZMQ_DELAY_ATTACH_ON_CONNECT
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
If set to `1`, will delay the attachment of a pipe on connect until the underlying
|
||||
connection has completed. This will cause the socket to block if there are no other
|
||||
connections, but will prevent queues from filling on pipes awaiting connection.
|
||||
|
||||
[horizontal]
|
||||
Option value type:: int
|
||||
Option value unit:: boolean
|
||||
Default value:: 0 (false)
|
||||
Applicable socket types:: all, primarily when using TCP/IPC transports.
|
||||
|
||||
|
||||
ZMQ_FAIL_UNROUTABLE: Set unroutable message behavior
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
|
@ -227,6 +227,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
|
||||
#define ZMQ_TCP_KEEPALIVE_IDLE 36
|
||||
#define ZMQ_TCP_KEEPALIVE_INTVL 37
|
||||
#define ZMQ_TCP_ACCEPT_FILTER 38
|
||||
#define ZMQ_DELAY_ATTACH_ON_CONNECT 39
|
||||
|
||||
/* Message options */
|
||||
#define ZMQ_MORE 1
|
||||
|
@ -41,8 +41,7 @@ zmq::lb_t::~lb_t ()
|
||||
void zmq::lb_t::attach (pipe_t *pipe_)
|
||||
{
|
||||
pipes.push_back (pipe_);
|
||||
pipes.swap (active, pipes.size () - 1);
|
||||
active++;
|
||||
activated (pipe_);
|
||||
}
|
||||
|
||||
void zmq::lb_t::terminated (pipe_t *pipe_)
|
||||
|
@ -44,6 +44,7 @@ zmq::options_t::options_t () :
|
||||
rcvtimeo (-1),
|
||||
sndtimeo (-1),
|
||||
ipv4only (1),
|
||||
delay_attach_on_connect (0),
|
||||
delay_on_close (true),
|
||||
delay_on_disconnect (true),
|
||||
filter (false),
|
||||
@ -218,6 +219,8 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
|
||||
ipv4only = val;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
case ZMQ_TCP_KEEPALIVE:
|
||||
{
|
||||
@ -236,6 +239,21 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
|
||||
return 0;
|
||||
}
|
||||
|
||||
case ZMQ_DELAY_ATTACH_ON_CONNECT:
|
||||
{
|
||||
if (optvallen_ != sizeof (int)) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
int val = *((int*) optval_);
|
||||
if (val != 0 && val != 1) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
delay_attach_on_connect = val;
|
||||
return 0;
|
||||
}
|
||||
|
||||
case ZMQ_TCP_KEEPALIVE_CNT:
|
||||
{
|
||||
if (optvallen_ != sizeof (int)) {
|
||||
@ -483,6 +501,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
|
||||
*((int*) optval_) = ipv4only;
|
||||
*optvallen_ = sizeof (int);
|
||||
return 0;
|
||||
|
||||
case ZMQ_DELAY_ATTACH_ON_CONNECT:
|
||||
if (*optvallen_ < sizeof (int)) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
*((int*) optval_) = delay_attach_on_connect;
|
||||
*optvallen_ = sizeof (int);
|
||||
return 0;
|
||||
|
||||
case ZMQ_TCP_KEEPALIVE:
|
||||
if (*optvallen_ < sizeof (int)) {
|
||||
|
@ -96,6 +96,10 @@ namespace zmq
|
||||
// possible to communicate with IPv6-only hosts. If 0, the socket can
|
||||
// connect to and accept connections from both IPv4 and IPv6 hosts.
|
||||
int ipv4only;
|
||||
|
||||
// If 1, connecting pipes are not attached immediately, meaning a send()
|
||||
// on a socket with only connecting pipes would block
|
||||
int delay_attach_on_connect;
|
||||
|
||||
// If true, session reads all the pending messages from the pipe and
|
||||
// sends them to the network when socket is closed.
|
||||
|
@ -229,20 +229,30 @@ void zmq::session_base_t::clean_pipes ()
|
||||
|
||||
void zmq::session_base_t::terminated (pipe_t *pipe_)
|
||||
{
|
||||
// Drop the reference to the deallocated pipe.
|
||||
zmq_assert (pipe == pipe_);
|
||||
pipe = NULL;
|
||||
// Drop the reference to the deallocated pipe if required.
|
||||
zmq_assert (pipe == pipe_ || terminating_pipes.count (pipe_) == 1);
|
||||
|
||||
// If we are waiting for pending messages to be sent, at this point
|
||||
// we are sure that there will be no more messages and we can proceed
|
||||
// with termination safely.
|
||||
if (pending)
|
||||
if (pipe == pipe_)
|
||||
// If this is our current pipe, remove it
|
||||
pipe = NULL;
|
||||
else
|
||||
// Remove the pipe from the detached pipes set
|
||||
terminating_pipes.erase (pipe_);
|
||||
|
||||
// If we are waiting for pending messages to be sent, at this point
|
||||
// we are sure that there will be no more messages and we can proceed
|
||||
// with termination safely.
|
||||
if (pending && !pipe && terminating_pipes.size () == 0)
|
||||
proceed_with_term ();
|
||||
}
|
||||
|
||||
void zmq::session_base_t::read_activated (pipe_t *pipe_)
|
||||
{
|
||||
zmq_assert (pipe == pipe_);
|
||||
// Skip activating if we're detaching this pipe
|
||||
if (pipe != pipe_) {
|
||||
zmq_assert (terminating_pipes.count (pipe_) == 1);
|
||||
return;
|
||||
}
|
||||
|
||||
if (likely (engine != NULL))
|
||||
engine->activate_out ();
|
||||
@ -252,7 +262,11 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_)
|
||||
|
||||
void zmq::session_base_t::write_activated (pipe_t *pipe_)
|
||||
{
|
||||
zmq_assert (pipe == pipe_);
|
||||
// Skip activating if we're detaching this pipe
|
||||
if (pipe != pipe_) {
|
||||
zmq_assert (terminating_pipes.count (pipe_) == 1);
|
||||
return;
|
||||
}
|
||||
|
||||
if (engine)
|
||||
engine->activate_in ();
|
||||
@ -395,6 +409,16 @@ void zmq::session_base_t::detached ()
|
||||
return;
|
||||
}
|
||||
|
||||
// For delayed connect situations, terminate the pipe
|
||||
// and reestablish later on
|
||||
if (pipe && options.delay_attach_on_connect == 1
|
||||
&& addr->protocol != "pgm" && addr->protocol != "epgm") {
|
||||
pipe->hiccup ();
|
||||
pipe->terminate (false);
|
||||
terminating_pipes.insert (pipe);
|
||||
pipe = NULL;
|
||||
}
|
||||
|
||||
reset ();
|
||||
|
||||
// Reconnect.
|
||||
|
@ -103,6 +103,9 @@ namespace zmq
|
||||
|
||||
// Pipe connecting the session to its socket.
|
||||
zmq::pipe_t *pipe;
|
||||
|
||||
// This set is added to with pipes we are disconnecting, but haven't yet completed
|
||||
std::set<pipe_t *> terminating_pipes;
|
||||
|
||||
// This flag is true if the remainder of the message being processed
|
||||
// is still in the in pipe.
|
||||
|
@ -530,27 +530,29 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
options, paddr);
|
||||
errno_assert (session);
|
||||
|
||||
// Create a bi-directional pipe.
|
||||
object_t *parents [2] = {this, session};
|
||||
pipe_t *pipes [2] = {NULL, NULL};
|
||||
int hwms [2] = {options.sndhwm, options.rcvhwm};
|
||||
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
|
||||
rc = pipepair (parents, pipes, hwms, delays);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// PGM does not support subscription forwarding; ask for all data to be
|
||||
// sent to this pipe.
|
||||
bool icanhasall = false;
|
||||
if (protocol == "pgm" || protocol == "epgm")
|
||||
icanhasall = true;
|
||||
|
||||
// Attach local end of the pipe to the socket object.
|
||||
attach_pipe (pipes [0], icanhasall);
|
||||
if (options.delay_attach_on_connect != 1 || icanhasall) {
|
||||
// Create a bi-directional pipe.
|
||||
object_t *parents [2] = {this, session};
|
||||
pipe_t *pipes [2] = {NULL, NULL};
|
||||
int hwms [2] = {options.sndhwm, options.rcvhwm};
|
||||
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
|
||||
rc = pipepair (parents, pipes, hwms, delays);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// Attach remote end of the pipe to the session object later on.
|
||||
session->attach_pipe (pipes [1]);
|
||||
// Attach local end of the pipe to the socket object.
|
||||
attach_pipe (pipes [0], icanhasall);
|
||||
|
||||
// Save last endpoint URI
|
||||
// Attach remote end of the pipe to the session object later on.
|
||||
session->attach_pipe (pipes [1]);
|
||||
}
|
||||
|
||||
// Save last endpoint URI
|
||||
paddr->to_string (options.last_endpoint);
|
||||
|
||||
add_endpoint (addr_, (own_t *) session);
|
||||
@ -968,7 +970,11 @@ void zmq::socket_base_t::write_activated (pipe_t *pipe_)
|
||||
|
||||
void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
|
||||
{
|
||||
xhiccuped (pipe_);
|
||||
if (options.delay_attach_on_connect == 1)
|
||||
pipe_->terminate (false);
|
||||
else
|
||||
// Notify derived sockets of the hiccup
|
||||
xhiccuped (pipe_);
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::terminated (pipe_t *pipe_)
|
||||
|
@ -13,6 +13,7 @@ noinst_PROGRAMS = test_pair_inproc \
|
||||
test_invalid_rep \
|
||||
test_msg_flags \
|
||||
test_connect_resolve \
|
||||
test_connect_delay \
|
||||
test_last_endpoint \
|
||||
test_term_endpoint \
|
||||
test_monitor
|
||||
@ -34,6 +35,7 @@ test_sub_forward_SOURCES = test_sub_forward.cpp
|
||||
test_invalid_rep_SOURCES = test_invalid_rep.cpp
|
||||
test_msg_flags_SOURCES = test_msg_flags.cpp
|
||||
test_connect_resolve_SOURCES = test_connect_resolve.cpp
|
||||
test_connect_delay_SOURCES = test_connect_delay.cpp
|
||||
test_last_endpoint_SOURCES = test_last_endpoint.cpp
|
||||
test_term_endpoint_SOURCES = test_term_endpoint.cpp
|
||||
test_monitor_SOURCES = test_monitor.cpp
|
||||
|
260
tests/test_connect_delay.cpp
Normal file
260
tests/test_connect_delay.cpp
Normal file
@ -0,0 +1,260 @@
|
||||
/*
|
||||
Copyright (c) 2012 Ian Barber
|
||||
Copyright (c) 2012 Other contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
|
||||
0MQ is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
0MQ is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
|
||||
#include <assert.h>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <errno.h>
|
||||
|
||||
#include "../include/zmq.h"
|
||||
|
||||
static void *server (void *c)
|
||||
{
|
||||
void *socket, *context;
|
||||
char buffer[16];
|
||||
int rc, val;
|
||||
|
||||
context = zmq_init (1);
|
||||
assert (context);
|
||||
|
||||
socket = zmq_socket (context, ZMQ_PULL);
|
||||
assert (socket);
|
||||
|
||||
val = 0;
|
||||
rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_bind (socket, "ipc:///tmp/recon");
|
||||
assert (rc == 0);
|
||||
|
||||
memset (&buffer, 0, sizeof(buffer));
|
||||
rc = zmq_recv (socket, &buffer, sizeof(buffer), 0);
|
||||
|
||||
// Intentionally bail out
|
||||
rc = zmq_close (socket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_term (context);
|
||||
assert (rc == 0);
|
||||
|
||||
usleep (200000);
|
||||
|
||||
context = zmq_init (1);
|
||||
assert (context);
|
||||
|
||||
socket = zmq_socket (context, ZMQ_PULL);
|
||||
assert (socket);
|
||||
|
||||
val = 0;
|
||||
rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_bind (socket, "ipc:///tmp/recon");
|
||||
assert (rc == 0);
|
||||
|
||||
usleep (200000);
|
||||
|
||||
memset (&buffer, 0, sizeof(buffer));
|
||||
rc = zmq_recv (socket, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
|
||||
assert (rc != -1);
|
||||
|
||||
// Start closing the socket while the connecting process is underway.
|
||||
rc = zmq_close (socket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_term (context);
|
||||
assert (rc == 0);
|
||||
|
||||
pthread_exit(NULL);
|
||||
}
|
||||
|
||||
static void *worker (void *n)
|
||||
{
|
||||
void *socket, *context;
|
||||
int rc, hadone, val;
|
||||
|
||||
context = zmq_init (1);
|
||||
assert (context);
|
||||
|
||||
socket = zmq_socket (context, ZMQ_PUSH);
|
||||
assert (socket);
|
||||
|
||||
val = 0;
|
||||
rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val));
|
||||
assert (rc == 0);
|
||||
|
||||
val = 1;
|
||||
rc = zmq_setsockopt (socket, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_connect (socket, "ipc:///tmp/recon");
|
||||
assert (rc == 0);
|
||||
|
||||
hadone = 0;
|
||||
// Not checking RC as some may be -1
|
||||
for (int i = 0; i < 4; i++) {
|
||||
usleep(200000);
|
||||
rc = zmq_send (socket, "hi", 2, ZMQ_DONTWAIT);
|
||||
if (rc != -1)
|
||||
hadone ++;
|
||||
}
|
||||
|
||||
assert (hadone >= 2);
|
||||
assert (hadone < 4);
|
||||
|
||||
rc = zmq_close (socket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_term (context);
|
||||
assert (rc == 0);
|
||||
|
||||
pthread_exit(NULL);
|
||||
}
|
||||
|
||||
int main (int argc, char *argv [])
|
||||
{
|
||||
fprintf (stderr, "test_connect_delay running...\n");
|
||||
int val;
|
||||
int rc;
|
||||
char buffer[16];
|
||||
int seen = 0;
|
||||
|
||||
void *context = zmq_ctx_new();
|
||||
assert (context);
|
||||
void *to = zmq_socket(context, ZMQ_PULL);
|
||||
assert (to);
|
||||
|
||||
val = 0;
|
||||
rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
|
||||
assert (rc == 0);
|
||||
rc = zmq_bind(to, "tcp://*:5555");
|
||||
assert (rc == 0);
|
||||
|
||||
// Create a socket pushing to two endpoints - only 1 message should arrive.
|
||||
void *from = zmq_socket (context, ZMQ_PUSH);
|
||||
assert(from);
|
||||
|
||||
val = 0;
|
||||
zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
|
||||
rc = zmq_connect (from, "tcp://localhost:5556");
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (from, "tcp://localhost:5555");
|
||||
assert (rc == 0);
|
||||
|
||||
for (int i = 0; i < 10; ++i)
|
||||
{
|
||||
std::string message("message ");
|
||||
message += ('0' + i);
|
||||
rc = zmq_send (from, message.data(), message.size(), 0);
|
||||
assert(rc >= 0);
|
||||
}
|
||||
|
||||
sleep(1);
|
||||
seen = 0;
|
||||
for (int i = 0; i < 10; ++i)
|
||||
{
|
||||
memset (&buffer, 0, sizeof(buffer));
|
||||
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
|
||||
if( rc == -1)
|
||||
break;
|
||||
seen++;
|
||||
}
|
||||
assert (seen == 5);
|
||||
|
||||
rc = zmq_close (from);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (to);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_destroy(context);
|
||||
assert (rc == 0);
|
||||
|
||||
context = zmq_ctx_new();
|
||||
fprintf (stderr, " Rerunning with DELAY_ATTACH_ON_CONNECT\n");
|
||||
|
||||
to = zmq_socket (context, ZMQ_PULL);
|
||||
assert (to);
|
||||
rc = zmq_bind (to, "tcp://*:5560");
|
||||
assert (rc == 0);
|
||||
|
||||
val = 0;
|
||||
rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val));
|
||||
assert (rc == 0);
|
||||
|
||||
// Create a socket pushing to two endpoints - all messages should arrive.
|
||||
from = zmq_socket (context, ZMQ_PUSH);
|
||||
assert (from);
|
||||
|
||||
val = 0;
|
||||
rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
|
||||
assert (rc == 0);
|
||||
|
||||
val = 1;
|
||||
rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_connect (from, "tcp://localhost:5561");
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_connect (from, "tcp://localhost:5560");
|
||||
assert (rc == 0);
|
||||
|
||||
for (int i = 0; i < 10; ++i)
|
||||
{
|
||||
std::string message("message ");
|
||||
message += ('0' + i);
|
||||
rc = zmq_send (from, message.data(), message.size(), 0);
|
||||
assert (rc >= 0);
|
||||
}
|
||||
|
||||
sleep(1);
|
||||
|
||||
seen = 0;
|
||||
for (int i = 0; i < 10; ++i)
|
||||
{
|
||||
memset(&buffer, 0, sizeof(buffer));
|
||||
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
|
||||
assert (rc != -1);
|
||||
}
|
||||
|
||||
rc = zmq_close (from);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (to);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_destroy(context);
|
||||
assert (rc == 0);
|
||||
|
||||
fprintf (stderr, " Running DELAY_ATTACH_ON_CONNECT with disconnect\n");
|
||||
|
||||
pthread_t serv, work;
|
||||
|
||||
rc = pthread_create (&serv, NULL, server, NULL);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = pthread_create (&work, NULL, worker, NULL);
|
||||
assert (rc == 0);
|
||||
|
||||
pthread_exit(NULL);
|
||||
}
|
Loading…
Reference in New Issue
Block a user