mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-13 10:52:56 +01:00
Problem: inconsistent naming related to routing ids
Solution: renamed routing_id fields in pipe_t, renamed ZMQ_CONNECT_RID to ZMQ_CONNECT_ROUTING_ID
This commit is contained in:
parent
9e7507b38b
commit
41bae55af7
@ -345,7 +345,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
|
|||||||
#define ZMQ_ZAP_DOMAIN 55
|
#define ZMQ_ZAP_DOMAIN 55
|
||||||
#define ZMQ_ROUTER_HANDOVER 56
|
#define ZMQ_ROUTER_HANDOVER 56
|
||||||
#define ZMQ_TOS 57
|
#define ZMQ_TOS 57
|
||||||
#define ZMQ_CONNECT_RID 61
|
#define ZMQ_CONNECT_ROUTING_ID 61
|
||||||
#define ZMQ_GSSAPI_SERVER 62
|
#define ZMQ_GSSAPI_SERVER 62
|
||||||
#define ZMQ_GSSAPI_PRINCIPAL 63
|
#define ZMQ_GSSAPI_PRINCIPAL 63
|
||||||
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64
|
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64
|
||||||
@ -391,6 +391,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg, const char *property)
|
|||||||
|
|
||||||
/* Deprecated options and aliases */
|
/* Deprecated options and aliases */
|
||||||
#define ZMQ_IDENTITY ZMQ_ROUTING_ID
|
#define ZMQ_IDENTITY ZMQ_ROUTING_ID
|
||||||
|
#define ZMQ_CONNECT_RID ZMQ_CONNECT_ROUTING_ID
|
||||||
#define ZMQ_TCP_ACCEPT_FILTER 38
|
#define ZMQ_TCP_ACCEPT_FILTER 38
|
||||||
#define ZMQ_IPC_FILTER_PID 58
|
#define ZMQ_IPC_FILTER_PID 58
|
||||||
#define ZMQ_IPC_FILTER_UID 59
|
#define ZMQ_IPC_FILTER_UID 59
|
||||||
|
16
src/pipe.cpp
16
src/pipe.cpp
@ -92,7 +92,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
|
|||||||
sink (NULL),
|
sink (NULL),
|
||||||
state (active),
|
state (active),
|
||||||
delay (true),
|
delay (true),
|
||||||
integral_routing_id(0),
|
server_socket_routing_id (0),
|
||||||
conflate (conflate_)
|
conflate (conflate_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -115,24 +115,24 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
|
|||||||
sink = sink_;
|
sink = sink_;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::pipe_t::set_integral_routing_id (uint32_t integral_routing_id_)
|
void zmq::pipe_t::set_server_socket_routing_id (uint32_t server_socket_routing_id_)
|
||||||
{
|
{
|
||||||
integral_routing_id = integral_routing_id_;
|
server_socket_routing_id = server_socket_routing_id_;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t zmq::pipe_t::get_integral_routing_id ()
|
uint32_t zmq::pipe_t::get_server_socket_routing_id ()
|
||||||
{
|
{
|
||||||
return integral_routing_id;
|
return server_socket_routing_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::pipe_t::set_routing_id (const blob_t &routing_id_)
|
void zmq::pipe_t::set_router_socket_routing_id (const blob_t &router_socket_routing_id_)
|
||||||
{
|
{
|
||||||
routing_id = routing_id_;
|
router_socket_routing_id = router_socket_routing_id_;
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::blob_t zmq::pipe_t::get_routing_id ()
|
zmq::blob_t zmq::pipe_t::get_routing_id ()
|
||||||
{
|
{
|
||||||
return routing_id;
|
return router_socket_routing_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::blob_t zmq::pipe_t::get_credential () const
|
zmq::blob_t zmq::pipe_t::get_credential () const
|
||||||
|
10
src/pipe.hpp
10
src/pipe.hpp
@ -85,11 +85,11 @@ namespace zmq
|
|||||||
void set_event_sink (i_pipe_events *sink_);
|
void set_event_sink (i_pipe_events *sink_);
|
||||||
|
|
||||||
// Pipe endpoint can store an routing ID to be used by its clients.
|
// Pipe endpoint can store an routing ID to be used by its clients.
|
||||||
void set_integral_routing_id (uint32_t routing_id_);
|
void set_server_socket_routing_id (uint32_t routing_id_);
|
||||||
uint32_t get_integral_routing_id ();
|
uint32_t get_server_socket_routing_id ();
|
||||||
|
|
||||||
// Pipe endpoint can store an opaque ID to be used by its clients.
|
// Pipe endpoint can store an opaque ID to be used by its clients.
|
||||||
void set_routing_id (const blob_t &identity_);
|
void set_router_socket_routing_id (const blob_t &identity_);
|
||||||
blob_t get_routing_id ();
|
blob_t get_routing_id ();
|
||||||
|
|
||||||
blob_t get_credential () const;
|
blob_t get_credential () const;
|
||||||
@ -227,10 +227,10 @@ namespace zmq
|
|||||||
bool delay;
|
bool delay;
|
||||||
|
|
||||||
// Identity of the writer. Used uniquely by the reader side.
|
// Identity of the writer. Used uniquely by the reader side.
|
||||||
blob_t routing_id;
|
blob_t router_socket_routing_id;
|
||||||
|
|
||||||
// Identity of the writer. Used uniquely by the reader side.
|
// Identity of the writer. Used uniquely by the reader side.
|
||||||
int integral_routing_id;
|
int server_socket_routing_id;
|
||||||
|
|
||||||
// Pipe's credential.
|
// Pipe's credential.
|
||||||
blob_t credential;
|
blob_t credential;
|
||||||
|
@ -102,9 +102,9 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
if (is_int) memcpy(&value, optval_, sizeof (int));
|
if (is_int) memcpy(&value, optval_, sizeof (int));
|
||||||
|
|
||||||
switch (option_) {
|
switch (option_) {
|
||||||
case ZMQ_CONNECT_RID:
|
case ZMQ_CONNECT_ROUTING_ID:
|
||||||
if (optval_ && optvallen_) {
|
if (optval_ && optvallen_) {
|
||||||
connect_rid.assign ((char *) optval_, optvallen_);
|
connect_routing_id.assign ((char *) optval_, optvallen_);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -470,10 +470,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
|||||||
blob_t routing_id;
|
blob_t routing_id;
|
||||||
bool ok;
|
bool ok;
|
||||||
|
|
||||||
if (connect_rid.length()) {
|
if (connect_routing_id.length()) {
|
||||||
routing_id = blob_t ((unsigned char*) connect_rid.c_str (),
|
routing_id = blob_t ((unsigned char*) connect_routing_id.c_str (),
|
||||||
connect_rid.length());
|
connect_routing_id.length());
|
||||||
connect_rid.clear ();
|
connect_routing_id.clear ();
|
||||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
outpipes_t::iterator it = outpipes.find (routing_id);
|
||||||
if (it != outpipes.end ())
|
if (it != outpipes.end ())
|
||||||
zmq_assert(false); // Not allowed to duplicate an existing rid
|
zmq_assert(false); // Not allowed to duplicate an existing rid
|
||||||
@ -519,7 +519,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
|||||||
put_uint32 (buf + 1, next_integral_routing_id++);
|
put_uint32 (buf + 1, next_integral_routing_id++);
|
||||||
blob_t new_routing_id = blob_t (buf, sizeof buf);
|
blob_t new_routing_id = blob_t (buf, sizeof buf);
|
||||||
|
|
||||||
it->second.pipe->set_routing_id (new_routing_id);
|
it->second.pipe->set_router_socket_routing_id (new_routing_id);
|
||||||
outpipe_t existing_outpipe =
|
outpipe_t existing_outpipe =
|
||||||
{it->second.pipe, it->second.active};
|
{it->second.pipe, it->second.active};
|
||||||
|
|
||||||
@ -540,7 +540,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pipe_->set_routing_id (routing_id);
|
pipe_->set_router_socket_routing_id (routing_id);
|
||||||
// Add the record into output pipes lookup table
|
// Add the record into output pipes lookup table
|
||||||
outpipe_t outpipe = {pipe_, true};
|
outpipe_t outpipe = {pipe_, true};
|
||||||
ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
|
ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
|
||||||
|
@ -58,7 +58,7 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
|
|||||||
if (!routing_id)
|
if (!routing_id)
|
||||||
routing_id = next_rid++; // Never use RID zero
|
routing_id = next_rid++; // Never use RID zero
|
||||||
|
|
||||||
pipe_->set_integral_routing_id (routing_id);
|
pipe_->set_server_socket_routing_id (routing_id);
|
||||||
// Add the record into output pipes lookup table
|
// Add the record into output pipes lookup table
|
||||||
outpipe_t outpipe = {pipe_, true};
|
outpipe_t outpipe = {pipe_, true};
|
||||||
bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
|
bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
|
||||||
@ -69,7 +69,7 @@ void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
|
|||||||
|
|
||||||
void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
|
void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
outpipes_t::iterator it = outpipes.find (pipe_->get_integral_routing_id ());
|
outpipes_t::iterator it = outpipes.find (pipe_->get_server_socket_routing_id ());
|
||||||
zmq_assert (it != outpipes.end ());
|
zmq_assert (it != outpipes.end ());
|
||||||
outpipes.erase (it);
|
outpipes.erase (it);
|
||||||
fq.pipe_terminated (pipe_);
|
fq.pipe_terminated (pipe_);
|
||||||
@ -159,7 +159,7 @@ int zmq::server_t::xrecv (msg_t *msg_)
|
|||||||
|
|
||||||
zmq_assert (pipe != NULL);
|
zmq_assert (pipe != NULL);
|
||||||
|
|
||||||
uint32_t routing_id = pipe->get_integral_routing_id ();
|
uint32_t routing_id = pipe->get_server_socket_routing_id ();
|
||||||
msg_->set_routing_id (routing_id);
|
msg_->set_routing_id (routing_id);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -186,7 +186,7 @@ namespace zmq
|
|||||||
void process_destroy ();
|
void process_destroy ();
|
||||||
|
|
||||||
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
|
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
|
||||||
std::string connect_rid;
|
std::string connect_routing_id;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// test if event should be sent and then dispatch it
|
// test if event should be sent and then dispatch it
|
||||||
|
@ -183,9 +183,9 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
if (is_int) memcpy(&value, optval_, sizeof (int));
|
if (is_int) memcpy(&value, optval_, sizeof (int));
|
||||||
|
|
||||||
switch (option_) {
|
switch (option_) {
|
||||||
case ZMQ_CONNECT_RID:
|
case ZMQ_CONNECT_ROUTING_ID:
|
||||||
if (optval_ && optvallen_) {
|
if (optval_ && optvallen_) {
|
||||||
connect_rid.assign ((char*) optval_, optvallen_);
|
connect_routing_id.assign ((char*) optval_, optvallen_);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -299,10 +299,10 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
|
|||||||
unsigned char buffer [5];
|
unsigned char buffer [5];
|
||||||
buffer [0] = 0;
|
buffer [0] = 0;
|
||||||
blob_t routing_id;
|
blob_t routing_id;
|
||||||
if (connect_rid.length ()) {
|
if (connect_routing_id.length ()) {
|
||||||
routing_id = blob_t ((unsigned char*) connect_rid.c_str(),
|
routing_id = blob_t ((unsigned char*) connect_routing_id.c_str(),
|
||||||
connect_rid.length ());
|
connect_routing_id.length ());
|
||||||
connect_rid.clear ();
|
connect_routing_id.clear ();
|
||||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
outpipes_t::iterator it = outpipes.find (routing_id);
|
||||||
zmq_assert (it == outpipes.end ());
|
zmq_assert (it == outpipes.end ());
|
||||||
}
|
}
|
||||||
@ -312,7 +312,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
|
|||||||
memcpy (options.routing_id, routing_id.data (), routing_id.size ());
|
memcpy (options.routing_id, routing_id.data (), routing_id.size ());
|
||||||
options.routing_id_size = (unsigned char) routing_id.size ();
|
options.routing_id_size = (unsigned char) routing_id.size ();
|
||||||
}
|
}
|
||||||
pipe_->set_routing_id (routing_id);
|
pipe_->set_router_socket_routing_id (routing_id);
|
||||||
// Add the record into output pipes lookup table
|
// Add the record into output pipes lookup table
|
||||||
outpipe_t outpipe = {pipe_, true};
|
outpipe_t outpipe = {pipe_, true};
|
||||||
const bool ok = outpipes.insert (
|
const bool ok = outpipes.insert (
|
||||||
|
@ -520,7 +520,7 @@ bool zmq::stream_engine_t::handshake ()
|
|||||||
|
|
||||||
// Inspect the right-most bit of the 10th byte (which coincides
|
// Inspect the right-most bit of the 10th byte (which coincides
|
||||||
// with the 'flags' field if a regular message was sent).
|
// with the 'flags' field if a regular message was sent).
|
||||||
// Zero indicates this is a header of routing id message
|
// Zero indicates this is a header of a routing id message
|
||||||
// (i.e. the peer is using the unversioned protocol).
|
// (i.e. the peer is using the unversioned protocol).
|
||||||
if (!(greeting_recv [9] & 0x01))
|
if (!(greeting_recv [9] & 0x01))
|
||||||
break;
|
break;
|
||||||
|
@ -61,13 +61,13 @@ void test_stream_2_stream(){
|
|||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
|
|
||||||
// Do the connection.
|
// Do the connection.
|
||||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
|
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
|
||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
ret = zmq_connect (rconn1, my_endpoint);
|
ret = zmq_connect (rconn1, my_endpoint);
|
||||||
|
|
||||||
/* Uncomment to test assert on duplicate rid.
|
/* Uncomment to test assert on duplicate routing id.
|
||||||
// Test duplicate connect attempt.
|
// Test duplicate connect attempt.
|
||||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
|
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
|
||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
ret = zmq_connect (rconn1, bindip);
|
ret = zmq_connect (rconn1, bindip);
|
||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
@ -130,14 +130,14 @@ void test_router_2_router(bool named){
|
|||||||
ret = zmq_setsockopt (rconn1, ZMQ_ROUTING_ID, "Y", 1);
|
ret = zmq_setsockopt (rconn1, ZMQ_ROUTING_ID, "Y", 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make call to connect using a connect_rid.
|
// Make call to connect using a connect_routing_id.
|
||||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
|
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
|
||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
ret = zmq_connect (rconn1, my_endpoint);
|
ret = zmq_connect (rconn1, my_endpoint);
|
||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
/* Uncomment to test assert on duplicate rid
|
/* Uncomment to test assert on duplicate routing id
|
||||||
// Test duplicate connect attempt.
|
// Test duplicate connect attempt.
|
||||||
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
|
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, "conn1", 6);
|
||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
ret = zmq_connect (rconn1, bindip);
|
ret = zmq_connect (rconn1, bindip);
|
||||||
assert (0 == ret);
|
assert (0 == ret);
|
||||||
|
@ -750,7 +750,8 @@ int main (void)
|
|||||||
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
||||||
handler);
|
handler);
|
||||||
|
|
||||||
fprintf (stderr, "test_curve_security_invalid_initiate_command_encrypted_cookie\n");
|
fprintf (stderr,
|
||||||
|
"test_curve_security_invalid_initiate_command_encrypted_cookie\n");
|
||||||
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
||||||
&server_mon, my_endpoint);
|
&server_mon, my_endpoint);
|
||||||
test_curve_security_invalid_initiate_command_encrypted_cookie (
|
test_curve_security_invalid_initiate_command_encrypted_cookie (
|
||||||
@ -758,7 +759,9 @@ int main (void)
|
|||||||
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
||||||
handler);
|
handler);
|
||||||
|
|
||||||
fprintf (stderr, "test_curve_security_invalid_initiate_command_encrypted_content\n");
|
fprintf (
|
||||||
|
stderr,
|
||||||
|
"test_curve_security_invalid_initiate_command_encrypted_content\n");
|
||||||
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
|
||||||
&server_mon, my_endpoint);
|
&server_mon, my_endpoint);
|
||||||
test_curve_security_invalid_initiate_command_encrypted_content (
|
test_curve_security_invalid_initiate_command_encrypted_content (
|
||||||
@ -767,11 +770,12 @@ int main (void)
|
|||||||
handler);
|
handler);
|
||||||
|
|
||||||
// test with a large identity (resulting in large metadata)
|
// test with a large identity (resulting in large metadata)
|
||||||
fprintf (stderr, "test_curve_security_with_valid_credentials (large identity)\n");
|
fprintf (stderr,
|
||||||
|
"test_curve_security_with_valid_credentials (large identity)\n");
|
||||||
setup_context_and_server_side (
|
setup_context_and_server_side (
|
||||||
&ctx, &handler, &zap_thread, &server, &server_mon, my_endpoint,
|
&ctx, &handler, &zap_thread, &server, &server_mon, my_endpoint,
|
||||||
&zap_handler_large_identity, &socket_config_curve_server, &valid_server_secret,
|
&zap_handler_large_identity, &socket_config_curve_server,
|
||||||
large_identity);
|
&valid_server_secret, large_identity);
|
||||||
test_curve_security_with_valid_credentials (ctx, my_endpoint, server,
|
test_curve_security_with_valid_credentials (ctx, my_endpoint, server,
|
||||||
server_mon, timeout);
|
server_mon, timeout);
|
||||||
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
|
||||||
|
Loading…
Reference in New Issue
Block a user