Fix identity handling for inproc transport

Fixes #872
This commit is contained in:
Martin Hurton 2014-02-09 18:57:15 +01:00
parent 8cda54c52b
commit a09407829e
3 changed files with 23 additions and 14 deletions

View File

@ -455,18 +455,14 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
pending_connection_.connect_pipe->set_hwms(hwms [1], hwms [0]);
pending_connection_.bind_pipe->set_hwms(hwms [0], hwms [1]);
if (bind_options.recv_identity) {
msg_t id;
int rc = id.init_size (pending_connection_.endpoint.options.identity_size);
if (!bind_options.recv_identity) {
msg_t msg;
const bool ok = pending_connection_.bind_pipe->read (&msg);
zmq_assert (ok);
const int rc = msg.close ();
errno_assert (rc == 0);
memcpy (id.data (), pending_connection_.endpoint.options.identity,
pending_connection_.endpoint.options.identity_size);
id.set_flags (msg_t::identity);
bool written = pending_connection_.connect_pipe->write (&id);
zmq_assert (written);
pending_connection_.connect_pipe->flush ();
}
if (pending_connection_.endpoint.options.recv_identity) {
msg_t id;
int rc = id.init_size (bind_options.identity_size);

View File

@ -169,7 +169,7 @@ read_message:
return false;
}
if (!(msg_->flags () & msg_t::more))
if (!(msg_->flags () & msg_t::more) && !msg_->is_identity ())
msgs_read++;
if (lwm > 0 && msgs_read % lwm == 0)
@ -199,8 +199,9 @@ bool zmq::pipe_t::write (msg_t *msg_)
return false;
bool more = msg_->flags () & msg_t::more ? true : false;
const bool is_identity = msg_->is_identity ();
outpipe->write (*msg_, more);
if (!more)
if (!more && !is_identity)
msgs_written++;
return true;

View File

@ -503,8 +503,20 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach local end of the pipe to this socket object.
attach_pipe (new_pipes [0]);
if (!peer.socket)
{
if (!peer.socket) {
// The peer doesn't exist yet so we don't know whether
// to send the identity message or not. To resolve this,
// we always send our identity and drop it later if
// the peer doesn't expect it.
msg_t id;
rc = id.init_size (options.identity_size);
errno_assert (rc == 0);
memcpy (id.data (), options.identity, options.identity_size);
id.set_flags (msg_t::identity);
bool written = new_pipes [0]->write (&id);
zmq_assert (written);
new_pipes [0]->flush ();
endpoint_t endpoint = {this, options};
pending_connection_t pending_connection = {endpoint, new_pipes [0], new_pipes [1]};
pend_connection (addr_, pending_connection);