Merge pull request #406 from hurtonm/master

Fix issue #406
This commit is contained in:
Ian Barber 2012-07-31 11:17:45 -07:00
commit 1f22954762
3 changed files with 25 additions and 8 deletions

View File

@ -232,6 +232,11 @@ void zmq::msg_t::reset_flags (unsigned char flags_)
u.base.flags &= ~flags_;
}
bool zmq::msg_t::is_identity () const
{
return (u.base.flags & identity) == identity;
}
bool zmq::msg_t::is_delimiter ()
{
return u.base.type == type_delimiter;

View File

@ -69,6 +69,7 @@ namespace zmq
unsigned char flags ();
void set_flags (unsigned char flags_);
void reset_flags (unsigned char flags_);
bool is_identity () const;
bool is_delimiter ();
bool is_vsm ();

View File

@ -211,13 +211,17 @@ int zmq::router_t::xrecv (msg_t *msg_, int flags_)
pipe_t *pipe = NULL;
int rc = fq.recvpipe (msg_, &pipe);
if (rc != 0) {
errno = EAGAIN;
return -1;
}
// Identity is not expected
zmq_assert ((msg_->flags () & msg_t::identity) == 0);
// It's possible that we receive peer's identity. That happens
// after reconnection. The current implementation assumes that
// the peer always uses the same identity.
// TODO: handle the situation when the peer changes its identity.
while (rc == 0 && msg_->is_identity ())
rc = fq.recvpipe (msg_, &pipe);
if (rc != 0)
return -1;
zmq_assert (pipe != NULL);
// If we are in the middle of reading a message, just return the next part.
@ -267,11 +271,18 @@ bool zmq::router_t::xhas_in ()
// The message, if read, is kept in the pre-fetch buffer.
pipe_t *pipe = NULL;
int rc = fq.recvpipe (&prefetched_msg, &pipe);
// It's possible that we receive peer's identity. That happens
// after reconnection. The current implementation assumes that
// the peer always uses the same identity.
// TODO: handle the situation when the peer changes its identity.
while (rc == 0 && prefetched_msg.is_identity ())
rc = fq.recvpipe (&prefetched_msg, &pipe);
if (rc != 0)
return false;
// Identity is not expected
zmq_assert ((prefetched_msg.flags () & msg_t::identity) == 0);
zmq_assert (pipe != NULL);
blob_t identity = pipe->get_identity ();
rc = prefetched_id.init_size (identity.size ());