diff --git a/CMakeLists.txt b/CMakeLists.txt index f1d94fbb..69421b52 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,9 +10,9 @@ if(APPLE) endif() if(WIN32) -option(WITH_TWEETNACL "Build with tweetnacl" OFF) + option(WITH_TWEETNACL "Build with tweetnacl" OFF) else() -option(WITH_TWEETNACL "Build with tweetnacl" ON) + option(WITH_TWEETNACL "Build with tweetnacl" ON) endif() if(WITH_TWEETNACL) @@ -23,7 +23,7 @@ if(WITH_TWEETNACL) ) set(TWEETNACL_SOURCES - tweetnacl/src/tweetnacl.c + tweetnacl/src/tweetnacl.c ) if(WIN32) else() @@ -157,6 +157,7 @@ check_function_exists(gethrtime HAVE_GETHRTIME) set(CMAKE_REQUIRED_INCLUDES ) add_definitions(-D_REENTRANT -D_THREAD_SAFE) +add_definitions(-D_USING_CMAKE) option(ENABLE_EVENTFD "Enable/disable eventfd" ZMQ_HAVE_EVENTFD) diff --git a/doc/Makefile.am b/doc/Makefile.am index 0f1c092b..7a8089eb 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -6,6 +6,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \ zmq_msg_init.3 zmq_msg_init_data.3 zmq_msg_init_size.3 \ zmq_msg_move.3 zmq_msg_copy.3 zmq_msg_size.3 zmq_msg_data.3 zmq_msg_close.3 \ zmq_msg_send.3 zmq_msg_recv.3 \ + zmq_msg_routing_id.3 zmq_msg_set_routing_id.3 \ zmq_send.3 zmq_recv.3 zmq_send_const.3 \ zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 zmq_msg_gets.3 \ zmq_getsockopt.3 zmq_setsockopt.3 \ diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index 6a569856..4795c88b 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -688,6 +688,17 @@ Default value:: 0 (leave to OS default) Applicable socket types:: all, when using TCP transports. +ZMQ_THREADSAFE: Retrieve socket thread safety +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_THREADSAFE' option shall retrieve a boolean value indicating whether +or not the socket is threadsafe. Currently only 'ZMQ_CLIENT' sockets are +threadsafe. + +[horizontal] +Option value type:: boolean +Applicable socket types:: all + + ZMQ_TOS: Retrieve the Type-of-Service socket override status ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Retrieve the IP_TOS option for the socket. diff --git a/doc/zmq_msg_routing_id.txt b/doc/zmq_msg_routing_id.txt new file mode 100644 index 00000000..47f05a29 --- /dev/null +++ b/doc/zmq_msg_routing_id.txt @@ -0,0 +1,61 @@ +zmq_msg_routing_id(3) +===================== + + +NAME +---- +zmq_msg_routing_id - return routing ID for message, if any + + +SYNOPSIS +-------- +*uint32_t zmq_msg_routing_id (zmq_msg_t '*message');* + + +DESCRIPTION +----------- +The _zmq_msg_routing_id()_ function returns the routing ID for the message, +if any. The routing ID is set on all messages received from a 'ZMQ_SERVER' +socket. To send a message to a 'ZMQ_SERVER' socket you must set the routing +ID of a connected 'ZMQ_CLIENT' peer. Routing IDs are transient. + + +RETURN VALUE +------------ +The _zmq_msg_routing_id()_ function shall return zero if there is no routing +ID, otherwise it shall return an unsigned 32-bit integer greater than zero. + + +EXAMPLE +------- +.Receiving a client message and routing ID +---- +void *ctx = zmq_ctx_new (); +assert (ctx); + +void *server = zmq_socket (ctx, ZMQ_SERVER); +assert (server); +int rc = zmq_bind (server, "tcp://127.0.0.1:8080"); +assert (rc == 0); + +zmq_msg_t message; +rc = zmq_msg_init (&message); +assert (rc == 0); + +// Receive a message from socket +rc = zmq_msg_recv (server, &message, 0); +assert (rc != -1); +uint32_t routing_id = zmq_msg_routing_id (&message); +assert (routing_id); +---- + + +SEE ALSO +-------- +linkzmq:zmq_msg_set_routing_id[3] + + +AUTHORS +------- +This page was written by the 0MQ community. To make a change please +read the 0MQ Contribution Policy at . diff --git a/doc/zmq_msg_set_routing_id.txt b/doc/zmq_msg_set_routing_id.txt new file mode 100644 index 00000000..b0f2c3a7 --- /dev/null +++ b/doc/zmq_msg_set_routing_id.txt @@ -0,0 +1,46 @@ +zmq_msg_set_routing_id(3) +========================= + + +NAME +---- + +zmq_msg_set_routing_id - set routing ID property on message + + +SYNOPSIS +-------- +*int zmq_msg_set_routing_id (zmq_msg_t '*message', uint32_t 'routing_id');* + + +DESCRIPTION +----------- +The _zmq_msg_set_routing_id()_ function sets the 'routing_id' specified, on the +the message pointed to by the 'message' argument. The 'routing_id' must be +greater than zero. To get a valid routing ID, you must receive a message +from a 'ZMQ_SERVER' socket, and use the libzmq:zmq_msg_routing_id method. +Routing IDs are transient. + + +RETURN VALUE +------------ +The _zmq_msg_set_routing_id()_ function shall return zero if successful. Otherwise it +shall return `-1` and set 'errno' to one of the values defined below. + + +ERRORS +------ +*EINVAL*:: +The provided 'routing_id' is zero. + + +SEE ALSO +-------- +linkzmq:zmq_msg_routing_id[3] +linkzmq:zmq[7] + + +AUTHORS +------- +This page was written by the 0MQ community. To make a change please +read the 0MQ Contribution Policy at . diff --git a/doc/zmq_socket.txt b/doc/zmq_socket.txt index dbbadaf8..8c7fa8da 100644 --- a/doc/zmq_socket.txt +++ b/doc/zmq_socket.txt @@ -48,7 +48,7 @@ _zmq_bind()_, thus allowing many-to-many relationships. .Thread safety 0MQ 'sockets' are _not_ thread safe. Applications MUST NOT use a socket -from multiple threads except after migrating a socket from one thread to +from multiple threads except after migrating a socket from one thread to another with a "full fence" memory barrier. .Socket types @@ -56,73 +56,45 @@ The following sections present the socket types defined by 0MQ, grouped by the general _messaging pattern_ which is built from related socket types. -Request-reply pattern +Client-server pattern ~~~~~~~~~~~~~~~~~~~~~ -The request-reply pattern is used for sending requests from a ZMQ_REQ _client_ -to one or more ZMQ_REP _services_, and receiving subsequent replies to each -request sent. -The request-reply pattern is formally defined by http://rfc.zeromq.org/spec:28. +The client-server pattern is used to allow a single 'ZMQ_SERVER' _server_ talk +to one or more 'ZMQ_CLIENT' _clients_. The client always starts the conversation, +after which either peer can send messages asynchronously, to the other. -ZMQ_REQ -^^^^^^^ -A socket of type 'ZMQ_REQ' is used by a _client_ to send requests to and -receive replies from a _service_. This socket type allows only an alternating -sequence of _zmq_send(request)_ and subsequent _zmq_recv(reply)_ calls. Each -request sent is round-robined among all _services_, and each reply received is -matched with the last issued request. +The client-server pattern is formally defined by http://rfc.zeromq.org/spec:41. -If no services are available, then any send operation on the socket shall -block until at least one _service_ becomes available. The REQ socket shall -not discard messages. +Note: this pattern deprecates the use of 'ZMQ_DEALER' and 'ZMQ_ROUTER' to build +client-server architectures. -[horizontal] -.Summary of ZMQ_REQ characteristics -Compatible peer sockets:: 'ZMQ_REP', 'ZMQ_ROUTER' -Direction:: Bidirectional -Send/receive pattern:: Send, Receive, Send, Receive, ... -Outgoing routing strategy:: Round-robin -Incoming routing strategy:: Last peer -Action in mute state:: Block - - -ZMQ_REP -^^^^^^^ -A socket of type 'ZMQ_REP' is used by a _service_ to receive requests from and -send replies to a _client_. This socket type allows only an alternating -sequence of _zmq_recv(request)_ and subsequent _zmq_send(reply)_ calls. Each -request received is fair-queued from among all _clients_, and each reply sent -is routed to the _client_ that issued the last request. If the original -requester does not exist any more the reply is silently discarded. - -[horizontal] -.Summary of ZMQ_REP characteristics -Compatible peer sockets:: 'ZMQ_REQ', 'ZMQ_DEALER' -Direction:: Bidirectional -Send/receive pattern:: Receive, Send, Receive, Send, ... -Incoming routing strategy:: Fair-queued -Outgoing routing strategy:: Last peer - - -ZMQ_DEALER +ZMQ_CLIENT ^^^^^^^^^^ -A socket of type 'ZMQ_DEALER' is an advanced pattern used for extending -request/reply sockets. Each message sent is round-robined among all connected -peers, and each message received is fair-queued from all connected peers. +A 'ZMQ_CLIENT' socket talks to a 'ZMQ_SERVER' socket. Either peer can connect, +though the usual and recommended model is to bind the 'ZMQ_SERVER' and connect +the 'ZMQ_CLIENT'. -When a 'ZMQ_DEALER' socket enters the 'mute' state due to having reached the -high water mark for all peers, or if there are no peers at all, then any -linkzmq:zmq_send[3] operations on the socket shall block until the mute -state ends or at least one peer becomes available for sending; messages are not -discarded. +If the 'ZMQ_CLIENT' socket has established a connection, linkzmq:zmq_send[3] +will accept messages, queue them, and send them as rapidly as the network +allows. The outgoing buffer limit is defined by the high water mark for the +socket. If the outgoing buffer is full, or if there is no connected peer, +linkzmq:zmq_send[3] will block, by default. The 'ZMQ_CLIENT' socket will not +drop messages. -When a 'ZMQ_DEALER' socket is connected to a 'ZMQ_REP' socket each message sent -must consist of an empty message part, the _delimiter_, followed by one or more -_body parts_. +When a 'ZMQ_CLIENT' socket is connected to multiple 'ZMQ_SERVER' sockets, +outgoing messages are distributed between connected peers on a round-robin +basis. Likewise, the 'ZMQ_CLIENT' socket receives messages fairly from each +connected peer. This usage is sensible only for stateless protocols. + +'ZMQ_CLIENT' sockets are threadsafe and can be used from multiple threads +at the same time. Note that replies from a 'ZMQ_SERVER' socket will go to +the first client thread that calls libzmq:zmq_msg_recv. If you need to get +replies back to the originating thread, use one 'ZMQ_CLIENT' socket per +thread. [horizontal] -.Summary of ZMQ_DEALER characteristics -Compatible peer sockets:: 'ZMQ_ROUTER', 'ZMQ_REP', 'ZMQ_DEALER' +.Summary of ZMQ_CLIENT characteristics +Compatible peer sockets:: 'ZMQ_SERVER' Direction:: Bidirectional Send/receive pattern:: Unrestricted Outgoing routing strategy:: Round-robin @@ -130,38 +102,30 @@ Incoming routing strategy:: Fair-queued Action in mute state:: Block -ZMQ_ROUTER +ZMQ_SERVER ^^^^^^^^^^ -A socket of type 'ZMQ_ROUTER' is an advanced socket type used for extending -request/reply sockets. When receiving messages a 'ZMQ_ROUTER' socket shall -prepend a message part containing the _identity_ of the originating peer to the -message before passing it to the application. Messages received are fair-queued -from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall -remove the first part of the message and use it to determine the _identity_ of -the peer the message shall be routed to. If the peer does not exist anymore -the message shall be silently discarded by default, unless 'ZMQ_ROUTER_MANDATORY' -socket option is set to '1'. +A 'ZMQ_SERVER' socket talks to a set of 'ZMQ_CLIENT' sockets. A 'ZMQ_SERVER' +socket can only reply to an incoming message: the 'ZMQ_CLIENT' peer must +always initiate a conversation. -When a 'ZMQ_ROUTER' socket enters the 'mute' state due to having reached the -high water mark for all peers, then any messages sent to the socket shall be dropped -until the mute state ends. Likewise, any messages routed to a peer for which -the individual high water mark has been reached shall also be dropped. +Each received message has a 'routing_id' that is a 32-bit unsigned integer. +The application can fetch this with linkzmq:zmq_msg_routing_id[3]. To send +a message to a given 'ZMQ_CLIENT' peer the application must set the peer's +'routing_id' on the message, using linkzmq:zmq_msg_set_routing_id[3]. -When a 'ZMQ_REQ' socket is connected to a 'ZMQ_ROUTER' socket, in addition to the -_identity_ of the originating peer each message received shall contain an empty -_delimiter_ message part. Hence, the entire structure of each received message -as seen by the application becomes: one or more _identity_ parts, _delimiter_ -part, one or more _body parts_. When sending replies to a 'ZMQ_REQ' socket the -application must include the _delimiter_ part. +If the 'routing_id' is not specified, or does not refer to a connected client +peer, the send call will fail with EHOSTUNREACH. If the outgoing buffer for +the client peer is full, the send call will fail with EAGAIN. The 'ZMQ_SERVER' +socket shall not drop messages, nor shall it block. [horizontal] -.Summary of ZMQ_ROUTER characteristics -Compatible peer sockets:: 'ZMQ_DEALER', 'ZMQ_REQ', 'ZMQ_ROUTER' +.Summary of ZMQ_SERVER characteristics +Compatible peer sockets:: 'ZMQ_CLIENT' Direction:: Bidirectional Send/receive pattern:: Unrestricted Outgoing routing strategy:: See text Incoming routing strategy:: Fair-queued -Action in mute state:: Drop +Action in mute state:: Return EAGAIN Publish-subscribe pattern @@ -328,26 +292,26 @@ Action in mute state:: Block Native Pattern ~~~~~~~~~~~~~~ -The native pattern is used for communicating with TCP peers and allows +The native pattern is used for communicating with TCP peers and allows asynchronous requests and replies in either direction. ZMQ_STREAM ^^^^^^^^^^ -A socket of type 'ZMQ_STREAM' is used to send and receive TCP data from a -non-0MQ peer, when using the tcp:// transport. A 'ZMQ_STREAM' socket can +A socket of type 'ZMQ_STREAM' is used to send and receive TCP data from a +non-0MQ peer, when using the tcp:// transport. A 'ZMQ_STREAM' socket can act as client and/or server, sending and/or receiving TCP data asynchronously. When receiving TCP data, a 'ZMQ_STREAM' socket shall prepend a message part -containing the _identity_ of the originating peer to the message before passing -it to the application. Messages received are fair-queued from among all -connected peers. +containing the _identity_ of the originating peer to the message before passing +it to the application. Messages received are fair-queued from among all +connected peers. -When sending TCP data, a 'ZMQ_STREAM' socket shall remove the first part of the -message and use it to determine the _identity_ of the peer the message shall be +When sending TCP data, a 'ZMQ_STREAM' socket shall remove the first part of the +message and use it to determine the _identity_ of the peer the message shall be routed to, and unroutable messages shall cause an EHOSTUNREACH or EAGAIN error. -To open a connection to a server, use the zmq_connect call, and then fetch the +To open a connection to a server, use the zmq_connect call, and then fetch the socket identity using the ZMQ_IDENTITY zmq_getsockopt call. To close a specific connection, send the identity frame followed by a @@ -373,6 +337,116 @@ Incoming routing strategy:: Fair-queued Action in mute state:: EAGAIN +Request-reply pattern +~~~~~~~~~~~~~~~~~~~~~ +The request-reply pattern is used for sending requests from a ZMQ_REQ _client_ +to one or more ZMQ_REP _services_, and receiving subsequent replies to each +request sent. + +The request-reply pattern is formally defined by http://rfc.zeromq.org/spec:28. + +Note: this pattern will be deprecated in favor of the client-server pattern. + +ZMQ_REQ +^^^^^^^ +A socket of type 'ZMQ_REQ' is used by a _client_ to send requests to and +receive replies from a _service_. This socket type allows only an alternating +sequence of _zmq_send(request)_ and subsequent _zmq_recv(reply)_ calls. Each +request sent is round-robined among all _services_, and each reply received is +matched with the last issued request. + +If no services are available, then any send operation on the socket shall +block until at least one _service_ becomes available. The REQ socket shall +not discard messages. + +[horizontal] +.Summary of ZMQ_REQ characteristics +Compatible peer sockets:: 'ZMQ_REP', 'ZMQ_ROUTER' +Direction:: Bidirectional +Send/receive pattern:: Send, Receive, Send, Receive, ... +Outgoing routing strategy:: Round-robin +Incoming routing strategy:: Last peer +Action in mute state:: Block + + +ZMQ_REP +^^^^^^^ +A socket of type 'ZMQ_REP' is used by a _service_ to receive requests from and +send replies to a _client_. This socket type allows only an alternating +sequence of _zmq_recv(request)_ and subsequent _zmq_send(reply)_ calls. Each +request received is fair-queued from among all _clients_, and each reply sent +is routed to the _client_ that issued the last request. If the original +requester does not exist any more the reply is silently discarded. + +[horizontal] +.Summary of ZMQ_REP characteristics +Compatible peer sockets:: 'ZMQ_REQ', 'ZMQ_DEALER' +Direction:: Bidirectional +Send/receive pattern:: Receive, Send, Receive, Send, ... +Incoming routing strategy:: Fair-queued +Outgoing routing strategy:: Last peer + + +ZMQ_DEALER +^^^^^^^^^^ +A socket of type 'ZMQ_DEALER' is an advanced pattern used for extending +request/reply sockets. Each message sent is round-robined among all connected +peers, and each message received is fair-queued from all connected peers. + +When a 'ZMQ_DEALER' socket enters the 'mute' state due to having reached the +high water mark for all peers, or if there are no peers at all, then any +linkzmq:zmq_send[3] operations on the socket shall block until the mute +state ends or at least one peer becomes available for sending; messages are not +discarded. + +When a 'ZMQ_DEALER' socket is connected to a 'ZMQ_REP' socket each message sent +must consist of an empty message part, the _delimiter_, followed by one or more +_body parts_. + +[horizontal] +.Summary of ZMQ_DEALER characteristics +Compatible peer sockets:: 'ZMQ_ROUTER', 'ZMQ_REP', 'ZMQ_DEALER' +Direction:: Bidirectional +Send/receive pattern:: Unrestricted +Outgoing routing strategy:: Round-robin +Incoming routing strategy:: Fair-queued +Action in mute state:: Block + + +ZMQ_ROUTER +^^^^^^^^^^ +A socket of type 'ZMQ_ROUTER' is an advanced socket type used for extending +request/reply sockets. When receiving messages a 'ZMQ_ROUTER' socket shall +prepend a message part containing the _identity_ of the originating peer to the +message before passing it to the application. Messages received are fair-queued +from among all connected peers. When sending messages a 'ZMQ_ROUTER' socket shall +remove the first part of the message and use it to determine the _identity_ of +the peer the message shall be routed to. If the peer does not exist anymore +the message shall be silently discarded by default, unless 'ZMQ_ROUTER_MANDATORY' +socket option is set to '1'. + +When a 'ZMQ_ROUTER' socket enters the 'mute' state due to having reached the +high water mark for all peers, then any messages sent to the socket shall be dropped +until the mute state ends. Likewise, any messages routed to a peer for which +the individual high water mark has been reached shall also be dropped. + +When a 'ZMQ_REQ' socket is connected to a 'ZMQ_ROUTER' socket, in addition to the +_identity_ of the originating peer each message received shall contain an empty +_delimiter_ message part. Hence, the entire structure of each received message +as seen by the application becomes: one or more _identity_ parts, _delimiter_ +part, one or more _body parts_. When sending replies to a 'ZMQ_REQ' socket the +application must include the _delimiter_ part. + +[horizontal] +.Summary of ZMQ_ROUTER characteristics +Compatible peer sockets:: 'ZMQ_DEALER', 'ZMQ_REQ', 'ZMQ_ROUTER' +Direction:: Bidirectional +Send/receive pattern:: Unrestricted +Outgoing routing strategy:: See text +Incoming routing strategy:: Fair-queued +Action in mute state:: Drop + + RETURN VALUE ------------ The _zmq_socket()_ function shall return an opaque handle to the newly created diff --git a/include/zmq.h b/include/zmq.h index f797b4c6..62479163 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -227,8 +227,8 @@ ZMQ_EXPORT int zmq_msg_more (zmq_msg_t *msg); ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int property); ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval); ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); -ZMQ_EXPORT int zmq_msg_set_routing_id(zmq_msg_t *msg, uint32_t routing_id); -ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg); +ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id); +ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); /******************************************************************************/ diff --git a/src/client.cpp b/src/client.cpp index 7ed01f88..fccbfd9b 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -44,9 +44,9 @@ zmq::client_t::~client_t () void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) { - LIBZMQ_UNUSED(subscribe_to_all_); + LIBZMQ_UNUSED (subscribe_to_all_); - zmq_assert (pipe_); + zmq_assert (pipe_); fq.attach (pipe_); lb.attach (pipe_); @@ -54,28 +54,26 @@ void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) int zmq::client_t::xsend (msg_t *msg_) { - zmq_assert(!(msg_->flags () & msg_t::more)); - return lb.sendpipe (msg_, NULL); } int zmq::client_t::xrecv (msg_t *msg_) -{ +{ int rc = fq.recvpipe (msg_, NULL); // Drop any messages with more flag while (rc == 0 && msg_->flags () & msg_t::more) { // drop all frames of the current multi-frame message - rc = fq.recvpipe (msg_, NULL); - + rc = fq.recvpipe (msg_, NULL); + while (rc == 0 && msg_->flags () & msg_t::more) - rc = fq.recvpipe (msg_, NULL); + rc = fq.recvpipe (msg_, NULL); // get the new message - if (rc == 0) + if (rc == 0) rc = fq.recvpipe (msg_, NULL); - } + } return rc; } diff --git a/src/dealer.cpp b/src/dealer.cpp index 87c2721f..0086508f 100644 --- a/src/dealer.cpp +++ b/src/dealer.cpp @@ -45,7 +45,7 @@ zmq::dealer_t::~dealer_t () void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) { - LIBZMQ_UNUSED(subscribe_to_all_); + LIBZMQ_UNUSED (subscribe_to_all_); zmq_assert (pipe_); diff --git a/src/mechanism.cpp b/src/mechanism.cpp index a3eb9d4f..8dd7f104 100644 --- a/src/mechanism.cpp +++ b/src/mechanism.cpp @@ -74,7 +74,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const { static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP", "DEALER", "ROUTER", "PULL", "PUSH", - "XPUB", "XSUB", "STREAM", "SERVER", "CLIENT"}; + "XPUB", "XSUB", "STREAM", + "SERVER", "CLIENT"}; zmq_assert (socket_type >= 0 && socket_type <= 13); return names [socket_type]; } @@ -190,7 +191,7 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const case ZMQ_SERVER: return type_ == "CLIENT"; case ZMQ_CLIENT: - return type_ == "CLIENT" || type_ == "SERVER"; + return type_ == "SERVER"; default: break; } diff --git a/src/msg.cpp b/src/msg.cpp index 47fe7a25..e3a1dccb 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -497,18 +497,22 @@ bool zmq::msg_t::rm_refs (int refs_) return true; } -uint32_t zmq::msg_t::get_routing_id() +uint32_t zmq::msg_t::get_routing_id () { return u.base.routing_id; } -int zmq::msg_t::set_routing_id(uint32_t routing_id_) +int zmq::msg_t::set_routing_id (uint32_t routing_id_) { - u.base.routing_id = routing_id_; - return 0; + if (routing_id_) { + u.base.routing_id = routing_id_; + return 0; + } + errno = EINVAL; + return -1; } -zmq::atomic_counter_t* zmq::msg_t::refcnt() +zmq::atomic_counter_t *zmq::msg_t::refcnt() { switch(u.base.type) { diff --git a/src/msg.hpp b/src/msg.hpp index 1090038f..c60bc955 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -97,8 +97,8 @@ namespace zmq bool is_vsm () const; bool is_cmsg () const; bool is_zcmsg() const; - uint32_t get_routing_id(); - int set_routing_id(uint32_t routing_id_); + uint32_t get_routing_id (); + int set_routing_id (uint32_t routing_id_); // After calling this function you can copy the message in POD-style // refs_ times. No need to call copy. diff --git a/src/pair.cpp b/src/pair.cpp index 18e5cb66..3e540142 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -48,7 +48,7 @@ zmq::pair_t::~pair_t () void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) { - LIBZMQ_UNUSED(subscribe_to_all_); + LIBZMQ_UNUSED (subscribe_to_all_); zmq_assert (pipe_ != NULL); diff --git a/src/pipe.hpp b/src/pipe.hpp index b795f641..ae85b32c 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -85,8 +85,8 @@ namespace zmq void set_event_sink (i_pipe_events *sink_); // Pipe endpoint can store an routing ID to be used by its clients. - void set_routing_id(uint32_t routing_id_); - uint32_t get_routing_id(); + void set_routing_id (uint32_t routing_id_); + uint32_t get_routing_id (); // Pipe endpoint can store an opaque ID to be used by its clients. void set_identity (const blob_t &identity_); diff --git a/src/pull.cpp b/src/pull.cpp index a958b4c9..fd4ea3ea 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -45,7 +45,7 @@ zmq::pull_t::~pull_t () void zmq::pull_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) { - LIBZMQ_UNUSED(subscribe_to_all_); + LIBZMQ_UNUSED (subscribe_to_all_); zmq_assert (pipe_); fq.attach (pipe_); diff --git a/src/push.cpp b/src/push.cpp index 31b1b0f5..90e7d441 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -45,7 +45,7 @@ zmq::push_t::~push_t () void zmq::push_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) { - LIBZMQ_UNUSED(subscribe_to_all_); + LIBZMQ_UNUSED (subscribe_to_all_); // Don't delay pipe termination as there is no one // to receive the delimiter. diff --git a/src/router.cpp b/src/router.cpp index f2aeca52..551b2774 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -45,7 +45,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : next_rid (generate_random ()), mandatory (false), // raw_socket functionality in ROUTER is deprecated - raw_socket (false), + raw_socket (false), probe_router (false), handover (false) { @@ -67,7 +67,7 @@ zmq::router_t::~router_t () void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) { - LIBZMQ_UNUSED(subscribe_to_all_); + LIBZMQ_UNUSED (subscribe_to_all_); zmq_assert (pipe_); @@ -104,6 +104,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, return 0; } break; + case ZMQ_ROUTER_RAW: if (is_int && value >= 0) { raw_socket = (value != 0); @@ -128,8 +129,8 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, return 0; } break; - - case ZMQ_ROUTER_HANDOVER: + + case ZMQ_ROUTER_HANDOVER: if (is_int && value >= 0) { handover = (value != 0); return 0; @@ -409,10 +410,10 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) connect_rid.length()); connect_rid.clear (); outpipes_t::iterator it = outpipes.find (identity); - if (it != outpipes.end ()) + if (it != outpipes.end ()) zmq_assert(false); // Not allowed to duplicate an existing rid } - else + else if (options.raw_socket) { // Always assign identity for raw-socket unsigned char buf [5]; buf [0] = 0; @@ -420,7 +421,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) identity = blob_t (buf, sizeof buf); } else - if (!options.raw_socket) { + if (!options.raw_socket) { // Pick up handshake cases and also case where next identity is set msg.init (); ok = pipe_->read (&msg); @@ -446,7 +447,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) return false; else { // We will allow the new connection to take over this - // identity. Temporarily assign a new identity to the + // identity. Temporarily assign a new identity to the // existing pipe so we can terminate it asynchronously. unsigned char buf [5]; buf [0] = 0; @@ -454,13 +455,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) blob_t new_identity = blob_t (buf, sizeof buf); it->second.pipe->set_identity (new_identity); - outpipe_t existing_outpipe = + outpipe_t existing_outpipe = {it->second.pipe, it->second.active}; - + ok = outpipes.insert (outpipes_t::value_type ( new_identity, existing_outpipe)).second; zmq_assert (ok); - + // Remove the existing identity entry to allow the new // connection to take the identity. outpipes.erase (it); diff --git a/src/server.cpp b/src/server.cpp index ed37991b..e3a60faf 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -39,28 +39,31 @@ zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_, true), next_rid (generate_random ()) { - options.type = ZMQ_SERVER; + options.type = ZMQ_SERVER; } zmq::server_t::~server_t () -{ +{ zmq_assert (outpipes.empty ()); } void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) { - LIBZMQ_UNUSED(subscribe_to_all_); + LIBZMQ_UNUSED (subscribe_to_all_); - zmq_assert (pipe_); + zmq_assert (pipe_); uint32_t routing_id = next_rid++; + if (!routing_id) + routing_id = next_rid++; // Never use RID zero + pipe_->set_routing_id (routing_id); // Add the record into output pipes lookup table outpipe_t outpipe = {pipe_, true}; bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second; zmq_assert (ok); - - fq.attach (pipe_); + + fq.attach (pipe_); } void zmq::server_t::xpipe_terminated (pipe_t *pipe_) @@ -68,12 +71,12 @@ void zmq::server_t::xpipe_terminated (pipe_t *pipe_) outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ()); zmq_assert (it != outpipes.end ()); outpipes.erase (it); - fq.pipe_terminated (pipe_); + fq.pipe_terminated (pipe_); } void zmq::server_t::xread_activated (pipe_t *pipe_) -{ - fq.activated (pipe_); +{ + fq.activated (pipe_); } void zmq::server_t::xwrite_activated (pipe_t *pipe_) @@ -90,20 +93,18 @@ void zmq::server_t::xwrite_activated (pipe_t *pipe_) int zmq::server_t::xsend (msg_t *msg_) { - zmq_assert(!(msg_->flags () & msg_t::more)); - - // Find the pipe associated with the routing stored in the message. - uint32_t routing_id = msg_->get_routing_id(); + // Find the pipe associated with the routing stored in the message. + uint32_t routing_id = msg_->get_routing_id (); outpipes_t::iterator it = outpipes.find (routing_id); - - if (it != outpipes.end ()) { + + if (it != outpipes.end ()) { if (!it->second.pipe->check_write ()) { - it->second.active = false; + it->second.active = false; errno = EAGAIN; - return -1; + return -1; } } - else { + else { errno = EHOSTUNREACH; return -1; } @@ -113,10 +114,11 @@ int zmq::server_t::xsend (msg_t *msg_) // Message failed to send - we must close it ourselves. int rc = msg_->close (); errno_assert (rc == 0); - } else { - it->second.pipe->flush (); } - + else + it->second.pipe->flush (); + + // Detach the message from the data buffer. int rc = msg_->init (); errno_assert (rc == 0); @@ -125,7 +127,7 @@ int zmq::server_t::xsend (msg_t *msg_) } int zmq::server_t::xrecv (msg_t *msg_) -{ +{ pipe_t *pipe = NULL; int rc = fq.recvpipe (msg_, &pipe); @@ -134,22 +136,22 @@ int zmq::server_t::xrecv (msg_t *msg_) // drop all frames of the current multi-frame message rc = fq.recvpipe (msg_, NULL); - + while (rc == 0 && msg_->flags () & msg_t::more) - rc = fq.recvpipe (msg_, NULL); + rc = fq.recvpipe (msg_, NULL); // get the new message - if (rc == 0) + if (rc == 0) rc = fq.recvpipe (msg_, &pipe); - } + } if (rc != 0) - return rc; + return rc; zmq_assert (pipe != NULL); - uint32_t routing_id = pipe->get_routing_id(); - msg_->set_routing_id(routing_id); + uint32_t routing_id = pipe->get_routing_id (); + msg_->set_routing_id (routing_id); return 0; } diff --git a/src/tcp.cpp b/src/tcp.cpp index 80e1721f..1260f186 100644 --- a/src/tcp.cpp +++ b/src/tcp.cpp @@ -95,13 +95,13 @@ void zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_) void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_) { // These options are used only under certain #ifdefs below. - LIBZMQ_UNUSED(keepalive_); - LIBZMQ_UNUSED(keepalive_cnt_); - LIBZMQ_UNUSED(keepalive_idle_); - LIBZMQ_UNUSED(keepalive_intvl_); + LIBZMQ_UNUSED (keepalive_); + LIBZMQ_UNUSED (keepalive_cnt_); + LIBZMQ_UNUSED (keepalive_idle_); + LIBZMQ_UNUSED (keepalive_intvl_); // If none of the #ifdefs apply, then s_ is unused. - LIBZMQ_UNUSED(s_); + LIBZMQ_UNUSED (s_); // Tuning TCP keep-alives if platform allows it // All values = -1 means skip and leave it for OS diff --git a/src/tcp_address.cpp b/src/tcp_address.cpp index 50c8c516..b8cec25a 100644 --- a/src/tcp_address.cpp +++ b/src/tcp_address.cpp @@ -57,7 +57,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_) { // TODO: Unused parameter, IPv6 support not implemented for Solaris. - LIBZMQ_UNUSED(ipv6_); + LIBZMQ_UNUSED (ipv6_); // Create a socket. const int fd = open_socket (AF_INET, SOCK_DGRAM, 0); @@ -124,7 +124,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_) { // TODO: Unused parameter, IPv6 support not implemented for AIX or HP/UX. - LIBZMQ_UNUSED(ipv6_); + LIBZMQ_UNUSED (ipv6_); // Create a socket. const int sd = open_socket (AF_INET, SOCK_DGRAM, 0); @@ -211,8 +211,8 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_ // This is true especially of Windows. int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_) { - LIBZMQ_UNUSED(nic_); - LIBZMQ_UNUSED(ipv6_); + LIBZMQ_UNUSED (nic_); + LIBZMQ_UNUSED (ipv6_); errno = ENODEV; return -1; diff --git a/src/xsub.cpp b/src/xsub.cpp index 61c2a329..a9d2c925 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -56,7 +56,7 @@ zmq::xsub_t::~xsub_t () void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) { - LIBZMQ_UNUSED(subscribe_to_all_); + LIBZMQ_UNUSED (subscribe_to_all_); zmq_assert (pipe_); fq.attach (pipe_); diff --git a/src/zmq.cpp b/src/zmq.cpp index 42dd783c..e5078386 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -686,19 +686,19 @@ int zmq_msg_set (zmq_msg_t *, int, int) int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_) { - return ((zmq::msg_t*) msg_)->set_routing_id(routing_id_); + return ((zmq::msg_t *) msg_)->set_routing_id (routing_id_); } -uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg_) +uint32_t zmq_msg_routing_id (zmq_msg_t *msg_) { - return ((zmq::msg_t*) msg_)->get_routing_id(); + return ((zmq::msg_t *) msg_)->get_routing_id (); } // Get message metadata string const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_) { - zmq::metadata_t *metadata = ((zmq::msg_t*) msg_)->metadata (); + zmq::metadata_t *metadata = ((zmq::msg_t *) msg_)->metadata (); const char *value = NULL; if (metadata) value = metadata->get (std::string (property_)); diff --git a/tests/test_client_drop_more.cpp b/tests/test_client_drop_more.cpp index 745f24eb..1a38ebec 100644 --- a/tests/test_client_drop_more.cpp +++ b/tests/test_client_drop_more.cpp @@ -29,7 +29,7 @@ #include "testutil.hpp" -int send_msg(zmq_msg_t* msg, void* s, int flags, int value); +int send_msg (zmq_msg_t* msg, void* s, int flags, int value); int main (void) { @@ -38,14 +38,14 @@ int main (void) assert (ctx); void *client = zmq_socket (ctx, ZMQ_CLIENT); - void *dealer = zmq_socket (ctx, ZMQ_DEALER); + void *server = zmq_socket (ctx, ZMQ_SERVER); int rc; rc = zmq_bind (client, "inproc://serverdropmore"); assert (rc == 0); - rc = zmq_connect (dealer, "inproc://serverdropmore"); + rc = zmq_connect (server, "inproc://serverdropmore"); assert (rc == 0); zmq_msg_t msg; @@ -53,34 +53,34 @@ int main (void) assert (rc == 0); // we will send 2 3-frames messages and then single frame message, only last one should be received - rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 1); + rc = send_msg (&msg, client, ZMQ_SNDMORE, 1); assert(rc == 1); - rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 2); + rc = send_msg (&msg, client, ZMQ_SNDMORE, 2); assert(rc == 1); - rc = send_msg (&msg, dealer, 0, 3); - assert(rc == 1); - - rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 4); + rc = send_msg (&msg, client, 0, 3); assert(rc == 1); - rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 5); - assert(rc == 1); - - rc = send_msg (&msg, dealer, 0, 6); + rc = send_msg (&msg, client, ZMQ_SNDMORE, 4); assert(rc == 1); - rc = send_msg (&msg, dealer, 0, 7); + rc = send_msg (&msg, client, ZMQ_SNDMORE, 5); assert(rc == 1); - rc = zmq_msg_recv (&msg, client, 0); - assert (rc == 1); + rc = send_msg (&msg, client, 0, 6); + assert(rc == 1); - assert(zmq_msg_more(&msg) == 0); + rc = send_msg (&msg, client, 0, 7); + assert(rc == 1); - unsigned char* data = (unsigned char*)zmq_msg_data (&msg); - assert (data[0] == 7); + rc = zmq_msg_recv (&msg, server, 0); + assert (rc == 1); + + assert (zmq_msg_more (&msg) == 0); + + unsigned char *data = (unsigned char*) zmq_msg_data (&msg); + assert (data [0] == 7); rc = zmq_msg_close (&msg); assert (rc == 0); @@ -88,7 +88,7 @@ int main (void) rc = zmq_close (client); assert (rc == 0); - rc = zmq_close (dealer); + rc = zmq_close (server); assert (rc == 0); rc = zmq_ctx_term (ctx); @@ -97,20 +97,18 @@ int main (void) return 0 ; } -int send_msg(zmq_msg_t* msg, void* s, int flags, int value) +int send_msg (zmq_msg_t *msg, void *s, int flags, int value) { - int rc = zmq_msg_close(msg); - + int rc = zmq_msg_close (msg); if (rc != 0) return rc; - zmq_msg_init_size(msg, 1); - + zmq_msg_init_size (msg, 1); if (rc != 0) return rc; - unsigned char* data = (unsigned char*)zmq_msg_data(msg); - data[0] = (unsigned char)value; + unsigned char *data = (unsigned char *) zmq_msg_data (msg); + data [0] = (unsigned char) value; return zmq_msg_send (msg, s, flags); } diff --git a/tests/test_client_server.cpp b/tests/test_client_server.cpp index 155232ea..2d6b6e0a 100644 --- a/tests/test_client_server.cpp +++ b/tests/test_client_server.cpp @@ -38,31 +38,29 @@ int main (void) void *server = zmq_socket (ctx, ZMQ_SERVER); void *client = zmq_socket (ctx, ZMQ_CLIENT); - int rc; - - rc = zmq_bind (server, "tcp://127.0.0.1:5560"); + int rc = zmq_bind (server, "tcp://127.0.0.1:5560"); assert (rc == 0); rc = zmq_connect (client, "tcp://127.0.0.1:5560"); assert (rc == 0); zmq_msg_t msg; - rc = zmq_msg_init_size(&msg,1); + rc = zmq_msg_init_size (&msg, 1); assert (rc == 0); - char * data = (char *)zmq_msg_data(&msg); - data[0] = 1; + char *data = (char *) zmq_msg_data (&msg); + data [0] = 1; rc = zmq_msg_send(&msg, client, 0); assert (rc == 1); - rc = zmq_msg_init(&msg); + rc = zmq_msg_init (&msg); assert (rc == 0); - rc = zmq_msg_recv(&msg, server, 0); + rc = zmq_msg_recv (&msg, server, 0); assert (rc == 1); - uint32_t routing_id = zmq_msg_get_routing_id(&msg); - assert(routing_id != 0); + uint32_t routing_id = zmq_msg_routing_id (&msg); + assert (routing_id != 0); rc = zmq_msg_close(&msg); assert (rc == 0); diff --git a/tests/test_server_drop_more.cpp b/tests/test_server_drop_more.cpp index 03d6a37d..e39b4c37 100644 --- a/tests/test_server_drop_more.cpp +++ b/tests/test_server_drop_more.cpp @@ -29,7 +29,7 @@ #include "testutil.hpp" -int send_msg(zmq_msg_t* msg, void* s, int flags, int value); +int send_msg (zmq_msg_t* msg, void* s, int flags, int value); int main (void) { @@ -38,7 +38,7 @@ int main (void) assert (ctx); void *server = zmq_socket (ctx, ZMQ_SERVER); - void *client = zmq_socket (ctx, ZMQ_DEALER); + void *client = zmq_socket (ctx, ZMQ_CLIENT); int rc; @@ -61,13 +61,13 @@ int main (void) rc = send_msg (&msg, client, 0, 3); assert(rc == 1); - + rc = send_msg (&msg, client, ZMQ_SNDMORE, 4); assert(rc == 1); rc = send_msg (&msg, client, ZMQ_SNDMORE, 5); assert(rc == 1); - + rc = send_msg (&msg, client, 0, 6); assert(rc == 1); @@ -75,12 +75,12 @@ int main (void) assert(rc == 1); rc = zmq_msg_recv (&msg, server, 0); - assert (rc == 1); + assert (rc == 1); - assert(zmq_msg_more(&msg) == 0); + assert (zmq_msg_more (&msg) == 0); - unsigned char* data = (unsigned char*)zmq_msg_data (&msg); - assert (data[0] == 7); + unsigned char *data = (unsigned char*) zmq_msg_data (&msg); + assert (data [0] == 7); rc = zmq_msg_close (&msg); assert (rc == 0); @@ -97,20 +97,18 @@ int main (void) return 0 ; } -int send_msg(zmq_msg_t* msg, void* s, int flags, int value) +int send_msg (zmq_msg_t *msg, void *s, int flags, int value) { - int rc = zmq_msg_close(msg); - + int rc = zmq_msg_close (msg); if (rc != 0) return rc; - zmq_msg_init_size(msg, 1); - + zmq_msg_init_size (msg, 1); if (rc != 0) return rc; - unsigned char* data = (unsigned char*)zmq_msg_data(msg); - data[0] = (unsigned char)value; + unsigned char *data = (unsigned char *) zmq_msg_data (msg); + data [0] = (unsigned char) value; return zmq_msg_send (msg, s, flags); } diff --git a/tests/test_thread_safe.cpp b/tests/test_thread_safe.cpp index b07afaa4..1eb1b54a 100644 --- a/tests/test_thread_safe.cpp +++ b/tests/test_thread_safe.cpp @@ -29,71 +29,51 @@ #include "testutil.hpp" -void worker1(void* s); -void worker2(void* s); +// Client threads loop on send/recv until told to exit +void client_thread (void *client) +{ + char data = 0; + for (int count = 0; count < 100000; count++) { + int rc = zmq_send (client, &data, 1, 0); + assert (rc == 1); + } + data = 1; + int rc = zmq_send (client, &data, 1, 0); + assert (rc == 1); +} int main (void) { - setup_test_environment(); + setup_test_environment (); void *ctx = zmq_ctx_new (); assert (ctx); + void *server = zmq_socket (ctx, ZMQ_SERVER); + int rc = zmq_bind (server, "tcp://127.0.0.1:5560"); + assert (rc == 0); + void *client = zmq_socket (ctx, ZMQ_CLIENT); - void *client2 = zmq_socket (ctx, ZMQ_CLIENT); - int thread_safe; - size_t size = sizeof(int); - + size_t size = sizeof (int); zmq_getsockopt (client, ZMQ_THREAD_SAFE, &thread_safe, &size); - assert (thread_safe == 1); - - int rc; - - rc = zmq_bind (client, "tcp://127.0.0.1:5560"); + rc = zmq_connect (client, "tcp://127.0.0.1:5560"); assert (rc == 0); - rc = zmq_connect (client2, "tcp://127.0.0.1:5560"); - assert (rc == 0); + void *t1 = zmq_threadstart (client_thread, client); + void *t2 = zmq_threadstart (client_thread, client); - void* t1 = zmq_threadstart(worker1, client2); - void* t2 = zmq_threadstart(worker2, client2); - - char data[1]; - data[0] = 0; - - for (int i=0; i < 10; i++) { - rc = zmq_send_const(client, data, 1, 0); - assert (rc == 1); - - rc = zmq_send_const(client, data, 1, 0); - assert(rc == 1); - - char a, b; - - rc = zmq_recv(client, &a, 1, 0); - assert(rc == 1); - - rc = zmq_recv(client, &b, 1, 0); - assert(rc == 1); - - // make sure they came from different threads - assert((a == 1 && b == 2) || (a == 2 && b == 1)); + char data; + int threads_completed = 0; + while (threads_completed < 2) { + zmq_recv (server, &data, 1, 0); + if (data == 1) + threads_completed++; // Thread ended } + zmq_threadclose (t1); + zmq_threadclose (t2); - // make the thread exit - data[0] = 1; - - rc = zmq_send_const(client, data, 1, 0); - assert (rc == 1); - - rc = zmq_send_const(client, data, 1, 0); - assert(rc == 1); - - zmq_threadclose(t1); - zmq_threadclose(t2); - - rc = zmq_close (client2); + rc = zmq_close (server); assert (rc == 0); rc = zmq_close (client); @@ -104,59 +84,3 @@ int main (void) return 0 ; } - -void worker1(void* s) -{ - const char worker_id = 1; - char c; - - while (true) - { - int rc = zmq_recv(s, &c,1, 0); - assert(rc == 1); - - if (c == 0) - { - msleep(100); - rc = zmq_send_const(s,&worker_id, 1, 0); - assert(rc == 1); - } - else - { - // we got exit request - break; - } - } -} - -void worker2(void* s) -{ - const char worker_id = 2; - char c; - - while (true) - { - int rc = zmq_recv(s, &c,1, 0); - assert(rc == 1); - - assert(c == 1 || c == 0); - - if (c == 0) - { - msleep(100); - rc = zmq_send_const(s,&worker_id, 1, 0); - assert(rc == 1); - } - else - { - // we got exit request - break; - } - } -} - - - - - - diff --git a/tests/testutil.hpp b/tests/testutil.hpp index 68a80892..ed1915ee 100644 --- a/tests/testutil.hpp +++ b/tests/testutil.hpp @@ -32,7 +32,11 @@ #include "../include/zmq.h" #include "../src/stdint.hpp" -#include "platform.hpp" +#ifdef USING_CMAKE +# include "platform.hpp" +#else +# include "../src/platform.hpp" +#endif // This defines the settle time used in tests; raise this if we // get test failures on slower systems due to binds/connects not @@ -60,7 +64,6 @@ // Bounce a message from client to server and back // For REQ/REP or DEALER/DEALER pairs only - void bounce (void *server, void *client) { @@ -116,7 +119,6 @@ bounce (void *server, void *client) // Same as bounce, but expect messages to never arrive // for security or subscriber reasons. - void expect_bounce_fail (void *server, void *client) { @@ -193,7 +195,9 @@ const char *SEQ_END = (const char *) 1; // Sends a message composed of frames that are C strings or null frames. // The list must be terminated by SEQ_END. // Example: s_send_seq (req, "ABC", 0, "DEF", SEQ_END); -void s_send_seq (void *socket, ...) + +void +s_send_seq (void *socket, ...) { va_list ap; va_start (ap, socket); @@ -222,7 +226,9 @@ void s_send_seq (void *socket, ...) // the given data which can be either C strings or 0 for a null frame. // The list must be terminated by SEQ_END. // Example: s_recv_seq (rep, "ABC", 0, "DEF", SEQ_END); -void s_recv_seq (void *socket, ...) + +void +s_recv_seq (void *socket, ...) { zmq_msg_t msg; zmq_msg_init (&msg); @@ -260,7 +266,8 @@ void s_recv_seq (void *socket, ...) // Sets a zero linger period on a socket and closes it. -void close_zero_linger (void *socket) +void +close_zero_linger (void *socket) { int linger = 0; int rc = zmq_setsockopt (socket, ZMQ_LINGER, &linger, sizeof(linger)); @@ -269,7 +276,8 @@ void close_zero_linger (void *socket) assert (rc == 0); } -void setup_test_environment() +void +setup_test_environment (void) { #if defined _WIN32 # if defined _MSC_VER @@ -296,8 +304,11 @@ void setup_test_environment() } // Provide portable millisecond sleep -// http://www.cplusplus.com/forum/unices/60161/ http://en.cppreference.com/w/cpp/thread/sleep_for -void msleep (int milliseconds) +// http://www.cplusplus.com/forum/unices/60161/ +// http://en.cppreference.com/w/cpp/thread/sleep_for + +void +msleep (int milliseconds) { #ifdef ZMQ_HAVE_WINDOWS Sleep (milliseconds);