diff --git a/src/Makefile.am b/src/Makefile.am
index 937372f8..d7509dc9 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -55,6 +55,7 @@ libzmq_la_SOURCES = \
blob.hpp \
command.hpp \
config.hpp \
+ connect_session.hpp \
ctx.hpp \
decoder.hpp \
devpoll.hpp \
@@ -70,15 +71,17 @@ libzmq_la_SOURCES = \
ip.hpp \
i_engine.hpp \
i_poll_events.hpp \
+ i_terminate_events.hpp \
kqueue.hpp \
lb.hpp \
likely.hpp \
msg_content.hpp \
msg_store.hpp \
mutex.hpp \
+ named_session.hpp \
object.hpp \
options.hpp \
- owned.hpp \
+ own.hpp \
pgm_receiver.hpp \
pgm_sender.hpp \
pgm_socket.hpp \
@@ -106,6 +109,7 @@ libzmq_la_SOURCES = \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
+ transient_session.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@@ -123,6 +127,7 @@ libzmq_la_SOURCES = \
zmq_listener.hpp \
command.cpp \
ctx.cpp \
+ connect_session.cpp \
devpoll.cpp \
epoll.cpp \
err.cpp \
@@ -134,9 +139,10 @@ libzmq_la_SOURCES = \
kqueue.cpp \
lb.cpp \
msg_store.cpp \
+ named_session.cpp \
object.cpp \
options.cpp \
- owned.cpp \
+ own.cpp \
pair.cpp \
pgm_receiver.cpp \
pgm_sender.cpp \
@@ -160,6 +166,7 @@ libzmq_la_SOURCES = \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
+ transient_session.cpp \
uuid.cpp \
xrep.cpp \
xreq.cpp \
diff --git a/src/command.hpp b/src/command.hpp
index 3d00cd7b..a924b4e8 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -61,7 +61,7 @@ namespace zmq
// Sent to socket to let it know about the newly created object.
struct {
- class owned_t *object;
+ class own_t *object;
} own;
// Attach the engine to the session.
@@ -104,7 +104,7 @@ namespace zmq
// Sent by I/O object ot the socket to request the shutdown of
// the I/O object.
struct {
- class owned_t *object;
+ class own_t *object;
} term_req;
// Sent by socket to I/O object to start its shutdown.
diff --git a/src/connect_session.cpp b/src/connect_session.cpp
new file mode 100644
index 00000000..5c088f64
--- /dev/null
+++ b/src/connect_session.cpp
@@ -0,0 +1,115 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see .
+*/
+
+#include "connect_session.hpp"
+#include "zmq_connecter.hpp"
+
+zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_t (io_thread_, socket_, options_),
+ protocol (protocol_),
+ address (address_)
+{
+}
+
+zmq::connect_session_t::~connect_session_t ()
+{
+}
+
+void zmq::connect_session_t::process_plug ()
+{
+ // Start connection process immediately.
+ start_connecting ();
+}
+
+void zmq::connect_session_t::start_connecting ()
+{
+ // Create the connecter object.
+
+ // Both TCP and IPC transports are using the same infrastructure.
+ if (protocol == "tcp" || protocol == "ipc") {
+ zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
+ choose_io_thread (options.affinity), this, options,
+ protocol.c_str (), address.c_str ());
+ zmq_assert (connecter);
+ launch_child (connecter);
+ return;
+ }
+
+#if defined ZMQ_HAVE_OPENPGM
+
+ // Both PGM and EPGM transports are using the same infrastructure.
+ if (addr_type == "pgm" || addr_type == "epgm") {
+
+ // For EPGM transport with UDP encapsulation of PGM is used.
+ bool udp_encapsulation = (addr_type == "epgm");
+
+ // At this point we'll create message pipes to the session straight
+ // away. There's no point in delaying it as no concept of 'connect'
+ // exists with PGM anyway.
+ if (options.requires_out) {
+
+ // PGM sender.
+ pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
+ choose_io_thread (options.affinity), options);
+ zmq_assert (pgm_sender);
+
+ int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ());
+ if (rc != 0) {
+ delete pgm_sender;
+ return -1;
+ }
+
+ send_attach (this, pgm_sender, blob_t ());
+ }
+ else if (options.requires_in) {
+
+ // PGM receiver.
+ pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
+ choose_io_thread (options.affinity), options);
+ zmq_assert (pgm_receiver);
+
+ int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ());
+ if (rc != 0) {
+ delete pgm_receiver;
+ return -1;
+ }
+
+ send_attach (this, pgm_receiver, blob_t ());
+ }
+ else
+ zmq_assert (false);
+
+ return;
+ }
+#endif
+
+ zmq_assert (false);
+}
+
+void zmq::connect_session_t::detach ()
+{
+ // Clean up the mess left over by the failed connection.
+ clean_pipes ();
+
+ // Reconnect.
+ start_connecting ();
+}
+
diff --git a/src/connect_session.hpp b/src/connect_session.hpp
new file mode 100644
index 00000000..8303dda2
--- /dev/null
+++ b/src/connect_session.hpp
@@ -0,0 +1,60 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see .
+*/
+
+#ifndef __ZMQ_CONNECT_SESSION_HPP_INCLUDED__
+#define __ZMQ_CONNECT_SESSION_HPP_INCLUDED__
+
+#include
+
+#include "session.hpp"
+
+namespace zmq
+{
+
+ // Connect session contains an address to connect to. On disconnect it
+ // attempts to reconnect.
+
+ class connect_session_t : public session_t
+ {
+ public:
+
+ connect_session_t (class io_thread_t *io_thread_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~connect_session_t ();
+
+ // i_inout interface implementation.
+ void detach ();
+
+ private:
+
+ // Start the connection process.
+ void start_connecting ();
+
+ // Command handlers.
+ void process_plug ();
+
+ // Address to connect to.
+ std::string protocol;
+ std::string address;
+ };
+
+}
+
+#endif
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 91157a55..d096b915 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -119,6 +119,7 @@ int zmq::ctx_t::term ()
// We don't even have to synchronise access to data.
zmq_assert (sockets.empty ());
+// TODO: We are accessing the list of zombies in unsynchronised manner here!
// Get rid of remaining zombie sockets.
while (!zombies.empty ()) {
dezombify ();
@@ -173,7 +174,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
return s;
}
-void zmq::ctx_t::zombify (socket_base_t *socket_)
+void zmq::ctx_t::zombify_socket (socket_base_t *socket_)
{
// Zombification of socket basically means that its ownership is tranferred
// from the application that created it to the context.
@@ -284,7 +285,8 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)
void zmq::ctx_t::dezombify ()
{
- // Try to dezombify each zombie in the list.
+ // Try to dezombify each zombie in the list. Note that caller is
+ // responsible for calling this method in the slot_sync critical section.
for (zombies_t::size_type i = 0; i != zombies.size ();)
if (zombies [i]->dezombify ()) {
empty_slots.push_back (zombies [i]->get_slot ());
diff --git a/src/ctx.hpp b/src/ctx.hpp
index cb9a2d9c..c44cca6d 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -58,7 +58,7 @@ namespace zmq
class socket_base_t *create_socket (int type_);
// Make socket a zombie.
- void zombify (socket_base_t *socket_);
+ void zombify_socket (socket_base_t *socket_);
// Send command to the destination slot.
void send_command (uint32_t slot_, const command_t &command_);
diff --git a/src/fq.cpp b/src/fq.cpp
index ddade280..8f6485f0 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -22,11 +22,14 @@
#include "fq.hpp"
#include "pipe.hpp"
#include "err.hpp"
+#include "i_terminate_events.hpp"
-zmq::fq_t::fq_t () :
+zmq::fq_t::fq_t (i_terminate_events *sink_) :
active (0),
current (0),
- more (false)
+ more (false),
+ sink (sink_),
+ terminating (false)
{
}
@@ -42,6 +45,10 @@ void zmq::fq_t::attach (reader_t *pipe_)
pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1);
active++;
+
+ // If we are already terminating, ask the pipe to terminate straight away.
+ if (terminating)
+ pipe_->terminate ();
}
void zmq::fq_t::terminated (reader_t *pipe_)
@@ -59,15 +66,15 @@ void zmq::fq_t::terminated (reader_t *pipe_)
current = 0;
}
pipes.erase (pipe_);
+
+ if (terminating && pipes.empty ())
+ sink->terminated ();
}
-bool zmq::fq_t::has_pipes ()
+void zmq::fq_t::terminate ()
{
- return !pipes.empty ();
-}
+ terminating = true;
-void zmq::fq_t::term_pipes ()
-{
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
}
diff --git a/src/fq.hpp b/src/fq.hpp
index 2e09809c..97e94695 100644
--- a/src/fq.hpp
+++ b/src/fq.hpp
@@ -33,12 +33,11 @@ namespace zmq
{
public:
- fq_t ();
+ fq_t (struct i_terminate_events *sink_);
~fq_t ();
void attach (reader_t *pipe_);
- bool has_pipes ();
- void term_pipes ();
+ void terminate ();
int recv (zmq_msg_t *msg_, int flags_);
bool has_in ();
@@ -64,6 +63,12 @@ namespace zmq
// there are following parts still waiting in the current pipe.
bool more;
+ // Object to send events to.
+ i_terminate_events *sink;
+
+ // If true, termination process is already underway.
+ bool terminating;
+
fq_t (const fq_t&);
void operator = (const fq_t&);
};
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index ea6b850d..0ba94f5b 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -20,8 +20,6 @@
#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
#define __ZMQ_I_ENGINE_HPP_INCLUDED__
-#include
-
namespace zmq
{
@@ -30,18 +28,19 @@ namespace zmq
virtual ~i_engine () {}
// Plug the engine to the session.
- virtual void plug (struct i_inout *inout_) = 0;
+ virtual void plug (class io_thread_t *io_thread_,
+ struct i_inout *inout_) = 0;
// Unplug the engine from the session.
virtual void unplug () = 0;
- // This method is called by the session to signalise that there
- // are messages to send available.
- virtual void revive () = 0;
-
// This method is called by the session to signalise that more
// messages can be written to the pipe.
- virtual void resume_input () = 0;
+ virtual void activate_in () = 0;
+
+ // This method is called by the session to signalise that there
+ // are messages to send available.
+ virtual void activate_out () = 0;
};
}
diff --git a/src/i_inout.hpp b/src/i_inout.hpp
index 21d1838a..60bc5188 100644
--- a/src/i_inout.hpp
+++ b/src/i_inout.hpp
@@ -31,28 +31,17 @@ namespace zmq
{
virtual ~i_inout () {}
- // Engine asks to get a message to send to the network.
+ // Engine asks for a message to send to the network.
virtual bool read (::zmq_msg_t *msg_) = 0;
- // Engine sends the incoming message further on downstream.
+ // Engine received message from the network and sends it further on.
virtual bool write (::zmq_msg_t *msg_) = 0;
- // Flush all the previously written messages downstream.
+ // Flush all the previously written messages.
virtual void flush () = 0;
-
- // Drop all the references to the engine. The parameter is the object
- // to use to reconnect. If reconnection is not required, the argument
- // is set to NULL.
- virtual void detach (class owned_t *reconnecter_) = 0;
- // Returns least loaded I/O thread.
- virtual class io_thread_t *get_io_thread () = 0;
-
- // Return pointer to the owning socket.
- virtual class socket_base_t *get_owner () = 0;
-
- // Return ordinal number of the session.
- virtual uint64_t get_ordinal () = 0;
+ // Engine is dead. Drop all the references to it.
+ virtual void detach () = 0;
};
}
diff --git a/src/i_terminate_events.hpp b/src/i_terminate_events.hpp
new file mode 100644
index 00000000..08599ffe
--- /dev/null
+++ b/src/i_terminate_events.hpp
@@ -0,0 +1,38 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see .
+*/
+
+#ifndef __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__
+#define __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__
+
+namespace zmq
+{
+
+ // Algorithms such as fair queueing (fq_t) and load balancing (lb_t)
+ // use this interface to communicate termination event to the socket.
+
+ struct i_terminate_events
+ {
+ virtual ~i_terminate_events () {}
+
+ virtual void terminated () = 0;
+ };
+
+}
+
+#endif
diff --git a/src/io_object.cpp b/src/io_object.cpp
index 086f1730..b3b45ee4 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -21,21 +21,35 @@
#include "io_thread.hpp"
#include "err.hpp"
-zmq::io_object_t::io_object_t (io_thread_t *io_thread_)
+zmq::io_object_t::io_object_t (io_thread_t *io_thread_) :
+ poller (NULL)
{
- // Retrieve the poller from the thread we are running in.
- poller = io_thread_->get_poller ();
+ if (io_thread_)
+ plug (io_thread_);
}
zmq::io_object_t::~io_object_t ()
{
}
-void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_)
+void zmq::io_object_t::plug (io_thread_t *io_thread_)
{
+ zmq_assert (io_thread_);
+ zmq_assert (!poller);
+
+ // Retrieve the poller from the thread we are running in.
poller = io_thread_->get_poller ();
}
+void zmq::io_object_t::unplug ()
+{
+ zmq_assert (poller);
+
+ // Forget about old poller in preparation to be migrated
+ // to a different I/O thread.
+ poller = NULL;
+}
+
zmq::io_object_t::handle_t zmq::io_object_t::add_fd (fd_t fd_)
{
return poller->add_fd (fd_, this);
diff --git a/src/io_object.hpp b/src/io_object.hpp
index 655e7f58..284e6d1c 100644
--- a/src/io_object.hpp
+++ b/src/io_object.hpp
@@ -40,15 +40,15 @@ namespace zmq
io_object_t (class io_thread_t *io_thread_ = NULL);
~io_object_t ();
+ // When migrating an object from one I/O thread to another, first
+ // unplug it, then migrate it, then plug it to the new thread.
+ void plug (class io_thread_t *io_thread_);
+ void unplug ();
+
protected:
typedef poller_t::handle_t handle_t;
- // Derived class can init/swap the underlying I/O thread.
- // Caution: Remove all the file descriptors from the old I/O thread
- // before swapping to the new one!
- void set_io_thread (class io_thread_t *io_thread_);
-
// Methods to access underlying poller object.
handle_t add_fd (fd_t fd_);
void rm_fd (handle_t handle_);
diff --git a/src/lb.cpp b/src/lb.cpp
index ccfaaaed..2468b484 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -22,11 +22,14 @@
#include "lb.hpp"
#include "pipe.hpp"
#include "err.hpp"
+#include "i_terminate_events.hpp"
-zmq::lb_t::lb_t () :
+zmq::lb_t::lb_t (i_terminate_events *sink_) :
active (0),
current (0),
- more (false)
+ more (false),
+ sink (sink_),
+ terminating (false)
{
}
@@ -42,17 +45,22 @@ void zmq::lb_t::attach (writer_t *pipe_)
pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1);
active++;
+
+ if (terminating)
+ pipe_->terminate ();
}
-void zmq::lb_t::term_pipes ()
+void zmq::lb_t::terminate ()
{
+ terminating = true;
+
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
}
void zmq::lb_t::terminated (writer_t *pipe_)
{
- // ???
+ // TODO: ???
zmq_assert (!more || pipes [current] != pipe_);
// Remove the pipe from the list; adjust number of active pipes
@@ -63,11 +71,9 @@ void zmq::lb_t::terminated (writer_t *pipe_)
current = 0;
}
pipes.erase (pipe_);
-}
-bool zmq::lb_t::has_pipes ()
-{
- return !pipes.empty ();
+ if (terminating && pipes.empty ())
+ sink->terminated ();
}
void zmq::lb_t::activated (writer_t *pipe_)
diff --git a/src/lb.hpp b/src/lb.hpp
index e69385e4..cb2ce721 100644
--- a/src/lb.hpp
+++ b/src/lb.hpp
@@ -32,12 +32,11 @@ namespace zmq
{
public:
- lb_t ();
+ lb_t (struct i_terminate_events *sink_);
~lb_t ();
void attach (writer_t *pipe_);
- void term_pipes ();
- bool has_pipes ();
+ void terminate ();
int send (zmq_msg_t *msg_, int flags_);
bool has_out ();
@@ -61,6 +60,12 @@ namespace zmq
// True if last we are in the middle of a multipart message.
bool more;
+ // Object to send events to.
+ struct i_terminate_events *sink;
+
+ // If true, termination process is already underway.
+ bool terminating;
+
lb_t (const lb_t&);
void operator = (const lb_t&);
};
diff --git a/src/named_session.cpp b/src/named_session.cpp
new file mode 100644
index 00000000..d219286e
--- /dev/null
+++ b/src/named_session.cpp
@@ -0,0 +1,87 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see .
+*/
+
+#include "named_session.hpp"
+#include "socket_base.hpp"
+
+/*
+zmq::named_session_t::named_session_t (class io_thread_t *io_thread_,
+ socket_base_t *socket_, const options_t &options_,
+ const blob_t &name_) :
+ session_t (io_thread_, socket_, options_),
+ name (name_)
+{
+ // Make double sure that the session has valid name.
+ zmq_assert (!name.empty ());
+ zmq_assert (name [0] != 0);
+
+ if (!socket_->register_session (name, this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should log the error and drop the
+ // session.
+ zmq_assert (false);
+ }
+}
+
+zmq::named_session_t::~named_session_t ()
+{
+}
+
+void zmq::named_session_t::detach ()
+{
+ // TODO:
+ zmq_assert (false);
+}
+
+void zmq::named_session_t::attached (const blob_t &peer_identity_)
+{
+ if (!peer_identity.empty ()) {
+
+ // If both IDs are temporary, no checking is needed.
+ // TODO: Old ID should be reused in this case...
+ if (peer_identity.empty () || peer_identity [0] != 0 ||
+ peer_identity_.empty () || peer_identity_ [0] != 0) {
+
+ // If we already know the peer name do nothing, just check whether
+ // it haven't changed.
+ zmq_assert (peer_identity == peer_identity_);
+ }
+ }
+ else if (!peer_identity_.empty ()) {
+
+ // Store the peer identity.
+ peer_identity = peer_identity_;
+
+ // Register the session using the peer name.
+ if (!register_session (peer_identity, this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
+ }
+}
+
+void zmq::named_session_t::detached ()
+{
+ socket->unregister_session (peer_identity);
+}
+*/
diff --git a/src/named_session.hpp b/src/named_session.hpp
new file mode 100644
index 00000000..7248c8f6
--- /dev/null
+++ b/src/named_session.hpp
@@ -0,0 +1,56 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see .
+*/
+
+#ifndef __ZMQ_NAMED_SESSION_HPP_INCLUDED__
+#define __ZMQ_NAMED_SESSION_HPP_INCLUDED__
+
+#include "session.hpp"
+#include "blob.hpp"
+
+namespace zmq
+{
+
+ // Named session is created by listener object when the peer identifies
+ // itself by a strong name. Named session survives reconnections.
+
+ class named_session_t : public session_t
+ {
+ public:
+
+ named_session_t (class io_thread_t *io_thread_,
+ class socket_base_t *socket_, const options_t &options_,
+ const blob_t &name_);
+ ~named_session_t ();
+
+ // i_inout interface implementation.
+ void detach ();
+
+ // Handle events from session_t base class.
+ void attached (const blob_t &peer_identity_);
+ void detached ();
+
+ private:
+
+ // Name of the session. Corresponds to the peer's strong identity.
+ blob_t name;
+ };
+
+}
+
+#endif
diff --git a/src/object.cpp b/src/object.cpp
index cdb177f5..a8294b0f 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -24,7 +24,6 @@
#include "err.hpp"
#include "pipe.hpp"
#include "io_thread.hpp"
-#include "owned.hpp"
#include "session.hpp"
#include "socket_base.hpp"
@@ -143,9 +142,9 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
return ctx->choose_io_thread (taskset_);
}
-void zmq::object_t::zombify (socket_base_t *socket_)
+void zmq::object_t::zombify_socket (socket_base_t *socket_)
{
- ctx->zombify (socket_);
+ ctx->zombify_socket (socket_);
}
void zmq::object_t::send_stop ()
@@ -158,7 +157,7 @@ void zmq::object_t::send_stop ()
ctx->send_command (slot, cmd);
}
-void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_)
+void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -169,7 +168,7 @@ void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_)
send_command (cmd);
}
-void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
+void zmq::object_t::send_own (own_t *destination_, own_t *object_)
{
destination_->inc_seqnum ();
command_t cmd;
@@ -206,9 +205,8 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
send_command (cmd);
}
-void zmq::object_t::send_bind (socket_base_t *destination_,
- reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_,
- bool inc_seqnum_)
+void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_,
+ writer_t *out_pipe_, const blob_t &peer_identity_, bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -269,8 +267,8 @@ void zmq::object_t::send_pipe_term_ack (reader_t *destination_)
send_command (cmd);
}
-void zmq::object_t::send_term_req (socket_base_t *destination_,
- owned_t *object_)
+void zmq::object_t::send_term_req (own_t *destination_,
+ own_t *object_)
{
command_t cmd;
cmd.destination = destination_;
@@ -279,7 +277,7 @@ void zmq::object_t::send_term_req (socket_base_t *destination_,
send_command (cmd);
}
-void zmq::object_t::send_term (owned_t *destination_)
+void zmq::object_t::send_term (own_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
@@ -287,7 +285,7 @@ void zmq::object_t::send_term (owned_t *destination_)
send_command (cmd);
}
-void zmq::object_t::send_term_ack (socket_base_t *destination_)
+void zmq::object_t::send_term_ack (own_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
@@ -305,7 +303,7 @@ void zmq::object_t::process_plug ()
zmq_assert (false);
}
-void zmq::object_t::process_own (owned_t *object_)
+void zmq::object_t::process_own (own_t *object_)
{
zmq_assert (false);
}
@@ -342,7 +340,7 @@ void zmq::object_t::process_pipe_term_ack ()
zmq_assert (false);
}
-void zmq::object_t::process_term_req (owned_t *object_)
+void zmq::object_t::process_term_req (own_t *object_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index c75a95aa..e083ce3c 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -53,18 +53,19 @@ namespace zmq
// Zombify particular socket. In other words, pass the ownership to
// the context.
- void zombify (class socket_base_t *socket_);
+ void zombify_socket (class socket_base_t *socket_);
// Derived object can use these functions to send commands
// to other objects.
void send_stop ();
- void send_plug (class owned_t *destination_, bool inc_seqnum_ = true);
- void send_own (class socket_base_t *destination_,
- class owned_t *object_);
+ void send_plug (class own_t *destination_,
+ bool inc_seqnum_ = true);
+ void send_own (class own_t *destination_,
+ class own_t *object_);
void send_attach (class session_t *destination_,
struct i_engine *engine_, const blob_t &peer_identity_,
bool inc_seqnum_ = true);
- void send_bind (class socket_base_t *destination_,
+ void send_bind (class own_t *destination_,
class reader_t *in_pipe_, class writer_t *out_pipe_,
const blob_t &peer_identity_, bool inc_seqnum_ = true);
void send_revive (class object_t *destination_);
@@ -72,16 +73,16 @@ namespace zmq
uint64_t msgs_read_);
void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_);
- void send_term_req (class socket_base_t *destination_,
- class owned_t *object_);
- void send_term (class owned_t *destination_);
- void send_term_ack (class socket_base_t *destination_);
+ void send_term_req (class own_t *destination_,
+ class own_t *object_);
+ void send_term (class own_t *destination_);
+ void send_term_ack (class own_t *destination_);
// These handlers can be overloaded by the derived objects. They are
// called when command arrives from another thread.
virtual void process_stop ();
virtual void process_plug ();
- virtual void process_own (class owned_t *object_);
+ virtual void process_own (class own_t *object_);
virtual void process_attach (struct i_engine *engine_,
const blob_t &peer_identity_);
virtual void process_bind (class reader_t *in_pipe_,
@@ -90,7 +91,7 @@ namespace zmq
virtual void process_reader_info (uint64_t msgs_read_);
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
- virtual void process_term_req (class owned_t *object_);
+ virtual void process_term_req (class own_t *object_);
virtual void process_term ();
virtual void process_term_ack ();
diff --git a/src/own.cpp b/src/own.cpp
new file mode 100644
index 00000000..d90e9c4e
--- /dev/null
+++ b/src/own.cpp
@@ -0,0 +1,198 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see .
+*/
+
+#include "own.hpp"
+#include "err.hpp"
+#include "io_thread.hpp"
+
+zmq::own_t::own_t (class ctx_t *parent_, uint32_t slot_) :
+ object_t (parent_, slot_),
+ terminating (false),
+ sent_seqnum (0),
+ processed_seqnum (0),
+ owner (NULL),
+ term_acks (0)
+{
+}
+
+zmq::own_t::own_t (io_thread_t *io_thread_) :
+ object_t (io_thread_),
+ terminating (false),
+ sent_seqnum (0),
+ processed_seqnum (0),
+ owner (NULL),
+ term_acks (0)
+{
+}
+
+zmq::own_t::~own_t ()
+{
+}
+
+void zmq::own_t::set_owner (own_t *owner_)
+{
+ zmq_assert (!owner);
+ owner = owner_;
+}
+
+void zmq::own_t::inc_seqnum ()
+{
+ // This function may be called from a different thread!
+ sent_seqnum.add (1);
+}
+
+void zmq::own_t::process_seqnum ()
+{
+ // Catch up with counter of processed commands.
+ processed_seqnum++;
+
+ // We may have catched up and still have pending terms acks.
+ check_term_acks ();
+}
+
+void zmq::own_t::launch_child (own_t *object_)
+{
+ // Specify the owner of the object.
+ object_->set_owner (this);
+
+ // Plug the object into the I/O thread.
+ send_plug (object_);
+
+ // Take ownership of the object.
+ send_own (this, object_);
+}
+
+void zmq::own_t::launch_sibling (own_t *object_)
+{
+ // Specify the owner of the object.
+ object_->set_owner (owner);
+
+ // Plug the object into its I/O thread.
+ send_plug (object_);
+
+ // Take ownership of the object.
+ send_own (owner, object_);
+}
+
+void zmq::own_t::process_term_req (own_t *object_)
+{
+ // When shutting down we can ignore termination requests from owned
+ // objects. The termination request was already sent to the object.
+ if (terminating)
+ return;
+
+ // If I/O object is well and alive let's ask it to terminate.
+ owned_t::iterator it = std::find (owned.begin (), owned.end (), object_);
+
+ // If not found, we assume that termination request was already sent to
+ // the object so we can safely ignore the request.
+ if (it == owned.end ())
+ return;
+
+ owned.erase (it);
+ register_term_acks (1);
+ send_term (object_);
+}
+
+void zmq::own_t::process_own (own_t *object_)
+{
+ // If the object is already being shut down, new owned objects are
+ // immediately asked to terminate.
+ if (terminating) {
+ register_term_acks (1);
+ send_term (object_);
+ return;
+ }
+
+ // Store the reference to the owned object.
+ owned.insert (object_);
+}
+
+void zmq::own_t::terminate ()
+{
+ // If termination is already underway, there's no point
+ // in starting it anew.
+ if (terminating)
+ return;
+
+ // As for the root of the ownership tree, there's noone to terminate it,
+ // so it has to terminate itself.
+ if (!owner) {
+ process_term ();
+ return;
+ }
+
+ // If I am an owned object, I'll ask my owner to terminate me.
+ send_term_req (owner, this);
+}
+
+void zmq::own_t::process_term ()
+{
+ // Double termination should never happen.
+ zmq_assert (!terminating);
+
+ // Send termination request to all owned objects.
+ for (owned_t::iterator it = owned.begin (); it != owned.end (); it++)
+ send_term (*it);
+ register_term_acks (owned.size ());
+ owned.clear ();
+
+ // Start termination process and check whether by chance we cannot
+ // terminate immediately.
+ terminating = true;
+ check_term_acks ();
+}
+
+void zmq::own_t::register_term_acks (int count_)
+{
+ term_acks += count_;
+}
+
+void zmq::own_t::unregister_term_ack ()
+{
+ zmq_assert (term_acks > 0);
+ term_acks--;
+
+ // This may be a last ack we are waiting for before termination...
+ check_term_acks ();
+}
+
+void zmq::own_t::process_term_ack ()
+{
+ unregister_term_ack ();
+}
+
+void zmq::own_t::check_term_acks ()
+{
+ if (terminating && processed_seqnum == sent_seqnum.get () &&
+ term_acks == 0) {
+
+ // Sanity check. There should be no active children at this point.
+ zmq_assert (owned.empty ());
+
+ // The root object has nobody to confirm the termination to.
+ // Other nodes will confirm the termination to the owner.
+ if (owner)
+ send_term_ack (owner);
+
+ // Deallocate the resources.
+ delete this;
+ }
+}
+
diff --git a/src/own.hpp b/src/own.hpp
new file mode 100644
index 00000000..dc14fcca
--- /dev/null
+++ b/src/own.hpp
@@ -0,0 +1,132 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see .
+*/
+
+#ifndef __ZMQ_OWN_HPP_INCLUDED__
+#define __ZMQ_OWN_HPP_INCLUDED__
+
+#include
+#include
+
+#include "object.hpp"
+#include "atomic_counter.hpp"
+#include "stdint.hpp"
+
+namespace zmq
+{
+
+ // Base class for objects forming a part of ownership hierarchy.
+ // It handles initialisation and destruction of such objects.
+
+ class own_t : public object_t
+ {
+ public:
+
+ // Note that the owner is unspecified in the constructor.
+ // It'll be supplied later on when the object is plugged in.
+
+ // The object is not living within an I/O thread. It has it's own
+ // thread outside of 0MQ infrastructure.
+ own_t (class ctx_t *parent_, uint32_t slot_);
+
+ // The object is living within I/O thread.
+ own_t (class io_thread_t *io_thread_);
+
+ // When another owned object wants to send command to this object
+ // it calls this function to let it know it should not shut down
+ // before the command is delivered.
+ void inc_seqnum ();
+
+ protected:
+
+ // Launch the supplied object and become its owner.
+ void launch_child (own_t *object_);
+
+ // Launch the supplied object and make it your sibling (make your
+ // owner become its owner as well).
+ void launch_sibling (own_t *object_);
+
+ // Ask owner object to terminate this object. It may take a while
+ // while actual termination is started. This function should not be
+ // called more than once.
+ void terminate ();
+
+ // Derived object destroys own_t. There's no point in allowing
+ // others to invoke the destructor. At the same time, it has to be
+ // virtual so that generic own_t deallocation mechanism destroys
+ // specific type of the owned object correctly.
+ virtual ~own_t ();
+
+ // Term handler is protocted rather than private so that it can
+ // be intercepted by the derived class. This is useful to add custom
+ // steps to the beginning of the termination process.
+ void process_term ();
+
+ // Use following two functions to wait for arbitrary events before
+ // terminating. Just add number of events to wait for using
+ // register_tem_acks functions. When event occurs, call
+ // remove_term_ack. When number of pending acks reaches zero
+ // object will be deallocated.
+ void register_term_acks (int count_);
+ void unregister_term_ack ();
+
+ private:
+
+ // Set owner of the object
+ void set_owner (own_t *owner_);
+
+ // Handlers for incoming commands.
+ void process_own (own_t *object_);
+ void process_term_req (own_t *object_);
+ void process_term_ack ();
+
+ void process_seqnum ();
+
+ // Check whether all the peding term acks were delivered.
+ // If so, deallocate this object.
+ void check_term_acks ();
+
+ // True if termination was already initiated. If so, we can destroy
+ // the object if there are no more child objects or pending term acks.
+ bool terminating;
+
+ // Sequence number of the last command sent to this object.
+ atomic_counter_t sent_seqnum;
+
+ // Sequence number of the last command processed by this object.
+ uint64_t processed_seqnum;
+
+ // Socket owning this object. It's responsible for shutting down
+ // this object.
+ own_t *owner;
+
+ // List of all objects owned by this socket. We are responsible
+ // for deallocating them before we quit.
+ typedef std::set owned_t;
+ owned_t owned;
+
+ // Number of events we have to get before we can destroy the object.
+ int term_acks;
+
+ own_t (const own_t&);
+ void operator = (const own_t&);
+ };
+
+}
+
+#endif
diff --git a/src/owned.cpp b/src/owned.cpp
deleted file mode 100644
index 7d1cf5eb..00000000
--- a/src/owned.cpp
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see .
-*/
-
-#include "owned.hpp"
-#include "err.hpp"
-
-zmq::owned_t::owned_t (object_t *parent_, socket_base_t *owner_) :
- object_t (parent_),
- owner (owner_),
- sent_seqnum (0),
- processed_seqnum (0),
- shutting_down (false)
-{
-}
-
-zmq::owned_t::~owned_t ()
-{
-}
-
-void zmq::owned_t::inc_seqnum ()
-{
- // This function may be called from a different thread!
- sent_seqnum.add (1);
-}
-
-void zmq::owned_t::term ()
-{
- send_term_req (owner, this);
-}
-
-void zmq::owned_t::process_term ()
-{
- zmq_assert (!shutting_down);
- shutting_down = true;
- finalise ();
-}
-
-void zmq::owned_t::process_seqnum ()
-{
- // Catch up with counter of processed commands.
- processed_seqnum++;
- finalise ();
-}
-
-void zmq::owned_t::finalise ()
-{
- // If termination request was already received and there are no more
- // commands to wait for, terminate the object.
- if (shutting_down && processed_seqnum == sent_seqnum.get ()
- && is_terminable ()) {
- process_unplug ();
- send_term_ack (owner);
- delete this;
- }
-}
-
-bool zmq::owned_t::is_terminable ()
-{
- return true;
-}
-
diff --git a/src/owned.hpp b/src/owned.hpp
deleted file mode 100644
index 80cf42fd..00000000
--- a/src/owned.hpp
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see .
-*/
-
-#ifndef __ZMQ_OWNED_HPP_INCLUDED__
-#define __ZMQ_OWNED_HPP_INCLUDED__
-
-#include "socket_base.hpp"
-#include "atomic_counter.hpp"
-#include "stdint.hpp"
-
-namespace zmq
-{
-
- // Base class for objects owned by individual sockets. Handles
- // initialisation and destruction of such objects.
-
- class owned_t : public object_t
- {
- public:
-
- // The object will live in parent's thread, however, its lifetime
- // will be managed by its owner socket.
- owned_t (object_t *parent_, socket_base_t *owner_);
-
- // When another owned object wants to send command to this object
- // it calls this function to let it know it should not shut down
- // before the command is delivered.
- void inc_seqnum ();
-
- protected:
-
- // A mechanism allowing derived owned objects to postpone the
- // termination process. Default implementation defines no such delay.
- // Note that the derived object has to call finalise method when the
- // delay is over.
- virtual bool is_terminable ();
- void finalise ();
-
- // Ask owner socket to terminate this object.
- void term ();
-
- // Derived object destroys owned_t. No point in allowing others to
- // invoke the destructor. At the same time, it has to be virtual so
- // that generic owned_t deallocation mechanism destroys specific type
- // of the owned object correctly.
- virtual ~owned_t ();
-
- // io_object_t defines a new handler used to disconnect the object
- // from the poller object. Implement the handlen in the derived
- // classes to ensure sane cleanup.
- virtual void process_unplug () = 0;
-
- // Socket owning this object. When the socket is being closed it's
- // responsible for shutting down this object.
- socket_base_t *owner;
-
- private:
-
- // Handlers for incoming commands.
- void process_term ();
- void process_seqnum ();
-
- // Sequence number of the last command sent to this object.
- atomic_counter_t sent_seqnum;
-
- // Sequence number of the last command processed by this object.
- uint64_t processed_seqnum;
-
- // If true, the object is already shutting down.
- bool shutting_down;
-
- owned_t (const owned_t&);
- void operator = (const owned_t&);
- };
-
-}
-
-#endif
diff --git a/src/pair.cpp b/src/pair.cpp
index 1ff2e1aa..8db2ffce 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -28,7 +28,8 @@ zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t slot_) :
inpipe (NULL),
outpipe (NULL),
inpipe_alive (false),
- outpipe_alive (false)
+ outpipe_alive (false),
+ terminating (false)
{
options.requires_in = true;
options.requires_out = true;
@@ -43,6 +44,7 @@ zmq::pair_t::~pair_t ()
void zmq::pair_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
+ zmq_assert (!terminating);
zmq_assert (!inpipe && !outpipe);
inpipe = inpipe_;
@@ -59,6 +61,9 @@ void zmq::pair_t::terminated (class reader_t *pipe_)
zmq_assert (pipe_ == inpipe);
inpipe = NULL;
inpipe_alive = false;
+
+ if (terminating)
+ unregister_term_ack ();
}
void zmq::pair_t::terminated (class writer_t *pipe_)
@@ -66,19 +71,22 @@ void zmq::pair_t::terminated (class writer_t *pipe_)
zmq_assert (pipe_ == outpipe);
outpipe = NULL;
outpipe_alive = false;
+
+ if (terminating)
+ unregister_term_ack ();
}
-void zmq::pair_t::xterm_pipes ()
+void zmq::pair_t::process_term ()
{
- if (inpipe)
- inpipe->terminate ();
- if (outpipe)
- outpipe->terminate ();
-}
+ zmq_assert (inpipe && outpipe);
-bool zmq::pair_t::xhas_pipes ()
-{
- return inpipe != NULL || outpipe != NULL;
+ terminating = true;
+
+ register_term_acks (2);
+ inpipe->terminate ();
+ outpipe->terminate ();
+
+ socket_base_t::process_term ();
}
void zmq::pair_t::activated (class reader_t *pipe_)
diff --git a/src/pair.hpp b/src/pair.hpp
index 0c484d73..65b474e1 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -39,8 +39,6 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- void xterm_pipes ();
- bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
@@ -56,12 +54,17 @@ namespace zmq
private:
+ // Hook into termination process.
+ void process_term ();
+
class reader_t *inpipe;
class writer_t *outpipe;
bool inpipe_alive;
bool outpipe_alive;
+ bool terminating;
+
pair_t (const pair_t&);
void operator = (const pair_t&);
};
diff --git a/src/pub.cpp b/src/pub.cpp
index d1d1c724..2d0dea20 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -26,7 +26,8 @@
zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_),
- active (0)
+ active (0),
+ terminating (false)
{
options.requires_in = false;
options.requires_out = true;
@@ -40,6 +41,7 @@ zmq::pub_t::~pub_t ()
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
+ zmq_assert (!terminating);
zmq_assert (!inpipe_);
outpipe_->set_event_sink (this);
@@ -47,18 +49,26 @@ void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
pipes.push_back (outpipe_);
pipes.swap (active, pipes.size () - 1);
active++;
+
+ if (terminating) {
+ register_term_acks (1);
+ outpipe_->terminate ();
+ }
}
-void zmq::pub_t::xterm_pipes ()
+void zmq::pub_t::process_term ()
{
+ terminating = true;
+
// Start shutdown process for all the pipes.
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
-}
-bool zmq::pub_t::xhas_pipes ()
-{
- return !pipes.empty ();
+ // Wait for pipes to terminate before terminating yourself.
+ register_term_acks (pipes.size ());
+
+ // Continue with the termination immediately.
+ socket_base_t::process_term ();
}
void zmq::pub_t::activated (writer_t *pipe_)
@@ -75,6 +85,10 @@ void zmq::pub_t::terminated (writer_t *pipe_)
if (pipes.index (pipe_) < active)
active--;
pipes.erase (pipe_);
+
+ // If we are already terminating, wait for one term ack less.
+ if (terminating)
+ unregister_term_ack ();
}
int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
diff --git a/src/pub.hpp b/src/pub.hpp
index a81edfed..edc9b53e 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -37,8 +37,6 @@ namespace zmq
// Implementations of virtual functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- void xterm_pipes ();
- bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
bool xhas_out ();
@@ -48,6 +46,9 @@ namespace zmq
private:
+ // Hook into the termination process.
+ void process_term ();
+
// Write the message to the pipe. Make the pipe inactive if writing
// fails. In such a case false is returned.
bool write (class writer_t *pipe_, zmq_msg_t *msg_);
@@ -60,6 +61,9 @@ namespace zmq
// beginning of the pipes array.
pipes_t::size_type active;
+ // True if termination process is already underway.
+ bool terminating;
+
pub_t (const pub_t&);
void operator = (const pub_t&);
};
diff --git a/src/pull.cpp b/src/pull.cpp
index 4f4a8b3d..e7b5239f 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -23,7 +23,8 @@
#include "err.hpp"
zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t slot_) :
- socket_base_t (parent_, slot_)
+ socket_base_t (parent_, slot_),
+ fq (this)
{
options.requires_in = true;
options.requires_out = false;
@@ -40,14 +41,17 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_,
fq.attach (inpipe_);
}
-void zmq::pull_t::xterm_pipes ()
+void zmq::pull_t::process_term ()
{
- fq.term_pipes ();
+ register_term_acks (1);
+ fq.terminate ();
+
+ socket_base_t::process_term ();
}
-bool zmq::pull_t::xhas_pipes ()
+void zmq::pull_t::terminated ()
{
- return fq.has_pipes ();
+ unregister_term_ack ();
}
int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_)
diff --git a/src/pull.hpp b/src/pull.hpp
index 4be40dd3..997eebf1 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -17,32 +17,39 @@
along with this program. If not, see .
*/
-#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__
-#define __ZMQ_UPSTREAM_HPP_INCLUDED__
+#ifndef __ZMQ_PULL_HPP_INCLUDED__
+#define __ZMQ_PULL_HPP_INCLUDED__
+#include "i_terminate_events.hpp"
#include "socket_base.hpp"
#include "fq.hpp"
namespace zmq
{
- class pull_t : public socket_base_t
+ class pull_t : public socket_base_t, public i_terminate_events
{
public:
pull_t (class ctx_t *parent_, uint32_t slot_);
~pull_t ();
+ protected:
+
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- void xterm_pipes ();
- bool xhas_pipes ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
private:
+ // i_terminate_events interface implementation.
+ void terminated ();
+
+ // Hook into the termination process.
+ void process_term ();
+
// Fair queueing object for inbound pipes.
fq_t fq;
diff --git a/src/push.cpp b/src/push.cpp
index 3a3d258b..f587cef6 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -24,7 +24,8 @@
#include "pipe.hpp"
zmq::push_t::push_t (class ctx_t *parent_, uint32_t slot_) :
- socket_base_t (parent_, slot_)
+ socket_base_t (parent_, slot_),
+ lb (this)
{
options.requires_in = false;
options.requires_out = true;
@@ -41,14 +42,17 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_,
lb.attach (outpipe_);
}
-void zmq::push_t::xterm_pipes ()
+void zmq::push_t::process_term ()
{
- lb.term_pipes ();
+ register_term_acks (1);
+ lb.terminate ();
+
+ socket_base_t::process_term ();
}
-bool zmq::push_t::xhas_pipes ()
+void zmq::push_t::terminated ()
{
- return lb.has_pipes ();
+ unregister_term_ack ();
}
int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_)
diff --git a/src/push.hpp b/src/push.hpp
index e604abcf..aed26629 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -17,32 +17,39 @@
along with this program. If not, see .
*/
-#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__
-#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__
+#ifndef __ZMQ_PUSH_HPP_INCLUDED__
+#define __ZMQ_PUSH_HPP_INCLUDED__
+#include "i_terminate_events.hpp"
#include "socket_base.hpp"
#include "lb.hpp"
namespace zmq
{
- class push_t : public socket_base_t
+ class push_t : public socket_base_t, public i_terminate_events
{
public:
push_t (class ctx_t *parent_, uint32_t slot_);
~push_t ();
+ protected:
+
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
- void xterm_pipes ();
- bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
bool xhas_out ();
private:
+ // i_terminate_events interface implementation.
+ void terminated ();
+
+ // Hook into the termination process.
+ void process_term ();
+
// Load balancer managing the outbound pipes.
lb_t lb;
diff --git a/src/session.cpp b/src/session.cpp
index 3c748981..0494ff19 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -20,47 +20,25 @@
#include
#include "session.hpp"
+#include "socket_base.hpp"
#include "i_engine.hpp"
#include "err.hpp"
#include "pipe.hpp"
-zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_) :
- owned_t (parent_, owner_),
+zmq::session_t::session_t (class io_thread_t *io_thread_,
+ class socket_base_t *socket_, const options_t &options_) :
+ own_t (io_thread_),
+ options (options_),
in_pipe (NULL),
incomplete_in (false),
active (true),
out_pipe (NULL),
engine (NULL),
- options (options_)
+ socket (socket_),
+ io_thread (io_thread_),
+ attach_processed (false),
+ term_processed (false)
{
- // It's possible to register the session at this point as it will be
- // searched for only on reconnect, i.e. no race condition (session found
- // before it is plugged into it's I/O thread) is possible.
- ordinal = owner->register_session (this);
-}
-
-zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_, const blob_t &peer_identity_) :
- owned_t (parent_, owner_),
- in_pipe (NULL),
- incomplete_in (false),
- active (true),
- out_pipe (NULL),
- engine (NULL),
- ordinal (0),
- peer_identity (peer_identity_),
- options (options_)
-{
- if (!peer_identity.empty () && peer_identity [0] != 0) {
- if (!owner->register_session (peer_identity, this)) {
-
- // TODO: There's already a session with the specified
- // identity. We should presumably syslog it and drop the
- // session.
- zmq_assert (false);
- }
- }
}
zmq::session_t::~session_t ()
@@ -69,10 +47,10 @@ zmq::session_t::~session_t ()
zmq_assert (!out_pipe);
}
-bool zmq::session_t::is_terminable ()
+void zmq::session_t::terminate ()
{
- // The session won't send term_ack until both in & out pipe are closed.
- return !in_pipe && !out_pipe;
+ // TODO:
+ zmq_assert (false);
}
bool zmq::session_t::read (::zmq_msg_t *msg_)
@@ -105,17 +83,8 @@ void zmq::session_t::flush ()
out_pipe->flush ();
}
-void zmq::session_t::detach (owned_t *reconnecter_)
+void zmq::session_t::clean_pipes ()
{
- // Plug in the reconnecter object if any.
- if (reconnecter_) {
- send_plug (reconnecter_);
- send_own (owner, reconnecter_);
- }
-
- // Engine is terminating itself. No need to deallocate it from here.
- engine = NULL;
-
// Get rid of half-processed messages in the out pipe. Flush any
// unflushed messages upstream.
if (out_pipe) {
@@ -135,26 +104,6 @@ void zmq::session_t::detach (owned_t *reconnecter_)
zmq_msg_close (&msg);
}
}
-
- // Terminate transient session.
- if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0))
- term ();
-}
-
-zmq::io_thread_t *zmq::session_t::get_io_thread ()
-{
- return choose_io_thread (options.affinity);
-}
-
-class zmq::socket_base_t *zmq::session_t::get_owner ()
-{
- return owner;
-}
-
-uint64_t zmq::session_t::get_ordinal ()
-{
- zmq_assert (ordinal);
- return ordinal;
}
void zmq::session_t::attach_pipes (class reader_t *inpipe_,
@@ -172,6 +121,9 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_,
out_pipe = outpipe_;
out_pipe->set_event_sink (this);
}
+
+ attach_processed = true;
+ finalise ();
}
void zmq::session_t::terminated (reader_t *pipe_)
@@ -192,14 +144,14 @@ void zmq::session_t::activated (reader_t *pipe_)
zmq_assert (in_pipe == pipe_);
active = true;
if (engine)
- engine->revive ();
+ engine->activate_out ();
}
void zmq::session_t::activated (writer_t *pipe_)
{
zmq_assert (out_pipe == pipe_);
if (engine)
- engine->resume_input ();
+ engine->activate_in ();
}
void zmq::session_t::process_plug ()
@@ -214,10 +166,9 @@ void zmq::session_t::process_unplug ()
// there may be some commands being sent to the session right now.
// Unregister the session from the socket.
- if (ordinal)
- owner->unregister_session (ordinal);
- else if (!peer_identity.empty () && peer_identity [0] != 0)
- owner->unregister_session (peer_identity);
+// if (!peer_identity.empty () && peer_identity [0] != 0)
+// unregister_session (peer_identity);
+// TODO: Should be done in named session.
// Ask associated pipes to terminate.
if (in_pipe)
@@ -232,63 +183,65 @@ void zmq::session_t::process_unplug ()
}
}
+void zmq::session_t::finalise ()
+{
+ // If all conditions are met, proceed with termination:
+ // 1. Owner object already asked us to terminate.
+ // 2. The pipes were already attached to the session.
+ // 3. Both pipes have already terminated. Note that inbound pipe
+ // is terminated after delimiter is read, i.e. all messages
+ // were already sent to the wire.
+ if (term_processed && attach_processed && !in_pipe && !out_pipe)
+ own_t::process_term ();
+}
+
void zmq::session_t::process_attach (i_engine *engine_,
const blob_t &peer_identity_)
{
- if (!peer_identity.empty ()) {
-
- // If both IDs are temporary, no checking is needed.
- // TODO: Old ID should be reused in this case...
- if (peer_identity.empty () || peer_identity [0] != 0 ||
- peer_identity_.empty () || peer_identity_ [0] != 0) {
-
- // If we already know the peer name do nothing, just check whether
- // it haven't changed.
- zmq_assert (peer_identity == peer_identity_);
- }
- }
- else if (!peer_identity_.empty ()) {
-
- // Store the peer identity.
- peer_identity = peer_identity_;
-
- // If the session is not registered with the ordinal, let's register
- // it using the peer name.
- if (!ordinal) {
- if (!owner->register_session (peer_identity, this)) {
-
- // TODO: There's already a session with the specified
- // identity. We should presumably syslog it and drop the
- // session.
- zmq_assert (false);
- }
- }
- }
-
// Check whether the required pipes already exist. If not so, we'll
// create them and bind them to the socket object.
reader_t *socket_reader = NULL;
writer_t *socket_writer = NULL;
if (options.requires_in && !out_pipe) {
- create_pipe (owner, this, options.hwm, options.swap, &socket_reader,
+ create_pipe (socket, this, options.hwm, options.swap, &socket_reader,
&out_pipe);
out_pipe->set_event_sink (this);
}
if (options.requires_out && !in_pipe) {
- create_pipe (this, owner, options.hwm, options.swap, &in_pipe,
+ create_pipe (this, socket, options.hwm, options.swap, &in_pipe,
&socket_writer);
in_pipe->set_event_sink (this);
}
if (socket_reader || socket_writer)
- send_bind (owner, socket_reader, socket_writer, peer_identity);
+ send_bind (socket, socket_reader, socket_writer, peer_identity);
// Plug in the engine.
zmq_assert (!engine);
zmq_assert (engine_);
engine = engine_;
- engine->plug (this);
+ engine->plug (io_thread, this);
+
+ // Trigger the notfication about the attachment.
+ attached (peer_identity_);
+}
+
+void zmq::session_t::process_term ()
+{
+ // Here we are pugging into the own_t's termination mechanism.
+ // The goal is to postpone the termination till all the pending messages
+ // are sent to the peer.
+ term_processed = true;
+ finalise ();
+}
+
+void zmq::session_t::attached (const blob_t &peer_identity_)
+{
+}
+
+void zmq::session_t::detached ()
+{
}
diff --git a/src/session.hpp b/src/session.hpp
index 603b50ca..ba259dcf 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -20,8 +20,8 @@
#ifndef __ZMQ_SESSION_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__
+#include "own.hpp"
#include "i_inout.hpp"
-#include "owned.hpp"
#include "options.hpp"
#include "blob.hpp"
#include "pipe.hpp"
@@ -30,29 +30,22 @@ namespace zmq
{
class session_t :
- public owned_t,
+ public own_t,
public i_inout,
public i_reader_events,
public i_writer_events
{
public:
- // Creates unnamed session.
- session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_);
+ session_t (class io_thread_t *io_thread_,
+ class socket_base_t *socket_, const options_t &options_);
- // Creates named session.
- session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_, const blob_t &peer_identity_);
-
- // i_inout interface implementation.
+ // i_inout interface implementation. Note that detach method is not
+ // implemented by generic session. Different session types may handle
+ // engine disconnection in different ways.
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
void flush ();
- void detach (owned_t *reconnecter_);
- class io_thread_t *get_io_thread ();
- class socket_base_t *get_owner ();
- uint64_t get_ordinal ();
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
@@ -65,19 +58,40 @@ namespace zmq
void activated (class writer_t *pipe_);
void terminated (class writer_t *pipe_);
- private:
+ protected:
+
+ // Forcefully close this session (without sending
+ // outbound messages to the wire).
+ void terminate ();
+
+ // Two events for the derived session type. Attached is triggered
+ // when session is attached to a peer, detached is triggered at the
+ // beginning of the termination process when session is about to
+ // be detached from the peer.
+ virtual void attached (const blob_t &peer_identity_);
+ virtual void detached ();
~session_t ();
- // Define the delayed termination. (I.e. termination is postponed
- // till all the data is flushed to the kernel.)
- bool is_terminable ();
+ // Remove any half processed messages. Flush unflushed messages.
+ // Call this function when engine disconnect to get rid of leftovers.
+ void clean_pipes ();
+
+ // Inherited socket options. These are visible to all session classes.
+ options_t options;
+
+ private:
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
void process_attach (struct i_engine *engine_,
const blob_t &peer_identity_);
+ void process_term ();
+
+ // Check whether object is ready for termination. If so proceed
+ // with closing child objects.
+ void finalise ();
// Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe;
@@ -92,18 +106,25 @@ namespace zmq
// Outbound pipe, i.e. one the socket is sending messages to.
class writer_t *out_pipe;
+ // The protocol I/O engine connected to the session.
struct i_engine *engine;
- // Session is identified by ordinal in the case when it was created
- // before connection to the peer was established and thus we are
- // unaware of peer's identity.
- uint64_t ordinal;
-
- // Identity of the peer.
+ // Identity of the peer (say the component on the other side
+ // of TCP connection).
blob_t peer_identity;
- // Inherited socket options.
- options_t options;
+ // The socket the session belongs to.
+ class socket_base_t *socket;
+
+ // I/O thread the session is living in. It will be used to plug in
+ // the engines into the same thread.
+ class io_thread_t *io_thread;
+
+ // True if pipes were already attached.
+ bool attach_processed;
+
+ // True if term command was already processed.
+ bool term_processed;
session_t (const session_t&);
void operator = (const session_t&);
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 5d3175a0..903e7818 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -38,9 +38,8 @@
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
-#include "session.hpp"
+#include "connect_session.hpp"
#include "config.hpp"
-#include "owned.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "ctx.hpp"
@@ -109,20 +108,20 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
}
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) :
- object_t (parent_, slot_),
+ own_t (parent_, slot_),
zombie (false),
last_processing_time (0),
- pending_term_acks (0),
ticks (0),
- rcvmore (false),
- sent_seqnum (0),
- processed_seqnum (0),
- next_ordinal (1)
+ rcvmore (false)
{
}
zmq::socket_base_t::~socket_base_t ()
{
+ // Check whether there are no session leaks.
+ sessions_sync.lock ();
+ zmq_assert (sessions.empty ());
+ sessions_sync.unlock ();
}
zmq::signaler_t *zmq::socket_base_t::get_signaler ()
@@ -139,6 +138,46 @@ void zmq::socket_base_t::stop ()
send_stop ();
}
+int zmq::socket_base_t::check_protocol (const std::string &protocol_)
+{
+ // First check out whether the protcol is something we are aware of.
+ if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" &&
+ protocol_ != "pgm" && protocol_ != "epgm") {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+
+ // If 0MQ is not compiled with OpenPGM, pgm and epgm transports
+ // are not avaialble.
+#if !defined ZMQ_HAVE_OPENPGM
+ if (protocol_ == "pgm" || protocol_ == "epgm") {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+#endif
+
+ // IPC transport is not available on Windows and OpenVMS.
+#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+ if (protocol_ != "ipc") {
+ // Unknown protocol.
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+#endif
+
+ // Check whether socket type and transport protocol match.
+ // Specifically, multicast protocols can't be combined with
+ // bi-directional messaging patterns (socket types).
+ if ((protocol_ == "pgm" || protocol_ == "epgm") &&
+ options.requires_in && options.requires_out) {
+ errno = ENOCOMPATPROTO;
+ return -1;
+ }
+
+ // Protocol is available.
+ return 0;
+}
+
void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
@@ -225,56 +264,48 @@ int zmq::socket_base_t::bind (const char *addr_)
}
// Parse addr_ string.
- std::string addr_type;
- std::string addr_args;
-
- std::string addr (addr_);
- std::string::size_type pos = addr.find ("://");
-
- if (pos == std::string::npos) {
- errno = EINVAL;
- return -1;
- }
-
- addr_type = addr.substr (0, pos);
- addr_args = addr.substr (pos + 3);
-
- if (addr_type == "inproc")
- return register_endpoint (addr_args.c_str (), this);
-
- if (addr_type == "tcp" || addr_type == "ipc") {
-
-#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
- if (addr_type == "ipc") {
- errno = EPROTONOSUPPORT;
+ std::string protocol;
+ std::string address;
+ {
+ std::string addr (addr_);
+ std::string::size_type pos = addr.find ("://");
+ if (pos == std::string::npos) {
+ errno = EINVAL;
return -1;
}
-#endif
+ protocol = addr.substr (0, pos);
+ address = addr.substr (pos + 3);
+ }
+ int rc = check_protocol (protocol);
+ if (rc != 0)
+ return -1;
+
+ if (protocol == "inproc")
+ return register_endpoint (address.c_str (), this);
+
+ if (protocol == "tcp" || protocol == "ipc") {
zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
choose_io_thread (options.affinity), this, options);
zmq_assert (listener);
- int rc = listener->set_address (addr_type.c_str(), addr_args.c_str ());
+ int rc = listener->set_address (protocol.c_str(), address.c_str ());
if (rc != 0) {
delete listener;
return -1;
}
+ launch_child (listener);
- send_plug (listener);
- send_own (this, listener);
return 0;
}
-#if defined ZMQ_HAVE_OPENPGM
- if (addr_type == "pgm" || addr_type == "epgm") {
- // In the case of PGM bind behaves the same like connect.
+ if (protocol == "pgm" || protocol == "epgm") {
+
+ // For convenience's sake, bind can be used interchageable with
+ // connect for PGM and EPGM transports.
return connect (addr_);
}
-#endif
- // Unknown protocol.
- errno = EPROTONOSUPPORT;
- return -1;
+ zmq_assert (false);
}
int zmq::socket_base_t::connect (const char *addr_)
@@ -285,28 +316,31 @@ int zmq::socket_base_t::connect (const char *addr_)
}
// Parse addr_ string.
- std::string addr_type;
- std::string addr_args;
-
- std::string addr (addr_);
- std::string::size_type pos = addr.find ("://");
-
- if (pos == std::string::npos) {
- errno = EINVAL;
- return -1;
+ std::string protocol;
+ std::string address;
+ {
+ std::string addr (addr_);
+ std::string::size_type pos = addr.find ("://");
+ if (pos == std::string::npos) {
+ errno = EINVAL;
+ return -1;
+ }
+ protocol = addr.substr (0, pos);
+ address = addr.substr (pos + 3);
}
- addr_type = addr.substr (0, pos);
- addr_args = addr.substr (pos + 3);
+ int rc = check_protocol (protocol);
+ if (rc != 0)
+ return -1;
- if (addr_type == "inproc") {
+ if (protocol == "inproc") {
// TODO: inproc connect is specific with respect to creating pipes
// as there's no 'reconnect' functionality implemented. Once that
// is in place we should follow generic pipe creation algorithm.
// Find the peer socket.
- socket_base_t *peer = find_endpoint (addr_args.c_str ());
+ socket_base_t *peer = find_endpoint (address.c_str ());
if (!peer)
return -1;
@@ -329,18 +363,18 @@ int zmq::socket_base_t::connect (const char *addr_)
attach_pipes (inpipe_reader, outpipe_writer, blob_t ());
// Attach the pipes to the peer socket. Note that peer's seqnum
- // was incremented in find_endpoint function. The callee is notified
- // about the fact via the last parameter.
+ // was incremented in find_endpoint function. We don't need it
+ // increased here.
send_bind (peer, outpipe_reader, inpipe_writer,
options.identity, false);
return 0;
}
- // Create unnamed session.
- io_thread_t *io_thread = choose_io_thread (options.affinity);
- session_t *session = new (std::nothrow) session_t (io_thread,
- this, options);
+ // Create session.
+ connect_session_t *session = new (std::nothrow) connect_session_t (
+ choose_io_thread (options.affinity), this, options,
+ protocol.c_str (), address.c_str ());
zmq_assert (session);
// If 'immediate connect' feature is required, we'll create the pipes
@@ -370,95 +404,10 @@ int zmq::socket_base_t::connect (const char *addr_)
session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ());
}
- // Activate the session.
- send_plug (session);
- send_own (this, session);
+ // Activate the session. Make it a child of this socket.
+ launch_child (session);
- if (addr_type == "tcp" || addr_type == "ipc") {
-
-#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
- // Windows named pipes are not compatible with Winsock API.
- // There's no UNIX domain socket implementation on OpenVMS.
- if (addr_type == "ipc") {
- errno = EPROTONOSUPPORT;
- return -1;
- }
-#endif
-
- // Create the connecter object. Supply it with the session name
- // so that it can bind the new connection to the session once
- // it is established.
- zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
- choose_io_thread (options.affinity), this, options,
- session->get_ordinal (), false);
- zmq_assert (connecter);
- int rc = connecter->set_address (addr_type.c_str(), addr_args.c_str ());
- if (rc != 0) {
- delete connecter;
- return -1;
- }
- send_plug (connecter);
- send_own (this, connecter);
-
- return 0;
- }
-
-#if defined ZMQ_HAVE_OPENPGM
- if (addr_type == "pgm" || addr_type == "epgm") {
-
- // If the socket type requires bi-directional communication
- // multicast is not an option (it is uni-directional).
- if (options.requires_in && options.requires_out) {
- errno = ENOCOMPATPROTO;
- return -1;
- }
-
- // For epgm, pgm transport with UDP encapsulation is used.
- bool udp_encapsulation = (addr_type == "epgm");
-
- // At this point we'll create message pipes to the session straight
- // away. There's no point in delaying it as no concept of 'connect'
- // exists with PGM anyway.
- if (options.requires_out) {
-
- // PGM sender.
- pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
- choose_io_thread (options.affinity), options);
- zmq_assert (pgm_sender);
-
- int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ());
- if (rc != 0) {
- delete pgm_sender;
- return -1;
- }
-
- send_attach (session, pgm_sender, blob_t ());
- }
- else if (options.requires_in) {
-
- // PGM receiver.
- pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
- choose_io_thread (options.affinity), options);
- zmq_assert (pgm_receiver);
-
- int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ());
- if (rc != 0) {
- delete pgm_receiver;
- return -1;
- }
-
- send_attach (session, pgm_receiver, blob_t ());
- }
- else
- zmq_assert (false);
-
- return 0;
- }
-#endif
-
- // Unknown protoco.
- errno = EPROTONOSUPPORT;
- return -1;
+ return 0;
}
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
@@ -587,72 +536,23 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::close ()
{
+ zmq_assert (!zombie);
+
// Socket becomes a zombie. From now on all new arrived pipes (bind
- // command) and I/O objects (own command) are immediately terminated.
- // Also, any further requests form I/O object termination are ignored
- // (we are going to shut them down anyway -- this way we assure that
- // we do so once only).
+ // command) are immediately terminated.
zombie = true;
- // Unregister all inproc endpoints associated with this socket.
- // Doing this we make sure that no new pipes from other sockets (inproc)
- // will be initiated. However, there may be some inproc pipes already
- // on the fly, but not yet received by this socket. To get finished
- // with them we'll do the subsequent waiting from on-the-fly commands.
- // This should happen very quickly. There's no way to block here for
- // extensive period of time.
- unregister_endpoints (this);
- while (processed_seqnum != sent_seqnum.get ())
- process_commands (true, false);
- // TODO: My feeling is that the above has to be done in the dezombification
- // loop, otherwise we may end up with number of i/o object dropping to zero
- // even though there are more i/o objects on the way.
+ // Start termination of associated I/O object hierarchy.
+ terminate ();
- // The above process ensures that only pipes that will arrive from now on
- // are those initiated by sessions. These in turn have a nice property of
- // not arriving totally asynchronously. When a session -- being an I/O
- // object -- acknowledges its termination we are 100% sure that we'll get
- // no new pipe from it.
-
- // Start termination of all the pipes presently associated with the socket.
- xterm_pipes ();
-
- // Send termination request to all associated I/O objects.
- // Start waiting for the acks. Note that the actual waiting is not done
- // in this function. Rather it is done in delayed manner as socket is
- // being dezombified. The reason is that I/O object shutdown can take
- // considerable amount of time in case there's still a lot of data to
- // push to the network.
- for (io_objects_t::iterator it = io_objects.begin ();
- it != io_objects.end (); it++)
- send_term (*it);
- pending_term_acks += io_objects.size ();
- io_objects.clear ();
-
- // Note that new I/O objects may arrive even in zombie state (say new
- // session initiated by a listener object), however, in such case number
- // of pending acks never drops to zero. Here's the scenario: We have an
- // pending ack for the listener object. Then 'own' commands arrives from
- // the listener notifying the socket about new session. It immediately
- // triggers termination request and number of of pending acks if
- // incremented. Then term_acks arrives from the listener. Number of pending
- // acks is decremented. Later on, the session itself will ack its
- // termination. During the process, number of pending acks never dropped
- // to zero and thus the socket remains safely in the zombie state.
-
- // Transfer the ownership of the socket from this application thread
+ // Ask context to zombify this socket. In other words, transfer
+ // the ownership of the socket from this application thread
// to the context which will take care of the rest of shutdown process.
- zombify (this);
+ zombify_socket (this);
return 0;
}
-void zmq::socket_base_t::inc_seqnum ()
-{
- // Be aware: This function may be called from a different thread!
- sent_seqnum.add (1);
-}
-
bool zmq::socket_base_t::has_in ()
{
return xhas_in ();
@@ -667,7 +567,7 @@ bool zmq::socket_base_t::register_session (const blob_t &peer_identity_,
session_t *session_)
{
sessions_sync.lock ();
- bool registered = named_sessions.insert (
+ bool registered = sessions.insert (
std::make_pair (peer_identity_, session_)).second;
sessions_sync.unlock ();
return registered;
@@ -676,17 +576,17 @@ bool zmq::socket_base_t::register_session (const blob_t &peer_identity_,
void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
- named_sessions_t::iterator it = named_sessions.find (peer_identity_);
- zmq_assert (it != named_sessions.end ());
- named_sessions.erase (it);
+ sessions_t::iterator it = sessions.find (peer_identity_);
+ zmq_assert (it != sessions.end ());
+ sessions.erase (it);
sessions_sync.unlock ();
}
zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
- named_sessions_t::iterator it = named_sessions.find (peer_identity_);
- if (it == named_sessions.end ()) {
+ sessions_t::iterator it = sessions.find (peer_identity_);
+ if (it == sessions.end ()) {
sessions_sync.unlock ();
return NULL;
}
@@ -699,74 +599,16 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
return session;
}
-uint64_t zmq::socket_base_t::register_session (session_t *session_)
-{
- sessions_sync.lock ();
- uint64_t ordinal = next_ordinal;
- next_ordinal++;
- unnamed_sessions.insert (std::make_pair (ordinal, session_));
- sessions_sync.unlock ();
- return ordinal;
-}
-
-void zmq::socket_base_t::unregister_session (uint64_t ordinal_)
-{
- sessions_sync.lock ();
- unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_);
- zmq_assert (it != unnamed_sessions.end ());
- unnamed_sessions.erase (it);
- sessions_sync.unlock ();
-}
-
-zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_)
-{
- sessions_sync.lock ();
-
- unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_);
- if (it == unnamed_sessions.end ()) {
- sessions_sync.unlock ();
- return NULL;
- }
- session_t *session = it->second;
-
- // Prepare the session for subsequent attach command.
- session->inc_seqnum ();
-
- sessions_sync.unlock ();
- return session;
-}
-
bool zmq::socket_base_t::dezombify ()
{
zmq_assert (zombie);
// Process any commands from other threads/sockets that may be available
- // at the moment.
+ // at the moment. Ultimately, socket will be destroyed.
process_commands (false, false);
- // If there are no more pipes attached and there are no more I/O objects
- // owned by the socket, we can kill the zombie.
- if (!pending_term_acks && !xhas_pipes ()) {
-
- // If all objects have acknowledged their termination there should
- // definitely be no I/O object remaining in the list.
- zmq_assert (io_objects.empty ());
-
- // Check whether there are no session leaks.
- sessions_sync.lock ();
- zmq_assert (named_sessions.empty ());
- zmq_assert (unnamed_sessions.empty ());
- sessions_sync.unlock ();
-
- // Deallocate all the resources tied to this socket.
- delete this;
-
- // Notify the caller about the fact that the zombie is finally dead.
- return true;
- }
-
- // The zombie remains undead.
- return false;
+// TODO: ???
+ return true;
}
void zmq::socket_base_t::process_commands (bool block_, bool throttle_)
@@ -828,19 +670,6 @@ void zmq::socket_base_t::process_stop ()
zombie = true;
}
-void zmq::socket_base_t::process_own (owned_t *object_)
-{
- // If the socket is already being shut down, new owned objects are
- // immediately asked to terminate.
- if (zombie) {
- send_term (object_);
- pending_term_acks++;
- return;
- }
-
- io_objects.insert (object_);
-}
-
void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
const blob_t &peer_identity_)
{
@@ -857,37 +686,21 @@ void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
attach_pipes (in_pipe_, out_pipe_, peer_identity_);
}
-void zmq::socket_base_t::process_term_req (owned_t *object_)
+void zmq::socket_base_t::process_unplug ()
{
- // When shutting down we can ignore termination requests from owned
- // objects. It means the termination request was already sent to
- // the object.
- if (zombie)
- return;
-
- // If I/O object is well and alive ask it to terminate.
- io_objects_t::iterator it = std::find (io_objects.begin (),
- io_objects.end (), object_);
-
- // If not found, we assume that termination request was already sent to
- // the object so we can safely ignore the request.
- if (it == io_objects.end ())
- return;
-
- pending_term_acks++;
- io_objects.erase (it);
- send_term (object_);
}
-void zmq::socket_base_t::process_term_ack ()
+void zmq::socket_base_t::process_term ()
{
- zmq_assert (pending_term_acks);
- pending_term_acks--;
-}
+ zmq_assert (zombie);
-void zmq::socket_base_t::process_seqnum ()
-{
- processed_seqnum++;
+ // Unregister all inproc endpoints associated with this socket.
+ // Doing this we make sure that no new pipes from other sockets (inproc)
+ // will be initiated.
+ unregister_endpoints (this);
+
+ // Continue the termination process immediately.
+ own_t::process_term ();
}
int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_,
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 386fdbb5..f76dc4c8 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -20,13 +20,12 @@
#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
-#include
#include