Merge pull request #2745 from sigiesec/rename-identity

Problem: term "identity" is confusing
This commit is contained in:
Luca Boccassi 2017-09-20 10:08:45 +02:00 committed by GitHub
commit 44f96a3652
50 changed files with 490 additions and 453 deletions

View File

@ -289,7 +289,7 @@ ZMQ_HANDSHAKE_IVL: Retrieve maximum handshake interval
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_HANDSHAKE_IVL' option shall retrieve the maximum handshake interval
for the specified 'socket'. Handshaking is the exchange of socket configuration
information (socket type, identity, security) that occurs when a connection
information (socket type, routing id, security) that occurs when a connection
is first opened, only for connection-oriented transports. If handshaking does
not complete within the configured time, the connection shall be closed.
The value 0 means no handshake time limit.
@ -303,19 +303,8 @@ Applicable socket types:: all but ZMQ_STREAM, only for connection-oriented trans
ZMQ_IDENTITY: Retrieve socket identity
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_IDENTITY' option shall retrieve the identity of the specified 'socket'.
Socket identity is used only by request/reply pattern. Namely, it can be used
in tandem with ROUTER socket to route messages to the peer with specific
identity.
Identity should be at least one byte and at most 255 bytes long. Identities
starting with binary zero are reserved for use by 0MQ infrastructure.
[horizontal]
Option value type:: binary data
Option value unit:: N/A
Default value:: NULL
Applicable socket types:: ZMQ_REP, ZMQ_REQ, ZMQ_ROUTER, ZMQ_DEALER.
This option name is now deprecated. Use ZMQ_ROUTING_ID instead.
ZMQ_IDENTITY remains as an alias for now.
ZMQ_IMMEDIATE: Retrieve attach-on-connect value
@ -656,6 +645,23 @@ Default value:: 10000
Applicable socket types:: all, when using multicast transports
ZMQ_ROUTING_ID: Retrieve socket routing id
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_ROUTING_ID' option shall retrieve the routing id of the specified 'socket'.
Routing ids are used only by the request/reply pattern. Specifically, it can be used
in tandem with ROUTER socket to route messages to the peer with a specific
routing id.
A routing id must be at least one byte and at most 255 bytes long. Identities
starting with a zero byte are reserved for use by the 0MQ infrastructure.
[horizontal]
Option value type:: binary data
Option value unit:: N/A
Default value:: NULL
Applicable socket types:: ZMQ_REP, ZMQ_REQ, ZMQ_ROUTER, ZMQ_DEALER.
ZMQ_SNDBUF: Retrieve kernel transmit buffer size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_SNDBUF' option shall retrieve the underlying kernel transmit buffer

View File

@ -26,14 +26,16 @@ The following ZMTP properties can be retrieved with the _zmq_msg_gets()_
function:
Socket-Type
Identity
Routing-Id
Note: 'Identity' is a deprecated alias for 'Routing-Id'.
Additionally, when available for the underlying transport, the *Peer-Address*
property will return the IP address of the remote endpoint as returned by
getnameinfo(2).
The names of these properties are also defined in _zmq.h_ as
_ZMQ_MSG_PROPERTY_SOCKET_TYPE_ _ZMQ_MSG_PROPERTY_IDENTITY_, and
_ZMQ_MSG_PROPERTY_SOCKET_TYPE_ _ZMQ_MSG_PROPERTY_ROUTING_ID_, and
_ZMQ_MSG_PROPERTY_PEER_ADDRESS_.
Currently, these definitions are only available as a DRAFT API.

View File

@ -90,22 +90,29 @@ Applicable socket types:: all, when using TCP or UDP transports.
ZMQ_CONNECT_RID: Assign the next outbound connection id
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_CONNECT_RID' option sets the peer id of the next host connected
via the zmq_connect() call, and immediately readies that connection for
data transfer with the named id. This option applies only to the first
subsequent call to zmq_connect(), calls thereafter use default connection
behaviour.
This option name is now deprecated. Use ZMQ_CONNECT_ROUTING_ID instead.
ZMQ_CONNECT_RID remains as an alias for now.
Typical use is to set this socket option ahead of each zmq_connect() attempt
to a new host. Each connection MUST be assigned a unique name. Assigning a
name that is already in use is not allowed.
ZMQ_CONNECT_ROUTING_ID: Assign the next outbound routing id
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_CONNECT_ROUTING_ID' option sets the peer id of the peer connected
via the next zmq_connect() call, such that that connection is immediately ready for
data transfer with the given routing id. This option applies only to the first
subsequent call to zmq_connect(), zmq_connect() calls thereafter use the default
connection behaviour.
Typical use is to set this socket option ahead of each zmq_connect() call.
Each connection MUST be assigned a unique routing id. Assigning a
routing id that is already in use is not allowed.
Useful when connecting ROUTER to ROUTER, or STREAM to STREAM, as it
allows for immediate sending to peers. Outbound id framing requirements
allows for immediate sending to peers. Outbound routing id framing requirements
for ROUTER and STREAM sockets apply.
The peer id should be from 1 to 255 bytes long and MAY NOT start with
binary zero.
The routing id must be from 1 to 255 bytes long and MAY NOT start with
a zero byte (such routing ids are reserved for internal use by the 0MQ
infrastructure).
[horizontal]
Option value type:: binary data
@ -305,7 +312,7 @@ ZMQ_HANDSHAKE_IVL: Set maximum handshake interval
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_HANDSHAKE_IVL' option shall set the maximum handshake interval for
the specified 'socket'. Handshaking is the exchange of socket configuration
information (socket type, identity, security) that occurs when a connection
information (socket type, routing id, security) that occurs when a connection
is first opened, only for connection-oriented transports. If handshaking does
not complete within the configured time, the connection shall be closed.
The value 0 means no handshake time limit.
@ -364,22 +371,8 @@ Applicable socket types:: all, when using connection-oriented transports
ZMQ_IDENTITY: Set socket identity
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_IDENTITY' option shall set the identity of the specified 'socket'
when connecting to a ROUTER socket. The identity should be from 1 to 255
bytes long and may contain any values.
If two clients use the same identity when connecting to a ROUTER, the
results shall depend on the ZMQ_ROUTER_HANDOVER option setting. If that
is not set (or set to the default of zero), the ROUTER socket shall reject
clients trying to connect with an already-used identity. If that option
is set to 1, the ROUTER socket shall hand-over the connection to the new
client and disconnect the existing one.
[horizontal]
Option value type:: binary data
Option value unit:: N/A
Default value:: NULL
Applicable socket types:: ZMQ_REQ, ZMQ_REP, ZMQ_ROUTER, ZMQ_DEALER.
This option name is now deprecated. Use ZMQ_ROUTING_ID instead.
ZMQ_IDENTITY remains as an alias for now.
ZMQ_IMMEDIATE: Queue messages only to completed connections
@ -737,12 +730,12 @@ Default value:: 0
Applicable socket types:: ZMQ_REQ
ZMQ_ROUTER_HANDOVER: handle duplicate client identities on ROUTER sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If two clients use the same identity when connecting to a ROUTER, the
ZMQ_ROUTER_HANDOVER: handle duplicate client routing ids on ROUTER sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If two clients use the same routing id when connecting to a ROUTER, the
results shall depend on the ZMQ_ROUTER_HANDOVER option setting. If that
is not set (or set to the default of zero), the ROUTER socket shall reject
clients trying to connect with an already-used identity. If that option
clients trying to connect with an already-used routing id. If that option
is set to 1, the ROUTER socket shall hand-over the connection to the new
client and disconnect the existing one.
@ -782,7 +775,7 @@ raw mode, and when using the tcp:// transport, it will read and write TCP data
without 0MQ framing. This lets 0MQ applications talk to non-0MQ applications.
When using raw mode, you cannot set explicit identities, and the ZMQ_SNDMORE
flag is ignored when sending data messages. In raw mode you can close a specific
connection by sending it a zero-length message (following the identity frame).
connection by sending it a zero-length message (following the routing id frame).
NOTE: This option is deprecated, please use ZMQ_STREAM sockets instead.
@ -793,6 +786,28 @@ Default value:: 0
Applicable socket types:: ZMQ_ROUTER
ZMQ_ROUTING_ID: Set socket routing id
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_ROUTING_ID' option shall set the routing id of the specified 'socket'
when connecting to a ROUTER socket.
A routing id must be at least one byte and at most 255 bytes long. Identities
starting with a zero byte are reserved for use by the 0MQ infrastructure.
If two clients use the same routing id when connecting to a ROUTER, the
results shall depend on the ZMQ_ROUTER_HANDOVER option setting. If that
is not set (or set to the default of zero), the ROUTER socket shall reject
clients trying to connect with an already-used routing id. If that option
is set to 1, the ROUTER socket shall hand-over the connection to the new
client and disconnect the existing one.
[horizontal]
Option value type:: binary data
Option value unit:: N/A
Default value:: NULL
Applicable socket types:: ZMQ_REQ, ZMQ_REP, ZMQ_ROUTER, ZMQ_DEALER.
ZMQ_SNDBUF: Set kernel transmit buffer size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_SNDBUF' option shall set the underlying kernel transmit buffer size

View File

@ -383,26 +383,26 @@ non-0MQ peer, when using the tcp:// transport. A 'ZMQ_STREAM' socket can
act as client and/or server, sending and/or receiving TCP data asynchronously.
When receiving TCP data, a 'ZMQ_STREAM' socket shall prepend a message part
containing the _identity_ of the originating peer to the message before passing
containing the _routing id_ of the originating peer to the message before passing
it to the application. Messages received are fair-queued from among all
connected peers.
When sending TCP data, a 'ZMQ_STREAM' socket shall remove the first part of the
message and use it to determine the _identity_ of the peer the message shall be
message and use it to determine the _routing id_ of the peer the message shall be
routed to, and unroutable messages shall cause an EHOSTUNREACH or EAGAIN error.
To open a connection to a server, use the zmq_connect call, and then fetch the
socket identity using the ZMQ_IDENTITY zmq_getsockopt call.
socket routing id using the zmq_getsockopt call with the ZMQ_ROUTING_ID option.
To close a specific connection, send the identity frame followed by a
To close a specific connection, send the routing id frame followed by a
zero-length message (see EXAMPLE section).
When a connection is made, a zero-length message will be received by the
application. Similarly, when the peer disconnects (or the connection is lost),
a zero-length message will be received by the application.
You must send one identity frame followed by one data frame. The ZMQ_SNDMORE
flag is required for identity frames but is ignored on data frames.
You must send one routing id frame followed by one data frame. The ZMQ_SNDMORE
flag is required for routing id frames but is ignored on data frames.
[horizontal]
.Summary of ZMQ_STREAM characteristics
@ -492,10 +492,10 @@ ZMQ_ROUTER
^^^^^^^^^^
A socket of type 'ZMQ_ROUTER' is an advanced socket type used for extending
request/reply sockets. When receiving messages a 'ZMQ_ROUTER' socket shall
prepend a message part containing the _identity_ of the originating peer to the
prepend a message part containing the _routing id_ of the originating peer to the
message before passing it to the application. Messages received are fair-queued
from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall
remove the first part of the message and use it to determine the _identity_ of
remove the first part of the message and use it to determine the _routing id _ of
the peer the message shall be routed to. If the peer does not exist anymore, or
has never existed, the message shall be silently discarded. However, if
'ZMQ_ROUTER_MANDATORY' socket option is set to '1', the socket shall fail
@ -514,9 +514,9 @@ or more peers. Likewise, the socket shall generate 'ZMQ_POLLOUT' events when
at least one message can be sent to one or more peers.
When a 'ZMQ_REQ' socket is connected to a 'ZMQ_ROUTER' socket, in addition to the
_identity_ of the originating peer each message received shall contain an empty
_routing id_ of the originating peer each message received shall contain an empty
_delimiter_ message part. Hence, the entire structure of each received message
as seen by the application becomes: one or more _identity_ parts, _delimiter_
as seen by the application becomes: one or more _routing id_ parts, _delimiter_
part, one or more _body parts_. When sending replies to a 'ZMQ_REQ' socket the
application must include the _delimiter_ part.
@ -559,16 +559,16 @@ void *socket = zmq_socket (ctx, ZMQ_STREAM);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:8080");
assert (rc == 0);
/* Data structure to hold the ZMQ_STREAM ID */
uint8_t id [256];
size_t id_size = 256;
/* Data structure to hold the ZMQ_STREAM routing id */
uint8_t routing_id [256];
size_t routing_id_size = 256;
/* Data structure to hold the ZMQ_STREAM received data */
uint8_t raw [256];
size_t raw_size = 256;
while (1) {
/* Get HTTP request; ID frame and then request */
id_size = zmq_recv (socket, id, 256, 0);
assert (id_size > 0);
/* Get HTTP request; routing id frame and then request */
routing_id_size = zmq_recv (socket, routing_id, 256, 0);
assert (routing_id_size > 0);
do {
raw_size = zmq_recv (socket, raw, 256, 0);
assert (raw_size >= 0);
@ -579,11 +579,11 @@ while (1) {
"Content-Type: text/plain\r\n"
"\r\n"
"Hello, World!";
/* Sends the ID frame followed by the response */
zmq_send (socket, id, id_size, ZMQ_SNDMORE);
/* Sends the routing id frame followed by the response */
zmq_send (socket, routing_id, routing_id_size, ZMQ_SNDMORE);
zmq_send (socket, http_response, strlen (http_response), 0);
/* Closes the connection by sending the ID frame followed by a zero response */
zmq_send (socket, id, id_size, ZMQ_SNDMORE);
/* Closes the connection by sending the routing id frame followed by a zero response */
zmq_send (socket, routing_id, routing_id_size, ZMQ_SNDMORE);
zmq_send (socket, 0, 0, 0);
}
zmq_close (socket);

View File

@ -299,7 +299,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
/* Socket options. */
#define ZMQ_AFFINITY 4
#define ZMQ_IDENTITY 5
#define ZMQ_ROUTING_ID 5
#define ZMQ_SUBSCRIBE 6
#define ZMQ_UNSUBSCRIBE 7
#define ZMQ_RATE 8
@ -345,7 +345,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
#define ZMQ_ZAP_DOMAIN 55
#define ZMQ_ROUTER_HANDOVER 56
#define ZMQ_TOS 57
#define ZMQ_CONNECT_RID 61
#define ZMQ_CONNECT_ROUTING_ID 61
#define ZMQ_GSSAPI_SERVER 62
#define ZMQ_GSSAPI_PRINCIPAL 63
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64
@ -390,6 +390,8 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
#define ZMQ_GROUP_MAX_LENGTH 15
/* Deprecated options and aliases */
#define ZMQ_IDENTITY ZMQ_ROUTING_ID
#define ZMQ_CONNECT_RID ZMQ_CONNECT_ROUTING_ID
#define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_IPC_FILTER_PID 58
#define ZMQ_IPC_FILTER_UID 59
@ -620,7 +622,7 @@ ZMQ_EXPORT int zmq_msg_set_group(zmq_msg_t *msg, const char *group);
ZMQ_EXPORT const char *zmq_msg_group(zmq_msg_t *msg);
/* DRAFT Msg property names. */
#define ZMQ_MSG_PROPERTY_IDENTITY "Identity"
#define ZMQ_MSG_PROPERTY_ROUTING_ID "Routing-Id"
#define ZMQ_MSG_PROPERTY_SOCKET_TYPE "Socket-Type"
#define ZMQ_MSG_PROPERTY_USER_ID "User-Id"
#define ZMQ_MSG_PROPERTY_PEER_ADDRESS "Peer-Address"
@ -662,8 +664,8 @@ ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, int fd);
#endif
ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket,
const void *identity,
size_t identity_size);
const void *routing_id,
size_t routing_id_size);
/******************************************************************************/
/* Scheduling timers */

View File

@ -529,7 +529,7 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
bind_socket_->inc_seqnum();
pending_connection_.bind_pipe->set_tid (bind_socket_->get_tid ());
if (!bind_options.recv_identity) {
if (!bind_options.recv_routing_id) {
msg_t msg;
const bool ok = pending_connection_.bind_pipe->read (&msg);
zmq_assert (ok);
@ -569,16 +569,16 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
// When a ctx is terminated all pending inproc connection will be
// connected, but the socket will already be closed and the pipe will be
// in waiting_for_delimiter state, which means no more writes can be done
// and the identity write fails and causes an assert. Check if the socket
// and the routing id write fails and causes an assert. Check if the socket
// is open before sending.
if (pending_connection_.endpoint.options.recv_identity &&
if (pending_connection_.endpoint.options.recv_routing_id &&
pending_connection_.endpoint.socket->check_tag ()) {
msg_t id;
const int rc = id.init_size (bind_options.identity_size);
msg_t routing_id;
const int rc = routing_id.init_size (bind_options.routing_id_size);
errno_assert (rc == 0);
memcpy (id.data (), bind_options.identity, bind_options.identity_size);
id.set_flags (msg_t::identity);
const bool written = pending_connection_.bind_pipe->write (&id);
memcpy (routing_id.data (), bind_options.routing_id, bind_options.routing_id_size);
routing_id.set_flags (msg_t::routing_id);
const bool written = pending_connection_.bind_pipe->write (&routing_id);
zmq_assert (written);
pending_connection_.bind_pipe->flush ();
}

View File

@ -46,17 +46,17 @@ zmq::mechanism_t::~mechanism_t ()
{
}
void zmq::mechanism_t::set_peer_identity (const void *id_ptr, size_t id_size)
void zmq::mechanism_t::set_peer_routing_id (const void *id_ptr, size_t id_size)
{
identity = blob_t (static_cast <const unsigned char*> (id_ptr), id_size);
routing_id = blob_t (static_cast <const unsigned char*> (id_ptr), id_size);
}
void zmq::mechanism_t::peer_identity (msg_t *msg_)
void zmq::mechanism_t::peer_routing_id (msg_t *msg_)
{
const int rc = msg_->init_size (identity.size ());
const int rc = msg_->init_size (routing_id.size ());
errno_assert (rc == 0);
memcpy (msg_->data (), identity.data (), identity.size ());
msg_->set_flags (msg_t::identity);
memcpy (msg_->data (), routing_id.data (), routing_id.size ());
msg_->set_flags (msg_t::routing_id);
}
void zmq::mechanism_t::set_user_id (const void *data_, size_t size_)
@ -121,6 +121,9 @@ size_t zmq::mechanism_t::property_len (const char *name, size_t value_len)
return ::property_len (name_len (name), value_len);
}
#define ZMTP_PROPERTY_SOCKET_TYPE "Socket-Type"
#define ZMTP_PROPERTY_IDENTITY "Identity"
size_t zmq::mechanism_t::add_basic_properties (unsigned char *buf,
size_t buf_capacity) const
{
@ -129,15 +132,15 @@ size_t zmq::mechanism_t::add_basic_properties (unsigned char *buf,
// Add socket type property
const char *socket_type = socket_type_string (options.type);
ptr += add_property (ptr, buf_capacity,
ZMQ_MSG_PROPERTY_SOCKET_TYPE, socket_type,
ZMTP_PROPERTY_SOCKET_TYPE, socket_type,
strlen (socket_type));
// Add identity property
// Add identity (aka routing id) property
if (options.type == ZMQ_REQ || options.type == ZMQ_DEALER
|| options.type == ZMQ_ROUTER)
ptr += add_property (ptr, buf_capacity - (ptr - buf),
ZMQ_MSG_PROPERTY_IDENTITY, options.identity,
options.identity_size);
ZMTP_PROPERTY_IDENTITY, options.routing_id,
options.routing_id_size);
return ptr - buf;
}
@ -145,11 +148,11 @@ size_t zmq::mechanism_t::add_basic_properties (unsigned char *buf,
size_t zmq::mechanism_t::basic_properties_len() const
{
const char *socket_type = socket_type_string (options.type);
return property_len (ZMQ_MSG_PROPERTY_SOCKET_TYPE, strlen (socket_type))
return property_len (ZMTP_PROPERTY_SOCKET_TYPE, strlen (socket_type))
+ ((options.type == ZMQ_REQ || options.type == ZMQ_DEALER
|| options.type == ZMQ_ROUTER)
? property_len (ZMQ_MSG_PROPERTY_IDENTITY,
options.identity_size)
? property_len (ZMTP_PROPERTY_IDENTITY,
options.routing_id_size)
: 0);
}
@ -199,10 +202,10 @@ int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_,
ptr_ += value_length;
bytes_left -= value_length;
if (name == ZMQ_MSG_PROPERTY_IDENTITY && options.recv_identity)
set_peer_identity (value, value_length);
if (name == ZMTP_PROPERTY_IDENTITY && options.recv_routing_id)
set_peer_routing_id (value, value_length);
else
if (name == ZMQ_MSG_PROPERTY_SOCKET_TYPE) {
if (name == ZMTP_PROPERTY_SOCKET_TYPE) {
const std::string socket_type ((char *) value, value_length);
if (!check_socket_type (socket_type)) {
errno = EINVAL;

View File

@ -74,9 +74,9 @@ namespace zmq
// Returns the status of this mechanism.
virtual status_t status () const = 0;
void set_peer_identity (const void *id_ptr, size_t id_size);
void set_peer_routing_id (const void *id_ptr, size_t id_size);
void peer_identity (msg_t *msg_);
void peer_routing_id (msg_t *msg_);
void set_user_id (const void *user_id, size_t size);
@ -138,7 +138,7 @@ namespace zmq
private:
blob_t identity;
blob_t routing_id;
blob_t user_id;

View File

@ -39,8 +39,14 @@ zmq::metadata_t::metadata_t (const dict_t &dict) :
const char *zmq::metadata_t::get (const std::string &property) const
{
dict_t::const_iterator it = dict.find (property);
if (it == dict.end ())
if (it == dict.end())
{
/** \todo remove this when support for the deprecated name "Identity" is dropped */
if (property == "Identity")
return get (ZMQ_MSG_PROPERTY_ROUTING_ID);
return NULL;
}
else
return it->second.c_str ();
}

View File

@ -413,9 +413,9 @@ void zmq::msg_t::reset_metadata ()
}
}
bool zmq::msg_t::is_identity () const
bool zmq::msg_t::is_routing_id () const
{
return (u.base.flags & identity) == identity;
return (u.base.flags & routing_id) == routing_id;
}
bool zmq::msg_t::is_credential () const

View File

@ -79,7 +79,7 @@ namespace zmq
more = 1, // Followed by more parts
command = 2, // Command frame (see ZMTP spec)
credential = 32,
identity = 64,
routing_id = 64,
shared = 128
};
@ -109,7 +109,7 @@ namespace zmq
metadata_t *metadata () const;
void set_metadata (metadata_t *metadata_);
void reset_metadata ();
bool is_identity () const;
bool is_routing_id () const;
bool is_credential () const;
bool is_delimiter () const;
bool is_join () const;

View File

@ -48,7 +48,7 @@ zmq::options_t::options_t () :
sndhwm (1000),
rcvhwm (1000),
affinity (0),
identity_size (0),
routing_id_size (0),
rate (100),
recovery_ivl (10000),
multicast_hops (1),
@ -70,7 +70,7 @@ zmq::options_t::options_t () :
immediate (0),
filter (false),
invert_matching(false),
recv_identity (false),
recv_routing_id (false),
raw_socket (false),
raw_notify (true),
tcp_keepalive (-1),
@ -166,11 +166,11 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
break;
case ZMQ_IDENTITY:
// Identity is any binary string from 1 to 255 octets
case ZMQ_ROUTING_ID:
// Routing id is any binary string from 1 to 255 octets
if (optvallen_ > 0 && optvallen_ < 256) {
identity_size = (unsigned char) optvallen_;
memcpy (identity, optval_, identity_size);
routing_id_size = (unsigned char) optvallen_;
memcpy (routing_id, optval_, routing_id_size);
return 0;
}
break;
@ -679,10 +679,10 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
}
break;
case ZMQ_IDENTITY:
if (*optvallen_ >= identity_size) {
memcpy (optval_, identity, identity_size);
*optvallen_ = identity_size;
case ZMQ_ROUTING_ID:
if (*optvallen_ >= routing_id_size) {
memcpy (optval_, routing_id, routing_id_size);
*optvallen_ = routing_id_size;
return 0;
}
break;

View File

@ -70,9 +70,9 @@ namespace zmq
// I/O thread affinity.
uint64_t affinity;
// Socket identity
unsigned char identity_size;
unsigned char identity [256];
// Socket routing id.
unsigned char routing_id_size;
unsigned char routing_id [256];
// Maximum transfer rate [kb/s]. Default 100kb/s.
int rate;
@ -143,8 +143,8 @@ namespace zmq
// sockets.
bool invert_matching;
// If true, the identity message is forwarded to the socket.
bool recv_identity;
// If true, the routing id message is forwarded to the socket.
bool recv_routing_id;
// if true, router socket accepts non-zmq tcp connections
bool raw_socket;

View File

@ -92,7 +92,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
sink (NULL),
state (active),
delay (true),
routing_id(0),
server_socket_routing_id (0),
conflate (conflate_)
{
}
@ -115,24 +115,24 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
sink = sink_;
}
void zmq::pipe_t::set_routing_id (uint32_t routing_id_)
void zmq::pipe_t::set_server_socket_routing_id (uint32_t server_socket_routing_id_)
{
routing_id = routing_id_;
server_socket_routing_id = server_socket_routing_id_;
}
uint32_t zmq::pipe_t::get_routing_id ()
uint32_t zmq::pipe_t::get_server_socket_routing_id ()
{
return routing_id;
return server_socket_routing_id;
}
void zmq::pipe_t::set_identity (const blob_t &identity_)
void zmq::pipe_t::set_router_socket_routing_id (const blob_t &router_socket_routing_id_)
{
identity = identity_;
router_socket_routing_id = router_socket_routing_id_;
}
zmq::blob_t zmq::pipe_t::get_identity ()
zmq::blob_t zmq::pipe_t::get_routing_id ()
{
return identity;
return router_socket_routing_id;
}
zmq::blob_t zmq::pipe_t::get_credential () const
@ -194,7 +194,7 @@ read_message:
return false;
}
if (!(msg_->flags () & msg_t::more) && !msg_->is_identity ())
if (!(msg_->flags () & msg_t::more) && !msg_->is_routing_id ())
msgs_read++;
if (lwm > 0 && msgs_read % lwm == 0)
@ -224,9 +224,9 @@ bool zmq::pipe_t::write (msg_t *msg_)
return false;
bool more = msg_->flags () & msg_t::more ? true : false;
const bool is_identity = msg_->is_identity ();
const bool is_routing_id = msg_->is_routing_id ();
outpipe->write (*msg_, more);
if (!more && !is_identity)
if (!more && !is_routing_id)
msgs_written++;
return true;

View File

@ -85,12 +85,12 @@ namespace zmq
void set_event_sink (i_pipe_events *sink_);
// Pipe endpoint can store an routing ID to be used by its clients.
void set_routing_id (uint32_t routing_id_);
uint32_t get_routing_id ();
void set_server_socket_routing_id (uint32_t routing_id_);
uint32_t get_server_socket_routing_id ();
// Pipe endpoint can store an opaque ID to be used by its clients.
void set_identity (const blob_t &identity_);
blob_t get_identity ();
void set_router_socket_routing_id (const blob_t &identity_);
blob_t get_routing_id ();
blob_t get_credential () const;
@ -226,11 +226,11 @@ namespace zmq
// asks us to.
bool delay;
// Identity of the writer. Used uniquely by the reader side.
blob_t identity;
// Routing id of the writer. Used uniquely by the reader side.
blob_t router_socket_routing_id;
// Identity of the writer. Used uniquely by the reader side.
int routing_id;
// Routing id of the writer. Used uniquely by the reader side.
int server_socket_routing_id;
// Pipe's credential.
blob_t credential;

View File

@ -75,7 +75,7 @@ int zmq::req_t::xsend (msg_t *msg_)
message_begins = true;
}
// First part of the request is the request identity.
// First part of the request is the request routing id.
if (message_begins) {
reply_pipe = NULL;

View File

@ -39,13 +39,13 @@
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
prefetched (false),
identity_sent (false),
routing_id_sent (false),
current_in (NULL),
terminate_current_in (false),
more_in (false),
current_out (NULL),
more_out (false),
next_rid (generate_random ()),
next_integral_routing_id (generate_random ()),
mandatory (false),
// raw_socket functionality in ROUTER is deprecated
raw_socket (false),
@ -53,7 +53,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
handover (false)
{
options.type = ZMQ_ROUTER;
options.recv_identity = true;
options.recv_routing_id = true;
options.raw_socket = false;
prefetched_id.init ();
@ -87,8 +87,8 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
errno_assert (rc == 0);
}
bool identity_ok = identify_peer (pipe_);
if (identity_ok)
bool routing_id_ok = identify_peer (pipe_);
if (routing_id_ok)
fq.attach (pipe_);
else
anonymous_pipes.insert (pipe_);
@ -102,9 +102,9 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
if (is_int) memcpy(&value, optval_, sizeof (int));
switch (option_) {
case ZMQ_CONNECT_RID:
case ZMQ_CONNECT_ROUTING_ID:
if (optval_ && optvallen_) {
connect_rid.assign ((char *) optval_, optvallen_);
connect_routing_id.assign ((char *) optval_, optvallen_);
return 0;
}
break;
@ -113,7 +113,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
if (is_int && value >= 0) {
raw_socket = (value != 0);
if (raw_socket) {
options.recv_identity = false;
options.recv_routing_id = false;
options.raw_socket = true;
}
return 0;
@ -155,7 +155,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
if (it != anonymous_pipes.end ())
anonymous_pipes.erase (it);
else {
outpipes_t::iterator iter = outpipes.find (pipe_->get_identity ());
outpipes_t::iterator iter = outpipes.find (pipe_->get_routing_id ());
zmq_assert (iter != outpipes.end ());
outpipes.erase (iter);
fq.pipe_terminated (pipe_);
@ -171,8 +171,8 @@ void zmq::router_t::xread_activated (pipe_t *pipe_)
if (it == anonymous_pipes.end ())
fq.activated (pipe_);
else {
bool identity_ok = identify_peer (pipe_);
if (identity_ok) {
bool routing_id_ok = identify_peer (pipe_);
if (routing_id_ok) {
anonymous_pipes.erase (it);
fq.attach (pipe_);
}
@ -205,11 +205,11 @@ int zmq::router_t::xsend (msg_t *msg_)
more_out = true;
// Find the pipe associated with the identity stored in the prefix.
// Find the pipe associated with the routing id stored in the prefix.
// If there's no such pipe just silently ignore the message, unless
// router_mandatory is set.
blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
outpipes_t::iterator it = outpipes.find (identity);
blob_t routing_id ((unsigned char*) msg_->data (), msg_->size ());
outpipes_t::iterator it = outpipes.find (routing_id);
if (it != outpipes.end ()) {
current_out = it->second.pipe;
@ -300,10 +300,10 @@ int zmq::router_t::xsend (msg_t *msg_)
int zmq::router_t::xrecv (msg_t *msg_)
{
if (prefetched) {
if (!identity_sent) {
if (!routing_id_sent) {
int rc = msg_->move (prefetched_id);
errno_assert (rc == 0);
identity_sent = true;
routing_id_sent = true;
}
else {
int rc = msg_->move (prefetched_msg);
@ -325,10 +325,10 @@ int zmq::router_t::xrecv (msg_t *msg_)
pipe_t *pipe = NULL;
int rc = fq.recvpipe (msg_, &pipe);
// It's possible that we receive peer's identity. That happens
// It's possible that we receive peer's routing id. That happens
// after reconnection. The current implementation assumes that
// the peer always uses the same identity.
while (rc == 0 && msg_->is_identity ())
// the peer always uses the same routing id.
while (rc == 0 && msg_->is_routing_id ())
rc = fq.recvpipe (msg_, &pipe);
if (rc != 0)
@ -357,14 +357,14 @@ int zmq::router_t::xrecv (msg_t *msg_)
prefetched = true;
current_in = pipe;
blob_t identity = pipe->get_identity ();
rc = msg_->init_size (identity.size ());
blob_t routing_id = pipe->get_routing_id ();
rc = msg_->init_size (routing_id.size ());
errno_assert (rc == 0);
memcpy (msg_->data (), identity.data (), identity.size ());
memcpy (msg_->data (), routing_id.data (), routing_id.size ());
msg_->set_flags (msg_t::more);
if (prefetched_msg.metadata())
msg_->set_metadata(prefetched_msg.metadata());
identity_sent = true;
routing_id_sent = true;
}
return 0;
@ -396,11 +396,11 @@ bool zmq::router_t::xhas_in ()
pipe_t *pipe = NULL;
int rc = fq.recvpipe (&prefetched_msg, &pipe);
// It's possible that we receive peer's identity. That happens
// It's possible that we receive peer's routing id. That happens
// after reconnection. The current implementation assumes that
// the peer always uses the same identity.
// TODO: handle the situation when the peer changes its identity.
while (rc == 0 && prefetched_msg.is_identity ())
// the peer always uses the same routing id.
// TODO: handle the situation when the peer changes its routing id.
while (rc == 0 && prefetched_msg.is_routing_id ())
rc = fq.recvpipe (&prefetched_msg, &pipe);
if (rc != 0)
@ -408,14 +408,14 @@ bool zmq::router_t::xhas_in ()
zmq_assert (pipe != NULL);
blob_t identity = pipe->get_identity ();
rc = prefetched_id.init_size (identity.size ());
blob_t routing_id = pipe->get_routing_id ();
rc = prefetched_id.init_size (routing_id.size ());
errno_assert (rc == 0);
memcpy (prefetched_id.data (), identity.data (), identity.size ());
memcpy (prefetched_id.data (), routing_id.data (), routing_id.size ());
prefetched_id.set_flags (msg_t::more);
prefetched = true;
identity_sent = false;
routing_id_sent = false;
current_in = pipe;
return true;
@ -443,13 +443,13 @@ zmq::blob_t zmq::router_t::get_credential () const
return fq.get_credential ();
}
int zmq::router_t::get_peer_state (const void *identity,
size_t identity_size) const
int zmq::router_t::get_peer_state (const void *routing_id_,
size_t routing_id_size_) const
{
int res = 0;
blob_t identity_blob ((unsigned char *) identity, identity_size);
outpipes_t::const_iterator it = outpipes.find (identity_blob);
blob_t routing_id_blob ((unsigned char *) routing_id_, routing_id_size_);
outpipes_t::const_iterator it = outpipes.find (routing_id_blob);
if (it == outpipes.end ()) {
errno = EHOSTUNREACH;
return -1;
@ -467,27 +467,27 @@ int zmq::router_t::get_peer_state (const void *identity,
bool zmq::router_t::identify_peer (pipe_t *pipe_)
{
msg_t msg;
blob_t identity;
blob_t routing_id;
bool ok;
if (connect_rid.length()) {
identity = blob_t ((unsigned char*) connect_rid.c_str (),
connect_rid.length());
connect_rid.clear ();
outpipes_t::iterator it = outpipes.find (identity);
if (connect_routing_id.length()) {
routing_id = blob_t ((unsigned char*) connect_routing_id.c_str (),
connect_routing_id.length());
connect_routing_id.clear ();
outpipes_t::iterator it = outpipes.find (routing_id);
if (it != outpipes.end ())
zmq_assert(false); // Not allowed to duplicate an existing rid
}
else
if (options.raw_socket) { // Always assign identity for raw-socket
if (options.raw_socket) { // Always assign an integral routing id for raw-socket
unsigned char buf [5];
buf [0] = 0;
put_uint32 (buf + 1, next_rid++);
identity = blob_t (buf, sizeof buf);
put_uint32 (buf + 1, next_integral_routing_id++);
routing_id = blob_t (buf, sizeof buf);
}
else
if (!options.raw_socket) {
// Pick up handshake cases and also case where next identity is set
// Pick up handshake cases and also case where next integral routing id is set
msg.init ();
ok = pipe_->read (&msg);
if (!ok)
@ -497,13 +497,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
// Fall back on the auto-generation
unsigned char buf [5];
buf [0] = 0;
put_uint32 (buf + 1, next_rid++);
identity = blob_t (buf, sizeof buf);
put_uint32 (buf + 1, next_integral_routing_id++);
routing_id = blob_t (buf, sizeof buf);
msg.close ();
}
else {
identity = blob_t ((unsigned char*) msg.data (), msg.size ());
outpipes_t::iterator it = outpipes.find (identity);
routing_id = blob_t ((unsigned char*) msg.data (), msg.size ());
outpipes_t::iterator it = outpipes.find (routing_id);
msg.close ();
if (it != outpipes.end ()) {
@ -512,23 +512,23 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
return false;
else {
// We will allow the new connection to take over this
// identity. Temporarily assign a new identity to the
// routing id. Temporarily assign a new routing id to the
// existing pipe so we can terminate it asynchronously.
unsigned char buf [5];
buf [0] = 0;
put_uint32 (buf + 1, next_rid++);
blob_t new_identity = blob_t (buf, sizeof buf);
put_uint32 (buf + 1, next_integral_routing_id++);
blob_t new_routing_id = blob_t (buf, sizeof buf);
it->second.pipe->set_identity (new_identity);
it->second.pipe->set_router_socket_routing_id (new_routing_id);
outpipe_t existing_outpipe =
{it->second.pipe, it->second.active};
ok = outpipes.insert (outpipes_t::value_type (
new_identity, existing_outpipe)).second;
new_routing_id, existing_outpipe)).second;
zmq_assert (ok);
// Remove the existing identity entry to allow the new
// connection to take the identity.
// Remove the existing routing id entry to allow the new
// connection to take the routing id.
outpipes.erase (it);
if (existing_outpipe.pipe == current_in)
@ -540,10 +540,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
}
}
pipe_->set_identity (identity);
pipe_->set_router_socket_routing_id (routing_id);
// Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true};
ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
zmq_assert (ok);
return true;

View File

@ -85,7 +85,7 @@ namespace zmq
// If true, the receiver got the message part with
// the peer's identity.
bool identity_sent;
bool routing_id_sent;
// Holds the prefetched identity.
msg_t prefetched_id;
@ -123,7 +123,7 @@ namespace zmq
// Routing IDs are generated. It's a simple increment and wrap-over
// algorithm. This value is the next ID to use (if not used already).
uint32_t next_rid;
uint32_t next_integral_routing_id;
// If true, report EAGAIN to the caller instead of silently dropping
// the message targeting an unknown peer.

View File

@ -38,7 +38,7 @@
zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_, true),
next_rid (generate_random ())
next_routing_id (generate_random ())
{
options.type = ZMQ_SERVER;
}
@ -54,11 +54,11 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
zmq_assert (pipe_);
uint32_t routing_id = next_rid++;
uint32_t routing_id = next_routing_id++;
if (!routing_id)
routing_id = next_rid++; // Never use RID zero
routing_id = next_routing_id++; // Never use Routing ID zero
pipe_->set_routing_id (routing_id);
pipe_->set_server_socket_routing_id (routing_id);
// Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
@ -69,7 +69,7 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
{
outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ());
outpipes_t::iterator it = outpipes.find (pipe_->get_server_socket_routing_id ());
zmq_assert (it != outpipes.end ());
outpipes.erase (it);
fq.pipe_terminated (pipe_);
@ -159,7 +159,7 @@ int zmq::server_t::xrecv (msg_t *msg_)
zmq_assert (pipe != NULL);
uint32_t routing_id = pipe->get_routing_id ();
uint32_t routing_id = pipe->get_server_socket_routing_id ();
msg_->set_routing_id (routing_id);
return 0;

View File

@ -85,7 +85,7 @@ namespace zmq
// Routing IDs are generated. It's a simple increment and wrap-over
// algorithm. This value is the next ID to use (if not used already).
uint32_t next_rid;
uint32_t next_routing_id;
server_t (const server_t&);
const server_t &operator = (const server_t&);

View File

@ -358,12 +358,12 @@ int zmq::session_base_t::zap_connect ()
send_bind (peer.socket, new_pipes [1], false);
// Send empty identity if required by the peer.
if (peer.options.recv_identity) {
// Send empty routing id if required by the peer.
if (peer.options.recv_routing_id) {
msg_t id;
rc = id.init ();
errno_assert (rc == 0);
id.set_flags (msg_t::identity);
id.set_flags (msg_t::routing_id);
bool ok = zap_pipe->write (&id);
zmq_assert (ok);
zap_pipe->flush ();

View File

@ -221,11 +221,11 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool
}
}
int zmq::socket_base_t::get_peer_state (const void *identity,
size_t identity_size) const
int zmq::socket_base_t::get_peer_state (const void *routing_id_,
size_t routing_id_size_) const
{
LIBZMQ_UNUSED (identity);
LIBZMQ_UNUSED (identity_size);
LIBZMQ_UNUSED (routing_id_);
LIBZMQ_UNUSED (routing_id_size_);
// Only ROUTER sockets support this
errno = ENOTSUP;
@ -764,14 +764,14 @@ int zmq::socket_base_t::connect (const char *addr_)
if (!peer.socket) {
// The peer doesn't exist yet so we don't know whether
// to send the identity message or not. To resolve this,
// we always send our identity and drop it later if
// to send the routing id message or not. To resolve this,
// we always send our routing id and drop it later if
// the peer doesn't expect it.
msg_t id;
rc = id.init_size (options.identity_size);
rc = id.init_size (options.routing_id_size);
errno_assert (rc == 0);
memcpy (id.data (), options.identity, options.identity_size);
id.set_flags (msg_t::identity);
memcpy (id.data (), options.routing_id, options.routing_id_size);
id.set_flags (msg_t::routing_id);
bool written = new_pipes [0]->write (&id);
zmq_assert (written);
new_pipes [0]->flush ();
@ -780,25 +780,25 @@ int zmq::socket_base_t::connect (const char *addr_)
pend_connection (std::string (addr_), endpoint, new_pipes);
}
else {
// If required, send the identity of the local socket to the peer.
if (peer.options.recv_identity) {
// If required, send the routing id of the local socket to the peer.
if (peer.options.recv_routing_id) {
msg_t id;
rc = id.init_size (options.identity_size);
rc = id.init_size (options.routing_id_size);
errno_assert (rc == 0);
memcpy (id.data (), options.identity, options.identity_size);
id.set_flags (msg_t::identity);
memcpy (id.data (), options.routing_id, options.routing_id_size);
id.set_flags (msg_t::routing_id);
bool written = new_pipes [0]->write (&id);
zmq_assert (written);
new_pipes [0]->flush ();
}
// If required, send the identity of the peer to the local socket.
if (options.recv_identity) {
// If required, send the routing id of the peer to the local socket.
if (options.recv_routing_id) {
msg_t id;
rc = id.init_size (peer.options.identity_size);
rc = id.init_size (peer.options.routing_id_size);
errno_assert (rc == 0);
memcpy (id.data (), peer.options.identity, peer.options.identity_size);
id.set_flags (msg_t::identity);
memcpy (id.data (), peer.options.routing_id, peer.options.routing_id_size);
id.set_flags (msg_t::routing_id);
bool written = new_pipes [1]->write (&id);
zmq_assert (written);
new_pipes [1]->flush ();
@ -1597,9 +1597,9 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
// Test whether IDENTITY flag is valid for this socket type.
if (unlikely (msg_->flags () & msg_t::identity))
zmq_assert (options.recv_identity);
// Test whether routing_id flag is valid for this socket type.
if (unlikely (msg_->flags () & msg_t::routing_id))
zmq_assert (options.recv_routing_id);
// Remove MORE flag.
rcvmore = msg_->flags () & msg_t::more ? true : false;

View File

@ -186,7 +186,7 @@ namespace zmq
void process_destroy ();
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
std::string connect_rid;
std::string connect_routing_id;
private:
// test if event should be sent and then dispatch it

View File

@ -39,22 +39,22 @@
zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
prefetched (false),
identity_sent (false),
routing_id_sent (false),
current_out (NULL),
more_out (false),
next_rid (generate_random ())
next_integral_routing_id (generate_random ())
{
options.type = ZMQ_STREAM;
options.raw_socket = true;
prefetched_id.init ();
prefetched_routing_id.init ();
prefetched_msg.init ();
}
zmq::stream_t::~stream_t ()
{
zmq_assert (outpipes.empty ());
prefetched_id.close ();
prefetched_routing_id.close ();
prefetched_msg.close ();
}
@ -70,7 +70,7 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
{
outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ());
zmq_assert (it != outpipes.end ());
outpipes.erase (it);
fq.pipe_terminated (pipe_);
@ -107,10 +107,10 @@ int zmq::stream_t::xsend (msg_t *msg_)
// TODO: The connections should be killed instead.
if (msg_->flags () & msg_t::more) {
// Find the pipe associated with the identity stored in the prefix.
// Find the pipe associated with the routing id stored in the prefix.
// If there's no such pipe return an error
blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
outpipes_t::iterator it = outpipes.find (identity);
blob_t routing_id ((unsigned char*) msg_->data (), msg_->size ());
outpipes_t::iterator it = outpipes.find (routing_id);
if (it != outpipes.end ()) {
current_out = it->second.pipe;
@ -183,9 +183,9 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
if (is_int) memcpy(&value, optval_, sizeof (int));
switch (option_) {
case ZMQ_CONNECT_RID:
case ZMQ_CONNECT_ROUTING_ID:
if (optval_ && optvallen_) {
connect_rid.assign ((char*) optval_, optvallen_);
connect_routing_id.assign ((char*) optval_, optvallen_);
return 0;
}
break;
@ -207,10 +207,10 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
int zmq::stream_t::xrecv (msg_t *msg_)
{
if (prefetched) {
if (!identity_sent) {
int rc = msg_->move (prefetched_id);
if (!routing_id_sent) {
int rc = msg_->move (prefetched_routing_id);
errno_assert (rc == 0);
identity_sent = true;
routing_id_sent = true;
}
else {
int rc = msg_->move (prefetched_msg);
@ -231,10 +231,10 @@ int zmq::stream_t::xrecv (msg_t *msg_)
// We have received a frame with TCP data.
// Rather than sending this frame, we keep it in prefetched
// buffer and send a frame with peer's ID.
blob_t identity = pipe->get_identity ();
blob_t routing_id = pipe->get_routing_id ();
rc = msg_->close();
errno_assert (rc == 0);
rc = msg_->init_size (identity.size ());
rc = msg_->init_size (routing_id.size ());
errno_assert (rc == 0);
// forward metadata (if any)
@ -242,11 +242,11 @@ int zmq::stream_t::xrecv (msg_t *msg_)
if (metadata)
msg_->set_metadata(metadata);
memcpy (msg_->data (), identity.data (), identity.size ());
memcpy (msg_->data (), routing_id.data (), routing_id.size ());
msg_->set_flags (msg_t::more);
prefetched = true;
identity_sent = true;
routing_id_sent = true;
return 0;
}
@ -267,20 +267,20 @@ bool zmq::stream_t::xhas_in ()
zmq_assert (pipe != NULL);
zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
blob_t identity = pipe->get_identity ();
rc = prefetched_id.init_size (identity.size ());
blob_t routing_id = pipe->get_routing_id ();
rc = prefetched_routing_id.init_size (routing_id.size ());
errno_assert (rc == 0);
// forward metadata (if any)
metadata_t *metadata = prefetched_msg.metadata();
if (metadata)
prefetched_id.set_metadata(metadata);
prefetched_routing_id.set_metadata(metadata);
memcpy (prefetched_id.data (), identity.data (), identity.size ());
prefetched_id.set_flags (msg_t::more);
memcpy (prefetched_routing_id.data (), routing_id.data (), routing_id.size ());
prefetched_routing_id.set_flags (msg_t::more);
prefetched = true;
identity_sent = false;
routing_id_sent = false;
return true;
}
@ -295,27 +295,27 @@ bool zmq::stream_t::xhas_out ()
void zmq::stream_t::identify_peer (pipe_t *pipe_)
{
// Always assign identity for raw-socket
// Always assign routing id for raw-socket
unsigned char buffer [5];
buffer [0] = 0;
blob_t identity;
if (connect_rid.length ()) {
identity = blob_t ((unsigned char*) connect_rid.c_str(),
connect_rid.length ());
connect_rid.clear ();
outpipes_t::iterator it = outpipes.find (identity);
blob_t routing_id;
if (connect_routing_id.length ()) {
routing_id = blob_t ((unsigned char*) connect_routing_id.c_str(),
connect_routing_id.length ());
connect_routing_id.clear ();
outpipes_t::iterator it = outpipes.find (routing_id);
zmq_assert (it == outpipes.end ());
}
else {
put_uint32 (buffer + 1, next_rid++);
identity = blob_t (buffer, sizeof buffer);
memcpy (options.identity, identity.data (), identity.size ());
options.identity_size = (unsigned char) identity.size ();
put_uint32 (buffer + 1, next_integral_routing_id++);
routing_id = blob_t (buffer, sizeof buffer);
memcpy (options.routing_id, routing_id.data (), routing_id.size ());
options.routing_id_size = (unsigned char) routing_id.size ();
}
pipe_->set_identity (identity);
pipe_->set_router_socket_routing_id (routing_id);
// Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true};
const bool ok = outpipes.insert (
outpipes_t::value_type (identity, outpipe)).second;
outpipes_t::value_type (routing_id, outpipe)).second;
zmq_assert (ok);
}

View File

@ -70,10 +70,10 @@ namespace zmq
// If true, the receiver got the message part with
// the peer's identity.
bool identity_sent;
bool routing_id_sent;
// Holds the prefetched identity.
msg_t prefetched_id;
msg_t prefetched_routing_id;
// Holds the prefetched message.
msg_t prefetched_msg;
@ -96,7 +96,7 @@ namespace zmq
// Routing IDs are generated. It's a simple increment and wrap-over
// algorithm. This value is the next ID to use (if not used already).
uint32_t next_rid;
uint32_t next_integral_routing_id;
stream_t (const stream_t&);
const stream_t &operator = (const stream_t&);

View File

@ -81,8 +81,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
options (options_),
endpoint (endpoint_),
plugged (false),
next_msg (&stream_engine_t::identity_msg),
process_msg (&stream_engine_t::process_identity_msg),
next_msg (&stream_engine_t::routing_id_msg),
process_msg (&stream_engine_t::process_routing_id_msg),
io_error (false),
subscription_required (false),
mechanism (NULL),
@ -229,11 +229,11 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
// start optional timer, to prevent handshake hanging on no input
set_handshake_timer ();
// Send the 'length' and 'flags' fields of the identity message.
// Send the 'length' and 'flags' fields of the routing id message.
// The 'length' field is encoded in the long format.
outpos = greeting_send;
outpos [outsize++] = 0xff;
put_uint64 (&outpos [outsize], options.identity_size + 1);
put_uint64 (&outpos [outsize], options.routing_id_size + 1);
outsize += 8;
outpos [outsize++] = 0x7f;
}
@ -520,7 +520,7 @@ bool zmq::stream_engine_t::handshake ()
// Inspect the right-most bit of the 10th byte (which coincides
// with the 'flags' field if a regular message was sent).
// Zero indicates this is a header of identity message
// Zero indicates this is a header of a routing id message
// (i.e. the peer is using the unversioned protocol).
if (!(greeting_recv [9] & 0x01))
break;
@ -575,7 +575,7 @@ bool zmq::stream_engine_t::handshake ()
const size_t revision_pos = 10;
// Is the peer using ZMTP/1.0 with no revision number?
// If so, we send and receive rest of identity message
// If so, we send and receive rest of routing id message
if (greeting_recv [0] != 0xff || !(greeting_recv [9] & 0x01)) {
if (session->zap_enabled ()) {
// reject ZMTP 1.0 connections if ZAP is enabled
@ -593,14 +593,14 @@ bool zmq::stream_engine_t::handshake ()
// Since there is no way to tell the encoder to
// skip the message header, we simply throw that
// header data away.
const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2;
const size_t header_size = options.routing_id_size + 1 >= 255 ? 10 : 2;
unsigned char tmp [10], *bufferp = tmp;
// Prepare the identity message and load it into encoder.
// Prepare the routing id message and load it into encoder.
// Then consume bytes we have already sent to the peer.
const int rc = tx_msg.init_size (options.identity_size);
const int rc = tx_msg.init_size (options.routing_id_size);
zmq_assert (rc == 0);
memcpy (tx_msg.data (), options.identity, options.identity_size);
memcpy (tx_msg.data (), options.routing_id, options.routing_id_size);
encoder->load_msg (&tx_msg);
size_t buffer_size = encoder->encode (&bufferp, header_size);
zmq_assert (buffer_size == header_size);
@ -615,12 +615,12 @@ bool zmq::stream_engine_t::handshake ()
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
subscription_required = true;
// We are sending our identity now and the next message
// We are sending our routing id now and the next message
// will come from the socket.
next_msg = &stream_engine_t::pull_msg_from_session;
// We are expecting identity message.
process_msg = &stream_engine_t::process_identity_msg;
// We are expecting routing id message.
process_msg = &stream_engine_t::process_routing_id_msg;
}
else
if (greeting_recv [revision_pos] == ZMTP_1_0) {
@ -729,20 +729,20 @@ bool zmq::stream_engine_t::handshake ()
return true;
}
int zmq::stream_engine_t::identity_msg (msg_t *msg_)
int zmq::stream_engine_t::routing_id_msg (msg_t *msg_)
{
int rc = msg_->init_size (options.identity_size);
int rc = msg_->init_size (options.routing_id_size);
errno_assert (rc == 0);
if (options.identity_size > 0)
memcpy (msg_->data (), options.identity, options.identity_size);
if (options.routing_id_size > 0)
memcpy (msg_->data (), options.routing_id, options.routing_id_size);
next_msg = &stream_engine_t::pull_msg_from_session;
return 0;
}
int zmq::stream_engine_t::process_identity_msg (msg_t *msg_)
int zmq::stream_engine_t::process_routing_id_msg (msg_t *msg_)
{
if (options.recv_identity) {
msg_->set_flags (msg_t::identity);
if (options.recv_routing_id) {
msg_->set_flags (msg_t::routing_id);
int rc = session->push_msg (msg_);
errno_assert (rc == 0);
}
@ -839,14 +839,14 @@ void zmq::stream_engine_t::mechanism_ready ()
has_heartbeat_timer = true;
}
if (options.recv_identity) {
msg_t identity;
mechanism->peer_identity (&identity);
const int rc = session->push_msg (&identity);
if (options.recv_routing_id) {
msg_t routing_id;
mechanism->peer_routing_id (&routing_id);
const int rc = session->push_msg (&routing_id);
if (rc == -1 && errno == EAGAIN) {
// If the write is failing at this stage with
// an EAGAIN the pipe must be being shut down,
// so we can just bail out of the identity set.
// so we can just bail out of the routing id set.
return;
}
errno_assert (rc == 0);

View File

@ -99,8 +99,8 @@ namespace zmq
// Detects the protocol used by the peer.
bool handshake ();
int identity_msg (msg_t *msg_);
int process_identity_msg (msg_t *msg_);
int routing_id_msg (msg_t *msg_);
int process_routing_id_msg (msg_t *msg_);
int next_handshake_command (msg_t *msg);
int process_handshake_command (msg_t *msg);

View File

@ -104,10 +104,10 @@ void zap_client_t::send_zap_request (const char *mechanism,
rc = session->write_zap_msg (&msg);
errno_assert (rc == 0);
// Identity frame
rc = msg.init_size (options.identity_size);
// Routing id frame
rc = msg.init_size (options.routing_id_size);
errno_assert (rc == 0);
memcpy (msg.data (), options.identity, options.identity_size);
memcpy (msg.data (), options.routing_id, options.routing_id_size);
msg.set_flags (msg_t::more);
rc = session->write_zap_msg (&msg);
errno_assert (rc == 0);

View File

@ -1359,14 +1359,14 @@ int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events_, int n_event
// Peer-specific state
int zmq_socket_get_peer_state (void *s_,
const void *identity,
size_t identity_size)
const void *routing_id_,
size_t routing_id_size_)
{
zmq::socket_base_t *s = as_socket_base_t (s_);
if (!s)
return -1;
return s->get_peer_state (identity, identity_size);
return s->get_peer_state (routing_id_, routing_id_size_);
}
// Timers

View File

@ -102,7 +102,7 @@ int zmq_msg_set_group(zmq_msg_t *msg, const char *group);
const char *zmq_msg_group(zmq_msg_t *msg);
/* DRAFT Msg property names. */
#define ZMQ_MSG_PROPERTY_IDENTITY "Identity"
#define ZMQ_MSG_PROPERTY_ROUTING_ID "Routing-Id"
#define ZMQ_MSG_PROPERTY_SOCKET_TYPE "Socket-Type"
#define ZMQ_MSG_PROPERTY_USER_ID "User-Id"
#define ZMQ_MSG_PROPERTY_PEER_ADDRESS "Peer-Address"
@ -142,8 +142,8 @@ int zmq_poller_remove_fd (void *poller, int fd);
#endif
int zmq_socket_get_peer_state (void *socket,
const void *identity,
size_t identity_size);
const void *routing_id,
size_t routing_id_size);
/******************************************************************************/
/* Scheduling timers */

View File

@ -61,13 +61,13 @@ void test_stream_2_stream(){
assert (0 == ret);
// Do the connection.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
assert (0 == ret);
ret = zmq_connect (rconn1, my_endpoint);
/* Uncomment to test assert on duplicate rid.
/* Uncomment to test assert on duplicate routing id.
// Test duplicate connect attempt.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
assert (0 == ret);
ret = zmq_connect (rconn1, bindip);
assert (0 == ret);
@ -126,18 +126,18 @@ void test_router_2_router(bool named){
// If we're in named mode, set some identities.
if (named) {
ret = zmq_setsockopt (rbind, ZMQ_IDENTITY, "X", 1);
ret = zmq_setsockopt (rconn1, ZMQ_IDENTITY, "Y", 1);
ret = zmq_setsockopt (rbind, ZMQ_ROUTING_ID, "X", 1);
ret = zmq_setsockopt (rconn1, ZMQ_ROUTING_ID, "Y", 1);
}
// Make call to connect using a connect_rid.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
// Make call to connect using a connect_routing_id.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
assert (0 == ret);
ret = zmq_connect (rconn1, my_endpoint);
assert (0 == ret);
/* Uncomment to test assert on duplicate rid
/* Uncomment to test assert on duplicate routing id
// Test duplicate connect attempt.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
assert (0 == ret);
ret = zmq_connect (rconn1, bindip);
assert (0 == ret);

View File

@ -372,7 +372,7 @@ void test_simultaneous_connect_bind_threads ()
assert (rc == 0);
}
void test_identity ()
void test_routing_id ()
{
// Create the infrastructure
void *ctx = zmq_ctx_new ();
@ -381,13 +381,13 @@ void test_identity ()
void *sc = zmq_socket (ctx, ZMQ_DEALER);
assert (sc);
int rc = zmq_connect (sc, "inproc://identity");
int rc = zmq_connect (sc, "inproc://routing_id");
assert (rc == 0);
void *sb = zmq_socket (ctx, ZMQ_ROUTER);
assert (sb);
rc = zmq_bind (sb, "inproc://identity");
rc = zmq_bind (sb, "inproc://routing_id");
assert (rc == 0);
// Send 2-part message.
@ -396,7 +396,7 @@ void test_identity ()
rc = zmq_send (sc, "B", 1, 0);
assert (rc == 1);
// Identity comes first.
// Routing id comes first.
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
@ -527,7 +527,7 @@ int main (void)
test_multiple_connects ();
test_multiple_threads ();
test_simultaneous_connect_bind_threads ();
test_identity ();
test_routing_id ();
test_connect_only ();
test_unbind ();
test_shutdown_during_pend ();

View File

@ -57,12 +57,12 @@ int main (void)
// Repeat often enough to be sure this works as it should
for (int cycle = 0; cycle < 100; cycle++) {
// Create dealer with unique explicit identity
// Create dealer with unique explicit routing id
// We assume the router learns this out-of-band
void *dealer = zmq_socket (ctx2, ZMQ_DEALER);
char identity [10];
sprintf (identity, "%09d", cycle);
rc = zmq_setsockopt (dealer, ZMQ_IDENTITY, identity, 10);
char routing_id [10];
sprintf (routing_id, "%09d", cycle);
rc = zmq_setsockopt (dealer, ZMQ_ROUTING_ID, routing_id, 10);
assert (rc == 0);
int rcvtimeo = 1000;
rc = zmq_setsockopt (dealer, ZMQ_RCVTIMEO, &rcvtimeo, sizeof (int));
@ -77,7 +77,7 @@ int main (void)
// a very slow system).
for (int attempt = 0; attempt < 500; attempt++) {
zmq_poll (0, 0, 2);
rc = zmq_send (router, identity, 10, ZMQ_SNDMORE);
rc = zmq_send (router, routing_id, 10, ZMQ_SNDMORE);
if (rc == -1 && errno == EHOSTUNREACH)
continue;
assert (rc == 10);

View File

@ -46,7 +46,7 @@ zap_handler (void *handler)
char *sequence = s_recv (handler);
char *domain = s_recv (handler);
char *address = s_recv (handler);
char *identity = s_recv (handler);
char *routing_id = s_recv (handler);
char *mechanism = s_recv (handler);
assert (streq (version, "1.0"));
@ -70,7 +70,7 @@ zap_handler (void *handler)
free (sequence);
free (domain);
free (address);
free (identity);
free (routing_id);
free (mechanism);
}
close_zero_linger (handler);

View File

@ -54,7 +54,7 @@ int main (void)
rc = zmq_send (sc, "B", 1, 0);
assert (rc == 1);
// Identity comes first.
// Routing id comes first.
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);

View File

@ -48,7 +48,7 @@ int main (void)
// Create client and connect to server, doing a probe
void *client = zmq_socket (ctx, ZMQ_ROUTER);
assert (client);
rc = zmq_setsockopt (client, ZMQ_IDENTITY, "X", 1);
rc = zmq_setsockopt (client, ZMQ_ROUTING_ID, "X", 1);
assert (rc == 0);
int probe = 1;
rc = zmq_setsockopt (client, ZMQ_PROBE_ROUTER, &probe, sizeof (probe));
@ -56,7 +56,7 @@ int main (void)
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
// We expect an identity=X + empty message from client
// We expect a routing id=X + empty message from client
unsigned char buffer [255];
rc = zmq_recv (server, buffer, 255, 0);
assert (rc == 1);

View File

@ -44,8 +44,8 @@
#define CONTENT_SIZE 13
#define CONTENT_SIZE_MAX 32
#define ID_SIZE 10
#define ID_SIZE_MAX 32
#define ROUTING_ID_SIZE 10
#define ROUTING_ID_SIZE_MAX 32
#define QT_WORKERS 5
#define QT_CLIENTS 3
#define is_verbose 0
@ -104,10 +104,10 @@ client_task (void *db)
assert (rc == 0);
char content [CONTENT_SIZE_MAX];
// Set random identity to make tracing easier
char identity [ID_SIZE];
sprintf (identity, "%04X-%04X", rand() % 0xFFFF, rand() % 0xFFFF);
rc = zmq_setsockopt (client, ZMQ_IDENTITY, identity, ID_SIZE); // includes '\0' as an helper for printf
// Set random routing id to make tracing easier
char routing_id [ROUTING_ID_SIZE];
sprintf (routing_id, "%04X-%04X", rand() % 0xFFFF, rand() % 0xFFFF);
rc = zmq_setsockopt (client, ZMQ_ROUTING_ID, routing_id, ROUTING_ID_SIZE); // includes '\0' as an helper for printf
assert (rc == 0);
linger = 0;
rc = zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger));
@ -129,7 +129,7 @@ client_task (void *db)
size_t sz = sizeof (rcvmore);
rc = zmq_recv (client, content, CONTENT_SIZE_MAX, 0);
assert (rc == CONTENT_SIZE);
if (is_verbose) printf("client receive - identity = %s content = %s\n", identity, content);
if (is_verbose) printf("client receive - routing_id = %s content = %s\n", routing_id, content);
// Check that message is still the same
assert (memcmp (content, "request #", 9) == 0);
rc = zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz);
@ -142,7 +142,7 @@ client_task (void *db)
if (rc > 0)
{
content[rc] = 0; // NULL-terminate the command string
if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content);
if (is_verbose) printf("client receive - routing_id = %s command = %s\n", routing_id, content);
if (memcmp (content, "TERMINATE", 9) == 0) {
run = false;
break;
@ -158,7 +158,7 @@ client_task (void *db)
if (keep_sending)
{
sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE
if (is_verbose) printf("client send - identity = %s request #%03d\n", identity, request_nbr);
if (is_verbose) printf("client send - routing_id = %s request #%03d\n", routing_id, request_nbr);
zmq_atomic_counter_inc(g_clients_pkts_out);
rc = zmq_send (client, content, CONTENT_SIZE, 0);
@ -285,7 +285,7 @@ server_worker (void *ctx)
assert (rc == 0);
char content [CONTENT_SIZE_MAX]; // bigger than what we need to check that
char identity [ID_SIZE_MAX]; // the size received is the size sent
char routing_id [ROUTING_ID_SIZE_MAX]; // the size received is the size sent
bool run = true;
bool keep_sending = true;
@ -302,12 +302,12 @@ server_worker (void *ctx)
}
// The DEALER socket gives us the reply envelope and message
// if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
rc = zmq_recv (worker, identity, ID_SIZE_MAX, ZMQ_DONTWAIT);
if (rc == ID_SIZE) {
rc = zmq_recv (worker, routing_id, ROUTING_ID_SIZE_MAX, ZMQ_DONTWAIT);
if (rc == ROUTING_ID_SIZE) {
rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0);
assert (rc == CONTENT_SIZE);
if (is_verbose)
printf ("server receive - identity = %s content = %s\n", identity, content);
printf ("server receive - routing_id = %s content = %s\n", routing_id, content);
// Send 0..4 replies back
if (keep_sending)
@ -318,11 +318,11 @@ server_worker (void *ctx)
msleep (rand () % 10 + 1);
// Send message from server to client
if (is_verbose) printf("server send - identity = %s reply\n", identity);
if (is_verbose) printf("server send - routing_id = %s reply\n", routing_id);
zmq_atomic_counter_inc(g_workers_pkts_out);
rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE);
assert (rc == ID_SIZE);
rc = zmq_send (worker, routing_id, ROUTING_ID_SIZE, ZMQ_SNDMORE);
assert (rc == ROUTING_ID_SIZE);
rc = zmq_send (worker, content, CONTENT_SIZE, 0);
assert (rc == CONTENT_SIZE);
}

View File

@ -65,7 +65,7 @@ int main (void)
zmq_msg_t msg;
zmq_msg_init (&msg);
// Receive peer identity
// Receive peer routing id
rc = zmq_msg_recv (&msg, router, 0);
assert (rc != -1);
assert (zmq_msg_size (&msg) > 0);

View File

@ -53,7 +53,7 @@ int main (void)
// Create dealer called "X" and connect it to our router
void *dealer_one = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer_one);
rc = zmq_setsockopt (dealer_one, ZMQ_IDENTITY, "X", 1);
rc = zmq_setsockopt (dealer_one, ZMQ_ROUTING_ID, "X", 1);
assert (rc == 0);
rc = zmq_connect (dealer_one, my_endpoint);
assert (rc == 0);
@ -68,10 +68,10 @@ int main (void)
rc = zmq_recv (router, buffer, 255, 0);
assert (rc == 5);
// Now create a second dealer that uses the same identity
// Now create a second dealer that uses the same routing id
void *dealer_two = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer_two);
rc = zmq_setsockopt (dealer_two, ZMQ_IDENTITY, "X", 1);
rc = zmq_setsockopt (dealer_two, ZMQ_ROUTING_ID, "X", 1);
assert (rc == 0);
rc = zmq_connect (dealer_two, my_endpoint);
assert (rc == 0);
@ -85,7 +85,7 @@ int main (void)
rc = zmq_recv (router, buffer, 255, 0);
assert (rc == 5);
// Send a message to 'X' identity. This should be delivered
// Send a message to 'X' routing id. This should be delivered
// to the second dealer, instead of the first beccause of the handover.
rc = zmq_send (router, "X", 1, ZMQ_SNDMORE);
assert (rc == 1);

View File

@ -30,15 +30,15 @@
#include "testutil.hpp"
#ifdef ZMQ_BUILD_DRAFT_API
bool send_msg_to_peer_if_ready (void *router, const char *peer_identity)
bool send_msg_to_peer_if_ready (void *router, const char *peer_routing_id)
{
int rc = zmq_socket_get_peer_state (router, peer_identity, 1);
int rc = zmq_socket_get_peer_state (router, peer_routing_id, 1);
if (rc == -1)
printf ("zmq_socket_get_peer_state failed for %s: %i\n", peer_identity,
printf ("zmq_socket_get_peer_state failed for %s: %i\n", peer_routing_id,
errno);
assert (rc != -1);
if (rc & ZMQ_POLLOUT) {
rc = zmq_send (router, peer_identity, 1, ZMQ_SNDMORE | ZMQ_DONTWAIT);
rc = zmq_send (router, peer_routing_id, 1, ZMQ_SNDMORE | ZMQ_DONTWAIT);
assert (rc == 1);
rc = zmq_send (router, "Hello", 5, ZMQ_DONTWAIT);
assert (rc == 5);
@ -81,17 +81,17 @@ void test_get_peer_state ()
rc = zmq_setsockopt (dealer2, ZMQ_RCVHWM, &hwm, sizeof (int));
assert (rc == 0);
const char *dealer1_identity = "X";
const char *dealer2_identity = "Y";
const char *dealer1_routing_id = "X";
const char *dealer2_routing_id = "Y";
// Name dealer1 "X" and connect it to our router
rc = zmq_setsockopt (dealer1, ZMQ_IDENTITY, dealer1_identity, 1);
rc = zmq_setsockopt (dealer1, ZMQ_ROUTING_ID, dealer1_routing_id, 1);
assert (rc == 0);
rc = zmq_connect (dealer1, my_endpoint);
assert (rc == 0);
// Name dealer2 "Y" and connect it to our router
rc = zmq_setsockopt (dealer2, ZMQ_IDENTITY, dealer2_identity, 1);
rc = zmq_setsockopt (dealer2, ZMQ_ROUTING_ID, dealer2_routing_id, 1);
assert (rc == 0);
rc = zmq_connect (dealer2, my_endpoint);
assert (rc == 0);
@ -102,7 +102,7 @@ void test_get_peer_state ()
assert (rc == 5);
rc = zmq_recv (router, buffer, 255, 0);
assert (rc == 1);
assert (0 == memcmp (buffer, dealer1_identity, rc));
assert (0 == memcmp (buffer, dealer1_routing_id, rc));
rc = zmq_recv (router, buffer, 255, 0);
assert (rc == 5);
@ -110,7 +110,7 @@ void test_get_peer_state ()
assert (rc == 5);
rc = zmq_recv (router, buffer, 255, 0);
assert (rc == 1);
assert (0 == memcmp (buffer, dealer2_identity, rc));
assert (0 == memcmp (buffer, dealer2_routing_id, rc));
rc = zmq_recv (router, buffer, 255, 0);
assert (rc == 5);
@ -135,10 +135,10 @@ void test_get_peer_state ()
const zmq_poller_event_t &current_event = events[event_no];
if (current_event.socket == router
&& current_event.events & ZMQ_POLLOUT) {
if (send_msg_to_peer_if_ready (router, dealer1_identity))
if (send_msg_to_peer_if_ready (router, dealer1_routing_id))
++dealer1_sent;
if (send_msg_to_peer_if_ready (router, dealer2_identity))
if (send_msg_to_peer_if_ready (router, dealer2_routing_id))
++dealer2_sent;
else
dealer2_blocked = true;
@ -180,11 +180,11 @@ void test_get_peer_state ()
void test_get_peer_state_corner_cases ()
{
#ifdef ZMQ_BUILD_DRAFT_API
const char peer_identity[] = "foo";
const char peer_routing_id[] = "foo";
// call get_peer_state with NULL socket
int rc =
zmq_socket_get_peer_state (NULL, peer_identity, strlen (peer_identity));
zmq_socket_get_peer_state (NULL, peer_routing_id, strlen (peer_routing_id));
assert (rc == -1 && errno == ENOTSOCK);
void *ctx = zmq_ctx_new ();
@ -196,12 +196,12 @@ void test_get_peer_state_corner_cases ()
// call get_peer_state with a non-ROUTER socket
rc =
zmq_socket_get_peer_state (dealer, peer_identity, strlen (peer_identity));
zmq_socket_get_peer_state (dealer, peer_routing_id, strlen (peer_routing_id));
assert (rc == -1 && errno == ENOTSUP);
// call get_peer_state for an unknown identity
// call get_peer_state for an unknown routing id
rc =
zmq_socket_get_peer_state (router, peer_identity, strlen (peer_identity));
zmq_socket_get_peer_state (router, peer_routing_id, strlen (peer_routing_id));
assert (rc == -1 && errno == EHOSTUNREACH);
rc = zmq_close (router);
@ -250,7 +250,7 @@ void test_basic ()
// Create dealer called "X" and connect it to our router
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
rc = zmq_setsockopt (dealer, ZMQ_IDENTITY, "X", 1);
rc = zmq_setsockopt (dealer, ZMQ_ROUTING_ID, "X", 1);
assert (rc == 0);
rc = zmq_connect (dealer, my_endpoint);
assert (rc == 0);

View File

@ -67,7 +67,7 @@ int main (void)
// Create dealer called "X" and connect it to our router, configure HWM
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
rc = zmq_setsockopt (dealer, ZMQ_IDENTITY, "X", 1);
rc = zmq_setsockopt (dealer, ZMQ_ROUTING_ID, "X", 1);
assert (rc == 0);
int rcvhwm = 1;
rc = zmq_setsockopt (dealer, ZMQ_RCVHWM, &rcvhwm, sizeof (rcvhwm));

View File

@ -45,7 +45,7 @@
#include "../src/curve_client_tools.hpp"
#include "../src/random.hpp"
const char large_identity[] = "0123456789012345678901234567890123456789"
const char large_routing_id[] = "0123456789012345678901234567890123456789"
"0123456789012345678901234567890123456789"
"0123456789012345678901234567890123456789"
"0123456789012345678901234567890123456789"
@ -53,9 +53,9 @@ const char large_identity[] = "0123456789012345678901234567890123456789"
"0123456789012345678901234567890123456789"
"012345678901234";
static void zap_handler_large_identity (void *ctx)
static void zap_handler_large_routing_id (void *ctx)
{
zap_handler_generic (ctx, zap_ok, large_identity);
zap_handler_generic (ctx, zap_ok, large_routing_id);
}
void expect_new_client_curve_bounce_fail (void *ctx,
@ -656,7 +656,7 @@ int main (void)
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
&server_mon, my_endpoint);
test_null_key (ctx, server, server_mon, my_endpoint, null_key,
valid_client_public, valid_client_secret);
valid_client_public, valid_client_secret);
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
handler);
@ -666,7 +666,7 @@ int main (void)
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
&server_mon, my_endpoint);
test_null_key (ctx, server, server_mon, my_endpoint, valid_server_public,
null_key, valid_client_secret);
null_key, valid_client_secret);
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
handler);
@ -676,7 +676,7 @@ int main (void)
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
&server_mon, my_endpoint);
test_null_key (ctx, server, server_mon, my_endpoint, valid_server_public,
valid_client_public, null_key);
valid_client_public, null_key);
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
handler);
@ -750,7 +750,8 @@ int main (void)
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
handler);
fprintf (stderr, "test_curve_security_invalid_initiate_command_encrypted_cookie\n");
fprintf (stderr,
"test_curve_security_invalid_initiate_command_encrypted_cookie\n");
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
&server_mon, my_endpoint);
test_curve_security_invalid_initiate_command_encrypted_cookie (
@ -758,7 +759,9 @@ int main (void)
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
handler);
fprintf (stderr, "test_curve_security_invalid_initiate_command_encrypted_content\n");
fprintf (
stderr,
"test_curve_security_invalid_initiate_command_encrypted_content\n");
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
&server_mon, my_endpoint);
test_curve_security_invalid_initiate_command_encrypted_content (
@ -766,16 +769,17 @@ int main (void)
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
handler);
// test with a large identity (resulting in large metadata)
fprintf (stderr, "test_curve_security_with_valid_credentials (large identity)\n");
// test with a large routing id (resulting in large metadata)
fprintf (stderr,
"test_curve_security_with_valid_credentials (large routing id)\n");
setup_context_and_server_side (
&ctx, &handler, &zap_thread, &server, &server_mon, my_endpoint,
&zap_handler_large_identity, &socket_config_curve_server, &valid_server_secret,
large_identity);
&zap_handler_large_routing_id, &socket_config_curve_server,
&valid_server_secret, large_routing_id);
test_curve_security_with_valid_credentials (ctx, my_endpoint, server,
server_mon, timeout);
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
handler);
handler);
ctx = zmq_ctx_new ();
test_curve_security_invalid_keysize (ctx);

View File

@ -111,7 +111,7 @@ static void zap_handler (void *handler)
char *sequence = s_recv (handler);
char *domain = s_recv (handler);
char *address = s_recv (handler);
char *identity = s_recv (handler);
char *routing_id = s_recv (handler);
char *mechanism = s_recv (handler);
char *principal = s_recv (handler);
@ -139,7 +139,7 @@ static void zap_handler (void *handler)
free (sequence);
free (domain);
free (address);
free (identity);
free (routing_id);
free (mechanism);
free (principal);
}

View File

@ -52,7 +52,7 @@ zap_handler (void *handler)
char *sequence = s_recv (handler);
char *domain = s_recv (handler);
char *address = s_recv (handler);
char *identity = s_recv (handler);
char *routing_id = s_recv (handler);
char *mechanism = s_recv (handler);
assert (streq (version, "1.0"));
@ -76,7 +76,7 @@ zap_handler (void *handler)
free (sequence);
free (domain);
free (address);
free (identity);
free (routing_id);
free (mechanism);
}
close_zero_linger (handler);

View File

@ -57,14 +57,14 @@ zap_handler (void *ctx)
char *sequence = s_recv (zap);
char *domain = s_recv (zap);
char *address = s_recv (zap);
char *identity = s_recv (zap);
char *routing_id = s_recv (zap);
char *mechanism = s_recv (zap);
char *username = s_recv (zap);
char *password = s_recv (zap);
assert (streq (version, "1.0"));
assert (streq (mechanism, "PLAIN"));
assert (streq (identity, "IDENT"));
assert (streq (routing_id, "IDENT"));
s_sendmore (zap, version);
s_sendmore (zap, sequence);
@ -85,7 +85,7 @@ zap_handler (void *ctx)
free (sequence);
free (domain);
free (address);
free (identity);
free (routing_id);
free (mechanism);
free (username);
free (password);
@ -108,7 +108,7 @@ int main (void)
// Server socket will accept connections
void *server = zmq_socket (ctx, ZMQ_DEALER);
assert (server);
int rc = zmq_setsockopt (server, ZMQ_IDENTITY, "IDENT", 6);
int rc = zmq_setsockopt (server, ZMQ_ROUTING_ID, "IDENT", 6);
const char domain[] = "test";
assert (rc == 0);
rc = zmq_setsockopt (server, ZMQ_ZAP_DOMAIN, domain, strlen (domain));

View File

@ -82,7 +82,7 @@ void test_req_only_listens_to_current_peer (void *ctx)
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
int rc = zmq_setsockopt(req, ZMQ_IDENTITY, "A", 2);
int rc = zmq_setsockopt(req, ZMQ_ROUTING_ID, "A", 2);
assert (rc == 0);
rc = zmq_bind (req, bind_address);
@ -167,7 +167,7 @@ void test_req_message_format (void *ctx)
zmq_msg_t msg;
zmq_msg_init (&msg);
// Receive peer identity
// Receive peer routing id
rc = zmq_msg_recv (&msg, router, 0);
assert (rc != -1);
assert (zmq_msg_size (&msg) > 0);

View File

@ -58,7 +58,7 @@ void test_fair_queue_in (void *ctx)
char *str = strdup("A");
str [0] += peer;
rc = zmq_setsockopt (senders [peer], ZMQ_IDENTITY, str, 2);
rc = zmq_setsockopt (senders [peer], ZMQ_ROUTING_ID, str, 2);
assert (rc == 0);
free (str);
@ -130,7 +130,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
void *B = zmq_socket (ctx, ZMQ_DEALER);
assert (B);
rc = zmq_setsockopt (B, ZMQ_IDENTITY, "B", 2);
rc = zmq_setsockopt (B, ZMQ_ROUTING_ID, "B", 2);
assert (rc == 0);
rc = zmq_connect (B, connect_address);

View File

@ -92,16 +92,16 @@ test_stream_to_dealer (void)
assert (rc == 5);
// Connecting sends a zero message
// First frame is identity
zmq_msg_t identity;
rc = zmq_msg_init (&identity);
// First frame is routing id
zmq_msg_t routing_id;
rc = zmq_msg_init (&routing_id);
assert (rc == 0);
rc = zmq_msg_recv (&identity, stream, 0);
rc = zmq_msg_recv (&routing_id, stream, 0);
assert (rc > 0);
assert (zmq_msg_more (&identity));
assert (zmq_msg_more (&routing_id));
// Verify the existence of Peer-Address metadata
char const *peer_address = zmq_msg_gets (&identity, "Peer-Address");
char const *peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
assert (peer_address != 0);
assert (streq (peer_address, "127.0.0.1"));
@ -111,18 +111,18 @@ test_stream_to_dealer (void)
assert (rc == 0);
// Verify the existence of Peer-Address metadata
peer_address = zmq_msg_gets (&identity, "Peer-Address");
peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
assert (peer_address != 0);
assert (streq (peer_address, "127.0.0.1"));
// Real data follows
// First frame is identity
rc = zmq_msg_recv (&identity, stream, 0);
// First frame is routing id
rc = zmq_msg_recv (&routing_id, stream, 0);
assert (rc > 0);
assert (zmq_msg_more (&identity));
assert (zmq_msg_more (&routing_id));
// Verify the existence of Peer-Address metadata
peer_address = zmq_msg_gets (&identity, "Peer-Address");
peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
assert (peer_address != 0);
assert (streq (peer_address, "127.0.0.1"));
@ -132,7 +132,7 @@ test_stream_to_dealer (void)
assert (memcmp (buffer, greeting.signature, 10) == 0);
// Send our own protocol greeting
rc = zmq_msg_send (&identity, stream, ZMQ_SNDMORE);
rc = zmq_msg_send (&routing_id, stream, ZMQ_SNDMORE);
assert (rc > 0);
rc = zmq_send (stream, &greeting, sizeof (greeting), 0);
assert (rc == sizeof (greeting));
@ -141,10 +141,10 @@ test_stream_to_dealer (void)
// We want the rest of greeting along with the Ready command
int bytes_read = 0;
while (bytes_read < 97) {
// First frame is the identity of the connection (each time)
rc = zmq_msg_recv (&identity, stream, 0);
// First frame is the routing id of the connection (each time)
rc = zmq_msg_recv (&routing_id, stream, 0);
assert (rc > 0);
assert (zmq_msg_more (&identity));
assert (zmq_msg_more (&routing_id));
// Second frame contains the next chunk of data
rc = zmq_recv (stream, buffer + bytes_read, 255 - bytes_read, 0);
assert (rc >= 0);
@ -167,16 +167,16 @@ test_stream_to_dealer (void)
memcpy (buffer + 30, "\10Identity\0\0\0\0", 13);
// Send Ready command
rc = zmq_msg_send (&identity, stream, ZMQ_SNDMORE);
rc = zmq_msg_send (&routing_id, stream, ZMQ_SNDMORE);
assert (rc > 0);
rc = zmq_send (stream, buffer, 43, 0);
assert (rc == 43);
// Now we expect the data from the DEALER socket
// First frame is, again, the identity of the connection
rc = zmq_msg_recv (&identity, stream, 0);
// First frame is, again, the routing id of the connection
rc = zmq_msg_recv (&routing_id, stream, 0);
assert (rc > 0);
assert (zmq_msg_more (&identity));
assert (zmq_msg_more (&routing_id));
// Third frame contains Hello message from DEALER
rc = zmq_recv (stream, buffer, sizeof buffer, 0);
@ -188,7 +188,7 @@ test_stream_to_dealer (void)
assert (memcmp (buffer + 2, "Hello", 5) == 0);
// Send "World" back to DEALER
rc = zmq_msg_send (&identity, stream, ZMQ_SNDMORE);
rc = zmq_msg_send (&routing_id, stream, ZMQ_SNDMORE);
assert (rc > 0);
byte world [] = { 0, 5, 'W', 'o', 'r', 'l', 'd' };
rc = zmq_send (stream, world, sizeof (world), 0);
@ -209,7 +209,7 @@ test_stream_to_dealer (void)
memset (msgin, 0, 9 + size);
bytes_read = 0;
while (bytes_read < 9 + size) {
// Get identity frame
// Get routing id frame
rc = zmq_recv (stream, buffer, 256, 0);
assert (rc > 0);
// Get next chunk
@ -264,22 +264,22 @@ test_stream_to_stream (void)
uint8_t buffer [256];
// Connecting sends a zero message
// Server: First frame is identity, second frame is zero
// Server: First frame is routing id, second frame is zero
id_size = zmq_recv (server, id, 256, 0);
assert (id_size > 0);
rc = zmq_recv (server, buffer, 256, 0);
assert (rc == 0);
// Client: First frame is identity, second frame is zero
// Client: First frame is routing id, second frame is zero
id_size = zmq_recv (client, id, 256, 0);
assert (id_size > 0);
rc = zmq_recv (client, buffer, 256, 0);
assert (rc == 0);
// Sent HTTP request on client socket
// Get server identity
rc = zmq_getsockopt (client, ZMQ_IDENTITY, id, &id_size);
// Get server routing id
rc = zmq_getsockopt (client, ZMQ_ROUTING_ID, id, &id_size);
assert (rc == 0);
// First frame is server identity
// First frame is server routing id
rc = zmq_send (client, id, id_size, ZMQ_SNDMORE);
assert (rc == (int) id_size);
// Second frame is HTTP GET request

View File

@ -55,9 +55,9 @@ bool has_more (void* socket)
return more != 0;
}
bool get_identity (void* socket, char* data, size_t* size)
bool get_routing_id (void* socket, char* data, size_t* size)
{
int rc = zmq_getsockopt (socket, ZMQ_IDENTITY, data, size);
int rc = zmq_getsockopt (socket, ZMQ_ROUTING_ID, data, size);
return rc == 0;
}
@ -97,7 +97,7 @@ int main(int, char**)
assert (rc == 0);
// wait for connect notification
// Server: Grab the 1st frame (peer identity).
// Server: Grab the 1st frame (peer routing id).
zmq_msg_t peer_frame;
rc = zmq_msg_init (&peer_frame);
assert (rc == 0);
@ -118,7 +118,7 @@ int main(int, char**)
rc = zmq_msg_close (&data_frame);
assert (rc == 0);
// Client: Grab the 1st frame (peer identity).
// Client: Grab the 1st frame (peer routing id).
rc = zmq_msg_init (&peer_frame);
assert (rc == 0);
rc = zmq_msg_recv (&peer_frame, sockets [CLIENT], 0);
@ -140,7 +140,7 @@ int main(int, char**)
// Send initial message.
char blob_data [256];
size_t blob_size = sizeof(blob_data);
rc = zmq_getsockopt (sockets [CLIENT], ZMQ_IDENTITY, blob_data, &blob_size);
rc = zmq_getsockopt (sockets [CLIENT], ZMQ_ROUTING_ID, blob_data, &blob_size);
assert (rc != -1);
assert(blob_size > 0);
zmq_msg_t msg;
@ -176,7 +176,7 @@ int main(int, char**)
if (items [SERVER].revents & ZMQ_POLLIN) {
assert (dialog [step].turn == CLIENT);
// Grab the 1st frame (peer identity).
// Grab the 1st frame (peer routing id).
zmq_msg_t peer_frame;
rc = zmq_msg_init (&peer_frame);
assert (rc == 0);
@ -237,7 +237,7 @@ int main(int, char**)
if (items [CLIENT].revents & ZMQ_POLLIN) {
assert (dialog [step].turn == SERVER);
// Grab the 1st frame (peer identity).
// Grab the 1st frame (peer routing id).
zmq_msg_t peer_frame;
rc = zmq_msg_init (&peer_frame);
assert (rc == 0);

View File

@ -81,7 +81,7 @@ void socket_config_plain_server (void *server, void *server_secret)
rc = zmq_setsockopt (server, ZMQ_ZAP_DOMAIN, test_zap_domain,
strlen (test_zap_domain));
assert(rc == 0);
assert (rc == 0);
}
// CURVE specific functions
@ -113,7 +113,7 @@ void socket_config_curve_server (void *server, void *server_secret)
rc = zmq_setsockopt (server, ZMQ_ZAP_DOMAIN, test_zap_domain,
strlen (test_zap_domain));
assert(rc == 0);
assert (rc == 0);
}
struct curve_client_data_t
@ -163,7 +163,7 @@ void *zap_requests_handled;
void zap_handler_generic (void *ctx,
zap_protocol_t zap_protocol,
const char *expected_identity = "IDENT")
const char *expected_routing_id = "IDENT")
{
void *control = zmq_socket (ctx, ZMQ_REQ);
assert (control);
@ -202,8 +202,7 @@ void zap_handler_generic (void *ctx,
char *version = s_recv (handler);
if (!version)
break; // Terminating - peer's socket closed
if (zap_protocol == zap_disconnect)
{
if (zap_protocol == zap_disconnect) {
free (version);
break;
}
@ -211,7 +210,7 @@ void zap_handler_generic (void *ctx,
char *sequence = s_recv (handler);
char *domain = s_recv (handler);
char *address = s_recv (handler);
char *identity = s_recv (handler);
char *routing_id = s_recv (handler);
char *mechanism = s_recv (handler);
bool authentication_succeeded = false;
if (streq (mechanism, "CURVE")) {
@ -225,15 +224,15 @@ void zap_handler_generic (void *ctx,
authentication_succeeded =
streq (client_key_text, valid_client_public);
} else if (streq (mechanism, "PLAIN")) {
char client_username [32];
char client_username[32];
int size = zmq_recv (handler, client_username, 32, 0);
assert (size > 0);
client_username [size] = 0;
client_username[size] = 0;
char client_password [32];
char client_password[32];
size = zmq_recv (handler, client_password, 32, 0);
assert (size > 0);
client_password [size] = 0;
client_password[size] = 0;
authentication_succeeded =
streq (test_plain_username, client_username)
@ -246,7 +245,7 @@ void zap_handler_generic (void *ctx,
}
assert (streq (version, "1.0"));
assert (streq (identity, expected_identity));
assert (streq (routing_id, expected_routing_id));
s_sendmore (handler, zap_protocol == zap_wrong_version
? "invalid_version"
@ -283,13 +282,13 @@ void zap_handler_generic (void *ctx,
s_sendmore (handler, "Invalid client public key");
s_sendmore (handler, "");
if (zap_protocol != zap_do_not_send)
s_send(handler, "");
s_send (handler, "");
}
free (version);
free (sequence);
free (domain);
free (address);
free (identity);
free (routing_id);
free (mechanism);
zmq_atomic_counter_inc (zap_requests_handled);
@ -298,10 +297,9 @@ void zap_handler_generic (void *ctx,
assert (rc == 0);
close_zero_linger (handler);
if (zap_protocol != zap_disconnect)
{
rc = s_send(control, "STOPPED");
assert(rc == 7);
if (zap_protocol != zap_disconnect) {
rc = s_send (control, "STOPPED");
assert (rc == 7);
}
close_zero_linger (control);
}
@ -530,7 +528,7 @@ void setup_context_and_server_side (
zmq_thread_fn zap_handler_ = &zap_handler,
socket_config_fn socket_config_ = &socket_config_curve_server,
void *socket_config_data_ = valid_server_secret,
const char *identity = "IDENT")
const char *routing_id = "IDENT")
{
*ctx = zmq_ctx_new ();
assert (*ctx);
@ -560,7 +558,8 @@ void setup_context_and_server_side (
socket_config_ (*server, socket_config_data_);
rc = zmq_setsockopt (*server, ZMQ_IDENTITY, identity, strlen (identity));
rc =
zmq_setsockopt (*server, ZMQ_ROUTING_ID, routing_id, strlen (routing_id));
assert (rc == 0);
rc = zmq_bind (*server, "tcp://127.0.0.1:*");
@ -570,7 +569,7 @@ void setup_context_and_server_side (
rc = zmq_getsockopt (*server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
const char server_monitor_endpoint [] = "inproc://monitor-server";
const char server_monitor_endpoint[] = "inproc://monitor-server";
setup_handshake_socket_monitor (*ctx, *server, server_mon,
server_monitor_endpoint);
}
@ -592,7 +591,7 @@ void shutdown_context_and_server_side (void *ctx,
rc = zmq_unbind (zap_control, "inproc://handler-control");
assert (rc == 0);
}
close_zero_linger(zap_control);
close_zero_linger (zap_control);
#ifdef ZMQ_BUILD_DRAFT_API
close_zero_linger (server_mon);