mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-12 10:33:52 +01:00
Merge pull request #3143 from sigiesec/remove-extra-files
Problem: temporary files in repo
This commit is contained in:
commit
088fd65bf2
@ -1,549 +0,0 @@
|
||||
/*
|
||||
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "precompiled.hpp"
|
||||
#include "macros.hpp"
|
||||
#include "router.hpp"
|
||||
#include "pipe.hpp"
|
||||
#include "wire.hpp"
|
||||
#include "random.hpp"
|
||||
#include "likely.hpp"
|
||||
#include "err.hpp"
|
||||
|
||||
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_),
|
||||
prefetched (false),
|
||||
routing_id_sent (false),
|
||||
current_in (NULL),
|
||||
terminate_current_in (false),
|
||||
more_in (false),
|
||||
current_out (NULL),
|
||||
more_out (false),
|
||||
next_integral_routing_id (generate_random ()),
|
||||
mandatory (false),
|
||||
// raw_socket functionality in ROUTER is deprecated
|
||||
raw_socket (false),
|
||||
probe_router (false),
|
||||
handover (false)
|
||||
{
|
||||
options.type = ZMQ_ROUTER;
|
||||
options.recv_routing_id = true;
|
||||
options.raw_socket = false;
|
||||
|
||||
prefetched_id.init ();
|
||||
prefetched_msg.init ();
|
||||
}
|
||||
|
||||
zmq::router_t::~router_t ()
|
||||
{
|
||||
zmq_assert (anonymous_pipes.empty ());
|
||||
;
|
||||
zmq_assert (outpipes.empty ());
|
||||
prefetched_id.close ();
|
||||
prefetched_msg.close ();
|
||||
}
|
||||
|
||||
void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
|
||||
{
|
||||
LIBZMQ_UNUSED (subscribe_to_all_);
|
||||
|
||||
zmq_assert (pipe_);
|
||||
|
||||
if (probe_router) {
|
||||
msg_t probe_msg;
|
||||
int rc = probe_msg.init ();
|
||||
errno_assert (rc == 0);
|
||||
|
||||
rc = pipe_->write (&probe_msg);
|
||||
// zmq_assert (rc) is not applicable here, since it is not a bug.
|
||||
pipe_->flush ();
|
||||
|
||||
rc = probe_msg.close ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
bool routing_id_ok = identify_peer (pipe_);
|
||||
if (routing_id_ok)
|
||||
fq.attach (pipe_);
|
||||
else
|
||||
anonymous_pipes.insert (pipe_);
|
||||
}
|
||||
|
||||
int zmq::router_t::xsetsockopt (int option_,
|
||||
const void *optval_,
|
||||
size_t optvallen_)
|
||||
{
|
||||
bool is_int = (optvallen_ == sizeof (int));
|
||||
int value = 0;
|
||||
if (is_int)
|
||||
memcpy (&value, optval_, sizeof (int));
|
||||
|
||||
switch (option_) {
|
||||
case ZMQ_CONNECT_ROUTING_ID:
|
||||
// TODO why isn't it possible to set an empty connect_routing_id
|
||||
// (which is the default value)
|
||||
if (optval_ && optvallen_) {
|
||||
connect_routing_id.assign ((char *) optval_, optvallen_);
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
case ZMQ_ROUTER_RAW:
|
||||
if (is_int && value >= 0) {
|
||||
raw_socket = (value != 0);
|
||||
if (raw_socket) {
|
||||
options.recv_routing_id = false;
|
||||
options.raw_socket = true;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
case ZMQ_ROUTER_MANDATORY:
|
||||
if (is_int && value >= 0) {
|
||||
mandatory = (value != 0);
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
case ZMQ_PROBE_ROUTER:
|
||||
if (is_int && value >= 0) {
|
||||
probe_router = (value != 0);
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
case ZMQ_ROUTER_HANDOVER:
|
||||
if (is_int && value >= 0) {
|
||||
handover = (value != 0);
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
|
||||
{
|
||||
std::set<pipe_t *>::iterator it = anonymous_pipes.find (pipe_);
|
||||
if (it != anonymous_pipes.end ())
|
||||
anonymous_pipes.erase (it);
|
||||
else {
|
||||
outpipes_t::iterator iter = outpipes.find (pipe_->get_routing_id ());
|
||||
zmq_assert (iter != outpipes.end ());
|
||||
outpipes.erase (iter);
|
||||
fq.pipe_terminated (pipe_);
|
||||
pipe_->rollback ();
|
||||
if (pipe_ == current_out)
|
||||
current_out = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::router_t::xread_activated (pipe_t *pipe_)
|
||||
{
|
||||
std::set<pipe_t *>::iterator it = anonymous_pipes.find (pipe_);
|
||||
if (it == anonymous_pipes.end ())
|
||||
fq.activated (pipe_);
|
||||
else {
|
||||
bool routing_id_ok = identify_peer (pipe_);
|
||||
if (routing_id_ok) {
|
||||
anonymous_pipes.erase (it);
|
||||
fq.attach (pipe_);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::router_t::xwrite_activated (pipe_t *pipe_)
|
||||
{
|
||||
outpipes_t::iterator it;
|
||||
for (it = outpipes.begin (); it != outpipes.end (); ++it)
|
||||
if (it->second.pipe == pipe_)
|
||||
break;
|
||||
|
||||
zmq_assert (it != outpipes.end ());
|
||||
zmq_assert (!it->second.active);
|
||||
it->second.active = true;
|
||||
}
|
||||
|
||||
int zmq::router_t::xsend (msg_t *msg_)
|
||||
{
|
||||
// If this is the first part of the message it's the ID of the
|
||||
// peer to send the message to.
|
||||
if (!more_out) {
|
||||
zmq_assert (!current_out);
|
||||
|
||||
// If we have malformed message (prefix with no subsequent message)
|
||||
// then just silently ignore it.
|
||||
// TODO: The connections should be killed instead.
|
||||
if (msg_->flags () & msg_t::more) {
|
||||
more_out = true;
|
||||
|
||||
// Find the pipe associated with the routing id stored in the prefix.
|
||||
// If there's no such pipe just silently ignore the message, unless
|
||||
// router_mandatory is set.
|
||||
blob_t routing_id (static_cast<unsigned char *> (msg_->data ()),
|
||||
msg_->size (), zmq::reference_tag_t ());
|
||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
||||
|
||||
if (it != outpipes.end ()) {
|
||||
current_out = it->second.pipe;
|
||||
|
||||
// Check whether pipe is closed or not
|
||||
if (!current_out->check_write ()) {
|
||||
// Check whether pipe is full or not
|
||||
bool pipe_full = !current_out->check_hwm ();
|
||||
it->second.active = false;
|
||||
current_out = NULL;
|
||||
|
||||
if (mandatory) {
|
||||
more_out = false;
|
||||
if (pipe_full)
|
||||
errno = EAGAIN;
|
||||
else
|
||||
errno = EHOSTUNREACH;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
} else if (mandatory) {
|
||||
more_out = false;
|
||||
errno = EHOSTUNREACH;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
int rc = msg_->close ();
|
||||
errno_assert (rc == 0);
|
||||
rc = msg_->init ();
|
||||
errno_assert (rc == 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Ignore the MORE flag for raw-sock or assert?
|
||||
if (options.raw_socket)
|
||||
msg_->reset_flags (msg_t::more);
|
||||
|
||||
// Check whether this is the last part of the message.
|
||||
more_out = (msg_->flags () & msg_t::more) != 0;
|
||||
|
||||
// Push the message into the pipe. If there's no out pipe, just drop it.
|
||||
if (current_out) {
|
||||
// Close the remote connection if user has asked to do so
|
||||
// by sending zero length message.
|
||||
// Pending messages in the pipe will be dropped (on receiving term- ack)
|
||||
if (raw_socket && msg_->size () == 0) {
|
||||
current_out->terminate (false);
|
||||
int rc = msg_->close ();
|
||||
errno_assert (rc == 0);
|
||||
rc = msg_->init ();
|
||||
errno_assert (rc == 0);
|
||||
current_out = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool ok = current_out->write (msg_);
|
||||
if (unlikely (!ok)) {
|
||||
// Message failed to send - we must close it ourselves.
|
||||
int rc = msg_->close ();
|
||||
errno_assert (rc == 0);
|
||||
// HWM was checked before, so the pipe must be gone. Roll back
|
||||
// messages that were piped, for example REP labels.
|
||||
current_out->rollback ();
|
||||
current_out = NULL;
|
||||
} else {
|
||||
if (!more_out) {
|
||||
current_out->flush ();
|
||||
current_out = NULL;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int rc = msg_->close ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
|
||||
// Detach the message from the data buffer.
|
||||
int rc = msg_->init ();
|
||||
errno_assert (rc == 0);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::router_t::xrecv (msg_t *msg_)
|
||||
{
|
||||
if (prefetched) {
|
||||
if (!routing_id_sent) {
|
||||
int rc = msg_->move (prefetched_id);
|
||||
errno_assert (rc == 0);
|
||||
routing_id_sent = true;
|
||||
} else {
|
||||
int rc = msg_->move (prefetched_msg);
|
||||
errno_assert (rc == 0);
|
||||
prefetched = false;
|
||||
}
|
||||
more_in = (msg_->flags () & msg_t::more) != 0;
|
||||
|
||||
if (!more_in) {
|
||||
if (terminate_current_in) {
|
||||
current_in->terminate (true);
|
||||
terminate_current_in = false;
|
||||
}
|
||||
current_in = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
pipe_t *pipe = NULL;
|
||||
int rc = fq.recvpipe (msg_, &pipe);
|
||||
|
||||
// It's possible that we receive peer's routing id. That happens
|
||||
// after reconnection. The current implementation assumes that
|
||||
// the peer always uses the same routing id.
|
||||
while (rc == 0 && msg_->is_routing_id ())
|
||||
rc = fq.recvpipe (msg_, &pipe);
|
||||
|
||||
if (rc != 0)
|
||||
return -1;
|
||||
|
||||
zmq_assert (pipe != NULL);
|
||||
|
||||
// If we are in the middle of reading a message, just return the next part.
|
||||
if (more_in) {
|
||||
more_in = (msg_->flags () & msg_t::more) != 0;
|
||||
|
||||
if (!more_in) {
|
||||
if (terminate_current_in) {
|
||||
current_in->terminate (true);
|
||||
terminate_current_in = false;
|
||||
}
|
||||
current_in = NULL;
|
||||
}
|
||||
} else {
|
||||
// We are at the beginning of a message.
|
||||
// Keep the message part we have in the prefetch buffer
|
||||
// and return the ID of the peer instead.
|
||||
rc = prefetched_msg.move (*msg_);
|
||||
errno_assert (rc == 0);
|
||||
prefetched = true;
|
||||
current_in = pipe;
|
||||
|
||||
const blob_t &routing_id = pipe->get_routing_id ();
|
||||
rc = msg_->init_size (routing_id.size ());
|
||||
errno_assert (rc == 0);
|
||||
memcpy (msg_->data (), routing_id.data (), routing_id.size ());
|
||||
msg_->set_flags (msg_t::more);
|
||||
if (prefetched_msg.metadata ())
|
||||
msg_->set_metadata (prefetched_msg.metadata ());
|
||||
routing_id_sent = true;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::router_t::rollback ()
|
||||
{
|
||||
if (current_out) {
|
||||
current_out->rollback ();
|
||||
current_out = NULL;
|
||||
more_out = false;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool zmq::router_t::xhas_in ()
|
||||
{
|
||||
// If we are in the middle of reading the messages, there are
|
||||
// definitely more parts available.
|
||||
if (more_in)
|
||||
return true;
|
||||
|
||||
// We may already have a message pre-fetched.
|
||||
if (prefetched)
|
||||
return true;
|
||||
|
||||
// Try to read the next message.
|
||||
// The message, if read, is kept in the pre-fetch buffer.
|
||||
pipe_t *pipe = NULL;
|
||||
int rc = fq.recvpipe (&prefetched_msg, &pipe);
|
||||
|
||||
// It's possible that we receive peer's routing id. That happens
|
||||
// after reconnection. The current implementation assumes that
|
||||
// the peer always uses the same routing id.
|
||||
// TODO: handle the situation when the peer changes its routing id.
|
||||
while (rc == 0 && prefetched_msg.is_routing_id ())
|
||||
rc = fq.recvpipe (&prefetched_msg, &pipe);
|
||||
|
||||
if (rc != 0)
|
||||
return false;
|
||||
|
||||
zmq_assert (pipe != NULL);
|
||||
|
||||
const blob_t &routing_id = pipe->get_routing_id ();
|
||||
rc = prefetched_id.init_size (routing_id.size ());
|
||||
errno_assert (rc == 0);
|
||||
memcpy (prefetched_id.data (), routing_id.data (), routing_id.size ());
|
||||
prefetched_id.set_flags (msg_t::more);
|
||||
|
||||
prefetched = true;
|
||||
routing_id_sent = false;
|
||||
current_in = pipe;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool zmq::router_t::xhas_out ()
|
||||
{
|
||||
// In theory, ROUTER socket is always ready for writing (except when
|
||||
// MANDATORY is set). Whether actual attempt to write succeeds depends
|
||||
// on whitch pipe the message is going to be routed to.
|
||||
|
||||
if (!mandatory)
|
||||
return true;
|
||||
|
||||
bool has_out = false;
|
||||
outpipes_t::iterator it;
|
||||
for (it = outpipes.begin (); it != outpipes.end (); ++it)
|
||||
has_out |= it->second.pipe->check_hwm ();
|
||||
|
||||
return has_out;
|
||||
}
|
||||
|
||||
const zmq::blob_t &zmq::router_t::get_credential () const
|
||||
{
|
||||
return fq.get_credential ();
|
||||
}
|
||||
|
||||
int zmq::router_t::get_peer_state (const void *routing_id_,
|
||||
size_t routing_id_size_) const
|
||||
{
|
||||
int res = 0;
|
||||
|
||||
blob_t routing_id_blob ((unsigned char *) routing_id_, routing_id_size_);
|
||||
outpipes_t::const_iterator it = outpipes.find (routing_id_blob);
|
||||
if (it == outpipes.end ()) {
|
||||
errno = EHOSTUNREACH;
|
||||
return -1;
|
||||
}
|
||||
|
||||
const outpipe_t &outpipe = it->second;
|
||||
if (outpipe.pipe->check_hwm ())
|
||||
res |= ZMQ_POLLOUT;
|
||||
|
||||
/** \todo does it make any sense to check the inpipe as well? */
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
||||
{
|
||||
msg_t msg;
|
||||
bool ok;
|
||||
blob_t routing_id;
|
||||
|
||||
if (connect_routing_id.length ()) {
|
||||
routing_id.set ((unsigned char *) connect_routing_id.c_str (),
|
||||
connect_routing_id.length ());
|
||||
connect_routing_id.clear ();
|
||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
||||
if (it != outpipes.end ())
|
||||
zmq_assert (false); // Not allowed to duplicate an existing rid
|
||||
} else if (
|
||||
options
|
||||
.raw_socket) { // Always assign an integral routing id for raw-socket
|
||||
unsigned char buf[5];
|
||||
buf[0] = 0;
|
||||
put_uint32 (buf + 1, next_integral_routing_id++);
|
||||
routing_id.set (buf, sizeof buf);
|
||||
} else if (!options.raw_socket) {
|
||||
// Pick up handshake cases and also case where next integral routing id is set
|
||||
msg.init ();
|
||||
ok = pipe_->read (&msg);
|
||||
if (!ok)
|
||||
return false;
|
||||
|
||||
if (msg.size () == 0) {
|
||||
// Fall back on the auto-generation
|
||||
unsigned char buf[5];
|
||||
buf[0] = 0;
|
||||
put_uint32 (buf + 1, next_integral_routing_id++);
|
||||
routing_id.set (buf, sizeof buf);
|
||||
msg.close ();
|
||||
} else {
|
||||
routing_id.set (static_cast<unsigned char *> (msg.data ()),
|
||||
msg.size ());
|
||||
outpipes_t::iterator it = outpipes.find (routing_id);
|
||||
msg.close ();
|
||||
|
||||
if (it != outpipes.end ()) {
|
||||
if (!handover)
|
||||
// Ignore peers with duplicate ID
|
||||
return false;
|
||||
|
||||
// We will allow the new connection to take over this
|
||||
// routing id. Temporarily assign a new routing id to the
|
||||
// existing pipe so we can terminate it asynchronously.
|
||||
unsigned char buf[5];
|
||||
buf[0] = 0;
|
||||
put_uint32 (buf + 1, next_integral_routing_id++);
|
||||
blob_t new_routing_id (buf, sizeof buf);
|
||||
|
||||
it->second.pipe->set_router_socket_routing_id (new_routing_id);
|
||||
outpipe_t existing_outpipe = {it->second.pipe,
|
||||
it->second.active};
|
||||
|
||||
ok = outpipes
|
||||
.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (new_routing_id),
|
||||
existing_outpipe)
|
||||
.second;
|
||||
zmq_assert (ok);
|
||||
|
||||
// Remove the existing routing id entry to allow the new
|
||||
// connection to take the routing id.
|
||||
outpipes.erase (it);
|
||||
|
||||
if (existing_outpipe.pipe == current_in)
|
||||
terminate_current_in = true;
|
||||
else
|
||||
existing_outpipe.pipe->terminate (true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pipe_->set_router_socket_routing_id (routing_id);
|
||||
// Add the record into output pipes lookup table
|
||||
outpipe_t outpipe = {pipe_, true};
|
||||
ok = outpipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe)
|
||||
.second;
|
||||
zmq_assert (ok);
|
||||
|
||||
return true;
|
||||
}
|
@ -1,711 +0,0 @@
|
||||
/*
|
||||
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "precompiled.hpp"
|
||||
#include "macros.hpp"
|
||||
#include "session_base.hpp"
|
||||
#include "i_engine.hpp"
|
||||
#include "err.hpp"
|
||||
#include "pipe.hpp"
|
||||
#include "likely.hpp"
|
||||
#include "tcp_connecter.hpp"
|
||||
#include "ipc_connecter.hpp"
|
||||
#include "tipc_connecter.hpp"
|
||||
#include "socks_connecter.hpp"
|
||||
#include "vmci_connecter.hpp"
|
||||
#include "pgm_sender.hpp"
|
||||
#include "pgm_receiver.hpp"
|
||||
#include "address.hpp"
|
||||
#include "norm_engine.hpp"
|
||||
#include "udp_engine.hpp"
|
||||
|
||||
#include "ctx.hpp"
|
||||
#include "req.hpp"
|
||||
#include "radio.hpp"
|
||||
#include "dish.hpp"
|
||||
|
||||
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
|
||||
bool active_,
|
||||
class socket_base_t *socket_,
|
||||
const options_t &options_,
|
||||
address_t *addr_)
|
||||
{
|
||||
session_base_t *s = NULL;
|
||||
switch (options_.type) {
|
||||
case ZMQ_REQ:
|
||||
s = new (std::nothrow)
|
||||
req_session_t (io_thread_, active_, socket_, options_, addr_);
|
||||
break;
|
||||
case ZMQ_RADIO:
|
||||
s = new (std::nothrow)
|
||||
radio_session_t (io_thread_, active_, socket_, options_, addr_);
|
||||
break;
|
||||
case ZMQ_DISH:
|
||||
s = new (std::nothrow)
|
||||
dish_session_t (io_thread_, active_, socket_, options_, addr_);
|
||||
break;
|
||||
case ZMQ_DEALER:
|
||||
case ZMQ_REP:
|
||||
case ZMQ_ROUTER:
|
||||
case ZMQ_PUB:
|
||||
case ZMQ_XPUB:
|
||||
case ZMQ_SUB:
|
||||
case ZMQ_XSUB:
|
||||
case ZMQ_PUSH:
|
||||
case ZMQ_PULL:
|
||||
case ZMQ_PAIR:
|
||||
case ZMQ_STREAM:
|
||||
case ZMQ_SERVER:
|
||||
case ZMQ_CLIENT:
|
||||
case ZMQ_GATHER:
|
||||
case ZMQ_SCATTER:
|
||||
case ZMQ_DGRAM:
|
||||
s = new (std::nothrow)
|
||||
session_base_t (io_thread_, active_, socket_, options_, addr_);
|
||||
break;
|
||||
default:
|
||||
errno = EINVAL;
|
||||
return NULL;
|
||||
}
|
||||
alloc_assert (s);
|
||||
return s;
|
||||
}
|
||||
|
||||
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
|
||||
bool active_,
|
||||
class socket_base_t *socket_,
|
||||
const options_t &options_,
|
||||
address_t *addr_) :
|
||||
own_t (io_thread_, options_),
|
||||
io_object_t (io_thread_),
|
||||
active (active_),
|
||||
pipe (NULL),
|
||||
zap_pipe (NULL),
|
||||
incomplete_in (false),
|
||||
pending (false),
|
||||
engine (NULL),
|
||||
socket (socket_),
|
||||
io_thread (io_thread_),
|
||||
has_linger_timer (false),
|
||||
addr (addr_)
|
||||
{
|
||||
}
|
||||
|
||||
const char *zmq::session_base_t::get_endpoint () const
|
||||
{
|
||||
return engine->get_endpoint ();
|
||||
}
|
||||
|
||||
zmq::session_base_t::~session_base_t ()
|
||||
{
|
||||
zmq_assert (!pipe);
|
||||
zmq_assert (!zap_pipe);
|
||||
|
||||
// If there's still a pending linger timer, remove it.
|
||||
if (has_linger_timer) {
|
||||
cancel_timer (linger_timer_id);
|
||||
has_linger_timer = false;
|
||||
}
|
||||
|
||||
// Close the engine.
|
||||
if (engine)
|
||||
engine->terminate ();
|
||||
|
||||
LIBZMQ_DELETE (addr);
|
||||
}
|
||||
|
||||
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
|
||||
{
|
||||
zmq_assert (!is_terminating ());
|
||||
zmq_assert (!pipe);
|
||||
zmq_assert (pipe_);
|
||||
pipe = pipe_;
|
||||
pipe->set_event_sink (this);
|
||||
}
|
||||
|
||||
int zmq::session_base_t::pull_msg (msg_t *msg_)
|
||||
{
|
||||
if (!pipe || !pipe->read (msg_)) {
|
||||
errno = EAGAIN;
|
||||
return -1;
|
||||
}
|
||||
|
||||
incomplete_in = (msg_->flags () & msg_t::more) != 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::session_base_t::push_msg (msg_t *msg_)
|
||||
{
|
||||
if (msg_->flags () & msg_t::command)
|
||||
return 0;
|
||||
if (pipe && pipe->write (msg_)) {
|
||||
int rc = msg_->init ();
|
||||
errno_assert (rc == 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
errno = EAGAIN;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
|
||||
{
|
||||
if (zap_pipe == NULL) {
|
||||
errno = ENOTCONN;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!zap_pipe->read (msg_)) {
|
||||
errno = EAGAIN;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::session_base_t::write_zap_msg (msg_t *msg_)
|
||||
{
|
||||
if (zap_pipe == NULL || !zap_pipe->write (msg_)) {
|
||||
errno = ENOTCONN;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if ((msg_->flags () & msg_t::more) == 0)
|
||||
zap_pipe->flush ();
|
||||
|
||||
const int rc = msg_->init ();
|
||||
errno_assert (rc == 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void zmq::session_base_t::reset ()
|
||||
{
|
||||
}
|
||||
|
||||
void zmq::session_base_t::flush ()
|
||||
{
|
||||
if (pipe)
|
||||
pipe->flush ();
|
||||
}
|
||||
|
||||
void zmq::session_base_t::clean_pipes ()
|
||||
{
|
||||
zmq_assert (pipe != NULL);
|
||||
|
||||
// Get rid of half-processed messages in the out pipe. Flush any
|
||||
// unflushed messages upstream.
|
||||
pipe->rollback ();
|
||||
pipe->flush ();
|
||||
|
||||
// Remove any half-read message from the in pipe.
|
||||
while (incomplete_in) {
|
||||
msg_t msg;
|
||||
int rc = msg.init ();
|
||||
errno_assert (rc == 0);
|
||||
rc = pull_msg (&msg);
|
||||
errno_assert (rc == 0);
|
||||
rc = msg.close ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
|
||||
{
|
||||
// Drop the reference to the deallocated pipe if required.
|
||||
zmq_assert (pipe_ == pipe || pipe_ == zap_pipe
|
||||
|| terminating_pipes.count (pipe_) == 1);
|
||||
|
||||
if (pipe_ == pipe) {
|
||||
// If this is our current pipe, remove it
|
||||
pipe = NULL;
|
||||
if (has_linger_timer) {
|
||||
cancel_timer (linger_timer_id);
|
||||
has_linger_timer = false;
|
||||
}
|
||||
} else if (pipe_ == zap_pipe)
|
||||
zap_pipe = NULL;
|
||||
else
|
||||
// Remove the pipe from the detached pipes set
|
||||
terminating_pipes.erase (pipe_);
|
||||
|
||||
if (!is_terminating () && options.raw_socket) {
|
||||
if (engine) {
|
||||
engine->terminate ();
|
||||
engine = NULL;
|
||||
}
|
||||
terminate ();
|
||||
}
|
||||
|
||||
// If we are waiting for pending messages to be sent, at this point
|
||||
// we are sure that there will be no more messages and we can proceed
|
||||
// with termination safely.
|
||||
if (pending && !pipe && !zap_pipe && terminating_pipes.empty ()) {
|
||||
pending = false;
|
||||
own_t::process_term (0);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::session_base_t::read_activated (pipe_t *pipe_)
|
||||
{
|
||||
// Skip activating if we're detaching this pipe
|
||||
if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) {
|
||||
zmq_assert (terminating_pipes.count (pipe_) == 1);
|
||||
return;
|
||||
}
|
||||
|
||||
if (unlikely (engine == NULL)) {
|
||||
pipe->check_read ();
|
||||
return;
|
||||
}
|
||||
|
||||
if (likely (pipe_ == pipe))
|
||||
engine->restart_output ();
|
||||
else {
|
||||
// i.e. pipe_ == zap_pipe
|
||||
engine->zap_msg_available ();
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::session_base_t::write_activated (pipe_t *pipe_)
|
||||
{
|
||||
// Skip activating if we're detaching this pipe
|
||||
if (pipe != pipe_) {
|
||||
zmq_assert (terminating_pipes.count (pipe_) == 1);
|
||||
return;
|
||||
}
|
||||
|
||||
if (engine)
|
||||
engine->restart_input ();
|
||||
}
|
||||
|
||||
void zmq::session_base_t::hiccuped (pipe_t *)
|
||||
{
|
||||
// Hiccups are always sent from session to socket, not the other
|
||||
// way round.
|
||||
zmq_assert (false);
|
||||
}
|
||||
|
||||
zmq::socket_base_t *zmq::session_base_t::get_socket ()
|
||||
{
|
||||
return socket;
|
||||
}
|
||||
|
||||
void zmq::session_base_t::process_plug ()
|
||||
{
|
||||
if (active)
|
||||
start_connecting (false);
|
||||
}
|
||||
|
||||
// This functions can return 0 on success or -1 and errno=ECONNREFUSED if ZAP
|
||||
// is not setup (IE: inproc://zeromq.zap.01 does not exist in the same context)
|
||||
// or it aborts on any other error. In other words, either ZAP is not
|
||||
// configured or if it is configured it MUST be configured correctly and it
|
||||
// MUST work, otherwise authentication cannot be guaranteed and it would be a
|
||||
// security flaw.
|
||||
int zmq::session_base_t::zap_connect ()
|
||||
{
|
||||
if (zap_pipe != NULL)
|
||||
return 0;
|
||||
|
||||
endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
|
||||
if (peer.socket == NULL) {
|
||||
errno = ECONNREFUSED;
|
||||
return -1;
|
||||
}
|
||||
zmq_assert (peer.options.type == ZMQ_REP || peer.options.type == ZMQ_ROUTER
|
||||
|| peer.options.type == ZMQ_SERVER);
|
||||
|
||||
// Create a bi-directional pipe that will connect
|
||||
// session with zap socket.
|
||||
object_t *parents[2] = {this, peer.socket};
|
||||
pipe_t *new_pipes[2] = {NULL, NULL};
|
||||
int hwms[2] = {0, 0};
|
||||
bool conflates[2] = {false, false};
|
||||
int rc = pipepair (parents, new_pipes, hwms, conflates);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// Attach local end of the pipe to this socket object.
|
||||
zap_pipe = new_pipes[0];
|
||||
zap_pipe->set_nodelay ();
|
||||
zap_pipe->set_event_sink (this);
|
||||
|
||||
send_bind (peer.socket, new_pipes[1], false);
|
||||
|
||||
// Send empty routing id if required by the peer.
|
||||
if (peer.options.recv_routing_id) {
|
||||
msg_t id;
|
||||
rc = id.init ();
|
||||
errno_assert (rc == 0);
|
||||
id.set_flags (msg_t::routing_id);
|
||||
bool ok = zap_pipe->write (&id);
|
||||
zmq_assert (ok);
|
||||
zap_pipe->flush ();
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool zmq::session_base_t::zap_enabled ()
|
||||
{
|
||||
return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ());
|
||||
}
|
||||
|
||||
void zmq::session_base_t::process_attach (i_engine *engine_)
|
||||
{
|
||||
zmq_assert (engine_ != NULL);
|
||||
|
||||
// Create the pipe if it does not exist yet.
|
||||
if (!pipe && !is_terminating ()) {
|
||||
object_t *parents[2] = {this, socket};
|
||||
pipe_t *pipes[2] = {NULL, NULL};
|
||||
|
||||
bool conflate =
|
||||
options.conflate
|
||||
&& (options.type == ZMQ_DEALER || options.type == ZMQ_PULL
|
||||
|| options.type == ZMQ_PUSH || options.type == ZMQ_PUB
|
||||
|| options.type == ZMQ_SUB);
|
||||
|
||||
int hwms[2] = {conflate ? -1 : options.rcvhwm,
|
||||
conflate ? -1 : options.sndhwm};
|
||||
bool conflates[2] = {conflate, conflate};
|
||||
int rc = pipepair (parents, pipes, hwms, conflates);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// Plug the local end of the pipe.
|
||||
pipes[0]->set_event_sink (this);
|
||||
|
||||
// Remember the local end of the pipe.
|
||||
zmq_assert (!pipe);
|
||||
pipe = pipes[0];
|
||||
|
||||
// Ask socket to plug into the remote end of the pipe.
|
||||
send_bind (socket, pipes[1]);
|
||||
}
|
||||
|
||||
// Plug in the engine.
|
||||
zmq_assert (!engine);
|
||||
engine = engine_;
|
||||
engine->plug (io_thread, this);
|
||||
}
|
||||
|
||||
void zmq::session_base_t::engine_error (
|
||||
zmq::stream_engine_t::error_reason_t reason_)
|
||||
{
|
||||
// Engine is dead. Let's forget about it.
|
||||
engine = NULL;
|
||||
|
||||
// Remove any half-done messages from the pipes.
|
||||
if (pipe)
|
||||
clean_pipes ();
|
||||
|
||||
zmq_assert (reason_ == stream_engine_t::connection_error
|
||||
|| reason_ == stream_engine_t::timeout_error
|
||||
|| reason_ == stream_engine_t::protocol_error);
|
||||
|
||||
switch (reason_) {
|
||||
case stream_engine_t::timeout_error:
|
||||
/* FALLTHROUGH */
|
||||
case stream_engine_t::connection_error:
|
||||
if (active) {
|
||||
reconnect ();
|
||||
break;
|
||||
}
|
||||
/* FALLTHROUGH */
|
||||
case stream_engine_t::protocol_error:
|
||||
if (pending) {
|
||||
if (pipe)
|
||||
pipe->terminate (false);
|
||||
if (zap_pipe)
|
||||
zap_pipe->terminate (false);
|
||||
} else {
|
||||
terminate ();
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// Just in case there's only a delimiter in the pipe.
|
||||
if (pipe)
|
||||
pipe->check_read ();
|
||||
|
||||
if (zap_pipe)
|
||||
zap_pipe->check_read ();
|
||||
}
|
||||
|
||||
void zmq::session_base_t::process_term (int linger_)
|
||||
{
|
||||
zmq_assert (!pending);
|
||||
|
||||
// If the termination of the pipe happens before the term command is
|
||||
// delivered there's nothing much to do. We can proceed with the
|
||||
// standard termination immediately.
|
||||
if (!pipe && !zap_pipe && terminating_pipes.empty ()) {
|
||||
own_t::process_term (0);
|
||||
return;
|
||||
}
|
||||
|
||||
pending = true;
|
||||
|
||||
if (pipe != NULL) {
|
||||
// If there's finite linger value, delay the termination.
|
||||
// If linger is infinite (negative) we don't even have to set
|
||||
// the timer.
|
||||
if (linger_ > 0) {
|
||||
zmq_assert (!has_linger_timer);
|
||||
add_timer (linger_, linger_timer_id);
|
||||
has_linger_timer = true;
|
||||
}
|
||||
|
||||
// Start pipe termination process. Delay the termination till all messages
|
||||
// are processed in case the linger time is non-zero.
|
||||
pipe->terminate (linger_ != 0);
|
||||
|
||||
// TODO: Should this go into pipe_t::terminate ?
|
||||
// In case there's no engine and there's only delimiter in the
|
||||
// pipe it wouldn't be ever read. Thus we check for it explicitly.
|
||||
if (!engine)
|
||||
pipe->check_read ();
|
||||
}
|
||||
|
||||
if (zap_pipe != NULL)
|
||||
zap_pipe->terminate (false);
|
||||
}
|
||||
|
||||
void zmq::session_base_t::timer_event (int id_)
|
||||
{
|
||||
// Linger period expired. We can proceed with termination even though
|
||||
// there are still pending messages to be sent.
|
||||
zmq_assert (id_ == linger_timer_id);
|
||||
has_linger_timer = false;
|
||||
|
||||
// Ask pipe to terminate even though there may be pending messages in it.
|
||||
zmq_assert (pipe);
|
||||
pipe->terminate (false);
|
||||
}
|
||||
|
||||
void zmq::session_base_t::reconnect ()
|
||||
{
|
||||
// For delayed connect situations, terminate the pipe
|
||||
// and reestablish later on
|
||||
if (pipe && options.immediate == 1 && addr->protocol != "pgm"
|
||||
&& addr->protocol != "epgm" && addr->protocol != "norm"
|
||||
&& addr->protocol != "udp") {
|
||||
pipe->hiccup ();
|
||||
pipe->terminate (false);
|
||||
terminating_pipes.insert (pipe);
|
||||
pipe = NULL;
|
||||
|
||||
if (has_linger_timer) {
|
||||
cancel_timer (linger_timer_id);
|
||||
has_linger_timer = false;
|
||||
}
|
||||
}
|
||||
|
||||
reset ();
|
||||
|
||||
// Reconnect.
|
||||
if (options.reconnect_ivl != -1)
|
||||
start_connecting (true);
|
||||
else {
|
||||
std::string *ep = new (std::string);
|
||||
addr->to_string (*ep);
|
||||
send_term_endpoint (socket, ep);
|
||||
}
|
||||
|
||||
// For subscriber sockets we hiccup the inbound pipe, which will cause
|
||||
// the socket object to resend all the subscriptions.
|
||||
if (pipe
|
||||
&& (options.type == ZMQ_SUB || options.type == ZMQ_XSUB
|
||||
|| options.type == ZMQ_DISH))
|
||||
pipe->hiccup ();
|
||||
}
|
||||
|
||||
void zmq::session_base_t::start_connecting (bool wait_)
|
||||
{
|
||||
zmq_assert (active);
|
||||
|
||||
// Choose I/O thread to run connecter in. Given that we are already
|
||||
// running in an I/O thread, there must be at least one available.
|
||||
io_thread_t *io_thread = choose_io_thread (options.affinity);
|
||||
zmq_assert (io_thread);
|
||||
|
||||
// Create the connecter object.
|
||||
|
||||
if (addr->protocol == "tcp") {
|
||||
if (!options.socks_proxy_address.empty ()) {
|
||||
address_t *proxy_address = new (std::nothrow)
|
||||
address_t ("tcp", options.socks_proxy_address, this->get_ctx ());
|
||||
alloc_assert (proxy_address);
|
||||
socks_connecter_t *connecter =
|
||||
new (std::nothrow) socks_connecter_t (io_thread, this, options,
|
||||
addr, proxy_address, wait_);
|
||||
alloc_assert (connecter);
|
||||
launch_child (connecter);
|
||||
} else {
|
||||
tcp_connecter_t *connecter = new (std::nothrow)
|
||||
tcp_connecter_t (io_thread, this, options, addr, wait_);
|
||||
alloc_assert (connecter);
|
||||
launch_child (connecter);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
||||
&& !defined ZMQ_HAVE_VXWORKS
|
||||
if (addr->protocol == "ipc") {
|
||||
ipc_connecter_t *connecter = new (std::nothrow)
|
||||
ipc_connecter_t (io_thread, this, options, addr, wait_);
|
||||
alloc_assert (connecter);
|
||||
launch_child (connecter);
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
#if defined ZMQ_HAVE_TIPC
|
||||
if (addr->protocol == "tipc") {
|
||||
tipc_connecter_t *connecter = new (std::nothrow)
|
||||
tipc_connecter_t (io_thread, this, options, addr, wait_);
|
||||
alloc_assert (connecter);
|
||||
launch_child (connecter);
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (addr->protocol == "udp") {
|
||||
zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
|
||||
|| options.type == ZMQ_DGRAM);
|
||||
|
||||
udp_engine_t *engine = new (std::nothrow) udp_engine_t (options);
|
||||
alloc_assert (engine);
|
||||
|
||||
bool recv = false;
|
||||
bool send = false;
|
||||
|
||||
if (options.type == ZMQ_RADIO) {
|
||||
send = true;
|
||||
recv = false;
|
||||
} else if (options.type == ZMQ_DISH) {
|
||||
send = false;
|
||||
recv = true;
|
||||
} else if (options.type == ZMQ_DGRAM) {
|
||||
send = true;
|
||||
recv = true;
|
||||
}
|
||||
|
||||
int rc = engine->init (addr, send, recv);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
send_attach (this, engine);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
#ifdef ZMQ_HAVE_OPENPGM
|
||||
|
||||
// Both PGM and EPGM transports are using the same infrastructure.
|
||||
if (addr->protocol == "pgm" || addr->protocol == "epgm") {
|
||||
zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
|
||||
|| options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
|
||||
|
||||
// For EPGM transport with UDP encapsulation of PGM is used.
|
||||
bool const udp_encapsulation = addr->protocol == "epgm";
|
||||
|
||||
// At this point we'll create message pipes to the session straight
|
||||
// away. There's no point in delaying it as no concept of 'connect'
|
||||
// exists with PGM anyway.
|
||||
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
|
||||
// PGM sender.
|
||||
pgm_sender_t *pgm_sender =
|
||||
new (std::nothrow) pgm_sender_t (io_thread, options);
|
||||
alloc_assert (pgm_sender);
|
||||
|
||||
int rc =
|
||||
pgm_sender->init (udp_encapsulation, addr->address.c_str ());
|
||||
errno_assert (rc == 0);
|
||||
|
||||
send_attach (this, pgm_sender);
|
||||
} else {
|
||||
// PGM receiver.
|
||||
pgm_receiver_t *pgm_receiver =
|
||||
new (std::nothrow) pgm_receiver_t (io_thread, options);
|
||||
alloc_assert (pgm_receiver);
|
||||
|
||||
int rc =
|
||||
pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
|
||||
errno_assert (rc == 0);
|
||||
|
||||
send_attach (this, pgm_receiver);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef ZMQ_HAVE_NORM
|
||||
if (addr->protocol == "norm") {
|
||||
// At this point we'll create message pipes to the session straight
|
||||
// away. There's no point in delaying it as no concept of 'connect'
|
||||
// exists with NORM anyway.
|
||||
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
|
||||
// NORM sender.
|
||||
norm_engine_t *norm_sender =
|
||||
new (std::nothrow) norm_engine_t (io_thread, options);
|
||||
alloc_assert (norm_sender);
|
||||
|
||||
int rc = norm_sender->init (addr->address.c_str (), true, false);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
send_attach (this, norm_sender);
|
||||
} else { // ZMQ_SUB or ZMQ_XSUB
|
||||
|
||||
// NORM receiver.
|
||||
norm_engine_t *norm_receiver =
|
||||
new (std::nothrow) norm_engine_t (io_thread, options);
|
||||
alloc_assert (norm_receiver);
|
||||
|
||||
int rc = norm_receiver->init (addr->address.c_str (), false, true);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
send_attach (this, norm_receiver);
|
||||
}
|
||||
return;
|
||||
}
|
||||
#endif // ZMQ_HAVE_NORM
|
||||
|
||||
#if defined ZMQ_HAVE_VMCI
|
||||
if (addr->protocol == "vmci") {
|
||||
vmci_connecter_t *connecter = new (std::nothrow)
|
||||
vmci_connecter_t (io_thread, this, options, addr, wait_);
|
||||
alloc_assert (connecter);
|
||||
launch_child (connecter);
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
zmq_assert (false);
|
||||
}
|
@ -1,444 +0,0 @@
|
||||
/*
|
||||
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "precompiled.hpp"
|
||||
#include <new>
|
||||
#include <string>
|
||||
|
||||
#include "macros.hpp"
|
||||
#include "tcp_connecter.hpp"
|
||||
#include "stream_engine.hpp"
|
||||
#include "io_thread.hpp"
|
||||
#include "random.hpp"
|
||||
#include "err.hpp"
|
||||
#include "ip.hpp"
|
||||
#include "tcp.hpp"
|
||||
#include "address.hpp"
|
||||
#include "tcp_address.hpp"
|
||||
#include "session_base.hpp"
|
||||
|
||||
#if !defined ZMQ_HAVE_WINDOWS
|
||||
#include <unistd.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <netinet/tcp.h>
|
||||
#include <netinet/in.h>
|
||||
#include <netdb.h>
|
||||
#include <fcntl.h>
|
||||
#ifdef ZMQ_HAVE_VXWORKS
|
||||
#include <sockLib.h>
|
||||
#endif
|
||||
#ifdef ZMQ_HAVE_OPENVMS
|
||||
#include <ioctl.h>
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#ifdef __APPLE__
|
||||
#include <TargetConditionals.h>
|
||||
#endif
|
||||
|
||||
zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
|
||||
class session_base_t *session_,
|
||||
const options_t &options_,
|
||||
address_t *addr_,
|
||||
bool delayed_start_) :
|
||||
own_t (io_thread_, options_),
|
||||
io_object_t (io_thread_),
|
||||
_addr (addr_),
|
||||
_s (retired_fd),
|
||||
_handle (static_cast<handle_t> (NULL)),
|
||||
_delayed_start (delayed_start_),
|
||||
_connect_timer_started (false),
|
||||
_reconnect_timer_started (false),
|
||||
_session (session_),
|
||||
_current_reconnect_ivl (options.reconnect_ivl),
|
||||
_socket (_session->get_socket ())
|
||||
{
|
||||
zmq_assert (_addr);
|
||||
zmq_assert (_addr->protocol == "tcp");
|
||||
_addr->to_string (_endpoint);
|
||||
// TODO the return value is unused! what if it fails? if this is impossible
|
||||
// or does not matter, change such that endpoint in initialized using an
|
||||
// initializer, and make endpoint const
|
||||
}
|
||||
|
||||
zmq::tcp_connecter_t::~tcp_connecter_t ()
|
||||
{
|
||||
zmq_assert (!_connect_timer_started);
|
||||
zmq_assert (!_reconnect_timer_started);
|
||||
zmq_assert (!_handle);
|
||||
zmq_assert (_s == retired_fd);
|
||||
}
|
||||
|
||||
void zmq::tcp_connecter_t::process_plug ()
|
||||
{
|
||||
if (_delayed_start)
|
||||
add_reconnect_timer ();
|
||||
else
|
||||
start_connecting ();
|
||||
}
|
||||
|
||||
void zmq::tcp_connecter_t::process_term (int linger_)
|
||||
{
|
||||
if (_connect_timer_started) {
|
||||
cancel_timer (connect_timer_id);
|
||||
_connect_timer_started = false;
|
||||
}
|
||||
|
||||
if (_reconnect_timer_started) {
|
||||
cancel_timer (reconnect_timer_id);
|
||||
_reconnect_timer_started = false;
|
||||
}
|
||||
|
||||
if (_handle) {
|
||||
rm_handle ();
|
||||
}
|
||||
|
||||
if (_s != retired_fd)
|
||||
close ();
|
||||
|
||||
own_t::process_term (linger_);
|
||||
}
|
||||
|
||||
void zmq::tcp_connecter_t::in_event ()
|
||||
{
|
||||
// We are not polling for incoming data, so we are actually called
|
||||
// because of error here. However, we can get error on out event as well
|
||||
// on some platforms, so we'll simply handle both events in the same way.
|
||||
out_event ();
|
||||
}
|
||||
|
||||
void zmq::tcp_connecter_t::out_event ()
|
||||
{
|
||||
if (_connect_timer_started) {
|
||||
cancel_timer (connect_timer_id);
|
||||
_connect_timer_started = false;
|
||||
}
|
||||
|
||||
rm_handle ();
|
||||
|
||||
const fd_t fd = connect ();
|
||||
|
||||
// Handle the error condition by attempt to reconnect.
|
||||
if (fd == retired_fd || !tune_socket (fd)) {
|
||||
close ();
|
||||
add_reconnect_timer ();
|
||||
return;
|
||||
}
|
||||
|
||||
// Create the engine object for this connection.
|
||||
stream_engine_t *engine =
|
||||
new (std::nothrow) stream_engine_t (fd, options, _endpoint);
|
||||
alloc_assert (engine);
|
||||
|
||||
// Attach the engine to the corresponding session object.
|
||||
send_attach (_session, engine);
|
||||
|
||||
// Shut the connecter down.
|
||||
terminate ();
|
||||
|
||||
_socket->event_connected (_endpoint, fd);
|
||||
}
|
||||
|
||||
void zmq::tcp_connecter_t::rm_handle ()
|
||||
{
|
||||
rm_fd (_handle);
|
||||
_handle = static_cast<handle_t> (NULL);
|
||||
}
|
||||
|
||||
void zmq::tcp_connecter_t::timer_event (int id_)
|
||||
{
|
||||
zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);
|
||||
if (id_ == connect_timer_id) {
|
||||
_connect_timer_started = false;
|
||||
rm_handle ();
|
||||
close ();
|
||||
add_reconnect_timer ();
|
||||
} else if (id_ == reconnect_timer_id) {
|
||||
_reconnect_timer_started = false;
|
||||
start_connecting ();
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::tcp_connecter_t::start_connecting ()
|
||||
{
|
||||
// Open the connecting socket.
|
||||
const int rc = open ();
|
||||
|
||||
// Connect may succeed in synchronous manner.
|
||||
if (rc == 0) {
|
||||
_handle = add_fd (_s);
|
||||
out_event ();
|
||||
}
|
||||
|
||||
// Connection establishment may be delayed. Poll for its completion.
|
||||
else if (rc == -1 && errno == EINPROGRESS) {
|
||||
_handle = add_fd (_s);
|
||||
set_pollout (_handle);
|
||||
_socket->event_connect_delayed (_endpoint, zmq_errno ());
|
||||
|
||||
// add userspace connect timeout
|
||||
add_connect_timer ();
|
||||
}
|
||||
|
||||
// Handle any other error condition by eventual reconnect.
|
||||
else {
|
||||
if (_s != retired_fd)
|
||||
close ();
|
||||
add_reconnect_timer ();
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::tcp_connecter_t::add_connect_timer ()
|
||||
{
|
||||
if (options.connect_timeout > 0) {
|
||||
add_timer (options.connect_timeout, connect_timer_id);
|
||||
_connect_timer_started = true;
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::tcp_connecter_t::add_reconnect_timer ()
|
||||
{
|
||||
const int interval = get_new_reconnect_ivl ();
|
||||
add_timer (interval, reconnect_timer_id);
|
||||
_socket->event_connect_retried (_endpoint, interval);
|
||||
_reconnect_timer_started = true;
|
||||
}
|
||||
|
||||
int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
|
||||
{
|
||||
// The new interval is the current interval + random value.
|
||||
const int interval =
|
||||
_current_reconnect_ivl + generate_random () % options.reconnect_ivl;
|
||||
|
||||
// Only change the current reconnect interval if the maximum reconnect
|
||||
// interval was set and if it's larger than the reconnect interval.
|
||||
if (options.reconnect_ivl_max > 0
|
||||
&& options.reconnect_ivl_max > options.reconnect_ivl)
|
||||
// Calculate the next interval
|
||||
_current_reconnect_ivl =
|
||||
std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max);
|
||||
return interval;
|
||||
}
|
||||
|
||||
int zmq::tcp_connecter_t::open ()
|
||||
{
|
||||
zmq_assert (_s == retired_fd);
|
||||
|
||||
// Resolve the address
|
||||
if (_addr->resolved.tcp_addr != NULL) {
|
||||
LIBZMQ_DELETE (_addr->resolved.tcp_addr);
|
||||
}
|
||||
|
||||
_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
|
||||
alloc_assert (_addr->resolved.tcp_addr);
|
||||
int rc = _addr->resolved.tcp_addr->resolve (_addr->address.c_str (), false,
|
||||
options.ipv6);
|
||||
if (rc != 0) {
|
||||
LIBZMQ_DELETE (_addr->resolved.tcp_addr);
|
||||
return -1;
|
||||
}
|
||||
zmq_assert (_addr->resolved.tcp_addr != NULL);
|
||||
const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr;
|
||||
|
||||
// Create the socket.
|
||||
_s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
|
||||
|
||||
// IPv6 address family not supported, try automatic downgrade to IPv4.
|
||||
if (_s == zmq::retired_fd && tcp_addr->family () == AF_INET6
|
||||
&& errno == EAFNOSUPPORT && options.ipv6) {
|
||||
rc = _addr->resolved.tcp_addr->resolve (_addr->address.c_str (), false,
|
||||
false);
|
||||
if (rc != 0) {
|
||||
LIBZMQ_DELETE (_addr->resolved.tcp_addr);
|
||||
return -1;
|
||||
}
|
||||
_s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
||||
}
|
||||
|
||||
if (_s == retired_fd) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
|
||||
// Switch it on in such cases.
|
||||
if (tcp_addr->family () == AF_INET6)
|
||||
enable_ipv4_mapping (_s);
|
||||
|
||||
// Set the IP Type-Of-Service priority for this socket
|
||||
if (options.tos != 0)
|
||||
set_ip_type_of_service (_s, options.tos);
|
||||
|
||||
// Bind the socket to a device if applicable
|
||||
if (!options.bound_device.empty ())
|
||||
bind_to_device (_s, options.bound_device);
|
||||
|
||||
// Set the socket to non-blocking mode so that we get async connect().
|
||||
unblock_socket (_s);
|
||||
|
||||
// Set the socket to loopback fastpath if configured.
|
||||
if (options.loopback_fastpath)
|
||||
tcp_tune_loopback_fast_path (_s);
|
||||
|
||||
// Set the socket buffer limits for the underlying socket.
|
||||
if (options.sndbuf >= 0)
|
||||
set_tcp_send_buffer (_s, options.sndbuf);
|
||||
if (options.rcvbuf >= 0)
|
||||
set_tcp_receive_buffer (_s, options.rcvbuf);
|
||||
|
||||
// Set the IP Type-Of-Service for the underlying socket
|
||||
if (options.tos != 0)
|
||||
set_ip_type_of_service (_s, options.tos);
|
||||
|
||||
// Set a source address for conversations
|
||||
if (tcp_addr->has_src_addr ()) {
|
||||
// Allow reusing of the address, to connect to different servers
|
||||
// using the same source port on the client.
|
||||
int flag = 1;
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR,
|
||||
reinterpret_cast<const char *> (&flag), sizeof (int));
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
#elif defined ZMQ_HAVE_VXWORKS
|
||||
rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag,
|
||||
sizeof (int));
|
||||
errno_assert (rc == 0);
|
||||
#else
|
||||
rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
|
||||
errno_assert (rc == 0);
|
||||
#endif
|
||||
|
||||
#if defined ZMQ_HAVE_VXWORKS
|
||||
rc = ::bind (_s, (sockaddr *) tcp_addr->src_addr (),
|
||||
tcp_addr->src_addrlen ());
|
||||
#else
|
||||
rc = ::bind (_s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
|
||||
#endif
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Connect to the remote peer.
|
||||
#if defined ZMQ_HAVE_VXWORKS
|
||||
rc = ::connect (_s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ());
|
||||
#else
|
||||
rc = ::connect (_s, tcp_addr->addr (), tcp_addr->addrlen ());
|
||||
#endif
|
||||
// Connect was successful immediately.
|
||||
if (rc == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Translate error codes indicating asynchronous connect has been
|
||||
// launched to a uniform EINPROGRESS.
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
const int last_error = WSAGetLastError ();
|
||||
if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
|
||||
errno = EINPROGRESS;
|
||||
else
|
||||
errno = wsa_error_to_errno (last_error);
|
||||
#else
|
||||
if (errno == EINTR)
|
||||
errno = EINPROGRESS;
|
||||
#endif
|
||||
return -1;
|
||||
}
|
||||
|
||||
zmq::fd_t zmq::tcp_connecter_t::connect ()
|
||||
{
|
||||
// Async connect has finished. Check whether an error occurred
|
||||
int err = 0;
|
||||
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
|
||||
int len = sizeof err;
|
||||
#else
|
||||
socklen_t len = sizeof err;
|
||||
#endif
|
||||
|
||||
const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
|
||||
reinterpret_cast<char *> (&err), &len);
|
||||
|
||||
// Assert if the error was caused by 0MQ bug.
|
||||
// Networking problems are OK. No need to assert.
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
zmq_assert (rc == 0);
|
||||
if (err != 0) {
|
||||
if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
|
||||
|| err == WSAENOBUFS) {
|
||||
wsa_assert_no (err);
|
||||
}
|
||||
return retired_fd;
|
||||
}
|
||||
#else
|
||||
// Following code should handle both Berkeley-derived socket
|
||||
// implementations and Solaris.
|
||||
if (rc == -1)
|
||||
err = errno;
|
||||
if (err != 0) {
|
||||
errno = err;
|
||||
#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
|
||||
errno_assert (errno != EBADF && errno != ENOPROTOOPT
|
||||
&& errno != ENOTSOCK && errno != ENOBUFS);
|
||||
#else
|
||||
errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
|
||||
&& errno != ENOBUFS);
|
||||
#endif
|
||||
return retired_fd;
|
||||
}
|
||||
#endif
|
||||
|
||||
// Return the newly connected socket.
|
||||
const fd_t result = _s;
|
||||
_s = retired_fd;
|
||||
return result;
|
||||
}
|
||||
|
||||
bool zmq::tcp_connecter_t::tune_socket (const fd_t fd_)
|
||||
{
|
||||
const int rc = tune_tcp_socket (fd_)
|
||||
| tune_tcp_keepalives (
|
||||
fd_, options.tcp_keepalive, options.tcp_keepalive_cnt,
|
||||
options.tcp_keepalive_idle, options.tcp_keepalive_intvl)
|
||||
| tune_tcp_maxrt (fd_, options.tcp_maxrt);
|
||||
return rc == 0;
|
||||
}
|
||||
|
||||
void zmq::tcp_connecter_t::close ()
|
||||
{
|
||||
zmq_assert (_s != retired_fd);
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
const int rc = closesocket (_s);
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
#else
|
||||
const int rc = ::close (_s);
|
||||
errno_assert (rc == 0);
|
||||
#endif
|
||||
_socket->event_closed (_endpoint, _s);
|
||||
_s = retired_fd;
|
||||
}
|
Loading…
Reference in New Issue
Block a user