From fca45921a85aafe7c08cc54c040f04cd93103031 Mon Sep 17 00:00:00 2001 From: somdoron Date: Tue, 3 May 2016 19:37:39 +0300 Subject: [PATCH] problem: zeromq performance got worsen by some changes --- src/config.hpp | 6 ++++++ src/options.cpp | 8 ++++---- src/stream_engine.cpp | 25 ++++++++++++------------- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/src/config.hpp b/src/config.hpp index ec518896..0a7433a8 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -59,6 +59,12 @@ namespace zmq // unnecessary network stack traversals. in_batch_size = 8192, + // Maximal batching size for engines with sending functionality. + // So, if there are 10 messages that fit into the batch size, all of + // them may be written by a single 'send' system call, thus avoiding + // unnecessary network stack traversals. + out_batch_size = 8192, + // Maximal delta between high and low watermark. max_wm_delta = 1024, diff --git a/src/options.cpp b/src/options.cpp index 69ac7dee..330d7ac8 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -43,8 +43,8 @@ zmq::options_t::options_t () : recovery_ivl (10000), multicast_hops (1), multicast_maxtpdu (1500), - sndbuf (8192), - rcvbuf (8192), + sndbuf (-1), + rcvbuf (-1), tos (0), type (-1), linger (-1), @@ -146,14 +146,14 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, break; case ZMQ_SNDBUF: - if (is_int && value >= 0) { + if (is_int && value >= -1) { sndbuf = value; return 0; } break; case ZMQ_RCVBUF: - if (is_int && value >= 0) { + if (is_int && value >= -1) { rcvbuf = value; return 0; } diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index f2de8f5f..e1d16149 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -192,10 +192,10 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, if (options.raw_socket) { // no handshaking for raw sock, instantiate raw encoder and decoders - encoder = new (std::nothrow) raw_encoder_t (options.sndbuf); + encoder = new (std::nothrow) raw_encoder_t (out_batch_size); alloc_assert (encoder); - decoder = new (std::nothrow) raw_decoder_t (options.rcvbuf); + decoder = new (std::nothrow) raw_decoder_t (in_batch_size); alloc_assert (decoder); // disable handshaking for raw socket @@ -374,12 +374,12 @@ void zmq::stream_engine_t::out_event () outpos = NULL; outsize = encoder->encode (&outpos, 0); - while (outsize < (size_t) options.sndbuf) { + while (outsize < (size_t) out_batch_size) { if ((this->*next_msg) (&tx_msg) == -1) break; encoder->load_msg (&tx_msg); unsigned char *bufptr = outpos + outsize; - size_t n = encoder->encode (&bufptr, options.sndbuf - outsize); + size_t n = encoder->encode (&bufptr, out_batch_size - outsize); zmq_assert (n > 0); if (outpos == NULL) outpos = bufptr; @@ -576,10 +576,10 @@ bool zmq::stream_engine_t::handshake () return false; } - encoder = new (std::nothrow) v1_encoder_t (options.sndbuf); + encoder = new (std::nothrow) v1_encoder_t (out_batch_size); alloc_assert (encoder); - decoder = new (std::nothrow) v1_decoder_t (options.rcvbuf, options.maxmsgsize); + decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize); alloc_assert (decoder); // We have already sent the message header. @@ -623,12 +623,11 @@ bool zmq::stream_engine_t::handshake () return false; } - encoder = new (std::nothrow) v1_encoder_t ( - options.sndbuf); + encoder = new (std::nothrow) v1_encoder_t (out_batch_size); alloc_assert (encoder); decoder = new (std::nothrow) v1_decoder_t ( - options.rcvbuf, options.maxmsgsize); + in_batch_size, options.maxmsgsize); alloc_assert (decoder); } else @@ -639,19 +638,19 @@ bool zmq::stream_engine_t::handshake () return false; } - encoder = new (std::nothrow) v2_encoder_t (options.sndbuf); + encoder = new (std::nothrow) v2_encoder_t (out_batch_size); alloc_assert (encoder); decoder = new (std::nothrow) v2_decoder_t ( - options.rcvbuf, options.maxmsgsize); + in_batch_size, options.maxmsgsize); alloc_assert (decoder); } else { - encoder = new (std::nothrow) v2_encoder_t (options.sndbuf); + encoder = new (std::nothrow) v2_encoder_t (out_batch_size); alloc_assert (encoder); decoder = new (std::nothrow) v2_decoder_t ( - options.rcvbuf, options.maxmsgsize); + in_batch_size, options.maxmsgsize); alloc_assert (decoder); if (options.mechanism == ZMQ_NULL