mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-22 10:25:52 +01:00
Merge pull request #1649 from sheremetyev/fix-pipe-activation-race
Fix pipe activation race
This commit is contained in:
commit
782fbe5bef
11
src/pipe.cpp
11
src/pipe.cpp
@ -460,14 +460,9 @@ int zmq::pipe_t::compute_lwm (int hwm_)
|
||||
// result in low performance.
|
||||
//
|
||||
// Given the 3. it would be good to keep HWM and LWM as far apart as
|
||||
// possible to reduce the thread switching overhead to almost zero,
|
||||
// say HWM-LWM should be max_wm_delta.
|
||||
//
|
||||
// That done, we still we have to account for the cases where
|
||||
// HWM < max_wm_delta thus driving LWM to negative numbers.
|
||||
// Let's make LWM 1/2 of HWM in such cases.
|
||||
int result = (hwm_ > max_wm_delta * 2) ?
|
||||
hwm_ - max_wm_delta : (hwm_ + 1) / 2;
|
||||
// possible to reduce the thread switching overhead to almost zero.
|
||||
// Let's make LWM 1/2 of HWM.
|
||||
int result = (hwm_ + 1) / 2;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
@ -151,7 +151,81 @@ int test_blocking (int send_hwm, int msgCnt)
|
||||
return recv_count;
|
||||
}
|
||||
|
||||
// with hwm 11024: send 9999 msg, receive 9999, send 1100, receive 1100
|
||||
void test_reset_hwm ()
|
||||
{
|
||||
int first_count = 9999;
|
||||
int second_count = 1100;
|
||||
int hwm = 11024;
|
||||
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
int rc;
|
||||
|
||||
// Set up bind socket
|
||||
void *pub_socket = zmq_socket (ctx, ZMQ_PUB);
|
||||
assert (pub_socket);
|
||||
rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &hwm, sizeof (hwm));
|
||||
assert (rc == 0);
|
||||
rc = zmq_bind (pub_socket, "tcp://127.0.0.1:1234");
|
||||
assert (rc == 0);
|
||||
|
||||
// Set up connect socket
|
||||
void *sub_socket = zmq_socket (ctx, ZMQ_SUB);
|
||||
assert (sub_socket);
|
||||
rc = zmq_setsockopt (sub_socket, ZMQ_RCVHWM, &hwm, sizeof (hwm));
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (sub_socket, "tcp://127.0.0.1:1234");
|
||||
assert (rc == 0);
|
||||
rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0);
|
||||
assert (rc == 0);
|
||||
|
||||
msleep (100);
|
||||
|
||||
// Send messages
|
||||
int send_count = 0;
|
||||
while (send_count < first_count && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
|
||||
++send_count;
|
||||
assert (first_count == send_count);
|
||||
|
||||
msleep (100);
|
||||
|
||||
// Now receive all sent messages
|
||||
int recv_count = 0;
|
||||
while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT))
|
||||
{
|
||||
++recv_count;
|
||||
}
|
||||
assert (first_count == recv_count);
|
||||
|
||||
msleep (100);
|
||||
|
||||
// Send messages
|
||||
send_count = 0;
|
||||
while (send_count < second_count && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
|
||||
++send_count;
|
||||
assert (second_count == send_count);
|
||||
|
||||
msleep (100);
|
||||
|
||||
// Now receive all sent messages
|
||||
recv_count = 0;
|
||||
while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT))
|
||||
{
|
||||
++recv_count;
|
||||
}
|
||||
assert (second_count == recv_count);
|
||||
|
||||
// Clean up
|
||||
rc = zmq_close (sub_socket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (pub_socket);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
int main (void)
|
||||
{
|
||||
@ -167,5 +241,8 @@ int main (void)
|
||||
count = test_blocking (2000,6000);
|
||||
assert (count == 6000);
|
||||
|
||||
// hwm should apply to the messages that have already been received
|
||||
test_reset_hwm ();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user