mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-12 18:40:27 +01:00
commit
acc4fe8794
18
src/dist.cpp
18
src/dist.cpp
@ -196,21 +196,11 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
|
||||
|
||||
bool zmq::dist_t::check_hwm ()
|
||||
{
|
||||
// If there are no matching pipes available, there is nothing to write.
|
||||
bool pipes_hwm_ok = true;
|
||||
for (pipes_t::size_type i = 0; i < matching; ++i)
|
||||
if (!pipes [i]->check_hwm ())
|
||||
return false;
|
||||
|
||||
if (matching == 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
for (pipes_t::size_type i = 0; i < matching; ++i) {
|
||||
if( !pipes [i] -> check_hwm()) {
|
||||
pipes_hwm_ok = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return pipes_hwm_ok;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
@ -409,7 +409,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
|
||||
}
|
||||
break;
|
||||
# endif
|
||||
|
||||
|
||||
case ZMQ_CONFLATE:
|
||||
if (is_int && (value == 0 || value == 1)) {
|
||||
conflate = (value != 0);
|
||||
@ -458,12 +458,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
case ZMQ_XPUB_NODROP:
|
||||
{
|
||||
pub_nodrop = true;
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
#if defined (ZMQ_ACT_MILITANT)
|
||||
@ -558,7 +552,7 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
|
||||
case ZMQ_TYPE:
|
||||
if (is_int) {
|
||||
*value = type;
|
||||
@ -770,7 +764,7 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
|
||||
// If libgssapi isn't installed, these options provoke EINVAL
|
||||
# ifdef HAVE_LIBGSSAPI_KRB5
|
||||
case ZMQ_GSSAPI_SERVER:
|
||||
@ -811,12 +805,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
|
||||
}
|
||||
break;
|
||||
|
||||
case ZMQ_XPUB_NODROP:
|
||||
if( is_int) {
|
||||
*value = pub_nodrop;
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
#if defined (ZMQ_ACT_MILITANT)
|
||||
malformed = false;
|
||||
|
@ -179,9 +179,6 @@ namespace zmq
|
||||
// close socket. Default is 30 secs. 0 means no handshake timeout.
|
||||
int handshake_ivl;
|
||||
|
||||
// flag if PUB socket should not drop messages if reaching HWM
|
||||
bool pub_nodrop;
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -501,7 +501,7 @@ void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
|
||||
hwm = outhwm_;
|
||||
}
|
||||
|
||||
bool zmq::pipe_t::check_hwm ()
|
||||
bool zmq::pipe_t::check_hwm () const
|
||||
{
|
||||
bool full = hwm > 0 && msgs_written - peers_msgs_read >= uint64_t (hwm - 1);
|
||||
return( !full );
|
||||
|
@ -69,7 +69,7 @@ namespace zmq
|
||||
// This allows pipepair to create pipe objects.
|
||||
friend int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
|
||||
int hwms_ [2], bool conflate_ [2]);
|
||||
|
||||
|
||||
public:
|
||||
|
||||
// Specifies the object to send events to.
|
||||
@ -105,7 +105,7 @@ namespace zmq
|
||||
// all the messages on the fly. Causes 'hiccuped' event to be generated
|
||||
// in the peer.
|
||||
void hiccup ();
|
||||
|
||||
|
||||
// Ensure the pipe wont block on receiving pipe_term.
|
||||
void set_nodelay ();
|
||||
|
||||
@ -119,7 +119,7 @@ namespace zmq
|
||||
void set_hwms (int inhwm_, int outhwm_);
|
||||
|
||||
// check HWM
|
||||
bool check_hwm ();
|
||||
bool check_hwm () const;
|
||||
// provide a way to link pipe to engine fd. Set on session initialization
|
||||
fd_t assoc_fd; //=retired_fd
|
||||
private:
|
||||
|
16
src/xpub.cpp
16
src/xpub.cpp
@ -98,14 +98,10 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
if (option_ == ZMQ_XPUB_VERBOSE) {
|
||||
if (option_ == ZMQ_XPUB_VERBOSE)
|
||||
verbose = (*static_cast <const int*> (optval_) != 0);
|
||||
} else if (option_ == ZMQ_XPUB_NODROP) {
|
||||
else
|
||||
nodrop = (*static_cast <const int*> (optval_) != 0);
|
||||
}
|
||||
else {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -135,11 +131,11 @@ int zmq::xpub_t::xsend (msg_t *msg_)
|
||||
subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
|
||||
mark_as_matching, this);
|
||||
|
||||
if (nodrop && !dist.check_hwm()) {
|
||||
return EAGAIN;
|
||||
if (nodrop && !dist.check_hwm ()) {
|
||||
errno = EAGAIN;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
// Send the message to all the pipes that were marked as matching
|
||||
// in the previous step.
|
||||
int rc = dist.send_to_matching (msg_);
|
||||
@ -163,7 +159,7 @@ bool zmq::xpub_t::xhas_out ()
|
||||
|
||||
int zmq::xpub_t::xrecv (msg_t *msg_)
|
||||
{
|
||||
// If there is at least one
|
||||
// If there is at least one
|
||||
if (pending_data.empty ()) {
|
||||
errno = EAGAIN;
|
||||
return -1;
|
||||
|
Loading…
Reference in New Issue
Block a user