From ea26e68b9d4c3625e80b959e9db02fcdff14a7b1 Mon Sep 17 00:00:00 2001 From: Jens Auer Date: Thu, 8 Oct 2015 22:06:33 +0200 Subject: [PATCH 1/3] New options to set send/recv buffer size for TCP sockets. --- doc/zmq_getsockopt.txt | 27 +++++++++++++++++++++++++++ doc/zmq_setsockopt.txt | 27 +++++++++++++++++++++++++++ include/zmq.h | 2 ++ src/options.cpp | 29 +++++++++++++++++++++++++++++ src/options.hpp | 4 ++++ src/stream_engine.cpp | 24 ++++++++++++------------ 6 files changed, 101 insertions(+), 12 deletions(-) diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index 1ca6646e..fb7583f6 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -736,6 +736,33 @@ Option value unit:: N/A Default value:: not set Applicable socket types:: all, when using TCP transport +ZMQ_TCP_RECV_BUFFER: Size of the TCP receive buffer +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_RECV_BUFFER' specifies the maximum number of bytes which can +be received by an individual syscall to receive data from the TCP +socket. The buffer size is specified as an integer number from 0 (very small) +to 10 (very large). The default value is 3. + + +[horizontal] +Option value type:: int +Option value unit:: N/A +Default value:: 3 +Applicable socket types:: all, when using TCP transport + +ZMQ_TCP_SEND_BUFFER: Size of the TCP receive buffer +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_SEND_BUFFER' specifies the maximum number of bytes which can +be sent by an individual syscall to transmit data to the TCP +socket. The buffer size is specified as an integer number from 0 (very small) +to 10 (very large). The default value is 3. + + +[horizontal] +Option value type:: int +Option value unit:: N/A +Default value:: 3 +Applicable socket types:: all, when using TCP transport RETURN VALUE ------------ diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index de79c93b..9921b12d 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -1071,6 +1071,33 @@ Option value unit:: boolean Default value:: 1 (true) Applicable socket types:: all, when using TCP transports. +ZMQ_TCP_RECV_BUFFER: Size of the TCP receive buffer +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_RECV_BUFFER' specifies the maximum number of bytes which can +be received by an individual syscall to receive data from the TCP +socket. The buffer size is specified as an integer number from 0 (very small) +to 10 (very large). The default value is 3. + + +[horizontal] +Option value type:: int +Option value unit:: N/A +Default value:: 3 +Applicable socket types:: all, when using TCP transport + +ZMQ_TCP_SEND_BUFFER: Size of the TCP receive buffer +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_SEND_BUFFER' specifies the maximum number of bytes which can +be sent by an individual syscall to transmit data to the TCP +socket. The buffer size is specified as an integer number from 0 (very small) +to 10 (very large). The default value is 3. + + +[horizontal] +Option value type:: int +Option value unit:: N/A +Default value:: 3 +Applicable socket types:: all, when using TCP transport RETURN VALUE ------------ diff --git a/include/zmq.h b/include/zmq.h index 62479163..bc587bf9 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -323,6 +323,8 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); #define ZMQ_CONNECT_TIMEOUT 79 #define ZMQ_TCP_RETRANSMIT_TIMEOUT 80 #define ZMQ_THREAD_SAFE 81 +#define ZMQ_TCP_RECV_BUFFER 82 +#define ZMQ_TCP_SEND_BUFFER 83 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/options.cpp b/src/options.cpp index 67dfe025..5c6e8406 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -28,6 +28,7 @@ */ #include +#include #include "options.hpp" #include "err.hpp" @@ -65,6 +66,8 @@ zmq::options_t::options_t () : tcp_keepalive_cnt (-1), tcp_keepalive_idle (-1), tcp_keepalive_intvl (-1), + tcp_recv_buffer_size (3), + tcp_send_buffer_size (3), mechanism (ZMQ_NULL), as_server (0), gss_plaintext (false), @@ -280,6 +283,18 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, } break; + case ZMQ_TCP_RECV_BUFFER: + if (is_int && (value >= 0 && value <= 10) ) { + tcp_send_buffer_size = static_cast(std::pow(2, value)) * 1024; + } + break; + + case ZMQ_TCP_SEND_BUFFER: + if (is_int && (value >= 0 && value <= 10) ) { + tcp_send_buffer_size = static_cast(std::pow(2, value)) * 1024; + } + break; + case ZMQ_IMMEDIATE: if (is_int && (value == 0 || value == 1)) { immediate = value; @@ -790,6 +805,20 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) } break; + case ZMQ_TCP_SEND_BUFFER: + if (is_int) { + *value = tcp_send_buffer_size; + return 0; + } + break; + + case ZMQ_TCP_RECV_BUFFER: + if (is_int) { + *value = tcp_recv_buffer_size; + return 0; + } + break; + case ZMQ_MECHANISM: if (is_int) { *value = mechanism; diff --git a/src/options.hpp b/src/options.hpp index 288a1b36..be04ec64 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -156,6 +156,10 @@ namespace zmq typedef std::vector tcp_accept_filters_t; tcp_accept_filters_t tcp_accept_filters; + // TCO buffer sizes + int tcp_recv_buffer_size; + int tcp_send_buffer_size; + // IPC accept() filters # if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED bool zap_ipc_creds; diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 44814802..03c1f9a7 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -203,10 +203,10 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, if (options.raw_socket) { // no handshaking for raw sock, instantiate raw encoder and decoders - encoder = new (std::nothrow) raw_encoder_t (out_batch_size); + encoder = new (std::nothrow) raw_encoder_t (options.tcp_send_buffer_size); alloc_assert (encoder); - decoder = new (std::nothrow) raw_decoder_t (in_batch_size); + decoder = new (std::nothrow) raw_decoder_t (options.tcp_recv_buffer_size); alloc_assert (decoder); // disable handshaking for raw socket @@ -385,12 +385,12 @@ void zmq::stream_engine_t::out_event () outpos = NULL; outsize = encoder->encode (&outpos, 0); - while (outsize < out_batch_size) { + while (outsize < options.tcp_send_buffer_size) { if ((this->*next_msg) (&tx_msg) == -1) break; encoder->load_msg (&tx_msg); unsigned char *bufptr = outpos + outsize; - size_t n = encoder->encode (&bufptr, out_batch_size - outsize); + size_t n = encoder->encode (&bufptr, options.tcp_send_buffer_size - outsize); zmq_assert (n > 0); if (outpos == NULL) outpos = bufptr; @@ -587,10 +587,10 @@ bool zmq::stream_engine_t::handshake () return false; } - encoder = new (std::nothrow) v1_encoder_t (out_batch_size); + encoder = new (std::nothrow) v1_encoder_t (options.tcp_send_buffer_size); alloc_assert (encoder); - decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize); + decoder = new (std::nothrow) v1_decoder_t (options.tcp_recv_buffer_size, options.maxmsgsize); alloc_assert (decoder); // We have already sent the message header. @@ -635,11 +635,11 @@ bool zmq::stream_engine_t::handshake () } encoder = new (std::nothrow) v1_encoder_t ( - out_batch_size); + options.tcp_send_buffer_size); alloc_assert (encoder); decoder = new (std::nothrow) v1_decoder_t ( - in_batch_size, options.maxmsgsize); + options.tcp_recv_buffer_size, options.maxmsgsize); alloc_assert (decoder); } else @@ -650,19 +650,19 @@ bool zmq::stream_engine_t::handshake () return false; } - encoder = new (std::nothrow) v2_encoder_t (out_batch_size); + encoder = new (std::nothrow) v2_encoder_t (options.tcp_send_buffer_size); alloc_assert (encoder); decoder = new (std::nothrow) v2_decoder_t ( - in_batch_size, options.maxmsgsize); + options.tcp_recv_buffer_size, options.maxmsgsize); alloc_assert (decoder); } else { - encoder = new (std::nothrow) v2_encoder_t (out_batch_size); + encoder = new (std::nothrow) v2_encoder_t (options.tcp_send_buffer_size); alloc_assert (encoder); decoder = new (std::nothrow) v2_decoder_t ( - in_batch_size, options.maxmsgsize); + options.tcp_recv_buffer_size, options.maxmsgsize); alloc_assert (decoder); if (options.mechanism == ZMQ_NULL From cdeec4c115b3d2f62417ae57c2cd5f5dcdf348f4 Mon Sep 17 00:00:00 2001 From: Jens Auer Date: Thu, 8 Oct 2015 22:06:33 +0200 Subject: [PATCH 2/3] New options to set send/recv buffer size for TCP sockets. --- doc/zmq_getsockopt.txt | 27 +++++++++++++++++++++++++++ doc/zmq_setsockopt.txt | 27 +++++++++++++++++++++++++++ include/zmq.h | 2 ++ src/options.cpp | 29 +++++++++++++++++++++++++++++ src/options.hpp | 4 ++++ src/stream_engine.cpp | 24 ++++++++++++------------ 6 files changed, 101 insertions(+), 12 deletions(-) diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index 1ca6646e..fb7583f6 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -736,6 +736,33 @@ Option value unit:: N/A Default value:: not set Applicable socket types:: all, when using TCP transport +ZMQ_TCP_RECV_BUFFER: Size of the TCP receive buffer +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_RECV_BUFFER' specifies the maximum number of bytes which can +be received by an individual syscall to receive data from the TCP +socket. The buffer size is specified as an integer number from 0 (very small) +to 10 (very large). The default value is 3. + + +[horizontal] +Option value type:: int +Option value unit:: N/A +Default value:: 3 +Applicable socket types:: all, when using TCP transport + +ZMQ_TCP_SEND_BUFFER: Size of the TCP receive buffer +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_SEND_BUFFER' specifies the maximum number of bytes which can +be sent by an individual syscall to transmit data to the TCP +socket. The buffer size is specified as an integer number from 0 (very small) +to 10 (very large). The default value is 3. + + +[horizontal] +Option value type:: int +Option value unit:: N/A +Default value:: 3 +Applicable socket types:: all, when using TCP transport RETURN VALUE ------------ diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index de79c93b..9921b12d 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -1071,6 +1071,33 @@ Option value unit:: boolean Default value:: 1 (true) Applicable socket types:: all, when using TCP transports. +ZMQ_TCP_RECV_BUFFER: Size of the TCP receive buffer +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_RECV_BUFFER' specifies the maximum number of bytes which can +be received by an individual syscall to receive data from the TCP +socket. The buffer size is specified as an integer number from 0 (very small) +to 10 (very large). The default value is 3. + + +[horizontal] +Option value type:: int +Option value unit:: N/A +Default value:: 3 +Applicable socket types:: all, when using TCP transport + +ZMQ_TCP_SEND_BUFFER: Size of the TCP receive buffer +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_SEND_BUFFER' specifies the maximum number of bytes which can +be sent by an individual syscall to transmit data to the TCP +socket. The buffer size is specified as an integer number from 0 (very small) +to 10 (very large). The default value is 3. + + +[horizontal] +Option value type:: int +Option value unit:: N/A +Default value:: 3 +Applicable socket types:: all, when using TCP transport RETURN VALUE ------------ diff --git a/include/zmq.h b/include/zmq.h index 11a37d71..89b87c44 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -323,6 +323,8 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); #define ZMQ_CONNECT_TIMEOUT 79 #define ZMQ_TCP_RETRANSMIT_TIMEOUT 80 #define ZMQ_THREAD_SAFE 81 +#define ZMQ_TCP_RECV_BUFFER 82 +#define ZMQ_TCP_SEND_BUFFER 83 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/options.cpp b/src/options.cpp index 67dfe025..5c6e8406 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -28,6 +28,7 @@ */ #include +#include #include "options.hpp" #include "err.hpp" @@ -65,6 +66,8 @@ zmq::options_t::options_t () : tcp_keepalive_cnt (-1), tcp_keepalive_idle (-1), tcp_keepalive_intvl (-1), + tcp_recv_buffer_size (3), + tcp_send_buffer_size (3), mechanism (ZMQ_NULL), as_server (0), gss_plaintext (false), @@ -280,6 +283,18 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, } break; + case ZMQ_TCP_RECV_BUFFER: + if (is_int && (value >= 0 && value <= 10) ) { + tcp_send_buffer_size = static_cast(std::pow(2, value)) * 1024; + } + break; + + case ZMQ_TCP_SEND_BUFFER: + if (is_int && (value >= 0 && value <= 10) ) { + tcp_send_buffer_size = static_cast(std::pow(2, value)) * 1024; + } + break; + case ZMQ_IMMEDIATE: if (is_int && (value == 0 || value == 1)) { immediate = value; @@ -790,6 +805,20 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) } break; + case ZMQ_TCP_SEND_BUFFER: + if (is_int) { + *value = tcp_send_buffer_size; + return 0; + } + break; + + case ZMQ_TCP_RECV_BUFFER: + if (is_int) { + *value = tcp_recv_buffer_size; + return 0; + } + break; + case ZMQ_MECHANISM: if (is_int) { *value = mechanism; diff --git a/src/options.hpp b/src/options.hpp index 288a1b36..be04ec64 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -156,6 +156,10 @@ namespace zmq typedef std::vector tcp_accept_filters_t; tcp_accept_filters_t tcp_accept_filters; + // TCO buffer sizes + int tcp_recv_buffer_size; + int tcp_send_buffer_size; + // IPC accept() filters # if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED bool zap_ipc_creds; diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 44814802..03c1f9a7 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -203,10 +203,10 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, if (options.raw_socket) { // no handshaking for raw sock, instantiate raw encoder and decoders - encoder = new (std::nothrow) raw_encoder_t (out_batch_size); + encoder = new (std::nothrow) raw_encoder_t (options.tcp_send_buffer_size); alloc_assert (encoder); - decoder = new (std::nothrow) raw_decoder_t (in_batch_size); + decoder = new (std::nothrow) raw_decoder_t (options.tcp_recv_buffer_size); alloc_assert (decoder); // disable handshaking for raw socket @@ -385,12 +385,12 @@ void zmq::stream_engine_t::out_event () outpos = NULL; outsize = encoder->encode (&outpos, 0); - while (outsize < out_batch_size) { + while (outsize < options.tcp_send_buffer_size) { if ((this->*next_msg) (&tx_msg) == -1) break; encoder->load_msg (&tx_msg); unsigned char *bufptr = outpos + outsize; - size_t n = encoder->encode (&bufptr, out_batch_size - outsize); + size_t n = encoder->encode (&bufptr, options.tcp_send_buffer_size - outsize); zmq_assert (n > 0); if (outpos == NULL) outpos = bufptr; @@ -587,10 +587,10 @@ bool zmq::stream_engine_t::handshake () return false; } - encoder = new (std::nothrow) v1_encoder_t (out_batch_size); + encoder = new (std::nothrow) v1_encoder_t (options.tcp_send_buffer_size); alloc_assert (encoder); - decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize); + decoder = new (std::nothrow) v1_decoder_t (options.tcp_recv_buffer_size, options.maxmsgsize); alloc_assert (decoder); // We have already sent the message header. @@ -635,11 +635,11 @@ bool zmq::stream_engine_t::handshake () } encoder = new (std::nothrow) v1_encoder_t ( - out_batch_size); + options.tcp_send_buffer_size); alloc_assert (encoder); decoder = new (std::nothrow) v1_decoder_t ( - in_batch_size, options.maxmsgsize); + options.tcp_recv_buffer_size, options.maxmsgsize); alloc_assert (decoder); } else @@ -650,19 +650,19 @@ bool zmq::stream_engine_t::handshake () return false; } - encoder = new (std::nothrow) v2_encoder_t (out_batch_size); + encoder = new (std::nothrow) v2_encoder_t (options.tcp_send_buffer_size); alloc_assert (encoder); decoder = new (std::nothrow) v2_decoder_t ( - in_batch_size, options.maxmsgsize); + options.tcp_recv_buffer_size, options.maxmsgsize); alloc_assert (decoder); } else { - encoder = new (std::nothrow) v2_encoder_t (out_batch_size); + encoder = new (std::nothrow) v2_encoder_t (options.tcp_send_buffer_size); alloc_assert (encoder); decoder = new (std::nothrow) v2_decoder_t ( - in_batch_size, options.maxmsgsize); + options.tcp_recv_buffer_size, options.maxmsgsize); alloc_assert (decoder); if (options.mechanism == ZMQ_NULL From 908d6b67401509de724c509f1c4b09187596e177 Mon Sep 17 00:00:00 2001 From: Jens Auer Date: Sun, 8 Nov 2015 21:48:36 +0100 Subject: [PATCH 3/3] Update options.cpp Fixed ZMQ_TCP_RECV_BUFFER set case. --- src/options.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/options.cpp b/src/options.cpp index 5c6e8406..e4bc011a 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -285,7 +285,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, case ZMQ_TCP_RECV_BUFFER: if (is_int && (value >= 0 && value <= 10) ) { - tcp_send_buffer_size = static_cast(std::pow(2, value)) * 1024; + tcp_recv_buffer_size = static_cast(std::pow(2, value)) * 1024; } break;