mirror of
				https://github.com/zeromq/libzmq.git
				synced 2025-10-25 10:09:38 +02:00 
			
		
		
		
	Problem: REP leaves label msgs for dead REQ in pipe
Solution: roll back the pipe if writing messages other than the first fails in router::xsend. Roll it back also when the pipe is terminating. Also add test case that reproduces the memory leak when ran with valgrind. Fixes #2567
This commit is contained in:
		| @@ -159,6 +159,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_) | |||||||
|         zmq_assert (iter != outpipes.end ()); |         zmq_assert (iter != outpipes.end ()); | ||||||
|         outpipes.erase (iter); |         outpipes.erase (iter); | ||||||
|         fq.pipe_terminated (pipe_); |         fq.pipe_terminated (pipe_); | ||||||
|  |         pipe_->rollback (); | ||||||
|         if (pipe_ == current_out) |         if (pipe_ == current_out) | ||||||
|             current_out = NULL; |             current_out = NULL; | ||||||
|     } |     } | ||||||
| @@ -273,6 +274,9 @@ int zmq::router_t::xsend (msg_t *msg_) | |||||||
|             // Message failed to send - we must close it ourselves. |             // Message failed to send - we must close it ourselves. | ||||||
|             int rc = msg_->close (); |             int rc = msg_->close (); | ||||||
|             errno_assert (rc == 0); |             errno_assert (rc == 0); | ||||||
|  |             // HWM was checked before, so the pipe must be gone. Roll back | ||||||
|  |             // messages that were piped, for example REP labels. | ||||||
|  |             current_out->rollback (); | ||||||
|             current_out = NULL; |             current_out = NULL; | ||||||
|         } else { |         } else { | ||||||
|           if (!more_out) { |           if (!more_out) { | ||||||
|   | |||||||
| @@ -29,9 +29,48 @@ | |||||||
|  |  | ||||||
| #include "testutil.hpp" | #include "testutil.hpp" | ||||||
|  |  | ||||||
| int main (void) | void test_leak (void) | ||||||
|  | { | ||||||
|  |     char my_endpoint[256]; | ||||||
|  |     void *ctx = zmq_ctx_new (); | ||||||
|  |     assert (ctx); | ||||||
|  |  | ||||||
|  |     void *sb = zmq_socket (ctx, ZMQ_REP); | ||||||
|  |     assert (sb); | ||||||
|  |     int rc = zmq_bind (sb, "ipc://*"); | ||||||
|  |     assert (rc == 0); | ||||||
|  |     size_t len = sizeof(my_endpoint); | ||||||
|  |     rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len); | ||||||
|  |     assert (rc == 0); | ||||||
|  |  | ||||||
|  |     void *sc = zmq_socket (ctx, ZMQ_REQ); | ||||||
|  |     assert (sc); | ||||||
|  |     rc = zmq_connect (sc, my_endpoint); | ||||||
|  |     assert (rc == 0); | ||||||
|  |  | ||||||
|  |     rc = s_send (sc, "leakymsg"); | ||||||
|  |     assert (rc == strlen ("leakymsg")); | ||||||
|  |  | ||||||
|  |     char *buf = s_recv (sb); | ||||||
|  |     free (buf); | ||||||
|  |  | ||||||
|  |     rc = zmq_close (sc); | ||||||
|  |     assert (rc == 0); | ||||||
|  |  | ||||||
|  |     msleep (SETTLE_TIME); | ||||||
|  |  | ||||||
|  |     rc = s_send (sb, "leakymsg"); | ||||||
|  |     assert (rc == strlen ("leakymsg")); | ||||||
|  |  | ||||||
|  |     rc = zmq_close (sb); | ||||||
|  |     assert (rc == 0); | ||||||
|  |  | ||||||
|  |     rc = zmq_ctx_term (ctx); | ||||||
|  |     assert (rc == 0); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | void test_simple (void) | ||||||
| { | { | ||||||
|     setup_test_environment(); |  | ||||||
|     char my_endpoint[256]; |     char my_endpoint[256]; | ||||||
|     void *ctx = zmq_ctx_new (); |     void *ctx = zmq_ctx_new (); | ||||||
|     assert (ctx); |     assert (ctx); | ||||||
| @@ -59,6 +98,15 @@ int main (void) | |||||||
|  |  | ||||||
|     rc = zmq_ctx_term (ctx); |     rc = zmq_ctx_term (ctx); | ||||||
|     assert (rc == 0); |     assert (rc == 0); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | int main (void) | ||||||
|  | { | ||||||
|  |     setup_test_environment(); | ||||||
|  |  | ||||||
|  |     test_simple (); | ||||||
|  |  | ||||||
|  |     test_leak (); | ||||||
|  |  | ||||||
|     return 0 ; |     return 0 ; | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Luca Boccassi
					Luca Boccassi