mirror of
				https://github.com/zeromq/libzmq.git
				synced 2025-10-23 00:08:02 +02:00 
			
		
		
		
	Problem: enormous memory increase due to zero copy decoding
The zero copy decoding strategy implemented for 4.2.0 can lead to a large increase of main memory usage in some cases (I have seen one program go up to 40G from 10G after upgrading from 4.1.4). This commit adds a new option to contexts, called ZMQ_ZERO_COPY_RECV, which allows one to switch to the old decoding strategy.
This commit is contained in:
		| @@ -39,6 +39,12 @@ The 'ZMQ_MAX_MSGSZ' argument returns the maximum size of a message | ||||
| allowed for this context. Default value is INT_MAX. | ||||
|  | ||||
|  | ||||
| ZMQ_ZERO_COPY_RCV: Get message decoding strategy | ||||
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||||
| The 'ZMQ_ZERO_COPY_RCV' argument return whether message decoder uses a zero copy | ||||
| strategy when receiving messages. Default value is 1. | ||||
|  | ||||
|  | ||||
| ZMQ_SOCKET_LIMIT: Get largest configurable number of sockets | ||||
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||||
| The 'ZMQ_SOCKET_LIMIT' argument returns the largest number of sockets that | ||||
|   | ||||
| @@ -127,6 +127,18 @@ Default value:: INT_MAX | ||||
| Maximum value:: INT_MAX | ||||
|  | ||||
|  | ||||
| ZMQ_ZERO_COPY_RCV: Specify message decoding strategy | ||||
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||||
| The 'ZMQ_ZERO_COPY_RCV' argument specifies whether the message decoder should | ||||
| use a zero copy strategy when receiving messages. The zero copy strategy can | ||||
| lead to increased memory usage in some cases. This option allows you to use the | ||||
| older copying strategy. You can query the value of this option with | ||||
| linkzmq:zmq_ctx_get[3] using the 'ZMQ_ZERO_COPY_RECV' option. | ||||
|  | ||||
| [horizontal] | ||||
| Default value:: 1 | ||||
|  | ||||
|  | ||||
| ZMQ_MAX_SOCKETS: Set maximum number of sockets | ||||
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||||
| The 'ZMQ_MAX_SOCKETS' argument sets the maximum number of sockets allowed | ||||
|   | ||||
| @@ -628,6 +628,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread); | ||||
| #define ZMQ_THREAD_AFFINITY_CPU_ADD 7 | ||||
| #define ZMQ_THREAD_AFFINITY_CPU_REMOVE 8 | ||||
| #define ZMQ_THREAD_NAME_PREFIX 9 | ||||
| #define ZMQ_ZERO_COPY_RECV 10 | ||||
|  | ||||
| /*  DRAFT Socket methods.                                                     */ | ||||
| ZMQ_EXPORT int zmq_join (void *s, const char *group); | ||||
|   | ||||
							
								
								
									
										10
									
								
								src/ctx.cpp
									
									
									
									
									
								
							
							
						
						
									
										10
									
								
								src/ctx.cpp
									
									
									
									
									
								
							| @@ -76,7 +76,8 @@ zmq::ctx_t::ctx_t () : | ||||
|     max_msgsz (INT_MAX), | ||||
|     io_thread_count (ZMQ_IO_THREADS_DFLT), | ||||
|     blocky (true), | ||||
|     ipv6 (false) | ||||
|     ipv6 (false), | ||||
|     zero_copy (true) | ||||
| { | ||||
| #ifdef HAVE_FORK | ||||
|     pid = getpid (); | ||||
| @@ -245,6 +246,9 @@ int zmq::ctx_t::set (int option_, int optval_) | ||||
|     } else if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) { | ||||
|         scoped_lock_t locker (opt_sync); | ||||
|         max_msgsz = optval_ < INT_MAX ? optval_ : INT_MAX; | ||||
|     } else if (option_ == ZMQ_ZERO_COPY_RECV && optval_ >= 0) { | ||||
|         scoped_lock_t locker (opt_sync); | ||||
|         zero_copy = (optval_ != 0); | ||||
|     } else { | ||||
|         rc = thread_ctx_t::set (option_, optval_); | ||||
|     } | ||||
| @@ -268,7 +272,9 @@ int zmq::ctx_t::get (int option_) | ||||
|         rc = max_msgsz; | ||||
|     else if (option_ == ZMQ_MSG_T_SIZE) | ||||
|         rc = sizeof (zmq_msg_t); | ||||
|     else { | ||||
|     else if (option_ == ZMQ_ZERO_COPY_RECV) { | ||||
|         rc = zero_copy; | ||||
|     } else { | ||||
|         errno = EINVAL; | ||||
|         rc = -1; | ||||
|     } | ||||
|   | ||||
| @@ -234,6 +234,9 @@ class ctx_t : public thread_ctx_t | ||||
|     //  Is IPv6 enabled on this context? | ||||
|     bool ipv6; | ||||
|  | ||||
|     // Should we use zero copy message decoding in this context? | ||||
|     bool zero_copy; | ||||
|  | ||||
|     ctx_t (const ctx_t &); | ||||
|     const ctx_t &operator= (const ctx_t &); | ||||
|  | ||||
|   | ||||
| @@ -405,8 +405,8 @@ void zmq::norm_engine_t::recv_data (NormObjectHandle object) | ||||
|           (NormRxStreamState *) NormObjectGetUserData (object); | ||||
|         if (NULL == rxState) { | ||||
|             // This is a new stream, so create rxState with zmq decoder, etc | ||||
|             rxState = | ||||
|               new (std::nothrow) NormRxStreamState (object, options.maxmsgsize); | ||||
|             rxState = new (std::nothrow) | ||||
|               NormRxStreamState (object, options.maxmsgsize, options.zero_copy); | ||||
|             errno_assert (rxState); | ||||
|  | ||||
|             if (!rxState->Init ()) { | ||||
| @@ -547,9 +547,10 @@ void zmq::norm_engine_t::recv_data (NormObjectHandle object) | ||||
| } // end zmq::norm_engine_t::recv_data() | ||||
|  | ||||
| zmq::norm_engine_t::NormRxStreamState::NormRxStreamState ( | ||||
|   NormObjectHandle normStream, int64_t maxMsgSize) : | ||||
|   NormObjectHandle normStream, int64_t maxMsgSize, bool zeroCopy) : | ||||
|     norm_stream (normStream), | ||||
|     max_msg_size (maxMsgSize), | ||||
|     zero_copy (zeroCopy), | ||||
|     in_sync (false), | ||||
|     rx_ready (false), | ||||
|     zmq_decoder (NULL), | ||||
| @@ -582,7 +583,8 @@ bool zmq::norm_engine_t::NormRxStreamState::Init () | ||||
|     if (NULL != zmq_decoder) | ||||
|         delete zmq_decoder; | ||||
|     // Note "in_batch_size" comes from config.h | ||||
|     zmq_decoder = new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size); | ||||
|     zmq_decoder = | ||||
|       new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size, zero_copy); | ||||
|     alloc_assert (zmq_decoder); | ||||
|     if (NULL != zmq_decoder) { | ||||
|         buffer_count = 0; | ||||
|   | ||||
| @@ -68,7 +68,9 @@ class norm_engine_t : public io_object_t, public i_engine | ||||
|     class NormRxStreamState | ||||
|     { | ||||
|       public: | ||||
|         NormRxStreamState (NormObjectHandle normStream, int64_t maxMsgSize); | ||||
|         NormRxStreamState (NormObjectHandle normStream, | ||||
|                            int64_t maxMsgSize, | ||||
|                            bool zeroCopy); | ||||
|         ~NormRxStreamState (); | ||||
|  | ||||
|         NormObjectHandle GetStreamHandle () const { return norm_stream; } | ||||
| @@ -132,6 +134,7 @@ class norm_engine_t : public io_object_t, public i_engine | ||||
|       private: | ||||
|         NormObjectHandle norm_stream; | ||||
|         int64_t max_msg_size; | ||||
|         bool zero_copy; | ||||
|         bool in_sync; | ||||
|         bool rx_ready; | ||||
|         v2_decoder_t *zmq_decoder; | ||||
|   | ||||
| @@ -91,7 +91,8 @@ zmq::options_t::options_t () : | ||||
|     heartbeat_timeout (-1), | ||||
|     use_fd (-1), | ||||
|     zap_enforce_domain (false), | ||||
|     loopback_fastpath (false) | ||||
|     loopback_fastpath (false), | ||||
|     zero_copy (true) | ||||
| { | ||||
|     memset (curve_public_key, 0, CURVE_KEYSIZE); | ||||
|     memset (curve_secret_key, 0, CURVE_KEYSIZE); | ||||
|   | ||||
| @@ -251,6 +251,9 @@ struct options_t | ||||
|  | ||||
|     // Use of loopback fastpath. | ||||
|     bool loopback_fastpath; | ||||
|  | ||||
|     // Use zero copy strategy for storing message content when decoding. | ||||
|     bool zero_copy; | ||||
| }; | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -206,6 +206,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, | ||||
|     options.socket_id = sid_; | ||||
|     options.ipv6 = (parent_->get (ZMQ_IPV6) != 0); | ||||
|     options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0); | ||||
|     options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV); | ||||
|  | ||||
|     if (thread_safe) { | ||||
|         mailbox = new (std::nothrow) mailbox_safe_t (&sync); | ||||
|   | ||||
| @@ -636,15 +636,15 @@ bool zmq::stream_engine_t::handshake () | ||||
|         encoder = new (std::nothrow) v2_encoder_t (out_batch_size); | ||||
|         alloc_assert (encoder); | ||||
|  | ||||
|         decoder = | ||||
|           new (std::nothrow) v2_decoder_t (in_batch_size, options.maxmsgsize); | ||||
|         decoder = new (std::nothrow) | ||||
|           v2_decoder_t (in_batch_size, options.maxmsgsize, options.zero_copy); | ||||
|         alloc_assert (decoder); | ||||
|     } else { | ||||
|         encoder = new (std::nothrow) v2_encoder_t (out_batch_size); | ||||
|         alloc_assert (encoder); | ||||
|  | ||||
|         decoder = | ||||
|           new (std::nothrow) v2_decoder_t (in_batch_size, options.maxmsgsize); | ||||
|         decoder = new (std::nothrow) | ||||
|           v2_decoder_t (in_batch_size, options.maxmsgsize, options.zero_copy); | ||||
|         alloc_assert (decoder); | ||||
|  | ||||
|         if (options.mechanism == ZMQ_NULL | ||||
|   | ||||
| @@ -38,10 +38,13 @@ | ||||
| #include "wire.hpp" | ||||
| #include "err.hpp" | ||||
|  | ||||
| zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_) : | ||||
| zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, | ||||
|                                  int64_t maxmsgsize_, | ||||
|                                  bool zero_copy_) : | ||||
|     shared_message_memory_allocator (bufsize_), | ||||
|     decoder_base_t<v2_decoder_t, shared_message_memory_allocator> (this), | ||||
|     msg_flags (0), | ||||
|     zero_copy (zero_copy_), | ||||
|     maxmsgsize (maxmsgsize_) | ||||
| { | ||||
|     int rc = in_progress.init (); | ||||
| @@ -111,8 +114,9 @@ int zmq::v2_decoder_t::size_ready (uint64_t msg_size, | ||||
|     // the current message can exceed the current buffer. We have to copy the buffer | ||||
|     // data into a new message and complete it in the next receive. | ||||
|  | ||||
|     if (unlikely ((unsigned char *) read_pos + msg_size | ||||
|                   > (data () + size ()))) { | ||||
|     if (unlikely ( | ||||
|           !zero_copy | ||||
|           || ((unsigned char *) read_pos + msg_size > (data () + size ())))) { | ||||
|         // a new message has started, but the size would exceed the pre-allocated arena | ||||
|         // this happens every time when a message does not fit completely into the buffer | ||||
|         rc = in_progress.init_size (static_cast<size_t> (msg_size)); | ||||
|   | ||||
| @@ -44,7 +44,7 @@ class v2_decoder_t : | ||||
|   public decoder_base_t<v2_decoder_t, shared_message_memory_allocator> | ||||
| { | ||||
|   public: | ||||
|     v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_); | ||||
|     v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_, bool zero_copy_); | ||||
|     virtual ~v2_decoder_t (); | ||||
|  | ||||
|     //  i_decoder interface. | ||||
| @@ -62,6 +62,7 @@ class v2_decoder_t : | ||||
|     unsigned char msg_flags; | ||||
|     msg_t in_progress; | ||||
|  | ||||
|     const bool zero_copy; | ||||
|     const int64_t maxmsgsize; | ||||
|  | ||||
|     v2_decoder_t (const v2_decoder_t &); | ||||
|   | ||||
| @@ -99,6 +99,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_); | ||||
| #define ZMQ_THREAD_AFFINITY_CPU_ADD 7 | ||||
| #define ZMQ_THREAD_AFFINITY_CPU_REMOVE 8 | ||||
| #define ZMQ_THREAD_NAME_PREFIX 9 | ||||
| #define ZMQ_ZERO_COPY_RECV 10 | ||||
|  | ||||
| /*  DRAFT Socket methods.                                                     */ | ||||
| int zmq_join (void *s, const char *group); | ||||
|   | ||||
| @@ -146,6 +146,56 @@ void test_ctx_thread_opts (void *ctx) | ||||
| #endif | ||||
| } | ||||
|  | ||||
| void test_ctx_zero_copy (void *ctx) | ||||
| { | ||||
| #ifdef ZMQ_ZERO_COPY_RECV | ||||
|     int zero_copy; | ||||
|     // Default value is 1. | ||||
|     zero_copy = zmq_ctx_get (ctx, ZMQ_ZERO_COPY_RECV); | ||||
|     assert (zero_copy == 1); | ||||
|  | ||||
|     // Test we can set it to 0. | ||||
|     assert (0 == zmq_ctx_set (ctx, ZMQ_ZERO_COPY_RECV, 0)); | ||||
|     zero_copy = zmq_ctx_get (ctx, ZMQ_ZERO_COPY_RECV); | ||||
|     assert (zero_copy == 0); | ||||
|  | ||||
|     // Create a TCP socket pair using the context and test that messages can be | ||||
|     // received. Note that inproc sockets cannot be used for this test. | ||||
|     void *pull = zmq_socket (ctx, ZMQ_PULL); | ||||
|     assert (0 == zmq_bind (pull, "tcp://127.0.0.1:*")); | ||||
|  | ||||
|     void *push = zmq_socket (ctx, ZMQ_PUSH); | ||||
|     size_t endpoint_len = MAX_SOCKET_STRING; | ||||
|     char endpoint[MAX_SOCKET_STRING]; | ||||
|     assert ( | ||||
|       0 == zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, endpoint, &endpoint_len)); | ||||
|     assert (0 == zmq_connect (push, endpoint)); | ||||
|  | ||||
|     const char *small_str = "abcd"; | ||||
|     const char *large_str = | ||||
|       "01234567890123456789012345678901234567890123456789"; | ||||
|  | ||||
|     assert (4 == zmq_send (push, (void *) small_str, 4, 0)); | ||||
|     assert (40 == zmq_send (push, (void *) large_str, 40, 0)); | ||||
|  | ||||
|     zmq_msg_t small_msg, large_msg; | ||||
|     zmq_msg_init (&small_msg); | ||||
|     zmq_msg_init (&large_msg); | ||||
|     assert (4 == zmq_msg_recv (&small_msg, pull, 0)); | ||||
|     assert (40 == zmq_msg_recv (&large_msg, pull, 0)); | ||||
|     assert (!strncmp (small_str, (const char *) zmq_msg_data (&small_msg), 4)); | ||||
|     assert (!strncmp (large_str, (const char *) zmq_msg_data (&large_msg), 40)); | ||||
|  | ||||
|     // Clean up. | ||||
|     assert (0 == zmq_close (push)); | ||||
|     assert (0 == zmq_close (pull)); | ||||
|     assert (0 == zmq_msg_close (&small_msg)); | ||||
|     assert (0 == zmq_msg_close (&large_msg)); | ||||
|     assert (0 == zmq_ctx_set (ctx, ZMQ_ZERO_COPY_RECV, 1)); | ||||
|     zero_copy = zmq_ctx_get (ctx, ZMQ_ZERO_COPY_RECV); | ||||
|     assert (zero_copy == 1); | ||||
| #endif | ||||
| } | ||||
|  | ||||
| int main (void) | ||||
| { | ||||
| @@ -173,6 +223,7 @@ int main (void) | ||||
|     assert (zmq_ctx_get (ctx, ZMQ_IPV6) == 1); | ||||
|  | ||||
|     test_ctx_thread_opts (ctx); | ||||
|     test_ctx_zero_copy (ctx); | ||||
|  | ||||
|     void *router = zmq_socket (ctx, ZMQ_ROUTER); | ||||
|     int value; | ||||
|   | ||||
| @@ -43,7 +43,7 @@ | ||||
| //  settled. Tested to work reliably at 1 msec on a fast PC. | ||||
| #define SETTLE_TIME 300 //  In msec | ||||
| //  Commonly used buffer size for ZMQ_LAST_ENDPOINT | ||||
| #define MAX_SOCKET_STRING sizeof ("tcp://127.0.0.1:65536") | ||||
| #define MAX_SOCKET_STRING sizeof ("tcp://[::ffff:127.127.127.127]:65536") | ||||
|  | ||||
| //  We need to test codepaths with non-random bind ports. List them here to | ||||
| //  keep them unique, to allow parallel test runs. | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Stefan Kaes
					Stefan Kaes