From 3825f3b3009f3a123c0144486d5507d195f24b86 Mon Sep 17 00:00:00 2001 From: Mark Barbisan Date: Wed, 30 Oct 2013 23:45:46 -0400 Subject: [PATCH] Add support to the ROUTER socket to reassign identities upon name collision. --- include/zmq.h | 1 + src/router.cpp | 41 ++++++++++++++++++++++++++++++++++++++--- src/router.hpp | 5 +++++ 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/include/zmq.h b/include/zmq.h index 5ab51a1b..7547cc39 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -292,6 +292,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_REQ_RELAXED 53 #define ZMQ_CONFLATE 54 #define ZMQ_ZAP_DOMAIN 55 +#define ZMQ_ROUTER_REASSIGN_IDENTITES 56 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/router.cpp b/src/router.cpp index 6fabf67f..add79649 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -35,7 +35,8 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : mandatory (false), // raw_sock functionality in ROUTER is deprecated raw_sock (false), - probe_router (false) + probe_router (false), + reassign_identities(false) { options.type = ZMQ_ROUTER; options.recv_identity = true; @@ -111,6 +112,12 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, return 0; } break; + case ZMQ_ROUTER_REASSIGN_IDENTITES: + if (is_int && value >= 0) { + reassign_identities = (value != 0); + return 0; + } + break; default: break; @@ -394,9 +401,37 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) outpipes_t::iterator it = outpipes.find (identity); msg.close (); - // Ignore peers with duplicate ID. if (it != outpipes.end ()) - return false; + { + if (!reassign_identities) { + // Ignore peers with duplicate ID. + return false; + } + else + { + // We will allow the new connection to take over this + // identity. Temporarily assign a new identity to the + // existing pipe so we can terminate it asynchronously. + unsigned char buf [5]; + buf [0] = 0; + put_uint32 (buf + 1, next_peer_id++); + blob_t new_identity = blob_t (buf, sizeof buf); + + it->second.pipe->set_identity (new_identity); + outpipe_t existing_outpipe = + {it->second.pipe, it->second.active}; + + ok = outpipes.insert (outpipes_t::value_type ( + new_identity, existing_outpipe)).second; + zmq_assert (ok); + + // Remove the existing identity entry to allow the new + // connection to take the identity. + outpipes.erase (it); + + existing_outpipe.pipe->terminate (true); + } + } } } diff --git a/src/router.hpp b/src/router.hpp index 7b297080..82ee5ae0 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -115,6 +115,11 @@ namespace zmq // if true, send an empty message to every connected router peer bool probe_router; + // If true, the router will reassign an identity upon encountering a + // name collision. The new pipe will take the identity, the old pipe + // will be terminated. + bool reassign_identities; + router_t (const router_t&); const router_t &operator = (const router_t&); };