mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-14 15:05:38 +02:00
GENERIC socket type and COMMAND flag added
GENERIC allows to use 0MQ as a dumb networking framework. It provides user with connect/disconnect notifications. Also, each inbound message is labeled by ID of the connection it originated from. Outbound messages should be labeled by the ID of the connection to send them to. To distinguish connect/disconnect notifications from common messages, COMMAND flag was introduced. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
@@ -58,6 +58,7 @@
|
||||
#include "xrep.hpp"
|
||||
#include "xpub.hpp"
|
||||
#include "xsub.hpp"
|
||||
#include "generic.hpp"
|
||||
|
||||
bool zmq::socket_base_t::check_tag ()
|
||||
{
|
||||
@@ -103,6 +104,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
|
||||
case ZMQ_XSUB:
|
||||
s = new (std::nothrow) xsub_t (parent_, tid_);
|
||||
break;
|
||||
case ZMQ_GENERIC:
|
||||
s = new (std::nothrow) generic_t (parent_, tid_);
|
||||
break;
|
||||
default:
|
||||
errno = EINVAL;
|
||||
return NULL;
|
||||
@@ -119,6 +123,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
|
||||
last_tsc (0),
|
||||
ticks (0),
|
||||
rcvlabel (false),
|
||||
rcvcmd (false),
|
||||
rcvmore (false)
|
||||
{
|
||||
}
|
||||
@@ -259,6 +264,16 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (option_ == ZMQ_RCVCMD) {
|
||||
if (*optvallen_ < sizeof (int)) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
*((int*) optval_) = rcvcmd ? 1 : 0;
|
||||
*optvallen_ = sizeof (int);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (option_ == ZMQ_RCVMORE) {
|
||||
if (*optvallen_ < sizeof (int)) {
|
||||
errno = EINVAL;
|
||||
@@ -469,11 +484,13 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
|
||||
if (unlikely (rc != 0))
|
||||
return -1;
|
||||
|
||||
// At this point we impose the LABEL & MORE flags on the message.
|
||||
// At this point we impose the flags on the message.
|
||||
if (flags_ & ZMQ_SNDLABEL)
|
||||
msg_->set_flags (msg_t::label);
|
||||
if (flags_ & ZMQ_SNDMORE)
|
||||
msg_->set_flags (msg_t::more);
|
||||
if (flags_ & ZMQ_SNDCMD)
|
||||
msg_->set_flags (msg_t::command);
|
||||
|
||||
// Try to send the message.
|
||||
rc = xsend (msg_, flags_);
|
||||
@@ -550,12 +567,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
|
||||
|
||||
// If we have the message, return immediately.
|
||||
if (rc == 0) {
|
||||
rcvlabel = msg_->flags () & msg_t::label;
|
||||
if (rcvlabel)
|
||||
msg_->reset_flags (msg_t::label);
|
||||
rcvmore = msg_->flags () & msg_t::more ? true : false;
|
||||
if (rcvmore)
|
||||
msg_->reset_flags (msg_t::more);
|
||||
extract_flags (msg_);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -571,12 +583,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
|
||||
rc = xrecv (msg_, flags_);
|
||||
if (rc < 0)
|
||||
return rc;
|
||||
rcvlabel = msg_->flags () & msg_t::label;
|
||||
if (rcvlabel)
|
||||
msg_->reset_flags (msg_t::label);
|
||||
rcvmore = msg_->flags () & msg_t::more ? true : false;
|
||||
if (rcvmore)
|
||||
msg_->reset_flags (msg_t::more);
|
||||
extract_flags (msg_);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -609,13 +616,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
|
||||
}
|
||||
}
|
||||
|
||||
// Extract LABEL & MORE flags from the message.
|
||||
rcvlabel = msg_->flags () & msg_t::label;
|
||||
if (rcvlabel)
|
||||
msg_->reset_flags (msg_t::label);
|
||||
rcvmore = msg_->flags () & msg_t::more ? true : false;
|
||||
if (rcvmore)
|
||||
msg_->reset_flags (msg_t::more);
|
||||
extract_flags (msg_);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -856,3 +857,15 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
|
||||
unregister_term_ack ();
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::extract_flags (msg_t *msg_)
|
||||
{
|
||||
rcvlabel = msg_->flags () & msg_t::label;
|
||||
if (rcvlabel)
|
||||
msg_->reset_flags (msg_t::label);
|
||||
rcvmore = msg_->flags () & msg_t::more ? true : false;
|
||||
if (rcvmore)
|
||||
msg_->reset_flags (msg_t::more);
|
||||
rcvcmd = msg_->flags () & msg_t::command ? true : false;
|
||||
if (rcvcmd)
|
||||
msg_->reset_flags (msg_t::command);
|
||||
}
|
||||
|
Reference in New Issue
Block a user