diff --git a/src/router.cpp b/src/router.cpp index e1335289..f87242a5 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -29,7 +29,8 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), - prefetched (0), + prefetched (false), + identity_sent (false), more_in (false), current_out (NULL), more_out (false), @@ -47,12 +48,15 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : options.send_identity = true; options.recv_identity = true; + prefetched_id.init (); prefetched_msg.init (); } zmq::router_t::~router_t () { + zmq_assert (anonymous_pipes.empty ());; zmq_assert (outpipes.empty ()); + prefetched_id.close (); prefetched_msg.close (); } @@ -60,22 +64,11 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); - // Generate a new unique peer identity. - unsigned char buf [5]; - buf [0] = 0; - put_uint32 (buf + 1, next_peer_id); - blob_t identity (buf, 5); - ++next_peer_id; - - // Add the pipe to the map out outbound pipes. - outpipe_t outpipe = {pipe_, true}; - bool ok = outpipes.insert (outpipes_t::value_type ( - identity, outpipe)).second; - zmq_assert (ok); - - // Add the pipe to the list of inbound pipes. - pipe_->set_identity (identity); - fq.attach (pipe_); + bool identity_ok = identify_peer (pipe_); + if (identity_ok) + fq.attach (pipe_); + else + anonymous_pipes.insert (pipe_); } int zmq::router_t::xsetsockopt (int option_, const void *optval_, @@ -85,34 +78,39 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, errno = EINVAL; return -1; } - if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) { + if (optvallen_ != sizeof (int) || *static_cast (optval_) < 0) { errno = EINVAL; return -1; } - fail_unroutable = *((const int*) optval_); + fail_unroutable = *static_cast (optval_); return 0; } void zmq::router_t::xterminated (pipe_t *pipe_) { - fq.terminated (pipe_); - - for (outpipes_t::iterator it = outpipes.begin (); - it != outpipes.end (); ++it) { - if (it->second.pipe == pipe_) { - outpipes.erase (it); - if (pipe_ == current_out) - current_out = NULL; - return; - } + std::set ::iterator it = anonymous_pipes.find (pipe_); + if (it != anonymous_pipes.end ()) + anonymous_pipes.erase (it); + else { + outpipes_t::iterator it = outpipes.find (pipe_->get_identity ()); + zmq_assert (it != outpipes.end ()); + outpipes.erase (it); + fq.terminated (pipe_); + if (pipe_ == current_out) + current_out = NULL; } - // We should never get here - zmq_assert (false); } void zmq::router_t::xread_activated (pipe_t *pipe_) { - fq.activated (pipe_); + std::set ::iterator it = anonymous_pipes.find (pipe_); + if (it == anonymous_pipes.end ()) + fq.activated (pipe_); + else { + bool identity_ok = identify_peer (pipe_); + if (identity_ok) + anonymous_pipes.erase (it); + } } void zmq::router_t::xwrite_activated (pipe_t *pipe_) @@ -198,85 +196,51 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) int zmq::router_t::xrecv (msg_t *msg_, int flags_) { - // if there is a prefetched identity, return it. - if (prefetched == 2) - { - int rc = msg_->init_size (prefetched_id.size ()); - errno_assert (rc == 0); - memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ()); - msg_->set_flags (msg_t::more); - prefetched = 1; - return 0; - } - - // If there is a prefetched message, return it. - if (prefetched == 1) { - int rc = msg_->move (prefetched_msg); - errno_assert (rc == 0); + if (prefetched) { + if (!identity_sent) { + int rc = msg_->move (prefetched_id); + errno_assert (rc == 0); + identity_sent = true; + } + else { + int rc = msg_->move (prefetched_msg); + errno_assert (rc == 0); + prefetched = false; + } more_in = msg_->flags () & msg_t::more ? true : false; - prefetched = 0; return 0; } pipe_t *pipe = NULL; - while (true) { - - // Get next message part. - int rc = fq.recvpipe (msg_, flags_, &pipe); - if (rc != 0) - return -1; - - // If identity is received, change the key assigned to the pipe. - if (likely (!(msg_->flags () & msg_t::identity))) - break; - - zmq_assert (!more_in); - - // Empty identity means we can preserve the auto-generated identity - if (msg_->size ()) { - blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); - outpipes_t::iterator it = outpipes.find (identity); - if (it == outpipes.end ()) { - // Find the pipe and change its identity - bool changed = false; - it = outpipes.begin (); - while (it != outpipes.end ()) { - if (it->second.pipe == pipe) { - pipe->set_identity (identity); - outpipes.erase (it); - outpipe_t outpipe = {pipe, true}; - if (!outpipes.insert ( - outpipes_t::value_type (identity, outpipe)).second) - zmq_assert (false); - changed = true; - break; - } - ++it; - } - zmq_assert (changed); - } - } + int rc = fq.recvpipe (msg_, flags_, &pipe); + if (rc != 0) { + errno = EAGAIN; + return -1; } + // Identity is not expected + assert ((msg_->flags () & msg_t::identity) == 0); + assert (pipe != NULL); + // If we are in the middle of reading a message, just return the next part. - if (more_in) { + if (more_in) more_in = msg_->flags () & msg_t::more ? true : false; - return 0; - } - - // We are at the beginning of a new message. Move the message part we - // have to the prefetched and return the ID of the peer instead. - int rc = prefetched_msg.move (*msg_); - errno_assert (rc == 0); - prefetched = 1; - rc = msg_->close (); - errno_assert (rc == 0); + else { + // We are at the beginning of a message. + // Keep the message part we have in the prefetch buffer + // and return the ID of the peer instead. + rc = prefetched_msg.move (*msg_); + errno_assert (rc == 0); + prefetched = true; + + blob_t identity = pipe->get_identity (); + rc = msg_->init_size (identity.size ()); + errno_assert (rc == 0); + memcpy (msg_->data (), identity.data (), identity.size ()); + msg_->set_flags (msg_t::more); + identity_sent = true; + } - blob_t identity = pipe->get_identity (); - rc = msg_->init_size (identity.size ()); - errno_assert (rc == 0); - memcpy (msg_->data (), identity.data (), identity.size ()); - msg_->set_flags (msg_t::more); return 0; } @@ -292,31 +256,33 @@ int zmq::router_t::rollback (void) bool zmq::router_t::xhas_in () { - // If we are in the middle of reading the messages, there are + // If we are in the middle of reading the messages, there are // definitely more parts available. if (more_in) return true; // We may already have a message pre-fetched. - if (prefetched > 0) + if (prefetched) return true; - // Try to read the next message to the pre-fetch buffer. If anything, - // it will be identity of the peer sending the message. - msg_t id; - id.init (); - int rc = router_t::xrecv (&id, ZMQ_DONTWAIT); - if (rc != 0 && errno == EAGAIN) { - id.close (); + // Try to read the next message. + // The message, if read, is kept in the pre-fetch buffer. + pipe_t *pipe = NULL; + int rc = fq.recvpipe (&prefetched_msg, ZMQ_DONTWAIT, &pipe); + if (rc != 0) return false; - } - zmq_assert (rc == 0); - // We have first part of the message prefetched now. We will store the - // prefetched identity as well. - prefetched_id.assign ((unsigned char*) id.data (), id.size ()); - id.close (); - prefetched = 2; + // Identity is not expected + assert ((prefetched_msg.flags () & msg_t::identity) == 0); + + blob_t identity = pipe->get_identity (); + rc = prefetched_id.init_size (identity.size ()); + errno_assert (rc == 0); + memcpy (prefetched_id.data (), identity.data (), identity.size ()); + prefetched_id.set_flags (msg_t::more); + + prefetched = true; + identity_sent = false; return true; } @@ -329,6 +295,43 @@ bool zmq::router_t::xhas_out () return true; } +bool zmq::router_t::identify_peer (pipe_t *pipe_) +{ + msg_t msg; + blob_t identity; + + msg.init (); + bool ok = pipe_->read (&msg); + if (!ok) + return false; + + if (msg.size () == 0) { + // Fall back on the auto-generation + unsigned char buf [5]; + buf [0] = 0; + put_uint32 (buf + 1, next_peer_id++); + identity = blob_t (buf, sizeof buf); + msg.close (); + } + else { + identity = blob_t ((unsigned char*) msg.data (), msg.size ()); + outpipes_t::iterator it = outpipes.find (identity); + msg.close (); + + // Ignore peers with duplicate ID. + if (it != outpipes.end ()) + return false; + } + + pipe_->set_identity (identity); + // Add the record into output pipes lookup table + outpipe_t outpipe = {pipe_, true}; + ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second; + zmq_assert (ok); + + return true; +} + zmq::router_session_t::router_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const address_t *addr_) : diff --git a/src/router.hpp b/src/router.hpp index 2e18176a..73bdf90c 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -65,15 +65,21 @@ namespace zmq private: + // Receive peer id and update lookup map + bool identify_peer (pipe_t *pipe_); + // Fair queueing object for inbound pipes. fq_t fq; - // This value is either 0 (nothing is prefetched), 1 (only message body - // is prefetched) or 2 (both identity and message body are prefetched). - int prefetched; + // True iff there is a message held in the pre-fetch buffer. + bool prefetched; + + // If true, the receiver got the message part with + // the peer's identity. + bool identity_sent; // Holds the prefetched identity. - blob_t prefetched_id; + msg_t prefetched_id; // Holds the prefetched message. msg_t prefetched_msg; @@ -87,6 +93,9 @@ namespace zmq bool active; }; + // We keep a set of pipes that have not been identified yet. + std::set anonymous_pipes; + // Outbound pipes indexed by the peer IDs. typedef std::map outpipes_t; outpipes_t outpipes;