New options to set send/recv buffer size for TCP sockets.

This commit is contained in:
Jens Auer
2015-10-08 22:06:33 +02:00
committed by Jens Auer
parent c41fe88df6
commit cdeec4c115
6 changed files with 101 additions and 12 deletions

View File

@@ -203,10 +203,10 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
if (options.raw_socket) {
// no handshaking for raw sock, instantiate raw encoder and decoders
encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
encoder = new (std::nothrow) raw_encoder_t (options.tcp_send_buffer_size);
alloc_assert (encoder);
decoder = new (std::nothrow) raw_decoder_t (in_batch_size);
decoder = new (std::nothrow) raw_decoder_t (options.tcp_recv_buffer_size);
alloc_assert (decoder);
// disable handshaking for raw socket
@@ -385,12 +385,12 @@ void zmq::stream_engine_t::out_event ()
outpos = NULL;
outsize = encoder->encode (&outpos, 0);
while (outsize < out_batch_size) {
while (outsize < options.tcp_send_buffer_size) {
if ((this->*next_msg) (&tx_msg) == -1)
break;
encoder->load_msg (&tx_msg);
unsigned char *bufptr = outpos + outsize;
size_t n = encoder->encode (&bufptr, out_batch_size - outsize);
size_t n = encoder->encode (&bufptr, options.tcp_send_buffer_size - outsize);
zmq_assert (n > 0);
if (outpos == NULL)
outpos = bufptr;
@@ -587,10 +587,10 @@ bool zmq::stream_engine_t::handshake ()
return false;
}
encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
encoder = new (std::nothrow) v1_encoder_t (options.tcp_send_buffer_size);
alloc_assert (encoder);
decoder = new (std::nothrow) v1_decoder_t (in_batch_size, options.maxmsgsize);
decoder = new (std::nothrow) v1_decoder_t (options.tcp_recv_buffer_size, options.maxmsgsize);
alloc_assert (decoder);
// We have already sent the message header.
@@ -635,11 +635,11 @@ bool zmq::stream_engine_t::handshake ()
}
encoder = new (std::nothrow) v1_encoder_t (
out_batch_size);
options.tcp_send_buffer_size);
alloc_assert (encoder);
decoder = new (std::nothrow) v1_decoder_t (
in_batch_size, options.maxmsgsize);
options.tcp_recv_buffer_size, options.maxmsgsize);
alloc_assert (decoder);
}
else
@@ -650,19 +650,19 @@ bool zmq::stream_engine_t::handshake ()
return false;
}
encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
encoder = new (std::nothrow) v2_encoder_t (options.tcp_send_buffer_size);
alloc_assert (encoder);
decoder = new (std::nothrow) v2_decoder_t (
in_batch_size, options.maxmsgsize);
options.tcp_recv_buffer_size, options.maxmsgsize);
alloc_assert (decoder);
}
else {
encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
encoder = new (std::nothrow) v2_encoder_t (options.tcp_send_buffer_size);
alloc_assert (encoder);
decoder = new (std::nothrow) v2_decoder_t (
in_batch_size, options.maxmsgsize);
options.tcp_recv_buffer_size, options.maxmsgsize);
alloc_assert (decoder);
if (options.mechanism == ZMQ_NULL