diff --git a/CMakeLists.txt b/CMakeLists.txt
index a0890065..2f586e09 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -423,6 +423,7 @@ set(cxx-sources
kqueue.cpp
lb.cpp
mailbox.cpp
+ mailbox_safe.cpp
mechanism.cpp
metadata.cpp
msg.cpp
diff --git a/Makefile.am b/Makefile.am
index a0bc0185..28eef125 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -31,6 +31,7 @@ src_libzmq_la_SOURCES = \
src/clock.cpp \
src/clock.hpp \
src/command.hpp \
+ src/condition_variable.hpp \
src/config.hpp \
src/ctx.cpp \
src/ctx.hpp \
@@ -63,6 +64,7 @@ src_libzmq_la_SOURCES = \
src/i_encoder.hpp \
src/i_engine.hpp \
src/i_decoder.hpp \
+ src/i_mailbox.hpp \
src/i_poll_events.hpp \
src/io_object.cpp \
src/io_object.hpp \
@@ -83,6 +85,8 @@ src_libzmq_la_SOURCES = \
src/likely.hpp \
src/mailbox.cpp \
src/mailbox.hpp \
+ src/mailbox_safe.cpp \
+ src/mailbox_safe.hpp \
src/mechanism.cpp \
src/mechanism.hpp \
src/metadata.cpp \
@@ -349,7 +353,8 @@ test_apps = \
tests/test_atomics \
tests/test_client_server \
tests/test_server_drop_more \
- tests/test_client_drop_more
+ tests/test_client_drop_more \
+ tests/test_thread_safe
tests_test_system_SOURCES = tests/test_system.cpp
tests_test_system_LDADD = src/libzmq.la
@@ -530,6 +535,11 @@ tests_test_server_drop_more_LDADD = src/libzmq.la
tests_test_client_drop_more_SOURCES = tests/test_client_drop_more.cpp
tests_test_client_drop_more_LDADD = src/libzmq.la
+tests_test_thread_safe_SOURCES = tests/test_thread_safe.cpp
+tests_test_thread_safe_LDADD = src/libzmq.la
+
+
+
if !ON_MINGW
if !ON_CYGWIN
test_apps += \
diff --git a/builds/cmake/platform.hpp.in b/builds/cmake/platform.hpp.in
index d6236709..b1039dea 100644
--- a/builds/cmake/platform.hpp.in
+++ b/builds/cmake/platform.hpp.in
@@ -86,5 +86,14 @@
#cmakedefine ZMQ_HAVE_WINDOWS
+#ifdef ZMQ_HAVE_WINDOWS
+ #if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600
+ #undef _WIN32_WINNT
+ #endif
+ #ifndef _WIN32_WINNT
+ #define _WIN32_WINNT 0x0600
+ #endif
#endif
+
+#endif
\ No newline at end of file
diff --git a/builds/mingw32/platform.hpp b/builds/mingw32/platform.hpp
index 4af872cd..62d36c1f 100644
--- a/builds/mingw32/platform.hpp
+++ b/builds/mingw32/platform.hpp
@@ -29,4 +29,13 @@
#define ZMQ_HAVE_WINDOWS
+#if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600
+ #undef _WIN32_WINNT
+#endif
+
+#ifndef _WIN32_WINNT
+ #define _WIN32_WINNT 0x0600
+#endif
+
+
#endif
diff --git a/builds/msvc/platform.hpp b/builds/msvc/platform.hpp
index 4af872cd..d6e0ce60 100644
--- a/builds/msvc/platform.hpp
+++ b/builds/msvc/platform.hpp
@@ -29,4 +29,12 @@
#define ZMQ_HAVE_WINDOWS
+#if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600
+ #undef _WIN32_WINNT
+#endif
+
+#ifndef _WIN32_WINNT
+ #define _WIN32_WINNT 0x0600
+#endif
+
#endif
diff --git a/builds/msvc/vs2013/libzmq/libzmq.vcxproj b/builds/msvc/vs2013/libzmq/libzmq.vcxproj
index fb4e6d44..5a464a30 100644
--- a/builds/msvc/vs2013/libzmq/libzmq.vcxproj
+++ b/builds/msvc/vs2013/libzmq/libzmq.vcxproj
@@ -76,6 +76,7 @@
+
@@ -103,6 +104,7 @@
+
@@ -183,6 +185,7 @@
+
diff --git a/builds/msvc/vs2013/libzmq/libzmq.vcxproj.filters b/builds/msvc/vs2013/libzmq/libzmq.vcxproj.filters
index 7faf3190..18a165b6 100644
--- a/builds/msvc/vs2013/libzmq/libzmq.vcxproj.filters
+++ b/builds/msvc/vs2013/libzmq/libzmq.vcxproj.filters
@@ -232,6 +232,9 @@
src
+
+ src
+
@@ -504,6 +507,12 @@
src\include
+
+ src\include
+
+
+ src\include
+
diff --git a/src/client.cpp b/src/client.cpp
index 73357c1e..f30b43c9 100644
--- a/src/client.cpp
+++ b/src/client.cpp
@@ -22,7 +22,7 @@
#include "msg.hpp"
zmq::client_t::client_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
- socket_base_t (parent_, tid_, sid_)
+ socket_base_t (parent_, tid_, sid_, true)
{
options.type = ZMQ_CLIENT;
}
diff --git a/src/condition_variable.hpp b/src/condition_variable.hpp
new file mode 100644
index 00000000..265bc9cc
--- /dev/null
+++ b/src/condition_variable.hpp
@@ -0,0 +1,149 @@
+/*
+ Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#ifndef __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
+#define __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
+
+#include "platform.hpp"
+#include "err.hpp"
+#include "mutex.hpp"
+
+// Condition variable class encapsulates OS mutex in a platform-independent way.
+
+#ifdef ZMQ_HAVE_WINDOWS
+
+#include "windows.hpp"
+
+namespace zmq
+{
+
+ class condition_variable_t
+ {
+ public:
+ inline condition_variable_t ()
+ {
+ InitializeConditionVariable (&cv);
+ }
+
+ inline ~condition_variable_t ()
+ {
+
+ }
+
+ inline int wait (mutex_t* mutex_, int timeout_ )
+ {
+ int rc = SleepConditionVariableCS(&cv, mutex_->get_cs (), timeout_);
+
+ if (rc != 0)
+ return 0;
+
+ rc = GetLastError();
+
+ if (rc != ERROR_TIMEOUT)
+ win_assert(rc);
+
+ errno = EAGAIN;
+ return -1;
+ }
+
+ inline void broadcast ()
+ {
+ WakeAllConditionVariable(&cv);
+ }
+
+ private:
+
+ CONDITION_VARIABLE cv;
+
+ // Disable copy construction and assignment.
+ condition_variable_t (const condition_variable_t&);
+ void operator = (const condition_variable_t&);
+ };
+
+}
+
+#else
+
+#include
+
+namespace zmq
+{
+
+ class condition_variable_t
+ {
+ public:
+ inline condition_variable_t ()
+ {
+ int rc = pthread_cond_init (&cond, NULL);
+ posix_assert (rc);
+ }
+
+ inline ~condition_variable_t ()
+ {
+ int rc = pthread_cond_destroy (&cond);
+ posix_assert (rc);
+ }
+
+ inline int wait (mutex_t* mutex_, int timeout_)
+ {
+ int rc;
+
+ if (timeout_ != -1) {
+ struct timespec timeout;
+ clock_gettime(CLOCK_REALTIME, &timeout);
+
+ timeout.tv_sec += timeout_ / 1000;
+ timeout.tv_nsec += (timeout_ % 1000) * 1000000;
+ rc = pthread_cond_timedwait (&cond, mutex_->get_mutex (), &timeout);
+ }
+ else
+ rc = pthread_cond_wait(&cond, mutex_->get_mutex());
+
+ if (rc == 0)
+ return 0;
+
+ if (rc == ETIMEDOUT){
+ errno= EAGAIN;
+ return -1;
+ }
+
+ posix_assert (rc);
+ return -1;
+ }
+
+ inline void broadcast ()
+ {
+ int rc = pthread_cond_broadcast (&cond);
+ posix_assert (rc);
+ }
+
+ private:
+
+ pthread_cond_t cond;
+
+ // Disable copy construction and assignment.
+ condition_variable_t (const condition_variable_t&);
+ const condition_variable_t &operator = (const condition_variable_t&);
+ };
+}
+
+#endif
+
+
+#endif
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 0867ca61..6f5c7b67 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -273,7 +273,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
int ios = io_thread_count;
opt_sync.unlock ();
slot_count = mazmq + ios + 2;
- slots = (mailbox_t **) malloc (sizeof (mailbox_t*) * slot_count);
+ slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
alloc_assert (slots);
// Initialise the infrastructure for zmq_ctx_term thread.
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 24e5fd1e..5b78107b 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -162,7 +162,7 @@ namespace zmq
// Array of pointers to mailboxes for both application and I/O threads.
uint32_t slot_count;
- mailbox_t **slots;
+ i_mailbox **slots;
// Mailbox for zmq_term thread.
mailbox_t term_mailbox;
diff --git a/src/i_mailbox.hpp b/src/i_mailbox.hpp
new file mode 100644
index 00000000..8f472d38
--- /dev/null
+++ b/src/i_mailbox.hpp
@@ -0,0 +1,50 @@
+/*
+ Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#ifndef __ZMQ_I_MAILBOX_HPP_INCLUDED__
+#define __ZMQ_I_MAILBOX_HPP_INCLUDED__
+
+#include "stdint.hpp"
+
+namespace zmq
+{
+ // Interface to be implemented by mailbox.
+
+ class i_mailbox
+ {
+ public:
+ virtual ~i_mailbox () {}
+
+ virtual void send (const command_t &cmd_) = 0;
+ virtual int recv (command_t *cmd_, int timeout_) = 0;
+
+
+#ifdef HAVE_FORK
+ // close the file descriptors in the signaller. This is used in a forked
+ // child process to close the file descriptors so that they do not interfere
+ // with the context in the parent process.
+ virtual void forked () = 0;
+#endif
+
+
+ };
+
+}
+
+#endif
diff --git a/src/mailbox.hpp b/src/mailbox.hpp
index 45c261fd..b9a7cfd0 100644
--- a/src/mailbox.hpp
+++ b/src/mailbox.hpp
@@ -29,11 +29,12 @@
#include "command.hpp"
#include "ypipe.hpp"
#include "mutex.hpp"
+#include "i_mailbox.hpp"
namespace zmq
{
- class mailbox_t
+ class mailbox_t : public i_mailbox
{
public:
diff --git a/src/mailbox_safe.cpp b/src/mailbox_safe.cpp
new file mode 100644
index 00000000..9bc5a5a7
--- /dev/null
+++ b/src/mailbox_safe.cpp
@@ -0,0 +1,111 @@
+/*
+ Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#include "mailbox_safe.hpp"
+#include "err.hpp"
+
+zmq::mailbox_safe_t::mailbox_safe_t (mutex_t* socket_mutex_) :
+ socket_mutex (socket_mutex_)
+{
+ // Get the pipe into passive state. That way, if the users starts by
+ // polling on the associated file descriptor it will get woken up when
+ // new command is posted.
+ const bool ok = cpipe.read (NULL);
+ zmq_assert (!ok);
+}
+
+zmq::mailbox_safe_t::~mailbox_safe_t ()
+{
+ // TODO: Retrieve and deallocate commands inside the cpipe.
+
+ // Work around problem that other threads might still be in our
+ // send() method, by waiting on the mutex before disappearing.
+ sync.lock ();
+ sync.unlock ();
+}
+
+void zmq::mailbox_safe_t::add_signaler(signaler_t* signaler)
+{
+ sync.lock();
+ signalers.push_back(signaler);
+ sync.unlock();
+}
+
+void zmq::mailbox_safe_t::remove_signaler(signaler_t* signaler)
+{
+ sync.lock();
+ std::vector::iterator it = signalers.begin();
+
+ // TODO: make a copy of array and signal outside the lock
+ for (; it != signalers.end(); ++it){
+ if (*it == signaler)
+ break;
+ }
+
+ if (it != signalers.end())
+ signalers.erase(it);
+ sync.unlock();
+}
+
+void zmq::mailbox_safe_t::send (const command_t &cmd_)
+{
+ sync.lock ();
+ cpipe.write (cmd_, false);
+ const bool ok = cpipe.flush ();
+
+ if (!ok) {
+ for (std::vector::iterator it = signalers.begin(); it != signalers.end(); ++it){
+ (*it)->send();
+ }
+ }
+
+ sync.unlock ();
+
+ if (!ok)
+ cond_var.broadcast ();
+}
+
+int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_)
+{
+ // Try to get the command straight away.
+ if (cpipe.read (cmd_))
+ return 0;
+
+ if (timeout_ == 0) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // Wait for signal from the command sender.
+ int rc = cond_var.wait (socket_mutex, timeout_);
+ if (rc == -1) {
+ errno_assert (errno == EAGAIN || errno == EINTR);
+ return -1;
+ }
+
+ // Another thread may already fetch the command
+ const bool ok = cpipe.read (cmd_);
+
+ if (!ok) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ return 0;
+}
diff --git a/src/mailbox_safe.hpp b/src/mailbox_safe.hpp
new file mode 100644
index 00000000..f9b4f20d
--- /dev/null
+++ b/src/mailbox_safe.hpp
@@ -0,0 +1,92 @@
+/*
+ Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+ void send (const command_t &cmd_);
+ int recv (command_t *cmd_, int timeout_);
+
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#ifndef __ZMQ_MAILBOX_SAFE_HPP_INCLUDED__
+#define __ZMQ_MAILBOX_SAFE_HPP_INCLUDED__
+
+#include
+#include
+
+#include "platform.hpp"
+#include "signaler.hpp"
+#include "fd.hpp"
+#include "config.hpp"
+#include "command.hpp"
+#include "ypipe.hpp"
+#include "mutex.hpp"
+#include "i_mailbox.hpp"
+#include "condition_variable.hpp"
+
+namespace zmq
+{
+
+ class mailbox_safe_t : public i_mailbox
+ {
+ public:
+
+ mailbox_safe_t (mutex_t* socket_mutex_);
+ ~mailbox_safe_t ();
+
+ void send (const command_t &cmd_);
+ int recv (command_t *cmd_, int timeout_);
+
+ // Add signaler to mailbox which will be called when a message is ready
+ void add_signaler(signaler_t* signaler);
+ void remove_signaler(signaler_t* signaler);
+
+#ifdef HAVE_FORK
+ // close the file descriptors in the signaller. This is used in a forked
+ // child process to close the file descriptors so that they do not interfere
+ // with the context in the parent process.
+ void forked ()
+ {
+ // TODO: call fork on the condition variable
+ }
+#endif
+
+ private:
+
+ // The pipe to store actual commands.
+ typedef ypipe_t cpipe_t;
+ cpipe_t cpipe;
+
+ // Condition variable to pass signals from writer thread to reader thread.
+ condition_variable_t cond_var;
+
+ // There's only one thread receiving from the mailbox, but there
+ // is arbitrary number of threads sending. Given that ypipe requires
+ // synchronised access on both of its endpoints, we have to synchronise
+ // the sending side.
+ mutex_t sync;
+
+ mutex_t* socket_mutex;
+
+ std::vector signalers;
+
+ // Disable copying of mailbox_t object.
+ mailbox_safe_t (const mailbox_safe_t&);
+ const mailbox_safe_t &operator = (const mailbox_safe_t&);
+ };
+
+}
+
+#endif
diff --git a/src/mutex.hpp b/src/mutex.hpp
index 2368d518..26c7a71f 100644
--- a/src/mutex.hpp
+++ b/src/mutex.hpp
@@ -60,6 +60,11 @@ namespace zmq
LeaveCriticalSection (&cs);
}
+ inline CRITICAL_SECTION* get_cs()
+ {
+ return &cs;
+ }
+
private:
CRITICAL_SECTION cs;
@@ -115,6 +120,11 @@ namespace zmq
posix_assert (rc);
}
+ inline pthread_mutex_t* get_mutex()
+ {
+ return &mutex;
+ }
+
private:
pthread_mutex_t mutex;
diff --git a/src/server.cpp b/src/server.cpp
index e77cd0bd..7fb77098 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -25,7 +25,7 @@
#include "err.hpp"
zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
- socket_base_t (parent_, tid_, sid_),
+ socket_base_t (parent_, tid_, sid_, true),
next_rid (generate_random ())
{
options.type = ZMQ_SERVER;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 198277a6..48c8597f 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -54,6 +54,8 @@
#include "ipc_address.hpp"
#include "tcp_address.hpp"
#include "tipc_address.hpp"
+#include "mailbox.hpp"
+#include "mailbox_safe.hpp"
#ifdef ZMQ_HAVE_OPENPGM
#include "pgm_socket.hpp"
#endif
@@ -73,6 +75,14 @@
#include "server.hpp"
#include "client.hpp"
+#define ENTER_MUTEX() \
+ if (thread_safe) \
+ sync.lock();
+
+#define EXIT_MUTEX() \
+ if (thread_safe) \
+ sync.unlock();
+
bool zmq::socket_base_t::check_tag ()
{
return tag == 0xbaddecaf;
@@ -131,13 +141,16 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
}
alloc_assert (s);
- if (s->mailbox.get_fd () == retired_fd)
+
+ mailbox_t *mailbox = dynamic_cast (s->mailbox);
+
+ if (mailbox != NULL && mailbox->get_fd () == retired_fd)
return NULL;
return s;
}
-zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
+zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_) :
own_t (parent_, tid_),
tag (0xbaddecaf),
ctx_terminated (false),
@@ -147,22 +160,34 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
rcvmore (false),
file_desc(-1),
monitor_socket (NULL),
- monitor_events (0)
+ monitor_events (0),
+ thread_safe (thread_safe_),
+ reaper_signaler (NULL)
{
options.socket_id = sid_;
options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;
+
+ if (thread_safe)
+ mailbox = new mailbox_safe_t(&sync);
+ else
+ mailbox = new mailbox_t();
}
zmq::socket_base_t::~socket_base_t ()
{
+ delete mailbox;
+
+ if (reaper_signaler)
+ delete reaper_signaler;
+
stop_monitor ();
zmq_assert (destroyed);
}
-zmq::mailbox_t *zmq::socket_base_t::get_mailbox ()
+zmq::i_mailbox *zmq::socket_base_t::get_mailbox ()
{
- return &mailbox;
+ return mailbox;
}
void zmq::socket_base_t::stop ()
@@ -275,57 +300,84 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
+ ENTER_MUTEX();
+
if (unlikely (ctx_terminated)) {
errno = ETERM;
+ EXIT_MUTEX();
return -1;
}
// First, check whether specific socket type overloads the option.
int rc = xsetsockopt (option_, optval_, optvallen_);
- if (rc == 0 || errno != EINVAL)
+ if (rc == 0 || errno != EINVAL) {
+ EXIT_MUTEX();
return rc;
+ }
// If the socket type doesn't support the option, pass it to
// the generic option parser.
- return options.setsockopt (option_, optval_, optvallen_);
+ rc = options.setsockopt (option_, optval_, optvallen_);
+
+ EXIT_MUTEX();
+ return rc;
}
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
size_t *optvallen_)
{
+ ENTER_MUTEX();
+
if (unlikely (ctx_terminated)) {
errno = ETERM;
+ EXIT_MUTEX();
return -1;
}
if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
+ EXIT_MUTEX();
return -1;
}
*((int*) optval_) = rcvmore ? 1 : 0;
*optvallen_ = sizeof (int);
+ EXIT_MUTEX();
return 0;
}
if (option_ == ZMQ_FD) {
if (*optvallen_ < sizeof (fd_t)) {
errno = EINVAL;
+ EXIT_MUTEX();
return -1;
}
- *((fd_t*) optval_) = mailbox.get_fd ();
- *optvallen_ = sizeof (fd_t);
+
+ if (thread_safe) {
+ // thread safe socket doesn't provide file descriptor
+ errno = EINVAL;
+ EXIT_MUTEX();
+ return -1;
+ }
+
+ *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
+ *optvallen_ = sizeof(fd_t);
+
+ EXIT_MUTEX();
return 0;
}
if (option_ == ZMQ_EVENTS) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
+ EXIT_MUTEX();
return -1;
}
int rc = process_commands (0, false);
- if (rc != 0 && (errno == EINTR || errno == ETERM))
+ if (rc != 0 && (errno == EINTR || errno == ETERM)) {
+ EXIT_MUTEX();
return -1;
+ }
errno_assert (rc == 0);
*((int*) optval_) = 0;
if (has_out ())
@@ -333,39 +385,51 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
if (has_in ())
*((int*) optval_) |= ZMQ_POLLIN;
*optvallen_ = sizeof (int);
+ EXIT_MUTEX();
return 0;
}
if (option_ == ZMQ_LAST_ENDPOINT) {
if (*optvallen_ < last_endpoint.size () + 1) {
errno = EINVAL;
+ EXIT_MUTEX();
return -1;
}
strcpy (static_cast (optval_), last_endpoint.c_str ());
*optvallen_ = last_endpoint.size () + 1;
+ EXIT_MUTEX();
return 0;
}
- return options.getsockopt (option_, optval_, optvallen_);
+ int rc = options.getsockopt (option_, optval_, optvallen_);
+ EXIT_MUTEX();
+ return rc;
}
int zmq::socket_base_t::bind (const char *addr_)
{
+ ENTER_MUTEX();
+
if (unlikely (ctx_terminated)) {
errno = ETERM;
+ EXIT_MUTEX();
return -1;
}
// Process pending commands, if any.
int rc = process_commands (0, false);
- if (unlikely (rc != 0))
+ if (unlikely (rc != 0)) {
+ EXIT_MUTEX();
return -1;
+ }
// Parse addr_ string.
std::string protocol;
std::string address;
- if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
+ if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
+ EXIT_MUTEX();
return -1;
+ }
if (protocol == "inproc") {
const endpoint_t endpoint = { this, options };
@@ -374,12 +438,14 @@ int zmq::socket_base_t::bind (const char *addr_)
connect_pending (addr_, this);
last_endpoint.assign (addr_);
}
+ EXIT_MUTEX();
return rc;
}
if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
// For convenience's sake, bind can be used interchageable with
// connect for PGM, EPGM and NORM transports.
+ EXIT_MUTEX();
return connect (addr_);
}
@@ -388,6 +454,7 @@ int zmq::socket_base_t::bind (const char *addr_)
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
errno = EMTHREAD;
+ EXIT_MUTEX();
return -1;
}
@@ -399,6 +466,7 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc != 0) {
delete listener;
event_bind_failed (address, zmq_errno());
+ EXIT_MUTEX();
return -1;
}
@@ -406,6 +474,7 @@ int zmq::socket_base_t::bind (const char *addr_)
listener->get_address (last_endpoint);
add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
+ EXIT_MUTEX();
return 0;
}
@@ -418,6 +487,7 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc != 0) {
delete listener;
event_bind_failed (address, zmq_errno());
+ EXIT_MUTEX();
return -1;
}
@@ -425,6 +495,7 @@ int zmq::socket_base_t::bind (const char *addr_)
listener->get_address (last_endpoint);
add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
+ EXIT_MUTEX();
return 0;
}
#endif
@@ -437,6 +508,7 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc != 0) {
delete listener;
event_bind_failed (address, zmq_errno());
+ EXIT_MUTEX();
return -1;
}
@@ -444,31 +516,40 @@ int zmq::socket_base_t::bind (const char *addr_)
listener->get_address (last_endpoint);
add_endpoint (addr_, (own_t *) listener, NULL);
+ EXIT_MUTEX();
return 0;
}
#endif
+ EXIT_MUTEX();
zmq_assert (false);
return -1;
}
int zmq::socket_base_t::connect (const char *addr_)
{
+ ENTER_MUTEX();
+
if (unlikely (ctx_terminated)) {
errno = ETERM;
+ EXIT_MUTEX();
return -1;
}
// Process pending commands, if any.
int rc = process_commands (0, false);
- if (unlikely (rc != 0))
+ if (unlikely (rc != 0)) {
+ EXIT_MUTEX();
return -1;
+ }
// Parse addr_ string.
std::string protocol;
std::string address;
- if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
+ if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
+ EXIT_MUTEX();
return -1;
+ }
if (protocol == "inproc") {
@@ -566,6 +647,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// remember inproc connections for disconnect
inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
+ EXIT_MUTEX();
return 0;
}
bool is_single_connect = (options.type == ZMQ_DEALER ||
@@ -577,6 +659,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// There is no valid use for multiple connects for SUB-PUB nor
// DEALER-ROUTER nor REQ-REP. Multiple connects produces
// nonsensical results.
+ EXIT_MUTEX();
return 0;
}
}
@@ -585,6 +668,7 @@ int zmq::socket_base_t::connect (const char *addr_)
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
errno = EMTHREAD;
+ EXIT_MUTEX();
return -1;
}
@@ -624,6 +708,7 @@ int zmq::socket_base_t::connect (const char *addr_)
if (rc == -1) {
errno = EINVAL;
delete paddr;
+ EXIT_MUTEX();
return -1;
}
// Defer resolution until a socket is opened
@@ -637,6 +722,7 @@ int zmq::socket_base_t::connect (const char *addr_)
int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
if (rc != 0) {
delete paddr;
+ EXIT_MUTEX();
return -1;
}
}
@@ -652,6 +738,7 @@ int zmq::socket_base_t::connect (const char *addr_)
if (res != NULL)
pgm_freeaddrinfo (res);
if (rc != 0 || port_number == 0)
+ EXIT_MUTEX();
return -1;
}
#endif
@@ -663,6 +750,7 @@ int zmq::socket_base_t::connect (const char *addr_)
int rc = paddr->resolved.tipc_addr->resolve (address.c_str());
if (rc != 0) {
delete paddr;
+ EXIT_MUTEX();
return -1;
}
}
@@ -708,6 +796,7 @@ int zmq::socket_base_t::connect (const char *addr_)
paddr->to_string (last_endpoint);
add_endpoint (addr_, (own_t *) session, newpipe);
+ EXIT_MUTEX();
return 0;
}
@@ -720,43 +809,55 @@ void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe
int zmq::socket_base_t::term_endpoint (const char *addr_)
{
+ ENTER_MUTEX();
+
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
errno = ETERM;
+ EXIT_MUTEX();
return -1;
}
// Check whether endpoint address passed to the function is valid.
if (unlikely (!addr_)) {
errno = EINVAL;
+ EXIT_MUTEX();
return -1;
}
// Process pending commands, if any, since there could be pending unprocessed process_own()'s
// (from launch_child() for example) we're asked to terminate now.
int rc = process_commands (0, false);
- if (unlikely (rc != 0))
+ if (unlikely(rc != 0)) {
+ EXIT_MUTEX();
return -1;
+ }
// Parse addr_ string.
std::string protocol;
std::string address;
- if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
+ if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) {
+ EXIT_MUTEX();
return -1;
+ }
// Disconnect an inproc socket
if (protocol == "inproc") {
- if (unregister_endpoint (std::string (addr_), this) == 0)
+ if (unregister_endpoint (std::string(addr_), this) == 0) {
+ EXIT_MUTEX();
return 0;
+ }
std::pair range = inprocs.equal_range (std::string (addr_));
if (range.first == range.second) {
errno = ENOENT;
+ EXIT_MUTEX();
return -1;
}
for (inprocs_t::iterator it = range.first; it != range.second; ++it)
it->second->terminate (true);
inprocs.erase (range.first, range.second);
+ EXIT_MUTEX();
return 0;
}
@@ -764,6 +865,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
std::pair range = endpoints.equal_range (std::string (addr_));
if (range.first == range.second) {
errno = ENOENT;
+ EXIT_MUTEX();
return -1;
}
@@ -774,27 +876,34 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
term_child (it->second.first);
}
endpoints.erase (range.first, range.second);
+ EXIT_MUTEX();
return 0;
}
int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{
+ ENTER_MUTEX();
+
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
errno = ETERM;
+ EXIT_MUTEX();
return -1;
}
// Check whether message passed to the function is valid.
if (unlikely (!msg_ || !msg_->check ())) {
errno = EFAULT;
+ EXIT_MUTEX();
return -1;
}
// Process pending commands, if any.
int rc = process_commands (0, true);
- if (unlikely (rc != 0))
+ if (unlikely (rc != 0)) {
+ EXIT_MUTEX();
return -1;
+ }
// Clear any user-visible flags that are set on the message.
msg_->reset_flags (msg_t::more);
@@ -807,15 +916,21 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
// Try to send the message.
rc = xsend (msg_);
- if (rc == 0)
+ if (rc == 0) {
+ EXIT_MUTEX();
return 0;
- if (unlikely (errno != EAGAIN))
+ }
+ if (unlikely (errno != EAGAIN)) {
+ EXIT_MUTEX();
return -1;
+ }
// In case of non-blocking send we'll simply propagate
// the error - including EAGAIN - up the stack.
- if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0)
+ if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) {
+ EXIT_MUTEX();
return -1;
+ }
// Compute the time when the timeout should occur.
// If the timeout is infinite, don't care.
@@ -826,35 +941,46 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
// command, process it and try to send the message again.
// If timeout is reached in the meantime, return EAGAIN.
while (true) {
- if (unlikely (process_commands (timeout, false) != 0))
+ if (unlikely (process_commands (timeout, false) != 0)) {
+ EXIT_MUTEX();
return -1;
+ }
rc = xsend (msg_);
if (rc == 0)
break;
- if (unlikely (errno != EAGAIN))
+ if (unlikely (errno != EAGAIN)) {
+ EXIT_MUTEX();
return -1;
+ }
if (timeout > 0) {
timeout = (int) (end - clock.now_ms ());
if (timeout <= 0) {
errno = EAGAIN;
+ EXIT_MUTEX();
return -1;
}
}
}
+
+ EXIT_MUTEX();
return 0;
}
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{
+ ENTER_MUTEX();
+
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
errno = ETERM;
+ EXIT_MUTEX();
return -1;
}
// Check whether message passed to the function is valid.
if (unlikely (!msg_ || !msg_->check ())) {
errno = EFAULT;
+ EXIT_MUTEX();
return -1;
}
@@ -867,21 +993,26 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// described above) from the one used by 'send'. This is because counting
// ticks is more efficient than doing RDTSC all the time.
if (++ticks == inbound_poll_rate) {
- if (unlikely (process_commands (0, false) != 0))
+ if (unlikely (process_commands (0, false) != 0)) {
+ EXIT_MUTEX();
return -1;
+ }
ticks = 0;
}
// Get the message.
int rc = xrecv (msg_);
- if (unlikely (rc != 0 && errno != EAGAIN))
+ if (unlikely (rc != 0 && errno != EAGAIN)) {
+ EXIT_MUTEX();
return -1;
+ }
// If we have the message, return immediately.
if (rc == 0) {
if (file_desc != retired_fd)
msg_->set_fd(file_desc);
extract_flags (msg_);
+ EXIT_MUTEX();
return 0;
}
@@ -890,16 +1021,22 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// activate_reader command already waiting int a command pipe.
// If it's not, return EAGAIN.
if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) {
- if (unlikely (process_commands (0, false) != 0))
+ if (unlikely (process_commands (0, false) != 0)) {
+ EXIT_MUTEX();
return -1;
+ }
ticks = 0;
rc = xrecv (msg_);
- if (rc < 0)
+ if (rc < 0) {
+ EXIT_MUTEX();
return rc;
+ }
if (file_desc != retired_fd)
msg_->set_fd(file_desc);
extract_flags (msg_);
+
+ EXIT_MUTEX();
return 0;
}
@@ -912,20 +1049,25 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// we are able to fetch a message.
bool block = (ticks != 0);
while (true) {
- if (unlikely (process_commands (block ? timeout : 0, false) != 0))
+ if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
+ EXIT_MUTEX();
return -1;
+ }
rc = xrecv (msg_);
if (rc == 0) {
ticks = 0;
break;
}
- if (unlikely (errno != EAGAIN))
+ if (unlikely (errno != EAGAIN)) {
+ EXIT_MUTEX();
return -1;
+ }
block = true;
if (timeout > 0) {
timeout = (int) (end - clock.now_ms ());
if (timeout <= 0) {
errno = EAGAIN;
+ EXIT_MUTEX();
return -1;
}
}
@@ -934,6 +1076,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
if (file_desc != retired_fd)
msg_->set_fd(file_desc);
extract_flags (msg_);
+ EXIT_MUTEX();
return 0;
}
@@ -964,7 +1107,27 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
// Plug the socket to the reaper thread.
poller = poller_;
- handle = poller->add_fd (mailbox.get_fd (), this);
+
+ fd_t fd;
+
+ if (!thread_safe)
+ fd = ((mailbox_t*)mailbox)->get_fd();
+ else {
+ ENTER_MUTEX();
+
+ reaper_signaler = new signaler_t();
+
+ // Add signaler to the safe mailbox
+ fd = reaper_signaler->get_fd();
+ ((mailbox_safe_t*)mailbox)->add_signaler(reaper_signaler);
+
+ // Send a signal to make sure reaper handle existing commands
+ reaper_signaler->send();
+
+ EXIT_MUTEX();
+ }
+
+ handle = poller->add_fd (fd, this);
poller->set_pollin (handle);
// Initialise the termination and check whether it can be deallocated
@@ -980,7 +1143,7 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
if (timeout_ != 0) {
// If we are asked to wait, simply ask mailbox to wait.
- rc = mailbox.recv (&cmd, timeout_);
+ rc = mailbox->recv (&cmd, timeout_);
}
else {
@@ -1007,13 +1170,13 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
}
// Check whether there are any commands pending for this thread.
- rc = mailbox.recv (&cmd, 0);
+ rc = mailbox->recv (&cmd, 0);
}
// Process all available commands.
while (rc == 0) {
cmd.destination->process_command (cmd);
- rc = mailbox.recv (&cmd, 0);
+ rc = mailbox->recv (&cmd, 0);
}
if (errno == EINTR)
@@ -1118,7 +1281,15 @@ void zmq::socket_base_t::in_event ()
// of the reaper thread. Process any commands from other threads/sockets
// that may be available at the moment. Ultimately, the socket will
// be destroyed.
+
+ ENTER_MUTEX();
+
+ // If the socket is thread safe we need to unsignal the reaper signaler
+ if (thread_safe)
+ reaper_signaler->recv();
+
process_commands (0, false);
+ EXIT_MUTEX();
check_destroy ();
}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 2742d223..d3dcc459 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -31,7 +31,7 @@
#include "poller.hpp"
#include "atomic_counter.hpp"
#include "i_poll_events.hpp"
-#include "mailbox.hpp"
+#include "i_mailbox.hpp"
#include "stdint.hpp"
#include "clock.hpp"
#include "pipe.hpp"
@@ -66,7 +66,7 @@ namespace zmq
uint32_t tid_, int sid_);
// Returns the mailbox associated with this socket.
- mailbox_t *get_mailbox ();
+ i_mailbox *get_mailbox ();
// Interrupt blocking call if the socket is stuck in one.
// This function can be called from a different thread!
@@ -123,7 +123,7 @@ namespace zmq
protected:
- socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
+ socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_ = false);
virtual ~socket_base_t ();
// Concrete algorithms for the x- methods are to be defined by
@@ -223,7 +223,7 @@ namespace zmq
void process_term (int linger_);
// Socket's mailbox object.
- mailbox_t mailbox;
+ i_mailbox* mailbox;
// List of attached pipes.
typedef array_t pipes_t;
@@ -257,9 +257,17 @@ namespace zmq
// Last socket endpoint resolved URI
std::string last_endpoint;
- socket_base_t (const socket_base_t&);
- const socket_base_t &operator = (const socket_base_t&);
+ // Indicate if the socket is thread safe
+ bool thread_safe;
+
+ // Signaler to be used in the reaping stage
+ signaler_t* reaper_signaler;
+
+ // Mutex for synchronize access to the socket in thread safe mode
mutex_t sync;
+
+ socket_base_t (const socket_base_t&);
+ const socket_base_t &operator = (const socket_base_t&);
};
}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index e6b0a31d..dbf575fe 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -45,6 +45,8 @@ set(tests
test_connect_rid
test_xpub_nodrop
test_pub_invert_matching
+ test_thread_safe
+ test_client_server
)
if(NOT WIN32)
list(APPEND tests
diff --git a/tests/test_thread_safe.cpp b/tests/test_thread_safe.cpp
new file mode 100644
index 00000000..dbc360af
--- /dev/null
+++ b/tests/test_thread_safe.cpp
@@ -0,0 +1,145 @@
+/*:
+ Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#include "testutil.hpp"
+
+void worker1(void* s);
+void worker2(void* s);
+
+int main (void)
+{
+ setup_test_environment();
+ void *ctx = zmq_ctx_new ();
+ assert (ctx);
+
+ void *client = zmq_socket (ctx, ZMQ_CLIENT);
+ void *client2 = zmq_socket (ctx, ZMQ_CLIENT);
+
+ int rc;
+
+ rc = zmq_bind (client, "tcp://127.0.0.1:5560");
+ assert (rc == 0);
+
+ rc = zmq_connect (client2, "tcp://127.0.0.1:5560");
+ assert (rc == 0);
+
+ void* t1 = zmq_threadstart(worker1, client2);
+ void* t2 = zmq_threadstart(worker2, client2);
+
+ char data[1];
+ data[0] = 0;
+
+ for (int i=0; i < 10; i++) {
+ rc = zmq_send_const(client, data, 1, 0);
+ assert (rc == 1);
+
+ rc = zmq_send_const(client, data, 1, 0);
+ assert(rc == 1);
+
+ char a, b;
+
+ rc = zmq_recv(client, &a, 1, 0);
+ assert(rc == 1);
+
+ rc = zmq_recv(client, &b, 1, 0);
+ assert(rc == 1);
+
+ // make sure they came from different threads
+ assert((a == 1 && b == 2) || (a == 2 && b == 1));
+ }
+
+ // make the thread exit
+ data[0] = 1;
+
+ rc = zmq_send_const(client, data, 1, 0);
+ assert (rc == 1);
+
+ rc = zmq_send_const(client, data, 1, 0);
+ assert(rc == 1);
+
+ zmq_threadclose(t1);
+ zmq_threadclose(t2);
+
+ rc = zmq_close (client2);
+ assert (rc == 0);
+
+ rc = zmq_close (client);
+ assert (rc == 0);
+
+ rc = zmq_ctx_term (ctx);
+ assert (rc == 0);
+
+ return 0 ;
+}
+
+void worker1(void* s)
+{
+ const char worker_id = 1;
+ char c;
+
+ while (true)
+ {
+ int rc = zmq_recv(s, &c,1, 0);
+ assert(rc == 1);
+
+ if (c == 0)
+ {
+ msleep(10);
+ rc = zmq_send_const(s,&worker_id, 1, 0);
+ assert(rc == 1);
+ }
+ else
+ {
+ // we got exit request
+ break;
+ }
+ }
+}
+
+void worker2(void* s)
+{
+ const char worker_id = 2;
+ char c;
+
+ while (true)
+ {
+ int rc = zmq_recv(s, &c,1, 0);
+ assert(rc == 1);
+
+ assert(c == 1 || c == 0);
+
+ if (c == 0)
+ {
+ msleep(10);
+ rc = zmq_send_const(s,&worker_id, 1, 0);
+ assert(rc == 1);
+ }
+ else
+ {
+ // we got exit request
+ break;
+ }
+ }
+}
+
+
+
+
+
+