diff --git a/src/pipe.cpp b/src/pipe.cpp index 74c12e7b..c09d4afb 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -460,14 +460,9 @@ int zmq::pipe_t::compute_lwm (int hwm_) // result in low performance. // // Given the 3. it would be good to keep HWM and LWM as far apart as - // possible to reduce the thread switching overhead to almost zero, - // say HWM-LWM should be max_wm_delta. - // - // That done, we still we have to account for the cases where - // HWM < max_wm_delta thus driving LWM to negative numbers. - // Let's make LWM 1/2 of HWM in such cases. - int result = (hwm_ > max_wm_delta * 2) ? - hwm_ - max_wm_delta : (hwm_ + 1) / 2; + // possible to reduce the thread switching overhead to almost zero. + // Let's make LWM 1/2 of HWM. + int result = (hwm_ + 1) / 2; return result; } diff --git a/tests/test_hwm_pubsub.cpp b/tests/test_hwm_pubsub.cpp index a37bc8e6..993ab4b4 100644 --- a/tests/test_hwm_pubsub.cpp +++ b/tests/test_hwm_pubsub.cpp @@ -151,7 +151,81 @@ int test_blocking (int send_hwm, int msgCnt) return recv_count; } +// with hwm 11024: send 9999 msg, receive 9999, send 1100, receive 1100 +void test_reset_hwm () +{ + int first_count = 9999; + int second_count = 1100; + int hwm = 11024; + void *ctx = zmq_ctx_new (); + assert (ctx); + int rc; + + // Set up bind socket + void *pub_socket = zmq_socket (ctx, ZMQ_PUB); + assert (pub_socket); + rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &hwm, sizeof (hwm)); + assert (rc == 0); + rc = zmq_bind (pub_socket, "tcp://127.0.0.1:1234"); + assert (rc == 0); + + // Set up connect socket + void *sub_socket = zmq_socket (ctx, ZMQ_SUB); + assert (sub_socket); + rc = zmq_setsockopt (sub_socket, ZMQ_RCVHWM, &hwm, sizeof (hwm)); + assert (rc == 0); + rc = zmq_connect (sub_socket, "tcp://127.0.0.1:1234"); + assert (rc == 0); + rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0); + assert (rc == 0); + + msleep (100); + + // Send messages + int send_count = 0; + while (send_count < first_count && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0) + ++send_count; + assert (first_count == send_count); + + msleep (100); + + // Now receive all sent messages + int recv_count = 0; + while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) + { + ++recv_count; + } + assert (first_count == recv_count); + + msleep (100); + + // Send messages + send_count = 0; + while (send_count < second_count && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0) + ++send_count; + assert (second_count == send_count); + + msleep (100); + + // Now receive all sent messages + recv_count = 0; + while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) + { + ++recv_count; + } + assert (second_count == recv_count); + + // Clean up + rc = zmq_close (sub_socket); + assert (rc == 0); + + rc = zmq_close (pub_socket); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} int main (void) { @@ -167,5 +241,8 @@ int main (void) count = test_blocking (2000,6000); assert (count == 6000); + // hwm should apply to the messages that have already been received + test_reset_hwm (); + return 0; }