Merge pull request #848 from Prarrot/master

Changed fail behavior of CONNECT_RID to an assert failure instead of silent failure.
This commit is contained in:
Pieter Hintjens 2014-01-21 12:08:00 -08:00
commit 9c6aa1e9e0
4 changed files with 133 additions and 89 deletions

View File

@ -75,9 +75,9 @@ data transfer with the named id. This option applies only to the first
subsequent call to zmq_connect(), calls thereafter use default connection
behavior.
Typical use is to set this socket option on each zmq_connect() attempt
to a new host. Each connection should be assigned a unique name. Duplicated
names will trigger default connection behavior.
Typical use is to set this socket option ahead of each zmq_connect() attempt
to a new host. Each connection MUST be assigned a unique name. Assigning a
name that is already in use is not allowed.
Useful when connecting ROUTER to ROUTER, or STREAM to STREAM, as it
allows for immediate sending to peers. Outbound id framing requirements

View File

@ -393,9 +393,8 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
connect_rid.length());
connect_rid.clear ();
outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ()) {
return false; // duplicate connection
}
if (it != outpipes.end ())
zmq_assert(false); // Not allowed to duplicate an existing rid
}
else
if (options.raw_sock) { // Always assign identity for raw-socket

View File

@ -268,10 +268,10 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
connect_rid.clear ();
outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ())
goto d;
zmq_assert(false);
}
else {
d: put_uint32 (buffer + 1, next_rid++);
put_uint32 (buffer + 1, next_rid++);
identity = blob_t (buffer, sizeof buffer);
memcpy (options.identity, identity.data (), identity.size ());
options.identity_size = identity.size ();

View File

@ -1,4 +1,3 @@
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
@ -21,116 +20,162 @@
#include "testutil.hpp"
void test_stream_2_stream(void* ctx){
void test_stream_2_stream(void* ctx_){
void *rbind, *rconn1;
int ret;
char buff[256];
char msg[] = "hi 1";
const char *bindip = "tcp://127.0.0.1:12001";
rbind = zmq_socket(ctx,ZMQ_STREAM);
rconn1 = zmq_socket(ctx,ZMQ_STREAM);
assert(rbind && rconn1 );
ret = zmq_bind(rbind,bindip);
assert(0 == ret);
ret = zmq_setsockopt(rconn1,ZMQ_CONNECT_RID,"conn1",6);
assert(0 == ret);
ret = zmq_connect(rconn1,bindip);
/*test duplicate connect attempt*/
ret = zmq_setsockopt(rconn1,ZMQ_CONNECT_RID,"conn1",6);
assert(0 == ret);
ret = zmq_connect(rconn1,bindip);
assert(0 == ret);
ret = zmq_send(rconn1,"conn1",6,ZMQ_SNDMORE);
assert(6 == ret);
ret = zmq_send(rconn1,msg,5,0);
assert(5 == ret);
const char *bindip = "tcp://127.0.0.1:5556";
int zero = 0;
ret = zmq_recv(rbind,buff,256,0);
assert(ret && 0 == buff[0]);
ret = zmq_recv(rbind,buff,256,0);
// Set up listener STREAM.
rbind = zmq_socket (ctx_, ZMQ_STREAM);
assert (rbind);
ret = zmq_setsockopt (rbind, ZMQ_LINGER, &zero, sizeof (zero));
assert (0 == ret);
ret = zmq_bind (rbind, bindip);
assert(0 == ret);
// close the duplicate socket
ret = zmq_recv(rbind,buff,256,0);
assert(ret && 0 == buff[0]);
ret = zmq_recv(rbind,buff+128,128,0);
assert(0 == ret);
// handle the good socket
ret = zmq_recv(rbind,buff,256,0);
assert(ret && 0 == buff[0]);
ret = zmq_recv(rbind,buff+128,128,0);
assert(5 == ret && 'h' == buff[128] );
zmq_unbind(rbind,bindip);
zmq_close(rbind);
zmq_close(rconn1);
// Set up connection stream.
rconn1 = zmq_socket (ctx_, ZMQ_STREAM);
assert (rconn1);
ret = zmq_setsockopt (rconn1, ZMQ_LINGER, &zero, sizeof (zero));
assert (0 == ret);
// Do the connection.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
assert (0 == ret);
ret = zmq_connect (rconn1, bindip);
/* Uncomment to test assert on duplicate rid.
// Test duplicate connect attempt.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
assert (0 == ret);
ret = zmq_connect (rconn1, bindip);
assert (0 == ret);
*/
// Send data to the bound stream.
ret = zmq_send (rconn1, "conn1", 6, ZMQ_SNDMORE);
assert (6 == ret);
ret = zmq_send (rconn1, msg, 5, 0);
assert (5 == ret);
// Accept data on the bound stream.
ret = zmq_recv (rbind, buff, 256, 0);
assert (ret && 0 == buff[0]);
assert (0 == buff[0]);
ret = zmq_recv (rbind, buff, 256, 0);
assert (0 == ret);
// Handle close of the socket.
ret = zmq_recv (rbind, buff, 256, 0);
assert (ret);
assert (0 == buff[0]);
ret = zmq_recv (rbind, buff+128, 128, 0);
assert (5 == ret);
assert ('h' == buff[128]);
ret = zmq_unbind (rbind, bindip);
assert(0 == ret);
ret = zmq_close (rbind);
assert(0 == ret);
ret = zmq_close (rconn1);
assert(0 == ret);
}
void test_router_2_router(void* ctx,bool named){
void *rbind, *rconn1;
int ret;
char buff[256];
char msg[] = "hi 1";
const char *bindip = "tcp://127.0.0.1:12001";
rbind = zmq_socket(ctx,ZMQ_ROUTER);
rconn1 = zmq_socket(ctx,ZMQ_ROUTER);
assert(rbind && rconn1 );
ret = zmq_bind(rbind,bindip);
assert(0 == ret);
if(named){/*here we check if this interferes with bound socket naming */
const char *bindip = "tcp://127.0.0.1:5556";
int zero = 0;
// Create bind socket.
rbind = zmq_socket (ctx, ZMQ_ROUTER);
assert (rbind);
ret = zmq_setsockopt (rbind, ZMQ_LINGER, &zero, sizeof (zero));
assert (0 == ret);
ret = zmq_bind (rbind, bindip);
assert (0 == ret);
// Create connection socket.
rconn1 = zmq_socket (ctx, ZMQ_ROUTER);
assert (rconn1);
ret = zmq_setsockopt (rconn1, ZMQ_LINGER, &zero, sizeof (zero));
assert (0 == ret);
// If we're in named mode, set some identities.
if (named) {
ret = zmq_setsockopt (rbind, ZMQ_IDENTITY, "X", 1);
ret = zmq_setsockopt (rconn1, ZMQ_IDENTITY, "Y", 1);
}
ret = zmq_setsockopt(rconn1,ZMQ_CONNECT_RID,"conn1",6);
assert(0 == ret);
ret = zmq_connect(rconn1,bindip);
assert(0 == ret);
/*test duplicate connect attempt*/
ret = zmq_setsockopt(rconn1,ZMQ_CONNECT_RID,"conn1",6);
assert(0 == ret);
ret = zmq_connect(rconn1,bindip);
assert(0 == ret);
ret = zmq_send(rconn1,"conn1",6,ZMQ_SNDMORE);
assert(6 == ret);
ret = zmq_send(rconn1,msg,5,0);
assert(5 == ret);
ret = zmq_recv(rbind,buff,256,0);
if(named) assert(ret && 'Y' == buff[0]);
else assert(ret && 0 == buff[0]);
ret = zmq_recv(rbind,buff+128,128,0);
assert(5 == ret && 'h' == buff[128] );
if(named) {
ret = zmq_send(rbind,buff,1,ZMQ_SNDMORE);
assert(1 == ret);
// Make call to connect using a connect_rid.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
assert (0 == ret);
ret = zmq_connect (rconn1, bindip);
assert (0 == ret);
/* Uncomment to test assert on duplicate rid
// Test duplicate connect attempt.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
assert (0 == ret);
ret = zmq_connect (rconn1, bindip);
assert (0 == ret);
*/
// Send some data.
ret = zmq_send (rconn1, "conn1", 6, ZMQ_SNDMORE);
assert (6 == ret);
ret = zmq_send (rconn1, msg, 5, 0);
assert (5 == ret);
// Receive the name.
ret = zmq_recv (rbind, buff, 256, 0);
if (named)
assert (ret && 'Y' == buff[0]);
else
assert (ret && 0 == buff[0]);
// Receive the data.
ret = zmq_recv (rbind, buff+128, 128, 0);
assert(5 == ret && 'h' == buff[128]);
// Send some data back.
if (named) {
ret = zmq_send (rbind, buff, 1, ZMQ_SNDMORE);
assert (1 == ret);
}
else {
ret = zmq_send(rbind,buff,5,ZMQ_SNDMORE);
assert(5 == ret);
ret = zmq_send (rbind, buff, 5, ZMQ_SNDMORE);
assert (5 == ret);
}
ret = zmq_send_const(rbind,"ok",3,0);
assert(3 == ret);
ret = zmq_send_const (rbind, "ok", 3, 0);
assert (3 == ret);
/*if bound socket identity naming a problem, we'll likely see something funky here */
ret = zmq_recv(rconn1,buff,256,0);
assert('c' == buff[0] && 6 == ret);
ret = zmq_recv(rconn1,buff+128,128,0);
assert(3 == ret && 'o' == buff[128] );
// If bound socket identity naming a problem, we'll likely see something funky here.
ret = zmq_recv (rconn1, buff, 256, 0);
assert ('c' == buff[0] && 6 == ret);
ret = zmq_recv (rconn1, buff+128, 128, 0);
assert (3 == ret && 'o' == buff[128]);
zmq_unbind(rbind,bindip);
zmq_close(rbind);
zmq_close(rconn1);
ret = zmq_unbind (rbind, bindip);
assert(0 == ret);
ret = zmq_close (rbind);
assert(0 == ret);
ret = zmq_close (rconn1);
assert(0 == ret);
}
int main (void)
{
void *ctx;
setup_test_environment();
setup_test_environment ();
ctx = zmq_ctx_new ();
assert (ctx);
test_stream_2_stream(ctx);
test_router_2_router(ctx,false);
test_router_2_router(ctx,true);
zmq_ctx_destroy(ctx);
test_stream_2_stream (ctx);
test_router_2_router (ctx, false);
test_router_2_router (ctx, true);
zmq_ctx_destroy (ctx);
printf ("'test_connect_rid' passed");
return 0;
}