diff --git a/src/ctx.cpp b/src/ctx.cpp index 59f3be98..11e72f2a 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -127,6 +127,11 @@ zmq::ctx_t::~ctx_t () tag = ZMQ_CTX_TAG_VALUE_BAD; } +bool zmq::ctx_t::valid () const +{ + return term_mailbox.valid (); +} + int zmq::ctx_t::terminate () { slot_sync.lock(); @@ -146,7 +151,6 @@ int zmq::ctx_t::terminate () terminating = saveTerminating; if (!starting) { - #ifdef HAVE_FORK if (pid != getpid ()) { // we are a forked child process. Close all file descriptors @@ -320,47 +324,83 @@ int zmq::ctx_t::get (int option_) return rc; } +bool zmq::ctx_t::start () +{ + // Initialise the array of mailboxes. Additional three slots are for + // zmq_ctx_term thread and reaper thread. + opt_sync.lock (); + int mazmq = max_sockets; + int ios = io_thread_count; + opt_sync.unlock (); + slot_count = mazmq + ios + 2; + slots = (i_mailbox **) malloc (sizeof (i_mailbox *) * slot_count); + if (!slots) { + errno = ENOMEM; + goto fail; + } + + // Initialise the infrastructure for zmq_ctx_term thread. + slots[term_tid] = &term_mailbox; + + // Create the reaper thread. + reaper = new (std::nothrow) reaper_t (this, reaper_tid); + if (!reaper) { + errno = ENOMEM; + goto fail_cleanup_slots; + } + if (!reaper->get_mailbox ()->valid ()) + goto fail_cleanup_reaper; + slots[reaper_tid] = reaper->get_mailbox (); + reaper->start (); + + // Create I/O thread objects and launch them. + for (int32_t i = (int32_t) slot_count - 1; i >= (int32_t) 2; i--) { + slots[i] = NULL; + } + + for (int i = 2; i != ios + 2; i++) { + io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); + if (!io_thread) { + errno = ENOMEM; + goto fail_cleanup_reaper; + } + if (!io_thread->get_mailbox ()->valid ()) { + delete io_thread; + goto fail_cleanup_reaper; + } + io_threads.push_back (io_thread); + slots [i] = io_thread->get_mailbox (); + io_thread->start (); + } + + // In the unused part of the slot array, create a list of empty slots. + for (int32_t i = (int32_t) slot_count - 1; i >= (int32_t) ios + 2; i--) { + empty_slots.push_back (i); + } + + starting = false; + return true; + +fail_cleanup_reaper: + reaper->stop (); + delete reaper; + reaper = NULL; + +fail_cleanup_slots: + free (slots); + slots = NULL; + +fail: + return false; +} + zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) { - scoped_lock_t locker(slot_sync); + scoped_lock_t locker (slot_sync); if (unlikely (starting)) { - - starting = false; - // Initialise the array of mailboxes. Additional three slots are for - // zmq_ctx_term thread and reaper thread. - opt_sync.lock (); - int mazmq = max_sockets; - int ios = io_thread_count; - opt_sync.unlock (); - slot_count = mazmq + ios + 2; - slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count); - alloc_assert (slots); - - // Initialise the infrastructure for zmq_ctx_term thread. - slots [term_tid] = &term_mailbox; - - // Create the reaper thread. - reaper = new (std::nothrow) reaper_t (this, reaper_tid); - alloc_assert (reaper); - slots [reaper_tid] = reaper->get_mailbox (); - reaper->start (); - - // Create I/O thread objects and launch them. - for (int i = 2; i != ios + 2; i++) { - io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); - alloc_assert (io_thread); - io_threads.push_back (io_thread); - slots [i] = io_thread->get_mailbox (); - io_thread->start (); - } - - // In the unused part of the slot array, create a list of empty slots. - for (int32_t i = (int32_t) slot_count - 1; - i >= (int32_t) ios + 2; i--) { - empty_slots.push_back (i); - slots [i] = NULL; - } + if (!start ()) + return NULL; } // Once zmq_ctx_term() was called, we can't create new sockets. diff --git a/src/ctx.hpp b/src/ctx.hpp index 500b75e5..10a4d076 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -133,7 +133,10 @@ namespace zmq ~ctx_t (); + bool valid() const; + private: + bool start(); struct pending_connection_t { diff --git a/src/io_thread.cpp b/src/io_thread.cpp index 65e1f6a3..35635ca2 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -37,13 +37,16 @@ #include "ctx.hpp" zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) : - object_t (ctx_, tid_) + object_t (ctx_, tid_), + mailbox_handle (NULL) { poller = new (std::nothrow) poller_t (*ctx_); alloc_assert (poller); - mailbox_handle = poller->add_fd (mailbox.get_fd (), this); - poller->set_pollin (mailbox_handle); + if (mailbox.get_fd () != retired_fd) { + mailbox_handle = poller->add_fd (mailbox.get_fd (), this); + poller->set_pollin (mailbox_handle); + } } zmq::io_thread_t::~io_thread_t () @@ -109,6 +112,8 @@ zmq::poller_t *zmq::io_thread_t::get_poller () void zmq::io_thread_t::process_stop () { - poller->rm_fd (mailbox_handle); + if (mailbox_handle) { + poller->rm_fd (mailbox_handle); + } poller->stop (); } diff --git a/src/mailbox.cpp b/src/mailbox.cpp index 14633fab..8d814a03 100644 --- a/src/mailbox.cpp +++ b/src/mailbox.cpp @@ -99,3 +99,8 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_) zmq_assert (ok); return 0; } + +bool zmq::mailbox_t::valid () const +{ + return signaler.valid (); +} diff --git a/src/mailbox.hpp b/src/mailbox.hpp index e3fcbb31..0e060269 100644 --- a/src/mailbox.hpp +++ b/src/mailbox.hpp @@ -54,6 +54,8 @@ namespace zmq void send (const command_t &cmd_); int recv (command_t *cmd_, int timeout_); + bool valid () const; + #ifdef HAVE_FORK // close the file descriptors in the signaller. This is used in a forked // child process to close the file descriptors so that they do not interfere diff --git a/src/reaper.cpp b/src/reaper.cpp index 82935b64..5c53e1d0 100644 --- a/src/reaper.cpp +++ b/src/reaper.cpp @@ -36,9 +36,13 @@ zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) : object_t (ctx_, tid_), mailbox_handle((poller_t::handle_t)NULL), + poller (NULL), sockets (0), terminating (false) { + if (!mailbox.valid ()) + return; + poller = new (std::nothrow) poller_t (*ctx_); alloc_assert (poller); @@ -64,13 +68,17 @@ zmq::mailbox_t *zmq::reaper_t::get_mailbox () void zmq::reaper_t::start () { + zmq_assert (mailbox.valid ()); + // Start the thread. poller->start (); } void zmq::reaper_t::stop () { - send_stop (); + if (get_mailbox ()->valid ()) { + send_stop (); + } } void zmq::reaper_t::in_event () diff --git a/src/select.cpp b/src/select.cpp index 58af98d9..1e09728e 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -55,6 +55,7 @@ zmq::select_t::select_t (const zmq::ctx_t &ctx_) : #else maxfd (retired_fd), #endif + started (false), stopping (false) { #if defined ZMQ_HAVE_WINDOWS @@ -65,7 +66,10 @@ zmq::select_t::select_t (const zmq::ctx_t &ctx_) : zmq::select_t::~select_t () { - worker.stop (); + if (started) { + stop (); + worker.stop (); + } } zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) @@ -257,6 +261,7 @@ void zmq::select_t::reset_pollout (handle_t handle_) void zmq::select_t::start () { ctx.start_thread (worker, worker_routine, this); + started = true; } void zmq::select_t::stop () diff --git a/src/select.hpp b/src/select.hpp index e76ebcaa..949868a2 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -166,6 +166,9 @@ class select_t : public poller_base_t static fd_entries_t::iterator find_fd_entry_by_handle (fd_entries_t &fd_entries, handle_t handle_); + // If true, start has been called. + bool started; + // If true, thread is shutting down. bool stopping; diff --git a/src/signaler.cpp b/src/signaler.cpp index 4afffdd0..8f4c2923 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -372,6 +372,11 @@ int zmq::signaler_t::recv_failable () return 0; } +bool zmq::signaler_t::valid () const +{ + return w != retired_fd; +} + #ifdef HAVE_FORK void zmq::signaler_t::forked () { @@ -398,7 +403,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) errno_assert (errno == ENFILE || errno == EMFILE); *w_ = *r_ = -1; return -1; - } + } else { *w_ = *r_ = fd; return 0; diff --git a/src/signaler.hpp b/src/signaler.hpp index 12de2f08..ec9a7270 100644 --- a/src/signaler.hpp +++ b/src/signaler.hpp @@ -51,12 +51,16 @@ namespace zmq signaler_t (); ~signaler_t (); + // Returns the socket/file descriptor + // May return retired_fd if the signaler could not be initialized. fd_t get_fd () const; void send (); int wait (int timeout_); void recv (); int recv_failable (); + bool valid () const; + #ifdef HAVE_FORK // close the file descriptors in a forked child process so that they // do not interfere with the context in the parent process. @@ -70,7 +74,8 @@ namespace zmq static int make_fdpair (fd_t *r_, fd_t *w_); // Underlying write & read file descriptor - // Will be -1 if we exceeded number of available handles + // Will be -1 if an error occurred during initialization, e.g. we + // exceeded the number of available handles fd_t w; fd_t r; diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp index 3bda68e2..e09ade28 100644 --- a/src/socket_poller.cpp +++ b/src/socket_poller.cpp @@ -111,8 +111,19 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short e zmq_assert (rc == 0); if (thread_safe) { - if (signaler == NULL) - signaler = new signaler_t (); + if (signaler == NULL) { + signaler = new (std::nothrow) signaler_t (); + if (!signaler) { + errno = ENOMEM; + return -1; + } + if (!signaler->valid ()) { + delete signaler; + signaler = NULL; + errno = EMFILE; + return -1; + } + } rc = socket_->add_signaler (signaler); zmq_assert (rc == 0); diff --git a/src/zmq.cpp b/src/zmq.cpp index 9d4c0063..dbe1a2db 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -161,7 +161,12 @@ void *zmq_ctx_new (void) // Create 0MQ context. zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t; - alloc_assert (ctx); + if (ctx) { + if (!ctx->valid ()) { + delete ctx; + return NULL; + } + } return ctx; }