From 234018d749959bb62366d9ca5625250464d8b407 Mon Sep 17 00:00:00 2001 From: Fedor Sheremetyev Date: Tue, 24 Nov 2015 17:30:27 +0000 Subject: [PATCH 1/2] Add test demostrating that HWM applies to messages that have been already consumed. --- tests/test_hwm_pubsub.cpp | 77 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/tests/test_hwm_pubsub.cpp b/tests/test_hwm_pubsub.cpp index a37bc8e6..993ab4b4 100644 --- a/tests/test_hwm_pubsub.cpp +++ b/tests/test_hwm_pubsub.cpp @@ -151,7 +151,81 @@ int test_blocking (int send_hwm, int msgCnt) return recv_count; } +// with hwm 11024: send 9999 msg, receive 9999, send 1100, receive 1100 +void test_reset_hwm () +{ + int first_count = 9999; + int second_count = 1100; + int hwm = 11024; + void *ctx = zmq_ctx_new (); + assert (ctx); + int rc; + + // Set up bind socket + void *pub_socket = zmq_socket (ctx, ZMQ_PUB); + assert (pub_socket); + rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &hwm, sizeof (hwm)); + assert (rc == 0); + rc = zmq_bind (pub_socket, "tcp://127.0.0.1:1234"); + assert (rc == 0); + + // Set up connect socket + void *sub_socket = zmq_socket (ctx, ZMQ_SUB); + assert (sub_socket); + rc = zmq_setsockopt (sub_socket, ZMQ_RCVHWM, &hwm, sizeof (hwm)); + assert (rc == 0); + rc = zmq_connect (sub_socket, "tcp://127.0.0.1:1234"); + assert (rc == 0); + rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0); + assert (rc == 0); + + msleep (100); + + // Send messages + int send_count = 0; + while (send_count < first_count && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0) + ++send_count; + assert (first_count == send_count); + + msleep (100); + + // Now receive all sent messages + int recv_count = 0; + while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) + { + ++recv_count; + } + assert (first_count == recv_count); + + msleep (100); + + // Send messages + send_count = 0; + while (send_count < second_count && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0) + ++send_count; + assert (second_count == send_count); + + msleep (100); + + // Now receive all sent messages + recv_count = 0; + while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) + { + ++recv_count; + } + assert (second_count == recv_count); + + // Clean up + rc = zmq_close (sub_socket); + assert (rc == 0); + + rc = zmq_close (pub_socket); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} int main (void) { @@ -167,5 +241,8 @@ int main (void) count = test_blocking (2000,6000); assert (count == 6000); + // hwm should apply to the messages that have already been received + test_reset_hwm (); + return 0; } From bad93c536ae1de1c0dd7c7a12f272ce1af4278b7 Mon Sep 17 00:00:00 2001 From: Fedor Sheremetyev Date: Tue, 24 Nov 2015 17:33:38 +0000 Subject: [PATCH 2/2] Set LWM to half of HWL. This reduces chances of race between writer deactivation and activation. Reader sends activation command to writer when number or messages is multiple of LWM. In situation with high throughput (millions of messages per second) and correspondingly large HWM (e.g. 10M) the difference between HWM needs to be large enough - so that activation command is received before pipe becomes full. --- src/pipe.cpp | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/pipe.cpp b/src/pipe.cpp index 74c12e7b..c09d4afb 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -460,14 +460,9 @@ int zmq::pipe_t::compute_lwm (int hwm_) // result in low performance. // // Given the 3. it would be good to keep HWM and LWM as far apart as - // possible to reduce the thread switching overhead to almost zero, - // say HWM-LWM should be max_wm_delta. - // - // That done, we still we have to account for the cases where - // HWM < max_wm_delta thus driving LWM to negative numbers. - // Let's make LWM 1/2 of HWM in such cases. - int result = (hwm_ > max_wm_delta * 2) ? - hwm_ - max_wm_delta : (hwm_ + 1) / 2; + // possible to reduce the thread switching overhead to almost zero. + // Let's make LWM 1/2 of HWM. + int result = (hwm_ + 1) / 2; return result; }