2009-08-08 16:01:58 +02:00
|
|
|
/*
|
|
|
|
Copyright (c) 2007-2009 FastMQ Inc.
|
|
|
|
|
|
|
|
This file is part of 0MQ.
|
|
|
|
|
|
|
|
0MQ is free software; you can redistribute it and/or modify it under
|
|
|
|
the terms of the Lesser GNU General Public License as published by
|
|
|
|
the Free Software Foundation; either version 3 of the License, or
|
|
|
|
(at your option) any later version.
|
|
|
|
|
|
|
|
0MQ is distributed in the hope that it will be useful,
|
|
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
Lesser GNU General Public License for more details.
|
|
|
|
|
|
|
|
You should have received a copy of the Lesser GNU General Public License
|
|
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
*/
|
|
|
|
|
2009-08-21 14:29:22 +02:00
|
|
|
#include <string>
|
2009-08-08 16:01:58 +02:00
|
|
|
#include <algorithm>
|
|
|
|
|
|
|
|
#include "../include/zmq.h"
|
|
|
|
|
|
|
|
#include "socket_base.hpp"
|
|
|
|
#include "app_thread.hpp"
|
|
|
|
#include "err.hpp"
|
|
|
|
#include "zmq_listener.hpp"
|
2009-08-09 16:12:09 +02:00
|
|
|
#include "zmq_connecter.hpp"
|
2009-08-08 16:01:58 +02:00
|
|
|
#include "io_thread.hpp"
|
2009-08-20 11:32:23 +02:00
|
|
|
#include "session.hpp"
|
2009-08-09 11:57:21 +02:00
|
|
|
#include "config.hpp"
|
2009-08-20 11:32:23 +02:00
|
|
|
#include "owned.hpp"
|
2009-08-21 14:29:22 +02:00
|
|
|
#include "uuid.hpp"
|
2009-08-08 16:01:58 +02:00
|
|
|
|
|
|
|
// Constructs a socket that belongs to the application thread 'parent_'.
// The same pointer is handed to the object_t base and remembered in
// 'app_thread'. The socket starts with no outstanding termination
// acknowledgements and is not shutting down.
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
    object_t (parent_),
    pending_term_acks (0),
    app_thread (parent_),
    shutting_down (false)
{
    // Nothing else to do; all state is set up in the initializer list.
}
|
|
|
|
|
|
|
|
// Tears the socket down: asks every owned I/O object to terminate, waits
// for all termination acknowledgements and finally verifies that no
// session is still registered with the socket.
zmq::socket_base_t::~socket_base_t ()
{
    // From now on process_term_req ignores termination requests coming
    // from owned objects; everything is being terminated here anyway.
    shutting_down = true;

    // Keep iterating while there is anything left to terminate or to wait
    // for. On the third pass there should be no more I/O objects because
    // all connecters and listeners were destroyed during the first pass
    // and all engines delivered by delayed 'own' commands are destroyed
    // during the second pass.
    while (!io_objects.empty () || pending_term_acks) {

        // Ask each associated I/O object to terminate.
        for (io_objects_t::iterator object = io_objects.begin ();
              object != io_objects.end (); ++object)
            send_term (*object);

        // Every object asked above now owes us a termination ack.
        pending_term_acks += io_objects.size ();
        io_objects.clear ();

        // Keep processing commands until all the acks have arrived.
        while (pending_term_acks)
            app_thread->process_commands (true);
    }

    // By now every session must have been unregistered; anything left
    // in the table is a leak.
    sessions_sync.lock ();
    zmq_assert (sessions.empty ());
    sessions_sync.unlock ();
}
|
|
|
|
|
2009-08-21 14:29:22 +02:00
|
|
|
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
|
2009-08-09 09:24:48 +02:00
|
|
|
size_t optvallen_)
|
2009-08-08 16:01:58 +02:00
|
|
|
{
|
2009-08-09 11:57:21 +02:00
|
|
|
switch (option_) {
|
|
|
|
|
|
|
|
case ZMQ_HWM:
|
|
|
|
if (optvallen_ != sizeof (int64_t)) {
|
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
2009-08-12 09:40:16 +02:00
|
|
|
options.hwm = *((int64_t*) optval_);
|
2009-08-09 11:57:21 +02:00
|
|
|
return 0;
|
|
|
|
|
|
|
|
case ZMQ_LWM:
|
|
|
|
if (optvallen_ != sizeof (int64_t)) {
|
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
2009-08-12 09:40:16 +02:00
|
|
|
options.lwm = *((int64_t*) optval_);
|
2009-08-09 11:57:21 +02:00
|
|
|
return 0;
|
|
|
|
|
|
|
|
case ZMQ_SWAP:
|
|
|
|
if (optvallen_ != sizeof (int64_t)) {
|
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
2009-08-12 09:40:16 +02:00
|
|
|
options.swap = *((int64_t*) optval_);
|
2009-08-09 11:57:21 +02:00
|
|
|
return 0;
|
|
|
|
|
|
|
|
case ZMQ_MASK:
|
|
|
|
if (optvallen_ != sizeof (int64_t)) {
|
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
2009-08-12 09:40:16 +02:00
|
|
|
options.mask = (uint64_t) *((int64_t*) optval_);
|
2009-08-09 11:57:21 +02:00
|
|
|
return 0;
|
|
|
|
|
|
|
|
case ZMQ_AFFINITY:
|
|
|
|
if (optvallen_ != sizeof (int64_t)) {
|
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
2009-08-12 09:40:16 +02:00
|
|
|
options.affinity = (uint64_t) *((int64_t*) optval_);
|
2009-08-09 11:57:21 +02:00
|
|
|
return 0;
|
|
|
|
|
2009-08-12 09:40:16 +02:00
|
|
|
case ZMQ_IDENTITY:
|
2009-08-21 14:29:22 +02:00
|
|
|
options.identity.assign ((const char*) optval_, optvallen_);
|
2009-08-09 11:57:21 +02:00
|
|
|
return 0;
|
|
|
|
|
|
|
|
default:
|
|
|
|
errno = EINVAL;
|
|
|
|
return -1;
|
|
|
|
}
|
2009-08-09 09:24:48 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
int zmq::socket_base_t::bind (const char *addr_)
|
|
|
|
{
|
2009-08-12 09:40:16 +02:00
|
|
|
zmq_listener_t *listener = new zmq_listener_t (
|
|
|
|
choose_io_thread (options.affinity), this, options);
|
2009-08-09 11:21:47 +02:00
|
|
|
int rc = listener->set_address (addr_);
|
|
|
|
if (rc != 0)
|
|
|
|
return -1;
|
|
|
|
|
2009-08-08 16:01:58 +02:00
|
|
|
send_plug (listener);
|
|
|
|
send_own (this, listener);
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2009-08-09 09:24:48 +02:00
|
|
|
int zmq::socket_base_t::connect (const char *addr_)
|
2009-08-08 16:01:58 +02:00
|
|
|
{
|
2009-08-21 14:29:22 +02:00
|
|
|
// Generate a unique name for the session.
|
|
|
|
std::string session_name ("#");
|
|
|
|
session_name += uuid_t ().to_string ();
|
|
|
|
|
|
|
|
// Create the session.
|
|
|
|
io_thread_t *io_thread = choose_io_thread (options.affinity);
|
|
|
|
session_t *session = new session_t (io_thread, this, session_name.c_str ());
|
|
|
|
zmq_assert (session);
|
|
|
|
send_plug (session);
|
|
|
|
send_own (this, session);
|
|
|
|
|
|
|
|
// Create the connecter object. Supply it with the session name so that
|
|
|
|
// it can bind the new connection to the session once it is established.
|
2009-08-12 09:40:16 +02:00
|
|
|
zmq_connecter_t *connecter = new zmq_connecter_t (
|
2009-08-21 14:29:22 +02:00
|
|
|
choose_io_thread (options.affinity), this, options,
|
|
|
|
session_name.c_str ());
|
2009-08-09 16:12:09 +02:00
|
|
|
int rc = connecter->set_address (addr_);
|
2009-08-21 14:29:22 +02:00
|
|
|
if (rc != 0) {
|
|
|
|
delete connecter;
|
2009-08-09 16:12:09 +02:00
|
|
|
return -1;
|
2009-08-21 14:29:22 +02:00
|
|
|
}
|
2009-08-09 16:12:09 +02:00
|
|
|
send_plug (connecter);
|
|
|
|
send_own (this, connecter);
|
2009-08-21 14:29:22 +02:00
|
|
|
|
2009-08-09 16:12:09 +02:00
|
|
|
return 0;
|
2009-08-08 16:01:58 +02:00
|
|
|
}
|
|
|
|
|
2009-08-21 14:29:22 +02:00
|
|
|
// Sending is not implemented at this level; reaching this function is
// treated as a programming error.
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{
    zmq_assert (false);

    // Not expected to be reached. The explicit return keeps the function
    // well-defined (no flowing off the end of a non-void function) should
    // the assertion ever return.
    return -1;
}
|
|
|
|
|
|
|
|
int zmq::socket_base_t::flush ()
|
|
|
|
{
|
|
|
|
zmq_assert (false);
|
|
|
|
}
|
|
|
|
|
2009-08-21 14:29:22 +02:00
|
|
|
// Receiving is not implemented at this level; reaching this function is
// treated as a programming error.
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{
    zmq_assert (false);

    // Not expected to be reached. The explicit return keeps the function
    // well-defined (no flowing off the end of a non-void function) should
    // the assertion ever return.
    return -1;
}
|
|
|
|
|
|
|
|
int zmq::socket_base_t::close ()
|
|
|
|
{
|
|
|
|
app_thread->remove_socket (this);
|
|
|
|
delete this;
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
2009-08-21 14:29:22 +02:00
|
|
|
bool zmq::socket_base_t::register_session (const char *name_,
|
2009-08-20 11:32:23 +02:00
|
|
|
session_t *session_)
|
|
|
|
{
|
|
|
|
sessions_sync.lock ();
|
2009-08-21 14:29:22 +02:00
|
|
|
bool registered = sessions.insert (std::make_pair (name_, session_)).second;
|
2009-08-20 11:32:23 +02:00
|
|
|
sessions_sync.unlock ();
|
2009-08-21 14:29:22 +02:00
|
|
|
return registered;
|
2009-08-20 11:32:23 +02:00
|
|
|
}
|
|
|
|
|
2009-08-21 14:29:22 +02:00
|
|
|
bool zmq::socket_base_t::unregister_session (const char *name_)
|
2009-08-20 11:32:23 +02:00
|
|
|
{
|
|
|
|
sessions_sync.lock ();
|
|
|
|
sessions_t::iterator it = sessions.find (name_);
|
2009-08-21 14:29:22 +02:00
|
|
|
bool unregistered = (it != sessions.end ());
|
2009-08-20 11:32:23 +02:00
|
|
|
sessions.erase (it);
|
|
|
|
sessions_sync.unlock ();
|
2009-08-21 14:29:22 +02:00
|
|
|
return unregistered;
|
2009-08-20 11:32:23 +02:00
|
|
|
}
|
|
|
|
|
2009-08-21 14:29:22 +02:00
|
|
|
// Looks up the session registered under 'name_'. The table is accessed
// with 'sessions_sync' held.
//
// Returns NULL if there is no such session. Otherwise the session's
// sequence number is incremented (while still holding the lock) and the
// session is returned.
zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
{
    sessions_sync.lock ();

    sessions_t::iterator it = sessions.find (name_);
    if (it == sessions.end ()) {
        sessions_sync.unlock ();
        return NULL;
    }

    // Copy the pointer out while the lock is still held. Once the lock is
    // released another thread may modify the table and invalidate the
    // iterator, so it must not be touched afterwards.
    session_t *session = it->second;

    // Prepare the session for subsequent attach command.
    session->inc_seqnum ();

    sessions_sync.unlock ();
    return session;
}
|
|
|
|
|
|
|
|
// Handles the 'own' command: from now on 'object_' is tracked among the
// I/O objects associated with this socket, which the destructor asks to
// terminate.
void zmq::socket_base_t::process_own (owned_t *object_)
{
    io_objects.insert (object_);
}
|
|
|
|
|
2009-08-20 11:32:23 +02:00
|
|
|
// Handles a termination request coming from the owned object 'object_'.
// If the object is still tracked by the socket it is asked to terminate
// and one more termination acknowledgement is awaited.
void zmq::socket_base_t::process_term_req (owned_t *object_)
{
    // When shutting down we can ignore termination requests from owned
    // objects. They are going to be terminated anyway.
    if (shutting_down)
        return;

    // If I/O object is well and alive ask it to terminate. The container's
    // own lookup is used rather than a linear scan over all the objects.
    io_objects_t::iterator it = io_objects.find (object_);

    // If not found, we assume that termination request was already sent to
    // the object so we can safely ignore the request.
    if (it == io_objects.end ())
        return;

    // Move the object from the set of live objects to the pending acks.
    pending_term_acks++;
    io_objects.erase (it);
    send_term (object_);
}
|
|
|
|
|
|
|
|
void zmq::socket_base_t::process_term_ack ()
|
|
|
|
{
|
|
|
|
zmq_assert (pending_term_acks);
|
|
|
|
pending_term_acks--;
|
|
|
|
}
|