Added support for non-zmq tcp client connections to router socket.

- Created a new option ZMQ_ROUTER_RAW_SOCK
    - Added new raw_encoder and raw_decoder to receive and send messages in raw form to remote client
    - Added test case file tests/test_raw_sock.cpp

    o To create a raw router sock set the ZMQ_ROUTER_RAW_SOCK option
    o ZMQ_MSGMORE flag is ignored for non-id messages
    o To terminate a remote connection send id message followed by zero length data message
This commit is contained in:
Hardeep
2012-10-29 00:03:36 -07:00
parent 19f77a1ccf
commit 83387b4073
17 changed files with 615 additions and 31 deletions

View File

@@ -35,7 +35,8 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
current_out (NULL),
more_out (false),
next_peer_id (generate_random ()),
mandatory(false)
mandatory(false),
raw_sock(false)
{
options.type = ZMQ_ROUTER;
@@ -46,6 +47,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
// options.delay_on_disconnect = false;
options.recv_identity = true;
options.raw_sock = false;
prefetched_id.init ();
prefetched_msg.init ();
@@ -76,7 +78,8 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
int zmq::router_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ != ZMQ_ROUTER_MANDATORY) {
if (option_ != ZMQ_ROUTER_MANDATORY &&
option_ != ZMQ_ROUTER_RAW_SOCK) {
errno = EINVAL;
return -1;
}
@@ -84,7 +87,16 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
errno = EINVAL;
return -1;
}
mandatory = *static_cast <const int*> (optval_);
if(option_ == ZMQ_ROUTER_RAW_SOCK){
raw_sock = *static_cast <const int*> (optval_);
if(raw_sock){
options.recv_identity = false;
options.raw_sock = true;
}
}else{
mandatory = *static_cast <const int*> (optval_);
}
return 0;
}
@@ -174,11 +186,27 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
return 0;
}
// ignore the MORE flag for raw-sock or assert?
if(options.raw_sock)
msg_->reset_flags(msg_t::more);
// Check whether this is the last part of the message.
more_out = msg_->flags () & msg_t::more ? true : false;
// Push the message into the pipe. If there's no out pipe, just drop it.
if (current_out) {
// Close the remote connection if user has asked to do so
// by sending zero length message.
// Pending messages in the pipe will be dropped (on receiving term- ack)
if (raw_sock && msg_->size() == 0){
current_out->terminate(false);
int rc = msg_->close ();
errno_assert (rc == 0);
current_out = NULL;
return 0;
}
bool ok = current_out->write (msg_);
if (unlikely (!ok))
current_out = NULL;
@@ -319,28 +347,36 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
{
msg_t msg;
blob_t identity;
bool ok;
msg.init ();
bool ok = pipe_->read (&msg);
if (!ok)
return false;
if (msg.size () == 0) {
// Fall back on the auto-generation
if(options.raw_sock){ // always assign identity for raw-socket
unsigned char buf [5];
buf [0] = 0;
put_uint32 (buf + 1, next_peer_id++);
identity = blob_t (buf, sizeof buf);
msg.close ();
}
else {
identity = blob_t ((unsigned char*) msg.data (), msg.size ());
outpipes_t::iterator it = outpipes.find (identity);
msg.close ();
// Ignore peers with duplicate ID.
if (it != outpipes.end ())
}else{
msg.init ();
ok = pipe_->read (&msg);
if (!ok)
return false;
if (msg.size () == 0) {
// Fall back on the auto-generation
unsigned char buf [5];
buf [0] = 0;
put_uint32 (buf + 1, next_peer_id++);
identity = blob_t (buf, sizeof buf);
msg.close ();
}
else {
identity = blob_t ((unsigned char*) msg.data (), msg.size ());
outpipes_t::iterator it = outpipes.find (identity);
msg.close ();
// Ignore peers with duplicate ID.
if (it != outpipes.end ())
return false;
}
}
pipe_->set_identity (identity);