From 2e39f892c353851fe90261db0a0875abab50539f Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 10 Dec 2009 09:47:24 +0100 Subject: [PATCH] ZMQII-27: Allow setting SNDBUF and RCVBUF size from 0MQ API (POSIX) --- bindings/c/zmq.h | 2 ++ bindings/cl/zeromq.lisp | 2 ++ bindings/java/org/zmq/Socket.java | 2 ++ bindings/python/pyzmq.cpp | 6 ++++++ bindings/ruby/rbzmq.cpp | 2 ++ man/man3/zmq_setsockopt.3 | 24 ++++++++++++++++++++++++ src/options.cpp | 18 ++++++++++++++++++ src/options.hpp | 3 +++ src/tcp_socket.cpp | 17 +++++++++++++++-- src/tcp_socket.hpp | 3 ++- src/zmq_connecter_init.cpp | 2 +- src/zmq_engine.cpp | 8 +++++--- src/zmq_engine.hpp | 5 ++++- src/zmq_listener_init.cpp | 2 +- 14 files changed, 87 insertions(+), 9 deletions(-) diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h index 849ddefb..ae8d6b17 100644 --- a/bindings/c/zmq.h +++ b/bindings/c/zmq.h @@ -159,6 +159,8 @@ ZMQ_EXPORT int zmq_term (void *context); #define ZMQ_RATE 8 #define ZMQ_RECOVERY_IVL 9 #define ZMQ_MCAST_LOOP 10 +#define ZMQ_SNDBUF 11 +#define ZMQ_RCVBUF 12 #define ZMQ_NOBLOCK 1 #define ZMQ_NOFLUSH 2 diff --git a/bindings/cl/zeromq.lisp b/bindings/cl/zeromq.lisp index 94f7672f..03befd5f 100644 --- a/bindings/cl/zeromq.lisp +++ b/bindings/cl/zeromq.lisp @@ -158,6 +158,8 @@ (defconstant rate 8) (defconstant recovery-ivl 9) (defconstant mcast-loop 10) +(defconstant sndbuf 11) +(defconstant rcvbuf 12) (defcfun* ("zmq_setsockopt" %setsockopt) :int (s :pointer) diff --git a/bindings/java/org/zmq/Socket.java b/bindings/java/org/zmq/Socket.java index 396a6a03..935fade9 100644 --- a/bindings/java/org/zmq/Socket.java +++ b/bindings/java/org/zmq/Socket.java @@ -47,6 +47,8 @@ public class Socket public static final int RATE = 8; public static final int RECOVERY_IVL = 9; public static final int MCAST_LOOP = 10; + public static final int SNDBUF = 11; + public static final int RCVBUF = 12; /** * Class constructor. diff --git a/bindings/python/pyzmq.cpp b/bindings/python/pyzmq.cpp index 26ca7ac3..f171eab2 100644 --- a/bindings/python/pyzmq.cpp +++ b/bindings/python/pyzmq.cpp @@ -534,6 +534,12 @@ PyMODINIT_FUNC initlibpyzmq () t = PyInt_FromLong (ZMQ_MCAST_LOOP); PyDict_SetItemString (dict, "MCAST_LOOP", t); Py_DECREF (t); + t = PyInt_FromLong (ZMQ_SNDBUF); + PyDict_SetItemString (dict, "SNDBUF", t); + Py_DECREF (t); + t = PyInt_FromLong (ZMQ_RCVBUF); + PyDict_SetItemString (dict, "RCVBUF", t); + Py_DECREF (t); t = PyInt_FromLong (ZMQ_POLL); PyDict_SetItemString (dict, "POLL", t); Py_DECREF (t); diff --git a/bindings/ruby/rbzmq.cpp b/bindings/ruby/rbzmq.cpp index 2a26ce15..43baeef9 100644 --- a/bindings/ruby/rbzmq.cpp +++ b/bindings/ruby/rbzmq.cpp @@ -266,6 +266,8 @@ extern "C" void Init_librbzmq () rb_define_global_const ("RATE", INT2NUM (ZMQ_RATE)); rb_define_global_const ("RECOVERY_IVL", INT2NUM (ZMQ_RECOVERY_IVL)); rb_define_global_const ("MCAST_LOOP", INT2NUM (ZMQ_MCAST_LOOP)); + rb_define_global_const ("SNDBUF", INT2NUM (ZMQ_SNDBUF)); + rb_define_global_const ("RCVBUF", INT2NUM (ZMQ_RCVBUF)); rb_define_global_const ("NOBLOCK", INT2NUM (ZMQ_NOBLOCK)); rb_define_global_const ("NOFLUSH", INT2NUM (ZMQ_NOFLUSH)); diff --git a/man/man3/zmq_setsockopt.3 b/man/man3/zmq_setsockopt.3 index a79f8790..36b7f082 100644 --- a/man/man3/zmq_setsockopt.3 +++ b/man/man3/zmq_setsockopt.3 @@ -17,6 +17,7 @@ High watermark for the message pipes associated with the socket. The water mark cannot be exceeded. If the messages don't fit into the pipe emergency mechanisms of the particular socket type are used (block, drop etc.) If HWM is set to zero, there are no limits for the content of the pipe. + Type: int64_t Unit: bytes Default: 0 .IP "\fBZMQ_LWM\fP" @@ -24,6 +25,7 @@ Low watermark makes sense only if high watermark is defined (i.e. is non-zero). When the emergency state is reached when messages overflow the pipe, the emergency lasts till the size of the pipe decreases to low watermark. At that point normal state is resumed. + Type: int64_t Unit: bytes Default: 0 .IP "\fBZMQ_SWAP\fP" @@ -31,6 +33,7 @@ Swap allows the pipe to exceed high watermark. However, the data are written to the disk rather than held in the memory. Until high watermark is exceeded there is no disk activity involved though. The value of the option defines maximal size of the swap file. + Type: int64_t Unit: bytes Default: 0 .IP "\fBZMQ_AFFINITY\fP" @@ -41,6 +44,7 @@ fairly among the threads in the thread pool. For non-zero values, the lowest bit corresponds to the thread 1, second lowest bit to the thread 2 etc. Thus, value of 3 means that from now on newly created sockets will handle I/O activity exclusively using threads no. 1 and 2. + Type: int64_t Unit: N/A (bitmap) Default: 0 .IP "\fBZMQ_IDENTITY\fP" @@ -50,6 +54,7 @@ separated from other runs. However, with identity application reconnects to existing infrastructure left by the previous run. Thus it may receive messages that were sent in the meantime, it shares pipe limits with the previous run etc. + Type: string Unit: N/A Default: NULL .IP "\fBZMQ_SUBSCRIBE\fP" @@ -61,6 +66,7 @@ specific topic ("x.y.z") and/or messages with specific topic prefix the very beginning of the message. Multiple filters can be attached to a single 'sub' socket. In that case message passes if it matches at least one of the filters. + Type: string Unit: N/A Default: N/A .IP "\fBZMQ_UNSUBSCRIBE\fP" @@ -69,12 +75,14 @@ The filter specified must match the string passed to ZMQ_SUBSCRIBE options exactly. If there were several instances of the same filter created, this options removes only one of them, leaving the rest in place and functional. + Type: string Unit: N/A Default: N/A .IP "\fBZMQ_RATE\fP" This option applies only to sending side of multicast transports (pgm & udp). It specifies maximal outgoing data rate that an individual sender socket can send. + Type: uint64_t Unit: kilobits/second Default: 100 .IP "\fBZMQ_RECOVERY_IVL\fP" @@ -84,6 +92,7 @@ Keep in mind that large recovery intervals at high data rates result in very large recovery buffers, meaning that you can easily overload your box by setting say 1 minute recovery interval at 1Gb/s rate (requires 7GB in-memory buffer). + Type: uint64_t Unit: seconds Default: 10 .IP "\fBZMQ_MCAST_LOOP\fP" @@ -92,8 +101,23 @@ means that the mutlicast packets can be received on the box they were sent from. Setting the value to 0 disables the loopback functionality which can have negative impact on the performance. If possible, disable the loopback in production environments. + Type: uint64_t Unit: N/A (boolean value) Default: 1 +.IP "\fBZMQ_SNDBUF\fP" +Sets the underlying kernel transmit buffer size to the specified size. See +.IR SO_SNDBUF +POSIX socket option. Value of zero means leaving the OS default unchanged. + +Type: uint64_t Unit: bytes Default: 0 + +.IP "\fBZMQ_RCVBUF\fP" +Sets the underlying kernel receive buffer size to the specified size. See +.IR SO_RCVBUF +POSIX socket option. Value of zero means leaving the OS default unchanged. + +Type: uint64_t Unit: bytes Default: 0 + .SH RETURN VALUE In case of success the function returns zero. Otherwise it returns -1 and sets diff --git a/src/options.cpp b/src/options.cpp index 120ae7c6..3f903cb7 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -30,6 +30,8 @@ zmq::options_t::options_t () : rate (100), recovery_ivl (10), use_multicast_loop (true), + sndbuf (0), + rcvbuf (0), requires_in (false), requires_out (false) { @@ -106,6 +108,22 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, return -1; } return 0; + + case ZMQ_SNDBUF: + if (optvallen_ != sizeof (uint64_t)) { + errno = EINVAL; + return -1; + } + sndbuf = *((uint64_t*) optval_); + return 0; + + case ZMQ_RCVBUF: + if (optvallen_ != sizeof (uint64_t)) { + errno = EINVAL; + return -1; + } + rcvbuf = *((uint64_t*) optval_); + return 0; } errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index a52fdeb2..16bb857f 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -49,6 +49,9 @@ namespace zmq // Enable multicast loopback. Default disabled (false). bool use_multicast_loop; + uint64_t sndbuf; + uint64_t rcvbuf; + // These options are never set by the user directly. Instead they are // provided by the specific socket type. bool requires_in; diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp index d3de66f6..6df9ef6b 100644 --- a/src/tcp_socket.cpp +++ b/src/tcp_socket.cpp @@ -34,7 +34,7 @@ zmq::tcp_socket_t::~tcp_socket_t () close (); } -int zmq::tcp_socket_t::open (fd_t fd_) +int zmq::tcp_socket_t::open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_) { zmq_assert (s == retired_fd); s = fd_; @@ -129,10 +129,23 @@ zmq::tcp_socket_t::~tcp_socket_t () close (); } -int zmq::tcp_socket_t::open (fd_t fd_) +int zmq::tcp_socket_t::open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_) { assert (s == retired_fd); s = fd_; + + if (sndbuf_) { + int sz = (int) sndbuf_; + int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, &sz, sizeof (int)); + errno_assert (rc == 0); + } + + if (rcvbuf_) { + int sz = (int) rcvbuf_; + int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, &sz, sizeof (int)); + errno_assert (rc == 0); + } + return 0; } diff --git a/src/tcp_socket.hpp b/src/tcp_socket.hpp index 406e4c02..e71a6004 100644 --- a/src/tcp_socket.hpp +++ b/src/tcp_socket.hpp @@ -21,6 +21,7 @@ #define __ZMQ_TCP_SOCKET_HPP_INCLUDED__ #include "fd.hpp" +#include "stdint.hpp" namespace zmq { @@ -35,7 +36,7 @@ namespace zmq ~tcp_socket_t (); // Associates a socket with a native socket descriptor. - int open (fd_t fd_); + int open (fd_t fd_, uint64_t sndbuf_, uint64_t rcvbuf_); // Closes the underlying socket. int close (); diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp index 3f165cd4..ea6a8c01 100644 --- a/src/zmq_connecter_init.cpp +++ b/src/zmq_connecter_init.cpp @@ -31,7 +31,7 @@ zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_, session_name (session_name_) { // Create associated engine object. - engine = new zmq_engine_t (parent_, fd_); + engine = new zmq_engine_t (parent_, fd_, options); zmq_assert (engine); } diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index e8e689d2..e8c9889b 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -24,7 +24,8 @@ #include "config.hpp" #include "err.hpp" -zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) : +zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, + const options_t &options_) : io_object_t (parent_), inbuf (NULL), insize (0), @@ -32,7 +33,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) : outbuf (NULL), outsize (0), outpos (0), - inout (NULL) + inout (NULL), + options (options_) { // Allocate read & write buffer. inbuf_storage = (unsigned char*) malloc (in_batch_size); @@ -41,7 +43,7 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) : zmq_assert (outbuf_storage); // Initialise the underlying socket. - int rc = tcp_socket.open (fd_); + int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf); zmq_assert (rc == 0); } diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index ea77b7e4..c842da78 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -36,7 +36,8 @@ namespace zmq { public: - zmq_engine_t (class io_thread_t *parent_, fd_t fd_); + zmq_engine_t (class io_thread_t *parent_, fd_t fd_, + const options_t &options_); ~zmq_engine_t (); // i_engine interface implementation. @@ -71,6 +72,8 @@ namespace zmq zmq_encoder_t encoder; zmq_decoder_t decoder; + options_t options; + zmq_engine_t (const zmq_engine_t&); void operator = (const zmq_engine_t&); }; diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index 632bebe8..0c9f0ee3 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -29,7 +29,7 @@ zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_, has_peer_identity (false) { // Create associated engine object. - engine = new zmq_engine_t (parent_, fd_); + engine = new zmq_engine_t (parent_, fd_, options); zmq_assert (engine); }