mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-13 10:52:56 +01:00
Add tests for Request-Reply pattern sockets.
* See http://rfc.zeromq.org/spec:28/REQREP * Not all testable statements are covered. * At this point, there are several failures: - test_spec_req: The REQ socket does not correctly discard messages from peers that are not currently being talked to. - test_spec_dealer/router: On disconnect, the queues seem to not be emptied. The DEALER can still receive a message the disconnected peer sent, the ROUTER can still send to the identity of the dis- connected peer.
This commit is contained in:
parent
78e47912d2
commit
798b394087
@ -24,7 +24,11 @@ noinst_PROGRAMS = test_pair_inproc \
|
||||
test_ctx_options \
|
||||
test_security \
|
||||
test_security_curve \
|
||||
test_iov
|
||||
test_iov \
|
||||
test_spec_req \
|
||||
test_spec_rep \
|
||||
test_spec_dealer \
|
||||
test_spec_router
|
||||
|
||||
if !ON_MINGW
|
||||
noinst_PROGRAMS += test_shutdown_stress \
|
||||
@ -55,6 +59,10 @@ test_ctx_options_SOURCES = test_ctx_options.cpp
|
||||
test_iov_SOURCES = test_iov.cpp
|
||||
test_security_SOURCES = test_security.cpp
|
||||
test_security_curve_SOURCES = test_security_curve.cpp
|
||||
test_spec_req_SOURCES = test_spec_req.cpp
|
||||
test_spec_rep_SOURCES = test_spec_rep.cpp
|
||||
test_spec_dealer_SOURCES = test_spec_dealer.cpp
|
||||
test_spec_router_SOURCES = test_spec_router.cpp
|
||||
if !ON_MINGW
|
||||
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
|
||||
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
|
||||
@ -67,4 +75,4 @@ noinst_PROGRAMS += test_raw_sock
|
||||
test_raw_sock_SOURCES = test_raw_sock.cpp
|
||||
|
||||
# Run the test cases
|
||||
TESTS = $(noinst_PROGRAMS)
|
||||
TESTS = $(noinst_PROGRAMS)
|
||||
|
248
tests/test_spec_dealer.cpp
Normal file
248
tests/test_spec_dealer.cpp
Normal file
@ -0,0 +1,248 @@
|
||||
/*
|
||||
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
|
||||
0MQ is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
0MQ is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include "testutil.hpp"
|
||||
|
||||
void test_round_robin_out (void *ctx)
|
||||
{
|
||||
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
|
||||
assert (dealer);
|
||||
|
||||
int rc = zmq_bind (dealer, "inproc://b");
|
||||
assert (rc == 0);
|
||||
|
||||
const size_t N = 5;
|
||||
void *rep[N];
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
rep[i] = zmq_socket (ctx, ZMQ_REP);
|
||||
assert (rep[i]);
|
||||
|
||||
int timeout = 100;
|
||||
rc = zmq_setsockopt (rep[i], ZMQ_RCVTIMEO, &timeout, sizeof(int));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_connect (rep[i], "inproc://b");
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
// Send N requests
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
s_send_seq (dealer, 0, "ABC", SEQ_END);
|
||||
}
|
||||
|
||||
// Expect every REP got one message
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init (&msg);
|
||||
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
s_recv_seq (rep[i], "ABC", SEQ_END);
|
||||
}
|
||||
|
||||
rc = zmq_msg_close (&msg);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (dealer);
|
||||
assert (rc == 0);
|
||||
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
rc = zmq_close (rep[i]);
|
||||
assert (rc == 0);
|
||||
}
|
||||
}
|
||||
|
||||
void test_fair_queue_in (void *ctx)
|
||||
{
|
||||
void *receiver = zmq_socket (ctx, ZMQ_DEALER);
|
||||
assert (receiver);
|
||||
|
||||
int timeout = 100;
|
||||
int rc = zmq_setsockopt (receiver, ZMQ_RCVTIMEO, &timeout, sizeof(int));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_bind (receiver, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
const size_t N = 5;
|
||||
void *senders[N];
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
senders[i] = zmq_socket (ctx, ZMQ_DEALER);
|
||||
assert (senders[i]);
|
||||
|
||||
rc = zmq_setsockopt (senders[i], ZMQ_RCVTIMEO, &timeout, sizeof(int));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_connect (senders[i], "inproc://a");
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
zmq_msg_t msg;
|
||||
rc = zmq_msg_init (&msg);
|
||||
assert (rc == 0);
|
||||
|
||||
s_send_seq (senders[0], "A", SEQ_END);
|
||||
s_recv_seq (receiver, "A", SEQ_END);
|
||||
|
||||
s_send_seq (senders[0], "A", SEQ_END);
|
||||
s_recv_seq (receiver, "A", SEQ_END);
|
||||
|
||||
// send N requests
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
char *str = strdup("A");
|
||||
str[0] += i;
|
||||
s_send_seq (senders[i], str, SEQ_END);
|
||||
free (str);
|
||||
}
|
||||
|
||||
// handle N requests
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
char *str = strdup("A");
|
||||
str[0] += i;
|
||||
s_recv_seq (receiver, str, SEQ_END);
|
||||
free (str);
|
||||
}
|
||||
|
||||
rc = zmq_msg_close (&msg);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (receiver);
|
||||
assert (rc == 0);
|
||||
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
rc = zmq_close (senders[i]);
|
||||
assert (rc == 0);
|
||||
}
|
||||
}
|
||||
|
||||
void test_destroy_queue_on_disconnect (void *ctx)
|
||||
{
|
||||
void *A = zmq_socket (ctx, ZMQ_DEALER);
|
||||
assert (A);
|
||||
|
||||
int rc = zmq_bind (A, "inproc://d");
|
||||
assert (rc == 0);
|
||||
|
||||
void *B = zmq_socket (ctx, ZMQ_DEALER);
|
||||
assert (B);
|
||||
|
||||
rc = zmq_connect (B, "inproc://d");
|
||||
assert (rc == 0);
|
||||
|
||||
// Send a message in both directions
|
||||
s_send_seq (A, "ABC", SEQ_END);
|
||||
s_send_seq (B, "DEF", SEQ_END);
|
||||
|
||||
rc = zmq_disconnect (B, "inproc://d");
|
||||
assert (rc == 0);
|
||||
|
||||
// Disconnect may take time and need command processing.
|
||||
zmq_pollitem_t poller[2] = { { A, 0, 0, 0 }, { B, 0, 0, 0 } };
|
||||
rc = zmq_poll (poller, 2, 100);
|
||||
assert (rc == 0);
|
||||
|
||||
// No messages should be available, sending should fail.
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init (&msg);
|
||||
|
||||
rc = zmq_send (A, 0, 0, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
// After a reconnect of B, the messages should still be gone
|
||||
rc = zmq_connect (B, "inproc://d");
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
rc = zmq_msg_recv (&msg, B, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
rc = zmq_msg_close (&msg);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (A);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (B);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
void test_block_on_send_no_peers (void *ctx)
|
||||
{
|
||||
void *sc = zmq_socket (ctx, ZMQ_DEALER);
|
||||
assert (sc);
|
||||
|
||||
int timeout = 100;
|
||||
int rc = zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof(timeout));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_send (sc, 0, 0, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
rc = zmq_send (sc, 0, 0, 0);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
rc = zmq_close (sc);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
int main ()
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
// SHALL route outgoing messages to available peers using a round-robin
|
||||
// strategy.
|
||||
test_round_robin_out (ctx);
|
||||
|
||||
// SHALL receive incoming messages from its peers using a fair-queuing
|
||||
// strategy.
|
||||
test_fair_queue_in (ctx);
|
||||
|
||||
// SHALL block on sending, or return a suitable error, when it has no connected peers.
|
||||
test_block_on_send_no_peers (ctx);
|
||||
|
||||
// SHALL create a double queue when a peer connects to it. If this peer
|
||||
// disconnects, the DEALER socket SHALL destroy its double queue and SHALL
|
||||
// discard any messages it contains.
|
||||
test_destroy_queue_on_disconnect (ctx);
|
||||
|
||||
int rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
|
||||
return 0 ;
|
||||
}
|
144
tests/test_spec_rep.cpp
Normal file
144
tests/test_spec_rep.cpp
Normal file
@ -0,0 +1,144 @@
|
||||
/*
|
||||
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
|
||||
0MQ is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
0MQ is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include "testutil.hpp"
|
||||
|
||||
void test_fair_queue_in (void *ctx)
|
||||
{
|
||||
void *rep = zmq_socket (ctx, ZMQ_REP);
|
||||
assert (rep);
|
||||
|
||||
int timeout = 100;
|
||||
int rc = zmq_setsockopt (rep, ZMQ_RCVTIMEO, &timeout, sizeof(int));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_bind (rep, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
const size_t N = 5;
|
||||
void *reqs[N];
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
reqs[i] = zmq_socket (ctx, ZMQ_REQ);
|
||||
assert (reqs[i]);
|
||||
|
||||
rc = zmq_setsockopt (reqs[i], ZMQ_RCVTIMEO, &timeout, sizeof(int));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_connect (reqs[i], "inproc://a");
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
s_send_seq (reqs[0], "A", SEQ_END);
|
||||
s_recv_seq (rep, "A", SEQ_END);
|
||||
s_send_seq (rep, "A", SEQ_END);
|
||||
s_recv_seq (reqs[0], "A", SEQ_END);
|
||||
|
||||
s_send_seq (reqs[0], "A", SEQ_END);
|
||||
s_recv_seq (rep, "A", SEQ_END);
|
||||
s_send_seq (rep, "A", SEQ_END);
|
||||
s_recv_seq (reqs[0], "A", SEQ_END);
|
||||
|
||||
// send N requests
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
char * str = strdup("A");
|
||||
str[0] += i;
|
||||
s_send_seq (reqs[i], str, SEQ_END);
|
||||
free (str);
|
||||
}
|
||||
|
||||
// handle N requests
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
char * str = strdup("A");
|
||||
str[0] += i;
|
||||
s_recv_seq (rep, str, SEQ_END);
|
||||
s_send_seq (rep, str, SEQ_END);
|
||||
s_recv_seq (reqs[i], str, SEQ_END);
|
||||
free (str);
|
||||
}
|
||||
|
||||
rc = zmq_close (rep);
|
||||
assert (rc == 0);
|
||||
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
rc = zmq_close (reqs[i]);
|
||||
assert (rc == 0);
|
||||
}
|
||||
}
|
||||
|
||||
void test_envelope (void *ctx)
|
||||
{
|
||||
void *rep = zmq_socket (ctx, ZMQ_REP);
|
||||
assert (rep);
|
||||
|
||||
int rc = zmq_bind (rep, "inproc://b");
|
||||
assert (rc == 0);
|
||||
|
||||
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
|
||||
assert (dealer);
|
||||
|
||||
rc = zmq_connect (dealer, "inproc://b");
|
||||
assert (rc == 0);
|
||||
|
||||
// minimal envelope
|
||||
s_send_seq (dealer, 0, "A", SEQ_END);
|
||||
s_recv_seq (rep, "A", SEQ_END);
|
||||
s_send_seq (rep, "A", SEQ_END);
|
||||
s_recv_seq (dealer, 0, "A", SEQ_END);
|
||||
|
||||
// big envelope
|
||||
s_send_seq (dealer, "X", "Y", 0, "A", SEQ_END);
|
||||
s_recv_seq (rep, "A", SEQ_END);
|
||||
s_send_seq (rep, "A", SEQ_END);
|
||||
s_recv_seq (dealer, "X", "Y", 0, "A", SEQ_END);
|
||||
|
||||
rc = zmq_close (rep);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (dealer);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
int main ()
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
// SHALL receive incoming messages from its peers using a fair-queuing
|
||||
// strategy.
|
||||
test_fair_queue_in (ctx);
|
||||
|
||||
// For an incoming message:
|
||||
// SHALL remove and store the address envelope, including the delimiter.
|
||||
// SHALL pass the remaining data frames to its calling application.
|
||||
// SHALL wait for a single reply message from its calling application.
|
||||
// SHALL prepend the address envelope and delimiter.
|
||||
// SHALL deliver this message back to the originating peer.
|
||||
test_envelope (ctx);
|
||||
|
||||
int rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
|
||||
return 0 ;
|
||||
}
|
230
tests/test_spec_req.cpp
Normal file
230
tests/test_spec_req.cpp
Normal file
@ -0,0 +1,230 @@
|
||||
/*
|
||||
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
|
||||
0MQ is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
0MQ is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include "testutil.hpp"
|
||||
|
||||
void test_round_robin_out (void *ctx)
|
||||
{
|
||||
void *req = zmq_socket (ctx, ZMQ_REQ);
|
||||
assert (req);
|
||||
|
||||
int rc = zmq_bind (req, "inproc://b");
|
||||
assert (rc == 0);
|
||||
|
||||
const size_t N = 5;
|
||||
void *rep[N];
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
rep[i] = zmq_socket (ctx, ZMQ_REP);
|
||||
assert (rep[i]);
|
||||
|
||||
int timeout = 100;
|
||||
rc = zmq_setsockopt (rep[i], ZMQ_RCVTIMEO, &timeout, sizeof(int));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_connect (rep[i], "inproc://b");
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
// Send N request-replies, and expect every REP it used once in order
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
s_send_seq (req, "ABC", SEQ_END);
|
||||
s_recv_seq (rep[i], "ABC", SEQ_END);
|
||||
s_send_seq (rep[i], "DEF", SEQ_END);
|
||||
s_recv_seq (req, "DEF", SEQ_END);
|
||||
}
|
||||
|
||||
rc = zmq_close (req);
|
||||
assert (rc == 0);
|
||||
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
rc = zmq_close (rep[i]);
|
||||
assert (rc == 0);
|
||||
}
|
||||
}
|
||||
|
||||
void test_req_only_listens_to_current_peer (void *ctx)
|
||||
{
|
||||
void *req = zmq_socket (ctx, ZMQ_REQ);
|
||||
assert (req);
|
||||
|
||||
int rc = zmq_setsockopt(req, ZMQ_IDENTITY, "A", 2);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_bind (req, "inproc://c");
|
||||
assert (rc == 0);
|
||||
|
||||
const size_t N = 3;
|
||||
void *router[N];
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
router[i] = zmq_socket (ctx, ZMQ_ROUTER);
|
||||
assert (router[i]);
|
||||
|
||||
int timeout = 100;
|
||||
rc = zmq_setsockopt (router[i], ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
|
||||
assert (rc == 0);
|
||||
|
||||
int enabled = 1;
|
||||
rc = zmq_setsockopt (router[i], ZMQ_ROUTER_MANDATORY, &enabled, sizeof(enabled));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_connect (router[i], "inproc://c");
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
s_send_seq (req, "ABC", SEQ_END);
|
||||
|
||||
// Receive on router i
|
||||
s_recv_seq (router[i], "A", 0, "ABC", SEQ_END);
|
||||
|
||||
// Send back replies on all routers
|
||||
for (size_t j = 0; j < N; ++j)
|
||||
{
|
||||
const char *replies[] = { "WRONG", "GOOD" };
|
||||
const char *reply = replies[i == j ? 1 : 0];
|
||||
s_send_seq (router[j], "A", 0, reply, SEQ_END);
|
||||
}
|
||||
|
||||
// Recieve only the good relpy
|
||||
s_recv_seq (req, "GOOD", SEQ_END);
|
||||
}
|
||||
|
||||
rc = zmq_close (req);
|
||||
assert (rc == 0);
|
||||
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
rc = zmq_close (router[i]);
|
||||
assert (rc == 0);
|
||||
}
|
||||
}
|
||||
|
||||
void test_req_message_format (void *ctx)
|
||||
{
|
||||
void *req = zmq_socket (ctx, ZMQ_REQ);
|
||||
assert (req);
|
||||
|
||||
void *router = zmq_socket (ctx, ZMQ_ROUTER);
|
||||
assert (router);
|
||||
|
||||
int rc = zmq_bind (req, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_connect (router, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
// Send a multi-part request.
|
||||
s_send_seq (req, "ABC", "DEF", SEQ_END);
|
||||
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init (&msg);
|
||||
|
||||
// Receive peer identity
|
||||
rc = zmq_msg_recv (&msg, router, 0);
|
||||
assert (rc != -1);
|
||||
assert (zmq_msg_size (&msg) > 0);
|
||||
zmq_msg_t peer_id_msg;
|
||||
zmq_msg_init (&peer_id_msg);
|
||||
zmq_msg_copy (&peer_id_msg, &msg);
|
||||
|
||||
int more = 0;
|
||||
size_t more_size = sizeof(more);
|
||||
rc = zmq_getsockopt (router, ZMQ_RCVMORE, &more, &more_size);
|
||||
assert (rc == 0);
|
||||
assert (more);
|
||||
|
||||
// Receive the rest.
|
||||
s_recv_seq (router, 0, "ABC", "DEF", SEQ_END);
|
||||
|
||||
// Send back a single-part reply.
|
||||
rc = zmq_msg_send (&peer_id_msg, router, ZMQ_SNDMORE);
|
||||
assert (rc != -1);
|
||||
s_send_seq (router, 0, "GHI", SEQ_END);
|
||||
|
||||
// Receive reply.
|
||||
s_recv_seq (req, "GHI", SEQ_END);
|
||||
|
||||
rc = zmq_msg_close (&msg);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_msg_close (&peer_id_msg);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (req);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (router);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
void test_block_on_send_no_peers (void *ctx)
|
||||
{
|
||||
void *sc = zmq_socket (ctx, ZMQ_REQ);
|
||||
assert (sc);
|
||||
|
||||
int timeout = 100;
|
||||
int rc = zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof(timeout));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_send (sc, 0, 0, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
rc = zmq_send (sc, 0, 0, 0);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
rc = zmq_close (sc);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
int main ()
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
// SHALL route outgoing messages to connected peers using a round-robin
|
||||
// strategy.
|
||||
test_round_robin_out (ctx);
|
||||
|
||||
// The request and reply messages SHALL have this format on the wire:
|
||||
// * A delimiter, consisting of an empty frame, added by the REQ socket.
|
||||
// * One or more data frames, comprising the message visible to the
|
||||
// application.
|
||||
test_req_message_format (ctx);
|
||||
|
||||
// SHALL block on sending, or return a suitable error, when it has no connected peers.
|
||||
test_block_on_send_no_peers (ctx);
|
||||
|
||||
// SHALL accept an incoming message only from the last peer that it sent a
|
||||
// request to.
|
||||
// SHALL discard silently any messages received from other peers.
|
||||
test_req_only_listens_to_current_peer (ctx);
|
||||
|
||||
int rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
|
||||
return 0 ;
|
||||
}
|
180
tests/test_spec_router.cpp
Normal file
180
tests/test_spec_router.cpp
Normal file
@ -0,0 +1,180 @@
|
||||
/*
|
||||
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
|
||||
0MQ is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
0MQ is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include "testutil.hpp"
|
||||
|
||||
void test_fair_queue_in (void *ctx)
|
||||
{
|
||||
void *receiver = zmq_socket (ctx, ZMQ_ROUTER);
|
||||
assert (receiver);
|
||||
|
||||
int timeout = 100;
|
||||
int rc = zmq_setsockopt (receiver, ZMQ_RCVTIMEO, &timeout, sizeof(int));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_bind (receiver, "inproc://a");
|
||||
assert (rc == 0);
|
||||
|
||||
const size_t N = 5;
|
||||
void *senders[N];
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
senders[i] = zmq_socket (ctx, ZMQ_DEALER);
|
||||
assert (senders[i]);
|
||||
|
||||
rc = zmq_setsockopt (senders[i], ZMQ_RCVTIMEO, &timeout, sizeof(int));
|
||||
assert (rc == 0);
|
||||
|
||||
char *str = strdup("A");
|
||||
str[0] += i;
|
||||
rc = zmq_setsockopt (senders[i], ZMQ_IDENTITY, str, 2);
|
||||
assert (rc == 0);
|
||||
free (str);
|
||||
|
||||
rc = zmq_connect (senders[i], "inproc://a");
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
zmq_msg_t msg;
|
||||
rc = zmq_msg_init (&msg);
|
||||
assert (rc == 0);
|
||||
|
||||
s_send_seq (senders[0], "M", SEQ_END);
|
||||
s_recv_seq (receiver, "A", "M", SEQ_END);
|
||||
|
||||
s_send_seq (senders[0], "M", SEQ_END);
|
||||
s_recv_seq (receiver, "A", "M", SEQ_END);
|
||||
|
||||
// send N requests
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
s_send_seq (senders[i], "M", SEQ_END);
|
||||
}
|
||||
|
||||
// handle N requests
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
char *str = strdup("A");
|
||||
str[0] += i;
|
||||
s_recv_seq (receiver, str, "M", SEQ_END);
|
||||
free (str);
|
||||
}
|
||||
|
||||
rc = zmq_msg_close (&msg);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (receiver);
|
||||
assert (rc == 0);
|
||||
|
||||
for (size_t i = 0; i < N; ++i)
|
||||
{
|
||||
rc = zmq_close (senders[i]);
|
||||
assert (rc == 0);
|
||||
}
|
||||
}
|
||||
|
||||
void test_destroy_queue_on_disconnect (void *ctx)
|
||||
{
|
||||
void *A = zmq_socket (ctx, ZMQ_ROUTER);
|
||||
assert (A);
|
||||
|
||||
int enabled = 1;
|
||||
int rc = zmq_setsockopt (A, ZMQ_ROUTER_MANDATORY, &enabled, sizeof(enabled));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_bind (A, "inproc://d");
|
||||
assert (rc == 0);
|
||||
|
||||
void *B = zmq_socket (ctx, ZMQ_DEALER);
|
||||
assert (B);
|
||||
|
||||
rc = zmq_setsockopt (B, ZMQ_IDENTITY, "B", 2);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_connect (B, "inproc://d");
|
||||
assert (rc == 0);
|
||||
|
||||
// Send a message in both directions
|
||||
s_send_seq (A, "B", "ABC", SEQ_END);
|
||||
s_send_seq (B, "DEF", SEQ_END);
|
||||
|
||||
rc = zmq_disconnect (B, "inproc://d");
|
||||
assert (rc == 0);
|
||||
|
||||
// Disconnect may take time and need command processing.
|
||||
zmq_pollitem_t poller[2] = { { A, 0, 0, 0 }, { B, 0, 0, 0 } };
|
||||
rc = zmq_poll (poller, 2, 100);
|
||||
assert (rc == 0);
|
||||
|
||||
// No messages should be available, sending should fail.
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init (&msg);
|
||||
|
||||
rc = zmq_send (A, "B", 2, ZMQ_SNDMORE | ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
assert (errno == EHOSTUNREACH);
|
||||
|
||||
rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
// After a reconnect of B, the messages should still be gone
|
||||
rc = zmq_connect (B, "inproc://d");
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
rc = zmq_msg_recv (&msg, B, ZMQ_DONTWAIT);
|
||||
assert (rc == -1);
|
||||
assert (errno == EAGAIN);
|
||||
|
||||
rc = zmq_msg_close (&msg);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (A);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (B);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
|
||||
int main ()
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
// SHALL receive incoming messages from its peers using a fair-queuing
|
||||
// strategy.
|
||||
test_fair_queue_in (ctx);
|
||||
|
||||
// SHALL create a double queue when a peer connects to it. If this peer
|
||||
// disconnects, the ROUTER socket SHALL destroy its double queue and SHALL
|
||||
// discard any messages it contains.
|
||||
test_destroy_queue_on_disconnect (ctx);
|
||||
|
||||
int rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
|
||||
return 0 ;
|
||||
}
|
@ -24,11 +24,12 @@
|
||||
#include <string.h>
|
||||
#undef NDEBUG
|
||||
#include <assert.h>
|
||||
#include <stdarg.h>
|
||||
|
||||
// Bounce a message from client to server and back
|
||||
// For REQ/REP or DEALER/DEALER pairs only
|
||||
|
||||
static void
|
||||
void
|
||||
bounce (void *server, void *client)
|
||||
{
|
||||
const char *content = "12345678ABCDEFGH12345678abcdefgh";
|
||||
@ -108,4 +109,81 @@ s_sendmore (void *socket, const char *string) {
|
||||
#define streq(s1,s2) (!strcmp ((s1), (s2)))
|
||||
#define strneq(s1,s2) (strcmp ((s1), (s2)))
|
||||
|
||||
|
||||
const char * SEQ_END = (const char *)1;
|
||||
|
||||
// Sends a message composed of frames that are C strings or null frames.
|
||||
// The list must be terminated by SEQ_END.
|
||||
// Example: s_send_seq (req, "ABC", 0, "DEF", SEQ_END);
|
||||
void s_send_seq (void *socket, ...)
|
||||
{
|
||||
va_list ap;
|
||||
va_start (ap, socket);
|
||||
const char * data = va_arg (ap, const char *);
|
||||
while (true)
|
||||
{
|
||||
const char * prev = data;
|
||||
data = va_arg (ap, const char *);
|
||||
bool end = data == SEQ_END;
|
||||
|
||||
if (!prev)
|
||||
{
|
||||
int rc = zmq_send (socket, 0, 0, end ? 0 : ZMQ_SNDMORE);
|
||||
assert (rc != -1);
|
||||
}
|
||||
else
|
||||
{
|
||||
int rc = zmq_send (socket, prev, strlen (prev)+1, end ? 0 : ZMQ_SNDMORE);
|
||||
assert (rc != -1);
|
||||
}
|
||||
if (end)
|
||||
break;
|
||||
}
|
||||
va_end (ap);
|
||||
}
|
||||
|
||||
// Receives message a number of frames long and checks that the frames have
|
||||
// the given data which can be either C strings or 0 for a null frame.
|
||||
// The list must be terminated by SEQ_END.
|
||||
// Example: s_recv_seq (rep, "ABC", 0, "DEF", SEQ_END);
|
||||
void s_recv_seq (void *socket, ...)
|
||||
{
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init (&msg);
|
||||
|
||||
int more;
|
||||
size_t more_size = sizeof(more);
|
||||
|
||||
va_list ap;
|
||||
va_start (ap, socket);
|
||||
const char * data = va_arg (ap, const char *);
|
||||
while (true)
|
||||
{
|
||||
int rc = zmq_msg_recv (&msg, socket, 0);
|
||||
assert (rc != -1);
|
||||
|
||||
if (!data)
|
||||
{
|
||||
assert (zmq_msg_size (&msg) == 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
assert (strcmp (data, (const char *)zmq_msg_data (&msg)) == 0);
|
||||
}
|
||||
|
||||
data = va_arg (ap, const char *);
|
||||
bool end = data == SEQ_END;
|
||||
|
||||
rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
|
||||
assert (rc == 0);
|
||||
|
||||
assert (!more == end);
|
||||
if (end)
|
||||
break;
|
||||
}
|
||||
va_end (ap);
|
||||
|
||||
zmq_msg_close (&msg);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user