mirror of
https://github.com/zeromq/libzmq.git
synced 2025-05-29 23:42:41 +02:00
Cleaned up option to force identity on outgoing connection
- renamed to ZMQ_CONNECT_RID - fixed whitespace malformating around previous patch - renamamed next_peer_id to next_rid in preparation for larger rename of IDENTITY to ROUTING_ID Note: ZMQ_CONNECT_RID has no test case and no entry in the man page, as yet.
This commit is contained in:
parent
5f07d103a7
commit
50bd28c037
@ -293,7 +293,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
|
|||||||
#define ZMQ_IPC_FILTER_PID 58
|
#define ZMQ_IPC_FILTER_PID 58
|
||||||
#define ZMQ_IPC_FILTER_UID 59
|
#define ZMQ_IPC_FILTER_UID 59
|
||||||
#define ZMQ_IPC_FILTER_GID 60
|
#define ZMQ_IPC_FILTER_GID 60
|
||||||
#define ZMQ_NEXT_CONNECT_PEER_ID 61
|
#define ZMQ_CONNECT_RID 61
|
||||||
|
|
||||||
/* Message options */
|
/* Message options */
|
||||||
#define ZMQ_MORE 1
|
#define ZMQ_MORE 1
|
||||||
#define ZMQ_SRCFD 2
|
#define ZMQ_SRCFD 2
|
||||||
|
@ -31,7 +31,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
|||||||
more_in (false),
|
more_in (false),
|
||||||
current_out (NULL),
|
current_out (NULL),
|
||||||
more_out (false),
|
more_out (false),
|
||||||
next_peer_id (generate_random ()),
|
next_rid (generate_random ()),
|
||||||
mandatory (false),
|
mandatory (false),
|
||||||
// raw_sock functionality in ROUTER is deprecated
|
// raw_sock functionality in ROUTER is deprecated
|
||||||
raw_sock (false),
|
raw_sock (false),
|
||||||
@ -88,12 +88,12 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
int value = is_int? *((int *) optval_): 0;
|
int value = is_int? *((int *) optval_): 0;
|
||||||
|
|
||||||
switch (option_) {
|
switch (option_) {
|
||||||
case ZMQ_NEXT_CONNECT_PEER_ID:
|
case ZMQ_CONNECT_RID:
|
||||||
if(optval_ && optvallen_) {
|
if (optval_ && optvallen_) {
|
||||||
next_identity.assign((char*)optval_,optvallen_);
|
connect_rid.assign ((char *) optval_, optvallen_);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case ZMQ_ROUTER_RAW:
|
case ZMQ_ROUTER_RAW:
|
||||||
if (is_int && value >= 0) {
|
if (is_int && value >= 0) {
|
||||||
raw_sock = (value != 0);
|
raw_sock = (value != 0);
|
||||||
@ -387,33 +387,36 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
|||||||
msg_t msg;
|
msg_t msg;
|
||||||
blob_t identity;
|
blob_t identity;
|
||||||
bool ok;
|
bool ok;
|
||||||
bool next_identity_used = false;
|
bool connect_rid_used = false;
|
||||||
|
|
||||||
if (next_identity.length()) {
|
if (connect_rid.length()) {
|
||||||
identity = blob_t((unsigned char*) next_identity.c_str(),
|
identity = blob_t ((unsigned char*) connect_rid.c_str (),
|
||||||
next_identity.length());
|
connect_rid.length());
|
||||||
next_identity.clear();
|
connect_rid.clear ();
|
||||||
next_identity_used = true;
|
connect_rid_used = true;
|
||||||
}
|
}
|
||||||
else if (options.raw_sock) { // Always assign identity for raw-socket
|
else
|
||||||
unsigned char buf [5];
|
if (options.raw_sock) { // Always assign identity for raw-socket
|
||||||
|
unsigned char buf [5];
|
||||||
buf [0] = 0;
|
buf [0] = 0;
|
||||||
put_uint32 (buf + 1, next_peer_id++);
|
put_uint32 (buf + 1, next_rid++);
|
||||||
identity = blob_t (buf, sizeof buf);
|
identity = blob_t (buf, sizeof buf);
|
||||||
}
|
}
|
||||||
if (!options.raw_sock){ // pick up handshake cases and also case where next identity is set
|
if (!options.raw_sock) {
|
||||||
|
// Pick up handshake cases and also case where next identity is set
|
||||||
msg.init ();
|
msg.init ();
|
||||||
ok = pipe_->read (&msg);
|
ok = pipe_->read (&msg);
|
||||||
if (!ok)
|
if (!ok)
|
||||||
return false;
|
return false;
|
||||||
if (next_identity_used){ // we read but do not use identity from peer
|
|
||||||
msg.close();
|
if (connect_rid_used) // we read but do not use identity from peer
|
||||||
}
|
msg.close();
|
||||||
else if (msg.size () == 0) {
|
else
|
||||||
|
if (msg.size () == 0) {
|
||||||
// Fall back on the auto-generation
|
// Fall back on the auto-generation
|
||||||
unsigned char buf [5];
|
unsigned char buf [5];
|
||||||
buf [0] = 0;
|
buf [0] = 0;
|
||||||
put_uint32 (buf + 1, next_peer_id++);
|
put_uint32 (buf + 1, next_rid++);
|
||||||
identity = blob_t (buf, sizeof buf);
|
identity = blob_t (buf, sizeof buf);
|
||||||
msg.close ();
|
msg.close ();
|
||||||
}
|
}
|
||||||
@ -432,7 +435,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
|||||||
// existing pipe so we can terminate it asynchronously.
|
// existing pipe so we can terminate it asynchronously.
|
||||||
unsigned char buf [5];
|
unsigned char buf [5];
|
||||||
buf [0] = 0;
|
buf [0] = 0;
|
||||||
put_uint32 (buf + 1, next_peer_id++);
|
put_uint32 (buf + 1, next_rid++);
|
||||||
blob_t new_identity = blob_t (buf, sizeof buf);
|
blob_t new_identity = blob_t (buf, sizeof buf);
|
||||||
|
|
||||||
it->second.pipe->set_identity (new_identity);
|
it->second.pipe->set_identity (new_identity);
|
||||||
|
@ -104,9 +104,9 @@ namespace zmq
|
|||||||
// If true, more outgoing message parts are expected.
|
// If true, more outgoing message parts are expected.
|
||||||
bool more_out;
|
bool more_out;
|
||||||
|
|
||||||
// Peer ID are generated. It's a simple increment and wrap-over
|
// Routing IDs are generated. It's a simple increment and wrap-over
|
||||||
// algorithm. This value is the next ID to use (if not used already).
|
// algorithm. This value is the next ID to use (if not used already).
|
||||||
uint32_t next_peer_id;
|
uint32_t next_rid;
|
||||||
|
|
||||||
// If true, report EAGAIN to the caller instead of silently dropping
|
// If true, report EAGAIN to the caller instead of silently dropping
|
||||||
// the message targeting an unknown peer.
|
// the message targeting an unknown peer.
|
||||||
|
@ -164,8 +164,10 @@ namespace zmq
|
|||||||
|
|
||||||
// Monitor socket cleanup
|
// Monitor socket cleanup
|
||||||
void stop_monitor ();
|
void stop_monitor ();
|
||||||
|
|
||||||
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
|
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
|
||||||
std::string next_identity;
|
std::string connect_rid;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// Creates new endpoint ID and adds the endpoint to the map.
|
// Creates new endpoint ID and adds the endpoint to the map.
|
||||||
void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe);
|
void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe);
|
||||||
|
@ -30,7 +30,7 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
|||||||
identity_sent (false),
|
identity_sent (false),
|
||||||
current_out (NULL),
|
current_out (NULL),
|
||||||
more_out (false),
|
more_out (false),
|
||||||
next_peer_id (generate_random ())
|
next_rid (generate_random ())
|
||||||
{
|
{
|
||||||
options.type = ZMQ_STREAM;
|
options.type = ZMQ_STREAM;
|
||||||
options.raw_sock = true;
|
options.raw_sock = true;
|
||||||
@ -163,13 +163,14 @@ int zmq::stream_t::xsend (msg_t *msg_)
|
|||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
|
int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
|
||||||
size_t optvallen_)
|
size_t optvallen_)
|
||||||
{
|
{
|
||||||
switch (option_) {
|
switch (option_) {
|
||||||
case ZMQ_NEXT_CONNECT_PEER_ID:
|
case ZMQ_CONNECT_RID:
|
||||||
if(optval_ && optvallen_) {
|
if (optval_ && optvallen_) {
|
||||||
next_identity.assign((char*)optval_,optvallen_);
|
connect_rid.assign ((char*) optval_, optvallen_);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -179,6 +180,7 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::stream_t::xrecv (msg_t *msg_)
|
int zmq::stream_t::xrecv (msg_t *msg_)
|
||||||
{
|
{
|
||||||
if (prefetched) {
|
if (prefetched) {
|
||||||
@ -260,13 +262,13 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
|
|||||||
unsigned char buffer [5];
|
unsigned char buffer [5];
|
||||||
buffer [0] = 0;
|
buffer [0] = 0;
|
||||||
blob_t identity;
|
blob_t identity;
|
||||||
if (next_identity.length()) {
|
if (connect_rid.length ()) {
|
||||||
identity = blob_t((unsigned char*) next_identity.c_str(),
|
identity = blob_t ((unsigned char*) connect_rid.c_str(),
|
||||||
next_identity.length());
|
connect_rid.length ());
|
||||||
next_identity.clear();
|
connect_rid.clear ();
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
put_uint32 (buffer + 1, next_peer_id++);
|
put_uint32 (buffer + 1, next_rid++);
|
||||||
blob_t identity = blob_t (buffer, sizeof buffer);
|
blob_t identity = blob_t (buffer, sizeof buffer);
|
||||||
memcpy (options.identity, identity.data (), identity.size ());
|
memcpy (options.identity, identity.data (), identity.size ());
|
||||||
options.identity_size = identity.size ();
|
options.identity_size = identity.size ();
|
||||||
|
@ -84,9 +84,9 @@ namespace zmq
|
|||||||
// If true, more outgoing message parts are expected.
|
// If true, more outgoing message parts are expected.
|
||||||
bool more_out;
|
bool more_out;
|
||||||
|
|
||||||
// Peer ID are generated. It's a simple increment and wrap-over
|
// Routing IDs are generated. It's a simple increment and wrap-over
|
||||||
// algorithm. This value is the next ID to use (if not used already).
|
// algorithm. This value is the next ID to use (if not used already).
|
||||||
uint32_t next_peer_id;
|
uint32_t next_rid;
|
||||||
|
|
||||||
stream_t (const stream_t&);
|
stream_t (const stream_t&);
|
||||||
const stream_t &operator = (const stream_t&);
|
const stream_t &operator = (const stream_t&);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user