From f06ca69ae910a0bedb211e92080bbb6932183112 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sun, 22 Jun 2014 10:32:22 +0200 Subject: [PATCH] Add support for SOCKS proxies This is still raw and experimental. To connect through a SOCKS proxy, set ZMQ_SOCKS_PROXY socket option on socket before issuing a connect call, e.g.: zmq_setsockopt (s, ZMQ_SOCKS_PROXY, "127.0.0.1:22222", strlen ("127.0.0.1:22222")); zmq_connect (s, "tcp://127.0.0.1:5555"); Known limitations: - only SOCKS version 5 supported - authentication not supported - new option is still undocumented --- include/zmq.h | 1 + src/Makefile.am | 4 + src/options.cpp | 21 ++ src/options.hpp | 3 + src/session_base.cpp | 21 +- src/socks.cpp | 269 +++++++++++++++++++++++ src/socks.hpp | 125 +++++++++++ src/socks_connecter.cpp | 467 ++++++++++++++++++++++++++++++++++++++++ src/socks_connecter.hpp | 153 +++++++++++++ src/stream_engine.cpp | 110 +--------- src/stream_engine.hpp | 10 - src/tcp.cpp | 101 +++++++++ src/tcp.hpp | 10 + 13 files changed, 1176 insertions(+), 119 deletions(-) create mode 100644 src/socks.cpp create mode 100644 src/socks.hpp create mode 100644 src/socks_connecter.cpp create mode 100644 src/socks_connecter.hpp diff --git a/include/zmq.h b/include/zmq.h index 7206ee2d..86a52638 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -302,6 +302,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); #define ZMQ_GSSAPI_PLAINTEXT 65 #define ZMQ_HANDSHAKE_IVL 66 #define ZMQ_IDENTITY_FD 67 +#define ZMQ_SOCKS_PROXY 68 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/Makefile.am b/src/Makefile.am index dae104dd..89c5f16f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -78,6 +78,8 @@ libzmq_la_SOURCES = \ session_base.hpp \ signaler.hpp \ socket_base.hpp \ + socks.hpp \ + socks_connecter.hpp \ stdint.hpp \ stream.hpp \ stream_engine.hpp \ @@ -149,6 +151,8 @@ libzmq_la_SOURCES = \ session_base.cpp \ signaler.cpp \ socket_base.cpp \ + socks.cpp \ + socks_connecter.cpp \ stream.cpp \ stream_engine.cpp \ sub.cpp \ diff --git a/src/options.cpp b/src/options.cpp index 959b2a55..ef945486 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -206,6 +206,19 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, } break; + case ZMQ_SOCKS_PROXY: + if (optval_ == NULL && optvallen_ == 0) { + socks_proxy_address.clear (); + return 0; + } + else + if (optval_ != NULL && optvallen_ > 0 ) { + socks_proxy_address = + std::string ((const char *) optval_, optvallen_); + return 0; + } + break; + case ZMQ_TCP_KEEPALIVE: if (is_int && (value == -1 || value == 0 || value == 1)) { tcp_keepalive = value; @@ -618,6 +631,14 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) } break; + case ZMQ_SOCKS_PROXY: + if (*optvallen_ >= socks_proxy_address.size () + 1) { + memcpy (optval_, socks_proxy_address.c_str (), socks_proxy_address.size () + 1); + *optvallen_ = socks_proxy_address.size () + 1; + return 0; + } + break; + case ZMQ_TCP_KEEPALIVE: if (is_int) { *value = tcp_keepalive; diff --git a/src/options.hpp b/src/options.hpp index 16aa3bd2..3bacec0e 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -114,6 +114,9 @@ namespace zmq // if true, router socket accepts non-zmq tcp connections bool raw_sock; + // Addres of SOCKS proxy + std::string socks_proxy_address; + // TCP keep-alive settings. // Defaults to -1 = do not change socket options int tcp_keepalive; diff --git a/src/session_base.cpp b/src/session_base.cpp index 074db2a3..60c530fa 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -25,6 +25,7 @@ #include "tcp_connecter.hpp" #include "ipc_connecter.hpp" #include "tipc_connecter.hpp" +#include "socks_connecter.hpp" #include "pgm_sender.hpp" #include "pgm_receiver.hpp" #include "address.hpp" @@ -497,10 +498,22 @@ void zmq::session_base_t::start_connecting (bool wait_) // Create the connecter object. if (addr->protocol == "tcp") { - tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t ( - io_thread, this, options, addr, wait_); - alloc_assert (connecter); - launch_child (connecter); + if (options.socks_proxy_address != "") { + address_t *proxy_address = new (std::nothrow) + address_t ("tcp", options.socks_proxy_address); + alloc_assert (proxy_address); + socks_connecter_t *connecter = + new (std::nothrow) socks_connecter_t ( + io_thread, this, options, addr, proxy_address, wait_); + alloc_assert (connecter); + launch_child (connecter); + } + else { + tcp_connecter_t *connecter = new (std::nothrow) + tcp_connecter_t (io_thread, this, options, addr, wait_); + alloc_assert (connecter); + launch_child (connecter); + } return; } diff --git a/src/socks.cpp b/src/socks.cpp new file mode 100644 index 00000000..e5a3f560 --- /dev/null +++ b/src/socks.cpp @@ -0,0 +1,269 @@ +/* + Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include +#include + +#include "err.hpp" +#include "platform.hpp" +#include "socks.hpp" +#include "tcp.hpp" + +zmq::socks_greeting_t::socks_greeting_t (uint8_t method_) : + num_methods (1) +{ + methods [0] = method_; +} + +zmq::socks_greeting_t::socks_greeting_t ( + uint8_t *methods_, size_t num_methods_) + : num_methods (num_methods_) +{ + zmq_assert (num_methods_ <= 255); + + for (size_t i = 0; i < num_methods_; i++) + methods [i] = methods_ [i]; +} + +zmq::socks_greeting_encoder_t::socks_greeting_encoder_t () + : bytes_encoded (0), bytes_written (0) +{} + +void zmq::socks_greeting_encoder_t::encode (const socks_greeting_t &greeting_) +{ + uint8_t *ptr = buf; + + *ptr++ = 0x05; + *ptr++ = greeting_.num_methods; + for (size_t i = 0; i < greeting_.num_methods; i++) + *ptr++ = greeting_.methods [i]; + + bytes_encoded = 2 + greeting_.num_methods; + bytes_written = 0; +} + +int zmq::socks_greeting_encoder_t::output (fd_t fd_) +{ + const int rc = tcp_write ( + fd_, buf + bytes_written, bytes_encoded - bytes_written); + if (rc > 0) + bytes_written += static_cast (rc); + return rc; +} + +bool zmq::socks_greeting_encoder_t::has_pending_data () const +{ + return bytes_written < bytes_encoded; +} + +void zmq::socks_greeting_encoder_t::reset () +{ + bytes_encoded = bytes_written = 0; +} + +zmq::socks_choice_t::socks_choice_t (unsigned char method_) + : method (method_) +{} + +zmq::socks_choice_decoder_t::socks_choice_decoder_t () + : bytes_read (0) +{} + +int zmq::socks_choice_decoder_t::input (fd_t fd_) +{ + zmq_assert (bytes_read < 2); + const int rc = tcp_read (fd_, buf + bytes_read, 2 - bytes_read); + if (rc > 0) { + bytes_read += static_cast (rc); + if (buf [0] != 0x05) + return -1; + } + return rc; +} + +bool zmq::socks_choice_decoder_t::message_ready () const +{ + return bytes_read == 2; +} + +zmq::socks_choice_t zmq::socks_choice_decoder_t::decode () +{ + zmq_assert (message_ready ()); + return socks_choice_t (buf [1]); +} + +void zmq::socks_choice_decoder_t::reset () +{ + bytes_read = 0; +} + +zmq::socks_request_t::socks_request_t ( + uint8_t command_, std::string hostname_, uint16_t port_) + : command (command_), hostname (hostname_), port (port_) +{} + +zmq::socks_request_encoder_t::socks_request_encoder_t () + : bytes_encoded (0), bytes_written (0) +{} + +void zmq::socks_request_encoder_t::encode (const socks_request_t &req) +{ + unsigned char *ptr = buf; + *ptr++ = 0x05; + *ptr++ = req.command; + *ptr++ = 0x00; + +#if defined ZMQ_HAVE_OPENVMS && defined __ia64 && __INITIAL_POINTER_SIZE == 64 + __addrinfo64 hints, *res = NULL; +#else + addrinfo hints, *res = NULL; +#endif + + memset (&hints, 0, sizeof hints); + + // Suppress potential DNS lookups. + hints.ai_flags = AI_NUMERICHOST; + + const int rc = getaddrinfo (req.hostname.c_str (), NULL, &hints, &res); + if (rc == 0 && res->ai_family == AF_INET) { + struct sockaddr_in *sockaddr_in = + reinterpret_cast (res->ai_addr); + *ptr++ = 0x01; + memcpy (ptr, &sockaddr_in->sin_addr, 4); + ptr += 4; + } + else + if (rc == 0 && res->ai_family == AF_INET6) { + struct sockaddr_in6 *sockaddr_in6 = + reinterpret_cast (res->ai_addr); + *ptr++ = 0x04; + memcpy (ptr, &sockaddr_in6->sin6_addr, 16); + ptr += 16; + } + else { + *ptr++ = 0x03; + *ptr++ = req.hostname.size (); + memcpy (ptr, req.hostname.c_str (), req.hostname.size ()); + ptr += req.hostname.size (); + } + + if (rc == 0) + freeaddrinfo (res); + + *ptr++ = req.port / 256; + *ptr++ = req.port % 256; + + bytes_encoded = ptr - buf; + bytes_written = 0; +} + +int zmq::socks_request_encoder_t::output (fd_t fd_) +{ + const int rc = tcp_write ( + fd_, buf + bytes_written, bytes_encoded - bytes_written); + if (rc > 0) + bytes_written += static_cast (rc); + return rc; +} + +bool zmq::socks_request_encoder_t::has_pending_data () const +{ + return bytes_written < bytes_encoded; +} + +void zmq::socks_request_encoder_t::reset () +{ + bytes_encoded = bytes_written = 0; +} + +zmq::socks_response_t::socks_response_t ( + uint8_t response_code_, std::string address_, uint16_t port_) + : response_code (response_code_), address (address_), port (port_) +{} + +zmq::socks_response_decoder_t::socks_response_decoder_t () + : bytes_read (0) +{} + +int zmq::socks_response_decoder_t::input (fd_t fd_) +{ + size_t n = 0; + + if (bytes_read < 5) + n = 5 - bytes_read; + else { + const uint8_t atyp = buf [3]; + zmq_assert (atyp == 0x01 || atyp == 0x03 || atyp == 0x04); + if (atyp == 0x01) + n = 3 + 2; + else + if (atyp == 0x03) + n = buf [4] + 2; + else + if (atyp == 0x04) + n = 15 + 2; + } + const int rc = tcp_read (fd_, buf + bytes_read, n); + if (rc > 0) { + bytes_read += static_cast (rc); + if (buf [0] != 0x05) + return -1; + if (bytes_read >= 2) + if (buf [1] > 0x08) + return -1; + if (bytes_read >= 3) + if (buf [2] != 0x00) + return -1; + if (bytes_read >= 4) { + const uint8_t atyp = buf [3]; + if (atyp != 0x01 && atyp != 0x03 && atyp != 0x04) + return -1; + } + } + return rc; +} + +bool zmq::socks_response_decoder_t::message_ready () const +{ + if (bytes_read < 4) + return false; + else { + const uint8_t atyp = buf [3]; + zmq_assert (atyp == 0x01 || atyp == 0x03 || atyp == 0x04); + if (atyp == 0x01) + return bytes_read == 10; + else + if (atyp == 0x03) + return bytes_read > 4 && bytes_read == 4 + 1 + buf [4] + 2u; + else + return bytes_read == 22; + } +} + +zmq::socks_response_t zmq::socks_response_decoder_t::decode () +{ + zmq_assert (message_ready ()); + return socks_response_t (buf [1], "", 0); +} + +void zmq::socks_response_decoder_t::reset () +{ + bytes_read = 0; +} diff --git a/src/socks.hpp b/src/socks.hpp new file mode 100644 index 00000000..8fe4594b --- /dev/null +++ b/src/socks.hpp @@ -0,0 +1,125 @@ +/* + Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_SOCKS_HPP_INCLUDED__ +#define __ZMQ_SOCKS_HPP_INCLUDED__ + +#include +#include "fd.hpp" +#include "stdint.hpp" + +namespace zmq +{ + + struct socks_greeting_t + { + socks_greeting_t (uint8_t method); + socks_greeting_t (uint8_t *methods_, size_t num_methods_); + + uint8_t methods [255]; + const size_t num_methods; + }; + + class socks_greeting_encoder_t + { + public: + socks_greeting_encoder_t (); + void encode (const socks_greeting_t &greeting_); + int output (fd_t fd_); + bool has_pending_data () const; + void reset (); + + private: + size_t bytes_encoded; + size_t bytes_written; + uint8_t buf [2 + 255]; + }; + + struct socks_choice_t + { + socks_choice_t (uint8_t method_); + + uint8_t method; + }; + + class socks_choice_decoder_t + { + public: + socks_choice_decoder_t (); + int input (fd_t fd_); + bool message_ready () const; + socks_choice_t decode (); + void reset (); + + private: + unsigned char buf [2]; + size_t bytes_read; + }; + + struct socks_request_t + { + socks_request_t ( + uint8_t command_, std::string hostname_, uint16_t port_); + + const uint8_t command; + const std::string hostname; + const uint16_t port; + }; + + class socks_request_encoder_t + { + public: + socks_request_encoder_t (); + void encode (const socks_request_t &req); + int output (fd_t fd_); + bool has_pending_data () const; + void reset (); + + private: + size_t bytes_encoded; + size_t bytes_written; + uint8_t buf [4 + 256 + 2]; + }; + + struct socks_response_t + { + socks_response_t ( + uint8_t response_code_, std::string address_, uint16_t port_); + uint8_t response_code; + std::string address; + uint16_t port; + }; + + class socks_response_decoder_t + { + public: + socks_response_decoder_t (); + int input (fd_t fd_); + bool message_ready () const; + socks_response_t decode (); + void reset (); + + private: + uint8_t buf [4 + 256 + 2]; + size_t bytes_read; + }; + +} + +#endif diff --git a/src/socks_connecter.cpp b/src/socks_connecter.cpp new file mode 100644 index 00000000..7527da48 --- /dev/null +++ b/src/socks_connecter.cpp @@ -0,0 +1,467 @@ +/* + Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include + +#include "socks_connecter.hpp" +#include "stream_engine.hpp" +#include "platform.hpp" +#include "random.hpp" +#include "err.hpp" +#include "ip.hpp" +#include "tcp.hpp" +#include "address.hpp" +#include "tcp_address.hpp" +#include "session_base.hpp" +#include "socks.hpp" + +#ifdef ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#include +#include +#endif + +zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_, + class session_base_t *session_, const options_t &options_, + address_t *addr_, address_t *proxy_addr_, bool delayed_start_) : + own_t (io_thread_, options_), + io_object_t (io_thread_), + addr (addr_), + proxy_addr (proxy_addr_), + status (unplugged), + s (retired_fd), + delayed_start (delayed_start_), + session (session_), + current_reconnect_ivl (options.reconnect_ivl) +{ + zmq_assert (addr); + zmq_assert (addr->protocol == "tcp"); + proxy_addr->to_string (endpoint); + socket = session->get_socket (); +} + +zmq::socks_connecter_t::~socks_connecter_t () +{ + zmq_assert (s == retired_fd); +} + +void zmq::socks_connecter_t::process_plug () +{ + if (delayed_start) + start_timer (); + else + initiate_connect (); +} + +void zmq::socks_connecter_t::process_term (int linger_) +{ + switch (status) { + case unplugged: + break; + case waiting_for_reconnect_time: + cancel_timer (reconnect_timer_id); + break; + case waiting_for_proxy_connection: + case sending_greeting: + case waiting_for_choice: + case sending_request: + case waiting_for_response: + rm_fd (handle); + if (s != -1) + close (); + break; + } + + own_t::process_term (linger_); +} + +void zmq::socks_connecter_t::in_event () +{ + zmq_assert (status != unplugged + && status != waiting_for_reconnect_time); + + if (status == waiting_for_choice) { + const int rc = choice_decoder.input (s); + if (rc == 0 || rc == -1) + error (); + else + if (choice_decoder.message_ready ()) { + const socks_choice_t choice = choice_decoder.decode (); + const int rc = process_server_response (choice); + if (rc == -1) + error (); + else { + std::string hostname = ""; + uint16_t port = 0; + if (parse_address (addr->address, hostname, port) == -1) + error (); + else { + request_encoder.encode ( + socks_request_t (1, hostname, port)); + reset_pollin (handle); + set_pollout (handle); + status = sending_request; + } + } + } + } + else + if (status == waiting_for_response) { + const int rc = response_decoder.input (s); + if (rc == 0 || rc == -1) + error (); + else + if (response_decoder.message_ready ()) { + const socks_response_t response = response_decoder.decode (); + const int rc = process_server_response (response); + if (rc == -1) + error (); + else { + // Remember our fd for ZMQ_SRCFD in messages + socket->set_fd (s); + + // Create the engine object for this connection. + stream_engine_t *engine = new (std::nothrow) + stream_engine_t (s, options, endpoint); + alloc_assert (engine); + + // Attach the engine to the corresponding session object. + send_attach (session, engine); + + socket->event_connected (endpoint, s); + + rm_fd (handle); + s = -1; + status = unplugged; + + // Shut the connecter down. + terminate (); + } + } + } + else + error (); +} + +void zmq::socks_connecter_t::out_event () +{ + zmq_assert (status == waiting_for_proxy_connection + || status == sending_greeting + || status == sending_request); + + if (status == waiting_for_proxy_connection) { + const int rc = check_proxy_connection (); + if (rc == -1) + error (); + else { + greeting_encoder.encode ( + socks_greeting_t (socks_no_auth_required)); + status = sending_greeting; + } + } + else + if (status == sending_greeting) { + zmq_assert (greeting_encoder.has_pending_data ()); + const int rc = greeting_encoder.output (s); + if (rc == -1 || rc == 0) + error (); + else + if (!greeting_encoder.has_pending_data ()) { + reset_pollout (handle); + set_pollin (handle); + status = waiting_for_choice; + } + } + else { + zmq_assert (request_encoder.has_pending_data ()); + const int rc = request_encoder.output (s); + if (rc == -1 || rc == 0) + error (); + else + if (!request_encoder.has_pending_data ()) { + reset_pollout (handle); + set_pollin (handle); + status = waiting_for_response; + } + } +} + +void zmq::socks_connecter_t::initiate_connect () +{ + // Open the connecting socket. + const int rc = connect_to_proxy (); + + // Connect may succeed in synchronous manner. + if (rc == 0) { + handle = add_fd (s); + set_pollout (handle); + status = sending_greeting; + } + // Connection establishment may be delayed. Poll for its completion. + else + if (errno == EINPROGRESS) { + handle = add_fd (s); + set_pollout (handle); + status = waiting_for_proxy_connection; + socket->event_connect_delayed (endpoint, zmq_errno ()); + } + // Handle any other error condition by eventual reconnect. + else { + if (s != -1) + close (); + start_timer (); + } +} + +int zmq::socks_connecter_t::process_server_response ( + const socks_choice_t &response) +{ + // We do not support any authentication method for now. + return response.method == 0? 0: -1; +} + +int zmq::socks_connecter_t::process_server_response ( + const socks_response_t &response) +{ + return response.response_code == 0? 0: -1; +} + +void zmq::socks_connecter_t::timer_event (int id_) +{ + zmq_assert (status == waiting_for_reconnect_time); + zmq_assert (id_ == reconnect_timer_id); + initiate_connect (); +} + +void zmq::socks_connecter_t::error () +{ + rm_fd (handle); + close (); + greeting_encoder.reset (); + choice_decoder.reset (); + request_encoder.reset (); + response_decoder.reset (); + start_timer (); +} + +void zmq::socks_connecter_t::start_timer () +{ + const int interval = get_new_reconnect_ivl (); + add_timer (interval, reconnect_timer_id); + status = waiting_for_reconnect_time; + socket->event_connect_retried (endpoint, interval); +} + +int zmq::socks_connecter_t::get_new_reconnect_ivl () +{ + // The new interval is the current interval + random value. + const int interval = current_reconnect_ivl + + generate_random () % options.reconnect_ivl; + + // Only change the current reconnect interval if the maximum reconnect + // interval was set and if it's larger than the reconnect interval. + if (options.reconnect_ivl_max > 0 && + options.reconnect_ivl_max > options.reconnect_ivl) + // Calculate the next interval + current_reconnect_ivl = + std::min (current_reconnect_ivl * 2, options.reconnect_ivl_max); + return interval; +} + +int zmq::socks_connecter_t::connect_to_proxy () +{ + zmq_assert (s == retired_fd); + + // Resolve the address + delete proxy_addr->resolved.tcp_addr; + proxy_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t (); + alloc_assert (proxy_addr->resolved.tcp_addr); + + int rc = proxy_addr->resolved.tcp_addr->resolve ( + proxy_addr->address.c_str (), false, options.ipv6); + if (rc != 0) { + delete proxy_addr->resolved.tcp_addr; + proxy_addr->resolved.tcp_addr = NULL; + return -1; + } + zmq_assert (proxy_addr->resolved.tcp_addr != NULL); + const tcp_address_t *tcp_addr = proxy_addr->resolved.tcp_addr; + + // Create the socket. + s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP); +#ifdef ZMQ_HAVE_WINDOWS + if (s == INVALID_SOCKET) + return -1; +#else + if (s == -1) + return -1; +#endif + + // On some systems, IPv4 mapping in IPv6 sockets is disabled by default. + // Switch it on in such cases. + if (tcp_addr->family () == AF_INET6) + enable_ipv4_mapping (s); + + // Set the IP Type-Of-Service priority for this socket + if (options.tos != 0) + set_ip_type_of_service (s, options.tos); + + // Set the socket to non-blocking mode so that we get async connect(). + unblock_socket (s); + + // Set the socket buffer limits for the underlying socket. + if (options.sndbuf != 0) + set_tcp_send_buffer (s, options.sndbuf); + if (options.rcvbuf != 0) + set_tcp_receive_buffer (s, options.rcvbuf); + + // Set the IP Type-Of-Service for the underlying socket + if (options.tos != 0) + set_ip_type_of_service (s, options.tos); + + // Set a source address for conversations + if (tcp_addr->has_src_addr ()) { + rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ()); + if (rc == -1) { + close (); + return -1; + } + } + + // Connect to the remote peer. + rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ()); + + // Connect was successfull immediately. + if (rc == 0) + return 0; + + // Translate error codes indicating asynchronous connect has been + // launched to a uniform EINPROGRESS. +#ifdef ZMQ_HAVE_WINDOWS + const int error_code = WSAGetLastError (); + if (error_code == WSAEINPROGRESS || error_code == WSAEWOULDBLOCK) + errno = EINPROGRESS; + else { + errno = wsa_error_to_errno (error_code); + close (); + } +#else + if (errno == EINTR) + errno = EINPROGRESS; +#endif + return -1; +} + +zmq::fd_t zmq::socks_connecter_t::check_proxy_connection () +{ + // Async connect has finished. Check whether an error occurred + int err = 0; +#ifdef ZMQ_HAVE_HPUX + int len = sizeof err; +#else + socklen_t len = sizeof err; +#endif + + const int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len); + + // Assert if the error was caused by 0MQ bug. + // Networking problems are OK. No need to assert. +#ifdef ZMQ_HAVE_WINDOWS + zmq_assert (rc == 0); + if (err != 0) { + wsa_assert (err == WSAECONNREFUSED + || err == WSAETIMEDOUT + || err == WSAECONNABORTED + || err == WSAEHOSTUNREACH + || err == WSAENETUNREACH + || err == WSAENETDOWN + || err == WSAEACCES + || err == WSAEINVAL + || err == WSAEADDRINUSE); + return -1; + } +#else + // Following code should handle both Berkeley-derived socket + // implementations and Solaris. + if (rc == -1) + err = errno; + if (err != 0) { + errno = err; + errno_assert ( + errno == ECONNREFUSED || + errno == ECONNRESET || + errno == ETIMEDOUT || + errno == EHOSTUNREACH || + errno == ENETUNREACH || + errno == ENETDOWN || + errno == EINVAL); + return -1; + } +#endif + + tune_tcp_socket (s); + tune_tcp_keepalives (s, options.tcp_keepalive, options.tcp_keepalive_cnt, + options.tcp_keepalive_idle, options.tcp_keepalive_intvl); + + return 0; +} + +void zmq::socks_connecter_t::close () +{ + zmq_assert (s != retired_fd); +#ifdef ZMQ_HAVE_WINDOWS + const int rc = closesocket (s); + wsa_assert (rc != SOCKET_ERROR); +#else + const int rc = ::close (s); + errno_assert (rc == 0); +#endif + socket->event_closed (endpoint, s); + s = retired_fd; +} + +int zmq::socks_connecter_t::parse_address ( + const std::string &address_, std::string &hostname_, uint16_t &port_) +{ + // Find the ':' at end that separates address from the port number. + const size_t idx = address_.rfind (':'); + if (idx == std::string::npos) { + errno = EINVAL; + return -1; + } + + // Extract hostname + if (idx < 2 || address_ [0] != '[' || address_ [idx - 1] != ']') + hostname_ = address_.substr (0, idx); + else + hostname_ = address_.substr (1, idx - 2); + + // Separate the hostname/port. + const std::string port_str = address_.substr (idx + 1); + // Parse the port number (0 is not a valid port). + port_ = (uint16_t) atoi (port_str.c_str ()); + if (port_ == 0) { + errno = EINVAL; + return -1; + } + return 0; +} diff --git a/src/socks_connecter.hpp b/src/socks_connecter.hpp new file mode 100644 index 00000000..3313289e --- /dev/null +++ b/src/socks_connecter.hpp @@ -0,0 +1,153 @@ +/* + Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __SOCKS_CONNECTER_HPP_INCLUDED__ +#define __SOCKS_CONNECTER_HPP_INCLUDED__ + +#include "fd.hpp" +#include "io_object.hpp" +#include "own.hpp" +#include "stdint.hpp" +#include "../include/zmq.h" +#include "socks.hpp" + +namespace zmq +{ + + class io_thread_t; + class session_base_t; + struct address_t; + + class socks_connecter_t : public own_t, public io_object_t + { + public: + + // If 'delayed_start' is true connecter first waits for a while, + // then starts connection process. + socks_connecter_t (zmq::io_thread_t *io_thread_, + zmq::session_base_t *session_, const options_t &options_, + address_t *addr_, address_t *proxy_addr_, bool delayed_start_); + ~socks_connecter_t (); + + private: + enum { + unplugged, + waiting_for_reconnect_time, + waiting_for_proxy_connection, + sending_greeting, + waiting_for_choice, + sending_request, + waiting_for_response + }; + + // ID of the timer used to delay the reconnection. + enum { reconnect_timer_id = 1 }; + + // Method ID + enum { socks_no_auth_required = 0 }; + + // Handlers for incoming commands. + virtual void process_plug (); + virtual void process_term (int linger_); + + // Handlers for I/O events. + virtual void in_event (); + virtual void out_event (); + virtual void timer_event (int id_); + + // Internal function to start the actual connection establishment. + void initiate_connect (); + + int process_server_response (const socks_choice_t &response); + int process_server_response (const socks_response_t &response); + + int parse_address (const std::string &address_, + std::string &hostname_, uint16_t &port_); + + int connect_to_proxy (); + + void error (); + + // Internal function to start reconnect timer + void start_timer (); + + // Internal function to return a reconnect backoff delay. + // Will modify the current_reconnect_ivl used for next call + // Returns the currently used interval + int get_new_reconnect_ivl (); + + // Open TCP connecting socket. Returns -1 in case of error, + // 0 if connect was successfull immediately. Returns -1 with + // EAGAIN errno if async connect was launched. + int open (); + + // Close the connecting socket. + void close (); + + // Get the file descriptor of newly created connection. Returns + // retired_fd if the connection was unsuccessfull. + int check_proxy_connection (); + + socks_greeting_encoder_t greeting_encoder; + socks_choice_decoder_t choice_decoder; + socks_request_encoder_t request_encoder; + socks_response_decoder_t response_decoder; + + // Address to connect to. Owned by session_base_t. + address_t *addr; + + address_t *proxy_addr; + + int status; + + // Underlying socket. + fd_t s; + + // Handle corresponding to the listening socket. + handle_t handle; + + // If true file descriptor is registered with the poller and 'handle' + // contains valid value. + bool handle_valid; + + // If true, connecter is waiting a while before trying to connect. + const bool delayed_start; + + // True iff a timer has been started. + bool timer_started; + + // Reference to the session we belong to. + zmq::session_base_t *session; + + // Current reconnect ivl, updated for backoff strategy + int current_reconnect_ivl; + + // String representation of endpoint to connect to + std::string endpoint; + + // Socket + zmq::socket_base_t *socket; + + socks_connecter_t (const socks_connecter_t&); + const socks_connecter_t &operator = (const socks_connecter_t&); + }; + +} + +#endif diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 4c521dea..759fb051 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -54,6 +54,7 @@ #include "config.hpp" #include "err.hpp" #include "ip.hpp" +#include "tcp.hpp" #include "likely.hpp" #include "wire.hpp" @@ -272,7 +273,7 @@ void zmq::stream_engine_t::in_event () size_t bufsize = 0; decoder->get_buffer (&inpos, &bufsize); - int const rc = read (inpos, bufsize); + const int rc = tcp_read (s, inpos, bufsize); if (rc == 0) { error (connection_error); return; @@ -359,7 +360,7 @@ void zmq::stream_engine_t::out_event () // arbitrarily large. However, we assume that underlying TCP layer has // limited transmission buffer and thus the actual number of bytes // written should be reasonably modest. - int nbytes = write (outpos, outsize); + const int nbytes = tcp_write (s, outpos, outsize); // IO error has occurred. We stop waiting for output events. // The engine is not terminated until we detect input error; @@ -448,8 +449,8 @@ bool zmq::stream_engine_t::handshake () zmq_assert (greeting_bytes_read < greeting_size); // Receive the greeting. while (greeting_bytes_read < greeting_size) { - const int n = read (greeting_recv + greeting_bytes_read, - greeting_size - greeting_bytes_read); + const int n = tcp_read (s, greeting_recv + greeting_bytes_read, + greeting_size - greeting_bytes_read); if (n == 0) { error (connection_error); return false; @@ -892,107 +893,6 @@ void zmq::stream_engine_t::error (error_reason_t reason) delete this; } -int zmq::stream_engine_t::write (const void *data_, size_t size_) -{ -#ifdef ZMQ_HAVE_WINDOWS - - int nbytes = send (s, (char*) data_, (int) size_, 0); - - // If not a single byte can be written to the socket in non-blocking mode - // we'll get an error (this may happen during the speculative write). - if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) - return 0; - - // Signalise peer failure. - if (nbytes == SOCKET_ERROR && ( - WSAGetLastError () == WSAENETDOWN || - WSAGetLastError () == WSAENETRESET || - WSAGetLastError () == WSAEHOSTUNREACH || - WSAGetLastError () == WSAECONNABORTED || - WSAGetLastError () == WSAETIMEDOUT || - WSAGetLastError () == WSAECONNRESET)) - return -1; - - wsa_assert (nbytes != SOCKET_ERROR); - return nbytes; - -#else - ssize_t nbytes = send (s, data_, size_, 0); - - // Several errors are OK. When speculative write is being done we may not - // be able to write a single byte from the socket. Also, SIGSTOP issued - // by a debugging tool can result in EINTR error. - if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || - errno == EINTR)) - return 0; - - // Signalise peer failure. - if (nbytes == -1) { - errno_assert (errno != EACCES - && errno != EBADF - && errno != EDESTADDRREQ - && errno != EFAULT - && errno != EINVAL - && errno != EISCONN - && errno != EMSGSIZE - && errno != ENOMEM - && errno != ENOTSOCK - && errno != EOPNOTSUPP); - return -1; - } - - return static_cast (nbytes); - -#endif -} - -int zmq::stream_engine_t::read (void *data_, size_t size_) -{ -#ifdef ZMQ_HAVE_WINDOWS - - const int rc = recv (s, (char*) data_, (int) size_, 0); - - // If not a single byte can be read from the socket in non-blocking mode - // we'll get an error (this may happen during the speculative read). - if (rc == SOCKET_ERROR) { - if (WSAGetLastError () == WSAEWOULDBLOCK) - errno = EAGAIN; - else { - wsa_assert (WSAGetLastError () == WSAENETDOWN - || WSAGetLastError () == WSAENETRESET - || WSAGetLastError () == WSAECONNABORTED - || WSAGetLastError () == WSAETIMEDOUT - || WSAGetLastError () == WSAECONNRESET - || WSAGetLastError () == WSAECONNREFUSED - || WSAGetLastError () == WSAENOTCONN); - errno = wsa_error_to_errno (WSAGetLastError ()); - } - } - - return rc == SOCKET_ERROR? -1: rc; - -#else - - const ssize_t rc = recv (s, data_, size_, 0); - - // Several errors are OK. When speculative read is being done we may not - // be able to read a single byte from the socket. Also, SIGSTOP issued - // by a debugging tool can result in EINTR error. - if (rc == -1) { - errno_assert (errno != EBADF - && errno != EFAULT - && errno != EINVAL - && errno != ENOMEM - && errno != ENOTSOCK); - if (errno == EWOULDBLOCK || errno == EINTR) - errno = EAGAIN; - } - - return static_cast (rc); - -#endif -} - void zmq::stream_engine_t::set_handshake_timer () { zmq_assert (!has_handshake_timer); diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index ccee2693..c2448aa1 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -92,16 +92,6 @@ namespace zmq // Detects the protocol used by the peer. bool handshake (); - // Writes data to the socket. Returns the number of bytes actually - // written (even zero is to be considered to be a success). In case - // of error or orderly shutdown by the other peer -1 is returned. - int write (const void *data_, size_t size_); - - // Reads data from the socket (up to 'size' bytes). - // Returns the number of bytes actually read or -1 on error. - // Zero indicates the peer has closed the connection. - int read (void *data_, size_t size_); - int identity_msg (msg_t *msg_); int process_identity_msg (msg_t *msg_); diff --git a/src/tcp.cpp b/src/tcp.cpp index 736669fa..3d646e42 100755 --- a/src/tcp.cpp +++ b/src/tcp.cpp @@ -141,3 +141,104 @@ void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int #endif // ZMQ_HAVE_SO_KEEPALIVE #endif // ZMQ_HAVE_WINDOWS } + +int zmq::tcp_write (fd_t s_, const void *data_, size_t size_) +{ +#ifdef ZMQ_HAVE_WINDOWS + + int nbytes = send (s_, (char*) data_, (int) size_, 0); + + // If not a single byte can be written to the socket in non-blocking mode + // we'll get an error (this may happen during the speculative write). + if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) + return 0; + + // Signalise peer failure. + if (nbytes == SOCKET_ERROR && ( + WSAGetLastError () == WSAENETDOWN || + WSAGetLastError () == WSAENETRESET || + WSAGetLastError () == WSAEHOSTUNREACH || + WSAGetLastError () == WSAECONNABORTED || + WSAGetLastError () == WSAETIMEDOUT || + WSAGetLastError () == WSAECONNRESET)) + return -1; + + wsa_assert (nbytes != SOCKET_ERROR); + return nbytes; + +#else + ssize_t nbytes = send (s_, data_, size_, 0); + + // Several errors are OK. When speculative write is being done we may not + // be able to write a single byte from the socket. Also, SIGSTOP issued + // by a debugging tool can result in EINTR error. + if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || + errno == EINTR)) + return 0; + + // Signalise peer failure. + if (nbytes == -1) { + errno_assert (errno != EACCES + && errno != EBADF + && errno != EDESTADDRREQ + && errno != EFAULT + && errno != EINVAL + && errno != EISCONN + && errno != EMSGSIZE + && errno != ENOMEM + && errno != ENOTSOCK + && errno != EOPNOTSUPP); + return -1; + } + + return static_cast (nbytes); + +#endif +} + +int zmq::tcp_read (fd_t s_, void *data_, size_t size_) +{ +#ifdef ZMQ_HAVE_WINDOWS + + const int rc = recv (s_, (char*) data_, (int) size_, 0); + + // If not a single byte can be read from the socket in non-blocking mode + // we'll get an error (this may happen during the speculative read). + if (rc == SOCKET_ERROR) { + if (WSAGetLastError () == WSAEWOULDBLOCK) + errno = EAGAIN; + else { + wsa_assert (WSAGetLastError () == WSAENETDOWN + || WSAGetLastError () == WSAENETRESET + || WSAGetLastError () == WSAECONNABORTED + || WSAGetLastError () == WSAETIMEDOUT + || WSAGetLastError () == WSAECONNRESET + || WSAGetLastError () == WSAECONNREFUSED + || WSAGetLastError () == WSAENOTCONN); + errno = wsa_error_to_errno (WSAGetLastError ()); + } + } + + return rc == SOCKET_ERROR? -1: rc; + +#else + + const ssize_t rc = recv (s_, data_, size_, 0); + + // Several errors are OK. When speculative read is being done we may not + // be able to read a single byte from the socket. Also, SIGSTOP issued + // by a debugging tool can result in EINTR error. + if (rc == -1) { + errno_assert (errno != EBADF + && errno != EFAULT + && errno != EINVAL + && errno != ENOMEM + && errno != ENOTSOCK); + if (errno == EWOULDBLOCK || errno == EINTR) + errno = EAGAIN; + } + + return static_cast (rc); + +#endif +} diff --git a/src/tcp.hpp b/src/tcp.hpp index e5b5daca..d2856ecf 100644 --- a/src/tcp.hpp +++ b/src/tcp.hpp @@ -37,6 +37,16 @@ namespace zmq // Tunes TCP keep-alives void tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_); + // Writes data to the socket. Returns the number of bytes actually + // written (even zero is to be considered to be a success). In case + // of error or orderly shutdown by the other peer -1 is returned. + int tcp_write (fd_t s_, const void *data_, size_t size_); + + // Reads data from the socket (up to 'size' bytes). + // Returns the number of bytes actually read or -1 on error. + // Zero indicates the peer has closed the connection. + int tcp_read (fd_t s_, void *data_, size_t size_); + } #endif