mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-01 02:56:56 +01:00
an option to fail on unroutable messages in ROUTER sockets
This commit is contained in:
parent
62d27b7af3
commit
df584a3be0
@ -230,6 +230,7 @@ ZMQ_EXPORT int zmq_term (zmq_ctx_t context);
|
|||||||
#define ZMQ_SNDTIMEO 28
|
#define ZMQ_SNDTIMEO 28
|
||||||
#define ZMQ_IPV4ONLY 31
|
#define ZMQ_IPV4ONLY 31
|
||||||
#define ZMQ_LAST_ENDPOINT 32
|
#define ZMQ_LAST_ENDPOINT 32
|
||||||
|
#define ZMQ_FAIL_UNROUTEABLE 33
|
||||||
|
|
||||||
/* Message options */
|
/* Message options */
|
||||||
#define ZMQ_MORE 1
|
#define ZMQ_MORE 1
|
||||||
|
31
src/xrep.cpp
31
src/xrep.cpp
@ -33,7 +33,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
|
|||||||
more_in (false),
|
more_in (false),
|
||||||
current_out (NULL),
|
current_out (NULL),
|
||||||
more_out (false),
|
more_out (false),
|
||||||
next_peer_id (generate_random ())
|
next_peer_id (generate_random ()),
|
||||||
|
fail_unrouteable(false)
|
||||||
{
|
{
|
||||||
options.type = ZMQ_XREP;
|
options.type = ZMQ_XREP;
|
||||||
|
|
||||||
@ -77,6 +78,24 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
|
|||||||
fq.attach (pipe_);
|
fq.attach (pipe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
|
||||||
|
size_t optvallen_)
|
||||||
|
{
|
||||||
|
if (option_ != ZMQ_FAIL_UNROUTABLE) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(sizeof(optvallen_) != sizeof(uint64_t)) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
fail_unroutable = *((const uint64_t*) optval_);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void zmq::xrep_t::xterminated (pipe_t *pipe_)
|
void zmq::xrep_t::xterminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
fq.terminated (pipe_);
|
fq.terminated (pipe_);
|
||||||
@ -118,6 +137,8 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
|
|||||||
if (!more_out) {
|
if (!more_out) {
|
||||||
zmq_assert (!current_out);
|
zmq_assert (!current_out);
|
||||||
|
|
||||||
|
int retval = 0;
|
||||||
|
|
||||||
// If we have malformed message (prefix with no subsequent message)
|
// If we have malformed message (prefix with no subsequent message)
|
||||||
// then just silently ignore it.
|
// then just silently ignore it.
|
||||||
// TODO: The connections should be killed instead.
|
// TODO: The connections should be killed instead.
|
||||||
@ -126,7 +147,8 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
|
|||||||
more_out = true;
|
more_out = true;
|
||||||
|
|
||||||
// Find the pipe associated with the identity stored in the prefix.
|
// Find the pipe associated with the identity stored in the prefix.
|
||||||
// If there's no such pipe just silently ignore the message.
|
// If there's no such pipe just silently ignore the message, unless
|
||||||
|
// fail_unreachable is set.
|
||||||
blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
|
blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
|
||||||
outpipes_t::iterator it = outpipes.find (identity);
|
outpipes_t::iterator it = outpipes.find (identity);
|
||||||
|
|
||||||
@ -142,6 +164,9 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
|
|||||||
}
|
}
|
||||||
rc = empty.close ();
|
rc = empty.close ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
} else if(fail_unreachable) {
|
||||||
|
more_out = false;
|
||||||
|
retval = EHOSTUNREACH;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -150,7 +175,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
|
|||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
rc = msg_->init ();
|
rc = msg_->init ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
return 0;
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check whether this is the last part of the message.
|
// Check whether this is the last part of the message.
|
||||||
|
@ -49,6 +49,7 @@ namespace zmq
|
|||||||
|
|
||||||
// Overloads of functions from socket_base_t.
|
// Overloads of functions from socket_base_t.
|
||||||
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
|
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
|
||||||
|
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
|
||||||
int xsend (msg_t *msg_, int flags_);
|
int xsend (msg_t *msg_, int flags_);
|
||||||
int xrecv (msg_t *msg_, int flags_);
|
int xrecv (msg_t *msg_, int flags_);
|
||||||
bool xhas_in ();
|
bool xhas_in ();
|
||||||
@ -100,6 +101,9 @@ namespace zmq
|
|||||||
// algorithm. This value is the next ID to use (if not used already).
|
// algorithm. This value is the next ID to use (if not used already).
|
||||||
uint32_t next_peer_id;
|
uint32_t next_peer_id;
|
||||||
|
|
||||||
|
// If true, fail on unroutable messages instead of silently dropping them.
|
||||||
|
bool fail_unrouteable;
|
||||||
|
|
||||||
xrep_t (const xrep_t&);
|
xrep_t (const xrep_t&);
|
||||||
const xrep_t &operator = (const xrep_t&);
|
const xrep_t &operator = (const xrep_t&);
|
||||||
};
|
};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user