From be8e7de6be6631609be063273c78e97ed3e37172 Mon Sep 17 00:00:00 2001 From: Simon Giesecke Date: Wed, 30 May 2018 09:30:00 +0200 Subject: [PATCH] Problem: temporary files in repo Solution: remove them --- src/router.cpp~RF40cad05.TMP | 549 ---------------------- src/session_base.cpp~RF4069b78.TMP | 711 ----------------------------- src/tcp_connecter.cpp.orig | 444 ------------------ 3 files changed, 1704 deletions(-) delete mode 100644 src/router.cpp~RF40cad05.TMP delete mode 100644 src/session_base.cpp~RF4069b78.TMP delete mode 100644 src/tcp_connecter.cpp.orig diff --git a/src/router.cpp~RF40cad05.TMP b/src/router.cpp~RF40cad05.TMP deleted file mode 100644 index 53d868f5..00000000 --- a/src/router.cpp~RF40cad05.TMP +++ /dev/null @@ -1,549 +0,0 @@ -/* - Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file - - This file is part of libzmq, the ZeroMQ core engine in C++. - - libzmq is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License (LGPL) as published - by the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - As a special exception, the Contributors give you permission to link - this library with independent modules to produce an executable, - regardless of the license terms of these independent modules, and to - copy and distribute the resulting executable under terms of your choice, - provided that you also meet, for each linked independent module, the - terms and conditions of the license of that module. An independent - module is a module which is not derived from or based on this library. - If you modify this library, you must extend this exception to your - version of the library. - - libzmq is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "precompiled.hpp" -#include "macros.hpp" -#include "router.hpp" -#include "pipe.hpp" -#include "wire.hpp" -#include "random.hpp" -#include "likely.hpp" -#include "err.hpp" - -zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - socket_base_t (parent_, tid_, sid_), - prefetched (false), - routing_id_sent (false), - current_in (NULL), - terminate_current_in (false), - more_in (false), - current_out (NULL), - more_out (false), - next_integral_routing_id (generate_random ()), - mandatory (false), - // raw_socket functionality in ROUTER is deprecated - raw_socket (false), - probe_router (false), - handover (false) -{ - options.type = ZMQ_ROUTER; - options.recv_routing_id = true; - options.raw_socket = false; - - prefetched_id.init (); - prefetched_msg.init (); -} - -zmq::router_t::~router_t () -{ - zmq_assert (anonymous_pipes.empty ()); - ; - zmq_assert (outpipes.empty ()); - prefetched_id.close (); - prefetched_msg.close (); -} - -void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) -{ - LIBZMQ_UNUSED (subscribe_to_all_); - - zmq_assert (pipe_); - - if (probe_router) { - msg_t probe_msg; - int rc = probe_msg.init (); - errno_assert (rc == 0); - - rc = pipe_->write (&probe_msg); - // zmq_assert (rc) is not applicable here, since it is not a bug. - pipe_->flush (); - - rc = probe_msg.close (); - errno_assert (rc == 0); - } - - bool routing_id_ok = identify_peer (pipe_); - if (routing_id_ok) - fq.attach (pipe_); - else - anonymous_pipes.insert (pipe_); -} - -int zmq::router_t::xsetsockopt (int option_, - const void *optval_, - size_t optvallen_) -{ - bool is_int = (optvallen_ == sizeof (int)); - int value = 0; - if (is_int) - memcpy (&value, optval_, sizeof (int)); - - switch (option_) { - case ZMQ_CONNECT_ROUTING_ID: - // TODO why isn't it possible to set an empty connect_routing_id - // (which is the default value) - if (optval_ && optvallen_) { - connect_routing_id.assign ((char *) optval_, optvallen_); - return 0; - } - break; - - case ZMQ_ROUTER_RAW: - if (is_int && value >= 0) { - raw_socket = (value != 0); - if (raw_socket) { - options.recv_routing_id = false; - options.raw_socket = true; - } - return 0; - } - break; - - case ZMQ_ROUTER_MANDATORY: - if (is_int && value >= 0) { - mandatory = (value != 0); - return 0; - } - break; - - case ZMQ_PROBE_ROUTER: - if (is_int && value >= 0) { - probe_router = (value != 0); - return 0; - } - break; - - case ZMQ_ROUTER_HANDOVER: - if (is_int && value >= 0) { - handover = (value != 0); - return 0; - } - break; - - default: - break; - } - errno = EINVAL; - return -1; -} - - -void zmq::router_t::xpipe_terminated (pipe_t *pipe_) -{ - std::set::iterator it = anonymous_pipes.find (pipe_); - if (it != anonymous_pipes.end ()) - anonymous_pipes.erase (it); - else { - outpipes_t::iterator iter = outpipes.find (pipe_->get_routing_id ()); - zmq_assert (iter != outpipes.end ()); - outpipes.erase (iter); - fq.pipe_terminated (pipe_); - pipe_->rollback (); - if (pipe_ == current_out) - current_out = NULL; - } -} - -void zmq::router_t::xread_activated (pipe_t *pipe_) -{ - std::set::iterator it = anonymous_pipes.find (pipe_); - if (it == anonymous_pipes.end ()) - fq.activated (pipe_); - else { - bool routing_id_ok = identify_peer (pipe_); - if (routing_id_ok) { - anonymous_pipes.erase (it); - fq.attach (pipe_); - } - } -} - -void zmq::router_t::xwrite_activated (pipe_t *pipe_) -{ - outpipes_t::iterator it; - for (it = outpipes.begin (); it != outpipes.end (); ++it) - if (it->second.pipe == pipe_) - break; - - zmq_assert (it != outpipes.end ()); - zmq_assert (!it->second.active); - it->second.active = true; -} - -int zmq::router_t::xsend (msg_t *msg_) -{ - // If this is the first part of the message it's the ID of the - // peer to send the message to. - if (!more_out) { - zmq_assert (!current_out); - - // If we have malformed message (prefix with no subsequent message) - // then just silently ignore it. - // TODO: The connections should be killed instead. - if (msg_->flags () & msg_t::more) { - more_out = true; - - // Find the pipe associated with the routing id stored in the prefix. - // If there's no such pipe just silently ignore the message, unless - // router_mandatory is set. - blob_t routing_id (static_cast (msg_->data ()), - msg_->size (), zmq::reference_tag_t ()); - outpipes_t::iterator it = outpipes.find (routing_id); - - if (it != outpipes.end ()) { - current_out = it->second.pipe; - - // Check whether pipe is closed or not - if (!current_out->check_write ()) { - // Check whether pipe is full or not - bool pipe_full = !current_out->check_hwm (); - it->second.active = false; - current_out = NULL; - - if (mandatory) { - more_out = false; - if (pipe_full) - errno = EAGAIN; - else - errno = EHOSTUNREACH; - return -1; - } - } - } else if (mandatory) { - more_out = false; - errno = EHOSTUNREACH; - return -1; - } - } - - int rc = msg_->close (); - errno_assert (rc == 0); - rc = msg_->init (); - errno_assert (rc == 0); - return 0; - } - - // Ignore the MORE flag for raw-sock or assert? - if (options.raw_socket) - msg_->reset_flags (msg_t::more); - - // Check whether this is the last part of the message. - more_out = (msg_->flags () & msg_t::more) != 0; - - // Push the message into the pipe. If there's no out pipe, just drop it. - if (current_out) { - // Close the remote connection if user has asked to do so - // by sending zero length message. - // Pending messages in the pipe will be dropped (on receiving term- ack) - if (raw_socket && msg_->size () == 0) { - current_out->terminate (false); - int rc = msg_->close (); - errno_assert (rc == 0); - rc = msg_->init (); - errno_assert (rc == 0); - current_out = NULL; - return 0; - } - - bool ok = current_out->write (msg_); - if (unlikely (!ok)) { - // Message failed to send - we must close it ourselves. - int rc = msg_->close (); - errno_assert (rc == 0); - // HWM was checked before, so the pipe must be gone. Roll back - // messages that were piped, for example REP labels. - current_out->rollback (); - current_out = NULL; - } else { - if (!more_out) { - current_out->flush (); - current_out = NULL; - } - } - } else { - int rc = msg_->close (); - errno_assert (rc == 0); - } - - // Detach the message from the data buffer. - int rc = msg_->init (); - errno_assert (rc == 0); - - return 0; -} - -int zmq::router_t::xrecv (msg_t *msg_) -{ - if (prefetched) { - if (!routing_id_sent) { - int rc = msg_->move (prefetched_id); - errno_assert (rc == 0); - routing_id_sent = true; - } else { - int rc = msg_->move (prefetched_msg); - errno_assert (rc == 0); - prefetched = false; - } - more_in = (msg_->flags () & msg_t::more) != 0; - - if (!more_in) { - if (terminate_current_in) { - current_in->terminate (true); - terminate_current_in = false; - } - current_in = NULL; - } - return 0; - } - - pipe_t *pipe = NULL; - int rc = fq.recvpipe (msg_, &pipe); - - // It's possible that we receive peer's routing id. That happens - // after reconnection. The current implementation assumes that - // the peer always uses the same routing id. - while (rc == 0 && msg_->is_routing_id ()) - rc = fq.recvpipe (msg_, &pipe); - - if (rc != 0) - return -1; - - zmq_assert (pipe != NULL); - - // If we are in the middle of reading a message, just return the next part. - if (more_in) { - more_in = (msg_->flags () & msg_t::more) != 0; - - if (!more_in) { - if (terminate_current_in) { - current_in->terminate (true); - terminate_current_in = false; - } - current_in = NULL; - } - } else { - // We are at the beginning of a message. - // Keep the message part we have in the prefetch buffer - // and return the ID of the peer instead. - rc = prefetched_msg.move (*msg_); - errno_assert (rc == 0); - prefetched = true; - current_in = pipe; - - const blob_t &routing_id = pipe->get_routing_id (); - rc = msg_->init_size (routing_id.size ()); - errno_assert (rc == 0); - memcpy (msg_->data (), routing_id.data (), routing_id.size ()); - msg_->set_flags (msg_t::more); - if (prefetched_msg.metadata ()) - msg_->set_metadata (prefetched_msg.metadata ()); - routing_id_sent = true; - } - - return 0; -} - -int zmq::router_t::rollback () -{ - if (current_out) { - current_out->rollback (); - current_out = NULL; - more_out = false; - } - return 0; -} - -bool zmq::router_t::xhas_in () -{ - // If we are in the middle of reading the messages, there are - // definitely more parts available. - if (more_in) - return true; - - // We may already have a message pre-fetched. - if (prefetched) - return true; - - // Try to read the next message. - // The message, if read, is kept in the pre-fetch buffer. - pipe_t *pipe = NULL; - int rc = fq.recvpipe (&prefetched_msg, &pipe); - - // It's possible that we receive peer's routing id. That happens - // after reconnection. The current implementation assumes that - // the peer always uses the same routing id. - // TODO: handle the situation when the peer changes its routing id. - while (rc == 0 && prefetched_msg.is_routing_id ()) - rc = fq.recvpipe (&prefetched_msg, &pipe); - - if (rc != 0) - return false; - - zmq_assert (pipe != NULL); - - const blob_t &routing_id = pipe->get_routing_id (); - rc = prefetched_id.init_size (routing_id.size ()); - errno_assert (rc == 0); - memcpy (prefetched_id.data (), routing_id.data (), routing_id.size ()); - prefetched_id.set_flags (msg_t::more); - - prefetched = true; - routing_id_sent = false; - current_in = pipe; - - return true; -} - -bool zmq::router_t::xhas_out () -{ - // In theory, ROUTER socket is always ready for writing (except when - // MANDATORY is set). Whether actual attempt to write succeeds depends - // on whitch pipe the message is going to be routed to. - - if (!mandatory) - return true; - - bool has_out = false; - outpipes_t::iterator it; - for (it = outpipes.begin (); it != outpipes.end (); ++it) - has_out |= it->second.pipe->check_hwm (); - - return has_out; -} - -const zmq::blob_t &zmq::router_t::get_credential () const -{ - return fq.get_credential (); -} - -int zmq::router_t::get_peer_state (const void *routing_id_, - size_t routing_id_size_) const -{ - int res = 0; - - blob_t routing_id_blob ((unsigned char *) routing_id_, routing_id_size_); - outpipes_t::const_iterator it = outpipes.find (routing_id_blob); - if (it == outpipes.end ()) { - errno = EHOSTUNREACH; - return -1; - } - - const outpipe_t &outpipe = it->second; - if (outpipe.pipe->check_hwm ()) - res |= ZMQ_POLLOUT; - - /** \todo does it make any sense to check the inpipe as well? */ - - return res; -} - -bool zmq::router_t::identify_peer (pipe_t *pipe_) -{ - msg_t msg; - bool ok; - blob_t routing_id; - - if (connect_routing_id.length ()) { - routing_id.set ((unsigned char *) connect_routing_id.c_str (), - connect_routing_id.length ()); - connect_routing_id.clear (); - outpipes_t::iterator it = outpipes.find (routing_id); - if (it != outpipes.end ()) - zmq_assert (false); // Not allowed to duplicate an existing rid - } else if ( - options - .raw_socket) { // Always assign an integral routing id for raw-socket - unsigned char buf[5]; - buf[0] = 0; - put_uint32 (buf + 1, next_integral_routing_id++); - routing_id.set (buf, sizeof buf); - } else if (!options.raw_socket) { - // Pick up handshake cases and also case where next integral routing id is set - msg.init (); - ok = pipe_->read (&msg); - if (!ok) - return false; - - if (msg.size () == 0) { - // Fall back on the auto-generation - unsigned char buf[5]; - buf[0] = 0; - put_uint32 (buf + 1, next_integral_routing_id++); - routing_id.set (buf, sizeof buf); - msg.close (); - } else { - routing_id.set (static_cast (msg.data ()), - msg.size ()); - outpipes_t::iterator it = outpipes.find (routing_id); - msg.close (); - - if (it != outpipes.end ()) { - if (!handover) - // Ignore peers with duplicate ID - return false; - - // We will allow the new connection to take over this - // routing id. Temporarily assign a new routing id to the - // existing pipe so we can terminate it asynchronously. - unsigned char buf[5]; - buf[0] = 0; - put_uint32 (buf + 1, next_integral_routing_id++); - blob_t new_routing_id (buf, sizeof buf); - - it->second.pipe->set_router_socket_routing_id (new_routing_id); - outpipe_t existing_outpipe = {it->second.pipe, - it->second.active}; - - ok = outpipes - .ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (new_routing_id), - existing_outpipe) - .second; - zmq_assert (ok); - - // Remove the existing routing id entry to allow the new - // connection to take the routing id. - outpipes.erase (it); - - if (existing_outpipe.pipe == current_in) - terminate_current_in = true; - else - existing_outpipe.pipe->terminate (true); - } - } - } - - pipe_->set_router_socket_routing_id (routing_id); - // Add the record into output pipes lookup table - outpipe_t outpipe = {pipe_, true}; - ok = outpipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe) - .second; - zmq_assert (ok); - - return true; -} diff --git a/src/session_base.cpp~RF4069b78.TMP b/src/session_base.cpp~RF4069b78.TMP deleted file mode 100644 index ecf3aa2c..00000000 --- a/src/session_base.cpp~RF4069b78.TMP +++ /dev/null @@ -1,711 +0,0 @@ -/* - Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file - - This file is part of libzmq, the ZeroMQ core engine in C++. - - libzmq is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License (LGPL) as published - by the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - As a special exception, the Contributors give you permission to link - this library with independent modules to produce an executable, - regardless of the license terms of these independent modules, and to - copy and distribute the resulting executable under terms of your choice, - provided that you also meet, for each linked independent module, the - terms and conditions of the license of that module. An independent - module is a module which is not derived from or based on this library. - If you modify this library, you must extend this exception to your - version of the library. - - libzmq is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "precompiled.hpp" -#include "macros.hpp" -#include "session_base.hpp" -#include "i_engine.hpp" -#include "err.hpp" -#include "pipe.hpp" -#include "likely.hpp" -#include "tcp_connecter.hpp" -#include "ipc_connecter.hpp" -#include "tipc_connecter.hpp" -#include "socks_connecter.hpp" -#include "vmci_connecter.hpp" -#include "pgm_sender.hpp" -#include "pgm_receiver.hpp" -#include "address.hpp" -#include "norm_engine.hpp" -#include "udp_engine.hpp" - -#include "ctx.hpp" -#include "req.hpp" -#include "radio.hpp" -#include "dish.hpp" - -zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, - bool active_, - class socket_base_t *socket_, - const options_t &options_, - address_t *addr_) -{ - session_base_t *s = NULL; - switch (options_.type) { - case ZMQ_REQ: - s = new (std::nothrow) - req_session_t (io_thread_, active_, socket_, options_, addr_); - break; - case ZMQ_RADIO: - s = new (std::nothrow) - radio_session_t (io_thread_, active_, socket_, options_, addr_); - break; - case ZMQ_DISH: - s = new (std::nothrow) - dish_session_t (io_thread_, active_, socket_, options_, addr_); - break; - case ZMQ_DEALER: - case ZMQ_REP: - case ZMQ_ROUTER: - case ZMQ_PUB: - case ZMQ_XPUB: - case ZMQ_SUB: - case ZMQ_XSUB: - case ZMQ_PUSH: - case ZMQ_PULL: - case ZMQ_PAIR: - case ZMQ_STREAM: - case ZMQ_SERVER: - case ZMQ_CLIENT: - case ZMQ_GATHER: - case ZMQ_SCATTER: - case ZMQ_DGRAM: - s = new (std::nothrow) - session_base_t (io_thread_, active_, socket_, options_, addr_); - break; - default: - errno = EINVAL; - return NULL; - } - alloc_assert (s); - return s; -} - -zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, - bool active_, - class socket_base_t *socket_, - const options_t &options_, - address_t *addr_) : - own_t (io_thread_, options_), - io_object_t (io_thread_), - active (active_), - pipe (NULL), - zap_pipe (NULL), - incomplete_in (false), - pending (false), - engine (NULL), - socket (socket_), - io_thread (io_thread_), - has_linger_timer (false), - addr (addr_) -{ -} - -const char *zmq::session_base_t::get_endpoint () const -{ - return engine->get_endpoint (); -} - -zmq::session_base_t::~session_base_t () -{ - zmq_assert (!pipe); - zmq_assert (!zap_pipe); - - // If there's still a pending linger timer, remove it. - if (has_linger_timer) { - cancel_timer (linger_timer_id); - has_linger_timer = false; - } - - // Close the engine. - if (engine) - engine->terminate (); - - LIBZMQ_DELETE (addr); -} - -void zmq::session_base_t::attach_pipe (pipe_t *pipe_) -{ - zmq_assert (!is_terminating ()); - zmq_assert (!pipe); - zmq_assert (pipe_); - pipe = pipe_; - pipe->set_event_sink (this); -} - -int zmq::session_base_t::pull_msg (msg_t *msg_) -{ - if (!pipe || !pipe->read (msg_)) { - errno = EAGAIN; - return -1; - } - - incomplete_in = (msg_->flags () & msg_t::more) != 0; - - return 0; -} - -int zmq::session_base_t::push_msg (msg_t *msg_) -{ - if (msg_->flags () & msg_t::command) - return 0; - if (pipe && pipe->write (msg_)) { - int rc = msg_->init (); - errno_assert (rc == 0); - return 0; - } - - errno = EAGAIN; - return -1; -} - -int zmq::session_base_t::read_zap_msg (msg_t *msg_) -{ - if (zap_pipe == NULL) { - errno = ENOTCONN; - return -1; - } - - if (!zap_pipe->read (msg_)) { - errno = EAGAIN; - return -1; - } - - return 0; -} - -int zmq::session_base_t::write_zap_msg (msg_t *msg_) -{ - if (zap_pipe == NULL || !zap_pipe->write (msg_)) { - errno = ENOTCONN; - return -1; - } - - if ((msg_->flags () & msg_t::more) == 0) - zap_pipe->flush (); - - const int rc = msg_->init (); - errno_assert (rc == 0); - return 0; -} - -void zmq::session_base_t::reset () -{ -} - -void zmq::session_base_t::flush () -{ - if (pipe) - pipe->flush (); -} - -void zmq::session_base_t::clean_pipes () -{ - zmq_assert (pipe != NULL); - - // Get rid of half-processed messages in the out pipe. Flush any - // unflushed messages upstream. - pipe->rollback (); - pipe->flush (); - - // Remove any half-read message from the in pipe. - while (incomplete_in) { - msg_t msg; - int rc = msg.init (); - errno_assert (rc == 0); - rc = pull_msg (&msg); - errno_assert (rc == 0); - rc = msg.close (); - errno_assert (rc == 0); - } -} - -void zmq::session_base_t::pipe_terminated (pipe_t *pipe_) -{ - // Drop the reference to the deallocated pipe if required. - zmq_assert (pipe_ == pipe || pipe_ == zap_pipe - || terminating_pipes.count (pipe_) == 1); - - if (pipe_ == pipe) { - // If this is our current pipe, remove it - pipe = NULL; - if (has_linger_timer) { - cancel_timer (linger_timer_id); - has_linger_timer = false; - } - } else if (pipe_ == zap_pipe) - zap_pipe = NULL; - else - // Remove the pipe from the detached pipes set - terminating_pipes.erase (pipe_); - - if (!is_terminating () && options.raw_socket) { - if (engine) { - engine->terminate (); - engine = NULL; - } - terminate (); - } - - // If we are waiting for pending messages to be sent, at this point - // we are sure that there will be no more messages and we can proceed - // with termination safely. - if (pending && !pipe && !zap_pipe && terminating_pipes.empty ()) { - pending = false; - own_t::process_term (0); - } -} - -void zmq::session_base_t::read_activated (pipe_t *pipe_) -{ - // Skip activating if we're detaching this pipe - if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) { - zmq_assert (terminating_pipes.count (pipe_) == 1); - return; - } - - if (unlikely (engine == NULL)) { - pipe->check_read (); - return; - } - - if (likely (pipe_ == pipe)) - engine->restart_output (); - else { - // i.e. pipe_ == zap_pipe - engine->zap_msg_available (); - } -} - -void zmq::session_base_t::write_activated (pipe_t *pipe_) -{ - // Skip activating if we're detaching this pipe - if (pipe != pipe_) { - zmq_assert (terminating_pipes.count (pipe_) == 1); - return; - } - - if (engine) - engine->restart_input (); -} - -void zmq::session_base_t::hiccuped (pipe_t *) -{ - // Hiccups are always sent from session to socket, not the other - // way round. - zmq_assert (false); -} - -zmq::socket_base_t *zmq::session_base_t::get_socket () -{ - return socket; -} - -void zmq::session_base_t::process_plug () -{ - if (active) - start_connecting (false); -} - -// This functions can return 0 on success or -1 and errno=ECONNREFUSED if ZAP -// is not setup (IE: inproc://zeromq.zap.01 does not exist in the same context) -// or it aborts on any other error. In other words, either ZAP is not -// configured or if it is configured it MUST be configured correctly and it -// MUST work, otherwise authentication cannot be guaranteed and it would be a -// security flaw. -int zmq::session_base_t::zap_connect () -{ - if (zap_pipe != NULL) - return 0; - - endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01"); - if (peer.socket == NULL) { - errno = ECONNREFUSED; - return -1; - } - zmq_assert (peer.options.type == ZMQ_REP || peer.options.type == ZMQ_ROUTER - || peer.options.type == ZMQ_SERVER); - - // Create a bi-directional pipe that will connect - // session with zap socket. - object_t *parents[2] = {this, peer.socket}; - pipe_t *new_pipes[2] = {NULL, NULL}; - int hwms[2] = {0, 0}; - bool conflates[2] = {false, false}; - int rc = pipepair (parents, new_pipes, hwms, conflates); - errno_assert (rc == 0); - - // Attach local end of the pipe to this socket object. - zap_pipe = new_pipes[0]; - zap_pipe->set_nodelay (); - zap_pipe->set_event_sink (this); - - send_bind (peer.socket, new_pipes[1], false); - - // Send empty routing id if required by the peer. - if (peer.options.recv_routing_id) { - msg_t id; - rc = id.init (); - errno_assert (rc == 0); - id.set_flags (msg_t::routing_id); - bool ok = zap_pipe->write (&id); - zmq_assert (ok); - zap_pipe->flush (); - } - - return 0; -} - -bool zmq::session_base_t::zap_enabled () -{ - return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ()); -} - -void zmq::session_base_t::process_attach (i_engine *engine_) -{ - zmq_assert (engine_ != NULL); - - // Create the pipe if it does not exist yet. - if (!pipe && !is_terminating ()) { - object_t *parents[2] = {this, socket}; - pipe_t *pipes[2] = {NULL, NULL}; - - bool conflate = - options.conflate - && (options.type == ZMQ_DEALER || options.type == ZMQ_PULL - || options.type == ZMQ_PUSH || options.type == ZMQ_PUB - || options.type == ZMQ_SUB); - - int hwms[2] = {conflate ? -1 : options.rcvhwm, - conflate ? -1 : options.sndhwm}; - bool conflates[2] = {conflate, conflate}; - int rc = pipepair (parents, pipes, hwms, conflates); - errno_assert (rc == 0); - - // Plug the local end of the pipe. - pipes[0]->set_event_sink (this); - - // Remember the local end of the pipe. - zmq_assert (!pipe); - pipe = pipes[0]; - - // Ask socket to plug into the remote end of the pipe. - send_bind (socket, pipes[1]); - } - - // Plug in the engine. - zmq_assert (!engine); - engine = engine_; - engine->plug (io_thread, this); -} - -void zmq::session_base_t::engine_error ( - zmq::stream_engine_t::error_reason_t reason_) -{ - // Engine is dead. Let's forget about it. - engine = NULL; - - // Remove any half-done messages from the pipes. - if (pipe) - clean_pipes (); - - zmq_assert (reason_ == stream_engine_t::connection_error - || reason_ == stream_engine_t::timeout_error - || reason_ == stream_engine_t::protocol_error); - - switch (reason_) { - case stream_engine_t::timeout_error: - /* FALLTHROUGH */ - case stream_engine_t::connection_error: - if (active) { - reconnect (); - break; - } - /* FALLTHROUGH */ - case stream_engine_t::protocol_error: - if (pending) { - if (pipe) - pipe->terminate (false); - if (zap_pipe) - zap_pipe->terminate (false); - } else { - terminate (); - } - break; - } - - // Just in case there's only a delimiter in the pipe. - if (pipe) - pipe->check_read (); - - if (zap_pipe) - zap_pipe->check_read (); -} - -void zmq::session_base_t::process_term (int linger_) -{ - zmq_assert (!pending); - - // If the termination of the pipe happens before the term command is - // delivered there's nothing much to do. We can proceed with the - // standard termination immediately. - if (!pipe && !zap_pipe && terminating_pipes.empty ()) { - own_t::process_term (0); - return; - } - - pending = true; - - if (pipe != NULL) { - // If there's finite linger value, delay the termination. - // If linger is infinite (negative) we don't even have to set - // the timer. - if (linger_ > 0) { - zmq_assert (!has_linger_timer); - add_timer (linger_, linger_timer_id); - has_linger_timer = true; - } - - // Start pipe termination process. Delay the termination till all messages - // are processed in case the linger time is non-zero. - pipe->terminate (linger_ != 0); - - // TODO: Should this go into pipe_t::terminate ? - // In case there's no engine and there's only delimiter in the - // pipe it wouldn't be ever read. Thus we check for it explicitly. - if (!engine) - pipe->check_read (); - } - - if (zap_pipe != NULL) - zap_pipe->terminate (false); -} - -void zmq::session_base_t::timer_event (int id_) -{ - // Linger period expired. We can proceed with termination even though - // there are still pending messages to be sent. - zmq_assert (id_ == linger_timer_id); - has_linger_timer = false; - - // Ask pipe to terminate even though there may be pending messages in it. - zmq_assert (pipe); - pipe->terminate (false); -} - -void zmq::session_base_t::reconnect () -{ - // For delayed connect situations, terminate the pipe - // and reestablish later on - if (pipe && options.immediate == 1 && addr->protocol != "pgm" - && addr->protocol != "epgm" && addr->protocol != "norm" - && addr->protocol != "udp") { - pipe->hiccup (); - pipe->terminate (false); - terminating_pipes.insert (pipe); - pipe = NULL; - - if (has_linger_timer) { - cancel_timer (linger_timer_id); - has_linger_timer = false; - } - } - - reset (); - - // Reconnect. - if (options.reconnect_ivl != -1) - start_connecting (true); - else { - std::string *ep = new (std::string); - addr->to_string (*ep); - send_term_endpoint (socket, ep); - } - - // For subscriber sockets we hiccup the inbound pipe, which will cause - // the socket object to resend all the subscriptions. - if (pipe - && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB - || options.type == ZMQ_DISH)) - pipe->hiccup (); -} - -void zmq::session_base_t::start_connecting (bool wait_) -{ - zmq_assert (active); - - // Choose I/O thread to run connecter in. Given that we are already - // running in an I/O thread, there must be at least one available. - io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_assert (io_thread); - - // Create the connecter object. - - if (addr->protocol == "tcp") { - if (!options.socks_proxy_address.empty ()) { - address_t *proxy_address = new (std::nothrow) - address_t ("tcp", options.socks_proxy_address, this->get_ctx ()); - alloc_assert (proxy_address); - socks_connecter_t *connecter = - new (std::nothrow) socks_connecter_t (io_thread, this, options, - addr, proxy_address, wait_); - alloc_assert (connecter); - launch_child (connecter); - } else { - tcp_connecter_t *connecter = new (std::nothrow) - tcp_connecter_t (io_thread, this, options, addr, wait_); - alloc_assert (connecter); - launch_child (connecter); - } - return; - } - -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ - && !defined ZMQ_HAVE_VXWORKS - if (addr->protocol == "ipc") { - ipc_connecter_t *connecter = new (std::nothrow) - ipc_connecter_t (io_thread, this, options, addr, wait_); - alloc_assert (connecter); - launch_child (connecter); - return; - } -#endif -#if defined ZMQ_HAVE_TIPC - if (addr->protocol == "tipc") { - tipc_connecter_t *connecter = new (std::nothrow) - tipc_connecter_t (io_thread, this, options, addr, wait_); - alloc_assert (connecter); - launch_child (connecter); - return; - } -#endif - - if (addr->protocol == "udp") { - zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO - || options.type == ZMQ_DGRAM); - - udp_engine_t *engine = new (std::nothrow) udp_engine_t (options); - alloc_assert (engine); - - bool recv = false; - bool send = false; - - if (options.type == ZMQ_RADIO) { - send = true; - recv = false; - } else if (options.type == ZMQ_DISH) { - send = false; - recv = true; - } else if (options.type == ZMQ_DGRAM) { - send = true; - recv = true; - } - - int rc = engine->init (addr, send, recv); - errno_assert (rc == 0); - - send_attach (this, engine); - - return; - } - -#ifdef ZMQ_HAVE_OPENPGM - - // Both PGM and EPGM transports are using the same infrastructure. - if (addr->protocol == "pgm" || addr->protocol == "epgm") { - zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB - || options.type == ZMQ_SUB || options.type == ZMQ_XSUB); - - // For EPGM transport with UDP encapsulation of PGM is used. - bool const udp_encapsulation = addr->protocol == "epgm"; - - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with PGM anyway. - if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { - // PGM sender. - pgm_sender_t *pgm_sender = - new (std::nothrow) pgm_sender_t (io_thread, options); - alloc_assert (pgm_sender); - - int rc = - pgm_sender->init (udp_encapsulation, addr->address.c_str ()); - errno_assert (rc == 0); - - send_attach (this, pgm_sender); - } else { - // PGM receiver. - pgm_receiver_t *pgm_receiver = - new (std::nothrow) pgm_receiver_t (io_thread, options); - alloc_assert (pgm_receiver); - - int rc = - pgm_receiver->init (udp_encapsulation, addr->address.c_str ()); - errno_assert (rc == 0); - - send_attach (this, pgm_receiver); - } - - return; - } -#endif - -#ifdef ZMQ_HAVE_NORM - if (addr->protocol == "norm") { - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with NORM anyway. - if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { - // NORM sender. - norm_engine_t *norm_sender = - new (std::nothrow) norm_engine_t (io_thread, options); - alloc_assert (norm_sender); - - int rc = norm_sender->init (addr->address.c_str (), true, false); - errno_assert (rc == 0); - - send_attach (this, norm_sender); - } else { // ZMQ_SUB or ZMQ_XSUB - - // NORM receiver. - norm_engine_t *norm_receiver = - new (std::nothrow) norm_engine_t (io_thread, options); - alloc_assert (norm_receiver); - - int rc = norm_receiver->init (addr->address.c_str (), false, true); - errno_assert (rc == 0); - - send_attach (this, norm_receiver); - } - return; - } -#endif // ZMQ_HAVE_NORM - -#if defined ZMQ_HAVE_VMCI - if (addr->protocol == "vmci") { - vmci_connecter_t *connecter = new (std::nothrow) - vmci_connecter_t (io_thread, this, options, addr, wait_); - alloc_assert (connecter); - launch_child (connecter); - return; - } -#endif - - zmq_assert (false); -} diff --git a/src/tcp_connecter.cpp.orig b/src/tcp_connecter.cpp.orig deleted file mode 100644 index 5135bdbe..00000000 --- a/src/tcp_connecter.cpp.orig +++ /dev/null @@ -1,444 +0,0 @@ -/* - Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file - - This file is part of libzmq, the ZeroMQ core engine in C++. - - libzmq is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License (LGPL) as published - by the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - As a special exception, the Contributors give you permission to link - this library with independent modules to produce an executable, - regardless of the license terms of these independent modules, and to - copy and distribute the resulting executable under terms of your choice, - provided that you also meet, for each linked independent module, the - terms and conditions of the license of that module. An independent - module is a module which is not derived from or based on this library. - If you modify this library, you must extend this exception to your - version of the library. - - libzmq is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "precompiled.hpp" -#include -#include - -#include "macros.hpp" -#include "tcp_connecter.hpp" -#include "stream_engine.hpp" -#include "io_thread.hpp" -#include "random.hpp" -#include "err.hpp" -#include "ip.hpp" -#include "tcp.hpp" -#include "address.hpp" -#include "tcp_address.hpp" -#include "session_base.hpp" - -#if !defined ZMQ_HAVE_WINDOWS -#include -#include -#include -#include -#include -#include -#include -#include -#ifdef ZMQ_HAVE_VXWORKS -#include -#endif -#ifdef ZMQ_HAVE_OPENVMS -#include -#endif -#endif - -#ifdef __APPLE__ -#include -#endif - -zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, - class session_base_t *session_, - const options_t &options_, - address_t *addr_, - bool delayed_start_) : - own_t (io_thread_, options_), - io_object_t (io_thread_), - _addr (addr_), - _s (retired_fd), - _handle (static_cast (NULL)), - _delayed_start (delayed_start_), - _connect_timer_started (false), - _reconnect_timer_started (false), - _session (session_), - _current_reconnect_ivl (options.reconnect_ivl), - _socket (_session->get_socket ()) -{ - zmq_assert (_addr); - zmq_assert (_addr->protocol == "tcp"); - _addr->to_string (_endpoint); - // TODO the return value is unused! what if it fails? if this is impossible - // or does not matter, change such that endpoint in initialized using an - // initializer, and make endpoint const -} - -zmq::tcp_connecter_t::~tcp_connecter_t () -{ - zmq_assert (!_connect_timer_started); - zmq_assert (!_reconnect_timer_started); - zmq_assert (!_handle); - zmq_assert (_s == retired_fd); -} - -void zmq::tcp_connecter_t::process_plug () -{ - if (_delayed_start) - add_reconnect_timer (); - else - start_connecting (); -} - -void zmq::tcp_connecter_t::process_term (int linger_) -{ - if (_connect_timer_started) { - cancel_timer (connect_timer_id); - _connect_timer_started = false; - } - - if (_reconnect_timer_started) { - cancel_timer (reconnect_timer_id); - _reconnect_timer_started = false; - } - - if (_handle) { - rm_handle (); - } - - if (_s != retired_fd) - close (); - - own_t::process_term (linger_); -} - -void zmq::tcp_connecter_t::in_event () -{ - // We are not polling for incoming data, so we are actually called - // because of error here. However, we can get error on out event as well - // on some platforms, so we'll simply handle both events in the same way. - out_event (); -} - -void zmq::tcp_connecter_t::out_event () -{ - if (_connect_timer_started) { - cancel_timer (connect_timer_id); - _connect_timer_started = false; - } - - rm_handle (); - - const fd_t fd = connect (); - - // Handle the error condition by attempt to reconnect. - if (fd == retired_fd || !tune_socket (fd)) { - close (); - add_reconnect_timer (); - return; - } - - // Create the engine object for this connection. - stream_engine_t *engine = - new (std::nothrow) stream_engine_t (fd, options, _endpoint); - alloc_assert (engine); - - // Attach the engine to the corresponding session object. - send_attach (_session, engine); - - // Shut the connecter down. - terminate (); - - _socket->event_connected (_endpoint, fd); -} - -void zmq::tcp_connecter_t::rm_handle () -{ - rm_fd (_handle); - _handle = static_cast (NULL); -} - -void zmq::tcp_connecter_t::timer_event (int id_) -{ - zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id); - if (id_ == connect_timer_id) { - _connect_timer_started = false; - rm_handle (); - close (); - add_reconnect_timer (); - } else if (id_ == reconnect_timer_id) { - _reconnect_timer_started = false; - start_connecting (); - } -} - -void zmq::tcp_connecter_t::start_connecting () -{ - // Open the connecting socket. - const int rc = open (); - - // Connect may succeed in synchronous manner. - if (rc == 0) { - _handle = add_fd (_s); - out_event (); - } - - // Connection establishment may be delayed. Poll for its completion. - else if (rc == -1 && errno == EINPROGRESS) { - _handle = add_fd (_s); - set_pollout (_handle); - _socket->event_connect_delayed (_endpoint, zmq_errno ()); - - // add userspace connect timeout - add_connect_timer (); - } - - // Handle any other error condition by eventual reconnect. - else { - if (_s != retired_fd) - close (); - add_reconnect_timer (); - } -} - -void zmq::tcp_connecter_t::add_connect_timer () -{ - if (options.connect_timeout > 0) { - add_timer (options.connect_timeout, connect_timer_id); - _connect_timer_started = true; - } -} - -void zmq::tcp_connecter_t::add_reconnect_timer () -{ - const int interval = get_new_reconnect_ivl (); - add_timer (interval, reconnect_timer_id); - _socket->event_connect_retried (_endpoint, interval); - _reconnect_timer_started = true; -} - -int zmq::tcp_connecter_t::get_new_reconnect_ivl () -{ - // The new interval is the current interval + random value. - const int interval = - _current_reconnect_ivl + generate_random () % options.reconnect_ivl; - - // Only change the current reconnect interval if the maximum reconnect - // interval was set and if it's larger than the reconnect interval. - if (options.reconnect_ivl_max > 0 - && options.reconnect_ivl_max > options.reconnect_ivl) - // Calculate the next interval - _current_reconnect_ivl = - std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max); - return interval; -} - -int zmq::tcp_connecter_t::open () -{ - zmq_assert (_s == retired_fd); - - // Resolve the address - if (_addr->resolved.tcp_addr != NULL) { - LIBZMQ_DELETE (_addr->resolved.tcp_addr); - } - - _addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t (); - alloc_assert (_addr->resolved.tcp_addr); - int rc = _addr->resolved.tcp_addr->resolve (_addr->address.c_str (), false, - options.ipv6); - if (rc != 0) { - LIBZMQ_DELETE (_addr->resolved.tcp_addr); - return -1; - } - zmq_assert (_addr->resolved.tcp_addr != NULL); - const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr; - - // Create the socket. - _s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP); - - // IPv6 address family not supported, try automatic downgrade to IPv4. - if (_s == zmq::retired_fd && tcp_addr->family () == AF_INET6 - && errno == EAFNOSUPPORT && options.ipv6) { - rc = _addr->resolved.tcp_addr->resolve (_addr->address.c_str (), false, - false); - if (rc != 0) { - LIBZMQ_DELETE (_addr->resolved.tcp_addr); - return -1; - } - _s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); - } - - if (_s == retired_fd) { - return -1; - } - - // On some systems, IPv4 mapping in IPv6 sockets is disabled by default. - // Switch it on in such cases. - if (tcp_addr->family () == AF_INET6) - enable_ipv4_mapping (_s); - - // Set the IP Type-Of-Service priority for this socket - if (options.tos != 0) - set_ip_type_of_service (_s, options.tos); - - // Bind the socket to a device if applicable - if (!options.bound_device.empty ()) - bind_to_device (_s, options.bound_device); - - // Set the socket to non-blocking mode so that we get async connect(). - unblock_socket (_s); - - // Set the socket to loopback fastpath if configured. - if (options.loopback_fastpath) - tcp_tune_loopback_fast_path (_s); - - // Set the socket buffer limits for the underlying socket. - if (options.sndbuf >= 0) - set_tcp_send_buffer (_s, options.sndbuf); - if (options.rcvbuf >= 0) - set_tcp_receive_buffer (_s, options.rcvbuf); - - // Set the IP Type-Of-Service for the underlying socket - if (options.tos != 0) - set_ip_type_of_service (_s, options.tos); - - // Set a source address for conversations - if (tcp_addr->has_src_addr ()) { - // Allow reusing of the address, to connect to different servers - // using the same source port on the client. - int flag = 1; -#ifdef ZMQ_HAVE_WINDOWS - rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, - reinterpret_cast (&flag), sizeof (int)); - wsa_assert (rc != SOCKET_ERROR); -#elif defined ZMQ_HAVE_VXWORKS - rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, - sizeof (int)); - errno_assert (rc == 0); -#else - rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); - errno_assert (rc == 0); -#endif - -#if defined ZMQ_HAVE_VXWORKS - rc = ::bind (_s, (sockaddr *) tcp_addr->src_addr (), - tcp_addr->src_addrlen ()); -#else - rc = ::bind (_s, tcp_addr->src_addr (), tcp_addr->src_addrlen ()); -#endif - if (rc == -1) - return -1; - } - - // Connect to the remote peer. -#if defined ZMQ_HAVE_VXWORKS - rc = ::connect (_s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ()); -#else - rc = ::connect (_s, tcp_addr->addr (), tcp_addr->addrlen ()); -#endif - // Connect was successful immediately. - if (rc == 0) { - return 0; - } - - // Translate error codes indicating asynchronous connect has been - // launched to a uniform EINPROGRESS. -#ifdef ZMQ_HAVE_WINDOWS - const int last_error = WSAGetLastError (); - if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK) - errno = EINPROGRESS; - else - errno = wsa_error_to_errno (last_error); -#else - if (errno == EINTR) - errno = EINPROGRESS; -#endif - return -1; -} - -zmq::fd_t zmq::tcp_connecter_t::connect () -{ - // Async connect has finished. Check whether an error occurred - int err = 0; -#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS - int len = sizeof err; -#else - socklen_t len = sizeof err; -#endif - - const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR, - reinterpret_cast (&err), &len); - - // Assert if the error was caused by 0MQ bug. - // Networking problems are OK. No need to assert. -#ifdef ZMQ_HAVE_WINDOWS - zmq_assert (rc == 0); - if (err != 0) { - if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK - || err == WSAENOBUFS) { - wsa_assert_no (err); - } - return retired_fd; - } -#else - // Following code should handle both Berkeley-derived socket - // implementations and Solaris. - if (rc == -1) - err = errno; - if (err != 0) { - errno = err; -#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE - errno_assert (errno != EBADF && errno != ENOPROTOOPT - && errno != ENOTSOCK && errno != ENOBUFS); -#else - errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK - && errno != ENOBUFS); -#endif - return retired_fd; - } -#endif - - // Return the newly connected socket. - const fd_t result = _s; - _s = retired_fd; - return result; -} - -bool zmq::tcp_connecter_t::tune_socket (const fd_t fd_) -{ - const int rc = tune_tcp_socket (fd_) - | tune_tcp_keepalives ( - fd_, options.tcp_keepalive, options.tcp_keepalive_cnt, - options.tcp_keepalive_idle, options.tcp_keepalive_intvl) - | tune_tcp_maxrt (fd_, options.tcp_maxrt); - return rc == 0; -} - -void zmq::tcp_connecter_t::close () -{ - zmq_assert (_s != retired_fd); -#ifdef ZMQ_HAVE_WINDOWS - const int rc = closesocket (_s); - wsa_assert (rc != SOCKET_ERROR); -#else - const int rc = ::close (_s); - errno_assert (rc == 0); -#endif - _socket->event_closed (_endpoint, _s); - _s = retired_fd; -}