mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-17 03:03:25 +02:00
better naming of flags and variables to real functionality: nodrop
This commit is contained in:
@@ -307,7 +307,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
|
|||||||
#define ZMQ_HANDSHAKE_IVL 66
|
#define ZMQ_HANDSHAKE_IVL 66
|
||||||
#define ZMQ_IDENTITY_FD 67
|
#define ZMQ_IDENTITY_FD 67
|
||||||
#define ZMQ_SOCKS_PROXY 68
|
#define ZMQ_SOCKS_PROXY 68
|
||||||
#define ZMQ_XPUB_WAIT 69
|
#define ZMQ_XPUB_NODROP 69
|
||||||
|
|
||||||
/* Message options */
|
/* Message options */
|
||||||
#define ZMQ_MORE 1
|
#define ZMQ_MORE 1
|
||||||
|
@@ -458,9 +458,9 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case ZMQ_XPUB_WAIT:
|
case ZMQ_XPUB_NODROP:
|
||||||
{
|
{
|
||||||
pubWait = true;
|
pub_nodrop = true;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@@ -811,9 +811,9 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case ZMQ_XPUB_WAIT:
|
case ZMQ_XPUB_NODROP:
|
||||||
if( is_int) {
|
if( is_int) {
|
||||||
*value = pubWait;
|
*value = pub_nodrop;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@@ -179,8 +179,8 @@ namespace zmq
|
|||||||
// close socket. Default is 30 secs. 0 means no handshake timeout.
|
// close socket. Default is 30 secs. 0 means no handshake timeout.
|
||||||
int handshake_ivl;
|
int handshake_ivl;
|
||||||
|
|
||||||
// flag if PUB socket should block if reaching HWM
|
// flag if PUB socket should not drop messages if reaching HWM
|
||||||
bool pubWait;
|
bool pub_nodrop;
|
||||||
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@@ -90,7 +90,7 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
|
|||||||
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
|
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
|
||||||
size_t optvallen_)
|
size_t optvallen_)
|
||||||
{
|
{
|
||||||
if (option_ != ZMQ_XPUB_VERBOSE && option_ != ZMQ_XPUB_WAIT) {
|
if (option_ != ZMQ_XPUB_VERBOSE && option_ != ZMQ_XPUB_NODROP) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@@ -100,8 +100,8 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
}
|
}
|
||||||
if (option_ == ZMQ_XPUB_VERBOSE) {
|
if (option_ == ZMQ_XPUB_VERBOSE) {
|
||||||
verbose = (*static_cast <const int*> (optval_) != 0);
|
verbose = (*static_cast <const int*> (optval_) != 0);
|
||||||
} else if (option_ == ZMQ_XPUB_WAIT) {
|
} else if (option_ == ZMQ_XPUB_NODROP) {
|
||||||
wait = (*static_cast <const int*> (optval_) != 0);
|
nodrop = (*static_cast <const int*> (optval_) != 0);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
return -1;
|
return -1;
|
||||||
@@ -135,7 +135,7 @@ int zmq::xpub_t::xsend (msg_t *msg_)
|
|||||||
subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
|
subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
|
||||||
mark_as_matching, this);
|
mark_as_matching, this);
|
||||||
|
|
||||||
if (wait && !dist.check_hwm()) {
|
if (nodrop && !dist.check_hwm()) {
|
||||||
return EAGAIN;
|
return EAGAIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -79,8 +79,8 @@ namespace zmq
|
|||||||
// True if we are in the middle of sending a multi-part message.
|
// True if we are in the middle of sending a multi-part message.
|
||||||
bool more;
|
bool more;
|
||||||
|
|
||||||
// wait for reaching LWM if HWM is reached
|
// dont drop messages if hwm reached, just return with EAGAIN
|
||||||
bool wait;
|
bool nodrop;
|
||||||
|
|
||||||
// List of pending (un)subscriptions, ie. those that were already
|
// List of pending (un)subscriptions, ie. those that were already
|
||||||
// applied to the trie, but not yet received by the user.
|
// applied to the trie, but not yet received by the user.
|
||||||
|
@@ -105,7 +105,7 @@ int test_blocking (int send_hwm, int msgCnt)
|
|||||||
//set a hwm on publisher
|
//set a hwm on publisher
|
||||||
rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
|
rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
|
||||||
int wait = 1;
|
int wait = 1;
|
||||||
rc = zmq_setsockopt (pub_socket, ZMQ_XPUB_WAIT, &wait, sizeof(wait));
|
rc = zmq_setsockopt (pub_socket, ZMQ_XPUB_NODROP, &wait, sizeof(wait));
|
||||||
rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0);
|
rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0);
|
||||||
|
|
||||||
// Send until we block
|
// Send until we block
|
||||||
|
@@ -33,7 +33,7 @@ int main (void)
|
|||||||
|
|
||||||
// set pub socket options
|
// set pub socket options
|
||||||
int wait = 1;
|
int wait = 1;
|
||||||
rc = zmq_setsockopt (pub, ZMQ_XPUB_WAIT, &wait, 4);
|
rc = zmq_setsockopt (pub, ZMQ_XPUB_NODROP, &wait, 4);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
int hwm = 2000;
|
int hwm = 2000;
|
||||||
|
Reference in New Issue
Block a user