diff --git a/src/router.cpp b/src/router.cpp index 78e15cb6..e62d0afb 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -159,6 +159,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_) zmq_assert (iter != outpipes.end ()); outpipes.erase (iter); fq.pipe_terminated (pipe_); + pipe_->rollback (); if (pipe_ == current_out) current_out = NULL; } @@ -273,6 +274,9 @@ int zmq::router_t::xsend (msg_t *msg_) // Message failed to send - we must close it ourselves. int rc = msg_->close (); errno_assert (rc == 0); + // HWM was checked before, so the pipe must be gone. Roll back + // messages that were piped, for example REP labels. + current_out->rollback (); current_out = NULL; } else { if (!more_out) { diff --git a/tests/test_reqrep_ipc.cpp b/tests/test_reqrep_ipc.cpp index 394a4fda..7e483ee9 100644 --- a/tests/test_reqrep_ipc.cpp +++ b/tests/test_reqrep_ipc.cpp @@ -29,9 +29,48 @@ #include "testutil.hpp" -int main (void) +void test_leak (void) +{ + char my_endpoint[256]; + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *sb = zmq_socket (ctx, ZMQ_REP); + assert (sb); + int rc = zmq_bind (sb, "ipc://*"); + assert (rc == 0); + size_t len = sizeof(my_endpoint); + rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len); + assert (rc == 0); + + void *sc = zmq_socket (ctx, ZMQ_REQ); + assert (sc); + rc = zmq_connect (sc, my_endpoint); + assert (rc == 0); + + rc = s_send (sc, "leakymsg"); + assert (rc == strlen ("leakymsg")); + + char *buf = s_recv (sb); + free (buf); + + rc = zmq_close (sc); + assert (rc == 0); + + msleep (SETTLE_TIME); + + rc = s_send (sb, "leakymsg"); + assert (rc == strlen ("leakymsg")); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + +void test_simple (void) { - setup_test_environment(); char my_endpoint[256]; void *ctx = zmq_ctx_new (); assert (ctx); @@ -59,6 +98,15 @@ int main (void) rc = zmq_ctx_term (ctx); assert (rc == 0); +} + +int main (void) +{ + setup_test_environment(); + + test_simple (); + + test_leak (); return 0 ; }