From 9fab9937e516e78e6dbdf890e34f56cf80c9ff14 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Tue, 31 Jul 2012 16:31:41 +0200 Subject: [PATCH] Fix issue #406 When a peer reconnects, the router socket receives an identity message containing this peer id. When this happens, the current implementation crashes. This patch makes a router socket to silently ignore all identity messages coming from reconnected peers. --- src/msg.cpp | 5 +++++ src/msg.hpp | 1 + src/router.cpp | 27 +++++++++++++++++++-------- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/msg.cpp b/src/msg.cpp index ba4dcd37..7627c392 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -232,6 +232,11 @@ void zmq::msg_t::reset_flags (unsigned char flags_) u.base.flags &= ~flags_; } +bool zmq::msg_t::is_identity () const +{ + return (u.base.flags & identity) == identity; +} + bool zmq::msg_t::is_delimiter () { return u.base.type == type_delimiter; diff --git a/src/msg.hpp b/src/msg.hpp index 34a6bfa6..3356e36d 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -69,6 +69,7 @@ namespace zmq unsigned char flags (); void set_flags (unsigned char flags_); void reset_flags (unsigned char flags_); + bool is_identity () const; bool is_delimiter (); bool is_vsm (); diff --git a/src/router.cpp b/src/router.cpp index ab133b97..df43d3f6 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -211,13 +211,17 @@ int zmq::router_t::xrecv (msg_t *msg_, int flags_) pipe_t *pipe = NULL; int rc = fq.recvpipe (msg_, &pipe); - if (rc != 0) { - errno = EAGAIN; - return -1; - } - // Identity is not expected - zmq_assert ((msg_->flags () & msg_t::identity) == 0); + // It's possible that we receive peer's identity. That happens + // after reconnection. The current implementation assumes that + // the peer always uses the same identity. + // TODO: handle the situation when the peer changes its identity. + while (rc == 0 && msg_->is_identity ()) + rc = fq.recvpipe (msg_, &pipe); + + if (rc != 0) + return -1; + zmq_assert (pipe != NULL); // If we are in the middle of reading a message, just return the next part. @@ -267,11 +271,18 @@ bool zmq::router_t::xhas_in () // The message, if read, is kept in the pre-fetch buffer. pipe_t *pipe = NULL; int rc = fq.recvpipe (&prefetched_msg, &pipe); + + // It's possible that we receive peer's identity. That happens + // after reconnection. The current implementation assumes that + // the peer always uses the same identity. + // TODO: handle the situation when the peer changes its identity. + while (rc == 0 && prefetched_msg.is_identity ()) + rc = fq.recvpipe (&prefetched_msg, &pipe); + if (rc != 0) return false; - // Identity is not expected - zmq_assert ((prefetched_msg.flags () & msg_t::identity) == 0); + zmq_assert (pipe != NULL); blob_t identity = pipe->get_identity (); rc = prefetched_id.init_size (identity.size ());