diff --git a/src/mailbox_safe.cpp b/src/mailbox_safe.cpp index 9bc5a5a7..df0e191d 100644 --- a/src/mailbox_safe.cpp +++ b/src/mailbox_safe.cpp @@ -20,8 +20,8 @@ #include "mailbox_safe.hpp" #include "err.hpp" -zmq::mailbox_safe_t::mailbox_safe_t (mutex_t* socket_mutex_) : - socket_mutex (socket_mutex_) +zmq::mailbox_safe_t::mailbox_safe_t (mutex_t* sync_) : + sync (sync_) { // Get the pipe into passive state. That way, if the users starts by // polling on the associated file descriptor it will get woken up when @@ -36,20 +36,17 @@ zmq::mailbox_safe_t::~mailbox_safe_t () // Work around problem that other threads might still be in our // send() method, by waiting on the mutex before disappearing. - sync.lock (); - sync.unlock (); + sync->lock (); + sync->unlock (); } void zmq::mailbox_safe_t::add_signaler(signaler_t* signaler) { - sync.lock(); signalers.push_back(signaler); - sync.unlock(); } void zmq::mailbox_safe_t::remove_signaler(signaler_t* signaler) { - sync.lock(); std::vector::iterator it = signalers.begin(); // TODO: make a copy of array and signal outside the lock @@ -60,25 +57,22 @@ void zmq::mailbox_safe_t::remove_signaler(signaler_t* signaler) if (it != signalers.end()) signalers.erase(it); - sync.unlock(); } void zmq::mailbox_safe_t::send (const command_t &cmd_) { - sync.lock (); + sync->lock (); cpipe.write (cmd_, false); const bool ok = cpipe.flush (); if (!ok) { + cond_var.broadcast (); for (std::vector::iterator it = signalers.begin(); it != signalers.end(); ++it){ (*it)->send(); } } - sync.unlock (); - - if (!ok) - cond_var.broadcast (); + sync->unlock (); } int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_) @@ -86,14 +80,9 @@ int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_) // Try to get the command straight away. if (cpipe.read (cmd_)) return 0; - - if (timeout_ == 0) { - errno = EAGAIN; - return -1; - } // Wait for signal from the command sender. - int rc = cond_var.wait (socket_mutex, timeout_); + int rc = cond_var.wait (sync, timeout_); if (rc == -1) { errno_assert (errno == EAGAIN || errno == EINTR); return -1; diff --git a/src/mailbox_safe.hpp b/src/mailbox_safe.hpp index f9b4f20d..ae467e74 100644 --- a/src/mailbox_safe.hpp +++ b/src/mailbox_safe.hpp @@ -43,7 +43,7 @@ namespace zmq { public: - mailbox_safe_t (mutex_t* socket_mutex_); + mailbox_safe_t (mutex_t* sync_); ~mailbox_safe_t (); void send (const command_t &cmd_); @@ -72,13 +72,8 @@ namespace zmq // Condition variable to pass signals from writer thread to reader thread. condition_variable_t cond_var; - // There's only one thread receiving from the mailbox, but there - // is arbitrary number of threads sending. Given that ypipe requires - // synchronised access on both of its endpoints, we have to synchronise - // the sending side. - mutex_t sync; - - mutex_t* socket_mutex; + // Synchronize access to the mailbox from receivers and senders + mutex_t* sync; std::vector signalers; diff --git a/src/mutex.hpp b/src/mutex.hpp index 26c7a71f..ae68e786 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -88,7 +88,13 @@ namespace zmq public: inline mutex_t () { - int rc = pthread_mutex_init (&mutex, NULL); + int rc = pthread_mutexattr_init(&attr); + posix_assert (rc); + + rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); + posix_assert (rc); + + rc = pthread_mutex_init (&mutex, &attr); posix_assert (rc); } @@ -96,6 +102,9 @@ namespace zmq { int rc = pthread_mutex_destroy (&mutex); posix_assert (rc); + + rc = pthread_mutexattr_destroy (&attr); + posix_assert (rc); } inline void lock () @@ -128,6 +137,7 @@ namespace zmq private: pthread_mutex_t mutex; + pthread_mutexattr_t attr; // Disable copy construction and assignment. mutex_t (const mutex_t&);