From 0b5cc026fbe7ccc6de66907be29471562a2d344d Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 6 Aug 2009 12:51:32 +0200 Subject: [PATCH] clean up - session/socket/engine stuff removed --- src/Makefile.am | 51 +------ src/app_thread.cpp | 106 +------------ src/app_thread.hpp | 27 +--- src/connecter.cpp | 189 ----------------------- src/connecter.hpp | 99 ------------ src/context.cpp | 139 ++--------------- src/context.hpp | 52 +------ src/data_distributor.cpp | 155 ------------------- src/data_distributor.hpp | 70 --------- src/devpoll.cpp | 9 +- src/devpoll.hpp | 3 +- src/dummy_aggregator.cpp | 111 -------------- src/dummy_aggregator.hpp | 73 --------- src/dummy_distributor.cpp | 85 ----------- src/dummy_distributor.hpp | 68 --------- src/epoll.cpp | 10 +- src/epoll.hpp | 3 +- src/fair_aggregator.cpp | 143 ------------------ src/fair_aggregator.hpp | 77 ---------- src/i_api.hpp | 42 +++--- src/i_demux.hpp | 57 ------- src/i_engine.hpp | 53 ------- src/i_mux.hpp | 60 -------- src/i_poller.hpp | 7 +- src/i_session.hpp | 37 ----- src/{req.cpp => i_socket.hpp} | 21 ++- src/i_thread.hpp | 38 ----- src/io_object.cpp | 37 ----- src/io_object.hpp | 51 ------- src/io_thread.cpp | 31 ---- src/io_thread.hpp | 27 +--- src/kqueue.cpp | 9 +- src/kqueue.hpp | 3 +- src/listener.cpp | 170 --------------------- src/listener.hpp | 110 -------------- src/load_balancer.cpp | 130 ---------------- src/load_balancer.hpp | 73 --------- src/object.cpp | 33 ---- src/object.hpp | 8 - src/p2p.cpp | 29 ---- src/p2p.hpp | 42 ------ src/pipe.cpp | 47 ------ src/pipe.hpp | 23 +-- src/pipe_reader.cpp | 118 --------------- src/pipe_reader.hpp | 89 ----------- src/pipe_writer.cpp | 120 --------------- src/pipe_writer.hpp | 88 ----------- src/poll.cpp | 13 +- src/poll.hpp | 3 +- src/pub.cpp | 38 ----- src/pub.hpp | 45 ------ src/rep.cpp | 29 ---- src/rep.hpp | 42 ------ src/req.hpp | 42 ------ src/safe_object.cpp | 76 ---------- src/safe_object.hpp | 68 --------- src/select.cpp | 13 +- src/select.hpp | 2 +- src/session.cpp | 273 ---------------------------------- src/session.hpp | 107 ------------- src/session_stub.cpp | 110 -------------- src/session_stub.hpp | 83 ----------- src/socket_base.cpp | 267 --------------------------------- src/socket_base.hpp | 96 ------------ src/sub.cpp | 45 ------ src/sub.hpp | 46 ------ src/zmq.cpp | 2 +- src/zmq_decoder.cpp | 79 ---------- src/zmq_decoder.hpp | 57 ------- src/zmq_encoder.cpp | 75 ---------- src/zmq_encoder.hpp | 54 ------- src/zmq_tcp_engine.cpp | 185 ----------------------- src/zmq_tcp_engine.hpp | 92 ------------ 73 files changed, 109 insertions(+), 4856 deletions(-) delete mode 100644 src/connecter.cpp delete mode 100644 src/connecter.hpp delete mode 100644 src/data_distributor.cpp delete mode 100644 src/data_distributor.hpp delete mode 100644 src/dummy_aggregator.cpp delete mode 100644 src/dummy_aggregator.hpp delete mode 100644 src/dummy_distributor.cpp delete mode 100644 src/dummy_distributor.hpp delete mode 100644 src/fair_aggregator.cpp delete mode 100644 src/fair_aggregator.hpp delete mode 100644 src/i_demux.hpp delete mode 100644 src/i_engine.hpp delete mode 100644 src/i_mux.hpp delete mode 100644 src/i_session.hpp rename src/{req.cpp => i_socket.hpp} (75%) delete mode 100644 src/i_thread.hpp delete mode 100644 src/io_object.cpp delete mode 100644 src/io_object.hpp delete mode 100644 src/listener.cpp delete mode 100644 src/listener.hpp delete mode 100644 src/load_balancer.cpp delete mode 100644 src/load_balancer.hpp delete mode 100644 src/p2p.cpp delete mode 100644 src/p2p.hpp delete mode 100644 src/pipe.cpp delete mode 100644 src/pipe_reader.cpp delete mode 100644 src/pipe_reader.hpp delete mode 100644 src/pipe_writer.cpp delete mode 100644 src/pipe_writer.hpp delete mode 100644 src/pub.cpp delete mode 100644 src/pub.hpp delete mode 100644 src/rep.cpp delete mode 100644 src/rep.hpp delete mode 100644 src/req.hpp delete mode 100644 src/safe_object.cpp delete mode 100644 src/safe_object.hpp delete mode 100644 src/session.cpp delete mode 100644 src/session.hpp delete mode 100644 src/session_stub.cpp delete mode 100644 src/session_stub.hpp delete mode 100644 src/socket_base.cpp delete mode 100644 src/socket_base.hpp delete mode 100644 src/sub.cpp delete mode 100644 src/sub.hpp delete mode 100644 src/zmq_decoder.cpp delete mode 100644 src/zmq_decoder.hpp delete mode 100644 src/zmq_encoder.cpp delete mode 100644 src/zmq_encoder.hpp delete mode 100644 src/zmq_tcp_engine.cpp delete mode 100644 src/zmq_tcp_engine.hpp diff --git a/src/Makefile.am b/src/Makefile.am index 27f44121..bde9c39b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -7,53 +7,30 @@ libzmq_la_SOURCES = \ atomic_ptr.hpp \ command.hpp \ config.hpp \ - connecter.hpp \ context.hpp \ - data_distributor.hpp \ decoder.hpp \ devpoll.hpp \ - dummy_aggregator.hpp \ - dummy_distributor.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ - fair_aggregator.hpp \ fd.hpp \ fd_signaler.hpp \ - io_object.hpp \ io_thread.hpp \ ip.hpp \ i_api.hpp \ - i_demux.hpp \ - i_mux.hpp \ i_poller.hpp \ i_poll_events.hpp \ - i_session.hpp \ i_signaler.hpp \ - i_engine.hpp \ - i_thread.hpp \ - listener.hpp \ + i_socket.hpp \ kqueue.hpp \ - load_balancer.hpp \ msg.hpp \ mutex.hpp \ object.hpp \ - p2p.hpp \ pipe.hpp \ - pipe_reader.hpp \ - pipe_writer.hpp \ platform.hpp \ poll.hpp \ - pub.hpp \ - rep.hpp \ - req.hpp \ - safe_object.hpp \ select.hpp \ - session.hpp \ - session_stub.hpp \ simple_semaphore.hpp \ - socket_base.hpp \ - sub.hpp \ stdint.hpp \ tcp_connecter.hpp \ tcp_listener.hpp \ @@ -65,50 +42,24 @@ libzmq_la_SOURCES = \ ypipe.hpp \ ypollset.hpp \ yqueue.hpp \ - zmq_decoder.hpp \ - zmq_encoder.hpp \ - zmq_tcp_engine.hpp \ app_thread.cpp \ - connecter.cpp \ context.cpp \ - data_distributor.cpp \ devpoll.hpp \ - dummy_aggregator.cpp \ - dummy_distributor.cpp \ epoll.cpp \ err.cpp \ - fair_aggregator.cpp \ fd_signaler.cpp \ - io_object.cpp \ io_thread.cpp \ ip.cpp \ kqueue.cpp \ - listener.cpp \ - load_balancer.cpp \ object.cpp \ - p2p.cpp \ - pipe.cpp \ - pipe_reader.cpp \ - pipe_writer.cpp \ poll.cpp \ - pub.cpp \ - rep.cpp \ - req.cpp \ - safe_object.cpp \ select.cpp \ - session.cpp \ - session_stub.cpp \ - socket_base.cpp \ - sub.cpp \ tcp_connecter.cpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ uuid.cpp \ ypollset.cpp \ - zmq_decoder.cpp \ - zmq_encoder.cpp \ - zmq_tcp_engine.cpp \ zmq.cpp libzmq_la_LDFLAGS = -version-info 0:0:0 diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 9cc61c79..23a055a8 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -28,20 +28,8 @@ #include "app_thread.hpp" #include "context.hpp" #include "err.hpp" -#include "session.hpp" #include "pipe.hpp" #include "config.hpp" -#include "i_api.hpp" -#include "dummy_aggregator.hpp" -#include "fair_aggregator.hpp" -#include "dummy_distributor.hpp" -#include "data_distributor.hpp" -#include "load_balancer.hpp" -#include "p2p.hpp" -#include "pub.hpp" -#include "sub.hpp" -#include "req.hpp" -#include "rep.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -58,37 +46,15 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : { } -void zmq::app_thread_t::shutdown () -{ - // Deallocate all the sessions associated with the thread. - while (!sessions.empty ()) - sessions [0]->shutdown (); - - delete this; -} - zmq::app_thread_t::~app_thread_t () { -} + // Ask all the sockets to start termination, then wait till it is complete. + for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) + (*it)->stop (); + for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) + delete *it; -void zmq::app_thread_t::attach_session (session_t *session_) -{ - session_->set_index (sessions.size ()); - sessions.push_back (session_); -} - -void zmq::app_thread_t::detach_session (session_t *session_) -{ - // O(1) removal of the session from the list. - sessions_t::size_type i = session_->get_index (); - sessions [i] = sessions [sessions.size () - 1]; - sessions [i]->set_index (i); - sessions.pop_back (); -} - -zmq::i_poller *zmq::app_thread_t::get_poller () -{ - zmq_assert (false); + delete this; } zmq::i_signaler *zmq::app_thread_t::get_signaler () @@ -98,76 +64,20 @@ zmq::i_signaler *zmq::app_thread_t::get_signaler () bool zmq::app_thread_t::is_current () { - return !sessions.empty () && tid == getpid (); + return !sockets.empty () && tid == getpid (); } bool zmq::app_thread_t::make_current () { // If there are object managed by this slot we cannot assign the slot // to a different thread. - if (!sessions.empty ()) + if (!sockets.empty ()) return false; tid = getpid (); return true; } -zmq::i_api *zmq::app_thread_t::create_socket (int type_) -{ - i_mux *mux = NULL; - i_demux *demux = NULL; - session_t *session = NULL; - i_api *api = NULL; - - switch (type_) { - case ZMQ_P2P: - mux = new dummy_aggregator_t; - zmq_assert (mux); - demux = new dummy_distributor_t; - zmq_assert (demux); - session = new session_t (this, this, mux, demux, true, false); - zmq_assert (session); - api = new p2p_t (this, session); - zmq_assert (api); - break; - case ZMQ_PUB: - demux = new data_distributor_t; - zmq_assert (demux); - session = new session_t (this, this, mux, demux, true, false); - zmq_assert (session); - api = new pub_t (this, session); - zmq_assert (api); - break; - case ZMQ_SUB: - mux = new fair_aggregator_t; - zmq_assert (mux); - session = new session_t (this, this, mux, demux, true, false); - zmq_assert (session); - api = new sub_t (this, session); - zmq_assert (api); - break; - case ZMQ_REQ: - // TODO - zmq_assert (false); - api = new req_t (this, session); - zmq_assert (api); - break; - case ZMQ_REP: - // TODO - zmq_assert (false); - api = new rep_t (this, session); - zmq_assert (api); - break; - default: - errno = EINVAL; - return NULL; - } - - attach_session (session); - - return api; -} - void zmq::app_thread_t::process_commands (bool block_) { ypollset_t::signals_t signals; diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 8295c2f8..31679b8f 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -22,7 +22,7 @@ #include -#include "i_thread.hpp" +#include "i_socket.hpp" #include "stdint.hpp" #include "object.hpp" #include "ypollset.hpp" @@ -30,23 +30,18 @@ namespace zmq { - class app_thread_t : public object_t, public i_thread + class app_thread_t : public object_t { public: app_thread_t (class context_t *context_, int thread_slot_); - // To be called when the whole infrastrucure is being closed. - void shutdown (); + ~app_thread_t (); // Returns signaler associated with this application thread. i_signaler *get_signaler (); - // Create socket engine in this thread. Return false if the calling - // thread doesn't match the thread handled by this app thread object. - struct i_api *create_socket (int type_); - - // Nota bene: The following two functions are accessed from different + // Nota bene: Following two functions are accessed from different // threads. The caller (context) is responsible for synchronisation // of accesses. @@ -61,25 +56,17 @@ namespace zmq // set to true, returns only after at least one command was processed. void process_commands (bool block_); - // i_thread implementation. - void attach_session (class session_t *session_); - void detach_session (class session_t *session_); - struct i_poller *get_poller (); - private: - // Clean-up. - ~app_thread_t (); + // All the sockets created from this application thread. + typedef std::vector sockets_t; + sockets_t sockets; // Thread ID associated with this slot. // TODO: Virtualise pid_t! // TODO: Check whether getpid returns unique ID for each thread. int tid; - // Vector of all sessionss associated with this app thread. - typedef std::vector sessions_t; - sessions_t sessions; - // App thread's signaler object. ypollset_t pollset; diff --git a/src/connecter.cpp b/src/connecter.cpp deleted file mode 100644 index 970dcf7b..00000000 --- a/src/connecter.cpp +++ /dev/null @@ -1,189 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "connecter.hpp" -#include "io_thread.hpp" -#include "session.hpp" -#include "err.hpp" -#include "simple_semaphore.hpp" -#include "zmq_tcp_engine.hpp" - -zmq::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_, - session_t *session_) : - io_object_t (thread_), - state (idle), - poller (NULL), - session (session_), - addr (addr_), - identity ("abcde"), - engine (NULL) -{ -} - -void zmq::connecter_t::terminate () -{ - delete this; -} - -void zmq::connecter_t::shutdown () -{ - delete this; -} - -zmq::connecter_t::~connecter_t () -{ -} - -void zmq::connecter_t::process_reg (simple_semaphore_t *smph_) -{ - // Fet poller pointer for further use. - zmq_assert (!poller); - poller = get_poller (); - - // Ask the session to register itself with the I/O thread. Note that - // the session is living in the same I/O thread, thus this results - // in a synchronous call. - session->inc_seqnum (); - send_reg (session, NULL); - - // Unlock the application thread that created the connecter. - if (smph_) - smph_->post (); - - // Manually trigger timer event which will launch asynchronous connect. - state = waiting; - timer_event (); -} - -void zmq::connecter_t::process_unreg (simple_semaphore_t *smph_) -{ - // Unregister connecter/engine from the poller. - zmq_assert (poller); - if (state == connecting) - poller->rm_fd (handle); - else if (state == waiting) - poller->cancel_timer (this); - else if (state == sending) - engine->terminate (); - - // Unlock the application thread closing the connecter. - if (smph_) - smph_->post (); -} - -void zmq::connecter_t::in_event () -{ - // Error occured in asynchronous connect. Retry to connect after a while. - if (state == connecting) { - fd_t fd = tcp_connecter.connect (); - zmq_assert (fd == retired_fd); - poller->rm_fd (handle); - poller->add_timer (this); - state = waiting; - return; - } - - zmq_assert (false); -} - -void zmq::connecter_t::out_event () -{ - if (state == connecting) { - - fd_t fd = tcp_connecter.connect (); - if (fd == retired_fd) { - poller->rm_fd (handle); - poller->add_timer (this); - state = waiting; - return; - } - - poller->rm_fd (handle); - engine = new zmq_tcp_engine_t (fd); - zmq_assert (engine); - engine->attach (poller, this); - state = sending; - return; - } - - zmq_assert (false); -} - -void zmq::connecter_t::timer_event () -{ - zmq_assert (state == waiting); - - // Initiate async connect and start polling for its completion. If async - // connect fails instantly, try to reconnect after a while. - int rc = tcp_connecter.open (addr.c_str ()); - if (rc == 0) { - state = connecting; - in_event (); - } - else if (rc == 1) { - handle = poller->add_fd (tcp_connecter.get_fd (), this); - poller->set_pollout (handle); - state = connecting; - } - else { - poller->add_timer (this); - state = waiting; - } -} - -void zmq::connecter_t::set_engine (struct i_engine *engine_) -{ - engine = engine_; -} - -bool zmq::connecter_t::read (zmq_msg *msg_) -{ - zmq_assert (state == sending); - - // Deallocate old content of the message just in case. - zmq_msg_close (msg_); - - // Send the identity. - zmq_msg_init_size (msg_, identity.size ()); - memcpy (zmq_msg_data (msg_), identity.c_str (), identity.size ()); - - // Ask engine to unregister from the poller. - i_engine *e = engine; - engine->detach (); - - // Attach the engine to the session. (Note that this is actually - // a synchronous call. - session->inc_seqnum (); - send_engine (session, e); - - state = idle; - - return true; -} - -bool zmq::connecter_t::write (struct zmq_msg *msg_) -{ - // No incoming messages are accepted till identity is sent. - return false; -} - -void zmq::connecter_t::flush () -{ - // No incoming messages are accepted till identity is sent. -} diff --git a/src/connecter.hpp b/src/connecter.hpp deleted file mode 100644 index 1f11c638..00000000 --- a/src/connecter.hpp +++ /dev/null @@ -1,99 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_CONNECTER_HPP_INCLUDED__ -#define __ZMQ_CONNECTER_HPP_INCLUDED__ - -#include - -#include "../include/zmq.h" - -#include "i_poller.hpp" -#include "io_object.hpp" -#include "i_poll_events.hpp" -#include "i_session.hpp" -#include "tcp_connecter.hpp" - -namespace zmq -{ - - class connecter_t : public io_object_t, public i_poll_events, - public i_session - { - public: - - connecter_t (class io_thread_t *thread_, const char *addr_, - class session_t *session_); - - void terminate (); - void shutdown (); - - void process_reg (class simple_semaphore_t *smph_); - void process_unreg (class simple_semaphore_t *smph_); - - // i_poll_events implementation. - void in_event (); - void out_event (); - void timer_event (); - - // i_session implementation - void set_engine (struct i_engine *engine_); - // void shutdown (); - bool read (struct zmq_msg *msg_); - bool write (struct zmq_msg *msg_); - void flush (); - - private: - - // Clean-up. - ~connecter_t (); - - enum { - idle, - waiting, - connecting, - sending - } state; - - // Cached pointer to the poller. - struct i_poller *poller; - - // Handle of the connecting socket. - handle_t handle; - - // Associated session. It lives in the same I/O thread. - class session_t *session; - - // Address to connect to. - std::string addr; - - // Identity of the connection. - std::string identity; - - tcp_connecter_t tcp_connecter; - - struct i_engine *engine; - - connecter_t (const connecter_t&); - void operator = (const connecter_t&); - }; - -} - -#endif diff --git a/src/context.cpp b/src/context.cpp index ab4643ea..6b071cf6 100644 --- a/src/context.cpp +++ b/src/context.cpp @@ -20,15 +20,12 @@ #include "../include/zmq.h" #include "context.hpp" +#include "i_api.hpp" #include "app_thread.hpp" #include "io_thread.hpp" #include "platform.hpp" #include "err.hpp" #include "pipe.hpp" -#include "pipe_reader.hpp" -#include "pipe_writer.hpp" -#include "session.hpp" -#include "i_api.hpp" #if defined ZMQ_HAVE_WINDOWS #include "windows.h" @@ -72,37 +69,23 @@ zmq::context_t::context_t (int app_threads_, int io_threads_) io_threads [i]->start (); } -void zmq::context_t::shutdown () -{ - delete this; -} - zmq::context_t::~context_t () { - // Ask I/O threads to terminate. + // Close all application theads, sockets, io_objects etc. + for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) + delete app_threads [i]; + + // Ask I/O threads to terminate. If stop signal wasn't sent to I/O + // thread subsequent invocation of destructor would hang-up. for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) io_threads [i]->stop (); // Wait till I/O threads actually terminate. for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) - io_threads [i]->join (); - - // At this point the current thread is the only thread with access to - // our internal data. Deallocation will be done exclusively in this thread. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - app_threads [i]->shutdown (); - for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) - io_threads [i]->shutdown (); + delete io_threads [i]; delete [] command_pipes; - // Deallocate all the pipes, pipe readers and pipe writers. - for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it++) { - delete it->pipe; - delete it->reader; - delete it->writer; - } - #ifdef ZMQ_HAVE_WINDOWS // On Windows, uninitialise socket layer. int rc = WSACleanup (); @@ -123,7 +106,11 @@ zmq::i_api *zmq::context_t::create_socket (int type_) threads_sync.unlock (); return NULL; } - i_api *s = thread->create_socket (type_); + + zmq_assert (false); + i_api *s = NULL; + //i_api *s = thread->create_socket (type_); + threads_sync.unlock (); return s; } @@ -164,103 +151,3 @@ zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_) return io_threads [result]; } - -void zmq::context_t::create_pipe (object_t *reader_parent_, - object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, - pipe_reader_t **reader_, pipe_writer_t **writer_) -{ - // Create the pipe, reader & writer triple. - pipe_t *pipe = new pipe_t; - zmq_assert (pipe); - pipe_reader_t *reader = new pipe_reader_t (reader_parent_, pipe, - hwm_, lwm_); - zmq_assert (reader); - pipe_writer_t *writer = new pipe_writer_t (writer_parent_, pipe, reader, - hwm_, lwm_); - zmq_assert (writer); - reader->set_peer (writer); - - // Store the pipe in the repository. - pipe_info_t info = {pipe, reader, writer}; - pipes_sync.lock (); - pipe->set_index (pipes.size ()); - pipes.push_back (info); - pipes_sync.unlock (); - - *reader_ = reader; - *writer_ = writer; -} - -void zmq::context_t::destroy_pipe (pipe_t *pipe_) -{ - // Remove the pipe from the repository. - pipe_info_t info; - pipes_sync.lock (); - pipes_t::size_type i = pipe_->get_index (); - info = pipes [i]; - pipes [i] = pipes.back (); - pipes.pop_back (); - pipes_sync.unlock (); - - // Deallocate the pipe and associated pipe reader & pipe writer. - zmq_assert (info.pipe == pipe_); - delete info.pipe; - delete info.reader; - delete info.writer; -} - -int zmq::context_t::register_inproc_endpoint (const char *endpoint_, - session_t *session_) -{ - inproc_endpoint_sync.lock (); - inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_); - - if (it != inproc_endpoints.end ()) { - inproc_endpoint_sync.unlock (); - errno = EADDRINUSE; - return -1; - } - - inproc_endpoints.insert (std::make_pair (endpoint_, session_)); - - inproc_endpoint_sync.unlock (); - return 0; -} - -zmq::object_t *zmq::context_t::get_inproc_endpoint (const char *endpoint_) -{ - inproc_endpoint_sync.lock (); - inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_); - - if (it == inproc_endpoints.end ()) { - inproc_endpoint_sync.unlock (); - errno = EADDRNOTAVAIL; - return NULL; - } - - it->second->inc_seqnum (); - object_t *session = it->second; - - inproc_endpoint_sync.unlock (); - return session; -} - -void zmq::context_t::unregister_inproc_endpoints (session_t *session_) -{ - inproc_endpoint_sync.lock (); - - // Remove the connection from the repository. - // TODO: Yes, the algorithm has O(n^2) complexity. Should be O(log n). - for (inproc_endpoints_t::iterator it = inproc_endpoints.begin (); - it != inproc_endpoints.end ();) { - if (it->second == session_) { - inproc_endpoints.erase (it); - it = inproc_endpoints.begin (); - } - else - it++; - } - - inproc_endpoint_sync.unlock (); -} - diff --git a/src/context.hpp b/src/context.hpp index 7701ef70..f2eab1c0 100644 --- a/src/context.hpp +++ b/src/context.hpp @@ -52,9 +52,9 @@ namespace zmq context_t (int app_threads_, int io_threads_); // To be called to terminate the whole infrastructure (zmq_term). - void shutdown (); + ~context_t (); - // Create a socket engine. + // Create a socket. struct i_api *create_socket (int type_); // Returns number of thread slots in the context. To be used by @@ -81,37 +81,12 @@ namespace zmq destination_].read (command_); } - // Creates new pipe. - void create_pipe (class object_t *reader_parent_, - class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, - class pipe_reader_t **reader_, class pipe_writer_t **writer_); - - // Deallocates the pipe. - void destroy_pipe (class pipe_t *pipe_); - - // Registers existing session object as an inproc endpoint. - int register_inproc_endpoint (const char *endpoint_, - class session_t *session_); - - // Retrieves an inproc endpoint. Increments the command sequence number - // of the object by one. Caller is thus bound to send the command - // to the connection after invoking this function. Returns NULL if - // the endpoint doesn't exist. - class object_t *get_inproc_endpoint (const char *endpoint_); - - // Removes all the inproc endpoints associated with the given session - // object from the global repository. - void unregister_inproc_endpoints (class session_t *session_); - // Returns the I/O thread that is the least busy at the moment. // Taskset specifies which I/O threads are eligible (0 = all). class io_thread_t *choose_io_thread (uint64_t taskset_); private: - // Clean-up. - ~context_t (); - // Returns the app thread associated with the current thread. // NULL if we are out of app thread slots. class app_thread_t *choose_app_thread (); @@ -137,29 +112,6 @@ namespace zmq // Synchronisation of accesses to shared thread data. mutex_t threads_sync; - // Global repository of pipes. It's used only on terminal shutdown - // to deallocate all the pipes irrespective of whether they are - // referenced from pipe_reader, pipe_writer or both. - struct pipe_info_t - { - class pipe_t *pipe; - class pipe_reader_t *reader; - class pipe_writer_t *writer; - }; - typedef std::vector pipes_t; - pipes_t pipes; - - // Synchronisation of access to global repository of pipes. - mutex_t pipes_sync; - - // Global repository of available inproc endpoints. - typedef std::map inproc_endpoints_t; - inproc_endpoints_t inproc_endpoints; - - // Synchronisation of access to the global repository - // of inproc endpoints. - mutex_t inproc_endpoint_sync; - context_t (const context_t&); void operator = (const context_t&); }; diff --git a/src/data_distributor.cpp b/src/data_distributor.cpp deleted file mode 100644 index 971edce1..00000000 --- a/src/data_distributor.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "data_distributor.hpp" -#include "pipe_writer.hpp" -#include "err.hpp" -#include "session.hpp" -#include "msg.hpp" - -zmq::data_distributor_t::data_distributor_t () : - session (NULL) -{ -} - -void zmq::data_distributor_t::set_session (session_t *session_) -{ - zmq_assert (!session); - session = session_; -} - -void zmq::data_distributor_t::shutdown () -{ - // No need to deallocate pipes here. They'll be deallocated during the - // shutdown of the dispatcher. - delete this; -} - -void zmq::data_distributor_t::terminate () -{ - // Pipe unregisters itself during the call to terminate, so the pipes - // list shinks by one in each iteration. - while (!pipes.empty ()) - pipes [0]->terminate (); - - delete this; -} - -zmq::data_distributor_t::~data_distributor_t () -{ -} - -void zmq::data_distributor_t::attach_pipe (pipe_writer_t *pipe_) -{ - // Associate demux with a new pipe. - pipe_->set_demux (this); - pipe_->set_index (pipes.size ()); - pipes.push_back (pipe_); -} - -void zmq::data_distributor_t::detach_pipe (pipe_writer_t *pipe_) -{ - // Release the reference to the pipe. - int index = pipe_->get_index (); - pipe_->set_index (-1); - pipes [index] = pipes.back (); - pipes [index]->set_index (index); - pipes.pop_back (); -} - -bool zmq::data_distributor_t::empty () -{ - return pipes.empty (); -} - -bool zmq::data_distributor_t::send (zmq_msg *msg_) -{ - int pipes_count = pipes.size (); - - // If there are no pipes available, simply drop the message. - if (pipes_count == 0) { - zmq_msg_close (msg_); - zmq_msg_init (msg_); - return true; - } - - // TODO: ??? - // First check whether all pipes are available for writing. -// for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++) -// if (!(*it)->check_write (msg_)) -// return false; - - // For VSMs the copying is straighforward. - if (msg_->content == (zmq_msg_content*) ZMQ_VSM) { - for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++) - write_to_pipe (*it, msg_); - zmq_msg_init (msg_); - return true; - } - - // Optimisation for the case when there's only a single pipe - // to send the message to - no refcount adjustment (i.e. atomic - // operations) needed. - if (pipes_count == 1) { - write_to_pipe (*pipes.begin (), msg_); - zmq_msg_init (msg_); - return true; - } - - // There are at least 2 destinations for the message. That means we have - // to deal with reference counting. First add N-1 references to - // the content (we are holding one reference anyway, that's why the -1). - if (msg_->shared) - msg_->content->refcnt.add (pipes_count - 1); - else { - msg_->shared = true; - // TODO: Add memory barrier here. - msg_->content->refcnt.set (pipes_count); - } - - // Push the message to all destinations. - for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++) - write_to_pipe (*it, msg_); - - // Detach the original message from the data buffer. - zmq_msg_init (msg_); - - return true; -} - -void zmq::data_distributor_t::flush () -{ - // Flush all pipes. If there's large number of pipes, it can be pretty - // inefficient (especially if there's new message only in a single pipe). - // Can it be improved? - for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++) - (*it)->flush (); -} - -void zmq::data_distributor_t::write_to_pipe (class pipe_writer_t *pipe_, - struct zmq_msg *msg_) -{ - if (!pipe_->write (msg_)) { - // TODO: Push gap notification to the pipe. - zmq_assert (false); - } -} - diff --git a/src/data_distributor.hpp b/src/data_distributor.hpp deleted file mode 100644 index 5bde2e81..00000000 --- a/src/data_distributor.hpp +++ /dev/null @@ -1,70 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_DATA_DISTRIBUTOR_HPP_INCLUDED__ -#define __ZMQ_DATA_DISTRIBUTOR_HPP_INCLUDED__ - -#include - -#include - -namespace zmq -{ - - // Object to distribute messages to outbound pipes. - - class data_distributor_t : public i_demux - { - public: - - data_distributor_t (); - - // i_demux implementation. - void set_session (class session_t *session_); - void shutdown (); - void terminate (); - void attach_pipe (class pipe_writer_t *pipe_); - void detach_pipe (class pipe_writer_t *pipe_); - bool empty (); - bool send (struct zmq_msg *msg_); - void flush (); - - private: - - // Clean-up. - ~data_distributor_t (); - - // Reference to the owner session object. - class session_t *session; - - // Writes the message to the pipe if possible. If it isn't, writes - // a gap notification to the pipe. - void write_to_pipe (class pipe_writer_t *pipe_, struct zmq_msg *msg_); - - // The list of outbound pipes. - typedef std::vector pipes_t; - pipes_t pipes; - - data_distributor_t (const data_distributor_t&); - void operator = (const data_distributor_t&); - }; - -} - -#endif diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 8fb0877b..b7d153c1 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -52,6 +52,10 @@ zmq::devpoll_t::devpoll_t () zmq::devpoll_t::~devpoll_t () { + // Make sure there are no fds registered on shutdown. + zmq_assert (load.get () == 0); + + worker.stop (); close (devpoll_fd); } @@ -152,11 +156,6 @@ void zmq::devpoll_t::stop () stopping = true; } -void zmq::devpoll_t::join () -{ - worker.stop (); -} - bool zmq::devpoll_t::loop () { // According to the poll(7d) man page, we can retrieve diff --git a/src/devpoll.hpp b/src/devpoll.hpp index 28274c0f..57f0156d 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -42,7 +42,7 @@ namespace zmq public: devpoll_t (); - virtual ~devpoll_t (); + ~devpoll_t (); // i_poller implementation. handle_t add_fd (fd_t fd_, i_poll_events *events_); @@ -56,7 +56,6 @@ namespace zmq int get_load (); void start (); void stop (); - void join (); private: diff --git a/src/dummy_aggregator.cpp b/src/dummy_aggregator.cpp deleted file mode 100644 index 0b27fab7..00000000 --- a/src/dummy_aggregator.cpp +++ /dev/null @@ -1,111 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "dummy_aggregator.hpp" -#include "err.hpp" -#include "pipe_reader.hpp" -#include "session.hpp" - -// Swaps pipes at specified indices. -#define swap_pipes(i1_, i2_) \ - std::swap (pipes [i1_], pipes [i2_]);\ - pipes [i1_]->set_index (i1_);\ - pipes [i2_]->set_index (i2_); - -zmq::dummy_aggregator_t::dummy_aggregator_t () : - session (NULL), - pipe (NULL), - active (false) -{ -} - -void zmq::dummy_aggregator_t::set_session (session_t *session_) -{ - zmq_assert (!session); - session = session_; -} - -void zmq::dummy_aggregator_t::shutdown () -{ - // No need to deallocate the pipe here. It'll be deallocated during the - // shutdown of the dispatcher. - delete this; -} - -void zmq::dummy_aggregator_t::terminate () -{ - if (pipe) - pipe->terminate (); - - delete this; -} - -zmq::dummy_aggregator_t::~dummy_aggregator_t () -{ -} - -void zmq::dummy_aggregator_t::attach_pipe (pipe_reader_t *pipe_) -{ - zmq_assert (!pipe); - pipe = pipe_; - active = true; - - // Associate new pipe with the mux object. - pipe_->set_mux (this); - session->revive (); -} - -void zmq::dummy_aggregator_t::detach_pipe (pipe_reader_t *pipe_) -{ - zmq_assert (pipe == pipe_); - deactivate (pipe_); - pipe = NULL; -} - -bool zmq::dummy_aggregator_t::empty () -{ - return pipe == NULL; -} - -bool zmq::dummy_aggregator_t::recv (zmq_msg *msg_) -{ - // Deallocate old content of the message. - zmq_msg_close (msg_); - - // Try to read from the pipe. - if (pipe && pipe->read (msg_)) - return true; - - // No message is available. Initialise the output parameter - // to be a 0-byte message. - zmq_msg_init (msg_); - return false; -} - -void zmq::dummy_aggregator_t::deactivate (pipe_reader_t *pipe_) -{ - active = false; -} - -void zmq::dummy_aggregator_t::reactivate (pipe_reader_t *pipe_) -{ - active = true; -} diff --git a/src/dummy_aggregator.hpp b/src/dummy_aggregator.hpp deleted file mode 100644 index 6a9e9db5..00000000 --- a/src/dummy_aggregator.hpp +++ /dev/null @@ -1,73 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_DUMMY_AGGREGATOR_HPP_INCLUDED__ -#define __ZMQ_DUMMY_AGGREGATOR_HPP_INCLUDED__ - -#include - -#include "i_mux.hpp" - -namespace zmq -{ - - // Fake message aggregator. There can be at most one pipe bound to it, - // so there's no real aggregation going on. However, it is more efficient - // than a real aggregator. It's intended to be used in the contexts - // where business logic ensures there'll be at most one pipe bound. - - class dummy_aggregator_t : public i_mux - { - public: - - dummy_aggregator_t (); - - // i_mux interface implementation. - void set_session (session_t *session_); - void shutdown (); - void terminate (); - void attach_pipe (class pipe_reader_t *pipe_); - void detach_pipe (class pipe_reader_t *pipe_); - bool empty (); - void deactivate (class pipe_reader_t *pipe_); - void reactivate (class pipe_reader_t *pipe_); - bool recv (struct zmq_msg *msg_); - - - private: - - // Clean-up. - ~dummy_aggregator_t (); - - // Reference to the owner session object. - class session_t *session; - - // The single pipe bound. - class pipe_reader_t *pipe; - - // If true, the pipe is active. - bool active; - - dummy_aggregator_t (const dummy_aggregator_t&); - void operator = (const dummy_aggregator_t&); - }; - -} - -#endif diff --git a/src/dummy_distributor.cpp b/src/dummy_distributor.cpp deleted file mode 100644 index 62e2b88e..00000000 --- a/src/dummy_distributor.cpp +++ /dev/null @@ -1,85 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "dummy_distributor.hpp" -#include "pipe_writer.hpp" -#include "err.hpp" -#include "session.hpp" -#include "msg.hpp" - -zmq::dummy_distributor_t::dummy_distributor_t () : - session (NULL) -{ -} - -void zmq::dummy_distributor_t::set_session (session_t *session_) -{ - zmq_assert (!session); - session = session_; -} - -void zmq::dummy_distributor_t::shutdown () -{ - // No need to deallocate pipe here. It'll be deallocated during the - // shutdown of the dispatcher. - delete this; -} - -void zmq::dummy_distributor_t::terminate () -{ - if (pipe) - pipe->terminate (); - - delete this; -} - -zmq::dummy_distributor_t::~dummy_distributor_t () -{ -} - -void zmq::dummy_distributor_t::attach_pipe (pipe_writer_t *pipe_) -{ - zmq_assert (!pipe); - pipe = pipe_; -} - -void zmq::dummy_distributor_t::detach_pipe (pipe_writer_t *pipe_) -{ - zmq_assert (pipe == pipe_); - pipe = NULL; -} - -bool zmq::dummy_distributor_t::empty () -{ - return pipe == NULL; -} - -bool zmq::dummy_distributor_t::send (zmq_msg *msg_) -{ - return pipe && pipe->write (msg_); -} - -void zmq::dummy_distributor_t::flush () -{ - if (pipe) - pipe->flush (); -} - diff --git a/src/dummy_distributor.hpp b/src/dummy_distributor.hpp deleted file mode 100644 index a71cc497..00000000 --- a/src/dummy_distributor.hpp +++ /dev/null @@ -1,68 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_DUMMY_DISTRIBUTOR_HPP_INCLUDED__ -#define __ZMQ_DUMMY_DISTRIBUTOR_HPP_INCLUDED__ - -#include - -#include - -namespace zmq -{ - - // Fake message distributor. There can be only one pipe bound to it - // so there no real distribution going on. However, it is more efficient - // than a real distributor and should be used where business logic - // ensures there'll be at most one pipe bound. - - class dummy_distributor_t : public i_demux - { - public: - - dummy_distributor_t (); - - // i_demux implementation. - void set_session (class session_t *session_); - void shutdown (); - void terminate (); - void attach_pipe (class pipe_writer_t *pipe_); - void detach_pipe (class pipe_writer_t *pipe_); - bool empty (); - bool send (struct zmq_msg *msg_); - void flush (); - - private: - - // Clean-up. - ~dummy_distributor_t (); - - // Reference to the owner session object. - class session_t *session; - - // The bound pipe. - class pipe_writer_t *pipe; - - dummy_distributor_t (const dummy_distributor_t&); - void operator = (const dummy_distributor_t&); - }; - -} - -#endif diff --git a/src/epoll.cpp b/src/epoll.cpp index c4c8fdbc..15278c69 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -41,8 +41,11 @@ zmq::epoll_t::epoll_t () : zmq::epoll_t::~epoll_t () { - close (epoll_fd); + // Make sure there are no fds registered on shutdown. + zmq_assert (load.get () == 0); + worker.stop (); + close (epoll_fd); for (retired_t::iterator it = retired.begin (); it != retired.end (); it ++) delete *it; } @@ -144,11 +147,6 @@ void zmq::epoll_t::stop () stopping = true; } -void zmq::epoll_t::join () -{ - worker.stop (); -} - void zmq::epoll_t::loop () { epoll_event ev_buf [max_io_events]; diff --git a/src/epoll.hpp b/src/epoll.hpp index aa363ee9..619d4f3b 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -44,7 +44,7 @@ namespace zmq public: epoll_t (); - virtual ~epoll_t (); + ~epoll_t (); // i_poller implementation. handle_t add_fd (fd_t fd_, i_poll_events *events_); @@ -58,7 +58,6 @@ namespace zmq int get_load (); void start (); void stop (); - void join (); private: diff --git a/src/fair_aggregator.cpp b/src/fair_aggregator.cpp deleted file mode 100644 index 1e6937f7..00000000 --- a/src/fair_aggregator.cpp +++ /dev/null @@ -1,143 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "fair_aggregator.hpp" -#include "err.hpp" -#include "pipe_reader.hpp" -#include "session.hpp" - -// Swaps pipes at specified indices. -#define swap_pipes(i1_, i2_) \ - std::swap (pipes [i1_], pipes [i2_]);\ - pipes [i1_]->set_index (i1_);\ - pipes [i2_]->set_index (i2_); - -zmq::fair_aggregator_t::fair_aggregator_t () : - session (NULL), - active (0), - current (0) -{ -} - -void zmq::fair_aggregator_t::set_session (session_t *session_) -{ - zmq_assert (!session); - session = session_; -} - -void zmq::fair_aggregator_t::shutdown () -{ - // No need to deallocate pipes here. They'll be deallocated during the - // shutdown of the dispatcher. - delete this; -} - -void zmq::fair_aggregator_t::terminate () -{ - // Pipe unregisters itself during the call to terminate, so the pipes - // list shinks by one in each iteration. - while (!pipes.empty ()) - pipes [0]->terminate (); - - delete this; -} - -zmq::fair_aggregator_t::~fair_aggregator_t () -{ -} - -void zmq::fair_aggregator_t::attach_pipe (pipe_reader_t *pipe_) -{ - // Associate new pipe with the mux object. - pipe_->set_mux (this); - pipes.push_back (pipe_); - active++; - if (pipes.size () > active) - swap_pipes (pipes.size () - 1, active - 1); - if (active == 1) - session->revive (); -} - -void zmq::fair_aggregator_t::detach_pipe (pipe_reader_t *pipe_) -{ - // Move the pipe from the list of active pipes to the list of idle pipes. - deactivate (pipe_); - - // Move the pipe to the end of the idle list and remove it. - swap_pipes (pipe_->get_index (), pipes.size () - 1); - pipes.pop_back (); -} - -bool zmq::fair_aggregator_t::empty () -{ - return pipes.empty (); -} - -bool zmq::fair_aggregator_t::recv (zmq_msg *msg_) -{ - // Deallocate old content of the message. - zmq_msg_close (msg_); - - // O(1) fair queueing. Round-robin over the active pipes to get - // next message. - for (pipes_t::size_type i = active; i != 0; i--) { - - // Update current. - current = (current + 1) % active; - - // Try to read from current. - if (pipes [current]->read (msg_)) - return true; - } - - // No message is available. Initialise the output parameter - // to be a 0-byte message. - zmq_msg_init (msg_); - return false; -} - -void zmq::fair_aggregator_t::deactivate (pipe_reader_t *pipe_) -{ - int index = pipe_->get_index (); - - // Suspend an active pipe. - swap_pipes (index, active - 1); - active--; - - // If the deactiveted pipe is the current one, shift the current one pipe - // backwards so that the pipe that replaced the deactiveted one will be - // processed immediately rather than skipped. - if (index == (int) current) { - index--; - if (index == -1) - index = active - 1; - current = index; - } -} - -void zmq::fair_aggregator_t::reactivate (pipe_reader_t *pipe_) -{ - // Revive an idle pipe. - swap_pipes (pipe_->get_index (), active); - active++; - if (active == 1) - session->revive (); -} diff --git a/src/fair_aggregator.hpp b/src/fair_aggregator.hpp deleted file mode 100644 index 6ae1fc5c..00000000 --- a/src/fair_aggregator.hpp +++ /dev/null @@ -1,77 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_FAIR_AGGREGATOR_HPP_INCLUDED__ -#define __ZMQ_FAIR_AGGREGATOR_HPP_INCLUDED__ - -#include - -#include "i_mux.hpp" - -namespace zmq -{ - - // Object to aggregate messages from inbound pipes. - - class fair_aggregator_t : public i_mux - { - public: - - fair_aggregator_t (); - - // i_mux interface implementation. - void set_session (session_t *session_); - void shutdown (); - void terminate (); - void attach_pipe (class pipe_reader_t *pipe_); - void detach_pipe (class pipe_reader_t *pipe_); - bool empty (); - void deactivate (class pipe_reader_t *pipe_); - void reactivate (class pipe_reader_t *pipe_); - bool recv (struct zmq_msg *msg_); - - - private: - - // Clean-up. - ~fair_aggregator_t (); - - // Reference to the owner session object. - class session_t *session; - - // The list of inbound pipes. The active pipes are occupying indices - // from 0 to active-1. Suspended pipes occupy indices from 'active' - // to the end of the array. - typedef std::vector pipes_t; - pipes_t pipes; - - // The number of active pipes. - pipes_t::size_type active; - - // Pipe to retrieve next message from. The messages are retrieved - // from the pipes in round-robin fashion (a.k.a. fair queueing). - pipes_t::size_type current; - - fair_aggregator_t (const fair_aggregator_t&); - void operator = (const fair_aggregator_t&); - }; - -} - -#endif diff --git a/src/i_api.hpp b/src/i_api.hpp index fc7275b1..a87e41d9 100644 --- a/src/i_api.hpp +++ b/src/i_api.hpp @@ -1,28 +1,28 @@ /* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . +Copyright (c) 2007-2009 FastMQ Inc. + +This file is part of 0MQ. + +0MQ is free software; you can redistribute it and/or modify it under +the terms of the Lesser GNU General Public License as published by +the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +0MQ is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +Lesser GNU General Public License for more details. + +You should have received a copy of the Lesser GNU General Public License +along with this program. If not, see . */ - + #ifndef __ZMQ_I_API_HPP_INCLUDED__ #define __ZMQ_I_API_HPP_INCLUDED__ - + namespace zmq { - + struct i_api { virtual int bind (const char *addr_, struct zmq_opts *opts_) = 0; @@ -33,7 +33,7 @@ namespace zmq virtual int recv (struct zmq_msg *msg_, int flags_) = 0; virtual int close () = 0; }; - + } - + #endif diff --git a/src/i_demux.hpp b/src/i_demux.hpp deleted file mode 100644 index c4755b5d..00000000 --- a/src/i_demux.hpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_I_DEMUX_HPP_INCLUDED__ -#define __ZMQ_I_DEMUX_HPP_INCLUDED__ - -namespace zmq -{ - - struct i_demux - { - // Attaches mux to a particular session. - virtual void set_session (class session_t *session_) = 0; - - // To be called when the whole infrastrucure - // is being closed (zmq_term). - virtual void shutdown () = 0; - - // To be called when session is being closed. - virtual void terminate () = 0; - - // Adds new pipe to the demux to send messages to. - virtual void attach_pipe (class pipe_writer_t *pipe_) = 0; - - // Removes pipe from the demux. - virtual void detach_pipe (class pipe_writer_t *pipe_) = 0; - - // Returns true if there's no pipe attached. - virtual bool empty () = 0; - - // Sends the message. Returns false if the message cannot be sent - // because the pipes are full. - virtual bool send (struct zmq_msg *msg_) = 0; - - // Flushes messages downstream. - virtual void flush () = 0; - }; - -} - -#endif diff --git a/src/i_engine.hpp b/src/i_engine.hpp deleted file mode 100644 index 8ca2007b..00000000 --- a/src/i_engine.hpp +++ /dev/null @@ -1,53 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__ -#define __ZMQ_I_ENGINE_HPP_INCLUDED__ - -namespace zmq -{ - - // Generic interface to access engines from MD objects. - - struct i_engine - { - // Attach the engine with specified context. - virtual void attach (struct i_poller *poller_, - struct i_session *session_) = 0; - - // Detach the engine from the current context. - virtual void detach () = 0; - - // Notify the engine that new messages are available. - virtual void revive () = 0; - - // Called by session when it decides the engine - // should terminate itself. - virtual void schedule_terminate () = 0; - - // Called by normal object termination process. - virtual void terminate () = 0; - - // To be called by MD when terminal shutdown (zmq_term) is in progress. - virtual void shutdown () = 0; - }; - -} - -#endif diff --git a/src/i_mux.hpp b/src/i_mux.hpp deleted file mode 100644 index 22e0a263..00000000 --- a/src/i_mux.hpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_I_MUX_HPP_INCLUDED__ -#define __ZMQ_I_MUX_HPP_INCLUDED__ - -namespace zmq -{ - - struct i_mux - { - // Attaches mux to a particular session. - virtual void set_session (class session_t *session_) = 0; - - // To be called when the whole infrastrucure - // is being closed (zmq_term). - virtual void shutdown () = 0; - - // To be called when session is being closed. - virtual void terminate () = 0; - - // Adds new pipe to the mux to send messages to. - virtual void attach_pipe (class pipe_reader_t *pipe_) = 0; - - // Removes pipe from the mux. - virtual void detach_pipe (class pipe_reader_t *pipe_) = 0; - - // Returns true if there's no pipe attached. - virtual bool empty () = 0; - - // Shifts the pipe from active to passive state and vice versa. - // TODO: Check whether state transitions cannot be done by - // mux object itself without a need for external APIs. - virtual void deactivate (class pipe_reader_t *pipe_) = 0; - virtual void reactivate (class pipe_reader_t *pipe_) = 0; - - // Receives a message. Returns false when there is no message - // to receive. - virtual bool recv (struct zmq_msg *msg_) = 0; - }; - -} - -#endif diff --git a/src/i_poller.hpp b/src/i_poller.hpp index 52ca095f..2665e825 100644 --- a/src/i_poller.hpp +++ b/src/i_poller.hpp @@ -75,13 +75,8 @@ namespace zmq // This method is called from a foreign thread. virtual void start () = 0; - // Ask underlying I/O thread to stop. This method is called from - // underlying thread (callback from io_thread object). + // Ask underlying I/O thread to stop. virtual void stop () = 0; - - // Wait for termination of undelying I/O thread. - // This method is called from a foreign thread. - virtual void join () = 0; }; } diff --git a/src/i_session.hpp b/src/i_session.hpp deleted file mode 100644 index 21cdc0d7..00000000 --- a/src/i_session.hpp +++ /dev/null @@ -1,37 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_I_SESSION_HPP_INCLUDED__ -#define __ZMQ_I_SESSION_HPP_INCLUDED__ - -namespace zmq -{ - - struct i_session - { - virtual void set_engine (struct i_engine *engine_) = 0; - virtual void shutdown () = 0; - virtual bool read (struct zmq_msg *msg_) = 0; - virtual bool write (struct zmq_msg *msg_) = 0; - virtual void flush () = 0; - }; - -} - -#endif diff --git a/src/req.cpp b/src/i_socket.hpp similarity index 75% rename from src/req.cpp rename to src/i_socket.hpp index 01018f5a..99ade8ae 100644 --- a/src/req.cpp +++ b/src/i_socket.hpp @@ -17,13 +17,20 @@ along with this program. If not, see . */ -#include "../include/zmq.h" +#ifndef __ZMQ_I_SOCKET_HPP_INCLUDED__ +#define __ZMQ_I_SOCKET_HPP_INCLUDED__ -#include "req.hpp" -#include "app_thread.hpp" -#include "session.hpp" - -zmq::req_t::req_t (app_thread_t *thread_, session_t *session_) : - socket_base_t (thread_, session_) +namespace zmq { + + struct i_socket + { + virtual ~i_socket () {}; + + // Start shutting down the socket. + virtual void stop () = 0; + }; + } + +#endif diff --git a/src/i_thread.hpp b/src/i_thread.hpp deleted file mode 100644 index 9f31592d..00000000 --- a/src/i_thread.hpp +++ /dev/null @@ -1,38 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_I_THREAD_HPP_INCLUDED__ -#define __ZMQ_I_THREAD_HPP_INCLUDED__ - -namespace zmq -{ - - // Interface used by session object to communicate with the thread - // it belongs to. - - struct i_thread - { - virtual void attach_session (class session_t *session_) = 0; - virtual void detach_session (class session_t *session_) = 0; - virtual struct i_poller *get_poller () = 0; - }; - -} - -#endif diff --git a/src/io_object.cpp b/src/io_object.cpp deleted file mode 100644 index ad379cf1..00000000 --- a/src/io_object.cpp +++ /dev/null @@ -1,37 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "io_object.hpp" -#include "io_thread.hpp" -#include "i_poller.hpp" - -zmq::io_object_t::io_object_t (io_thread_t *thread_) : - object_t (thread_), - thread (thread_) -{ -} - -zmq::io_object_t::~io_object_t () -{ -} - -zmq::i_poller *zmq::io_object_t::get_poller () -{ - return thread->get_poller (); -} diff --git a/src/io_object.hpp b/src/io_object.hpp deleted file mode 100644 index d3fa8095..00000000 --- a/src/io_object.hpp +++ /dev/null @@ -1,51 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__ -#define __ZMQ_IO_OBJECT_HPP_INCLUDED__ - -#include "object.hpp" - -namespace zmq -{ - - // All objects running within the context of an I/O thread should be - // derived from this class to allow owning application threads to - // destroy them. - - class io_object_t : public object_t - { - public: - - io_object_t (class io_thread_t *thread_); - ~io_object_t (); - - virtual void terminate () = 0; - virtual void shutdown () = 0; - - struct i_poller *get_poller (); - - private: - - class io_thread_t *thread; - }; - -} - -#endif diff --git a/src/io_thread.cpp b/src/io_thread.cpp index 162ed4c2..f5261a64 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -30,9 +30,7 @@ #include "devpoll.hpp" #include "kqueue.hpp" #include "context.hpp" -#include "session.hpp" #include "simple_semaphore.hpp" -#include "session.hpp" zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) : object_t (context_, thread_slot_) @@ -76,15 +74,6 @@ zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) : poller->set_pollin (signaler_handle); } -void zmq::io_thread_t::shutdown () -{ - // Deallocate all the sessions associated with the thread. - while (!sessions.empty ()) - sessions [0]->shutdown (); - - delete this; -} - zmq::io_thread_t::~io_thread_t () { delete poller; @@ -101,11 +90,6 @@ void zmq::io_thread_t::stop () send_stop (); } -void zmq::io_thread_t::join () -{ - poller->join (); -} - zmq::i_signaler *zmq::io_thread_t::get_signaler () { return &signaler; @@ -149,21 +133,6 @@ void zmq::io_thread_t::timer_event () zmq_assert (false); } -void zmq::io_thread_t::attach_session (session_t *session_) -{ - session_->set_index (sessions.size ()); - sessions.push_back (session_); -} - -void zmq::io_thread_t::detach_session (session_t *session_) -{ - // O(1) removal of the session from the list. - sessions_t::size_type i = session_->get_index (); - sessions [i] = sessions [sessions.size () - 1]; - sessions [i]->set_index (i); - sessions.pop_back (); -} - zmq::i_poller *zmq::io_thread_t::get_poller () { zmq_assert (poller); diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 585a28b8..43ee19e3 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -23,7 +23,6 @@ #include #include "object.hpp" -#include "i_thread.hpp" #include "i_poller.hpp" #include "i_poll_events.hpp" #include "fd_signaler.hpp" @@ -34,26 +33,22 @@ namespace zmq // Generic part of the I/O thread. Polling-mechanism-specific features // are implemented in separate "polling objects". - class io_thread_t : public object_t, public i_poll_events, public i_thread + class io_thread_t : public object_t, public i_poll_events { public: io_thread_t (class context_t *context_, int thread_slot_); + // Clean-up. If the thread was started, it's neccessary to call 'stop' + // before invoking destructor. Otherwise the destructor would hang up. + ~io_thread_t (); + // Launch the physical thread. void start (); // Ask underlying thread to stop. void stop (); - // Wait till undelying thread terminates. - void join (); - - // To be called when the whole infrastrucure is being closed (zmq_term). - // It's vital to call the individual commands in this sequence: - // stop, join, shutdown. - void shutdown (); - // Returns signaler associated with this I/O thread. i_signaler *get_signaler (); @@ -62,9 +57,7 @@ namespace zmq void out_event (); void timer_event (); - // i_thread implementation. - void attach_session (class session_t *session_); - void detach_session (class session_t *session_); + // ??? struct i_poller *get_poller (); // Command handlers. @@ -75,9 +68,6 @@ namespace zmq private: - // Clean-up. - ~io_thread_t (); - // Poll thread gets notifications about incoming commands using // this signaler. fd_signaler_t signaler; @@ -87,11 +77,6 @@ namespace zmq // I/O multiplexing is performed using a poller object. i_poller *poller; - - // Vector of all sessions associated with this app thread. - typedef std::vector sessions_t; - sessions_t sessions; - }; } diff --git a/src/kqueue.cpp b/src/kqueue.cpp index 28c15de4..f4c58a3d 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -42,6 +42,10 @@ zmq::kqueue_t::kqueue_t () zmq::kqueue_t::~kqueue_t () { + // Make sure there are no fds registered on shutdown. + zmq_assert (load.get () == 0); + + worker.stop (); close (kqueue_fd); } @@ -144,11 +148,6 @@ void zmq::kqueue_t::stop () stopping = true; } -void zmq::kqueue_t::join () -{ - worker.stop (); -} - void zmq::kqueue_t::loop () { while (!stopping) { diff --git a/src/kqueue.hpp b/src/kqueue.hpp index 2fd68191..eeb6f096 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -42,7 +42,7 @@ namespace zmq public: kqueue_t (); - virtual ~kqueue_t (); + ~kqueue_t (); // i_poller implementation. handle_t add_fd (fd_t fd_, i_poll_events *events_); @@ -56,7 +56,6 @@ namespace zmq int get_load (); void start (); void stop (); - void join (); private: diff --git a/src/listener.cpp b/src/listener.cpp deleted file mode 100644 index 823b21b4..00000000 --- a/src/listener.cpp +++ /dev/null @@ -1,170 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "listener.hpp" -#include "simple_semaphore.hpp" -#include "zmq_tcp_engine.hpp" -#include "io_thread.hpp" -#include "session_stub.hpp" -#include "session.hpp" -#include "err.hpp" -#include "dummy_aggregator.hpp" -#include "dummy_distributor.hpp" - -zmq::listener_t::listener_t (io_thread_t *thread_, const char *addr_, - session_t *peer_, bool has_in_, bool has_out_, uint64_t taskset_) : - io_object_t (thread_), - poller (NULL), - addr (addr_), - peer (peer_), - taskset (taskset_), - has_in (has_in_), - has_out (has_out_) -{ -} - -void zmq::listener_t::terminate () -{ - for (session_stubs_t::size_type i = 0; i != session_stubs.size (); i++) - session_stubs [i]->terminate (); - delete this; -} - -void zmq::listener_t::shutdown () -{ - for (session_stubs_t::size_type i = 0; i != session_stubs.size (); i++) - session_stubs [i]->shutdown (); - delete this; -} - -zmq::listener_t::~listener_t () -{ -} - -void zmq::listener_t::got_identity (session_stub_t *session_stub_, - const char *identity_) -{ - // Get the engine allready disconnected from the stub and poller. - i_engine *engine = session_stub_->detach_engine (); - zmq_assert (engine); - - // Find the corresponding session. - session_t *session; - sessions_t::iterator it = sessions.find (identity_); - - // Destroy the stub. - int i = session_stub_->get_index (); - session_stubs [i] = session_stubs [session_stubs.size () - 1]; - session_stubs [i]->set_index (i); - session_stubs.pop_back (); - session_stub_->terminate (); - - // If there's no session with the specified identity, create one. - if (it != sessions.end ()) { - session = it->second; - session->inc_seqnum (); - } - else { - - // Choose an I/O thread with the least load to handle the new session. - io_thread_t *io_thread = choose_io_thread (taskset); - - // Create the session and bind it to the I/O thread and peer. Make - // sure that the peer session won't get deallocated till it processes - // the subsequent bind command. - i_mux *mux = new dummy_aggregator_t; - zmq_assert (mux); - i_demux *demux = new dummy_distributor_t; - zmq_assert (demux); - session = new session_t (io_thread, io_thread, mux, demux, false, true); - zmq_assert (session); - session->inc_seqnum (); - session->inc_seqnum (); - peer->inc_seqnum (); - send_reg_and_bind (session, peer, has_in, has_out); - } - - // Attach the engine to the session. - send_engine (session, engine); -} - -void zmq::listener_t::process_reg (simple_semaphore_t *smph_) -{ - zmq_assert (!poller); - poller = get_poller (); - - // Open the listening socket. - int rc = tcp_listener.open (addr.c_str ()); - zmq_assert (rc == 0); - - // Unlock the application thread that created the listener. - if (smph_) - smph_->post (); - - // Start polling for incoming connections. - handle = poller->add_fd (tcp_listener.get_fd (), this); - poller->set_pollin (handle); -} - -void zmq::listener_t::process_unreg (simple_semaphore_t *smph_) -{ - // Disassociate listener from the poller. - zmq_assert (poller); - poller->rm_fd (handle); - poller = NULL; - - // Unlock the application thread closing the listener. - if (smph_) - smph_->post (); -} - -void zmq::listener_t::in_event () -{ - fd_t fd = tcp_listener.accept (); - - // If connection was reset by the peer in the meantime, just ignore it. - // TODO: Handle specific errors like ENFILE/EMFILE etc. - if (fd == retired_fd) - return; - - // Create an session stub for the engine to take care for it till its - // identity is retreived. - session_stub_t *session_stub = new session_stub_t (this); - zmq_assert (session_stub); - session_stub->set_index (session_stubs.size ()); - session_stubs.push_back (session_stub); - - // Create an engine to encaspulate the socket. Engine will register itself - // with the stub so the stub will be able to free it in case of shutdown. - zmq_tcp_engine_t *engine = new zmq_tcp_engine_t (fd); - zmq_assert (engine); - engine->attach (poller, session_stub); -} - -void zmq::listener_t::out_event () -{ - zmq_assert (false); -} - -void zmq::listener_t::timer_event () -{ - zmq_assert (false); -} - - diff --git a/src/listener.hpp b/src/listener.hpp deleted file mode 100644 index 2fe93dbb..00000000 --- a/src/listener.hpp +++ /dev/null @@ -1,110 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_LISTENER_HPP_INCLUDED__ -#define __ZMQ_LISTENER_HPP_INCLUDED__ - -#include -#include -#include - -#include "io_object.hpp" -#include "tcp_listener.hpp" -#include "i_poller.hpp" -#include "i_poll_events.hpp" -#include "stdint.hpp" - -namespace zmq -{ - - class listener_t : public io_object_t, public i_poll_events - { - public: - - listener_t (class io_thread_t *thread_, const char *addr_, - class session_t *peer_, bool has_in_, bool has_out_, - uint64_t taskset_); - - void terminate (); - void shutdown (); - - // This function is called by session stub once the identity - // is retrieved from the incoming connection. - void got_identity (class session_stub_t *session_stub_, - const char *identity_); - - void process_reg (class simple_semaphore_t *smph_); - void process_unreg (class simple_semaphore_t *smph_); - - // i_poll_events implementation. - void in_event (); - void out_event (); - void timer_event (); - - private: - - ~listener_t (); - - struct i_poller *poller; - - // Handle corresponding to the listening socket. - handle_t handle; - - // Actual listening socket. - tcp_listener_t tcp_listener; - - // Address to bind to. - std::string addr; - - // Peer session. All the newly created connections should bind to - // this session. - session_t *peer; - - // Taskset specifies which I/O threads are to be use to handle - // newly created connections (0 = all). - uint64_t taskset; - - // Sessions created by this listener are stored in this map. They are - // indexed by peer identities so that the same peer connects to the - // same session after reconnection. - // NB: Sessions are destroyed from other place and possibly later on, - // so no need to care about them during listener object termination. - typedef std::map sessions_t; - sessions_t sessions; - - // List of engines (bound to temorary session stubs) that we haven't - // retrieved the identity from so far. - typedef std::vector session_stubs_t; - session_stubs_t session_stubs; - - // If true, create inbound pipe when binding new connection - // to the peer. - bool has_in; - - // If true, create outbound pipe when binding new connection - // to the peer. - bool has_out; - - listener_t (const listener_t&); - void operator = (const listener_t&); - }; - -} - -#endif diff --git a/src/load_balancer.cpp b/src/load_balancer.cpp deleted file mode 100644 index 0d382a10..00000000 --- a/src/load_balancer.cpp +++ /dev/null @@ -1,130 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "load_balancer.hpp" -#include "pipe_writer.hpp" -#include "err.hpp" -#include "session.hpp" -#include "msg.hpp" - -zmq::load_balancer_t::load_balancer_t () : - session (NULL), - current (0) -{ -} - -void zmq::load_balancer_t::set_session (session_t *session_) -{ - zmq_assert (!session); - session = session_; -} - -void zmq::load_balancer_t::shutdown () -{ - // No need to deallocate pipes here. They'll be deallocated during the - // shutdown of the dispatcher. - delete this; -} - -void zmq::load_balancer_t::terminate () -{ - // Pipe unregisters itself during the call to terminate, so the pipes - // list shinks by one in each iteration. - while (!pipes.empty ()) - pipes [0]->terminate (); - - delete this; -} - -zmq::load_balancer_t::~load_balancer_t () -{ -} - -void zmq::load_balancer_t::attach_pipe (pipe_writer_t *pipe_) -{ - // Associate demux with a new pipe. - pipe_->set_demux (this); - pipe_->set_index (pipes.size ()); - pipes.push_back (pipe_); -} - -void zmq::load_balancer_t::detach_pipe (pipe_writer_t *pipe_) -{ - // Release the reference to the pipe. - int index = pipe_->get_index (); - pipe_->set_index (-1); - pipes [index] = pipes.back (); - pipes [index]->set_index (index); - pipes.pop_back (); -} - -bool zmq::load_balancer_t::empty () -{ - return pipes.empty (); -} - -bool zmq::load_balancer_t::send (zmq_msg *msg_) -{ - // If there are no pipes, message cannot be sent. - if (pipes.size () == 0) - return false; - - // Find the first pipe that is ready to accept the message. - bool found = false; - for (pipes_t::size_type i = 0; !found && i < pipes.size (); i++) { -// if (pipes [current]->check_write (msg)) - found = true; -// else -// current = (current + 1) % pipes.size (); - } - - // Oops, no pipe is ready to accept the message. - if (!found) - return false; - - // Send the message to the selected pipe. - write_to_pipe (pipes [current], msg_); - current = (current + 1) % pipes.size (); - - // Detach the original message from the data buffer. - zmq_msg_init (msg_); - - return true; -} - -void zmq::load_balancer_t::flush () -{ - // Flush all pipes. If there's large number of pipes, it can be pretty - // inefficient (especially if there's new message only in a single pipe). - // Can it be improved? - for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++) - (*it)->flush (); -} - -void zmq::load_balancer_t::write_to_pipe (class pipe_writer_t *pipe_, - struct zmq_msg *msg_) -{ - if (!pipe_->write (msg_)) { - // TODO: Push gap notification to the pipe. - zmq_assert (false); - } -} - diff --git a/src/load_balancer.hpp b/src/load_balancer.hpp deleted file mode 100644 index 953ed3b1..00000000 --- a/src/load_balancer.hpp +++ /dev/null @@ -1,73 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_LOAD_BALANCER_HPP_INCLUDED__ -#define __ZMQ_LOAD_BALANCER_HPP_INCLUDED__ - -#include - -#include - -namespace zmq -{ - - // Object to distribute messages to outbound pipes. - - class load_balancer_t : public i_demux - { - public: - - load_balancer_t (); - - // i_demux implementation. - void set_session (class session_t *session_); - void shutdown (); - void terminate (); - void attach_pipe (class pipe_writer_t *pipe_); - void detach_pipe (class pipe_writer_t *pipe_); - bool empty (); - bool send (struct zmq_msg *msg_); - void flush (); - - private: - - // Clean-up. - ~load_balancer_t (); - - // Reference to the owner session object. - class session_t *session; - - // Writes the message to the pipe if possible. If it isn't, writes - // a gap notification to the pipe. - void write_to_pipe (class pipe_writer_t *pipe_, struct zmq_msg *msg_); - - // The list of outbound pipes. - typedef std::vector pipes_t; - pipes_t pipes; - - // Current pipe to write next message to. - pipes_t::size_type current; - - load_balancer_t (const load_balancer_t&); - void operator = (const load_balancer_t&); - }; - -} - -#endif diff --git a/src/object.cpp b/src/object.cpp index 7c852120..36f39378 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -20,12 +20,8 @@ #include "object.hpp" #include "context.hpp" #include "err.hpp" -#include "pipe_reader.hpp" -#include "pipe_writer.hpp" -#include "session.hpp" #include "io_thread.hpp" #include "simple_semaphore.hpp" -#include "i_engine.hpp" zmq::object_t::object_t (context_t *context_, int thread_slot_) : context (context_), @@ -103,35 +99,6 @@ void zmq::object_t::process_command (command_t &cmd_) } } -void zmq::object_t::create_pipe (object_t *reader_parent_, - object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, - pipe_reader_t **reader_, pipe_writer_t **writer_) -{ - context->create_pipe (reader_parent_, writer_parent_, hwm_, lwm_, - reader_, writer_); -} - -void zmq::object_t::destroy_pipe (pipe_t *pipe_) -{ - context->destroy_pipe (pipe_); -} - -int zmq::object_t::register_inproc_endpoint (const char *endpoint_, - session_t *session_) -{ - return context->register_inproc_endpoint (endpoint_, session_); -} - -zmq::object_t *zmq::object_t::get_inproc_endpoint (const char *endpoint_) -{ - return context->get_inproc_endpoint (endpoint_); -} - -void zmq::object_t::unregister_inproc_endpoints (session_t *session_) -{ - context->unregister_inproc_endpoints (session_); -} - zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { return context->choose_io_thread (taskset_); diff --git a/src/object.hpp b/src/object.hpp index 796e7fa8..5851c68f 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -44,14 +44,6 @@ namespace zmq // Derived object can use following functions to interact with // global repositories. See context.hpp for function details. int thread_slot_count (); - void create_pipe (class object_t *reader_parent_, - class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, - class pipe_reader_t **reader_, class pipe_writer_t **writer_); - void destroy_pipe (class pipe_t *pipe_); - int register_inproc_endpoint (const char *endpoint_, - class session_t *session_); - class object_t *get_inproc_endpoint (const char *endpoint_); - void unregister_inproc_endpoints (class session_t *session_); class io_thread_t *choose_io_thread (uint64_t taskset_); // Derived object can use these functions to send commands diff --git a/src/p2p.cpp b/src/p2p.cpp deleted file mode 100644 index c83d8b1f..00000000 --- a/src/p2p.cpp +++ /dev/null @@ -1,29 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "p2p.hpp" -#include "app_thread.hpp" -#include "session.hpp" - -zmq::p2p_t::p2p_t (app_thread_t *thread_, session_t *session_) : - socket_base_t (thread_, session_) -{ -} diff --git a/src/p2p.hpp b/src/p2p.hpp deleted file mode 100644 index d3d9dc31..00000000 --- a/src/p2p.hpp +++ /dev/null @@ -1,42 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_P2P_HPP_INCLUDED__ -#define __ZMQ_P2P_HPP_INCLUDED__ - -#include "socket_base.hpp" - -namespace zmq -{ - - class p2p_t : public socket_base_t - { - public: - - p2p_t (class app_thread_t *thread_, class session_t *session_); - - private: - - p2p_t (const p2p_t&); - void operator = (const p2p_t&); - }; - -} - -#endif diff --git a/src/pipe.cpp b/src/pipe.cpp deleted file mode 100644 index bf761b40..00000000 --- a/src/pipe.cpp +++ /dev/null @@ -1,47 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "pipe.hpp" - -zmq::pipe_t::pipe_t () : - ypipe_t (false), - index (-1) -{ -} - -zmq::pipe_t::~pipe_t () -{ - // Flush any outstanding messages to the pipe. - flush (); - - // Deallocate all the messages in the pipe. - zmq_msg msg; - while (read (&msg)) - zmq_msg_close (&msg); -} - -void zmq::pipe_t::set_index (int index_) -{ - index = index_; -} - -int zmq::pipe_t::get_index () -{ - return index; -} diff --git a/src/pipe.hpp b/src/pipe.hpp index 8894a225..d7711203 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -28,29 +28,10 @@ namespace zmq { - // Message pipe. A simple wrapper on top of ypipe. - + // Message pipe. class pipe_t : public ypipe_t { - // Context is a friend so that it can create & destroy the pipes. - // By making constructor & destructor private we are sure that nobody - // except context messes with pipes. - friend class context_t; - - private: - - pipe_t (); - ~pipe_t (); - - void set_index (int index_); - int get_index (); - - // Index of the pipe in context's array of pipes. - int index; - - pipe_t (const pipe_t&); - void operator = (const pipe_t&); - }; + }; } diff --git a/src/pipe_reader.cpp b/src/pipe_reader.cpp deleted file mode 100644 index 79dfe2e7..00000000 --- a/src/pipe_reader.cpp +++ /dev/null @@ -1,118 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "pipe_reader.hpp" -#include "pipe.hpp" -#include "err.hpp" -#include "i_mux.hpp" - -zmq::pipe_reader_t::pipe_reader_t (object_t *parent_, pipe_t *pipe_, - uint64_t hwm_, uint64_t lwm_) : - object_t (parent_), - pipe (pipe_), - peer (NULL), - mux (NULL), - index (-1), - hwm (hwm_), - lwm (lwm_), - head (0), - tail (0), - last_sent_head (0) -{ -} - -void zmq::pipe_reader_t::set_peer (object_t *peer_) -{ - peer = peer_; -} - -zmq::pipe_reader_t::~pipe_reader_t () -{ -} - -void zmq::pipe_reader_t::set_mux (i_mux *mux_) -{ - mux = mux_; -} - -void zmq::pipe_reader_t::set_index (int index_) -{ - index = index_; -} - -int zmq::pipe_reader_t::get_index () -{ - return index; -} - -void zmq::pipe_reader_t::process_tail (uint64_t bytes_) -{ - tail = bytes_; - mux->reactivate (this); -} - -bool zmq::pipe_reader_t::read (struct zmq_msg *msg_) -{ - // Read a message. - if (!pipe->read (msg_)) { - mux->deactivate (this); - return false; - } - - // If successfull, adjust the head of the pipe. - head += zmq_msg_size (msg_); - - // If pipe writer wasn't notified about the head position for long enough, - // notify it. - if (head - last_sent_head >= hwm - lwm) { - send_head (peer, head); - last_sent_head = head; - } - - if (zmq_msg_type (msg_) == ZMQ_DELIMITER) { - - // Detach the pipe from the mux and send termination request to - // the pipe writer. - mux->detach_pipe (this); - mux = NULL; - send_terminate (peer); - return false; - } - - return true; -} - -void zmq::pipe_reader_t::terminate () -{ - // Detach the pipe from the mux and send termination request to - // the pipe writer. - if (mux) { - mux->detach_pipe (this); - mux = NULL; - } - send_terminate (peer); -} - -void zmq::pipe_reader_t::process_terminate_ack () -{ - // Ask context to deallocate the pipe. - destroy_pipe (pipe); -} diff --git a/src/pipe_reader.hpp b/src/pipe_reader.hpp deleted file mode 100644 index cf45bb40..00000000 --- a/src/pipe_reader.hpp +++ /dev/null @@ -1,89 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_PIPE_READER_HPP_INCLUDED__ -#define __ZMQ_PIPE_READER_HPP_INCLUDED__ - -#include "object.hpp" -#include "stdint.hpp" - -namespace zmq -{ - - class pipe_reader_t : public object_t - { - // Context is a friend so that it can create & destroy the reader. - // By making constructor & destructor private we are sure that nobody - // except context messes with readers. - friend class context_t; - - public: - - // Set & get index in the associated mux object. - void set_mux (struct i_mux *mux_); - void set_index (int index_); - int get_index (); - - // Reads a message to the underlying pipe. - bool read (struct zmq_msg *msg_); - - // Asks pipe to destroy itself. - void terminate (); - - private: - - pipe_reader_t (class object_t *parent_, class pipe_t *pipe_, - uint64_t hwm_, uint64_t lwm_); - ~pipe_reader_t (); - - // Second step of reader construction. The parameter cannot be passed - // in constructor as peer object doesn't yet exist at the time. - void set_peer (class object_t *peer_); - - void process_tail (uint64_t bytes_); - void process_terminate_ack (); - - // The underlying pipe. - class pipe_t *pipe; - - // Pipe writer associated with the other side of the pipe. - class object_t *peer; - - // Associated mux object. - struct i_mux *mux; - - // Index in the associated mux object. - int index; - - // High and low watermarks for in-memory storage (in bytes). - uint64_t hwm; - uint64_t lwm; - - // Positions of head and tail of the pipe (in bytes). - uint64_t head; - uint64_t tail; - uint64_t last_sent_head; - - pipe_reader_t (const pipe_reader_t&); - void operator = (const pipe_reader_t&); - }; - -} - -#endif diff --git a/src/pipe_writer.cpp b/src/pipe_writer.cpp deleted file mode 100644 index a54034b9..00000000 --- a/src/pipe_writer.cpp +++ /dev/null @@ -1,120 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "pipe_writer.hpp" -#include "pipe.hpp" -#include "i_demux.hpp" - -zmq::pipe_writer_t::pipe_writer_t (object_t *parent_, pipe_t *pipe_, - object_t *peer_, uint64_t hwm_, uint64_t lwm_) : - object_t (parent_), - pipe (pipe_), - peer (peer_), - demux (NULL), - index (-1), - hwm (hwm_), - lwm (lwm_), - head (0), - tail (0) -{ -} - -zmq::pipe_writer_t::~pipe_writer_t () -{ -} - -void zmq::pipe_writer_t::set_demux (i_demux *demux_) -{ - demux = demux_; -} - -void zmq::pipe_writer_t::set_index (int index_) -{ - index = index_; -} - -int zmq::pipe_writer_t::get_index () -{ - return index; -} - -bool zmq::pipe_writer_t::write (zmq_msg *msg_) -{ - size_t msg_size = zmq_msg_size (msg_); - - // If message won't fit into the in-memory pipe, there's no way - // to pass it further. - // TODO: It should be discarded and 'oversized' notification should be - // placed into the pipe. - zmq_assert (!hwm || msg_size <= hwm); - - // If there's not enough space in the pipe at the moment, return false. - if (hwm && tail + msg_size - head > hwm) - return false; - - // Write the message to the pipe and adjust tail position. - pipe->write (*msg_); - flush (); - tail += msg_size; - - return true; -} - -void zmq::pipe_writer_t::flush () -{ - if (!pipe->flush ()) - send_tail (peer, tail); -} - -void zmq::pipe_writer_t::process_head (uint64_t bytes_) -{ - head = bytes_; -} - -void zmq::pipe_writer_t::terminate () -{ - // Disconnect from the associated demux. - if (demux) { - demux->detach_pipe (this); - demux = NULL; - } - - // Push the delimiter to the pipe. Delimiter is a notification for pipe - // reader that there will be no more messages in the pipe. - zmq_msg delimiter; - delimiter.content = (zmq_msg_content*) ZMQ_DELIMITER; - delimiter.shared = false; - delimiter.vsm_size = 0; - pipe->write (delimiter); - flush (); -} - -void zmq::pipe_writer_t::process_terminate () -{ - // Disconnect from the associated demux. - if (demux) { - demux->detach_pipe (this); - demux = NULL; - } - - // Send termination acknowledgement to the pipe reader. - send_terminate_ack (peer); -} diff --git a/src/pipe_writer.hpp b/src/pipe_writer.hpp deleted file mode 100644 index a727b1fc..00000000 --- a/src/pipe_writer.hpp +++ /dev/null @@ -1,88 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_PIPE_WRITER_HPP_INCLUDED__ -#define __ZMQ_PIPE_WRITER_HPP_INCLUDED__ - -#include "object.hpp" -#include "stdint.hpp" - -namespace zmq -{ - - class pipe_writer_t : public object_t - { - // Context is a friend so that it can create & destroy the writer. - // By making constructor & destructor private we are sure that nobody - // except context messes with writers. - friend class context_t; - - public: - - // Set & get index in the associated demux object. - void set_demux (struct i_demux *demux_); - void set_index (int index_); - int get_index (); - - // Writes a message to the underlying pipe. Returns false if the - // message cannot be written to the pipe at the moment. - bool write (struct zmq_msg *msg_); - - // Flush the messages downsteam. - void flush (); - - // Asks pipe to destroy itself. - void terminate (); - - private: - - pipe_writer_t (class object_t *parent_, class pipe_t *pipe_, - class object_t *peer_, uint64_t hwm_, uint64_t lwm_); - ~pipe_writer_t (); - - void process_head (uint64_t bytes_); - void process_terminate (); - - // The underlying pipe. - class pipe_t *pipe; - - // Pipe reader associated with the other side of the pipe. - class object_t *peer; - - // Associated demux object. - struct i_demux *demux; - - // Index in the associated demux object. - int index; - - // High and low watermarks for in-memory storage (in bytes). - uint64_t hwm; - uint64_t lwm; - - // Positions of head and tail of the pipe (in bytes). - uint64_t head; - uint64_t tail; - - pipe_writer_t (const pipe_writer_t&); - void operator = (const pipe_writer_t&); - }; - -} - -#endif diff --git a/src/poll.cpp b/src/poll.cpp index 864cfad7..94e4fd47 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -50,6 +50,14 @@ zmq::poll_t::poll_t () : fd_table [i].index = retired_fd; } +zmq::poll_t::~poll_t () +{ + // Make sure there are no fds registered on shutdown. + zmq_assert (load.get () == 0); + + worker.stop (); +} + zmq::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) { pollfd pfd = {fd_, 0, 0}; @@ -132,11 +140,6 @@ void zmq::poll_t::stop () stopping = true; } -void zmq::poll_t::join () -{ - worker.stop (); -} - void zmq::poll_t::loop () { while (!stopping) { diff --git a/src/poll.hpp b/src/poll.hpp index dbfa7765..9fe60672 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -47,7 +47,7 @@ namespace zmq public: poll_t (); - virtual ~poll_t () {} + ~poll_t (); // i_poller implementation. handle_t add_fd (fd_t fd_, i_poll_events *events_); @@ -61,7 +61,6 @@ namespace zmq int get_load (); void start (); void stop (); - void join (); private: diff --git a/src/pub.cpp b/src/pub.cpp deleted file mode 100644 index 5dca0b88..00000000 --- a/src/pub.cpp +++ /dev/null @@ -1,38 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "pub.hpp" -#include "app_thread.hpp" -#include "session.hpp" -#include "err.hpp" - -zmq::pub_t::pub_t (app_thread_t *thread_, session_t *session_) : - socket_base_t (thread_, session_) -{ - disable_in (); -} - -int zmq::pub_t::recv (struct zmq_msg *msg_, int flags_) -{ - // Publisher socket has no recv function. - errno = ENOTSUP; - return -1; -} diff --git a/src/pub.hpp b/src/pub.hpp deleted file mode 100644 index 909e731f..00000000 --- a/src/pub.hpp +++ /dev/null @@ -1,45 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_PUB_HPP_INCLUDED__ -#define __ZMQ_PUB_HPP_INCLUDED__ - -#include "socket_base.hpp" - -namespace zmq -{ - - class pub_t : public socket_base_t - { - public: - - pub_t (class app_thread_t *thread_, class session_t *session_); - - // i_api overloads. - int recv (struct zmq_msg *msg_, int flags_); - - private: - - pub_t (const pub_t&); - void operator = (const pub_t&); - }; - -} - -#endif diff --git a/src/rep.cpp b/src/rep.cpp deleted file mode 100644 index 60767e12..00000000 --- a/src/rep.cpp +++ /dev/null @@ -1,29 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "rep.hpp" -#include "app_thread.hpp" -#include "session.hpp" - -zmq::rep_t::rep_t (app_thread_t *thread_, session_t *session_) : - socket_base_t (thread_, session_) -{ -} diff --git a/src/rep.hpp b/src/rep.hpp deleted file mode 100644 index 92d27581..00000000 --- a/src/rep.hpp +++ /dev/null @@ -1,42 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_REP_HPP_INCLUDED__ -#define __ZMQ_REP_HPP_INCLUDED__ - -#include "socket_base.hpp" - -namespace zmq -{ - - class rep_t : public socket_base_t - { - public: - - rep_t (class app_thread_t *thread_, class session_t *session_); - - private: - - rep_t (const rep_t&); - void operator = (const rep_t&); - }; - -} - -#endif diff --git a/src/req.hpp b/src/req.hpp deleted file mode 100644 index c279f0ef..00000000 --- a/src/req.hpp +++ /dev/null @@ -1,42 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_REQ_HPP_INCLUDED__ -#define __ZMQ_REQ_HPP_INCLUDED__ - -#include "socket_base.hpp" - -namespace zmq -{ - - class req_t : public socket_base_t - { - public: - - req_t (class app_thread_t *thread_, class session_t *session_); - - private: - - req_t (const req_t&); - void operator = (const req_t&); - }; - -} - -#endif diff --git a/src/safe_object.cpp b/src/safe_object.cpp deleted file mode 100644 index d4a92d78..00000000 --- a/src/safe_object.cpp +++ /dev/null @@ -1,76 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "safe_object.hpp" - -zmq::safe_object_t::safe_object_t (class context_t *context_, - int thread_slot_) : - object_t (context_, thread_slot_), - processed_seqnum (0), - terminating (false) -{ -} - -zmq::safe_object_t::safe_object_t (object_t *parent_) : - object_t (parent_), - processed_seqnum (0), - terminating (false) -{ -} - -void zmq::safe_object_t::inc_seqnum () -{ - // This function is called from the sender thread to ensure that this - // object will still exist when the command sent to it arrives in the - // destination thread. - sent_seqnum.add (1); -} - -void zmq::safe_object_t::process_command (struct command_t &cmd_) -{ - object_t::process_command (cmd_); - - // Adjust sequence number of the last processed command. - processed_seqnum++; - - // If we are already in the termination phase and all commands sent to - // this object are processed, it's safe to deallocate it. - if (terminating && sent_seqnum.get () == processed_seqnum) - delete this; -} - -void zmq::safe_object_t::terminate () -{ - // Wait till all commands sent to this session are processed. - terminating = true; - - // If there's no pending command we can deallocate the session - // straight saway. - if (sent_seqnum.get () == processed_seqnum) - delete this; -} - -bool zmq::safe_object_t::is_terminating () -{ - return terminating; -} - -zmq::safe_object_t::~safe_object_t () -{ -} diff --git a/src/safe_object.hpp b/src/safe_object.hpp deleted file mode 100644 index b47db48b..00000000 --- a/src/safe_object.hpp +++ /dev/null @@ -1,68 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_SAFE_OBJECT_HPP_INCLUDED__ -#define __ZMQ_SAFE_OBJECT_HPP_INCLUDED__ - -#include "object.hpp" -#include "atomic_counter.hpp" - -namespace zmq -{ - - // Same as object_t with the exception of termination mechanism. While - // object_t is destroyed immediately on terminate (assuming that the caller - // have ensured that there are no more commands for the object on the - // fly), safe_object_t switches into termination mode and waits for all - // the on-the-fly commands to be delivered before it deallocates itself. - - class safe_object_t : public object_t - { - public: - - safe_object_t (class context_t *context_, int thread_slot_); - safe_object_t (object_t *parent_); - - void inc_seqnum (); - void process_command (struct command_t &cmd_); - - protected: - - void terminate (); - bool is_terminating (); - - virtual ~safe_object_t (); - - private: - - // Sequence number of the last command sent to the object and last - // command processed by the object. The former is an atomic counter - // meaning that other threads can increment it safely. - atomic_counter_t sent_seqnum; - uint32_t processed_seqnum; - - bool terminating; - - safe_object_t (const safe_object_t&); - void operator = (const safe_object_t&); - }; - -} - -#endif diff --git a/src/select.cpp b/src/select.cpp index 68ec9a0b..f10acdc1 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -51,6 +51,14 @@ zmq::select_t::select_t () : FD_ZERO (&source_set_err); } +zmq::select_t::~select_t () +{ + // Make sure there are no fds registered on shutdown. + zmq_assert (load.get () == 0); + + worker.stop (); +} + zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) { // Store the file descriptor. @@ -156,11 +164,6 @@ void zmq::select_t::stop () stopping = true; } -void zmq::select_t::join () -{ - worker.stop (); -} - void zmq::select_t::loop () { while (!stopping) { diff --git a/src/select.hpp b/src/select.hpp index c1e72a7c..c4424779 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -50,6 +50,7 @@ namespace zmq public: select_t (); + ~select_t (); // i_poller implementation. handle_t add_fd (fd_t fd_, i_poll_events *events_); @@ -63,7 +64,6 @@ namespace zmq int get_load (); void start (); void stop (); - void join (); private: diff --git a/src/session.cpp b/src/session.cpp deleted file mode 100644 index b9a450df..00000000 --- a/src/session.cpp +++ /dev/null @@ -1,273 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "session.hpp" -#include "i_engine.hpp" -#include "i_thread.hpp" -#include "i_mux.hpp" -#include "i_demux.hpp" -#include "err.hpp" -#include "pipe.hpp" -#include "pipe_reader.hpp" -#include "pipe_writer.hpp" -#include "simple_semaphore.hpp" - -zmq::session_t::session_t (object_t *parent_, i_thread *thread_, - i_mux *mux_, i_demux *demux_, - bool terminate_on_disconnect_, bool terminate_on_no_pipes_) : - safe_object_t (parent_), - mux (mux_), - demux (demux_), - thread (thread_), - engine (NULL), - terminate_on_disconnect (terminate_on_disconnect_), - terminate_on_no_pipes (false), - terminate_on_no_pipes_delayed (terminate_on_no_pipes_), - index (-1) -{ - // At least one way to terminate the session should be allowed. Otherwise - // the session can be orphaned forever. - zmq_assert (terminate_on_disconnect || terminate_on_no_pipes_delayed); - - // Give the mux and the demux callback pointer to ourselves. - if (mux) - mux->set_session (this); - if (demux) - demux->set_session (this); -} - -void zmq::session_t::shutdown () -{ - // Session may live even without an associated engine, thus we have - // to check if for NULL value. - if (engine) - engine->shutdown (); - - // Propagate the shutdown signal to both inbound and outbound pipes. - if (mux) - mux->shutdown (); - if (demux) - demux->shutdown (); - - delete this; -} - -void zmq::session_t::disconnected () -{ - // It's engine who calls this function so there's no need to deallocate - // the engine. Just drop the reference. - engine = NULL; - - // Some sessions won't shut down because of disconnect. New engine will - // attached to the session later on. - if (!terminate_on_disconnect) - return; - - terminate (); -} - -void zmq::session_t::bind (object_t *peer_, bool in_, bool out_) -{ - // Create the out pipe (if required). - pipe_reader_t *pipe_reader = NULL; - if (out_) { - pipe_writer_t *pipe_writer; - create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer); - demux->attach_pipe (pipe_writer); - - // There's at least one pipe attached. We can deallocate the object - // when there are no pipes (if required). - terminate_on_no_pipes = terminate_on_no_pipes_delayed; - } - - // Ask peer to attach to the out pipe (if one exists). If required, ask - // it to create a pipe in opposite direction. It's assumed that peer's - // seqnum was already incremented, so we don't need to care whether it's - // alive at the moment. - if (in_) - inc_seqnum (); - send_bind (peer_, pipe_reader, in_ ? this : NULL); -} - -void zmq::session_t::revive () -{ - if (engine) - engine->revive (); -} - -void zmq::session_t::terminate () -{ - // Terminate is always called by engine, thus it'll terminate itself, - // we just have to drop the pointer. - engine = NULL; - - // Propagate the terminate signal to both inbound and outbound pipes. - if (mux) { - mux->terminate (); - mux = NULL; - } - if (demux) { - demux->terminate (); - demux = NULL; - } - - // Session cannot be deallocated at this point. There can still be - // pending commands to process. Unregister session from global - // repository thus ensuring that no new commands will be sent. - unregister_inproc_endpoints (this); - - // Move to terminating state. - safe_object_t::terminate (); -} - -zmq::session_t::~session_t () -{ - // When session is actually deallocated it unregisters from its thread. - // Unregistration cannot be done earlier as it would result in memory - // leak if global shutdown happens in the middle of session termination. - thread->detach_session (this); -} - -void zmq::session_t::set_engine (i_engine *engine_) -{ - zmq_assert (!engine || !engine_); - engine = engine_; -} - -void zmq::session_t::set_index (int index_) -{ - index = index_; -} - -int zmq::session_t::get_index () -{ - return index; -} - -bool zmq::session_t::write (zmq_msg *msg_) -{ - return demux->send (msg_); -} - -void zmq::session_t::flush () -{ - demux->flush (); -} - -bool zmq::session_t::read (zmq_msg *msg_) -{ - bool retrieved = mux->recv (msg_); - if (terminate_on_no_pipes && mux->empty () && demux->empty ()) { - zmq_assert (engine); - engine->schedule_terminate (); - terminate (); - } - return retrieved; -} - -void zmq::session_t::process_bind (pipe_reader_t *reader_, session_t *peer_) -{ - if (is_terminating ()) { - - // If session is already in termination phase, we'll ask newly arrived - // pipe reader & writer to terminate straight away. - if (reader_) - reader_->terminate (); - - // Peer session has already incremented its seqnum. We have to send - // a dummy command to avoid a memory leak. - if (peer_) - send_bind (peer_, NULL, NULL); - - return; - } - - // If inbound pipe is provided, bind it to the mux. - if (reader_) { - mux->attach_pipe (reader_); - - // There's at least one pipe attached. We can deallocate the object - // when there are no pipes (if required). - terminate_on_no_pipes = terminate_on_no_pipes_delayed; - } - - // If peer wants to get messages from ourselves, we'll bind to it. - if (peer_) { - pipe_reader_t *pipe_reader; - pipe_writer_t *pipe_writer; - create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer); - demux->attach_pipe (pipe_writer); - send_bind (peer_, pipe_reader, NULL); - - // There's at least one pipe attached. We can deallocate the object - // when there are no pipes (if required). - terminate_on_no_pipes = terminate_on_no_pipes_delayed; - } -} - -void zmq::session_t::process_reg (simple_semaphore_t *smph_) -{ - zmq_assert (!is_terminating ()); - - // Add the session to the list of sessions associated with this I/O thread. - // This way the session will be deallocated on the terminal shutdown. - thread->attach_session (this); - - // Release calling thead (if required). - if (smph_) - smph_->post (); -} - -void zmq::session_t::process_reg_and_bind (session_t *peer_, - bool flow_in_, bool flow_out_) -{ - zmq_assert (!is_terminating ()); - - // Add the session to the list of sessions associated with this I/O thread. - // This way the session will be deallocated on the terminal shutdown. - thread->attach_session (this); - - // Bind to the peer. Note that caller have already incremented command - // sequence number of the peer so we are sure it still exists. - pipe_reader_t *pipe_reader = NULL; - if (flow_out_) { - pipe_writer_t *pipe_writer; - create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer); - demux->attach_pipe (pipe_writer); - - // There's at least one pipe attached. We can deallocate the object - // when there are no pipes (if required). - terminate_on_no_pipes = terminate_on_no_pipes_delayed; - } - send_bind (peer_, pipe_reader, flow_in_ ? this : NULL); -} - -void zmq::session_t::process_engine (i_engine *engine_) -{ - if (is_terminating ()) { - - // Kill the engine. It won't be needed anymore. - engine_->terminate (); - return; - } - - engine_->attach (thread->get_poller (), this); -} diff --git a/src/session.hpp b/src/session.hpp deleted file mode 100644 index 855dd1d3..00000000 --- a/src/session.hpp +++ /dev/null @@ -1,107 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_SESSION_HPP_INCLUDED__ -#define __ZMQ_SESSION_HPP_INCLUDED__ - -#include "i_session.hpp" -#include "safe_object.hpp" -#include "stdint.hpp" -#include "atomic_counter.hpp" - -namespace zmq -{ - - // Object that encapsulates both mux and demux. - - class session_t : public safe_object_t, public i_session - { - public: - - // Creates the session object. - session_t (struct object_t *parent_, struct i_thread *thread_, - struct i_mux *mux_, struct i_demux *demux_, - bool terminate_on_disconnect_, bool terminate_on_no_pipes_); - - // i_session implementation - void set_engine (struct i_engine *engine_); - void shutdown (); - bool read (struct zmq_msg *msg_); - bool write (struct zmq_msg *msg_); - void flush (); - - // Called by the engine when it is being closed. - void disconnected (); - - // Creates a message flow between this session and the peer session. - // If in_ is true, the messages can flow from the peer to ourselves. - // If out_ is true, messages can flow from ourselves to the peer. - // It's assumed that peer's seqnum was already incremented. - void bind (class object_t *peer_, bool in_, bool out_); - - // Called by mux if new messages are available. - void revive (); - - // Functions to set & retrieve index of this MD in thread's array - // of session objects. - void set_index (int index_); - int get_index (); - - private: - - // Clean-up. - ~session_t (); - - // Terminate is private here. It is called by either when disconnected - // or no_pipes event occurs. - void terminate (); - - void process_bind (class pipe_reader_t *reader_, - class session_t *peer_); - void process_reg (class simple_semaphore_t *smph_); - void process_reg_and_bind (class session_t *peer_, - bool flow_in_, bool flow_out_); - void process_engine (i_engine *engine_); - - struct i_mux *mux; - struct i_demux *demux; - - struct i_thread *thread; - struct i_engine *engine; - - // If true termination of the session can be triggered by engine - // disconnect/close. - bool terminate_on_disconnect; - - // If true termination of the session can be triggered when the last - // pipe detaches from it. - bool terminate_on_no_pipes; - - // If true, terminate_on_no_pipes should be set when at least one - // pipe was bound. - bool terminate_on_no_pipes_delayed; - - // Index in thread's session array. - int index; - }; - -} - -#endif - diff --git a/src/session_stub.cpp b/src/session_stub.cpp deleted file mode 100644 index 152b9fbc..00000000 --- a/src/session_stub.cpp +++ /dev/null @@ -1,110 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include - -#include "../include/zmq.h" - -#include "session_stub.hpp" -#include "i_engine.hpp" -#include "listener.hpp" -#include "err.hpp" - -zmq::session_stub_t::session_stub_t (listener_t *listener_) : - state (reading_identity), - engine (NULL), - listener (listener_), - index (-1) -{ -} - -void zmq::session_stub_t::terminate () -{ - if (engine) - engine->terminate (); - delete this; -} - -void zmq::session_stub_t::shutdown () -{ - if (engine) - engine->shutdown (); - delete this; -} - -zmq::session_stub_t::~session_stub_t () -{ -} - -void zmq::session_stub_t::set_engine (i_engine *engine_) -{ - zmq_assert (!engine_ || !engine); - engine = engine_; -} - -bool zmq::session_stub_t::read (struct zmq_msg *msg_) -{ - // No messages are sent to the connecting peer. - return false; -} - -bool zmq::session_stub_t::write (struct zmq_msg *msg_) -{ - // The first message arrived is the connection identity. - if (state == reading_identity) { - identity = std::string ((const char*) zmq_msg_data (msg_), - zmq_msg_size (msg_)); - state = has_identity; - return true; - } - - // We are not interested in any subsequent messages. - return false; -} - -void zmq::session_stub_t::flush () -{ - // We have the identity. At this point we can find the correct session and - // attach it to the connection. - if (state == has_identity) { - - // At this point the stub will be deleted. Return immediately without - // touching 'this' pointer. - listener->got_identity (this, identity.c_str ()); - return; - } -} - -zmq::i_engine *zmq::session_stub_t::detach_engine () -{ - // Ask engine to unregister from the poller. - i_engine *e = engine; - engine->detach (); - return e; -} - -void zmq::session_stub_t::set_index (int index_) -{ - index = index_; -} - -int zmq::session_stub_t::get_index () -{ - return index; -} diff --git a/src/session_stub.hpp b/src/session_stub.hpp deleted file mode 100644 index 4499e454..00000000 --- a/src/session_stub.hpp +++ /dev/null @@ -1,83 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_SESSION_STUB_HPP_INCLUDED__ -#define __ZMQ_SESSION_STUB_HPP_INCLUDED__ - -#include - -#include "i_session.hpp" - -namespace zmq -{ - - // This class is used instead of regular session till the identity of - // incomming connection is established and connection is attached - // to corresponding session. - - class session_stub_t : public i_session - { - public: - - session_stub_t (class listener_t *listener_); - - // i_session implementation. - void set_engine (struct i_engine *engine_); - void terminate (); - void shutdown (); - bool read (struct zmq_msg *msg_); - bool write (struct zmq_msg *msg_); - void flush (); - - // Detaches engine from the stub. Returns it to the caller. - struct i_engine *detach_engine (); - - // Manipulate stubs's index in listener's array of stubs. - void set_index (int index_); - int get_index (); - - private: - - // Clean-up. - virtual ~session_stub_t (); - - enum { - reading_identity, - has_identity - } state; - - // Reference to the associated engine. - i_engine *engine; - - // Reference to the listener object that owns this stub. - class listener_t *listener; - - // Index of the stub in listener's array of stubs. - int index; - - // Identity of the connection. - std::string identity; - - session_stub_t (const session_stub_t&); - void operator = (const session_stub_t&); - }; - -} - -#endif diff --git a/src/socket_base.cpp b/src/socket_base.cpp deleted file mode 100644 index 6718244e..00000000 --- a/src/socket_base.cpp +++ /dev/null @@ -1,267 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include - -#include "../include/zmq.h" - -#include "socket_base.hpp" -#include "app_thread.hpp" -#include "err.hpp" -#include "listener.hpp" -#include "connecter.hpp" -#include "simple_semaphore.hpp" -#include "io_thread.hpp" -#include "io_object.hpp" -#include "session.hpp" -#include "dummy_aggregator.hpp" -#include "dummy_distributor.hpp" - -zmq::socket_base_t::socket_base_t (app_thread_t *thread_, session_t *session_) : - object_t (thread_), - thread (thread_), - session (session_), - has_in (true), - has_out (true) -{ - session->set_engine (this); -} - -void zmq::socket_base_t::shutdown () -{ - // Destroy all the I/O objects created from this socket. - for (io_objects_t::size_type i = 0; i != io_objects.size (); i++) - io_objects [i]->shutdown (); - - delete this; -} - -void zmq::socket_base_t::schedule_terminate () -{ - // Terminate is never scheduled on socket engines. - zmq_assert (false); -} - -void zmq::socket_base_t::terminate () -{ - // Destroy all the I/O objects created from this socket. - // First unregister the object from I/O thread, then terminate it in - // this application thread. - simple_semaphore_t smph; - for (io_objects_t::size_type i = 0; i != io_objects.size (); i++) { - send_unreg (io_objects [i], &smph); - smph.wait (); - io_objects [i]->terminate (); - } - - zmq_assert (session); - session->disconnected (); - - delete this; -} - -zmq::socket_base_t::~socket_base_t () -{ -} - -void zmq::socket_base_t::disable_in () -{ - has_in = false; -} - -void zmq::socket_base_t::disable_out () -{ - has_out = false; -} - -int zmq::socket_base_t::bind (const char *addr_, zmq_opts *opts_) -{ - thread->process_commands (false); - - std::string addr (addr_); - std::string::size_type pos = addr.find ("://"); - if (pos == std::string::npos || addr.substr (0, pos) == "zmq.tcp") { - - // Choose the I/O thread with the least load, create the listener. - // Note that same taskset is used to choose the I/O thread to handle - // the listening socket and newly created connections. - // Note that has_in and has_out are twisted at this place - listener - // is going to create peer objects, so the message flows are viewed - // from the opposite direction. - io_thread_t *io_thread = choose_io_thread (opts_ ? opts_->taskset : 0); - listener_t *listener = new listener_t (io_thread, addr_, session, - has_out, has_in, opts_ ? opts_->taskset : 0); - - // Ask it to start interacting with the I/O thread. - simple_semaphore_t smph; - send_reg (listener, &smph); - - // Store the reference to the listener so that it can be terminated - // when the socket is closed. - io_objects.push_back (listener); - - // Wait while listener is actually registered with the I/O thread. - smph.wait (); - - return 0; - } - else if (addr.substr (0, pos) == "inproc") { - - // For inproc transport the only thing we have to do is to register - // this socket as an inproc endpoint with the supplied name. - return register_inproc_endpoint (addr.substr (pos + 3).c_str (), - session); - } - else { - - // Unknown protocol requested. - errno = EINVAL; - return -1; - } -} - -int zmq::socket_base_t::connect (const char *addr_, zmq_opts *opts_) -{ - thread->process_commands (false); - - std::string addr (addr_); - std::string::size_type pos = addr.find ("://"); - if (pos == std::string::npos || addr.substr (0, pos) == "zmq.tcp") { - - // Choose the I/O thread with the least load, create the connecter and - // session. - io_thread_t *io_thread = choose_io_thread (opts_ ? opts_->taskset : 0); - i_mux *mux = new dummy_aggregator_t; - zmq_assert (mux); - i_demux *demux = new dummy_distributor_t; - zmq_assert (demux); - session_t *peer = new session_t (io_thread, io_thread, mux, demux, - false, true); - zmq_assert (peer); - connecter_t *connecter = new connecter_t (io_thread, addr_, peer); - zmq_assert (connecter); - - // Increment session's command sequence number so that it won't get - // deallocated till the subsequent bind command arrives. - peer->inc_seqnum (); - - // Register the connecter (and session) with its I/O thread. - simple_semaphore_t smph; - send_reg (connecter, &smph); - - // Store the reference to the connecter so that it can be terminated - // when the socket is closed. - io_objects.push_back (connecter); - - // Wait till registration succeeds. - smph.wait (); - - // Bind local session with the connecter's session so that messages - // can flow in both directions. - session->bind (peer, has_in, has_out); - - return 0; - } - else if (addr.substr (0, pos) == "inproc") { - - // Get the MD responsible for the address. In case of invalid address - // return error. - object_t *peer = get_inproc_endpoint (addr.substr (pos + 3).c_str ()); - if (!peer) { - errno = EADDRNOTAVAIL; - return -1; - } - - // Create bidirectional message pipes between this session and - // the peer session. - session->bind (peer, has_in, has_out); - - return 0; - } - else { - - // Unknown protocol requested. - errno = EINVAL; - return -1; - } -} - -int zmq::socket_base_t::subscribe (const char *criteria_) -{ - // No implementation at the moment... - errno = ENOTSUP; - return -1; -} - -int zmq::socket_base_t::send (zmq_msg *msg_, int flags_) -{ - thread->process_commands (false); - while (true) { - if (session->write (msg_)) - return 0; - if (flags_ & ZMQ_NOBLOCK) { - errno = EAGAIN; - return -1; - } - thread->process_commands (true); - } -} - -int zmq::socket_base_t::flush () -{ - thread->process_commands (false); - session->flush (); - return 0; -} - -int zmq::socket_base_t::recv (zmq_msg *msg_, int flags_) -{ - thread->process_commands (false); - while (true) { - if (session->read (msg_)) - return 0; - if (flags_ & ZMQ_NOBLOCK) { - errno = EAGAIN; - return -1; - } - thread->process_commands (true); - } -} - -int zmq::socket_base_t::close () -{ - terminate (); - return 0; -} - -void zmq::socket_base_t::attach (struct i_poller *poller_, i_session *session_) -{ - zmq_assert (false); -} - -void zmq::socket_base_t::detach () -{ - zmq_assert (false); -} - -void zmq::socket_base_t::revive () -{ - // We can ignore the event safely here. -} - diff --git a/src/socket_base.hpp b/src/socket_base.hpp deleted file mode 100644 index c1de8e6c..00000000 --- a/src/socket_base.hpp +++ /dev/null @@ -1,96 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__ -#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__ - -#include - -#include "i_engine.hpp" -#include "i_api.hpp" -#include "object.hpp" - -namespace zmq -{ - - class socket_base_t : public object_t, public i_engine, public i_api - { - public: - - // TODO: Possibly, session can be attached to the engine using - // attach function. - socket_base_t (class app_thread_t *thread_, class session_t *session_); - - // i_engine interface implementation. - void attach (struct i_poller *poller_, struct i_session *session_); - void detach (); - void revive (); - void schedule_terminate (); - void terminate (); - void shutdown (); - - // i_api interface implementation. - int bind (const char *addr_, struct zmq_opts *opts_); - int connect (const char *addr_, struct zmq_opts *opts_); - int subscribe (const char *criteria_); - int send (struct zmq_msg *msg_, int flags_); - int flush (); - int recv (struct zmq_msg *msg_, int flags_); - int close (); - - protected: - - // Clean-up. The function has to be protected rather than private, - // otherwise auto-generated destructor in derived classes - // cannot be compiled. It has to be virtual so that socket_base_t's - // terminate & shutdown functions deallocate correct type of object. - virtual ~socket_base_t (); - - // By default, socket is able to pass messages in both inward and - // outward directions. By calling these functions, particular - // socket type is able to eliminate one direction. - void disable_in (); - void disable_out (); - - private: - - // Pointer to the application thread the socket belongs to. - class app_thread_t *thread; - - // Pointer to the associated session object. - class session_t *session; - - // List of I/O object created via this socket. These have to be shut - // down when the socket is closed. - typedef std::vector io_objects_t; - io_objects_t io_objects; - - // If true, socket creates inbound pipe when binding to an engine. - bool has_in; - - // If true, socket creates outbound pipe when binding to an engine. - bool has_out; - - socket_base_t (const socket_base_t&); - void operator = (const socket_base_t&); - }; - -} - -#endif diff --git a/src/sub.cpp b/src/sub.cpp deleted file mode 100644 index 3d1d5787..00000000 --- a/src/sub.cpp +++ /dev/null @@ -1,45 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "sub.hpp" -#include "app_thread.hpp" -#include "session.hpp" -#include "err.hpp" - -zmq::sub_t::sub_t (app_thread_t *thread_, session_t *session_) : - socket_base_t (thread_, session_) -{ - disable_out (); -} - -int zmq::sub_t::send (struct zmq_msg *msg_, int flags_) -{ - // Subscriber socket has no send function. - errno = ENOTSUP; - return -1; -} - -int zmq::sub_t::flush () -{ - // Subscriber socket has no flush function. - errno = ENOTSUP; - return -1; -} diff --git a/src/sub.hpp b/src/sub.hpp deleted file mode 100644 index f3e23c11..00000000 --- a/src/sub.hpp +++ /dev/null @@ -1,46 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_SUB_HPP_INCLUDED__ -#define __ZMQ_SUB_HPP_INCLUDED__ - -#include "socket_base.hpp" - -namespace zmq -{ - - class sub_t : public socket_base_t - { - public: - - sub_t (class app_thread_t *thread_, class session_t *session_); - - // i_api overloads. - int send (struct zmq_msg *msg_, int flags_); - int flush (); - - private: - - sub_t (const sub_t&); - void operator = (const sub_t&); - }; - -} - -#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 0fb6fe1a..d19b229e 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -176,7 +176,7 @@ void *zmq_init (int app_threads_, int io_threads_) int zmq_term (void *context_) { - ((zmq::context_t*) context_)->shutdown (); + delete (zmq::context_t*) context_; return 0; } diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp deleted file mode 100644 index 0c491ea8..00000000 --- a/src/zmq_decoder.cpp +++ /dev/null @@ -1,79 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "zmq_decoder.hpp" -#include "i_session.hpp" -#include "wire.hpp" - -zmq::zmq_decoder_t::zmq_decoder_t () : - destination (NULL) -{ - zmq_msg_init (&in_progress); - - // At the beginning, read one byte and go to one_byte_size_ready state. - next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); -} - -zmq::zmq_decoder_t::~zmq_decoder_t () -{ - zmq_msg_close (&in_progress); -} - -void zmq::zmq_decoder_t::set_session (i_session *destination_) -{ - destination = destination_; -} - -bool zmq::zmq_decoder_t::one_byte_size_ready () -{ - // First byte of size is read. If it is 0xff read 8-byte size. - // Otherwise allocate the buffer for message data and read the - // message data into it. - if (*tmpbuf == 0xff) - next_step (tmpbuf, 8, &zmq_decoder_t::eight_byte_size_ready); - else { - zmq_msg_init_size (&in_progress, *tmpbuf); - next_step (zmq_msg_data (&in_progress), *tmpbuf, - &zmq_decoder_t::message_ready); - } - return true; -} - -bool zmq::zmq_decoder_t::eight_byte_size_ready () -{ - // 8-byte size is read. Allocate the buffer for message body and - // read the message data into it. - size_t size = (size_t) get_uint64 (tmpbuf); - zmq_msg_init_size (&in_progress, size); - next_step (zmq_msg_data (&in_progress), size, - &zmq_decoder_t::message_ready); - return true; -} - -bool zmq::zmq_decoder_t::message_ready () -{ - // Message is completely read. Push it further and start reading - // new message. - if (!destination->write (&in_progress)) - return false; - - next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); - return true; -} - diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp deleted file mode 100644 index f648819d..00000000 --- a/src/zmq_decoder.hpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - - -#ifndef __ZMQ_ZMQ_DECODER_HPP_INCLUDED__ -#define __ZMQ_ZMQ_DECODER_HPP_INCLUDED__ - -#include "../include/zmq.h" - -#include "decoder.hpp" - -namespace zmq -{ - // Decoder for 0MQ backend protocol. Converts data batches into messages. - - class zmq_decoder_t : public decoder_t - { - public: - - zmq_decoder_t (); - ~zmq_decoder_t (); - - void set_session (struct i_session *destination_); - - private: - - bool one_byte_size_ready (); - bool eight_byte_size_ready (); - bool message_ready (); - - struct i_session *destination; - unsigned char tmpbuf [8]; - ::zmq_msg in_progress; - - zmq_decoder_t (const zmq_decoder_t&); - void operator = (const zmq_decoder_t&); - }; - -} - -#endif diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp deleted file mode 100644 index 8a713cf2..00000000 --- a/src/zmq_encoder.cpp +++ /dev/null @@ -1,75 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "zmq_encoder.hpp" -#include "i_session.hpp" -#include "wire.hpp" - -zmq::zmq_encoder_t::zmq_encoder_t () : - source (NULL) -{ - zmq_msg_init (&in_progress); - - // Write 0 bytes to the batch and go to message_ready state. - next_step (NULL, 0, &zmq_encoder_t::message_ready, true); -} - -zmq::zmq_encoder_t::~zmq_encoder_t () -{ - zmq_msg_close (&in_progress); -} - -void zmq::zmq_encoder_t::set_session (i_session *source_) -{ - source = source_; -} - -bool zmq::zmq_encoder_t::size_ready () -{ - // Write message body into the buffer. - next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), - &zmq_encoder_t::message_ready, false); - return true; -} - -bool zmq::zmq_encoder_t::message_ready () -{ - // Read new message from the dispatcher. If there is none, return false. - // Note that new state is set only if write is successful. That way - // unsuccessful write will cause retry on the next state machine - // invocation. - if (!source->read (&in_progress)) { - return false; - } - size_t size = zmq_msg_size (&in_progress); - - // For messages less than 255 bytes long, write one byte of message size. - // For longer messages write 0xff escape character followed by 8-byte - // message size. - if (size < 255) { - tmpbuf [0] = (unsigned char) size; - next_step (tmpbuf, 1, &zmq_encoder_t::size_ready, true); - } - else { - tmpbuf [0] = 0xff; - put_uint64 (tmpbuf + 1, size); - next_step (tmpbuf, 9, &zmq_encoder_t::size_ready, true); - } - return true; -} diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp deleted file mode 100644 index 829fd4ba..00000000 --- a/src/zmq_encoder.hpp +++ /dev/null @@ -1,54 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__ -#define __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__ - -#include "../include/zmq.h" - -#include "encoder.hpp" - -namespace zmq -{ - // Encoder for 0MQ backend protocol. Converts messages into data batches. - - class zmq_encoder_t : public encoder_t - { - public: - - zmq_encoder_t (); - ~zmq_encoder_t (); - - void set_session (struct i_session *source_); - - private: - - bool size_ready (); - bool message_ready (); - - struct i_session *source; - ::zmq_msg in_progress; - unsigned char tmpbuf [9]; - - zmq_encoder_t (const zmq_encoder_t&); - void operator = (const zmq_encoder_t&); - }; -} - -#endif diff --git a/src/zmq_tcp_engine.cpp b/src/zmq_tcp_engine.cpp deleted file mode 100644 index 6091d80b..00000000 --- a/src/zmq_tcp_engine.cpp +++ /dev/null @@ -1,185 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "zmq_tcp_engine.hpp" -#include "config.hpp" -#include "i_session.hpp" -#include "err.hpp" - -zmq::zmq_tcp_engine_t::zmq_tcp_engine_t (fd_t fd_) : - poller (NULL), - session (NULL), - terminating (false), - insize (0), - inpos (0), - outsize (0), - outpos (0) -{ - // Allocate read & write buffer. - inbuf = new unsigned char [in_batch_size]; - zmq_assert (inbuf); - outbuf = new unsigned char [out_batch_size]; - zmq_assert (outbuf); - - // Attach the socket. - int rc = socket.open (fd_); - zmq_assert (rc == 0); -} - -void zmq::zmq_tcp_engine_t::attach (i_poller *poller_, i_session *session_) -{ - zmq_assert (!poller); - poller = poller_; - zmq_assert (!session); - session = session_; - encoder.set_session (session); - decoder.set_session (session); - - // Let session know we are here. - session->set_engine (this); - - // Register the engine with the polling thread. - handle = poller->add_fd (socket.get_fd (), this); - poller->set_pollin (handle); - poller->set_pollout (handle); - - // Flush any pending inbound messages. - in_event (); -} - -void zmq::zmq_tcp_engine_t::detach () -{ - zmq_assert (poller); - poller->rm_fd (handle); - poller = NULL; - zmq_assert (session); - session->set_engine (NULL); - session = NULL; - encoder.set_session (NULL); - decoder.set_session (NULL); -} - -void zmq::zmq_tcp_engine_t::revive () -{ - zmq_assert (poller); - poller->set_pollout (handle); -} - -void zmq::zmq_tcp_engine_t::schedule_terminate () -{ - terminating = true; -} - -void zmq::zmq_tcp_engine_t::terminate () -{ - delete this; -} - -void zmq::zmq_tcp_engine_t::shutdown () -{ - delete this; -} - -zmq::zmq_tcp_engine_t::~zmq_tcp_engine_t () -{ - detach (); - delete [] outbuf; - delete [] inbuf; -} - -void zmq::zmq_tcp_engine_t::in_event () -{ - // If there's no data to process in the buffer, read new data. - if (inpos == insize) { - - // Read as much data as possible to the read buffer. - insize = socket.read (inbuf, in_batch_size); - inpos = 0; - - // Check whether the peer has closed the connection. - if (insize == -1) { - insize = 0; - error (); - return; - } - } - - // Following code should be executed even if there's not a single byte in - // the buffer. There still can be a decoded messages stored in the decoder. - - // Push the data to the decoder. - int nbytes = decoder.write (inbuf + inpos, insize - inpos); - - // Adjust read position. Stop polling for input if we got stuck. - inpos += nbytes; - if (inpos < insize) - poller->reset_pollin (handle); - - // If at least one byte was processed, flush all messages the decoder - // may have produced. If engine is disconnected from session, no need - // to flush the messages. It's important that flush is called at the - // very end of in_event as it may invoke in_event itself. - if (nbytes > 0 && session) - session->flush (); -} - -void zmq::zmq_tcp_engine_t::out_event () -{ - // If write buffer is empty, try to read new data from the encoder. - if (outpos == outsize) { - - outsize = encoder.read (outbuf, out_batch_size); - outpos = 0; - - // If there are no more pipes, engine can be deallocated. - if (terminating) { - terminate (); - return; - } - - // If there is no data to send, stop polling for output. - if (outsize == 0) - poller->reset_pollout (handle); - } - - // If there are any data to write in write buffer, write as much as - // possible to the socket. - if (outpos < outsize) { - int nbytes = socket.write (outbuf + outpos, outsize - outpos); - - // Handle problems with the connection. - if (nbytes == -1) { - error (); - return; - } - - outpos += nbytes; - } -} - -void zmq::zmq_tcp_engine_t::timer_event () -{ - zmq_assert (false); -} - -void zmq::zmq_tcp_engine_t::error () -{ - zmq_assert (false); -} - diff --git a/src/zmq_tcp_engine.hpp b/src/zmq_tcp_engine.hpp deleted file mode 100644 index 6a83cec9..00000000 --- a/src/zmq_tcp_engine.hpp +++ /dev/null @@ -1,92 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_ZMQ_TCP_ENGINE_HPP_INCLUDED__ -#define __ZMQ_ZMQ_TCP_ENGINE_HPP_INCLUDED__ - -#include "i_engine.hpp" -#include "i_poller.hpp" -#include "i_poll_events.hpp" -#include "fd.hpp" -#include "tcp_socket.hpp" -#include "zmq_encoder.hpp" -#include "zmq_decoder.hpp" - -namespace zmq -{ - - class zmq_tcp_engine_t : public i_engine, public i_poll_events - { - public: - - zmq_tcp_engine_t (fd_t fd_); - - // i_engine implementation. - void attach (struct i_poller *poller_, struct i_session *session_); - void detach (); - void revive (); - void schedule_terminate (); - void terminate (); - void shutdown (); - - // i_poll_events implementation. - void in_event (); - void out_event (); - void timer_event (); - - private: - - void error (); - - // Clean-up. - ~zmq_tcp_engine_t (); - - // The underlying TCP socket. - tcp_socket_t socket; - - // Handle associated with the socket. - handle_t handle; - - // I/O thread that the engine runs in. - i_poller *poller; - - // Reference to the associated session object. - i_session *session; - - // If true, engine should terminate itself as soon as possible. - bool terminating; - - unsigned char *inbuf; - int insize; - int inpos; - - unsigned char *outbuf; - int outsize; - int outpos; - - zmq_encoder_t encoder; - zmq_decoder_t decoder; - - zmq_tcp_engine_t (const zmq_tcp_engine_t&); - void operator = (const zmq_tcp_engine_t&); - }; - -} - -#endif