mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-31 22:45:38 +01:00
Merge pull request #887 from ricnewton/master
Fix hang on terminate when inproc is connected but never bound.
This commit is contained in:
commit
1737520c67
@ -95,6 +95,14 @@ zmq::ctx_t::~ctx_t ()
|
|||||||
|
|
||||||
int zmq::ctx_t::terminate ()
|
int zmq::ctx_t::terminate ()
|
||||||
{
|
{
|
||||||
|
// Connect up any pending inproc connections, otherwise we will hang
|
||||||
|
pending_connections_t copy = pending_connections;
|
||||||
|
for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); ++p) {
|
||||||
|
zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
|
||||||
|
s->bind (p->first.c_str ());
|
||||||
|
s->close ();
|
||||||
|
}
|
||||||
|
|
||||||
slot_sync.lock ();
|
slot_sync.lock ();
|
||||||
if (!starting) {
|
if (!starting) {
|
||||||
|
|
||||||
@ -108,6 +116,7 @@ int zmq::ctx_t::terminate ()
|
|||||||
term_mailbox.forked();
|
term_mailbox.forked();
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// Check whether termination was already underway, but interrupted and now
|
// Check whether termination was already underway, but interrupted and now
|
||||||
// restarted.
|
// restarted.
|
||||||
bool restarted = terminating;
|
bool restarted = terminating;
|
||||||
|
@ -36,7 +36,7 @@ static void pusher (void *ctx)
|
|||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void test_bind_before_connect()
|
void test_bind_before_connect ()
|
||||||
{
|
{
|
||||||
void *ctx = zmq_ctx_new ();
|
void *ctx = zmq_ctx_new ();
|
||||||
assert (ctx);
|
assert (ctx);
|
||||||
@ -52,7 +52,7 @@ void test_bind_before_connect()
|
|||||||
assert (connectSocket);
|
assert (connectSocket);
|
||||||
rc = zmq_connect (connectSocket, "inproc://a");
|
rc = zmq_connect (connectSocket, "inproc://a");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Queue up some data
|
// Queue up some data
|
||||||
rc = zmq_send_const (connectSocket, "foobar", 6, 0);
|
rc = zmq_send_const (connectSocket, "foobar", 6, 0);
|
||||||
assert (rc == 6);
|
assert (rc == 6);
|
||||||
@ -77,7 +77,7 @@ void test_bind_before_connect()
|
|||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void test_connect_before_bind()
|
void test_connect_before_bind ()
|
||||||
{
|
{
|
||||||
void *ctx = zmq_ctx_new ();
|
void *ctx = zmq_ctx_new ();
|
||||||
assert (ctx);
|
assert (ctx);
|
||||||
@ -97,7 +97,7 @@ void test_connect_before_bind()
|
|||||||
assert (bindSocket);
|
assert (bindSocket);
|
||||||
rc = zmq_bind (bindSocket, "inproc://a");
|
rc = zmq_bind (bindSocket, "inproc://a");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Read pending message
|
// Read pending message
|
||||||
zmq_msg_t msg;
|
zmq_msg_t msg;
|
||||||
rc = zmq_msg_init (&msg);
|
rc = zmq_msg_init (&msg);
|
||||||
@ -118,7 +118,7 @@ void test_connect_before_bind()
|
|||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void test_connect_before_bind_pub_sub()
|
void test_connect_before_bind_pub_sub ()
|
||||||
{
|
{
|
||||||
void *ctx = zmq_ctx_new ();
|
void *ctx = zmq_ctx_new ();
|
||||||
assert (ctx);
|
assert (ctx);
|
||||||
@ -140,7 +140,7 @@ void test_connect_before_bind_pub_sub()
|
|||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_bind (bindSocket, "inproc://a");
|
rc = zmq_bind (bindSocket, "inproc://a");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
// Wait for pub-sub connection to happen
|
// Wait for pub-sub connection to happen
|
||||||
msleep (SETTLE_TIME);
|
msleep (SETTLE_TIME);
|
||||||
|
|
||||||
@ -168,14 +168,14 @@ void test_connect_before_bind_pub_sub()
|
|||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void test_multiple_connects()
|
void test_multiple_connects ()
|
||||||
{
|
{
|
||||||
const unsigned int no_of_connects = 10;
|
const unsigned int no_of_connects = 10;
|
||||||
void *ctx = zmq_ctx_new ();
|
void *ctx = zmq_ctx_new ();
|
||||||
assert (ctx);
|
assert (ctx);
|
||||||
|
|
||||||
int rc;
|
int rc;
|
||||||
void *connectSocket[no_of_connects];
|
void *connectSocket [no_of_connects];
|
||||||
|
|
||||||
// Connect first
|
// Connect first
|
||||||
for (unsigned int i = 0; i < no_of_connects; ++i)
|
for (unsigned int i = 0; i < no_of_connects; ++i)
|
||||||
@ -195,7 +195,7 @@ void test_multiple_connects()
|
|||||||
assert (bindSocket);
|
assert (bindSocket);
|
||||||
rc = zmq_bind (bindSocket, "inproc://a");
|
rc = zmq_bind (bindSocket, "inproc://a");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
for (unsigned int i = 0; i < no_of_connects; ++i)
|
for (unsigned int i = 0; i < no_of_connects; ++i)
|
||||||
{
|
{
|
||||||
// Read pending message
|
// Read pending message
|
||||||
@ -222,7 +222,7 @@ void test_multiple_connects()
|
|||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void test_multiple_threads()
|
void test_multiple_threads ()
|
||||||
{
|
{
|
||||||
const unsigned int no_of_threads = 30;
|
const unsigned int no_of_threads = 30;
|
||||||
void *ctx = zmq_ctx_new ();
|
void *ctx = zmq_ctx_new ();
|
||||||
@ -268,21 +268,21 @@ void test_multiple_threads()
|
|||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void test_identity()
|
void test_identity ()
|
||||||
{
|
{
|
||||||
// Create the infrastructure
|
// Create the infrastructure
|
||||||
void *ctx = zmq_ctx_new ();
|
void *ctx = zmq_ctx_new ();
|
||||||
assert (ctx);
|
assert (ctx);
|
||||||
|
|
||||||
void *sc = zmq_socket (ctx, ZMQ_DEALER);
|
void *sc = zmq_socket (ctx, ZMQ_DEALER);
|
||||||
assert (sc);
|
assert (sc);
|
||||||
|
|
||||||
int rc = zmq_connect (sc, "inproc://a");
|
int rc = zmq_connect (sc, "inproc://a");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
void *sb = zmq_socket (ctx, ZMQ_ROUTER);
|
void *sb = zmq_socket (ctx, ZMQ_ROUTER);
|
||||||
assert (sb);
|
assert (sb);
|
||||||
|
|
||||||
rc = zmq_bind (sb, "inproc://a");
|
rc = zmq_bind (sb, "inproc://a");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
@ -316,17 +316,34 @@ void test_identity()
|
|||||||
// Deallocate the infrastructure.
|
// Deallocate the infrastructure.
|
||||||
rc = zmq_close (sc);
|
rc = zmq_close (sc);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
rc = zmq_close (sb);
|
rc = zmq_close (sb);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_ctx_term (ctx);
|
||||||
|
assert (rc == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test_connect_only ()
|
||||||
|
{
|
||||||
|
void *ctx = zmq_ctx_new ();
|
||||||
|
assert (ctx);
|
||||||
|
|
||||||
|
void *connectSocket = zmq_socket (ctx, ZMQ_PUSH);
|
||||||
|
assert (connectSocket);
|
||||||
|
int rc = zmq_connect (connectSocket, "inproc://a");
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_close (connectSocket);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
rc = zmq_ctx_term (ctx);
|
rc = zmq_ctx_term (ctx);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
int main (void)
|
int main (void)
|
||||||
{
|
{
|
||||||
setup_test_environment();
|
setup_test_environment ();
|
||||||
|
|
||||||
test_bind_before_connect ();
|
test_bind_before_connect ();
|
||||||
test_connect_before_bind ();
|
test_connect_before_bind ();
|
||||||
@ -334,6 +351,7 @@ int main (void)
|
|||||||
test_multiple_connects ();
|
test_multiple_connects ();
|
||||||
test_multiple_threads ();
|
test_multiple_threads ();
|
||||||
test_identity ();
|
test_identity ();
|
||||||
|
test_connect_only ();
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user