From 6749c9b3eba1ad2d35942b37390c2947b55c4ac0 Mon Sep 17 00:00:00 2001 From: somdoron Date: Wed, 11 Feb 2015 00:01:50 +0200 Subject: [PATCH 1/3] thread safety --- Makefile.am | 12 +- src/client.cpp | 2 +- src/condition_variable.hpp | 149 ++++++++++++++++++++++++ src/ctx.cpp | 2 +- src/ctx.hpp | 2 +- src/i_mailbox.hpp | 50 ++++++++ src/mailbox.hpp | 3 +- src/mailbox_safe.cpp | 111 ++++++++++++++++++ src/mailbox_safe.hpp | 92 +++++++++++++++ src/mutex.hpp | 10 ++ src/server.cpp | 2 +- src/socket_base.cpp | 228 +++++++++++++++++++++++++++++++------ src/socket_base.hpp | 20 +++- tests/test_thread_safe.cpp | 145 +++++++++++++++++++++++ 14 files changed, 782 insertions(+), 46 deletions(-) create mode 100644 src/condition_variable.hpp create mode 100644 src/i_mailbox.hpp create mode 100644 src/mailbox_safe.cpp create mode 100644 src/mailbox_safe.hpp create mode 100644 tests/test_thread_safe.cpp diff --git a/Makefile.am b/Makefile.am index a0bc0185..28eef125 100644 --- a/Makefile.am +++ b/Makefile.am @@ -31,6 +31,7 @@ src_libzmq_la_SOURCES = \ src/clock.cpp \ src/clock.hpp \ src/command.hpp \ + src/condition_variable.hpp \ src/config.hpp \ src/ctx.cpp \ src/ctx.hpp \ @@ -63,6 +64,7 @@ src_libzmq_la_SOURCES = \ src/i_encoder.hpp \ src/i_engine.hpp \ src/i_decoder.hpp \ + src/i_mailbox.hpp \ src/i_poll_events.hpp \ src/io_object.cpp \ src/io_object.hpp \ @@ -83,6 +85,8 @@ src_libzmq_la_SOURCES = \ src/likely.hpp \ src/mailbox.cpp \ src/mailbox.hpp \ + src/mailbox_safe.cpp \ + src/mailbox_safe.hpp \ src/mechanism.cpp \ src/mechanism.hpp \ src/metadata.cpp \ @@ -349,7 +353,8 @@ test_apps = \ tests/test_atomics \ tests/test_client_server \ tests/test_server_drop_more \ - tests/test_client_drop_more + tests/test_client_drop_more \ + tests/test_thread_safe tests_test_system_SOURCES = tests/test_system.cpp tests_test_system_LDADD = src/libzmq.la @@ -530,6 +535,11 @@ tests_test_server_drop_more_LDADD = src/libzmq.la tests_test_client_drop_more_SOURCES = tests/test_client_drop_more.cpp tests_test_client_drop_more_LDADD = src/libzmq.la +tests_test_thread_safe_SOURCES = tests/test_thread_safe.cpp +tests_test_thread_safe_LDADD = src/libzmq.la + + + if !ON_MINGW if !ON_CYGWIN test_apps += \ diff --git a/src/client.cpp b/src/client.cpp index 73357c1e..f30b43c9 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -22,7 +22,7 @@ #include "msg.hpp" zmq::client_t::client_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - socket_base_t (parent_, tid_, sid_) + socket_base_t (parent_, tid_, sid_, true) { options.type = ZMQ_CLIENT; } diff --git a/src/condition_variable.hpp b/src/condition_variable.hpp new file mode 100644 index 00000000..265bc9cc --- /dev/null +++ b/src/condition_variable.hpp @@ -0,0 +1,149 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__ +#define __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__ + +#include "platform.hpp" +#include "err.hpp" +#include "mutex.hpp" + +// Condition variable class encapsulates OS mutex in a platform-independent way. + +#ifdef ZMQ_HAVE_WINDOWS + +#include "windows.hpp" + +namespace zmq +{ + + class condition_variable_t + { + public: + inline condition_variable_t () + { + InitializeConditionVariable (&cv); + } + + inline ~condition_variable_t () + { + + } + + inline int wait (mutex_t* mutex_, int timeout_ ) + { + int rc = SleepConditionVariableCS(&cv, mutex_->get_cs (), timeout_); + + if (rc != 0) + return 0; + + rc = GetLastError(); + + if (rc != ERROR_TIMEOUT) + win_assert(rc); + + errno = EAGAIN; + return -1; + } + + inline void broadcast () + { + WakeAllConditionVariable(&cv); + } + + private: + + CONDITION_VARIABLE cv; + + // Disable copy construction and assignment. + condition_variable_t (const condition_variable_t&); + void operator = (const condition_variable_t&); + }; + +} + +#else + +#include + +namespace zmq +{ + + class condition_variable_t + { + public: + inline condition_variable_t () + { + int rc = pthread_cond_init (&cond, NULL); + posix_assert (rc); + } + + inline ~condition_variable_t () + { + int rc = pthread_cond_destroy (&cond); + posix_assert (rc); + } + + inline int wait (mutex_t* mutex_, int timeout_) + { + int rc; + + if (timeout_ != -1) { + struct timespec timeout; + clock_gettime(CLOCK_REALTIME, &timeout); + + timeout.tv_sec += timeout_ / 1000; + timeout.tv_nsec += (timeout_ % 1000) * 1000000; + rc = pthread_cond_timedwait (&cond, mutex_->get_mutex (), &timeout); + } + else + rc = pthread_cond_wait(&cond, mutex_->get_mutex()); + + if (rc == 0) + return 0; + + if (rc == ETIMEDOUT){ + errno= EAGAIN; + return -1; + } + + posix_assert (rc); + return -1; + } + + inline void broadcast () + { + int rc = pthread_cond_broadcast (&cond); + posix_assert (rc); + } + + private: + + pthread_cond_t cond; + + // Disable copy construction and assignment. + condition_variable_t (const condition_variable_t&); + const condition_variable_t &operator = (const condition_variable_t&); + }; +} + +#endif + + +#endif diff --git a/src/ctx.cpp b/src/ctx.cpp index 0867ca61..6f5c7b67 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -273,7 +273,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) int ios = io_thread_count; opt_sync.unlock (); slot_count = mazmq + ios + 2; - slots = (mailbox_t **) malloc (sizeof (mailbox_t*) * slot_count); + slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count); alloc_assert (slots); // Initialise the infrastructure for zmq_ctx_term thread. diff --git a/src/ctx.hpp b/src/ctx.hpp index 24e5fd1e..5b78107b 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -162,7 +162,7 @@ namespace zmq // Array of pointers to mailboxes for both application and I/O threads. uint32_t slot_count; - mailbox_t **slots; + i_mailbox **slots; // Mailbox for zmq_term thread. mailbox_t term_mailbox; diff --git a/src/i_mailbox.hpp b/src/i_mailbox.hpp new file mode 100644 index 00000000..8f472d38 --- /dev/null +++ b/src/i_mailbox.hpp @@ -0,0 +1,50 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_I_MAILBOX_HPP_INCLUDED__ +#define __ZMQ_I_MAILBOX_HPP_INCLUDED__ + +#include "stdint.hpp" + +namespace zmq +{ + // Interface to be implemented by mailbox. + + class i_mailbox + { + public: + virtual ~i_mailbox () {} + + virtual void send (const command_t &cmd_) = 0; + virtual int recv (command_t *cmd_, int timeout_) = 0; + + +#ifdef HAVE_FORK + // close the file descriptors in the signaller. This is used in a forked + // child process to close the file descriptors so that they do not interfere + // with the context in the parent process. + virtual void forked () = 0; +#endif + + + }; + +} + +#endif diff --git a/src/mailbox.hpp b/src/mailbox.hpp index 45c261fd..b9a7cfd0 100644 --- a/src/mailbox.hpp +++ b/src/mailbox.hpp @@ -29,11 +29,12 @@ #include "command.hpp" #include "ypipe.hpp" #include "mutex.hpp" +#include "i_mailbox.hpp" namespace zmq { - class mailbox_t + class mailbox_t : public i_mailbox { public: diff --git a/src/mailbox_safe.cpp b/src/mailbox_safe.cpp new file mode 100644 index 00000000..9bc5a5a7 --- /dev/null +++ b/src/mailbox_safe.cpp @@ -0,0 +1,111 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "mailbox_safe.hpp" +#include "err.hpp" + +zmq::mailbox_safe_t::mailbox_safe_t (mutex_t* socket_mutex_) : + socket_mutex (socket_mutex_) +{ + // Get the pipe into passive state. That way, if the users starts by + // polling on the associated file descriptor it will get woken up when + // new command is posted. + const bool ok = cpipe.read (NULL); + zmq_assert (!ok); +} + +zmq::mailbox_safe_t::~mailbox_safe_t () +{ + // TODO: Retrieve and deallocate commands inside the cpipe. + + // Work around problem that other threads might still be in our + // send() method, by waiting on the mutex before disappearing. + sync.lock (); + sync.unlock (); +} + +void zmq::mailbox_safe_t::add_signaler(signaler_t* signaler) +{ + sync.lock(); + signalers.push_back(signaler); + sync.unlock(); +} + +void zmq::mailbox_safe_t::remove_signaler(signaler_t* signaler) +{ + sync.lock(); + std::vector::iterator it = signalers.begin(); + + // TODO: make a copy of array and signal outside the lock + for (; it != signalers.end(); ++it){ + if (*it == signaler) + break; + } + + if (it != signalers.end()) + signalers.erase(it); + sync.unlock(); +} + +void zmq::mailbox_safe_t::send (const command_t &cmd_) +{ + sync.lock (); + cpipe.write (cmd_, false); + const bool ok = cpipe.flush (); + + if (!ok) { + for (std::vector::iterator it = signalers.begin(); it != signalers.end(); ++it){ + (*it)->send(); + } + } + + sync.unlock (); + + if (!ok) + cond_var.broadcast (); +} + +int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_) +{ + // Try to get the command straight away. + if (cpipe.read (cmd_)) + return 0; + + if (timeout_ == 0) { + errno = EAGAIN; + return -1; + } + + // Wait for signal from the command sender. + int rc = cond_var.wait (socket_mutex, timeout_); + if (rc == -1) { + errno_assert (errno == EAGAIN || errno == EINTR); + return -1; + } + + // Another thread may already fetch the command + const bool ok = cpipe.read (cmd_); + + if (!ok) { + errno = EAGAIN; + return -1; + } + + return 0; +} diff --git a/src/mailbox_safe.hpp b/src/mailbox_safe.hpp new file mode 100644 index 00000000..f9b4f20d --- /dev/null +++ b/src/mailbox_safe.hpp @@ -0,0 +1,92 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + void send (const command_t &cmd_); + int recv (command_t *cmd_, int timeout_); + + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_MAILBOX_SAFE_HPP_INCLUDED__ +#define __ZMQ_MAILBOX_SAFE_HPP_INCLUDED__ + +#include +#include + +#include "platform.hpp" +#include "signaler.hpp" +#include "fd.hpp" +#include "config.hpp" +#include "command.hpp" +#include "ypipe.hpp" +#include "mutex.hpp" +#include "i_mailbox.hpp" +#include "condition_variable.hpp" + +namespace zmq +{ + + class mailbox_safe_t : public i_mailbox + { + public: + + mailbox_safe_t (mutex_t* socket_mutex_); + ~mailbox_safe_t (); + + void send (const command_t &cmd_); + int recv (command_t *cmd_, int timeout_); + + // Add signaler to mailbox which will be called when a message is ready + void add_signaler(signaler_t* signaler); + void remove_signaler(signaler_t* signaler); + +#ifdef HAVE_FORK + // close the file descriptors in the signaller. This is used in a forked + // child process to close the file descriptors so that they do not interfere + // with the context in the parent process. + void forked () + { + // TODO: call fork on the condition variable + } +#endif + + private: + + // The pipe to store actual commands. + typedef ypipe_t cpipe_t; + cpipe_t cpipe; + + // Condition variable to pass signals from writer thread to reader thread. + condition_variable_t cond_var; + + // There's only one thread receiving from the mailbox, but there + // is arbitrary number of threads sending. Given that ypipe requires + // synchronised access on both of its endpoints, we have to synchronise + // the sending side. + mutex_t sync; + + mutex_t* socket_mutex; + + std::vector signalers; + + // Disable copying of mailbox_t object. + mailbox_safe_t (const mailbox_safe_t&); + const mailbox_safe_t &operator = (const mailbox_safe_t&); + }; + +} + +#endif diff --git a/src/mutex.hpp b/src/mutex.hpp index 2368d518..26c7a71f 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -60,6 +60,11 @@ namespace zmq LeaveCriticalSection (&cs); } + inline CRITICAL_SECTION* get_cs() + { + return &cs; + } + private: CRITICAL_SECTION cs; @@ -115,6 +120,11 @@ namespace zmq posix_assert (rc); } + inline pthread_mutex_t* get_mutex() + { + return &mutex; + } + private: pthread_mutex_t mutex; diff --git a/src/server.cpp b/src/server.cpp index e77cd0bd..7fb77098 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -25,7 +25,7 @@ #include "err.hpp" zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - socket_base_t (parent_, tid_, sid_), + socket_base_t (parent_, tid_, sid_, true), next_rid (generate_random ()) { options.type = ZMQ_SERVER; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 198277a6..84c63f78 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -54,6 +54,8 @@ #include "ipc_address.hpp" #include "tcp_address.hpp" #include "tipc_address.hpp" +#include "mailbox.hpp" +#include "mailbox_safe.hpp" #ifdef ZMQ_HAVE_OPENPGM #include "pgm_socket.hpp" #endif @@ -73,6 +75,14 @@ #include "server.hpp" #include "client.hpp" +#define ENTER_MUTEX() \ + if (thread_safe) \ + sync.lock(); + +#define EXIT_MUTEX() \ + if (thread_safe) \ + sync.unlock(); + bool zmq::socket_base_t::check_tag () { return tag == 0xbaddecaf; @@ -131,13 +141,16 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, } alloc_assert (s); - if (s->mailbox.get_fd () == retired_fd) + + mailbox_t *mailbox = dynamic_cast (s->mailbox); + + if (mailbox != NULL && mailbox->get_fd () == retired_fd) return NULL; return s; } -zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : +zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) : own_t (parent_, tid_), tag (0xbaddecaf), ctx_terminated (false), @@ -147,22 +160,30 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : rcvmore (false), file_desc(-1), monitor_socket (NULL), - monitor_events (0) + monitor_events (0), + thread_safe (thread_safe_) { options.socket_id = sid_; options.ipv6 = (parent_->get (ZMQ_IPV6) != 0); options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0; + + if (thread_safe) + mailbox = new mailbox_safe_t(&sync); + else + mailbox = new mailbox_t(); + } zmq::socket_base_t::~socket_base_t () { + delete mailbox; stop_monitor (); zmq_assert (destroyed); } -zmq::mailbox_t *zmq::socket_base_t::get_mailbox () +zmq::i_mailbox *zmq::socket_base_t::get_mailbox () { - return &mailbox; + return mailbox; } void zmq::socket_base_t::stop () @@ -275,57 +296,84 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_) int zmq::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { + ENTER_MUTEX(); + if (unlikely (ctx_terminated)) { errno = ETERM; + EXIT_MUTEX(); return -1; } // First, check whether specific socket type overloads the option. int rc = xsetsockopt (option_, optval_, optvallen_); - if (rc == 0 || errno != EINVAL) + if (rc == 0 || errno != EINVAL) { + EXIT_MUTEX(); return rc; + } // If the socket type doesn't support the option, pass it to // the generic option parser. - return options.setsockopt (option_, optval_, optvallen_); + rc = options.setsockopt (option_, optval_, optvallen_); + + EXIT_MUTEX(); + return rc; } int zmq::socket_base_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { + ENTER_MUTEX(); + if (unlikely (ctx_terminated)) { errno = ETERM; + EXIT_MUTEX(); return -1; } if (option_ == ZMQ_RCVMORE) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; + EXIT_MUTEX(); return -1; } *((int*) optval_) = rcvmore ? 1 : 0; *optvallen_ = sizeof (int); + EXIT_MUTEX(); return 0; } if (option_ == ZMQ_FD) { if (*optvallen_ < sizeof (fd_t)) { errno = EINVAL; + EXIT_MUTEX(); return -1; } - *((fd_t*) optval_) = mailbox.get_fd (); - *optvallen_ = sizeof (fd_t); + + if (thread_safe) { + // thread safe socket doesn't provide file descriptor + errno = EINVAL; + EXIT_MUTEX(); + return -1; + } + + *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd(); + *optvallen_ = sizeof(fd_t); + + EXIT_MUTEX(); return 0; } if (option_ == ZMQ_EVENTS) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; + EXIT_MUTEX(); return -1; } int rc = process_commands (0, false); - if (rc != 0 && (errno == EINTR || errno == ETERM)) + if (rc != 0 && (errno == EINTR || errno == ETERM)) { + EXIT_MUTEX(); return -1; + } errno_assert (rc == 0); *((int*) optval_) = 0; if (has_out ()) @@ -333,39 +381,51 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, if (has_in ()) *((int*) optval_) |= ZMQ_POLLIN; *optvallen_ = sizeof (int); + EXIT_MUTEX(); return 0; } if (option_ == ZMQ_LAST_ENDPOINT) { if (*optvallen_ < last_endpoint.size () + 1) { errno = EINVAL; + EXIT_MUTEX(); return -1; } strcpy (static_cast (optval_), last_endpoint.c_str ()); *optvallen_ = last_endpoint.size () + 1; + EXIT_MUTEX(); return 0; } - return options.getsockopt (option_, optval_, optvallen_); + int rc = options.getsockopt (option_, optval_, optvallen_); + EXIT_MUTEX(); + return rc; } int zmq::socket_base_t::bind (const char *addr_) { + ENTER_MUTEX(); + if (unlikely (ctx_terminated)) { errno = ETERM; + EXIT_MUTEX(); return -1; } // Process pending commands, if any. int rc = process_commands (0, false); - if (unlikely (rc != 0)) + if (unlikely (rc != 0)) { + EXIT_MUTEX(); return -1; + } // Parse addr_ string. std::string protocol; std::string address; - if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) + if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) { + EXIT_MUTEX(); return -1; + } if (protocol == "inproc") { const endpoint_t endpoint = { this, options }; @@ -374,12 +434,14 @@ int zmq::socket_base_t::bind (const char *addr_) connect_pending (addr_, this); last_endpoint.assign (addr_); } + EXIT_MUTEX(); return rc; } if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") { // For convenience's sake, bind can be used interchageable with // connect for PGM, EPGM and NORM transports. + EXIT_MUTEX(); return connect (addr_); } @@ -388,6 +450,7 @@ int zmq::socket_base_t::bind (const char *addr_) io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { errno = EMTHREAD; + EXIT_MUTEX(); return -1; } @@ -399,6 +462,7 @@ int zmq::socket_base_t::bind (const char *addr_) if (rc != 0) { delete listener; event_bind_failed (address, zmq_errno()); + EXIT_MUTEX(); return -1; } @@ -406,6 +470,7 @@ int zmq::socket_base_t::bind (const char *addr_) listener->get_address (last_endpoint); add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL); + EXIT_MUTEX(); return 0; } @@ -418,6 +483,7 @@ int zmq::socket_base_t::bind (const char *addr_) if (rc != 0) { delete listener; event_bind_failed (address, zmq_errno()); + EXIT_MUTEX(); return -1; } @@ -425,6 +491,7 @@ int zmq::socket_base_t::bind (const char *addr_) listener->get_address (last_endpoint); add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL); + EXIT_MUTEX(); return 0; } #endif @@ -437,6 +504,7 @@ int zmq::socket_base_t::bind (const char *addr_) if (rc != 0) { delete listener; event_bind_failed (address, zmq_errno()); + EXIT_MUTEX(); return -1; } @@ -444,31 +512,40 @@ int zmq::socket_base_t::bind (const char *addr_) listener->get_address (last_endpoint); add_endpoint (addr_, (own_t *) listener, NULL); + EXIT_MUTEX(); return 0; } #endif + EXIT_MUTEX(); zmq_assert (false); return -1; } int zmq::socket_base_t::connect (const char *addr_) { + ENTER_MUTEX(); + if (unlikely (ctx_terminated)) { errno = ETERM; + EXIT_MUTEX(); return -1; } // Process pending commands, if any. int rc = process_commands (0, false); - if (unlikely (rc != 0)) + if (unlikely (rc != 0)) { + EXIT_MUTEX(); return -1; + } // Parse addr_ string. std::string protocol; std::string address; - if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) + if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) { + EXIT_MUTEX(); return -1; + } if (protocol == "inproc") { @@ -566,6 +643,7 @@ int zmq::socket_base_t::connect (const char *addr_) // remember inproc connections for disconnect inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0])); + EXIT_MUTEX(); return 0; } bool is_single_connect = (options.type == ZMQ_DEALER || @@ -577,6 +655,7 @@ int zmq::socket_base_t::connect (const char *addr_) // There is no valid use for multiple connects for SUB-PUB nor // DEALER-ROUTER nor REQ-REP. Multiple connects produces // nonsensical results. + EXIT_MUTEX(); return 0; } } @@ -585,6 +664,7 @@ int zmq::socket_base_t::connect (const char *addr_) io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { errno = EMTHREAD; + EXIT_MUTEX(); return -1; } @@ -624,6 +704,7 @@ int zmq::socket_base_t::connect (const char *addr_) if (rc == -1) { errno = EINVAL; delete paddr; + EXIT_MUTEX(); return -1; } // Defer resolution until a socket is opened @@ -637,6 +718,7 @@ int zmq::socket_base_t::connect (const char *addr_) int rc = paddr->resolved.ipc_addr->resolve (address.c_str ()); if (rc != 0) { delete paddr; + EXIT_MUTEX(); return -1; } } @@ -652,6 +734,7 @@ int zmq::socket_base_t::connect (const char *addr_) if (res != NULL) pgm_freeaddrinfo (res); if (rc != 0 || port_number == 0) + EXIT_MUTEX(); return -1; } #endif @@ -663,6 +746,7 @@ int zmq::socket_base_t::connect (const char *addr_) int rc = paddr->resolved.tipc_addr->resolve (address.c_str()); if (rc != 0) { delete paddr; + EXIT_MUTEX(); return -1; } } @@ -708,6 +792,7 @@ int zmq::socket_base_t::connect (const char *addr_) paddr->to_string (last_endpoint); add_endpoint (addr_, (own_t *) session, newpipe); + EXIT_MUTEX(); return 0; } @@ -720,43 +805,55 @@ void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe int zmq::socket_base_t::term_endpoint (const char *addr_) { + ENTER_MUTEX(); + // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { errno = ETERM; + EXIT_MUTEX(); return -1; } // Check whether endpoint address passed to the function is valid. if (unlikely (!addr_)) { errno = EINVAL; + EXIT_MUTEX(); return -1; } // Process pending commands, if any, since there could be pending unprocessed process_own()'s // (from launch_child() for example) we're asked to terminate now. int rc = process_commands (0, false); - if (unlikely (rc != 0)) + if (unlikely(rc != 0)) { + EXIT_MUTEX(); return -1; + } // Parse addr_ string. std::string protocol; std::string address; - if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) + if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) { + EXIT_MUTEX(); return -1; + } // Disconnect an inproc socket if (protocol == "inproc") { - if (unregister_endpoint (std::string (addr_), this) == 0) + if (unregister_endpoint (std::string(addr_), this) == 0) { + EXIT_MUTEX(); return 0; + } std::pair range = inprocs.equal_range (std::string (addr_)); if (range.first == range.second) { errno = ENOENT; + EXIT_MUTEX(); return -1; } for (inprocs_t::iterator it = range.first; it != range.second; ++it) it->second->terminate (true); inprocs.erase (range.first, range.second); + EXIT_MUTEX(); return 0; } @@ -764,6 +861,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) std::pair range = endpoints.equal_range (std::string (addr_)); if (range.first == range.second) { errno = ENOENT; + EXIT_MUTEX(); return -1; } @@ -774,27 +872,34 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) term_child (it->second.first); } endpoints.erase (range.first, range.second); + EXIT_MUTEX(); return 0; } int zmq::socket_base_t::send (msg_t *msg_, int flags_) { + ENTER_MUTEX(); + // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { errno = ETERM; + EXIT_MUTEX(); return -1; } // Check whether message passed to the function is valid. if (unlikely (!msg_ || !msg_->check ())) { errno = EFAULT; + EXIT_MUTEX(); return -1; } // Process pending commands, if any. int rc = process_commands (0, true); - if (unlikely (rc != 0)) + if (unlikely (rc != 0)) { + EXIT_MUTEX(); return -1; + } // Clear any user-visible flags that are set on the message. msg_->reset_flags (msg_t::more); @@ -807,15 +912,21 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) // Try to send the message. rc = xsend (msg_); - if (rc == 0) + if (rc == 0) { + EXIT_MUTEX(); return 0; - if (unlikely (errno != EAGAIN)) + } + if (unlikely (errno != EAGAIN)) { + EXIT_MUTEX(); return -1; + } // In case of non-blocking send we'll simply propagate // the error - including EAGAIN - up the stack. - if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) + if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) { + EXIT_MUTEX(); return -1; + } // Compute the time when the timeout should occur. // If the timeout is infinite, don't care. @@ -826,35 +937,46 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) // command, process it and try to send the message again. // If timeout is reached in the meantime, return EAGAIN. while (true) { - if (unlikely (process_commands (timeout, false) != 0)) + if (unlikely (process_commands (timeout, false) != 0)) { + EXIT_MUTEX(); return -1; + } rc = xsend (msg_); if (rc == 0) break; - if (unlikely (errno != EAGAIN)) + if (unlikely (errno != EAGAIN)) { + EXIT_MUTEX(); return -1; + } if (timeout > 0) { timeout = (int) (end - clock.now_ms ()); if (timeout <= 0) { errno = EAGAIN; + EXIT_MUTEX(); return -1; } } } + + EXIT_MUTEX(); return 0; } int zmq::socket_base_t::recv (msg_t *msg_, int flags_) { + ENTER_MUTEX(); + // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { errno = ETERM; + EXIT_MUTEX(); return -1; } // Check whether message passed to the function is valid. if (unlikely (!msg_ || !msg_->check ())) { errno = EFAULT; + EXIT_MUTEX(); return -1; } @@ -867,21 +989,26 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // described above) from the one used by 'send'. This is because counting // ticks is more efficient than doing RDTSC all the time. if (++ticks == inbound_poll_rate) { - if (unlikely (process_commands (0, false) != 0)) + if (unlikely (process_commands (0, false) != 0)) { + EXIT_MUTEX(); return -1; + } ticks = 0; } // Get the message. int rc = xrecv (msg_); - if (unlikely (rc != 0 && errno != EAGAIN)) + if (unlikely (rc != 0 && errno != EAGAIN)) { + EXIT_MUTEX(); return -1; + } // If we have the message, return immediately. if (rc == 0) { if (file_desc != retired_fd) msg_->set_fd(file_desc); extract_flags (msg_); + EXIT_MUTEX(); return 0; } @@ -890,16 +1017,22 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // activate_reader command already waiting int a command pipe. // If it's not, return EAGAIN. if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) { - if (unlikely (process_commands (0, false) != 0)) + if (unlikely (process_commands (0, false) != 0)) { + EXIT_MUTEX(); return -1; + } ticks = 0; rc = xrecv (msg_); - if (rc < 0) + if (rc < 0) { + EXIT_MUTEX(); return rc; + } if (file_desc != retired_fd) msg_->set_fd(file_desc); extract_flags (msg_); + + EXIT_MUTEX(); return 0; } @@ -912,20 +1045,25 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // we are able to fetch a message. bool block = (ticks != 0); while (true) { - if (unlikely (process_commands (block ? timeout : 0, false) != 0)) + if (unlikely (process_commands (block ? timeout : 0, false) != 0)) { + EXIT_MUTEX(); return -1; + } rc = xrecv (msg_); if (rc == 0) { ticks = 0; break; } - if (unlikely (errno != EAGAIN)) + if (unlikely (errno != EAGAIN)) { + EXIT_MUTEX(); return -1; + } block = true; if (timeout > 0) { timeout = (int) (end - clock.now_ms ()); if (timeout <= 0) { errno = EAGAIN; + EXIT_MUTEX(); return -1; } } @@ -934,6 +1072,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) if (file_desc != retired_fd) msg_->set_fd(file_desc); extract_flags (msg_); + EXIT_MUTEX(); return 0; } @@ -964,7 +1103,25 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) { // Plug the socket to the reaper thread. poller = poller_; - handle = poller->add_fd (mailbox.get_fd (), this); + + fd_t fd; + + if (!thread_safe) + fd = ((mailbox_t*)mailbox)->get_fd(); + else { + ENTER_MUTEX(); + + // Add signaler to the safe mailbox + fd = reaper_signaler.get_fd(); + ((mailbox_safe_t*)mailbox)->add_signaler(&reaper_signaler); + + // Send a signal to make sure reaper handle existing commands + reaper_signaler.send(); + + EXIT_MUTEX(); + } + + handle = poller->add_fd (fd, this); poller->set_pollin (handle); // Initialise the termination and check whether it can be deallocated @@ -980,7 +1137,7 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_) if (timeout_ != 0) { // If we are asked to wait, simply ask mailbox to wait. - rc = mailbox.recv (&cmd, timeout_); + rc = mailbox->recv (&cmd, timeout_); } else { @@ -1007,13 +1164,13 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_) } // Check whether there are any commands pending for this thread. - rc = mailbox.recv (&cmd, 0); + rc = mailbox->recv (&cmd, 0); } // Process all available commands. while (rc == 0) { cmd.destination->process_command (cmd); - rc = mailbox.recv (&cmd, 0); + rc = mailbox->recv (&cmd, 0); } if (errno == EINTR) @@ -1118,7 +1275,10 @@ void zmq::socket_base_t::in_event () // of the reaper thread. Process any commands from other threads/sockets // that may be available at the moment. Ultimately, the socket will // be destroyed. + + ENTER_MUTEX(); process_commands (0, false); + EXIT_MUTEX(); check_destroy (); } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 2742d223..6a6d6be4 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -31,7 +31,7 @@ #include "poller.hpp" #include "atomic_counter.hpp" #include "i_poll_events.hpp" -#include "mailbox.hpp" +#include "i_mailbox.hpp" #include "stdint.hpp" #include "clock.hpp" #include "pipe.hpp" @@ -66,7 +66,7 @@ namespace zmq uint32_t tid_, int sid_); // Returns the mailbox associated with this socket. - mailbox_t *get_mailbox (); + i_mailbox *get_mailbox (); // Interrupt blocking call if the socket is stuck in one. // This function can be called from a different thread! @@ -123,7 +123,7 @@ namespace zmq protected: - socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); + socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_ = false); virtual ~socket_base_t (); // Concrete algorithms for the x- methods are to be defined by @@ -223,7 +223,7 @@ namespace zmq void process_term (int linger_); // Socket's mailbox object. - mailbox_t mailbox; + i_mailbox* mailbox; // List of attached pipes. typedef array_t pipes_t; @@ -257,9 +257,17 @@ namespace zmq // Last socket endpoint resolved URI std::string last_endpoint; - socket_base_t (const socket_base_t&); - const socket_base_t &operator = (const socket_base_t&); + // Indicate if the socket is thread safe + bool thread_safe; + + // Signaler to be used in the reaping stage + signaler_t reaper_signaler; + + // Mutex for synchronize access to the socket in thread safe mode mutex_t sync; + + socket_base_t (const socket_base_t&); + const socket_base_t &operator = (const socket_base_t&); }; } diff --git a/tests/test_thread_safe.cpp b/tests/test_thread_safe.cpp new file mode 100644 index 00000000..dbc360af --- /dev/null +++ b/tests/test_thread_safe.cpp @@ -0,0 +1,145 @@ +/*: + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +void worker1(void* s); +void worker2(void* s); + +int main (void) +{ + setup_test_environment(); + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *client = zmq_socket (ctx, ZMQ_CLIENT); + void *client2 = zmq_socket (ctx, ZMQ_CLIENT); + + int rc; + + rc = zmq_bind (client, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + rc = zmq_connect (client2, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + void* t1 = zmq_threadstart(worker1, client2); + void* t2 = zmq_threadstart(worker2, client2); + + char data[1]; + data[0] = 0; + + for (int i=0; i < 10; i++) { + rc = zmq_send_const(client, data, 1, 0); + assert (rc == 1); + + rc = zmq_send_const(client, data, 1, 0); + assert(rc == 1); + + char a, b; + + rc = zmq_recv(client, &a, 1, 0); + assert(rc == 1); + + rc = zmq_recv(client, &b, 1, 0); + assert(rc == 1); + + // make sure they came from different threads + assert((a == 1 && b == 2) || (a == 2 && b == 1)); + } + + // make the thread exit + data[0] = 1; + + rc = zmq_send_const(client, data, 1, 0); + assert (rc == 1); + + rc = zmq_send_const(client, data, 1, 0); + assert(rc == 1); + + zmq_threadclose(t1); + zmq_threadclose(t2); + + rc = zmq_close (client2); + assert (rc == 0); + + rc = zmq_close (client); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +} + +void worker1(void* s) +{ + const char worker_id = 1; + char c; + + while (true) + { + int rc = zmq_recv(s, &c,1, 0); + assert(rc == 1); + + if (c == 0) + { + msleep(10); + rc = zmq_send_const(s,&worker_id, 1, 0); + assert(rc == 1); + } + else + { + // we got exit request + break; + } + } +} + +void worker2(void* s) +{ + const char worker_id = 2; + char c; + + while (true) + { + int rc = zmq_recv(s, &c,1, 0); + assert(rc == 1); + + assert(c == 1 || c == 0); + + if (c == 0) + { + msleep(10); + rc = zmq_send_const(s,&worker_id, 1, 0); + assert(rc == 1); + } + else + { + // we got exit request + break; + } + } +} + + + + + + From bbdd8662bafa99584b78fe44e1a55b24b37cf740 Mon Sep 17 00:00:00 2001 From: somdoron Date: Thu, 12 Feb 2015 16:56:14 +0200 Subject: [PATCH 2/3] thread safety - supporting windows --- CMakeLists.txt | 1 + builds/cmake/platform.hpp.in | 9 +++++++++ builds/mingw32/platform.hpp | 9 +++++++++ builds/msvc/platform.hpp | 8 ++++++++ builds/msvc/vs2013/libzmq/libzmq.vcxproj | 3 +++ builds/msvc/vs2013/libzmq/libzmq.vcxproj.filters | 9 +++++++++ tests/CMakeLists.txt | 2 ++ 7 files changed, 41 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index a0890065..2f586e09 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -423,6 +423,7 @@ set(cxx-sources kqueue.cpp lb.cpp mailbox.cpp + mailbox_safe.cpp mechanism.cpp metadata.cpp msg.cpp diff --git a/builds/cmake/platform.hpp.in b/builds/cmake/platform.hpp.in index d6236709..b1039dea 100644 --- a/builds/cmake/platform.hpp.in +++ b/builds/cmake/platform.hpp.in @@ -86,5 +86,14 @@ #cmakedefine ZMQ_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS + #if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600 + #undef _WIN32_WINNT + #endif + #ifndef _WIN32_WINNT + #define _WIN32_WINNT 0x0600 + #endif #endif + +#endif \ No newline at end of file diff --git a/builds/mingw32/platform.hpp b/builds/mingw32/platform.hpp index 4af872cd..62d36c1f 100644 --- a/builds/mingw32/platform.hpp +++ b/builds/mingw32/platform.hpp @@ -29,4 +29,13 @@ #define ZMQ_HAVE_WINDOWS +#if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600 + #undef _WIN32_WINNT +#endif + +#ifndef _WIN32_WINNT + #define _WIN32_WINNT 0x0600 +#endif + + #endif diff --git a/builds/msvc/platform.hpp b/builds/msvc/platform.hpp index 4af872cd..d6e0ce60 100644 --- a/builds/msvc/platform.hpp +++ b/builds/msvc/platform.hpp @@ -29,4 +29,12 @@ #define ZMQ_HAVE_WINDOWS +#if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600 + #undef _WIN32_WINNT +#endif + +#ifndef _WIN32_WINNT + #define _WIN32_WINNT 0x0600 +#endif + #endif diff --git a/builds/msvc/vs2013/libzmq/libzmq.vcxproj b/builds/msvc/vs2013/libzmq/libzmq.vcxproj index fb4e6d44..5a464a30 100644 --- a/builds/msvc/vs2013/libzmq/libzmq.vcxproj +++ b/builds/msvc/vs2013/libzmq/libzmq.vcxproj @@ -76,6 +76,7 @@ + @@ -103,6 +104,7 @@ + @@ -183,6 +185,7 @@ + diff --git a/builds/msvc/vs2013/libzmq/libzmq.vcxproj.filters b/builds/msvc/vs2013/libzmq/libzmq.vcxproj.filters index 7faf3190..18a165b6 100644 --- a/builds/msvc/vs2013/libzmq/libzmq.vcxproj.filters +++ b/builds/msvc/vs2013/libzmq/libzmq.vcxproj.filters @@ -232,6 +232,9 @@ src + + src + @@ -504,6 +507,12 @@ src\include + + src\include + + + src\include + diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e6b0a31d..dbf575fe 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -45,6 +45,8 @@ set(tests test_connect_rid test_xpub_nodrop test_pub_invert_matching + test_thread_safe + test_client_server ) if(NOT WIN32) list(APPEND tests From 5a897f750913397c328167dd92172d7f52267162 Mon Sep 17 00:00:00 2001 From: somdoron Date: Thu, 12 Feb 2015 20:38:49 +0200 Subject: [PATCH 3/3] allocate reaper_signal only when needed --- src/socket_base.cpp | 21 ++++++++++++++++----- src/socket_base.hpp | 2 +- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 84c63f78..48c8597f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -161,7 +161,8 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool file_desc(-1), monitor_socket (NULL), monitor_events (0), - thread_safe (thread_safe_) + thread_safe (thread_safe_), + reaper_signaler (NULL) { options.socket_id = sid_; options.ipv6 = (parent_->get (ZMQ_IPV6) != 0); @@ -171,12 +172,15 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool mailbox = new mailbox_safe_t(&sync); else mailbox = new mailbox_t(); - } zmq::socket_base_t::~socket_base_t () { delete mailbox; + + if (reaper_signaler) + delete reaper_signaler; + stop_monitor (); zmq_assert (destroyed); } @@ -1111,12 +1115,14 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) else { ENTER_MUTEX(); + reaper_signaler = new signaler_t(); + // Add signaler to the safe mailbox - fd = reaper_signaler.get_fd(); - ((mailbox_safe_t*)mailbox)->add_signaler(&reaper_signaler); + fd = reaper_signaler->get_fd(); + ((mailbox_safe_t*)mailbox)->add_signaler(reaper_signaler); // Send a signal to make sure reaper handle existing commands - reaper_signaler.send(); + reaper_signaler->send(); EXIT_MUTEX(); } @@ -1277,6 +1283,11 @@ void zmq::socket_base_t::in_event () // be destroyed. ENTER_MUTEX(); + + // If the socket is thread safe we need to unsignal the reaper signaler + if (thread_safe) + reaper_signaler->recv(); + process_commands (0, false); EXIT_MUTEX(); check_destroy (); diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 6a6d6be4..d3dcc459 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -261,7 +261,7 @@ namespace zmq bool thread_safe; // Signaler to be used in the reaping stage - signaler_t reaper_signaler; + signaler_t* reaper_signaler; // Mutex for synchronize access to the socket in thread safe mode mutex_t sync;