mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-27 02:53:12 +01:00
Problem: term "identity" is confusing
Solution: replace by "routing id"
This commit is contained in:
106
src/router.cpp
106
src/router.cpp
@@ -39,13 +39,13 @@
|
||||
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_),
|
||||
prefetched (false),
|
||||
identity_sent (false),
|
||||
routing_id_sent (false),
|
||||
current_in (NULL),
|
||||
terminate_current_in (false),
|
||||
more_in (false),
|
||||
current_out (NULL),
|
||||
more_out (false),
|
||||
next_rid (generate_random ()),
|
||||
next_integral_routing_id (generate_random ()),
|
||||
mandatory (false),
|
||||
// raw_socket functionality in ROUTER is deprecated
|
||||
raw_socket (false),
|
||||
@@ -53,7 +53,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
handover (false)
|
||||
{
|
||||
options.type = ZMQ_ROUTER;
|
||||
options.recv_identity = true;
|
||||
options.recv_routing_id = true;
|
||||
options.raw_socket = false;
|
||||
|
||||
prefetched_id.init ();
|
||||
@@ -87,8 +87,8 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
bool identity_ok = identify_peer (pipe_);
|
||||
if (identity_ok)
|
||||
bool routing_id_ok = identify_peer (pipe_);
|
||||
if (routing_id_ok)
|
||||
fq.attach (pipe_);
|
||||
else
|
||||
anonymous_pipes.insert (pipe_);
|
||||
@@ -113,7 +113,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
|
||||
if (is_int && value >= 0) {
|
||||
raw_socket = (value != 0);
|
||||
if (raw_socket) {
|
||||
options.recv_identity = false;
|
||||
options.recv_routing_id = false;
|
||||
options.raw_socket = true;
|
||||
}
|
||||
return 0;
|
||||
@@ -155,7 +155,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
|
||||
if (it != anonymous_pipes.end ())
|
||||
anonymous_pipes.erase (it);
|
||||
else {
|
||||
outpipes_t::iterator iter = outpipes.find (pipe_->get_identity ());
|
||||
outpipes_t::iterator iter = outpipes.find (pipe_->get_routing_id ());
|
||||
zmq_assert (iter != outpipes.end ());
|
||||
outpipes.erase (iter);
|
||||
fq.pipe_terminated (pipe_);
|
||||
@@ -171,8 +171,8 @@ void zmq::router_t::xread_activated (pipe_t *pipe_)
|
||||
if (it == anonymous_pipes.end ())
|
||||
fq.activated (pipe_);
|
||||
else {
|
||||
bool identity_ok = identify_peer (pipe_);
|
||||
if (identity_ok) {
|
||||
bool routing_id_ok = identify_peer (pipe_);
|
||||
if (routing_id_ok) {
|
||||
anonymous_pipes.erase (it);
|
||||
fq.attach (pipe_);
|
||||
}
|
||||
@@ -205,11 +205,11 @@ int zmq::router_t::xsend (msg_t *msg_)
|
||||
|
||||
more_out = true;
|
||||
|
||||
// Find the pipe associated with the identity stored in the prefix.
|
||||
// Find the pipe associated with the routing id stored in the prefix.
|
||||
// If there's no such pipe just silently ignore the message, unless
|
||||
// router_mandatory is set.
|
||||
blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
|
||||
outpipes_t::iterator it = outpipes.find (identity);
|
||||
blob_t routing_id ((unsigned char*) msg_->data (), msg_->size ());
|
||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
||||
|
||||
if (it != outpipes.end ()) {
|
||||
current_out = it->second.pipe;
|
||||
@@ -300,10 +300,10 @@ int zmq::router_t::xsend (msg_t *msg_)
|
||||
int zmq::router_t::xrecv (msg_t *msg_)
|
||||
{
|
||||
if (prefetched) {
|
||||
if (!identity_sent) {
|
||||
if (!routing_id_sent) {
|
||||
int rc = msg_->move (prefetched_id);
|
||||
errno_assert (rc == 0);
|
||||
identity_sent = true;
|
||||
routing_id_sent = true;
|
||||
}
|
||||
else {
|
||||
int rc = msg_->move (prefetched_msg);
|
||||
@@ -325,10 +325,10 @@ int zmq::router_t::xrecv (msg_t *msg_)
|
||||
pipe_t *pipe = NULL;
|
||||
int rc = fq.recvpipe (msg_, &pipe);
|
||||
|
||||
// It's possible that we receive peer's identity. That happens
|
||||
// It's possible that we receive peer's routing id. That happens
|
||||
// after reconnection. The current implementation assumes that
|
||||
// the peer always uses the same identity.
|
||||
while (rc == 0 && msg_->is_identity ())
|
||||
// the peer always uses the same routing id.
|
||||
while (rc == 0 && msg_->is_routing_id ())
|
||||
rc = fq.recvpipe (msg_, &pipe);
|
||||
|
||||
if (rc != 0)
|
||||
@@ -357,14 +357,14 @@ int zmq::router_t::xrecv (msg_t *msg_)
|
||||
prefetched = true;
|
||||
current_in = pipe;
|
||||
|
||||
blob_t identity = pipe->get_identity ();
|
||||
rc = msg_->init_size (identity.size ());
|
||||
blob_t routing_id = pipe->get_routing_id ();
|
||||
rc = msg_->init_size (routing_id.size ());
|
||||
errno_assert (rc == 0);
|
||||
memcpy (msg_->data (), identity.data (), identity.size ());
|
||||
memcpy (msg_->data (), routing_id.data (), routing_id.size ());
|
||||
msg_->set_flags (msg_t::more);
|
||||
if (prefetched_msg.metadata())
|
||||
msg_->set_metadata(prefetched_msg.metadata());
|
||||
identity_sent = true;
|
||||
routing_id_sent = true;
|
||||
}
|
||||
|
||||
return 0;
|
||||
@@ -396,11 +396,11 @@ bool zmq::router_t::xhas_in ()
|
||||
pipe_t *pipe = NULL;
|
||||
int rc = fq.recvpipe (&prefetched_msg, &pipe);
|
||||
|
||||
// It's possible that we receive peer's identity. That happens
|
||||
// It's possible that we receive peer's routing id. That happens
|
||||
// after reconnection. The current implementation assumes that
|
||||
// the peer always uses the same identity.
|
||||
// TODO: handle the situation when the peer changes its identity.
|
||||
while (rc == 0 && prefetched_msg.is_identity ())
|
||||
// the peer always uses the same routing id.
|
||||
// TODO: handle the situation when the peer changes its routing id.
|
||||
while (rc == 0 && prefetched_msg.is_routing_id ())
|
||||
rc = fq.recvpipe (&prefetched_msg, &pipe);
|
||||
|
||||
if (rc != 0)
|
||||
@@ -408,14 +408,14 @@ bool zmq::router_t::xhas_in ()
|
||||
|
||||
zmq_assert (pipe != NULL);
|
||||
|
||||
blob_t identity = pipe->get_identity ();
|
||||
rc = prefetched_id.init_size (identity.size ());
|
||||
blob_t routing_id = pipe->get_routing_id ();
|
||||
rc = prefetched_id.init_size (routing_id.size ());
|
||||
errno_assert (rc == 0);
|
||||
memcpy (prefetched_id.data (), identity.data (), identity.size ());
|
||||
memcpy (prefetched_id.data (), routing_id.data (), routing_id.size ());
|
||||
prefetched_id.set_flags (msg_t::more);
|
||||
|
||||
prefetched = true;
|
||||
identity_sent = false;
|
||||
routing_id_sent = false;
|
||||
current_in = pipe;
|
||||
|
||||
return true;
|
||||
@@ -443,13 +443,13 @@ zmq::blob_t zmq::router_t::get_credential () const
|
||||
return fq.get_credential ();
|
||||
}
|
||||
|
||||
int zmq::router_t::get_peer_state (const void *identity,
|
||||
size_t identity_size) const
|
||||
int zmq::router_t::get_peer_state (const void *routing_id_,
|
||||
size_t routing_id_size_) const
|
||||
{
|
||||
int res = 0;
|
||||
|
||||
blob_t identity_blob ((unsigned char *) identity, identity_size);
|
||||
outpipes_t::const_iterator it = outpipes.find (identity_blob);
|
||||
blob_t routing_id_blob ((unsigned char *) routing_id_, routing_id_size_);
|
||||
outpipes_t::const_iterator it = outpipes.find (routing_id_blob);
|
||||
if (it == outpipes.end ()) {
|
||||
errno = EHOSTUNREACH;
|
||||
return -1;
|
||||
@@ -467,27 +467,27 @@ int zmq::router_t::get_peer_state (const void *identity,
|
||||
bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
||||
{
|
||||
msg_t msg;
|
||||
blob_t identity;
|
||||
blob_t routing_id;
|
||||
bool ok;
|
||||
|
||||
if (connect_rid.length()) {
|
||||
identity = blob_t ((unsigned char*) connect_rid.c_str (),
|
||||
routing_id = blob_t ((unsigned char*) connect_rid.c_str (),
|
||||
connect_rid.length());
|
||||
connect_rid.clear ();
|
||||
outpipes_t::iterator it = outpipes.find (identity);
|
||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
||||
if (it != outpipes.end ())
|
||||
zmq_assert(false); // Not allowed to duplicate an existing rid
|
||||
}
|
||||
else
|
||||
if (options.raw_socket) { // Always assign identity for raw-socket
|
||||
if (options.raw_socket) { // Always assign an integral routing id for raw-socket
|
||||
unsigned char buf [5];
|
||||
buf [0] = 0;
|
||||
put_uint32 (buf + 1, next_rid++);
|
||||
identity = blob_t (buf, sizeof buf);
|
||||
put_uint32 (buf + 1, next_integral_routing_id++);
|
||||
routing_id = blob_t (buf, sizeof buf);
|
||||
}
|
||||
else
|
||||
if (!options.raw_socket) {
|
||||
// Pick up handshake cases and also case where next identity is set
|
||||
// Pick up handshake cases and also case where next integral routing id is set
|
||||
msg.init ();
|
||||
ok = pipe_->read (&msg);
|
||||
if (!ok)
|
||||
@@ -497,13 +497,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
||||
// Fall back on the auto-generation
|
||||
unsigned char buf [5];
|
||||
buf [0] = 0;
|
||||
put_uint32 (buf + 1, next_rid++);
|
||||
identity = blob_t (buf, sizeof buf);
|
||||
put_uint32 (buf + 1, next_integral_routing_id++);
|
||||
routing_id = blob_t (buf, sizeof buf);
|
||||
msg.close ();
|
||||
}
|
||||
else {
|
||||
identity = blob_t ((unsigned char*) msg.data (), msg.size ());
|
||||
outpipes_t::iterator it = outpipes.find (identity);
|
||||
routing_id = blob_t ((unsigned char*) msg.data (), msg.size ());
|
||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
||||
msg.close ();
|
||||
|
||||
if (it != outpipes.end ()) {
|
||||
@@ -512,23 +512,23 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
||||
return false;
|
||||
else {
|
||||
// We will allow the new connection to take over this
|
||||
// identity. Temporarily assign a new identity to the
|
||||
// routing id. Temporarily assign a new routing id to the
|
||||
// existing pipe so we can terminate it asynchronously.
|
||||
unsigned char buf [5];
|
||||
buf [0] = 0;
|
||||
put_uint32 (buf + 1, next_rid++);
|
||||
blob_t new_identity = blob_t (buf, sizeof buf);
|
||||
put_uint32 (buf + 1, next_integral_routing_id++);
|
||||
blob_t new_routing_id = blob_t (buf, sizeof buf);
|
||||
|
||||
it->second.pipe->set_identity (new_identity);
|
||||
it->second.pipe->set_routing_id (new_routing_id);
|
||||
outpipe_t existing_outpipe =
|
||||
{it->second.pipe, it->second.active};
|
||||
|
||||
ok = outpipes.insert (outpipes_t::value_type (
|
||||
new_identity, existing_outpipe)).second;
|
||||
new_routing_id, existing_outpipe)).second;
|
||||
zmq_assert (ok);
|
||||
|
||||
// Remove the existing identity entry to allow the new
|
||||
// connection to take the identity.
|
||||
// Remove the existing routing id entry to allow the new
|
||||
// connection to take the routing id.
|
||||
outpipes.erase (it);
|
||||
|
||||
if (existing_outpipe.pipe == current_in)
|
||||
@@ -540,10 +540,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
||||
}
|
||||
}
|
||||
|
||||
pipe_->set_identity (identity);
|
||||
pipe_->set_routing_id (routing_id);
|
||||
// Add the record into output pipes lookup table
|
||||
outpipe_t outpipe = {pipe_, true};
|
||||
ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
|
||||
ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
|
||||
zmq_assert (ok);
|
||||
|
||||
return true;
|
||||
|
||||
Reference in New Issue
Block a user