thread safety bug - lock when sending

This commit is contained in:
somdoron
2015-02-13 09:30:29 +02:00
parent 5d42fe1bf7
commit deaa89656f
3 changed files with 22 additions and 28 deletions

View File

@@ -20,8 +20,8 @@
#include "mailbox_safe.hpp"
#include "err.hpp"
zmq::mailbox_safe_t::mailbox_safe_t (mutex_t* socket_mutex_) :
socket_mutex (socket_mutex_)
zmq::mailbox_safe_t::mailbox_safe_t (mutex_t* sync_) :
sync (sync_)
{
// Get the pipe into passive state. That way, if the users starts by
// polling on the associated file descriptor it will get woken up when
@@ -36,20 +36,17 @@ zmq::mailbox_safe_t::~mailbox_safe_t ()
// Work around problem that other threads might still be in our
// send() method, by waiting on the mutex before disappearing.
sync.lock ();
sync.unlock ();
sync->lock ();
sync->unlock ();
}
void zmq::mailbox_safe_t::add_signaler(signaler_t* signaler)
{
sync.lock();
signalers.push_back(signaler);
sync.unlock();
}
void zmq::mailbox_safe_t::remove_signaler(signaler_t* signaler)
{
sync.lock();
std::vector<signaler_t*>::iterator it = signalers.begin();
// TODO: make a copy of array and signal outside the lock
@@ -60,25 +57,22 @@ void zmq::mailbox_safe_t::remove_signaler(signaler_t* signaler)
if (it != signalers.end())
signalers.erase(it);
sync.unlock();
}
void zmq::mailbox_safe_t::send (const command_t &cmd_)
{
sync.lock ();
sync->lock ();
cpipe.write (cmd_, false);
const bool ok = cpipe.flush ();
if (!ok) {
cond_var.broadcast ();
for (std::vector<signaler_t*>::iterator it = signalers.begin(); it != signalers.end(); ++it){
(*it)->send();
}
}
sync.unlock ();
if (!ok)
cond_var.broadcast ();
sync->unlock ();
}
int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_)
@@ -86,14 +80,9 @@ int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_)
// Try to get the command straight away.
if (cpipe.read (cmd_))
return 0;
if (timeout_ == 0) {
errno = EAGAIN;
return -1;
}
// Wait for signal from the command sender.
int rc = cond_var.wait (socket_mutex, timeout_);
int rc = cond_var.wait (sync, timeout_);
if (rc == -1) {
errno_assert (errno == EAGAIN || errno == EINTR);
return -1;