zeromq/libzmq commit (mirror of https://github.com/zeromq/libzmq.git)

LWM is computed rather than explicitly specified by user
@@ -40,21 +40,6 @@ Default value:: 0
 Applicable socket types:: all
 
 
-ZMQ_LWM: Set low water mark
-~~~~~~~~~~~~~~~~~~~~~~~~~~~
-The 'ZMQ_LWM' option shall set the low water mark for the _message queue_
-associated with the socket. This option only makes sense when used in
-conjunction with the 'ZMQ_HWM' option. A socket which has reached it's high
-water mark remains in the "emergency" state until the number of outstanding
-messages in it's associated message queue falls below the low water mark, at
-which point normal message processing is resumed.
-
-Option value type:: int64_t
-Option value unit:: messages
-Default value:: 0
-Applicable socket types:: all
-
-
 ZMQ_SWAP: Set disk offload size
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 The 'ZMQ_SWAP' option shall set the disk offload (swap) size for the _message
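
Illustrative note (not part of the commit): the removed section documented the old two-option scheme, where the application set both ZMQ_HWM and ZMQ_LWM and a socket that reached its high water mark stayed in the "emergency" state until the queue drained below the low water mark. After this change the application sets only ZMQ_HWM and the low water mark is computed internally (see compute_lwm in src/pipe.cpp below). A minimal sketch of the resulting usage, assuming an already-created 0MQ socket:

    #include <zmq.h>
    #include <stdint.h>
    #include <assert.h>

    void set_queue_limit (void *socket)
    {
        //  Only the high water mark is specified; the LWM is now derived by libzmq.
        uint64_t hwm = 1000;
        int rc = zmq_setsockopt (socket, ZMQ_HWM, &hwm, sizeof (hwm));
        assert (rc == 0);
    }
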
@@ -169,6 +169,7 @@ ZMQ_EXPORT int zmq_term (void *context);
 
 /* Socket options. */
 #define ZMQ_HWM 1
+/* TODO: LWM is obsolete and should be removed in next version. */
 #define ZMQ_LWM 2
 #define ZMQ_SWAP 3
 #define ZMQ_AFFINITY 4

@@ -60,6 +60,9 @@ namespace zmq
         // unnecessary network stack traversals.
         out_batch_size = 8192,
 
+        // Maximal delta between high and low watermark.
+        max_wm_delta = 1024,
+
         // Maximum number of events the I/O thread can process in one go.
         max_io_events = 256,
 

@@ -26,7 +26,6 @@
 
 zmq::options_t::options_t () :
     hwm (0),
-    lwm (0),
     swap (0),
     affinity (0),
     rate (100),

@@ -53,14 +52,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
         hwm = *((uint64_t*) optval_);
         return 0;
 
-    case ZMQ_LWM:
-        if (optvallen_ != sizeof (uint64_t)) {
-            errno = EINVAL;
-            return -1;
-        }
-        lwm = *((uint64_t*) optval_);
-        return 0;
-
     case ZMQ_SWAP:
         if (optvallen_ != sizeof (int64_t)) {
             errno = EINVAL;

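Illustrative note (not part of the commit): with the ZMQ_LWM case removed here, the constant itself survives in zmq.h (kept with a TODO above), so existing code still compiles, but the option now falls through to the handling for unknown options. Assuming that default path rejects the option with EINVAL, as the surrounding cases do for bad sizes, legacy code would observe something like:

    #include <zmq.h>
    #include <stdint.h>

    int try_set_lwm (void *socket)
    {
        uint64_t lwm = 100;
        //  Compiles because ZMQ_LWM is still defined, but the option is no
        //  longer handled; expect -1 with errno set to EINVAL after this commit.
        return zmq_setsockopt (socket, ZMQ_LWM, &lwm, sizeof (lwm));
    }
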
@@ -155,15 +146,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
         *optvallen_ = sizeof (uint64_t);
         return 0;
 
-    case ZMQ_LWM:
-        if (*optvallen_ < sizeof (uint64_t)) {
-            errno = EINVAL;
-            return -1;
-        }
-        *((uint64_t*) optval_) = lwm;
-        *optvallen_ = sizeof (uint64_t);
-        return 0;
-
     case ZMQ_SWAP:
         if (*optvallen_ < sizeof (int64_t)) {
             errno = EINVAL;

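Illustrative note (not part of the commit): this hunk also shows the size convention used by options_t::getsockopt: the caller passes its buffer size in *optvallen_, the value is written only if the buffer is large enough, and the actual option size is stored back. A standalone sketch of that convention, mirroring the remaining ZMQ_HWM/ZMQ_SWAP cases:

    #include <stdint.h>
    #include <stddef.h>
    #include <errno.h>

    int read_u64_option (uint64_t stored_value, void *optval, size_t *optvallen)
    {
        if (*optvallen < sizeof (uint64_t)) {
            errno = EINVAL;
            return -1;
        }
        *((uint64_t*) optval) = stored_value;
        *optvallen = sizeof (uint64_t);
        return 0;
    }
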
@@ -35,7 +35,6 @@ namespace zmq
         int getsockopt (int option_, void *optval_, size_t *optvallen_);
 
         uint64_t hwm;
-        uint64_t lwm;
         int64_t swap;
         uint64_t affinity;
         blob_t identity;

src/pipe.cpp
@@ -233,9 +233,9 @@ bool zmq::writer_t::pipe_full ()
 }
 
 zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
-    uint64_t hwm_, uint64_t lwm_) :
-    reader (reader_parent_, hwm_, lwm_),
-    writer (writer_parent_, hwm_, lwm_)
+    uint64_t hwm_) :
+    reader (reader_parent_, hwm_, compute_lwm (hwm_)),
+    writer (writer_parent_, hwm_, compute_lwm (hwm_))
 {
     reader.set_pipe (this);
     writer.set_pipe (this);

@@ -250,3 +250,32 @@ zmq::pipe_t::~pipe_t ()
     while (read (&msg))
         zmq_msg_close (&msg);
 }
+
+uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_)
+{
+    // Following point should be taken into consideration when computing
+    // low watermark:
+    //
+    // 1. LWM has to be less than HWM.
+    // 2. LWM cannot be set to very low value (such as zero) as after filling
+    // the queue it would start to refill only after all the messages are
+    // read from it and thus unnecessarily hold the progress back.
+    // 3. LWM cannot be set to very high value (such as HWM-1) as it would
+    // result in lock-step filling of the queue - if a single message is read
+    // from a full queue, writer thread is resumed to write exactly one
+    // message to the queue and go back to sleep immediately. This would
+    // result in low performance.
+    //
+    // Given the 3. it would be good to keep HWM and LWM as far apart as
+    // possible to reduce the thread switching overhead to almost zero,
+    // say HWM-LWM should be 500 (max_wm_delta).
+    //
+    // That done, we still we have to account for the cases where HWM<500 thus
+    // driving LWM to negative numbers. Let's make LWM 1/2 of HWM in such cases.
+
+    if (hwm_ > max_wm_delta * 2)
+        return hwm_ - max_wm_delta;
+    else
+        return hwm_ / 2;
+}
+

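Illustrative note (not part of the commit): the comment block above explains the constraints; a few concrete values may help. Note that the comment mentions a delta of 500 while the max_wm_delta constant added in config.hpp in this same commit is 1024; the constant is what the code actually uses. The sketch below re-implements the same formula outside the library, with max_wm_delta fixed at 1024:

    #include <stdint.h>
    #include <stdio.h>

    static const uint64_t max_wm_delta = 1024;

    //  Same arithmetic as zmq::pipe_t::compute_lwm above.
    static uint64_t compute_lwm (uint64_t hwm)
    {
        if (hwm > max_wm_delta * 2)
            return hwm - max_wm_delta;
        else
            return hwm / 2;
    }

    int main ()
    {
        //  Prints: 10000 -> 8976, 2048 -> 1024, 100 -> 50, 0 -> 0.
        uint64_t samples[] = {10000, 2048, 100, 0};
        for (int i = 0; i != 4; i++)
            printf ("hwm = %llu -> lwm = %llu\n",
                (unsigned long long) samples[i],
                (unsigned long long) compute_lwm (samples[i]));
        return 0;
    }
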
@@ -150,7 +150,7 @@ namespace zmq
     public:
 
         pipe_t (object_t *reader_parent_, object_t *writer_parent_,
-            uint64_t hwm_, uint64_t lwm_);
+            uint64_t hwm_);
         ~pipe_t ();
 
         reader_t reader;

@@ -158,6 +158,8 @@ namespace zmq
 
     private:
 
+        uint64_t compute_lwm (uint64_t hwm_);
+
         pipe_t (const pipe_t&);
         void operator = (const pipe_t&);
     };

@@ -265,8 +265,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
     writer_t *socket_writer = NULL;
 
     if (options.requires_in && !out_pipe) {
-        pipe_t *pipe = new (std::nothrow) pipe_t (owner, this,
-            options.hwm, options.lwm);
+        pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm);
         zmq_assert (pipe);
         out_pipe = &pipe->writer;
         out_pipe->set_endpoint (this);

@@ -274,8 +273,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
     }
 
     if (options.requires_out && !in_pipe) {
-        pipe_t *pipe = new (std::nothrow) pipe_t (this, owner,
-            options.hwm, options.lwm);
+        pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm);
         zmq_assert (pipe);
         in_pipe = &pipe->reader;
         in_pipe->set_endpoint (this);

@@ -194,15 +194,13 @@ int zmq::socket_base_t::connect (const char *addr_)
 
     // Create inbound pipe, if required.
     if (options.requires_in) {
-        in_pipe = new (std::nothrow) pipe_t (this, peer,
-            options.hwm, options.lwm);
+        in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm);
         zmq_assert (in_pipe);
     }
 
     // Create outbound pipe, if required.
     if (options.requires_out) {
-        out_pipe = new (std::nothrow) pipe_t (peer, this,
-            options.hwm, options.lwm);
+        out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm);
         zmq_assert (out_pipe);
     }
 

@@ -235,16 +233,14 @@ int zmq::socket_base_t::connect (const char *addr_)
 
     // Create inbound pipe, if required.
     if (options.requires_in) {
-        in_pipe = new (std::nothrow) pipe_t (this, session,
-            options.hwm, options.lwm);
+        in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm);
         zmq_assert (in_pipe);
 
     }
 
     // Create outbound pipe, if required.
     if (options.requires_out) {
-        out_pipe = new (std::nothrow) pipe_t (session, this,
-            options.hwm, options.lwm);
+        out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm);
         zmq_assert (out_pipe);
     }
 