Problem: tests bind to hardcoded TCP ports

Solution: use ZMQ_LAST_ENDPOINT in most places. This alllows running
tests in paralle, and on over-booked shared machines where many of
the ports would be already in use.
Keep 3 tests with an hardcoded port, as there are some code paths that
require it (eg: connect before bind), but list those ports in
tests/testutil.hpp as macros so that they do not overlap and still
allow parallel runs.

These changes were inspired by a patch uploaded to Ubuntu by the
package maintainer, Steve Langasek <steve.langasek@ubuntu.com>.
Thank you Steve!
This commit is contained in:
Luca Boccassi 2017-05-01 12:11:11 +01:00
parent ae461dc2a9
commit 5934919f3e
51 changed files with 792 additions and 600 deletions

View File

@ -41,7 +41,7 @@ int main (void)
void *sc = zmq_socket (ctx, ZMQ_DEALER);
assert (sc);
int rc = zmq_connect (sc, "tcp://127.0.0.1:7722");
int rc = zmq_connect (sc, ENDPOINT_3);
assert (rc == 0);
rc = zmq_send_const (sc, "foobar", 6, 0);
@ -53,7 +53,7 @@ int main (void)
rc = zmq_send_const (sc, "buzz", 4, 0);
assert (rc == 4);
rc = zmq_bind (sb, "tcp://127.0.0.1:7722");
rc = zmq_bind (sb, ENDPOINT_3);
assert (rc == 0);
zmq_msg_t msg;

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -31,7 +31,9 @@
int main (int, char *[])
{
const char *bind_to = "tcp://127.0.0.1:5555";
const char *bind_to = "tcp://127.0.0.1:*";
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
int rc;
@ -47,11 +49,13 @@ int main (int, char *[])
rc = zmq_bind (s_in, bind_to);
assert (rc == 0);
rc = zmq_getsockopt (s_in, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
void* s_out = zmq_socket (ctx, ZMQ_PUSH);
assert (s_out);
rc = zmq_connect (s_out, bind_to);
rc = zmq_connect (s_out, my_endpoint);
assert (rc == 0);
int message_count = 20;

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -35,9 +35,11 @@ void test_stream_2_stream(){
int ret;
char buff[256];
char msg[] = "hi 1";
const char *bindip = "tcp://127.0.0.1:5556";
const char *bindip = "tcp://127.0.0.1:*";
int disabled = 0;
int zero = 0;
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
// Set up listener STREAM.
@ -49,6 +51,8 @@ void test_stream_2_stream(){
assert (0 == ret);
ret = zmq_bind (rbind, bindip);
assert(0 == ret);
ret = zmq_getsockopt (rbind, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (0 == ret);
// Set up connection stream.
rconn1 = zmq_socket (ctx, ZMQ_STREAM);
@ -59,7 +63,7 @@ void test_stream_2_stream(){
// Do the connection.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
assert (0 == ret);
ret = zmq_connect (rconn1, bindip);
ret = zmq_connect (rconn1, my_endpoint);
/* Uncomment to test assert on duplicate rid.
// Test duplicate connect attempt.
@ -83,7 +87,7 @@ void test_stream_2_stream(){
assert ('h' == buff[128]);
// Handle close of the socket.
ret = zmq_unbind (rbind, bindip);
ret = zmq_unbind (rbind, my_endpoint);
assert(0 == ret);
ret = zmq_close (rbind);
assert(0 == ret);
@ -98,8 +102,10 @@ void test_router_2_router(bool named){
int ret;
char buff[256];
char msg[] = "hi 1";
const char *bindip = "tcp://127.0.0.1:5556";
const char *bindip = "tcp://127.0.0.1:*";
int zero = 0;
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
// Create bind socket.
@ -109,6 +115,8 @@ void test_router_2_router(bool named){
assert (0 == ret);
ret = zmq_bind (rbind, bindip);
assert (0 == ret);
ret = zmq_getsockopt (rbind, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (0 == ret);
// Create connection socket.
rconn1 = zmq_socket (ctx, ZMQ_ROUTER);
@ -125,7 +133,7 @@ void test_router_2_router(bool named){
// Make call to connect using a connect_rid.
ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6);
assert (0 == ret);
ret = zmq_connect (rconn1, bindip);
ret = zmq_connect (rconn1, my_endpoint);
assert (0 == ret);
/* Uncomment to test assert on duplicate rid
// Test duplicate connect attempt.
@ -169,7 +177,7 @@ void test_router_2_router(bool named){
ret = zmq_recv (rconn1, buff+128, 128, 0);
assert (3 == ret && 'o' == buff[128]);
ret = zmq_unbind (rbind, bindip);
ret = zmq_unbind (rbind, my_endpoint);
assert(0 == ret);
ret = zmq_close (rbind);
assert(0 == ret);

View File

@ -61,20 +61,20 @@ int main (void)
void *listener = zmq_socket (ctx, ZMQ_DGRAM);
// Connecting dgram shoudl fail
int rc = zmq_connect (listener, "udp://127.0.0.1:5556");
int rc = zmq_connect (listener, ENDPOINT_4);
assert (rc == -1);
rc = zmq_bind (listener, "udp://*:5556");
rc = zmq_bind (listener, ENDPOINT_4);
assert (rc == 0);
rc = zmq_bind (sender, "udp://*:5557");
rc = zmq_bind (sender, ENDPOINT_5);
assert (rc == 0);
str_send_to (sender, "Is someone there ?", "127.0.0.1:5556");
str_send_to (sender, "Is someone there ?", strrchr (ENDPOINT_4, '/') + 1);
str_recv_from (listener, &message_string, &address);
assert (strcmp(message_string, "Is someone there ?") == 0);
assert (strcmp(address, "127.0.0.1:5557") == 0);
assert (strcmp(address, strrchr (ENDPOINT_5, '/') + 1) == 0);
free (message_string);
str_send_to (listener, "Yes, there is !", address);
@ -82,7 +82,7 @@ int main (void)
str_recv_from (sender, &message_string, &address);
assert (strcmp(message_string, "Yes, there is !") == 0);
assert (strcmp(address, "127.0.0.1:5556") == 0);
assert (strcmp(address, strrchr (ENDPOINT_4, '/') + 1) == 0);
free (message_string);
free (address);

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -35,6 +35,8 @@ int main (void)
int tos = 0x28;
int o_tos;
size_t tos_size = sizeof(tos);
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
setup_test_environment();
void *ctx = zmq_ctx_new ();
@ -44,8 +46,11 @@ int main (void)
assert (sb);
rc = zmq_setsockopt (sb, ZMQ_TOS, &tos, tos_size);
assert (rc == 0);
rc = zmq_bind (sb, "tcp://127.0.0.1:5560");
rc = zmq_bind (sb, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
rc = zmq_getsockopt (sb, ZMQ_TOS, &o_tos, &tos_size);
assert (rc == 0);
assert (o_tos == tos);
@ -55,7 +60,7 @@ int main (void)
tos = 0x58;
rc = zmq_setsockopt (sc, ZMQ_TOS, &tos, tos_size);
assert (rc == 0);
rc = zmq_connect (sc, "tcp://127.0.0.1:5560");
rc = zmq_connect (sc, my_endpoint);
assert (rc == 0);
rc = zmq_getsockopt (sc, ZMQ_TOS, &o_tos, &tos_size);
assert (rc == 0);

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -29,7 +29,8 @@
#include "testutil.hpp"
const char *address = "tcp://127.0.0.1:6571";
const char *address = "tcp://127.0.0.1:*";
char connect_address[MAX_SOCKET_STRING];
#define NUM_MESSAGES 5
@ -45,6 +46,9 @@ int main (void)
assert (pull);
int rc = zmq_bind (pull, address);
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
int pid = fork ();
if (pid == 0) {
@ -58,7 +62,7 @@ int main (void)
assert (child_ctx);
void *push = zmq_socket (child_ctx, ZMQ_PUSH);
assert (push);
rc = zmq_connect (push, address);
rc = zmq_connect (push, connect_address);
assert (rc == 0);
int count;
for (count = 0; count < NUM_MESSAGES; count++)

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
@ -102,7 +102,8 @@ mock_handshake (raw_socket fd) {
}
static void
prep_server_socket(void * ctx, int set_heartbeats, void ** server_out, void ** mon_out)
prep_server_socket(void * ctx, int set_heartbeats, void ** server_out, void ** mon_out,
char *endpoint, size_t ep_length)
{
int rc;
// We'll be using this socket in raw mode
@ -119,7 +120,9 @@ prep_server_socket(void * ctx, int set_heartbeats, void ** server_out, void ** m
assert (rc == 0);
}
rc = zmq_bind (server, "tcp://127.0.0.1:5556");
rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, endpoint, &ep_length);
assert (rc == 0);
// Create and connect a socket for collecting monitor events on dealer
@ -145,19 +148,21 @@ static void
test_heartbeat_timeout (void)
{
int rc;
char my_endpoint[MAX_SOCKET_STRING];
// Set up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);
void * server, * server_mon;
prep_server_socket (ctx, 1, &server, &server_mon);
prep_server_socket (ctx, 1, &server, &server_mon, my_endpoint,
MAX_SOCKET_STRING);
struct sockaddr_in ip4addr;
raw_socket s;
ip4addr.sin_family = AF_INET;
ip4addr.sin_port = htons (5556);
ip4addr.sin_port = htons (atoi (strrchr (my_endpoint, ':') + 1));
#if defined (ZMQ_HAVE_WINDOWS) && (_WIN32_WINNT < 0x0600)
ip4addr.sin_addr.s_addr = inet_addr ("127.0.0.1");
#else
@ -200,13 +205,15 @@ static void
test_heartbeat_ttl (void)
{
int rc, value;
char my_endpoint[MAX_SOCKET_STRING];
// Set up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);
void * server, * server_mon, *client;
prep_server_socket (ctx, 0, &server, &server_mon);
prep_server_socket (ctx, 0, &server, &server_mon, my_endpoint,
MAX_SOCKET_STRING);
client = zmq_socket (ctx, ZMQ_DEALER);
assert (client != NULL);
@ -222,7 +229,7 @@ test_heartbeat_ttl (void)
rc = zmq_setsockopt (client, ZMQ_HEARTBEAT_IVL, &value, sizeof (value));
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:5556");
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
// By now everything should report as connected
@ -255,16 +262,18 @@ static void
test_heartbeat_notimeout (void)
{
int rc;
char my_endpoint[MAX_SOCKET_STRING];
// Set up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);
void * server, * server_mon;
prep_server_socket(ctx, 1, &server, &server_mon);
prep_server_socket(ctx, 1, &server, &server_mon, my_endpoint,
MAX_SOCKET_STRING);
void * client = zmq_socket (ctx, ZMQ_DEALER);
rc = zmq_connect (client, "tcp://127.0.0.1:5556");
rc = zmq_connect (client, my_endpoint);
// Give it a sec to connect and handshake
msleep (SETTLE_TIME);

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -157,6 +157,8 @@ void test_reset_hwm ()
int first_count = 9999;
int second_count = 1100;
int hwm = 11024;
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
@ -167,7 +169,9 @@ void test_reset_hwm ()
assert (pub_socket);
rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &hwm, sizeof (hwm));
assert (rc == 0);
rc = zmq_bind (pub_socket, "tcp://127.0.0.1:1234");
rc = zmq_bind (pub_socket, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (pub_socket, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// Set up connect socket
@ -175,7 +179,7 @@ void test_reset_hwm ()
assert (sub_socket);
rc = zmq_setsockopt (sub_socket, ZMQ_RCVHWM, &hwm, sizeof (hwm));
assert (rc == 0);
rc = zmq_connect (sub_socket, "tcp://127.0.0.1:1234");
rc = zmq_connect (sub_socket, my_endpoint);
assert (rc == 0);
rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0);
assert (rc == 0);

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -35,6 +35,8 @@ int main (void)
int val;
int rc;
char buffer[16];
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
// TEST 1.
// First we're going to attempt to send messages to two
// pipes, one connected, the other not. We should see
@ -51,7 +53,9 @@ int main (void)
val = 0;
rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0);
rc = zmq_bind (to, "tcp://127.0.0.1:6555");
rc = zmq_bind (to, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (to, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// Create a socket pushing to two endpoints - only 1 message should arrive.
@ -64,7 +68,7 @@ int main (void)
rc = zmq_connect (from, "tcp://localhost:5556");
assert (rc == 0);
// This pipe will
rc = zmq_connect (from, "tcp://localhost:6555");
rc = zmq_connect (from, my_endpoint);
assert (rc == 0);
msleep (SETTLE_TIME);
@ -112,7 +116,10 @@ int main (void)
// Bind the valid socket
to = zmq_socket (context, ZMQ_PULL);
assert (to);
rc = zmq_bind (to, "tcp://127.0.0.1:5560");
rc = zmq_bind (to, "tcp://127.0.0.1:*");
assert (rc == 0);
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (to, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
val = 0;
@ -136,7 +143,7 @@ int main (void)
rc = zmq_connect (from, "tcp://localhost:5561");
assert (rc == 0);
// Connect to the valid socket
rc = zmq_connect (from, "tcp://localhost:5560");
rc = zmq_connect (from, my_endpoint);
assert (rc == 0);
// Send 10 messages, all should be routed to the connected pipe
@ -186,9 +193,12 @@ int main (void)
int on = 1;
rc = zmq_setsockopt (frontend, ZMQ_IMMEDIATE, &on, sizeof (on));
assert (rc == 0);
rc = zmq_bind (backend, "tcp://127.0.0.1:5560");
rc = zmq_bind (backend, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_connect (frontend, "tcp://localhost:5560");
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (backend, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
rc = zmq_connect (frontend, my_endpoint);
assert (rc == 0);
// Ping backend to frontend so we know when the connection is up
@ -216,7 +226,7 @@ int main (void)
assert (backend);
rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_bind (backend, "tcp://127.0.0.1:5560");
rc = zmq_bind (backend, my_endpoint);
assert (rc == 0);
// Ping backend to frontend so we know when the connection is up

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -38,6 +38,8 @@ int main (void)
{
setup_test_environment();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx1 = zmq_ctx_new ();
assert (ctx1);
@ -48,8 +50,10 @@ int main (void)
int on = 1;
int rc = zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &on, sizeof (on));
assert (rc == 0);
rc = zmq_bind (router, "tcp://127.0.0.1:5555");
rc = zmq_bind (router, "tcp://127.0.0.1:*");
assert (rc != -1);
rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// Repeat often enough to be sure this works as it should
for (int cycle = 0; cycle < 100; cycle++) {
@ -63,7 +67,7 @@ int main (void)
int rcvtimeo = 1000;
rc = zmq_setsockopt (dealer, ZMQ_RCVTIMEO, &rcvtimeo, sizeof (int));
assert (rc == 0);
rc = zmq_connect (dealer, "tcp://127.0.0.1:5555");
rc = zmq_connect (dealer, my_endpoint);
assert (rc == 0);
// Router will try to send to dealer, at short intervals.

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -52,8 +52,8 @@ int main (void)
int rc = zmq_setsockopt (sb, ZMQ_LINGER, &val, sizeof (val));
assert (rc == 0);
do_bind_and_verify (sb, "tcp://127.0.0.1:5560");
do_bind_and_verify (sb, "tcp://127.0.0.1:5561");
do_bind_and_verify (sb, ENDPOINT_1);
do_bind_and_verify (sb, ENDPOINT_2);
rc = zmq_close (sb);
assert (rc == 0);

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -79,6 +79,8 @@ zap_handler (void *handler)
int main (void)
{
setup_test_environment();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
@ -97,9 +99,11 @@ int main (void)
assert (client);
rc = zmq_setsockopt (server, ZMQ_ZAP_DOMAIN, "DOMAIN", 6);
assert (rc == 0);
rc = zmq_bind (server, "tcp://127.0.0.1:9001");
rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:9001");
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
s_send (client, "This is a message");

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -68,6 +68,8 @@ int main (void)
{
setup_test_environment();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
@ -78,7 +80,7 @@ int main (void)
assert (server);
// Socket monitoring only works over inproc://
int rc = zmq_socket_monitor (client, "tcp://127.0.0.1:9999", 0);
int rc = zmq_socket_monitor (client, "tcp://127.0.0.1:*", 0);
assert (rc == -1);
assert (zmq_errno () == EPROTONOSUPPORT);
@ -101,9 +103,11 @@ int main (void)
assert (rc == 0);
// Now do a basic ping test
rc = zmq_bind (server, "tcp://127.0.0.1:9998");
rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:9998");
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
bounce (server, client);

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -41,16 +41,21 @@ int main (void) {
void *ctx = zmq_ctx_new ();
assert (ctx);
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert (router);
int rc = zmq_bind (router, "tcp://127.0.0.1:5555");
int rc = zmq_bind (router, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
rc = zmq_connect (dealer, "tcp://127.0.0.1:5555");
rc = zmq_connect (dealer, my_endpoint);
assert (rc == 0);
// Test that creating and closing a message triggers ffn

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -32,17 +32,21 @@
int main (void)
{
setup_test_environment();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_PAIR);
assert (sb);
int rc = zmq_bind (sb, "tcp://127.0.0.1:5560");
int rc = zmq_bind (sb, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_PAIR);
assert (sc);
rc = zmq_connect (sc, "tcp://127.0.0.1:5560");
rc = zmq_connect (sc, my_endpoint);
assert (rc == 0);
bounce (sb, sc);

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -31,6 +31,10 @@
int main (void)
{
size_t len = MAX_SOCKET_STRING;
char my_endpoint_0[MAX_SOCKET_STRING];
char my_endpoint_1[MAX_SOCKET_STRING];
setup_test_environment ();
void *ctx = zmq_ctx_new ();
@ -39,12 +43,14 @@ int main (void)
// Create few sockets
void *vent = zmq_socket (ctx, ZMQ_PUSH);
assert (vent);
int rc = zmq_bind (vent, "tcp://127.0.0.1:55556");
int rc = zmq_bind (vent, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (vent, ZMQ_LAST_ENDPOINT, my_endpoint_0, &len);
assert (rc == 0);
void *sink = zmq_socket (ctx, ZMQ_PULL);
assert (sink);
rc = zmq_connect (sink, "tcp://127.0.0.1:55556");
rc = zmq_connect (sink, my_endpoint_0);
assert (rc == 0);
void *bowl = zmq_socket (ctx, ZMQ_PULL);
@ -53,7 +59,10 @@ int main (void)
#if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
void *server = zmq_socket (ctx, ZMQ_SERVER);
assert (server);
rc = zmq_bind (server, "tcp://127.0.0.1:55557");
rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint_1, &len);
assert (rc == 0);
void *client = zmq_socket (ctx, ZMQ_CLIENT);
@ -96,7 +105,7 @@ int main (void)
assert (rc == 0);
// Check we can poll an FD
rc = zmq_connect (bowl, "tcp://127.0.0.1:55556");
rc = zmq_connect (bowl, my_endpoint_0);
assert (rc == 0);
#if defined _WIN32
@ -122,7 +131,7 @@ int main (void)
// Polling on thread safe sockets
rc = zmq_poller_add (poller, server, NULL, ZMQ_POLLIN);
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:55557");
rc = zmq_connect (client, my_endpoint_1);
assert (rc == 0);
rc = zmq_send_const (client, data, 1, 0);
assert (rc == 1);

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -32,13 +32,17 @@
int main (void)
{
setup_test_environment();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
// Create server and bind to endpoint
void *server = zmq_socket (ctx, ZMQ_ROUTER);
assert (server);
int rc = zmq_bind (server, "tcp://127.0.0.1:5560");
int rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// Create client and connect to server, doing a probe
@ -49,7 +53,7 @@ int main (void)
int probe = 1;
rc = zmq_setsockopt (client, ZMQ_PROBE_ROUTER, &probe, sizeof (probe));
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:5560");
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
// We expect an identity=X + empty message from client

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -50,18 +50,36 @@
#define QT_CLIENTS 3
#define is_verbose 0
struct thread_data {
void *ctx;
int id;
};
static void
client_task (void *ctx)
client_task (void *db)
{
void *client = zmq_socket (ctx, ZMQ_DEALER);
struct thread_data *databag = (struct thread_data *)db;
// Endpoint socket gets random port to avoid test failing when port in use
void *endpoint = zmq_socket (databag->ctx, ZMQ_PAIR);
assert (endpoint);
int linger = 0;
int rc = zmq_setsockopt (endpoint, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
char endpoint_source [256];
sprintf (endpoint_source, "inproc://endpoint%d", databag->id);
rc = zmq_connect (endpoint, endpoint_source);
assert (rc == 0);
char *my_endpoint = s_recv (endpoint);
assert (my_endpoint);
void *client = zmq_socket (databag->ctx, ZMQ_DEALER);
assert (client);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_SUB);
void *control = zmq_socket (databag->ctx, ZMQ_SUB);
assert (control);
int rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
int linger = 0;
rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
rc = zmq_connect (control, "inproc://control");
@ -76,7 +94,7 @@ client_task (void *ctx)
linger = 0;
rc = zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:5563");
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } };
@ -117,6 +135,9 @@ client_task (void *ctx)
assert (rc == 0);
rc = zmq_close (control);
assert (rc == 0);
rc = zmq_close (endpoint);
assert (rc == 0);
free (my_endpoint);
}
// This is our server task.
@ -131,12 +152,16 @@ void
server_task (void *ctx)
{
// Frontend socket talks to clients over TCP
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *frontend = zmq_socket (ctx, ZMQ_ROUTER);
assert (frontend);
int linger = 0;
int rc = zmq_setsockopt (frontend, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
rc = zmq_bind (frontend, "tcp://127.0.0.1:5563");
rc = zmq_bind (frontend, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (frontend, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// Backend socket talks to workers over inproc
@ -163,6 +188,25 @@ server_task (void *ctx)
for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
threads[thread_nbr] = zmq_threadstart (&server_worker, ctx);
// Endpoint socket sends random port to avoid test failing when port in use
void *endpoint_receivers [QT_CLIENTS];
char endpoint_source [256];
for (int i = 0; i < QT_CLIENTS; ++i) {
endpoint_receivers [i] = zmq_socket (ctx, ZMQ_PAIR);
assert (endpoint_receivers [i]);
rc = zmq_setsockopt (endpoint_receivers [i], ZMQ_LINGER, &linger,
sizeof (linger));
assert (rc == 0);
sprintf (endpoint_source, "inproc://endpoint%d", i);
rc = zmq_bind (endpoint_receivers [i], endpoint_source);
assert (rc == 0);
}
for (int i = 0; i < QT_CLIENTS; ++i) {
rc = s_send (endpoint_receivers [i], my_endpoint);
assert (rc > 0);
}
// Connect backend to frontend via a proxy
rc = zmq_proxy_steerable (frontend, backend, NULL, control);
assert (rc == 0);
@ -176,6 +220,10 @@ server_task (void *ctx)
assert (rc == 0);
rc = zmq_close (control);
assert (rc == 0);
for (int i = 0; i < QT_CLIENTS; ++i) {
rc = zmq_close(endpoint_receivers [i]);
assert (rc == 0);
}
}
// Each worker task works on one request at a time and sends a random number
@ -262,8 +310,12 @@ int main (void)
assert (rc == 0);
void *threads [QT_CLIENTS + 1];
for (int i = 0; i < QT_CLIENTS; i++)
threads[i] = zmq_threadstart (&client_task, ctx);
struct thread_data databags [QT_CLIENTS + 1];
for (int i = 0; i < QT_CLIENTS; i++) {
databags [i].ctx = ctx;
databags [i].id = i;
threads[i] = zmq_threadstart (&client_task, &databags [i]);
}
threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx);
msleep (500); // Run for 500 ms then quit

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -37,18 +37,22 @@
void
server_task (void *ctx)
{
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
int rc = zmq_bind (rep, "tcp://127.0.0.1:5563");
int rc = zmq_bind (rep, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (rep, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_SUB);
void *control = zmq_socket (ctx, ZMQ_REQ);
assert (control);
rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
rc = zmq_connect (control, "inproc://control");
assert (rc == 0);
rc = s_send (control, my_endpoint);
assert (rc > 0);
// Use rep as both frontend and backend
rc = zmq_proxy_steerable (rep, rep, NULL, control);
@ -70,20 +74,23 @@ int main (void)
void *ctx = zmq_ctx_new ();
assert (ctx);
void *server_thread = zmq_threadstart(&server_task, ctx);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_REP);
assert (control);
int rc = zmq_bind (control, "inproc://control");
assert (rc == 0);
char *my_endpoint = s_recv (control);
assert (my_endpoint);
// client socket pings proxy over tcp
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
int rc = zmq_connect (req, "tcp://127.0.0.1:5563");
rc = zmq_connect (req, my_endpoint);
assert (rc == 0);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_PUB);
assert (control);
rc = zmq_bind (control, "inproc://control");
assert (rc == 0);
void *server_thread = zmq_threadstart(&server_task, ctx);
char buf[255];
rc = zmq_send(req, "msg1", 4, 0);
assert (rc == 4);
@ -104,6 +111,7 @@ int main (void)
assert (rc == 0);
rc = zmq_close (req);
assert (rc == 0);
free (my_endpoint);
zmq_threadclose (server_thread);

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -37,27 +37,31 @@
void
server_task (void *ctx)
{
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
// Frontend socket talks to main process
void *frontend = zmq_socket (ctx, ZMQ_SUB);
assert (frontend);
int rc = zmq_setsockopt (frontend, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
rc = zmq_bind (frontend, "tcp://127.0.0.1:15564");
rc = zmq_bind (frontend, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (frontend, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// Nice socket which is never read
void *backend = zmq_socket (ctx, ZMQ_PUSH);
assert (backend);
rc = zmq_bind (backend, "tcp://127.0.0.1:15563");
rc = zmq_bind (backend, "tcp://127.0.0.1:*");
assert (rc == 0);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_SUB);
void *control = zmq_socket (ctx, ZMQ_REQ);
assert (control);
rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
rc = zmq_connect (control, "inproc://control");
assert (rc == 0);
rc = s_send (control, my_endpoint);
assert (rc > 0);
// Connect backend to frontend via a proxy
rc = zmq_proxy_steerable (frontend, backend, NULL, control);
@ -81,19 +85,23 @@ int main (void)
void *ctx = zmq_ctx_new ();
assert (ctx);
void *thread = zmq_threadstart(&server_task, ctx);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_PUB);
void *control = zmq_socket (ctx, ZMQ_REP);
assert (control);
int rc = zmq_bind (control, "inproc://control");
assert (rc == 0);
char *my_endpoint = s_recv (control);
assert (my_endpoint);
void *thread = zmq_threadstart(&server_task, ctx);
msleep (500); // Run for 500 ms
// Start a secondary publisher which writes data to the SUB-PUSH server socket
void *publisher = zmq_socket (ctx, ZMQ_PUB);
assert (publisher);
rc = zmq_connect (publisher, "tcp://127.0.0.1:15564");
rc = zmq_connect (publisher, my_endpoint);
assert (rc == 0);
msleep (SETTLE_TIME);
@ -114,6 +122,7 @@ int main (void)
assert (rc == 0);
rc = zmq_close (control);
assert (rc == 0);
free (my_endpoint);
zmq_threadclose (thread);

View File

@ -83,6 +83,8 @@ int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* bod
int main (void)
{
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
@ -90,7 +92,9 @@ int main (void)
void *radio = zmq_socket (ctx, ZMQ_RADIO);
void *dish = zmq_socket (ctx, ZMQ_DISH);
int rc = zmq_bind (radio, "tcp://127.0.0.1:5556");
int rc = zmq_bind (radio, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (radio, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// Leaving a group which we didn't join
@ -114,7 +118,7 @@ int main (void)
assert (rc == -1);
// Connecting
rc = zmq_connect (dish, "tcp://127.0.0.1:5556");
rc = zmq_connect (dish, my_endpoint);
assert (rc == 0);
msleep (SETTLE_TIME);

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -32,6 +32,8 @@
int main (void)
{
setup_test_environment();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
@ -49,10 +51,12 @@ int main (void)
rc = zmq_setsockopt (req, ZMQ_RCVTIMEO, &rcvtimeo, sizeof (int));
assert (rc == 0);
rc = zmq_connect (req, "tcp://localhost:5555");
rc = zmq_bind (router, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
rc = zmq_bind (router, "tcp://127.0.0.1:5555");
rc = zmq_connect (req, my_endpoint);
assert (rc == 0);
// Send a multi-part request.

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -57,6 +57,8 @@ static void bounce (void *socket)
int main (void)
{
setup_test_environment ();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
@ -70,7 +72,9 @@ int main (void)
rc = zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int));
assert (rc == 0);
rc = zmq_bind (req, "tcp://127.0.0.1:5555");
rc = zmq_bind (req, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (req, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
const size_t services = 5;
@ -83,7 +87,7 @@ int main (void)
rc = zmq_setsockopt (rep [peer], ZMQ_RCVTIMEO, &timeout, sizeof (int));
assert (rc == 0);
rc = zmq_connect (rep [peer], "tcp://localhost:5555");
rc = zmq_connect (rep [peer], my_endpoint);
assert (rc == 0);
}
// We have to give the connects time to finish otherwise the requests
@ -162,7 +166,7 @@ int main (void)
rc = zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int));
assert (rc == 0);
rc = zmq_connect (req, "tcp://localhost:5555");
rc = zmq_connect (req, ENDPOINT_0);
assert (rc == 0);
// Setup ROUTER socket as server but do not bind it just yet
@ -174,7 +178,7 @@ int main (void)
s_send_seq (req, "TO_BE_ANSWERED", SEQ_END);
// Bind server allowing it to receive messages
rc = zmq_bind (router, "tcp://127.0.0.1:5555");
rc = zmq_bind (router, ENDPOINT_0);
assert (rc == 0);
// Read the two messages and send them back as is

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -32,29 +32,37 @@
int main (void)
{
setup_test_environment();
size_t len = MAX_SOCKET_STRING;
char endpoint1[MAX_SOCKET_STRING];
char endpoint2[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
// Create a req/rep device.
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
int rc = zmq_bind (dealer, "tcp://127.0.0.1:5560");
int rc = zmq_bind (dealer, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (dealer, ZMQ_LAST_ENDPOINT, endpoint1, &len);
assert (rc == 0);
void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert (router);
rc = zmq_bind (router, "tcp://127.0.0.1:5561");
rc = zmq_bind (router, "tcp://127.0.0.1:*");
assert (rc == 0);
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, endpoint2, &len);
assert (rc == 0);
// Create a worker.
void *rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
rc = zmq_connect (rep, "tcp://127.0.0.1:5560");
rc = zmq_connect (rep, endpoint1);
assert (rc == 0);
// Create a client.
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
rc = zmq_connect (req, "tcp://127.0.0.1:5561");
rc = zmq_connect (req, endpoint2);
assert (rc == 0);
// Send a request.

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -29,206 +29,48 @@
#include "testutil.hpp"
void test_single_connect_ipv4 (void)
void test_single_connect (const char *address)
{
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
int rc = zmq_bind (sb, "tcp://127.0.0.1:5560");
assert (rc == 0);
int ipv6;
if (streq(address, "tcp://127.0.0.1:*"))
ipv6 = 0;
else if (streq(address, "tcp://[::1]:*"))
ipv6 = 1;
else
assert (false);
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_connect (sc, "tcp://127.0.0.1:5560");
assert (rc == 0);
bounce (sb, sc);
rc = zmq_disconnect (sc, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_unbind (sb, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
}
void test_multi_connect_ipv4 (void)
{
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb0 = zmq_socket (ctx, ZMQ_REP);
assert (sb0);
int rc = zmq_bind (sb0, "tcp://127.0.0.1:5560");
assert (rc == 0);
void *sb1 = zmq_socket (ctx, ZMQ_REP);
assert (sb1);
rc = zmq_bind (sb1, "tcp://127.0.0.1:5561");
assert (rc == 0);
void *sb2 = zmq_socket (ctx, ZMQ_REP);
assert (sb2);
rc = zmq_bind (sb2, "tcp://127.0.0.1:5562");
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_connect (sc, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_connect (sc, "tcp://127.0.0.1:5561");
assert (rc == 0);
rc = zmq_connect (sc, "tcp://127.0.0.1:5564;127.0.0.1:5562");
assert (rc == 0);
bounce (sb0, sc);
bounce (sb1, sc);
bounce (sb2, sc);
bounce (sb0, sc);
bounce (sb1, sc);
bounce (sb2, sc);
bounce (sb0, sc);
rc = zmq_disconnect (sc, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_disconnect (sc, "tcp://127.0.0.1:5564;127.0.0.1:5562");
assert (rc == 0);
rc = zmq_disconnect (sc, "tcp://127.0.0.1:5561");
assert (rc == 0);
rc = zmq_unbind (sb0, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_unbind (sb1, "tcp://127.0.0.1:5561");
assert (rc == 0);
rc = zmq_unbind (sb2, "tcp://127.0.0.1:5562");
assert (rc == 0);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb0);
assert (rc == 0);
rc = zmq_close (sb1);
assert (rc == 0);
rc = zmq_close (sb2);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
}
void test_multi_connect_ipv4_same_port (void)
{
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb0 = zmq_socket (ctx, ZMQ_REP);
assert (sb0);
int rc = zmq_bind (sb0, "tcp://127.0.0.1:5560");
assert (rc == 0);
void *sb1 = zmq_socket (ctx, ZMQ_REP);
assert (sb1);
rc = zmq_bind (sb1, "tcp://127.0.0.1:5561");
assert (rc == 0);
void *sc0 = zmq_socket (ctx, ZMQ_REQ);
assert (sc0);
rc = zmq_connect (sc0, "tcp://127.0.0.1:5564;127.0.0.1:5560");
assert (rc == 0);
rc = zmq_connect (sc0, "tcp://127.0.0.1:5565;127.0.0.1:5561");
assert (rc == 0);
void *sc1 = zmq_socket (ctx, ZMQ_REQ);
assert (sc1);
rc = zmq_connect (sc1, "tcp://127.0.0.1:5565;127.0.0.1:5560");
assert (rc == 0);
rc = zmq_connect (sc1, "tcp://127.0.0.1:5564;127.0.0.1:5561");
assert (rc == 0);
bounce (sb0, sc0);
bounce (sb1, sc0);
bounce (sb0, sc1);
bounce (sb1, sc1);
bounce (sb0, sc0);
bounce (sb1, sc0);
rc = zmq_disconnect (sc1, "tcp://127.0.0.1:5565;127.0.0.1:5560");
assert (rc == 0);
rc = zmq_disconnect (sc1, "tcp://127.0.0.1:5564;127.0.0.1:5561");
assert (rc == 0);
rc = zmq_disconnect (sc0, "tcp://127.0.0.1:5564;127.0.0.1:5560");
assert (rc == 0);
rc = zmq_disconnect (sc0, "tcp://127.0.0.1:5565;127.0.0.1:5561");
assert (rc == 0);
rc = zmq_unbind (sb0, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_unbind (sb1, "tcp://127.0.0.1:5561");
assert (rc == 0);
rc = zmq_close (sc0);
assert (rc == 0);
rc = zmq_close (sc1);
assert (rc == 0);
rc = zmq_close (sb0);
assert (rc == 0);
rc = zmq_close (sb1);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
}
void test_single_connect_ipv6 (void)
{
void *ctx = zmq_ctx_new ();
assert (ctx);
if (!is_ipv6_available ()) {
if (ipv6 && !is_ipv6_available ()) {
zmq_ctx_term (ctx);
return;
}
void *sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
int ipv6 = 1;
int rc = zmq_setsockopt (sb, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_bind (sb, "tcp://[::1]:5560");
rc = zmq_bind (sb, address);
assert (rc == 0);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_connect (sc, "tcp://[::1]:5560");
rc = zmq_connect (sc, my_endpoint);
assert (rc == 0);
bounce (sb, sc);
rc = zmq_disconnect (sc, "tcp://[::1]:5560");
rc = zmq_disconnect (sc, my_endpoint);
assert (rc == 0);
rc = zmq_unbind (sb, "tcp://[::1]:5560");
rc = zmq_unbind (sb, my_endpoint);
assert (rc == 0);
rc = zmq_close (sc);
@ -241,47 +83,73 @@ void test_single_connect_ipv6 (void)
assert (rc == 0);
}
void test_multi_connect_ipv6 (void)
void test_multi_connect (const char *address)
{
size_t len = MAX_SOCKET_STRING;
char my_endpoint_0[MAX_SOCKET_STRING];
char my_endpoint_1[MAX_SOCKET_STRING];
char my_endpoint_2[MAX_SOCKET_STRING];
char my_endpoint_3[MAX_SOCKET_STRING * 2];
void *ctx = zmq_ctx_new ();
assert (ctx);
if (!is_ipv6_available ()) {
int ipv6;
if (streq(address, "tcp://127.0.0.1:*"))
ipv6 = 0;
else if (streq(address, "tcp://[::1]:*"))
ipv6 = 1;
else
assert (false);
if (ipv6 && !is_ipv6_available ()) {
zmq_ctx_term (ctx);
return;
}
void *sb0 = zmq_socket (ctx, ZMQ_REP);
assert (sb0);
int ipv6 = 1;
int rc = zmq_setsockopt (sb0, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_bind (sb0, "tcp://[::1]:5560");
rc = zmq_bind (sb0, address);
assert (rc == 0);
rc = zmq_getsockopt (sb0, ZMQ_LAST_ENDPOINT, my_endpoint_0, &len);
assert (rc == 0);
void *sb1 = zmq_socket (ctx, ZMQ_REP);
assert (sb1);
rc = zmq_setsockopt (sb1, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_bind (sb1, "tcp://[::1]:5561");
rc = zmq_bind (sb1, address);
assert (rc == 0);
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (sb1, ZMQ_LAST_ENDPOINT, my_endpoint_1, &len);
assert (rc == 0);
void *sb2 = zmq_socket (ctx, ZMQ_REP);
assert (sb2);
rc = zmq_setsockopt (sb2, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_bind (sb2, "tcp://[::1]:5562");
rc = zmq_bind (sb2, address);
assert (rc == 0);
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (sb2, ZMQ_LAST_ENDPOINT, my_endpoint_2, &len);
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_connect (sc, "tcp://[::1]:5560");
rc = zmq_connect (sc, my_endpoint_0);
assert (rc == 0);
rc = zmq_connect (sc, "tcp://[::1]:5561");
rc = zmq_connect (sc, my_endpoint_1);
assert (rc == 0);
rc = zmq_connect (sc, "tcp://[::1]:5564;[::1]:5562");
if (!ipv6)
sprintf (my_endpoint_3, "tcp://127.0.0.1:5564;%s",
strrchr(my_endpoint_2, '/') + 1);
else
sprintf (my_endpoint_3, "tcp://[::1]:5564;%s",
strrchr(my_endpoint_2, '/') + 1);
rc = zmq_connect (sc, my_endpoint_3);
assert (rc == 0);
bounce (sb0, sc);
@ -292,20 +160,20 @@ void test_multi_connect_ipv6 (void)
bounce (sb2, sc);
bounce (sb0, sc);
rc = zmq_disconnect (sc, "tcp://[::1]:5560");
rc = zmq_disconnect (sc, my_endpoint_0);
assert (rc == 0);
rc = zmq_disconnect (sc, "tcp://[::1]:5564;[::1]:5562");
rc = zmq_disconnect (sc, my_endpoint_3);
assert (rc == 0);
rc = zmq_disconnect (sc, "tcp://[::1]:5561");
rc = zmq_disconnect (sc, my_endpoint_1);
assert (rc == 0);
rc = zmq_unbind (sb0, "tcp://[::1]:5560");
rc = zmq_unbind (sb0, my_endpoint_0);
assert (rc == 0);
rc = zmq_unbind (sb1, "tcp://[::1]:5561");
rc = zmq_unbind (sb1, my_endpoint_1);
assert (rc == 0);
rc = zmq_unbind (sb2, "tcp://[::1]:5562");
rc = zmq_unbind (sb2, my_endpoint_2);
assert (rc == 0);
rc = zmq_close (sc);
@ -324,47 +192,90 @@ void test_multi_connect_ipv6 (void)
assert (rc == 0);
}
void test_multi_connect_ipv6_same_port (void)
void test_multi_connect_same_port (const char *address)
{
size_t len = MAX_SOCKET_STRING;
char my_endpoint_0[MAX_SOCKET_STRING];
char my_endpoint_1[MAX_SOCKET_STRING];
char my_endpoint_2[MAX_SOCKET_STRING * 2];
char my_endpoint_3[MAX_SOCKET_STRING * 2];
char my_endpoint_4[MAX_SOCKET_STRING * 2];
char my_endpoint_5[MAX_SOCKET_STRING * 2];
void *ctx = zmq_ctx_new ();
assert (ctx);
if (!is_ipv6_available ()) {
int ipv6;
if (streq(address, "tcp://127.0.0.1:*"))
ipv6 = 0;
else if (streq(address, "tcp://[::1]:*"))
ipv6 = 1;
else
assert (false);
if (ipv6 && !is_ipv6_available ()) {
zmq_ctx_term (ctx);
return;
}
void *sb0 = zmq_socket (ctx, ZMQ_REP);
assert (sb0);
int ipv6 = 1;
int rc = zmq_setsockopt (sb0, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_bind (sb0, "tcp://[::1]:5560");
rc = zmq_bind (sb0, address);
assert (rc == 0);
rc = zmq_getsockopt (sb0, ZMQ_LAST_ENDPOINT, my_endpoint_0, &len);
assert (rc == 0);
void *sb1 = zmq_socket (ctx, ZMQ_REP);
assert (sb1);
rc = zmq_setsockopt (sb1, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_bind (sb1, "tcp://[::1]:5561");
rc = zmq_bind (sb1, address);
assert (rc == 0);
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (sb1, ZMQ_LAST_ENDPOINT, my_endpoint_1, &len);
assert (rc == 0);
void *sc0 = zmq_socket (ctx, ZMQ_REQ);
assert (sc0);
rc = zmq_setsockopt (sc0, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_connect (sc0, "tcp://[::1]:5564;[::1]:5560");
if (!ipv6)
sprintf (my_endpoint_2, "tcp://127.0.0.1:5564;%s",
strrchr(my_endpoint_0, '/') + 1);
else
sprintf (my_endpoint_2, "tcp://[::1]:5564;%s",
strrchr(my_endpoint_0, '/') + 1);
rc = zmq_connect (sc0, my_endpoint_2);
assert (rc == 0);
rc = zmq_connect (sc0, "tcp://[::1]:5565;[::1]:5561");
if (!ipv6)
sprintf (my_endpoint_3, "tcp://127.0.0.1:5565;%s",
strrchr(my_endpoint_1, '/') + 1);
else
sprintf (my_endpoint_3, "tcp://[::1]:5565;%s",
strrchr(my_endpoint_1, '/') + 1);
rc = zmq_connect (sc0, my_endpoint_3);
assert (rc == 0);
void *sc1 = zmq_socket (ctx, ZMQ_REQ);
assert (sc1);
rc = zmq_setsockopt (sc1, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_connect (sc1, "tcp://[::1]:5565;[::1]:5560");
if (!ipv6)
sprintf (my_endpoint_4, "tcp://127.0.0.1:5565;%s",
strrchr(my_endpoint_0, '/') + 1);
else
sprintf (my_endpoint_4, "tcp://[::1]:5565;%s",
strrchr(my_endpoint_0, '/') + 1);
rc = zmq_connect (sc1, my_endpoint_4);
assert (rc == 0);
rc = zmq_connect (sc1, "tcp://[::1]:5564;[::1]:5561");
if (!ipv6)
sprintf (my_endpoint_5, "tcp://127.0.0.1:5564;%s",
strrchr(my_endpoint_1, '/') + 1);
else
sprintf (my_endpoint_5, "tcp://[::1]:5564;%s",
strrchr(my_endpoint_1, '/') + 1);
rc = zmq_connect (sc1, my_endpoint_5);
assert (rc == 0);
bounce (sb0, sc0);
@ -374,19 +285,19 @@ void test_multi_connect_ipv6_same_port (void)
bounce (sb0, sc0);
bounce (sb1, sc0);
rc = zmq_disconnect (sc1, "tcp://[::1]:5565;[::1]:5560");
rc = zmq_disconnect (sc1, my_endpoint_4);
assert (rc == 0);
rc = zmq_disconnect (sc1, "tcp://[::1]:5564;[::1]:5561");
rc = zmq_disconnect (sc1, my_endpoint_5);
assert (rc == 0);
rc = zmq_disconnect (sc0, "tcp://[::1]:5564;[::1]:5560");
rc = zmq_disconnect (sc0, my_endpoint_2);
assert (rc == 0);
rc = zmq_disconnect (sc0, "tcp://[::1]:5565;[::1]:5561");
rc = zmq_disconnect (sc0, my_endpoint_3);
assert (rc == 0);
rc = zmq_unbind (sb0, "tcp://[::1]:5560");
rc = zmq_unbind (sb0, my_endpoint_0);
assert (rc == 0);
rc = zmq_unbind (sb1, "tcp://[::1]:5561");
rc = zmq_unbind (sb1, my_endpoint_1);
assert (rc == 0);
rc = zmq_close (sc0);
@ -409,17 +320,17 @@ int main (void)
{
setup_test_environment ();
test_single_connect_ipv4 ();
test_single_connect ("tcp://127.0.0.1:*");
test_multi_connect_ipv4 ();
test_multi_connect ("tcp://127.0.0.1:*");
test_multi_connect_ipv4_same_port ();
test_multi_connect_same_port ("tcp://127.0.0.1:*");
test_single_connect_ipv6 ();
test_single_connect ("tcp://[::1]:*");
test_multi_connect_ipv6 ();
test_multi_connect ("tcp://[::1]:*");
test_multi_connect_ipv6_same_port ();
test_multi_connect_same_port ("tcp://[::1]:*");
return 0 ;
}

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -32,12 +32,17 @@
int main (void)
{
setup_test_environment();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert (router);
int rc = zmq_bind (router, "tcp://127.0.0.1:5560");
int rc = zmq_bind (router, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// Enable the handover flag
@ -50,7 +55,7 @@ int main (void)
assert (dealer_one);
rc = zmq_setsockopt (dealer_one, ZMQ_IDENTITY, "X", 1);
assert (rc == 0);
rc = zmq_connect (dealer_one, "tcp://127.0.0.1:5560");
rc = zmq_connect (dealer_one, my_endpoint);
assert (rc == 0);
// Get message from dealer to know when connection is ready
@ -68,7 +73,7 @@ int main (void)
assert (dealer_two);
rc = zmq_setsockopt (dealer_two, ZMQ_IDENTITY, "X", 1);
assert (rc == 0);
rc = zmq_connect (dealer_two, "tcp://127.0.0.1:5560");
rc = zmq_connect (dealer_two, my_endpoint);
assert (rc == 0);
// Get message from dealer to know when connection is ready

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -32,12 +32,17 @@
int main (void)
{
setup_test_environment();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert (router);
int rc = zmq_bind (router, "tcp://127.0.0.1:5560");
int rc = zmq_bind (router, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// Send a message to an unknown peer with the default setting
@ -60,7 +65,7 @@ int main (void)
assert (dealer);
rc = zmq_setsockopt (dealer, ZMQ_IDENTITY, "X", 1);
assert (rc == 0);
rc = zmq_connect (dealer, "tcp://127.0.0.1:5560");
rc = zmq_connect (dealer, my_endpoint);
assert (rc == 0);
// Get message from dealer to know when connection is ready

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -41,6 +41,8 @@ int main (void)
int rc;
if (TRACE_ENABLED) fprintf(stderr, "Staring router mandatory HWM test ...\n");
setup_test_environment();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *router = zmq_socket (ctx, ZMQ_ROUTER);
@ -57,7 +59,9 @@ int main (void)
rc = zmq_setsockopt (router, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
rc = zmq_bind (router, "tcp://127.0.0.1:5560");
rc = zmq_bind (router, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// Create dealer called "X" and connect it to our router, configure HWM
@ -69,7 +73,7 @@ int main (void)
rc = zmq_setsockopt (dealer, ZMQ_RCVHWM, &rcvhwm, sizeof (rcvhwm));
assert (rc == 0);
rc = zmq_connect (dealer, "tcp://127.0.0.1:5560");
rc = zmq_connect (dealer, my_endpoint);
assert (rc == 0);
// Get message from dealer to know when connection is ready

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -173,7 +173,12 @@ int main (void)
assert (rc == 0);
rc = zmq_setsockopt (server, ZMQ_IDENTITY, "IDENT", 6);
assert (rc == 0);
rc = zmq_bind (server, "tcp://127.0.0.1:9998");
rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
#ifdef ZMQ_BUILD_DRAFT_API
@ -200,7 +205,7 @@ int main (void)
assert (rc == 0);
rc = zmq_setsockopt (client, ZMQ_CURVE_SECRETKEY, client_secret, 41);
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:9998");
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
bounce (server, client);
rc = zmq_close (client);
@ -222,7 +227,7 @@ int main (void)
assert (rc == 0);
rc = zmq_setsockopt (client, ZMQ_CURVE_SECRETKEY, client_secret, 41);
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:9998");
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
expect_bounce_fail (server, client);
close_zero_linger (client);
@ -242,7 +247,7 @@ int main (void)
assert (rc == 0);
rc = zmq_setsockopt (client, ZMQ_CURVE_SECRETKEY, client_secret, 41);
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:9998");
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
expect_bounce_fail (server, client);
close_zero_linger (client);
@ -262,7 +267,7 @@ int main (void)
assert (rc == 0);
rc = zmq_setsockopt (client, ZMQ_CURVE_SECRETKEY, garbage_key, 41);
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:9998");
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
expect_bounce_fail (server, client);
close_zero_linger (client);
@ -286,7 +291,7 @@ int main (void)
assert (rc == 0);
rc = zmq_setsockopt (client, ZMQ_CURVE_SECRETKEY, bogus_secret, 41);
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:9998");
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
expect_bounce_fail (server, client);
close_zero_linger (client);
@ -300,7 +305,7 @@ int main (void)
// This must be caught by the curve_server class, not passed to ZAP
client = zmq_socket (ctx, ZMQ_DEALER);
assert (client);
rc = zmq_connect (client, "tcp://localhost:9998");
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
expect_bounce_fail (server, client);
close_zero_linger (client);
@ -325,8 +330,12 @@ int main (void)
struct sockaddr_in ip4addr;
int s;
unsigned short int port;
rc = sscanf(my_endpoint, "tcp://127.0.0.1:%hu", &port);
assert (rc == 1);
ip4addr.sin_family = AF_INET;
ip4addr.sin_port = htons (9998);
ip4addr.sin_port = htons (port);
#if defined (ZMQ_HAVE_WINDOWS) && (_WIN32_WINNT < 0x0600)
ip4addr.sin_addr.s_addr = inet_addr ("127.0.0.1");
#else

View File

@ -141,7 +141,7 @@ static void zap_handler (void *handler)
zmq_close (handler);
}
void test_valid_creds (void *ctx, void *server, void *server_mon)
void test_valid_creds (void *ctx, void *server, void *server_mon, char *endpoint)
{
void *client = zmq_socket (ctx, ZMQ_DEALER);
assert (client);
@ -155,7 +155,7 @@ void test_valid_creds (void *ctx, void *server, void *server_mon)
rc = zmq_setsockopt (client, ZMQ_GSSAPI_PRINCIPAL_NAMETYPE,
&name_type, sizeof (name_type));
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:9998");
rc = zmq_connect (client, endpoint);
assert (rc == 0);
bounce (server, client);
@ -169,7 +169,7 @@ void test_valid_creds (void *ctx, void *server, void *server_mon)
// Check security with valid but unauthorized credentials
// Note: ZAP may see multiple requests - after a failure, client will
// fall back to other crypto types for principal, if available.
void test_unauth_creds (void *ctx, void *server, void *server_mon)
void test_unauth_creds (void *ctx, void *server, void *server_mon, char *endpoint)
{
void *client = zmq_socket (ctx, ZMQ_DEALER);
assert (client);
@ -184,7 +184,7 @@ void test_unauth_creds (void *ctx, void *server, void *server_mon)
&name_type, sizeof (name_type));
assert (rc == 0);
zap_deny_all = 1;
rc = zmq_connect (client, "tcp://localhost:9998");
rc = zmq_connect (client, endpoint);
assert (rc == 0);
expect_bounce_fail (server, client);
@ -196,11 +196,11 @@ void test_unauth_creds (void *ctx, void *server, void *server_mon)
// Check GSSAPI security with NULL client credentials
// This must be caught by the gssapi_server class, not passed to ZAP
void test_null_creds (void *ctx, void *server, void *server_mon)
void test_null_creds (void *ctx, void *server, void *server_mon, char *endpoint)
{
void *client = zmq_socket (ctx, ZMQ_DEALER);
assert (client);
int rc = zmq_connect (client, "tcp://localhost:9998");
int rc = zmq_connect (client, endpoint);
assert (rc == 0);
expect_bounce_fail (server, client);
close_zero_linger (client);
@ -211,7 +211,7 @@ void test_null_creds (void *ctx, void *server, void *server_mon)
// Check GSSAPI security with PLAIN client credentials
// This must be caught by the curve_server class, not passed to ZAP
void test_plain_creds (void *ctx, void *server, void *server_mon)
void test_plain_creds (void *ctx, void *server, void *server_mon, char *endpoint)
{
void *client = zmq_socket (ctx, ZMQ_DEALER);
assert (client);
@ -219,17 +219,22 @@ void test_plain_creds (void *ctx, void *server, void *server_mon)
assert (rc == 0);
rc = zmq_setsockopt (client, ZMQ_PLAIN_PASSWORD, "password", 8);
assert (rc == 0);
rc = zmq_connect (client, endpoint);
assert (rc == 0);
expect_bounce_fail (server, client);
close_zero_linger (client);
}
// Unauthenticated messages from a vanilla socket shouldn't be received
void test_vanilla_socket (void *ctx, void *server, void *server_mon)
void test_vanilla_socket (void *ctx, void *server, void *server_mon, char *endpoint)
{
struct sockaddr_in ip4addr;
int s;
unsigned short int port;
int rc = sscanf(endpoint, "tcp://127.0.0.1:%hu", &port);
assert (rc == 1);
ip4addr.sin_family = AF_INET;
ip4addr.sin_port = htons (9998);
ip4addr.sin_port = htons (port);
#if defined (ZMQ_HAVE_WINDOWS) && (_WIN32_WINNT < 0x0600)
ip4addr.sin_addr.s_addr = inet_addr ("127.0.0.1");
#else
@ -266,6 +271,9 @@ int main (void)
void *ctx = zmq_ctx_new ();
assert (ctx);
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
// Spawn ZAP handler
// We create and bind ZAP socket in main thread to avoid case
// where child thread does not start up fast enough.
@ -288,7 +296,9 @@ int main (void)
rc = zmq_setsockopt (server, ZMQ_GSSAPI_PRINCIPAL_NAMETYPE,
&name_type, sizeof (name_type));
assert (rc == 0);
rc = zmq_bind (server, "tcp://127.0.0.1:9998");
rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// Monitor handshake events on the server
@ -305,11 +315,11 @@ int main (void)
assert (rc == 0);
// Attempt various connections
test_valid_creds (ctx, server, server_mon);
test_null_creds (ctx, server, server_mon);
test_plain_creds (ctx, server, server_mon);
test_vanilla_socket (ctx, server, server_mon);
test_unauth_creds (ctx, server, server_mon);
test_valid_creds (ctx, server, server_mon, my_endpoint);
test_null_creds (ctx, server, server_mon, my_endpoint);
test_plain_creds (ctx, server, server_mon, my_endpoint);
test_vanilla_socket (ctx, server, server_mon, my_endpoint);
test_unauth_creds (ctx, server, server_mon, my_endpoint);
// Shutdown
close_zero_linger (server_mon);

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -85,6 +85,8 @@ zap_handler (void *handler)
int main (void)
{
setup_test_environment();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
@ -105,9 +107,11 @@ int main (void)
assert (server);
void *client = zmq_socket (ctx, ZMQ_DEALER);
assert (client);
rc = zmq_bind (server, "tcp://127.0.0.1:9000");
rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:9000");
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
bounce (server, client);
close_zero_linger (client);
@ -122,9 +126,12 @@ int main (void)
assert (client);
rc = zmq_setsockopt (server, ZMQ_ZAP_DOMAIN, "WRONG", 5);
assert (rc == 0);
rc = zmq_bind (server, "tcp://127.0.0.1:9001");
rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:9001");
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
expect_bounce_fail (server, client);
close_zero_linger (client);
@ -137,9 +144,12 @@ int main (void)
assert (client);
rc = zmq_setsockopt (server, ZMQ_ZAP_DOMAIN, "TEST", 4);
assert (rc == 0);
rc = zmq_bind (server, "tcp://127.0.0.1:9002");
rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:9002");
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
bounce (server, client);
close_zero_linger (client);
@ -150,14 +160,22 @@ int main (void)
assert (server);
rc = zmq_setsockopt (server, ZMQ_ZAP_DOMAIN, "WRONG", 5);
assert (rc == 0);
rc = zmq_bind (server, "tcp://127.0.0.1:9003");
rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
struct sockaddr_in ip4addr;
int s;
unsigned short int port;
rc = sscanf(my_endpoint, "tcp://127.0.0.1:%hu", &port);
assert (rc == 1);
ip4addr.sin_family = AF_INET;
ip4addr.sin_port = htons(9003);
ip4addr.sin_port = htons(port);
#if defined (ZMQ_HAVE_WINDOWS) && (_WIN32_WINNT < 0x0600)
ip4addr.sin_addr.s_addr = inet_addr ("127.0.0.1");
#else

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -97,6 +97,8 @@ zap_handler (void *ctx)
int main (void)
{
setup_test_environment();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
@ -111,7 +113,9 @@ int main (void)
int as_server = 1;
rc = zmq_setsockopt (server, ZMQ_PLAIN_SERVER, &as_server, sizeof (int));
assert (rc == 0);
rc = zmq_bind (server, "tcp://127.0.0.1:9998");
rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
char username [256];
@ -126,7 +130,7 @@ int main (void)
strcpy (password, "password");
rc = zmq_setsockopt (client, ZMQ_PLAIN_PASSWORD, password, strlen (password));
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:9998");
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
bounce (server, client);
rc = zmq_close (client);
@ -139,7 +143,7 @@ int main (void)
as_server = 1;
rc = zmq_setsockopt (client, ZMQ_PLAIN_SERVER, &as_server, sizeof (int));
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:9998");
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
expect_bounce_fail (server, client);
close_zero_linger (client);
@ -153,7 +157,7 @@ int main (void)
assert (rc == 0);
rc = zmq_setsockopt (client, ZMQ_PLAIN_PASSWORD, password, strlen (password));
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:9998");
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
expect_bounce_fail (server, client);
close_zero_linger (client);
@ -162,8 +166,12 @@ int main (void)
struct sockaddr_in ip4addr;
int s;
unsigned short int port;
rc = sscanf(my_endpoint, "tcp://127.0.0.1:%hu", &port);
assert (rc == 1);
ip4addr.sin_family = AF_INET;
ip4addr.sin_port = htons (9998);
ip4addr.sin_port = htons (port);
#if defined (ZMQ_HAVE_WINDOWS) && (_WIN32_WINNT < 0x0600)
ip4addr.sin_addr.s_addr = inet_addr ("127.0.0.1");
#else

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -31,17 +31,27 @@
#define THREAD_COUNT 100
struct thread_data {
void *ctx;
char endpoint[MAX_SOCKET_STRING];
};
extern "C"
{
static void worker (void *s)
static void worker (void *data)
{
int rc;
void *socket;
struct thread_data *tdata = (struct thread_data *)data;
rc = zmq_connect (s, "tcp://127.0.0.1:5560");
socket = zmq_socket (tdata->ctx, ZMQ_SUB);
assert (socket);
rc = zmq_connect (socket, tdata->endpoint);
assert (rc == 0);
// Start closing the socket while the connecting process is underway.
rc = zmq_close (s);
rc = zmq_close (socket);
assert (rc == 0);
}
}
@ -49,8 +59,7 @@ extern "C"
int main (void)
{
setup_test_environment();
void *s1;
void *s2;
void *socket;
int i;
int j;
int rc;
@ -59,30 +68,32 @@ int main (void)
for (j = 0; j != 10; j++) {
// Check the shutdown with many parallel I/O threads.
void *ctx = zmq_ctx_new ();
assert (ctx);
zmq_ctx_set (ctx, ZMQ_IO_THREADS, 7);
struct thread_data tdata;
tdata.ctx = zmq_ctx_new ();
assert (tdata.ctx);
zmq_ctx_set (tdata.ctx, ZMQ_IO_THREADS, 7);
s1 = zmq_socket (ctx, ZMQ_PUB);
assert (s1);
socket = zmq_socket (tdata.ctx, ZMQ_PUB);
assert (socket);
rc = zmq_bind (s1, "tcp://127.0.0.1:5560");
rc = zmq_bind (socket, "tcp://127.0.0.1:*");
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (socket, ZMQ_LAST_ENDPOINT, tdata.endpoint, &len);
assert (rc == 0);
for (i = 0; i != THREAD_COUNT; i++) {
s2 = zmq_socket (ctx, ZMQ_SUB);
assert (s2);
threads [i] = zmq_threadstart(&worker, s2);
threads [i] = zmq_threadstart(&worker, &tdata);
}
for (i = 0; i != THREAD_COUNT; i++) {
zmq_threadclose(threads [i]);
}
rc = zmq_close (s1);
rc = zmq_close (socket);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
rc = zmq_ctx_term (tdata.ctx);
assert (rc == 0);
}

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -30,7 +30,7 @@
#include "testutil.hpp"
const char *bind_address = 0;
const char *connect_address = 0;
char connect_address[MAX_SOCKET_STRING];
void test_round_robin_out (void *ctx)
{
@ -39,6 +39,9 @@ void test_round_robin_out (void *ctx)
int rc = zmq_bind (dealer, bind_address);
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (dealer, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
const size_t services = 5;
void *rep [services];
@ -91,6 +94,9 @@ void test_fair_queue_in (void *ctx)
rc = zmq_bind (receiver, bind_address);
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (receiver, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
const size_t services = 5;
void *senders [services];
@ -145,6 +151,9 @@ void test_destroy_queue_on_disconnect (void *ctx)
int rc = zmq_bind (A, bind_address);
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (A, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
void *B = zmq_socket (ctx, ZMQ_DEALER);
assert (B);
@ -227,12 +236,10 @@ int main (void)
void *ctx = zmq_ctx_new ();
assert (ctx);
const char *binds [] = { "inproc://a", "tcp://127.0.0.1:5555" };
const char *connects [] = { "inproc://a", "tcp://localhost:5555" };
const char *binds [] = { "inproc://a", "tcp://127.0.0.1:*" };
for (int transports = 0; transports < 2; ++transports) {
bind_address = binds [transports];
connect_address = connects [transports];
// SHALL route outgoing messages to available peers using a round-robin
// strategy.

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -30,7 +30,7 @@
#include "testutil.hpp"
const char *bind_address = 0;
const char *connect_address = 0;
char connect_address[MAX_SOCKET_STRING];
void test_push_round_robin_out (void *ctx)
{
@ -39,6 +39,9 @@ void test_push_round_robin_out (void *ctx)
int rc = zmq_bind (push, bind_address);
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
const size_t services = 5;
void *pulls [services];
@ -85,6 +88,9 @@ void test_pull_fair_queue_in (void *ctx)
int rc = zmq_bind (pull, bind_address);
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
const size_t services = 5;
void *pushs [services];
@ -187,6 +193,9 @@ void test_destroy_queue_on_disconnect (void *ctx)
rc = zmq_bind (A, bind_address);
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (A, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
void *B = zmq_socket (ctx, ZMQ_PULL);
assert (B);
@ -264,12 +273,10 @@ int main (void)
void *ctx = zmq_ctx_new ();
assert (ctx);
const char *binds [] = { "inproc://a", "tcp://127.0.0.1:5555" };
const char *connects [] = { "inproc://a", "tcp://localhost:5555" };
const char *binds [] = { "inproc://a", "tcp://127.0.0.1:*" };
for (int transport = 0; transport < 2; ++transport) {
bind_address = binds [transport];
connect_address = connects [transport];
// PUSH: SHALL route outgoing messages to connected peers using a
// round-robin strategy.

View File

@ -30,7 +30,7 @@
#include "testutil.hpp"
const char *bind_address = 0;
const char *connect_address = 0;
char connect_address[MAX_SOCKET_STRING];
void test_fair_queue_in (void *ctx)
{
@ -43,6 +43,9 @@ void test_fair_queue_in (void *ctx)
rc = zmq_bind (rep, bind_address);
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (rep, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
const size_t services = 5;
void *reqs [services];
@ -106,6 +109,9 @@ void test_envelope (void *ctx)
int rc = zmq_bind (rep, bind_address);
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (rep, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
@ -138,12 +144,10 @@ int main (void)
void *ctx = zmq_ctx_new ();
assert (ctx);
const char *binds [] = { "inproc://a", "tcp://127.0.0.1:5555" };
const char *connects [] = { "inproc://a", "tcp://localhost:5555" };
const char *binds [] = { "inproc://a", "tcp://127.0.0.1:*" };
for (int transport = 0; transport < 2; ++transport) {
bind_address = binds [transport];
connect_address = connects [transport];
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -30,7 +30,7 @@
#include "testutil.hpp"
const char *bind_address = 0;
const char *connect_address = 0;
char connect_address[MAX_SOCKET_STRING];
void test_round_robin_out (void *ctx)
{
@ -39,6 +39,9 @@ void test_round_robin_out (void *ctx)
int rc = zmq_bind (req, bind_address);
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (req, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
const size_t services = 5;
void *rep [services];
@ -84,6 +87,9 @@ void test_req_only_listens_to_current_peer (void *ctx)
rc = zmq_bind (req, bind_address);
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (req, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
const size_t services = 3;
void *router [services];
@ -148,6 +154,9 @@ void test_req_message_format (void *ctx)
int rc = zmq_bind (req, bind_address);
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (req, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
rc = zmq_connect (router, connect_address);
assert (rc == 0);
@ -223,12 +232,10 @@ int main (void)
void *ctx = zmq_ctx_new ();
assert (ctx);
const char *binds [] = { "inproc://a", "tcp://127.0.0.1:5555" };
const char *connects [] = { "inproc://a", "tcp://localhost:5555" };
const char *binds [] = { "inproc://a", "tcp://127.0.0.1:*" };
for (int transport = 0; transport < 2; transport++) {
bind_address = binds [transport];
connect_address = connects [transport];
// SHALL route outgoing messages to connected peers using a round-robin
// strategy.

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -30,7 +30,7 @@
#include "testutil.hpp"
const char *bind_address = 0;
const char *connect_address = 0;
char connect_address[MAX_SOCKET_STRING];
void test_fair_queue_in (void *ctx)
{
@ -43,6 +43,9 @@ void test_fair_queue_in (void *ctx)
rc = zmq_bind (receiver, bind_address);
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (receiver, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
const size_t services = 5;
void *senders [services];
@ -120,6 +123,9 @@ void test_destroy_queue_on_disconnect (void *ctx)
rc = zmq_bind (A, bind_address);
assert (rc == 0);
size_t len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (A, ZMQ_LAST_ENDPOINT, connect_address, &len);
assert (rc == 0);
void *B = zmq_socket (ctx, ZMQ_DEALER);
assert (B);
@ -188,12 +194,10 @@ int main (void)
void *ctx = zmq_ctx_new ();
assert (ctx);
const char *binds [] = { "inproc://a", "tcp://127.0.0.1:5555" };
const char *connects [] = { "inproc://a", "tcp://localhost:5555" };
const char *binds [] = { "inproc://a", "tcp://127.0.0.1:*" };
for (int transport = 0; transport < 2; ++transport) {
bind_address = binds [transport];
connect_address = connects [transport];
// SHALL receive incoming messages from its peers using a fair-queuing
// strategy.

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -43,6 +43,8 @@
int main (void)
{
int rc;
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
setup_test_environment();
// Create the infrastructure
@ -54,10 +56,13 @@ int main (void)
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
rc = zmq_bind(rep, "tcp://127.0.0.1:5560");
rc = zmq_bind(rep, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_connect(req, "tcp://127.0.0.1:5560");
rc = zmq_getsockopt(rep, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
rc = zmq_connect(req, my_endpoint);
assert (rc == 0);
char tmp[MSG_SIZE];

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -57,6 +57,8 @@ static void
test_stream_to_dealer (void)
{
int rc;
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
// Set up our context and sockets
void *ctx = zmq_ctx_new ();
@ -72,15 +74,18 @@ test_stream_to_dealer (void)
int enabled = 1;
rc = zmq_setsockopt (stream, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_bind (stream, "tcp://127.0.0.1:5556");
rc = zmq_bind (stream, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (stream, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// We'll be using this socket as the other peer
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
rc = zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_connect (dealer, "tcp://localhost:5556");
rc = zmq_connect (dealer, my_endpoint);
// Send a message on the dealer socket
rc = zmq_send (dealer, "Hello", 5, 0);
@ -232,6 +237,8 @@ static void
test_stream_to_stream (void)
{
int rc;
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
// Set-up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);
@ -241,14 +248,16 @@ test_stream_to_stream (void)
int enabled = 1;
rc = zmq_setsockopt (server, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_bind (server, "tcp://127.0.0.1:9070");
rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
void *client = zmq_socket (ctx, ZMQ_STREAM);
assert (client);
rc = zmq_setsockopt (client, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:9070");
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
uint8_t id [256];
size_t id_size = 256;

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -65,6 +65,9 @@ int main(int, char**)
{
setup_test_environment();
size_t len = MAX_SOCKET_STRING;
char bind_endpoint[MAX_SOCKET_STRING];
char connect_endpoint[MAX_SOCKET_STRING];
void *context = zmq_ctx_new ();
void *sockets [2];
int rc = 0;
@ -73,13 +76,24 @@ int main(int, char**)
int enabled = 1;
rc = zmq_setsockopt (sockets [SERVER], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_bind (sockets [SERVER], "tcp://0.0.0.0:6666");
rc = zmq_bind (sockets [SERVER], "tcp://0.0.0.0:*");
assert (rc == 0);
rc = zmq_getsockopt (sockets [SERVER], ZMQ_LAST_ENDPOINT, bind_endpoint,
&len);
assert (rc == 0);
// Apparently Windows can't connect to 0.0.0.0. A better fix would be welcome.
#ifdef ZMQ_HAVE_WINDOWS
sprintf (connect_endpoint, "tcp://127.0.0.1:%s",
strrchr(bind_endpoint, ':') + 1);
#else
strcpy (connect_endpoint, bind_endpoint);
#endif
sockets [CLIENT] = zmq_socket (context, ZMQ_STREAM);
rc = zmq_setsockopt (sockets [CLIENT], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_connect (sockets [CLIENT], "tcp://localhost:6666");
rc = zmq_connect (sockets [CLIENT], connect_endpoint);
assert (rc == 0);
// wait for connect notification

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -31,6 +31,8 @@
int main (void) {
setup_test_environment ();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
@ -39,9 +41,11 @@ int main (void) {
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
int rc = zmq_bind (stream, "tcp://127.0.0.1:5555");
int rc = zmq_bind (stream, "tcp://127.0.0.1:*");
assert (rc >= 0);
rc = zmq_connect (dealer, "tcp://127.0.0.1:5555");
rc = zmq_getsockopt (stream, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
rc = zmq_connect (dealer, my_endpoint);
assert (rc >= 0);
zmq_send (dealer, "", 0, 0);

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -68,6 +68,8 @@ static void
test_stream_handshake_timeout_accept (void)
{
int rc;
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
// Set up our context and sockets
void *ctx = zmq_ctx_new ();
@ -80,8 +82,6 @@ test_stream_handshake_timeout_accept (void)
int zero = 0;
rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_connect (stream, "tcp://localhost:5557");
assert (rc == 0);
// We'll be using this socket to test TCP stream handshake timeout
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
@ -119,7 +119,12 @@ test_stream_handshake_timeout_accept (void)
assert (rc == 0);
// bind dealer socket to accept connection from non-sending stream socket
rc = zmq_bind (dealer, "tcp://127.0.0.1:5557");
rc = zmq_bind (dealer, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (dealer, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
rc = zmq_connect (stream, my_endpoint);
assert (rc == 0);
// we should get ZMQ_EVENT_ACCEPTED and then ZMQ_EVENT_DISCONNECTED
@ -145,6 +150,8 @@ static void
test_stream_handshake_timeout_connect (void)
{
int rc;
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
// Set up our context and sockets
void *ctx = zmq_ctx_new ();
@ -157,7 +164,9 @@ test_stream_handshake_timeout_connect (void)
int zero = 0;
rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_bind (stream, "tcp://127.0.0.1:5556");
rc = zmq_bind (stream, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (stream, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
// We'll be using this socket to test TCP stream handshake timeout
@ -196,7 +205,7 @@ test_stream_handshake_timeout_connect (void)
assert (rc == 0);
// connect dealer socket to non-sending stream socket
rc = zmq_connect (dealer, "tcp://localhost:5556");
rc = zmq_connect (dealer, my_endpoint);
assert (rc == 0);
// we should get ZMQ_EVENT_CONNECTED and then ZMQ_EVENT_DISCONNECTED

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -32,29 +32,37 @@
int main (void)
{
setup_test_environment();
size_t len = MAX_SOCKET_STRING;
char endpoint1[MAX_SOCKET_STRING];
char endpoint2[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
// First, create an intermediate device
void *xpub = zmq_socket (ctx, ZMQ_XPUB);
assert (xpub);
int rc = zmq_bind (xpub, "tcp://127.0.0.1:5560");
int rc = zmq_bind (xpub, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (xpub, ZMQ_LAST_ENDPOINT, endpoint1, &len);
assert (rc == 0);
void *xsub = zmq_socket (ctx, ZMQ_XSUB);
assert (xsub);
rc = zmq_bind (xsub, "tcp://127.0.0.1:5561");
rc = zmq_bind (xsub, "tcp://127.0.0.1:*");
assert (rc == 0);
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (xsub, ZMQ_LAST_ENDPOINT, endpoint2, &len);
assert (rc == 0);
// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_PUB);
assert (pub);
rc = zmq_connect (pub, "tcp://127.0.0.1:5561");
rc = zmq_connect (pub, endpoint2);
assert (rc == 0);
// Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_SUB);
assert (sub);
rc = zmq_connect (sub, "tcp://127.0.0.1:5560");
rc = zmq_connect (sub, endpoint1);
assert (rc == 0);
// Subscribe for all messages.

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -72,7 +72,7 @@ int main (void)
void *ctx = zmq_ctx_new ();
assert (ctx);
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
if (zmq_bind (dealer, "tcp://127.0.0.1:5670") == -1) {
if (zmq_bind (dealer, "tcp://127.0.0.1:*") == -1) {
printf ("E: Cannot find 127.0.0.1 -- your system does not have local\n");
printf ("E: networking. Please fix this before running libzmq checks.\n");
return -1;

View File

@ -39,7 +39,6 @@ int main (void)
int rc;
char buf[BUF_SIZE];
size_t buf_size;
const char *ep = "tcp://127.0.0.1:5560";
const char *ep_wc_tcp = "tcp://127.0.0.1:*";
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
const char *ep_wc_ipc = "ipc://*";
@ -53,11 +52,14 @@ int main (void)
assert (ctx);
void *push = zmq_socket (ctx, ZMQ_PUSH);
assert (push);
rc = zmq_bind (push, ep);
rc = zmq_bind (push, ep_wc_tcp);
assert (rc == 0);
buf_size = sizeof(buf);
rc = zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, buf, &buf_size);
assert (rc == 0);
void *pull = zmq_socket (ctx, ZMQ_PULL);
assert (pull);
rc = zmq_connect (pull, ep);
rc = zmq_connect (pull, buf);
assert (rc == 0);
// Pass one message through to ensure the connection is established
@ -67,7 +69,10 @@ int main (void)
assert (rc == 3);
// Unbind the listening endpoint
rc = zmq_unbind (push, ep);
buf_size = sizeof(buf);
rc = zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, buf, &buf_size);
assert (rc == 0);
rc = zmq_unbind (push, buf);
assert (rc == 0);
// Allow unbind to settle
@ -88,13 +93,16 @@ int main (void)
// Create infrastructure
ctx = zmq_ctx_new ();
assert (ctx);
push = zmq_socket (ctx, ZMQ_PUSH);
assert (push);
rc = zmq_connect (push, ep);
assert (rc == 0);
pull = zmq_socket (ctx, ZMQ_PULL);
assert (pull);
rc = zmq_bind (pull, ep);
rc = zmq_bind (pull, ep_wc_tcp);
assert (rc == 0);
buf_size = sizeof(buf);
rc = zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, buf, &buf_size);
assert (rc == 0);
push = zmq_socket (ctx, ZMQ_PUSH);
assert (push);
rc = zmq_connect (push, buf);
assert (rc == 0);
// Pass one message through to ensure the connection is established.
@ -104,7 +112,10 @@ int main (void)
assert (rc == 3);
// Disconnect the bound endpoint
rc = zmq_disconnect (push, ep);
buf_size = sizeof(buf);
rc = zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, buf, &buf_size);
assert (rc == 0);
rc = zmq_disconnect (push, buf);
assert (rc == 0);
// Allow disconnect to settle

View File

@ -1,5 +1,5 @@
/*:
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -45,11 +45,15 @@ void client_thread (void *client)
int main (void)
{
setup_test_environment ();
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *server = zmq_socket (ctx, ZMQ_SERVER);
int rc = zmq_bind (server, "tcp://127.0.0.1:5560");
int rc = zmq_bind (server, "tcp://127.0.0.1:*");
assert (rc == 0);
rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0);
void *client = zmq_socket (ctx, ZMQ_CLIENT);
@ -57,7 +61,7 @@ int main (void)
size_t size = sizeof (int);
zmq_getsockopt (client, ZMQ_THREAD_SAFE, &thread_safe, &size);
assert (thread_safe == 1);
rc = zmq_connect (client, "tcp://127.0.0.1:5560");
rc = zmq_connect (client, my_endpoint);
assert (rc == 0);
void *t1 = zmq_threadstart (client_thread, client);

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
@ -32,20 +32,21 @@ int main (void)
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
int rc = zmq_bind (sb, "tcp://*:5555");
int rc = zmq_bind (sb, "tcp://*:*");
assert (rc == 0);
char bindEndpoint[256];
char connectEndpoint[256];
size_t endpoint_len = sizeof (bindEndpoint);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, bindEndpoint, &endpoint_len);
assert (rc == 0);
char connectEndpoint[256];
// Apparently Windows can't connect to 0.0.0.0. A better fix would be welcome.
#ifdef ZMQ_HAVE_WINDOWS
strcpy(connectEndpoint, "tcp://127.0.0.1:5555");
sprintf (connectEndpoint, "tcp://127.0.0.1:%s",
strrchr(bindEndpoint, ':') + 1);
#else
strcpy(connectEndpoint, bindEndpoint);
strcpy (connectEndpoint, bindEndpoint);
#endif
rc = zmq_connect (sc, connectEndpoint);
@ -74,21 +75,23 @@ int main (void)
rc = zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_bind (sb, "tcp://*:5556");
rc = zmq_bind (sb, "tcp://*:*");
assert (rc == 0);
endpoint_len = sizeof (bindEndpoint);
memset(bindEndpoint, 0, endpoint_len);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, bindEndpoint, &endpoint_len);
assert (rc == 0);
assert (rc == 0);
#ifdef ZMQ_HAVE_WINDOWS
if (ipv6)
strcpy(connectEndpoint, "tcp://[::1]:5556");
else
strcpy(connectEndpoint, "tcp://127.0.0.1:5556");
sprintf (connectEndpoint, "tcp://[::1]:%s",
strrchr(bindEndpoint, ':') + 1);
else
sprintf (connectEndpoint, "tcp://127.0.0.1:%s",
strrchr(bindEndpoint, ':') + 1);
#else
strcpy(connectEndpoint, bindEndpoint);
strcpy (connectEndpoint, bindEndpoint);
#endif
rc = zmq_connect (sc, connectEndpoint);
@ -206,87 +209,6 @@ int main (void)
assert (rc == 0);
}
/* No wildcard, IPv4 address, IPv6 disabled */
sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_bind (sb, "tcp://127.0.0.1:5557");
assert (rc == 0);
rc = zmq_connect (sc, "tcp://127.0.0.1:5557");
assert (rc == 0);
bounce (sb, sc);
rc = zmq_disconnect (sc, "tcp://127.0.0.1:5557");
assert (rc == 0);
rc = zmq_unbind (sb, "tcp://127.0.0.1:5557");
assert (rc == 0);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
/* No wildcard, IPv4 address, IPv6 enabled */
sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_setsockopt (sb, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_bind (sb, "tcp://127.0.0.1:5558");
assert (rc == 0);
rc = zmq_connect (sc, "tcp://127.0.0.1:5558");
assert (rc == 0);
bounce (sb, sc);
rc = zmq_disconnect (sc, "tcp://127.0.0.1:5558");
assert (rc == 0);
rc = zmq_unbind (sb, "tcp://127.0.0.1:5558");
assert (rc == 0);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
if (ipv6) {
/* No wildcard, IPv6 address, IPv6 enabled */
sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_setsockopt (sb, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_bind (sb, "tcp://[::1]:5559");
assert (rc == 0);
rc = zmq_connect (sc, "tcp://[::1]:5559");
assert (rc == 0);
bounce (sb, sc);
rc = zmq_disconnect (sc, "tcp://[::1]:5559");
assert (rc == 0);
rc = zmq_unbind (sb, "tcp://[::1]:5559");
assert (rc == 0);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
}
rc = zmq_ctx_term (ctx);
assert (rc == 0);

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2016 Contributors as noted in the AUTHORS file
Copyright (c) 2016-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -32,7 +32,7 @@
#if !defined (ZMQ_HAVE_WINDOWS)
#include <netdb.h>
void pre_allocate_sock (void *zmq_socket, const char *address,
uint16_t pre_allocate_sock (void *zmq_socket, const char *address,
const char *port)
{
struct addrinfo *addr, hint;
@ -65,25 +65,34 @@ void pre_allocate_sock (void *zmq_socket, const char *address,
sizeof (s_pre));
assert(rc == 0);
struct sockaddr_in sin;
socklen_t len = sizeof(sin);
rc = getsockname(s_pre, (struct sockaddr *)&sin, &len);
assert (rc != -1);
freeaddrinfo(addr);
return ntohs(sin.sin_port);
}
void test_req_rep ()
{
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
pre_allocate_sock(sb, "127.0.0.1", "5560");
uint16_t port = pre_allocate_sock(sb, "127.0.0.1", "0");
sprintf (my_endpoint, "tcp://127.0.0.1:%u", port);
int rc = zmq_bind (sb, "tcp://127.0.0.1:5560");
int rc = zmq_bind (sb, my_endpoint);
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_connect (sc, "tcp://127.0.0.1:5560");
rc = zmq_connect (sc, my_endpoint);
assert (rc == 0);
bounce (sb, sc);
@ -100,20 +109,22 @@ void test_req_rep ()
void test_pair ()
{
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_PAIR);
assert (sb);
pre_allocate_sock(sb, "127.0.0.1", "5560");
uint16_t port = pre_allocate_sock(sb, "127.0.0.1", "0");
sprintf (my_endpoint, "tcp://127.0.0.1:%u", port);
int rc = zmq_bind (sb, "tcp://127.0.0.1:5560");
int rc = zmq_bind (sb, my_endpoint);
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_PAIR);
assert (sc);
rc = zmq_connect (sc, "tcp://127.0.0.1:5560");
rc = zmq_connect (sc, my_endpoint);
assert (rc == 0);
bounce (sb, sc);
@ -131,20 +142,22 @@ void test_pair ()
void test_client_server ()
{
#if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
char my_endpoint[MAX_SOCKET_STRING];
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_SERVER);
assert (sb);
pre_allocate_sock(sb, "127.0.0.1", "5560");
uint16_t port = pre_allocate_sock(sb, "127.0.0.1", "0");
sprintf (my_endpoint, "tcp://127.0.0.1:%u", port);
int rc = zmq_bind (sb, "tcp://127.0.0.1:5560");
int rc = zmq_bind (sb, my_endpoint);
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_CLIENT);
assert (sc);
rc = zmq_connect (sc, "tcp://127.0.0.1:5560");
rc = zmq_connect (sc, my_endpoint);
assert (rc == 0);
zmq_msg_t msg;

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -196,14 +196,15 @@ int test_unsubscribe_manual()
}
int test_xpub_proxy_unsubscribe_on_disconnect(const char *frontend,
const char *backend)
int test_xpub_proxy_unsubscribe_on_disconnect(void)
{
assert (frontend && backend);
const char* topic = "1";
const char* payload = "X";
size_t len = MAX_SOCKET_STRING;
char my_endpoint_backend[MAX_SOCKET_STRING];
char my_endpoint_frontend[MAX_SOCKET_STRING];
int manual = 1;
void *ctx = zmq_ctx_new ();
@ -212,22 +213,29 @@ int test_xpub_proxy_unsubscribe_on_disconnect(const char *frontend,
// proxy frontend
void *xsub_proxy = zmq_socket (ctx, ZMQ_XSUB);
assert (xsub_proxy);
assert (zmq_bind (xsub_proxy, frontend) == 0);
assert (zmq_bind (xsub_proxy, "tcp://127.0.0.1:*") == 0);
int rc = zmq_getsockopt (xsub_proxy, ZMQ_LAST_ENDPOINT, my_endpoint_frontend,
&len);
assert (rc == 0);
// proxy backend
void *xpub_proxy = zmq_socket (ctx, ZMQ_XPUB);
assert (xpub_proxy);
assert (zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4) == 0);
assert (zmq_bind (xpub_proxy, backend) == 0);
assert (zmq_bind (xpub_proxy, "tcp://127.0.0.1:*") == 0);
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (xpub_proxy, ZMQ_LAST_ENDPOINT, my_endpoint_backend,
&len);
assert (rc == 0);
// publisher
void *pub = zmq_socket (ctx, ZMQ_PUB);
assert (zmq_connect (pub, frontend) == 0);
assert (zmq_connect (pub, my_endpoint_frontend) == 0);
// first subscriber subscribes
void *sub1 = zmq_socket (ctx, ZMQ_SUB);
assert (sub1);
assert (zmq_connect (sub1, backend) == 0);
assert (zmq_connect (sub1, my_endpoint_backend) == 0);
assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic, 1) == 0);
// wait
@ -244,7 +252,7 @@ int test_xpub_proxy_unsubscribe_on_disconnect(const char *frontend,
// second subscriber subscribes
void *sub2 = zmq_socket (ctx, ZMQ_SUB);
assert (sub2);
assert (zmq_connect (sub2, backend) == 0);
assert (zmq_connect (sub2, my_endpoint_backend) == 0);
assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic, 1) == 0);
// wait
@ -335,14 +343,16 @@ int test_xpub_proxy_unsubscribe_on_disconnect(const char *frontend,
return 0;
}
int test_missing_subscriptions(const char *frontend, const char *backend)
int test_missing_subscriptions(void)
{
assert (frontend && backend);
const char* topic1 = "1";
const char* topic2 = "2";
const char* payload = "X";
size_t len = MAX_SOCKET_STRING;
char my_endpoint_backend[MAX_SOCKET_STRING];
char my_endpoint_frontend[MAX_SOCKET_STRING];
int manual = 1;
void *ctx = zmq_ctx_new ();
@ -351,17 +361,24 @@ int test_missing_subscriptions(const char *frontend, const char *backend)
// proxy frontend
void *xsub_proxy = zmq_socket (ctx, ZMQ_XSUB);
assert (xsub_proxy);
assert (zmq_bind (xsub_proxy, frontend) == 0);
assert (zmq_bind (xsub_proxy, "tcp://127.0.0.1:*") == 0);
int rc = zmq_getsockopt (xsub_proxy, ZMQ_LAST_ENDPOINT, my_endpoint_frontend,
&len);
assert (rc == 0);
// proxy backend
void *xpub_proxy = zmq_socket (ctx, ZMQ_XPUB);
assert (xpub_proxy);
assert (zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4) == 0);
assert (zmq_bind (xpub_proxy, backend) == 0);
assert (zmq_bind (xpub_proxy, "tcp://127.0.0.1:*") == 0);
len = MAX_SOCKET_STRING;
rc = zmq_getsockopt (xpub_proxy, ZMQ_LAST_ENDPOINT, my_endpoint_backend,
&len);
assert (rc == 0);
// publisher
void *pub = zmq_socket (ctx, ZMQ_PUB);
assert (zmq_connect (pub, frontend) == 0);
assert (zmq_connect (pub, my_endpoint_frontend) == 0);
// Here's the problem: because subscribers subscribe in quick succession,
// the proxy is unable to confirm the first subscription before receiving
@ -370,13 +387,13 @@ int test_missing_subscriptions(const char *frontend, const char *backend)
// first subscriber
void *sub1 = zmq_socket (ctx, ZMQ_SUB);
assert (sub1);
assert (zmq_connect (sub1, backend) == 0);
assert (zmq_connect (sub1, my_endpoint_backend) == 0);
assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic1, 1) == 0);
// second subscriber
void *sub2 = zmq_socket (ctx, ZMQ_SUB);
assert (sub2);
assert (zmq_connect (sub2, backend) == 0);
assert (zmq_connect (sub2, my_endpoint_backend) == 0);
assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1) == 0);
// wait
@ -456,20 +473,8 @@ int main(void)
setup_test_environment ();
test_basic ();
test_unsubscribe_manual ();
const char *frontend;
const char *backend;
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
frontend = "ipc://frontend";
backend = "ipc://backend";
test_xpub_proxy_unsubscribe_on_disconnect (frontend, backend);
test_missing_subscriptions (frontend, backend);
#endif
frontend = "tcp://127.0.0.1:5560";
backend = "tcp://127.0.0.1:5561";
test_xpub_proxy_unsubscribe_on_disconnect (frontend, backend);
test_missing_subscriptions (frontend, backend);
test_xpub_proxy_unsubscribe_on_disconnect ();
test_missing_subscriptions ();
return 0;
}

View File

@ -1,5 +1,5 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
@ -42,6 +42,17 @@
// get test failures on slower systems due to binds/connects not
// settled. Tested to work reliably at 1 msec on a fast PC.
#define SETTLE_TIME 300 // In msec
// Commonly used buffer size for ZMQ_LAST_ENDPOINT
#define MAX_SOCKET_STRING sizeof("tcp://127.0.0.1:65536")
// We need to test codepaths with non-random bind ports. List them here to
// keep them unique, to allow parallel test runs.
#define ENDPOINT_0 "tcp://127.0.0.1:5555"
#define ENDPOINT_1 "tcp://127.0.0.1:5556"
#define ENDPOINT_2 "tcp://127.0.0.1:5557"
#define ENDPOINT_3 "tcp://127.0.0.1:5558"
#define ENDPOINT_4 "udp://127.0.0.1:5559"
#define ENDPOINT_5 "udp://127.0.0.1:5560"
#undef NDEBUG
#include <time.h>