mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-31 14:39:55 +01:00
Merge pull request #658 from ricnewton/inproc_connect_before_bind
Support high water mark on inproc socket connect before bind.
This commit is contained in:
commit
90668517da
99
src/ctx.cpp
99
src/ctx.cpp
@ -406,12 +406,7 @@ void zmq::ctx_t::pend_connection (const char *addr_, pending_connection_t &pendi
|
||||
else
|
||||
{
|
||||
// Bind has happened in the mean time, connect directly
|
||||
it->second.socket->inc_seqnum();
|
||||
pending_connection_.bind_pipe->set_tid(it->second.socket->get_tid());
|
||||
command_t cmd;
|
||||
cmd.type = command_t::bind;
|
||||
cmd.args.bind.pipe = pending_connection_.bind_pipe;
|
||||
it->second.socket->process_command(cmd);
|
||||
connect_inproc_sockets(it->second.socket, it->second.options, pending_connection_, connect_side);
|
||||
}
|
||||
|
||||
endpoints_sync.unlock ();
|
||||
@ -425,38 +420,7 @@ void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_so
|
||||
|
||||
for (pending_connections_t::iterator p = pending.first; p != pending.second; ++p)
|
||||
{
|
||||
bind_socket_->inc_seqnum();
|
||||
p->second.bind_pipe->set_tid(bind_socket_->get_tid());
|
||||
command_t cmd;
|
||||
cmd.type = command_t::bind;
|
||||
cmd.args.bind.pipe = p->second.bind_pipe;
|
||||
bind_socket_->process_command(cmd);
|
||||
|
||||
bind_socket_->send_inproc_connected(p->second.endpoint.socket);
|
||||
|
||||
// Send identities
|
||||
options_t& bind_options = endpoints[addr_].options;
|
||||
if (bind_options.recv_identity) {
|
||||
|
||||
msg_t id;
|
||||
int rc = id.init_size (p->second.endpoint.options.identity_size);
|
||||
errno_assert (rc == 0);
|
||||
memcpy (id.data (), p->second.endpoint.options.identity, p->second.endpoint.options.identity_size);
|
||||
id.set_flags (msg_t::identity);
|
||||
bool written = p->second.connect_pipe->write (&id);
|
||||
zmq_assert (written);
|
||||
p->second.connect_pipe->flush ();
|
||||
}
|
||||
if (p->second.endpoint.options.recv_identity) {
|
||||
msg_t id;
|
||||
int rc = id.init_size (bind_options.identity_size);
|
||||
errno_assert (rc == 0);
|
||||
memcpy (id.data (), bind_options.identity, bind_options.identity_size);
|
||||
id.set_flags (msg_t::identity);
|
||||
bool written = p->second.bind_pipe->write (&id);
|
||||
zmq_assert (written);
|
||||
p->second.bind_pipe->flush ();
|
||||
}
|
||||
connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);
|
||||
}
|
||||
|
||||
pending_connections.erase(pending.first, pending.second);
|
||||
@ -464,6 +428,65 @@ void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_so
|
||||
endpoints_sync.unlock ();
|
||||
}
|
||||
|
||||
void zmq::ctx_t::connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t& bind_options, pending_connection_t &pending_connection_, side side_)
|
||||
{
|
||||
bind_socket_->inc_seqnum();
|
||||
pending_connection_.bind_pipe->set_tid(bind_socket_->get_tid());
|
||||
|
||||
if (side_ == bind_side)
|
||||
{
|
||||
command_t cmd;
|
||||
cmd.type = command_t::bind;
|
||||
cmd.args.bind.pipe = pending_connection_.bind_pipe;
|
||||
bind_socket_->process_command(cmd);
|
||||
bind_socket_->send_inproc_connected(pending_connection_.endpoint.socket);
|
||||
}
|
||||
else
|
||||
{
|
||||
pending_connection_.connect_pipe->send_bind(bind_socket_, pending_connection_.bind_pipe, false);
|
||||
}
|
||||
|
||||
int sndhwm = 0;
|
||||
if (pending_connection_.endpoint.options.sndhwm != 0 && bind_options.rcvhwm != 0)
|
||||
sndhwm = pending_connection_.endpoint.options.sndhwm + bind_options.rcvhwm;
|
||||
int rcvhwm = 0;
|
||||
if (pending_connection_.endpoint.options.rcvhwm != 0 && bind_options.sndhwm != 0)
|
||||
rcvhwm = pending_connection_.endpoint.options.rcvhwm + bind_options.sndhwm;
|
||||
|
||||
bool conflate = pending_connection_.endpoint.options.conflate &&
|
||||
(pending_connection_.endpoint.options.type == ZMQ_DEALER ||
|
||||
pending_connection_.endpoint.options.type == ZMQ_PULL ||
|
||||
pending_connection_.endpoint.options.type == ZMQ_PUSH ||
|
||||
pending_connection_.endpoint.options.type == ZMQ_PUB ||
|
||||
pending_connection_.endpoint.options.type == ZMQ_SUB);
|
||||
|
||||
int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
|
||||
pending_connection_.connect_pipe->set_hwms(hwms [1], hwms [0]);
|
||||
pending_connection_.bind_pipe->set_hwms(hwms [0], hwms [1]);
|
||||
|
||||
if (bind_options.recv_identity) {
|
||||
|
||||
msg_t id;
|
||||
int rc = id.init_size (pending_connection_.endpoint.options.identity_size);
|
||||
errno_assert (rc == 0);
|
||||
memcpy (id.data (), pending_connection_.endpoint.options.identity, pending_connection_.endpoint.options.identity_size);
|
||||
id.set_flags (msg_t::identity);
|
||||
bool written = pending_connection_.connect_pipe->write (&id);
|
||||
zmq_assert (written);
|
||||
pending_connection_.connect_pipe->flush ();
|
||||
}
|
||||
if (pending_connection_.endpoint.options.recv_identity) {
|
||||
msg_t id;
|
||||
int rc = id.init_size (bind_options.identity_size);
|
||||
errno_assert (rc == 0);
|
||||
memcpy (id.data (), bind_options.identity, bind_options.identity_size);
|
||||
id.set_flags (msg_t::identity);
|
||||
bool written = pending_connection_.bind_pipe->write (&id);
|
||||
zmq_assert (written);
|
||||
pending_connection_.bind_pipe->flush ();
|
||||
}
|
||||
}
|
||||
|
||||
// The last used socket ID, or 0 if no socket was used so far. Note that this
|
||||
// is a global variable. Thus, even sockets created in different contexts have
|
||||
// unique IDs.
|
||||
|
@ -195,6 +195,8 @@ namespace zmq
|
||||
// the process that created this context. Used to detect forking.
|
||||
pid_t pid;
|
||||
#endif
|
||||
enum side { connect_side, bind_side };
|
||||
void connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t& bind_options, pending_connection_t &pending_connection_, side side_);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -52,6 +52,7 @@ namespace zmq
|
||||
ctx_t *get_ctx ();
|
||||
void process_command (zmq::command_t &cmd_);
|
||||
void send_inproc_connected (zmq::socket_base_t *socket_);
|
||||
void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_ = true);
|
||||
|
||||
protected:
|
||||
|
||||
@ -80,8 +81,6 @@ namespace zmq
|
||||
zmq::own_t *object_);
|
||||
void send_attach (zmq::session_base_t *destination_,
|
||||
zmq::i_engine *engine_, bool inc_seqnum_ = true);
|
||||
void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_,
|
||||
bool inc_seqnum_ = true);
|
||||
void send_activate_read (zmq::pipe_t *destination_);
|
||||
void send_activate_write (zmq::pipe_t *destination_,
|
||||
uint64_t msgs_read_);
|
||||
|
@ -478,3 +478,8 @@ void zmq::pipe_t::hiccup ()
|
||||
send_hiccup (peer, (void*) inpipe);
|
||||
}
|
||||
|
||||
void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
|
||||
{
|
||||
lwm = compute_lwm (inhwm_);
|
||||
hwm = outhwm_;
|
||||
}
|
||||
|
@ -112,6 +112,9 @@ namespace zmq
|
||||
// before actual shutdown.
|
||||
void terminate (bool delay_);
|
||||
|
||||
// set the high water marks.
|
||||
void set_hwms (int inhwm_, int outhwm_);
|
||||
|
||||
private:
|
||||
|
||||
// Type of the underlying lock-free pipe.
|
||||
|
@ -439,10 +439,14 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
// The total HWM for an inproc connection should be the sum of
|
||||
// the binder's HWM and the connector's HWM.
|
||||
int sndhwm = 0;
|
||||
if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
|
||||
if (peer.socket == NULL)
|
||||
sndhwm = options.sndhwm;
|
||||
else if (options.sndhwm != 0 && peer.options.rcvhwm != 0)
|
||||
sndhwm = options.sndhwm + peer.options.rcvhwm;
|
||||
int rcvhwm = 0;
|
||||
if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
|
||||
if (peer.socket == NULL)
|
||||
rcvhwm = options.rcvhwm;
|
||||
else if (options.rcvhwm != 0 && peer.options.sndhwm != 0)
|
||||
rcvhwm = options.rcvhwm + peer.options.sndhwm;
|
||||
|
||||
// Create a bi-directional pipe to connect the peers.
|
||||
|
@ -22,61 +22,281 @@
|
||||
#include <string.h>
|
||||
#include "testutil.hpp"
|
||||
|
||||
int main (void)
|
||||
const int MAX_SENDS = 10000;
|
||||
|
||||
enum TestType { BIND_FIRST, CONNECT_FIRST };
|
||||
|
||||
int test_defaults ()
|
||||
{
|
||||
setup_test_environment();
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
int rc;
|
||||
|
||||
// Create pair of socket, each with high watermark of 2. Thus the total
|
||||
// buffer space should be 4 messages.
|
||||
void *sb = zmq_socket (ctx, ZMQ_PULL);
|
||||
assert (sb);
|
||||
int hwm = 2;
|
||||
int rc = zmq_setsockopt (sb, ZMQ_RCVHWM, &hwm, sizeof (hwm));
|
||||
assert (rc == 0);
|
||||
rc = zmq_bind (sb, "inproc://a");
|
||||
// Set up bind socket
|
||||
void *bind_socket = zmq_socket (ctx, ZMQ_PULL);
|
||||
assert (bind_socket);
|
||||
rc = zmq_bind (bind_socket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
void *sc = zmq_socket (ctx, ZMQ_PUSH);
|
||||
assert (sc);
|
||||
rc = zmq_setsockopt (sc, ZMQ_SNDHWM, &hwm, sizeof (hwm));
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (sc, "inproc://a");
|
||||
// Set up connect socket
|
||||
void *connect_socket = zmq_socket (ctx, ZMQ_PUSH);
|
||||
assert (connect_socket);
|
||||
rc = zmq_connect (connect_socket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Try to send 10 messages. Only 4 should succeed.
|
||||
for (int i = 0; i < 10; i++)
|
||||
{
|
||||
int rc = zmq_send (sc, NULL, 0, ZMQ_DONTWAIT);
|
||||
if (i < 4)
|
||||
assert (rc == 0);
|
||||
else
|
||||
assert (rc < 0 && errno == EAGAIN);
|
||||
}
|
||||
// Send until we block
|
||||
int send_count = 0;
|
||||
while (send_count < MAX_SENDS && zmq_send (connect_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
|
||||
++send_count;
|
||||
|
||||
// There should be now 4 messages pending, consume them.
|
||||
for (int i = 0; i != 4; i++) {
|
||||
rc = zmq_recv (sb, NULL, 0, 0);
|
||||
assert (rc == 0);
|
||||
}
|
||||
// Now receive all sent messages
|
||||
int recv_count = 0;
|
||||
while (zmq_recv (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
|
||||
++recv_count;
|
||||
|
||||
// Now it should be possible to send one more.
|
||||
rc = zmq_send (sc, NULL, 0, 0);
|
||||
assert (send_count == recv_count);
|
||||
|
||||
// Clean up
|
||||
rc = zmq_close (connect_socket);
|
||||
assert (rc == 0);
|
||||
|
||||
// Consume the remaining message.
|
||||
rc = zmq_recv (sb, NULL, 0, 0);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (sc);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (sb);
|
||||
rc = zmq_close (bind_socket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
|
||||
return 0;
|
||||
return send_count;
|
||||
}
|
||||
|
||||
int count_msg (int send_hwm, int recv_hwm, TestType testType)
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
int rc;
|
||||
|
||||
void *bind_socket;
|
||||
void *connect_socket;
|
||||
if (testType == BIND_FIRST)
|
||||
{
|
||||
// Set up bind socket
|
||||
bind_socket = zmq_socket (ctx, ZMQ_PULL);
|
||||
assert (bind_socket);
|
||||
rc = zmq_setsockopt (bind_socket, ZMQ_RCVHWM, &recv_hwm, sizeof (recv_hwm));
|
||||
assert (rc == 0);
|
||||
rc = zmq_bind (bind_socket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Set up connect socket
|
||||
connect_socket = zmq_socket (ctx, ZMQ_PUSH);
|
||||
assert (connect_socket);
|
||||
rc = zmq_setsockopt (connect_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (connect_socket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Set up connect socket
|
||||
connect_socket = zmq_socket (ctx, ZMQ_PUSH);
|
||||
assert (connect_socket);
|
||||
rc = zmq_setsockopt (connect_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (connect_socket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Set up bind socket
|
||||
bind_socket = zmq_socket (ctx, ZMQ_PULL);
|
||||
assert (bind_socket);
|
||||
rc = zmq_setsockopt (bind_socket, ZMQ_RCVHWM, &recv_hwm, sizeof (recv_hwm));
|
||||
assert (rc == 0);
|
||||
rc = zmq_bind (bind_socket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
// Send until we block
|
||||
int send_count = 0;
|
||||
while (send_count < MAX_SENDS && zmq_send (connect_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
|
||||
++send_count;
|
||||
|
||||
// Now receive all sent messages
|
||||
int recv_count = 0;
|
||||
while (zmq_recv (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
|
||||
++recv_count;
|
||||
|
||||
assert (send_count == recv_count);
|
||||
|
||||
// Now it should be possible to send one more.
|
||||
rc = zmq_send (connect_socket, NULL, 0, 0);
|
||||
assert (rc == 0);
|
||||
|
||||
// Consume the remaining message.
|
||||
rc = zmq_recv (bind_socket, NULL, 0, 0);
|
||||
assert (rc == 0);
|
||||
|
||||
// Clean up
|
||||
rc = zmq_close (connect_socket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (bind_socket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
|
||||
return send_count;
|
||||
}
|
||||
|
||||
int test_inproc_bind_first (int send_hwm, int recv_hwm)
|
||||
{
|
||||
return count_msg(send_hwm, recv_hwm, BIND_FIRST);
|
||||
}
|
||||
|
||||
int test_inproc_connect_first (int send_hwm, int recv_hwm)
|
||||
{
|
||||
return count_msg(send_hwm, recv_hwm, CONNECT_FIRST);
|
||||
}
|
||||
|
||||
int test_inproc_connect_and_close_first (int send_hwm, int recv_hwm)
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
int rc;
|
||||
|
||||
// Set up connect socket
|
||||
void *connect_socket = zmq_socket (ctx, ZMQ_PUSH);
|
||||
assert (connect_socket);
|
||||
rc = zmq_setsockopt (connect_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (connect_socket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Send until we block
|
||||
int send_count = 0;
|
||||
while (send_count < MAX_SENDS && zmq_send (connect_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
|
||||
++send_count;
|
||||
|
||||
// Close connect
|
||||
rc = zmq_close (connect_socket);
|
||||
assert (rc == 0);
|
||||
|
||||
// Set up bind socket
|
||||
void *bind_socket = zmq_socket (ctx, ZMQ_PULL);
|
||||
assert (bind_socket);
|
||||
rc = zmq_setsockopt (bind_socket, ZMQ_RCVHWM, &recv_hwm, sizeof (recv_hwm));
|
||||
assert (rc == 0);
|
||||
rc = zmq_bind (bind_socket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Now receive all sent messages
|
||||
int recv_count = 0;
|
||||
while (zmq_recv (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
|
||||
++recv_count;
|
||||
|
||||
assert (send_count == recv_count);
|
||||
|
||||
// Clean up
|
||||
rc = zmq_close (bind_socket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
|
||||
return send_count;
|
||||
}
|
||||
|
||||
int test_inproc_bind_and_close_first (int send_hwm, int recv_hwm)
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
int rc;
|
||||
|
||||
// Set up bind socket
|
||||
void *bind_socket = zmq_socket (ctx, ZMQ_PUSH);
|
||||
assert (bind_socket);
|
||||
rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
|
||||
assert (rc == 0);
|
||||
rc = zmq_bind (bind_socket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Send until we block
|
||||
int send_count = 0;
|
||||
while (send_count < MAX_SENDS && zmq_send (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
|
||||
++send_count;
|
||||
|
||||
// Close bind
|
||||
rc = zmq_close (bind_socket);
|
||||
assert (rc == 0);
|
||||
|
||||
/* Can't currently do connect without then wiring up a bind as things hang, this needs top be fixed.
|
||||
// Set up connect socket
|
||||
void *connect_socket = zmq_socket (ctx, ZMQ_PULL);
|
||||
assert (connect_socket);
|
||||
rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &recv_hwm, sizeof (recv_hwm));
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (connect_socket, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Now receive all sent messages
|
||||
int recv_count = 0;
|
||||
while (zmq_recv (connect_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
|
||||
++recv_count;
|
||||
|
||||
assert (send_count == recv_count);
|
||||
*/
|
||||
|
||||
// Clean up
|
||||
//rc = zmq_close (connect_socket);
|
||||
//assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
|
||||
return send_count;
|
||||
}
|
||||
|
||||
int main (void)
|
||||
{
|
||||
setup_test_environment();
|
||||
|
||||
int count;
|
||||
|
||||
// Default values are 1000 on send and 1000 one receive, so 2000 total
|
||||
count = test_defaults ();
|
||||
assert (count == 2000);
|
||||
|
||||
// Infinite send and receive buffer
|
||||
count = test_inproc_bind_first (0, 0);
|
||||
assert (count == MAX_SENDS);
|
||||
count = test_inproc_connect_first (0, 0);
|
||||
assert (count == MAX_SENDS);
|
||||
|
||||
// Infinite send buffer
|
||||
count = test_inproc_bind_first (1, 0);
|
||||
assert (count == MAX_SENDS);
|
||||
count = test_inproc_connect_first (1, 0);
|
||||
assert (count == MAX_SENDS);
|
||||
|
||||
// Infinite receive buffer
|
||||
count = test_inproc_bind_first (0, 1);
|
||||
assert (count == MAX_SENDS);
|
||||
count = test_inproc_connect_first (0, 1);
|
||||
assert (count == MAX_SENDS);
|
||||
|
||||
// Send and recv buffers hwm 1, so total that can be queued is 2
|
||||
count = test_inproc_bind_first (1, 1);
|
||||
assert (count == 2);
|
||||
count = test_inproc_connect_first (1, 1);
|
||||
assert (count == 2);
|
||||
|
||||
// Send hwm of 1, send before bind so total that can be queued is 1
|
||||
count = test_inproc_connect_and_close_first (1, 0);
|
||||
assert (count == 1);
|
||||
|
||||
// Send hwm of 1, send from bind side before connect so total that can be queued should be 1,
|
||||
// however currently all messages get thrown away before the connect. BUG?
|
||||
count = test_inproc_bind_and_close_first (1, 0);
|
||||
//assert (count == 1);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user