From 991f7e2c85919daef43c62175da046e0a085f8e3 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Wed, 9 Nov 2011 13:12:46 +0100 Subject: [PATCH 01/14] Set libzmq ABI version to 3 libzmq master (3.1) is not ABI compatible with libzmq 2.1.x or 3.0 (removed functionality), hence the ABI version needs to be set to 3. Signed-off-by: Martin Lucina --- configure.in | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/configure.in b/configure.in index 86147eaa..9006f037 100644 --- a/configure.in +++ b/configure.in @@ -25,11 +25,13 @@ AC_SUBST(PACKAGE_VERSION) # # Changes: # -# ZeroMQ versions prior to 2.1.0 use 0.0.0 ("unstable") -# ZeroMQ version 2.1.0: 1:0:0 +# ZeroMQ versions prior to 2.1.0 use 0:0:0 (undefined) +# ZeroMQ versions 2.1.x: 1:0:0 (ABI version 1) +# ZeroMQ version 3.0: 2:0:0 (ABI version 2) +# ZeroMQ version 3.1: 3:0:0 (ABI version 3) # # libzmq -version-info current:revision:age -LTVER="1:0:0" +LTVER="3:0:0" AC_SUBST(LTVER) # Take a copy of original flags From 82d935309eede60777b59af9df7a50576916d01f Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Wed, 14 Dec 2011 00:00:32 +0100 Subject: [PATCH 02/14] Fix synchronous connect failure for ipc://, tcp:// (LIBZMQ-294) A synchronous connect() failure in ipc_connecter can result in Assertion failed: s == retired_fd (ipc_connecter.cpp:174), as reported in LIBZMQ-294. This patch fixes the bug, and also an identical problem in tcp_connecter which has not hit people since TCP connect() usually completes via the asynchronous code path (poll, out_event). Signed-off-by: Martin Lucina --- src/ipc_connecter.cpp | 1 + src/tcp_connecter.cpp | 1 + 2 files changed, 2 insertions(+) diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index dc0ee216..58dccf46 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -135,6 +135,7 @@ void zmq::ipc_connecter_t::start_connecting () } // Handle any other error condition by eventual reconnect. + close (); wait = true; add_reconnect_timer(); } diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 75079da5..042e82a7 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -146,6 +146,7 @@ void zmq::tcp_connecter_t::start_connecting () } // Handle any other error condition by eventual reconnect. + close (); wait = true; add_reconnect_timer(); } From 029e28865dd47895e6919ce1c3e12c95ea56ad32 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Sun, 25 Dec 2011 02:57:04 +0100 Subject: [PATCH 03/14] Fix assertion in pgm_sender_t::plug() (LIBZMQ-303) Opening any PGM socket gives this assertion. The problem is in pgm_sender_t::plug() which is incorrectly testing the return value from session::write(). Signed-off-by: Martin Lucina --- src/pgm_sender.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 759802f8..1a1ae9a9 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -98,8 +98,8 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) msg_t msg; msg.init_size (1); *(unsigned char*) msg.data () = 1; - bool ok = session_->write (&msg); - zmq_assert (ok); + int rc = session_->write (&msg); + errno_assert (rc == 0); session_->flush (); } From c34a1443651ce5a6d8f7a88d0677ec85c81e1570 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Wed, 4 Jan 2012 11:48:41 +0100 Subject: [PATCH 04/14] Fix pgm_receiver.cpp: zmq_assert (pending_bytes == 0) (LIBZMQ-205) This patch fixes the problem described in LIBZMQ-205. The assertion itself is probably caused by previously queued POLLIN events arriving after POLLIN has been disabled on the socket. The following additional bugs have been fixed as part of debugging this problem: - pgm_receiver_t does not flush messages written to the session in all cases which can lead to a stalled reader. Add calls to session->flush () in the appropriate places. - ensure to restart polling when a pending message is flushed in activate_in (). Signed-off-by: Martin Lucina --- src/pgm_receiver.cpp | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 122d1109..99e882bd 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -117,8 +117,15 @@ void zmq::pgm_receiver_t::activate_in () // processed the whole buffer but failed to write // the last message into the pipe. if (pending_bytes == 0) { - if (mru_decoder != NULL) + if (mru_decoder != NULL) { mru_decoder->process_buffer (NULL, 0); + session->flush (); + } + + // Resume polling. + set_pollin (pipe_handle); + set_pollin (socket_handle); + return; } @@ -128,6 +135,7 @@ void zmq::pgm_receiver_t::activate_in () // Ask the decoder to process remaining data. size_t n = mru_decoder->process_buffer (pending_ptr, pending_bytes); pending_bytes -= n; + session->flush (); if (pending_bytes > 0) return; @@ -145,7 +153,8 @@ void zmq::pgm_receiver_t::in_event () unsigned char *data = NULL; const pgm_tsi_t *tsi = NULL; - zmq_assert (pending_bytes == 0); + if (pending_bytes > 0) + return; if (has_rx_timer) { cancel_timer (rx_timer_id); From e6c97c5ecc3f2b9e84258cb405eb92f6b6f6ca7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Staffan=20Gim=C3=A5ker?= Date: Tue, 3 Jan 2012 16:34:45 +0100 Subject: [PATCH 05/14] Reduce memory usage of mtrie. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Staffan Gimåker --- src/mtrie.cpp | 39 +++++++++++++++++++++++++++++---------- src/mtrie.hpp | 2 +- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/src/mtrie.cpp b/src/mtrie.cpp index 88d10d56..a02353b6 100644 --- a/src/mtrie.cpp +++ b/src/mtrie.cpp @@ -34,6 +34,7 @@ #include "mtrie.hpp" zmq::mtrie_t::mtrie_t () : + pipes (0), min (0), count (0), live_nodes (0) @@ -42,6 +43,11 @@ zmq::mtrie_t::mtrie_t () : zmq::mtrie_t::~mtrie_t () { + if (pipes) { + delete pipes; + pipes = 0; + } + if (count == 1) { zmq_assert (next.node); delete next.node; @@ -65,8 +71,10 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, { // We are at the node corresponding to the prefix. We are done. if (!size_) { - bool result = pipes.empty (); - pipes.insert (pipe_); + bool result = !pipes; + if (!pipes) + pipes = new pipes_t; + pipes->insert (pipe_); return result; } @@ -154,8 +162,11 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, void *arg_) { // Remove the subscription from this node. - if (pipes.erase (pipe_) && pipes.empty ()) + if (pipes && pipes->erase (pipe_) && pipes->empty ()) { func_ (*buff_, buffsize_, arg_); + delete pipes; + pipes = 0; + } // Adjust the buffer. if (buffsize_ >= maxbuffsize_) { @@ -207,9 +218,15 @@ bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_, pipe_t *pipe_) { if (!size_) { - pipes_t::size_type erased = pipes.erase (pipe_); - zmq_assert (erased == 1); - return pipes.empty (); + if (pipes) { + pipes_t::size_type erased = pipes->erase (pipe_); + zmq_assert (erased == 1); + if (pipes->empty ()) { + delete pipes; + pipes = 0; + } + } + return !pipes; } unsigned char c = *prefix_; @@ -245,9 +262,11 @@ void zmq::mtrie_t::match (unsigned char *data_, size_t size_, while (true) { // Signal the pipes attached to this node. - for (pipes_t::iterator it = current->pipes.begin (); - it != current->pipes.end (); ++it) - func_ (*it, arg_); + if (current->pipes) { + for (pipes_t::iterator it = current->pipes->begin (); + it != current->pipes->end (); ++it) + func_ (*it, arg_); + } // If we are at the end of the message, there's nothing more to match. if (!size_) @@ -281,5 +300,5 @@ void zmq::mtrie_t::match (unsigned char *data_, size_t size_, bool zmq::mtrie_t::is_redundant () const { - return pipes.empty () && live_nodes == 0; + return !pipes && live_nodes == 0; } diff --git a/src/mtrie.hpp b/src/mtrie.hpp index 549a0884..74b6f680 100644 --- a/src/mtrie.hpp +++ b/src/mtrie.hpp @@ -73,7 +73,7 @@ namespace zmq bool is_redundant () const; typedef std::set pipes_t; - pipes_t pipes; + pipes_t *pipes; unsigned char min; unsigned short count; From 0319cb2cd16aa40911855a1765312886bf081db2 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Thu, 2 Feb 2012 13:07:48 +0100 Subject: [PATCH 06/14] Fix data loss for PUB/SUB and unidirectional transports (LIBZMQ-268) With the introduction of subscription forwarding, the first message sent on a PUB socket using a unidirectional transport (e.g. PGM) is always lost due to the "subscribe to all" being done asynchronously. This patch fixes the problem and also refactors the code to have a single point where the "subscribe to all" is performed. Signed-off-by: Martin Lucina --- src/pair.cpp | 2 +- src/pair.hpp | 2 +- src/pgm_sender.cpp | 10 ---------- src/pull.cpp | 2 +- src/pull.hpp | 2 +- src/push.cpp | 2 +- src/push.hpp | 2 +- src/socket_base.cpp | 12 +++++++++--- src/socket_base.hpp | 5 +++-- src/xpub.cpp | 7 ++++++- src/xpub.hpp | 2 +- src/xrep.cpp | 2 +- src/xrep.hpp | 2 +- src/xreq.cpp | 2 +- src/xreq.hpp | 2 +- src/xsub.cpp | 2 +- src/xsub.hpp | 2 +- 17 files changed, 31 insertions(+), 29 deletions(-) diff --git a/src/pair.cpp b/src/pair.cpp index 6c652db0..82fd39b5 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -36,7 +36,7 @@ zmq::pair_t::~pair_t () zmq_assert (!pipe); } -void zmq::pair_t::xattach_pipe (pipe_t *pipe_) +void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (!pipe); pipe = pipe_; diff --git a/src/pair.hpp b/src/pair.hpp index fb447f1c..5deea184 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -42,7 +42,7 @@ namespace zmq ~pair_t (); // Overloads of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_); + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); int xsend (zmq::msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 1a1ae9a9..1d2083d3 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -91,16 +91,6 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) // Set POLLOUT for downlink_socket_handle. set_pollout (handle); - - // PGM is not able to pass subscriptions upstream, thus we have no idea - // what messages are peers interested in. Because of that we have to - // subscribe for all the messages. - msg_t msg; - msg.init_size (1); - *(unsigned char*) msg.data () = 1; - int rc = session_->write (&msg); - errno_assert (rc == 0); - session_->flush (); } void zmq::pgm_sender_t::unplug () diff --git a/src/pull.cpp b/src/pull.cpp index 6028118f..ee840d91 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -34,7 +34,7 @@ zmq::pull_t::~pull_t () { } -void zmq::pull_t::xattach_pipe (pipe_t *pipe_) +void zmq::pull_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); fq.attach (pipe_); diff --git a/src/pull.hpp b/src/pull.hpp index da3bee16..11c16b86 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -45,7 +45,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_); + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); int xrecv (zmq::msg_t *msg_, int flags_); bool xhas_in (); void xread_activated (zmq::pipe_t *pipe_); diff --git a/src/push.cpp b/src/push.cpp index a0ed992b..2a29d353 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -34,7 +34,7 @@ zmq::push_t::~push_t () { } -void zmq::push_t::xattach_pipe (pipe_t *pipe_) +void zmq::push_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); lb.attach (pipe_); diff --git a/src/push.hpp b/src/push.hpp index edee19db..18eaf095 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -45,7 +45,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_); + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); int xsend (zmq::msg_t *msg_, int flags_); bool xhas_out (); void xwrite_activated (zmq::pipe_t *pipe_); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 72934193..5a7f1453 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -208,14 +208,14 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return 0; } -void zmq::socket_base_t::attach_pipe (pipe_t *pipe_) +void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_) { // First, register the pipe so that we can terminate it later on. pipe_->set_event_sink (this); pipes.push_back (pipe_); // Let the derived socket type know about new pipe. - xattach_pipe (pipe_); + xattach_pipe (pipe_, icanhasall_); // If the socket is already being closed, ask any new pipes to terminate // straight away. @@ -454,8 +454,14 @@ int zmq::socket_base_t::connect (const char *addr_) rc = pipepair (parents, pipes, hwms, delays); errno_assert (rc == 0); + // PGM does not support subscription forwarding; ask for all data to be + // sent to this pipe. + bool icanhasall = false; + if (protocol == "pgm" || protocol == "epgm") + icanhasall = true; + // Attach local end of the pipe to the socket object. - attach_pipe (pipes [0]); + attach_pipe (pipes [0], icanhasall); // Attach remote end of the pipe to the session object later on. session->attach_pipe (pipes [1]); diff --git a/src/socket_base.hpp b/src/socket_base.hpp index dda87f4a..b74f2511 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -103,7 +103,8 @@ namespace zmq // Concrete algorithms for the x- methods are to be defined by // individual socket types. - virtual void xattach_pipe (zmq::pipe_t *pipe_) = 0; + virtual void xattach_pipe (zmq::pipe_t *pipe_, + bool icanhasall_ = false) = 0; // The default implementation assumes there are no specific socket // options for the particular socket type. If not so, overload this @@ -158,7 +159,7 @@ namespace zmq int check_protocol (const std::string &protocol_); // Register the pipe with this socket. - void attach_pipe (zmq::pipe_t *pipe_); + void attach_pipe (zmq::pipe_t *pipe_, bool icanhasall_ = false); // Processes commands sent to this socket (if any). If timeout is -1, // returns only after at least one command was processed. diff --git a/src/xpub.cpp b/src/xpub.cpp index 5d7a97c4..4754b28d 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -37,11 +37,16 @@ zmq::xpub_t::~xpub_t () { } -void zmq::xpub_t::xattach_pipe (pipe_t *pipe_) +void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); dist.attach (pipe_); + // If icanhasall_ is specified, the caller would like to subscribe + // to all data on this pipe, implicitly. + if (icanhasall_) + subscriptions.add (NULL, 0, pipe_); + // The pipe is active when attached. Let's read the subscriptions from // it, if any. xread_activated (pipe_); diff --git a/src/xpub.hpp b/src/xpub.hpp index 3f80d4a3..3e6721b7 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -47,7 +47,7 @@ namespace zmq ~xpub_t (); // Implementations of virtual functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_); + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_ = false); int xsend (zmq::msg_t *msg_, int flags_); bool xhas_out (); int xrecv (zmq::msg_t *msg_, int flags_); diff --git a/src/xrep.cpp b/src/xrep.cpp index 520fa243..4d9e5937 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -55,7 +55,7 @@ zmq::xrep_t::~xrep_t () prefetched_msg.close (); } -void zmq::xrep_t::xattach_pipe (pipe_t *pipe_) +void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); diff --git a/src/xrep.hpp b/src/xrep.hpp index 65bd5641..9fb19b7c 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -48,7 +48,7 @@ namespace zmq ~xrep_t (); // Overloads of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_); + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); int xsend (msg_t *msg_, int flags_); int xrecv (msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/xreq.cpp b/src/xreq.cpp index bf288eee..13809d9f 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -46,7 +46,7 @@ zmq::xreq_t::~xreq_t () prefetched_msg.close (); } -void zmq::xreq_t::xattach_pipe (pipe_t *pipe_) +void zmq::xreq_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); fq.attach (pipe_); diff --git a/src/xreq.hpp b/src/xreq.hpp index 897c197c..4a213446 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -46,7 +46,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_); + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); int xsend (zmq::msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/xsub.cpp b/src/xsub.cpp index 454dfb07..80ca1f2d 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -45,7 +45,7 @@ zmq::xsub_t::~xsub_t () errno_assert (rc == 0); } -void zmq::xsub_t::xattach_pipe (pipe_t *pipe_) +void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); fq.attach (pipe_); diff --git a/src/xsub.hpp b/src/xsub.hpp index 24f81572..fd199367 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -45,7 +45,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_); + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); int xsend (zmq::msg_t *msg_, int flags_); bool xhas_out (); int xrecv (zmq::msg_t *msg_, int flags_); From 79f753bf56222e65ca497f5253434f11489c51da Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Mon, 19 Dec 2011 15:45:44 +0100 Subject: [PATCH 07/14] xpub: free received subscription messages --- src/xpub.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/xpub.cpp b/src/xpub.cpp index 8155e312..1be32d07 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -56,14 +56,11 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) { // There are some subscriptions waiting. Let's process them. msg_t sub; - sub.init (); while (true) { // Grab next subscription. - if (!pipe_->read (&sub)) { - sub.close (); + if (!pipe_->read (&sub)) return; - } // Apply the subscription to the trie. unsigned char *data = (unsigned char*) sub.data (); @@ -81,6 +78,8 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) pending.push_back (blob_t ((unsigned char*) sub.data (), sub.size ())); } + + sub.close() } } From 43b71ae4bf5b4b1a046c6549483f59ef03883d5a Mon Sep 17 00:00:00 2001 From: Ian Barber Date: Fri, 3 Feb 2012 12:44:19 +0000 Subject: [PATCH 08/14] Fixing missing semicolon in xpub.cpp as reported on the list by Emmanuel TAUREL --- src/xpub.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/xpub.cpp b/src/xpub.cpp index 1be32d07..717e26c4 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -79,7 +79,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) sub.size ())); } - sub.close() + sub.close(); } } From 4dd6ce0639da832927ab68cfb7226c21808f0034 Mon Sep 17 00:00:00 2001 From: skaller Date: Sat, 4 Feb 2012 00:10:01 +1100 Subject: [PATCH 09/14] Add mission ; character --- src/xpub.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/xpub.cpp b/src/xpub.cpp index 1be32d07..717e26c4 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -79,7 +79,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) sub.size ())); } - sub.close() + sub.close(); } } From 988efbc73a2f4f0d8f8b380da61b73b5affaeccd Mon Sep 17 00:00:00 2001 From: skaller Date: Sat, 4 Feb 2012 01:41:09 +1100 Subject: [PATCH 10/14] Thread Safe Sockets. 1. Reorganise C API socket functions to eliminate bad practice of public functions calling other public functions. This should be done for msg's too but hasn't been in this patch. 2. Reorganise code in C API socket functions so that the socket is cast on one line, the C++ function called on the next with the result retained, then the result is returned. This makes the code much simpler to read and also allows pre- and post- call hooks to be inserted easily. 3. Insert pre- and post- call hooks which set and release a mutex iff the thread_safe flag is on. 4. Add the thread_safe_flag to base_socket_t initialised to false to preserve existing semantics. Add an accessor for the flag, add a mutex, and add lock and unlock functions. Note: as yet no code to actually set the flag. --- src/socket_base.cpp | 13 ++++- src/socket_base.hpp | 6 ++- src/zmq.cpp | 112 +++++++++++++++++++++++++++++++------------- 3 files changed, 96 insertions(+), 35 deletions(-) diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 5a7f1453..8167786d 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -121,7 +121,8 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) : destroyed (false), last_tsc (0), ticks (0), - rcvmore (false) + rcvmore (false), + thread_safe_flag (false) { } @@ -873,3 +874,13 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_) rcvmore = msg_->flags () & msg_t::more ? true : false; } +void zmq::socket_base_t::lock() +{ + sync.lock(); +} + +void zmq::socket_base_t::unlock() +{ + sync.unlock(); +} + diff --git a/src/socket_base.hpp b/src/socket_base.hpp index b74f2511..3df50802 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -95,7 +95,9 @@ namespace zmq void write_activated (pipe_t *pipe_); void hiccuped (pipe_t *pipe_); void terminated (pipe_t *pipe_); - + bool thread_safe() const { return thread_safe_flag; } + void lock(); + void unlock(); protected: socket_base_t (zmq::ctx_t *parent_, uint32_t tid_); @@ -195,6 +197,8 @@ namespace zmq socket_base_t (const socket_base_t&); const socket_base_t &operator = (const socket_base_t&); + bool thread_safe_flag; + mutex_t sync; }; } diff --git a/src/zmq.cpp b/src/zmq.cpp index 84dcdd1f..9aca8f9d 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -194,8 +194,11 @@ int zmq_setsockopt (void *s_, int option_, const void *optval_, errno = ENOTSOCK; return -1; } - return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_, - optvallen_)); + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + if(s->thread_safe()) s->lock(); + int result = s->setsockopt (option_, optval_, optvallen_); + if(s->thread_safe()) s->unlock(); + return result; } int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_) @@ -204,8 +207,11 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_) errno = ENOTSOCK; return -1; } - return (((zmq::socket_base_t*) s_)->getsockopt (option_, optval_, - optvallen_)); + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + if(s->thread_safe()) s->lock(); + int result = s->getsockopt (option_, optval_, optvallen_); + if(s->thread_safe()) s->unlock(); + return result; } int zmq_bind (void *s_, const char *addr_) @@ -214,7 +220,11 @@ int zmq_bind (void *s_, const char *addr_) errno = ENOTSOCK; return -1; } - return (((zmq::socket_base_t*) s_)->bind (addr_)); + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + if(s->thread_safe()) s->lock(); + int result = s->bind (addr_); + if(s->thread_safe()) s->unlock(); + return result; } int zmq_connect (void *s_, const char *addr_) @@ -223,7 +233,34 @@ int zmq_connect (void *s_, const char *addr_) errno = ENOTSOCK; return -1; } - return (((zmq::socket_base_t*) s_)->connect (addr_)); + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + if(s->thread_safe()) s->lock(); + int result = s->connect (addr_); + if(s->thread_safe()) s->unlock(); + return result; +} + +// sending functions +static int inner_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) +{ + int sz = (int) zmq_msg_size (msg_); + int rc = s_->send ((zmq::msg_t*) msg_, flags_); + if (unlikely (rc < 0)) + return -1; + return sz; +} + +int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_) +{ + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; + return -1; + } + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + if(s->thread_safe()) s->lock(); + int result = inner_sendmsg (s, msg_, flags_); + if(s->thread_safe()) s->unlock(); + return result; } int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) @@ -234,7 +271,10 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) return -1; memcpy (zmq_msg_data (&msg), buf_, len_); - rc = zmq_sendmsg (s_, &msg, flags_); + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + if(s->thread_safe()) s->lock(); + rc = inner_sendmsg (s, &msg, flags_); + if(s->thread_safe()) s->unlock(); if (unlikely (rc < 0)) { int err = errno; int rc2 = zmq_msg_close (&msg); @@ -248,13 +288,43 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) return rc; } +// receiving functions +static int inner_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) +{ + int rc = s_->recv ((zmq::msg_t*) msg_, flags_); + if (unlikely (rc < 0)) + return -1; + return (int) zmq_msg_size (msg_); +} + +int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_) +{ + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; + return -1; + } + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + if(s->thread_safe()) s->lock(); + int result = inner_recvmsg(s, msg_, flags_); + if(s->thread_safe()) s->unlock(); + return result; +} + + int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) { + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; + return -1; + } zmq_msg_t msg; int rc = zmq_msg_init (&msg); errno_assert (rc == 0); - int nbytes = zmq_recvmsg (s_, &msg, flags_); + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + if(s->thread_safe()) s->lock(); + int nbytes = inner_recvmsg (s, &msg, flags_); + if(s->thread_safe()) s->unlock(); if (unlikely (nbytes < 0)) { int err = errno; rc = zmq_msg_close (&msg); @@ -274,31 +344,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) return nbytes; } -int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_) -{ - if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { - errno = ENOTSOCK; - return -1; - } - int sz = (int) zmq_msg_size (msg_); - int rc = (((zmq::socket_base_t*) s_)->send ((zmq::msg_t*) msg_, flags_)); - if (unlikely (rc < 0)) - return -1; - return sz; -} - -int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_) -{ - if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { - errno = ENOTSOCK; - return -1; - } - int rc = (((zmq::socket_base_t*) s_)->recv ((zmq::msg_t*) msg_, flags_)); - if (unlikely (rc < 0)) - return -1; - return (int) zmq_msg_size (msg_); -} - +// message manipulators int zmq_msg_init (zmq_msg_t *msg_) { return ((zmq::msg_t*) msg_)->init (); From 520ad3c2d7966ad868354c0ad66b76a3e408f568 Mon Sep 17 00:00:00 2001 From: skaller Date: Sat, 4 Feb 2012 02:17:35 +1100 Subject: [PATCH 11/14] Set and arrange propagation of thread safe sockets flag. We use a distinct context initialisation function to specify all sockets derived therefrom will be thread safe. However the inheritance is done exclusively in the C interface. This is not really correct, but it is chosen to minimise interference with the existing C++ code, including any construct or other calls within the C++ code base. Semantically the C++ code should be unchanged, physically some data structures and extra methods are provided by they're only used from the C binding. --- include/zmq.h | 1 + src/ctx.cpp | 10 ++++++++++ src/ctx.hpp | 6 ++++++ src/socket_base.cpp | 5 +++++ src/socket_base.hpp | 1 + src/zmq.cpp | 21 ++++++++++++++++++--- 6 files changed, 41 insertions(+), 3 deletions(-) diff --git a/include/zmq.h b/include/zmq.h index 5dc11461..8329b37e 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -147,6 +147,7 @@ ZMQ_EXPORT int zmq_getmsgopt (zmq_msg_t *msg, int option, void *optval, /******************************************************************************/ ZMQ_EXPORT void *zmq_init (int io_threads); +ZMQ_EXPORT void *zmq_init_thread_safe (int io_threads); ZMQ_EXPORT int zmq_term (void *context); /******************************************************************************/ diff --git a/src/ctx.cpp b/src/ctx.cpp index d8783be8..44fd0465 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -81,6 +81,16 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : zmq_assert (rc == 0); } +void zmq::ctx_t::set_thread_safe() +{ + thread_safe_flag = true; +} + +bool zmq::ctx_t::get_thread_safe() const +{ + return thread_safe_flag; +} + bool zmq::ctx_t::check_tag () { return tag == 0xbadcafe0; diff --git a/src/ctx.hpp b/src/ctx.hpp index d259f594..40b4acff 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -99,6 +99,10 @@ namespace zmq reaper_tid = 1 }; + // create thread safe sockets + void set_thread_safe(); + bool get_thread_safe() const; + ~ctx_t (); private: @@ -151,6 +155,8 @@ namespace zmq zmq::socket_base_t *log_socket; mutex_t log_sync; + bool thread_safe_flag; + ctx_t (const ctx_t&); const ctx_t &operator = (const ctx_t&); }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 8167786d..7cd24c66 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -874,6 +874,11 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_) rcvmore = msg_->flags () & msg_t::more ? true : false; } +void zmq::socket_base_t::set_thread_safe() +{ + thread_safe_flag = true; +} + void zmq::socket_base_t::lock() { sync.lock(); diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 3df50802..f22e1ef0 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -96,6 +96,7 @@ namespace zmq void hiccuped (pipe_t *pipe_); void terminated (pipe_t *pipe_); bool thread_safe() const { return thread_safe_flag; } + void set_thread_safe(); // should be in constructor, here for compat void lock(); void unlock(); protected: diff --git a/src/zmq.cpp b/src/zmq.cpp index 9aca8f9d..777cbc3f 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -90,7 +90,7 @@ const char *zmq_strerror (int errnum_) return zmq::errno_to_string (errnum_); } -void *zmq_init (int io_threads_) +static zmq::ctx_t *inner_init (int io_threads_) { if (io_threads_ < 0) { errno = EINVAL; @@ -139,7 +139,19 @@ void *zmq_init (int io_threads_) // Create 0MQ context. zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_); alloc_assert (ctx); - return (void*) ctx; + return ctx; +} + +void *zmq_init (int io_threads_) +{ + return (void*) inner_init (io_threads_); +} + +void *zmq_init_thread_safe (int io_threads_) +{ + zmq::ctx_t *ctx = inner_init (io_threads_); + ctx->set_thread_safe(); + return (void*) ctx; } int zmq_term (void *ctx_) @@ -174,7 +186,10 @@ void *zmq_socket (void *ctx_, int type_) errno = EFAULT; return NULL; } - return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_)); + zmq::ctx_t *ctx = (zmq::ctx_t*) ctx_; + zmq::socket_base_t *s = ctx->create_socket (type_); + if (ctx->get_thread_safe ()) s->set_thread_safe (); + return (void*) s; } int zmq_close (void *s_) From 67fd4c9a2c9494e14c12ebb53c1b70042931bc9c Mon Sep 17 00:00:00 2001 From: Daniel Norberg Date: Fri, 3 Feb 2012 17:28:45 +0100 Subject: [PATCH 12/14] add missing semicolon --- src/xpub.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/xpub.cpp b/src/xpub.cpp index 1be32d07..717e26c4 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -79,7 +79,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) sub.size ())); } - sub.close() + sub.close(); } } From 759b2e01fd03b171f63d37b908a4a530ab44420b Mon Sep 17 00:00:00 2001 From: skaller Date: Sat, 4 Feb 2012 12:34:06 +1100 Subject: [PATCH 13/14] Fix comments to conform to style guide. --- src/zmq.cpp | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/src/zmq.cpp b/src/zmq.cpp index 777cbc3f..6e6b8d83 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -78,6 +78,8 @@ typedef char check_msg_t_size [sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1]; +// Version. + void zmq_version (int *major_, int *minor_, int *patch_) { *major_ = ZMQ_VERSION_MAJOR; @@ -85,11 +87,20 @@ void zmq_version (int *major_, int *minor_, int *patch_) *patch_ = ZMQ_VERSION_PATCH; } +// Errors. + const char *zmq_strerror (int errnum_) { return zmq::errno_to_string (errnum_); } +int zmq_errno () +{ + return errno; +} + +// Contexts. + static zmq::ctx_t *inner_init (int io_threads_) { if (io_threads_ < 0) { @@ -180,6 +191,8 @@ int zmq_term (void *ctx_) return rc; } +// Sockets. + void *zmq_socket (void *ctx_, int type_) { if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { @@ -255,7 +268,8 @@ int zmq_connect (void *s_, const char *addr_) return result; } -// sending functions +// Sending functions. + static int inner_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) { int sz = (int) zmq_msg_size (msg_); @@ -303,7 +317,8 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) return rc; } -// receiving functions +// Receiving functions. + static int inner_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) { int rc = s_->recv ((zmq::msg_t*) msg_, flags_); @@ -359,7 +374,8 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) return nbytes; } -// message manipulators +// Message manipulators. + int zmq_msg_init (zmq_msg_t *msg_) { return ((zmq::msg_t*) msg_)->init (); @@ -420,6 +436,8 @@ int zmq_getmsgopt (zmq_msg_t *msg_, int option_, void *optval_, } } +// Polling. + int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) { #if defined ZMQ_POLL_BASED_ON_POLL @@ -772,11 +790,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) #endif } -int zmq_errno () -{ - return errno; -} - #if defined ZMQ_POLL_BASED_ON_SELECT #undef ZMQ_POLL_BASED_ON_SELECT #endif From 81662d70be2014b947f1c63308d819c3f85f6a2a Mon Sep 17 00:00:00 2001 From: skaller Date: Sat, 4 Feb 2012 15:13:36 +1100 Subject: [PATCH 14/14] Add a test for thread safe sockets. --- tests/Makefile.am | 2 + tests/test_ts_context.cpp | 129 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 tests/test_ts_context.cpp diff --git a/tests/Makefile.am b/tests/Makefile.am index fb0c6f8b..bcc3c5e1 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -17,6 +17,7 @@ if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ test_pair_ipc \ test_reqrep_ipc \ + test_ts_context \ test_timeo endif @@ -35,6 +36,7 @@ test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp test_reqrep_ipc_SOURCES = test_reqrep_ipc.cpp testutil.hpp test_timeo_SOURCES = test_timeo.cpp +test_ts_context_SOURCES = test_ts_context.cpp endif TESTS = $(noinst_PROGRAMS) diff --git a/tests/test_ts_context.cpp b/tests/test_ts_context.cpp new file mode 100644 index 00000000..331f96e7 --- /dev/null +++ b/tests/test_ts_context.cpp @@ -0,0 +1,129 @@ +/* + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2011 iMatix Corporation + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include +#include +#include + +#include "../include/zmq.h" +#include "../include/zmq_utils.h" + +#define THREAD_COUNT 30 +#define NMESSAGES 20 + +struct thread_data_t +{ + int thread_index; + void *socket; + pthread_t pthr; +}; + +extern "C" +{ + static void *source(void *client_data) + { + // Wait a bit util all threads created and subscriber ready. + zmq_sleep(2); // ms + + // Our thread number and socket. + thread_data_t *td = (thread_data_t *) client_data; + + // Buffer for messages. + char buffer[20]; + memset (buffer, 0, 20); + + // Send messages. + for (int i = 0; i < NMESSAGES; ++i) + { + sprintf(buffer,"Th %02d count %02d", td->thread_index, i); + int rc = zmq_send(td->socket,buffer,20,0); + assert (rc == 20); + zmq_sleep(1); // Don't overload the socket. + } + + return 0; + } +} + + +int main (int argc, char *argv []) +{ + fprintf (stderr, "test_ts_context running...\n"); + + // Make a thread safe context. + void *ctx = zmq_init_thread_safe (1); + assert (ctx); + + // Create a publisher. + void *pub = zmq_socket (ctx, ZMQ_PUB); + assert (pub); + int rc = zmq_bind (pub, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + // Create a subscriber. + void *sub = zmq_socket (ctx, ZMQ_SUB); + assert (sub); + rc = zmq_connect (sub, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + // Subscribe for all messages. + rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0); + assert (rc == 0); + + thread_data_t threads [THREAD_COUNT]; + + // Create workers. + for(int i = 0; i < THREAD_COUNT; ++i) + { + threads [i].thread_index = i; + threads [i].socket = pub; + rc = pthread_create(&threads[i].pthr, NULL, source, threads+i); + assert (rc == 0); + } + + // Gather all the Messages. + char buff [20]; + for(int i= 1; i<=THREAD_COUNT * NMESSAGES; ++i) + { + rc = zmq_recv (sub, buff, 20, 0); + //fprintf (stderr, "%d/%d: %s\n",i,THREAD_COUNT * NMESSAGES, buff); // debug it + assert (rc >= 0); + } + + // Wait for worker death. + for(int i = 0; i < THREAD_COUNT; ++i) + { + rc = pthread_join(threads[i].pthr, NULL); + assert (rc == 0); + } + + + // Clean up. + rc = zmq_close (pub); + assert (rc == 0); + rc = zmq_close (sub); + assert (rc == 0); + rc = zmq_term (ctx); + assert (rc == 0); + + return 0 ; +}