mirror of
https://github.com/zeromq/libzmq.git
synced 2025-11-08 14:56:14 +01:00
Problem: socks_connecter_t duplicates code with stream_connecter_base_t
Solution: let socks_connecter_t derive from stream_connecter_base_t and remove duplicate code
This commit is contained in:
@@ -58,64 +58,23 @@ zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_,
|
|||||||
address_t *addr_,
|
address_t *addr_,
|
||||||
address_t *proxy_addr_,
|
address_t *proxy_addr_,
|
||||||
bool delayed_start_) :
|
bool delayed_start_) :
|
||||||
own_t (io_thread_, options_),
|
stream_connecter_base_t (
|
||||||
io_object_t (io_thread_),
|
io_thread_, session_, options_, addr_, delayed_start_),
|
||||||
_addr (addr_),
|
|
||||||
_proxy_addr (proxy_addr_),
|
_proxy_addr (proxy_addr_),
|
||||||
_status (unplugged),
|
_status (unplugged)
|
||||||
_s (retired_fd),
|
|
||||||
_handle (static_cast<handle_t> (NULL)),
|
|
||||||
_handle_valid (false),
|
|
||||||
_delayed_start (delayed_start_),
|
|
||||||
_timer_started (false),
|
|
||||||
_session (session_),
|
|
||||||
_current_reconnect_ivl (options.reconnect_ivl)
|
|
||||||
{
|
{
|
||||||
zmq_assert (_addr);
|
|
||||||
zmq_assert (_addr->protocol == protocol_name::tcp);
|
zmq_assert (_addr->protocol == protocol_name::tcp);
|
||||||
_proxy_addr->to_string (_endpoint);
|
_proxy_addr->to_string (_endpoint);
|
||||||
_socket = _session->get_socket ();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::socks_connecter_t::~socks_connecter_t ()
|
zmq::socks_connecter_t::~socks_connecter_t ()
|
||||||
{
|
{
|
||||||
zmq_assert (_s == retired_fd);
|
|
||||||
LIBZMQ_DELETE (_proxy_addr);
|
LIBZMQ_DELETE (_proxy_addr);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socks_connecter_t::process_plug ()
|
|
||||||
{
|
|
||||||
if (_delayed_start)
|
|
||||||
start_timer ();
|
|
||||||
else
|
|
||||||
initiate_connect ();
|
|
||||||
}
|
|
||||||
|
|
||||||
void zmq::socks_connecter_t::process_term (int linger_)
|
|
||||||
{
|
|
||||||
switch (_status) {
|
|
||||||
case unplugged:
|
|
||||||
break;
|
|
||||||
case waiting_for_reconnect_time:
|
|
||||||
cancel_timer (reconnect_timer_id);
|
|
||||||
break;
|
|
||||||
case waiting_for_proxy_connection:
|
|
||||||
case sending_greeting:
|
|
||||||
case waiting_for_choice:
|
|
||||||
case sending_request:
|
|
||||||
case waiting_for_response:
|
|
||||||
rm_fd (_handle);
|
|
||||||
if (_s != retired_fd)
|
|
||||||
close ();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
own_t::process_term (linger_);
|
|
||||||
}
|
|
||||||
|
|
||||||
void zmq::socks_connecter_t::in_event ()
|
void zmq::socks_connecter_t::in_event ()
|
||||||
{
|
{
|
||||||
zmq_assert (_status != unplugged && _status != waiting_for_reconnect_time);
|
zmq_assert (_status != unplugged);
|
||||||
|
|
||||||
if (_status == waiting_for_choice) {
|
if (_status == waiting_for_choice) {
|
||||||
int rc = _choice_decoder.input (_s);
|
int rc = _choice_decoder.input (_s);
|
||||||
@@ -127,7 +86,7 @@ void zmq::socks_connecter_t::in_event ()
|
|||||||
if (rc == -1)
|
if (rc == -1)
|
||||||
error ();
|
error ();
|
||||||
else {
|
else {
|
||||||
std::string hostname = "";
|
std::string hostname;
|
||||||
uint16_t port = 0;
|
uint16_t port = 0;
|
||||||
if (parse_address (_addr->address, hostname, port) == -1)
|
if (parse_address (_addr->address, hostname, port) == -1)
|
||||||
error ();
|
error ();
|
||||||
@@ -150,26 +109,11 @@ void zmq::socks_connecter_t::in_event ()
|
|||||||
if (rc == -1)
|
if (rc == -1)
|
||||||
error ();
|
error ();
|
||||||
else {
|
else {
|
||||||
const endpoint_uri_pair_t endpoint_pair = endpoint_uri_pair_t (
|
rm_handle ();
|
||||||
get_socket_name<tcp_address_t> (_s, socket_end_local),
|
create_engine (
|
||||||
_endpoint, endpoint_type_connect);
|
_s, get_socket_name<tcp_address_t> (_s, socket_end_local));
|
||||||
|
|
||||||
// Create the engine object for this connection.
|
|
||||||
stream_engine_t *engine = new (std::nothrow)
|
|
||||||
stream_engine_t (_s, options, endpoint_pair);
|
|
||||||
alloc_assert (engine);
|
|
||||||
|
|
||||||
// Attach the engine to the corresponding session object.
|
|
||||||
send_attach (_session, engine);
|
|
||||||
|
|
||||||
_socket->event_connected (endpoint_pair, _s);
|
|
||||||
|
|
||||||
rm_fd (_handle);
|
|
||||||
_s = -1;
|
_s = -1;
|
||||||
_status = unplugged;
|
_status = unplugged;
|
||||||
|
|
||||||
// Shut the connecter down.
|
|
||||||
terminate ();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else
|
} else
|
||||||
@@ -213,8 +157,10 @@ void zmq::socks_connecter_t::out_event ()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socks_connecter_t::initiate_connect ()
|
void zmq::socks_connecter_t::start_connecting ()
|
||||||
{
|
{
|
||||||
|
zmq_assert (_status == unplugged);
|
||||||
|
|
||||||
// Open the connecting socket.
|
// Open the connecting socket.
|
||||||
const int rc = connect_to_proxy ();
|
const int rc = connect_to_proxy ();
|
||||||
|
|
||||||
@@ -236,7 +182,7 @@ void zmq::socks_connecter_t::initiate_connect ()
|
|||||||
else {
|
else {
|
||||||
if (_s != retired_fd)
|
if (_s != retired_fd)
|
||||||
close ();
|
close ();
|
||||||
start_timer ();
|
add_reconnect_timer ();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -253,13 +199,6 @@ int zmq::socks_connecter_t::process_server_response (
|
|||||||
return response_.response_code == 0 ? 0 : -1;
|
return response_.response_code == 0 ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socks_connecter_t::timer_event (int id_)
|
|
||||||
{
|
|
||||||
zmq_assert (_status == waiting_for_reconnect_time);
|
|
||||||
zmq_assert (id_ == reconnect_timer_id);
|
|
||||||
initiate_connect ();
|
|
||||||
}
|
|
||||||
|
|
||||||
void zmq::socks_connecter_t::error ()
|
void zmq::socks_connecter_t::error ()
|
||||||
{
|
{
|
||||||
rm_fd (_handle);
|
rm_fd (_handle);
|
||||||
@@ -268,34 +207,7 @@ void zmq::socks_connecter_t::error ()
|
|||||||
_choice_decoder.reset ();
|
_choice_decoder.reset ();
|
||||||
_request_encoder.reset ();
|
_request_encoder.reset ();
|
||||||
_response_decoder.reset ();
|
_response_decoder.reset ();
|
||||||
start_timer ();
|
add_reconnect_timer ();
|
||||||
}
|
|
||||||
|
|
||||||
void zmq::socks_connecter_t::start_timer ()
|
|
||||||
{
|
|
||||||
if (options.reconnect_ivl != -1) {
|
|
||||||
const int interval = get_new_reconnect_ivl ();
|
|
||||||
add_timer (interval, reconnect_timer_id);
|
|
||||||
_status = waiting_for_reconnect_time;
|
|
||||||
_socket->event_connect_retried (
|
|
||||||
make_unconnected_connect_endpoint_pair (_endpoint), interval);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int zmq::socks_connecter_t::get_new_reconnect_ivl ()
|
|
||||||
{
|
|
||||||
// The new interval is the current interval + random value.
|
|
||||||
const int interval =
|
|
||||||
_current_reconnect_ivl + generate_random () % options.reconnect_ivl;
|
|
||||||
|
|
||||||
// Only change the current reconnect interval if the maximum reconnect
|
|
||||||
// interval was set and if it's larger than the reconnect interval.
|
|
||||||
if (options.reconnect_ivl_max > 0
|
|
||||||
&& options.reconnect_ivl_max > options.reconnect_ivl)
|
|
||||||
// Calculate the next interval
|
|
||||||
_current_reconnect_ivl =
|
|
||||||
std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max);
|
|
||||||
return interval;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::socks_connecter_t::connect_to_proxy ()
|
int zmq::socks_connecter_t::connect_to_proxy ()
|
||||||
@@ -439,21 +351,6 @@ zmq::fd_t zmq::socks_connecter_t::check_proxy_connection ()
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socks_connecter_t::close ()
|
|
||||||
{
|
|
||||||
zmq_assert (_s != retired_fd);
|
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
|
||||||
const int rc = closesocket (_s);
|
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
|
||||||
#else
|
|
||||||
const int rc = ::close (_s);
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
#endif
|
|
||||||
_socket->event_closed (make_unconnected_connect_endpoint_pair (_endpoint),
|
|
||||||
_s);
|
|
||||||
_s = retired_fd;
|
|
||||||
}
|
|
||||||
|
|
||||||
int zmq::socks_connecter_t::parse_address (const std::string &address_,
|
int zmq::socks_connecter_t::parse_address (const std::string &address_,
|
||||||
std::string &hostname_,
|
std::string &hostname_,
|
||||||
uint16_t &port_)
|
uint16_t &port_)
|
||||||
|
|||||||
@@ -31,8 +31,7 @@
|
|||||||
#define __SOCKS_CONNECTER_HPP_INCLUDED__
|
#define __SOCKS_CONNECTER_HPP_INCLUDED__
|
||||||
|
|
||||||
#include "fd.hpp"
|
#include "fd.hpp"
|
||||||
#include "io_object.hpp"
|
#include "stream_connecter_base.hpp"
|
||||||
#include "own.hpp"
|
|
||||||
#include "stdint.hpp"
|
#include "stdint.hpp"
|
||||||
#include "socks.hpp"
|
#include "socks.hpp"
|
||||||
|
|
||||||
@@ -42,8 +41,7 @@ class io_thread_t;
|
|||||||
class session_base_t;
|
class session_base_t;
|
||||||
struct address_t;
|
struct address_t;
|
||||||
|
|
||||||
// TODO consider refactoring this to derive from stream_connecter_base_t
|
class socks_connecter_t : public stream_connecter_base_t
|
||||||
class socks_connecter_t : public own_t, public io_object_t
|
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
// If 'delayed_start' is true connecter first waits for a while,
|
// If 'delayed_start' is true connecter first waits for a while,
|
||||||
@@ -68,29 +66,18 @@ class socks_connecter_t : public own_t, public io_object_t
|
|||||||
waiting_for_response
|
waiting_for_response
|
||||||
};
|
};
|
||||||
|
|
||||||
// ID of the timer used to delay the reconnection.
|
|
||||||
enum
|
|
||||||
{
|
|
||||||
reconnect_timer_id = 1
|
|
||||||
};
|
|
||||||
|
|
||||||
// Method ID
|
// Method ID
|
||||||
enum
|
enum
|
||||||
{
|
{
|
||||||
socks_no_auth_required = 0
|
socks_no_auth_required = 0
|
||||||
};
|
};
|
||||||
|
|
||||||
// Handlers for incoming commands.
|
|
||||||
virtual void process_plug ();
|
|
||||||
virtual void process_term (int linger_);
|
|
||||||
|
|
||||||
// Handlers for I/O events.
|
// Handlers for I/O events.
|
||||||
virtual void in_event ();
|
virtual void in_event ();
|
||||||
virtual void out_event ();
|
virtual void out_event ();
|
||||||
virtual void timer_event (int id_);
|
|
||||||
|
|
||||||
// Internal function to start the actual connection establishment.
|
// Internal function to start the actual connection establishment.
|
||||||
void initiate_connect ();
|
void start_connecting ();
|
||||||
|
|
||||||
int process_server_response (const socks_choice_t &response_);
|
int process_server_response (const socks_choice_t &response_);
|
||||||
int process_server_response (const socks_response_t &response_);
|
int process_server_response (const socks_response_t &response_);
|
||||||
@@ -103,22 +90,11 @@ class socks_connecter_t : public own_t, public io_object_t
|
|||||||
|
|
||||||
void error ();
|
void error ();
|
||||||
|
|
||||||
// Internal function to start reconnect timer
|
|
||||||
void start_timer ();
|
|
||||||
|
|
||||||
// Internal function to return a reconnect backoff delay.
|
|
||||||
// Will modify the current_reconnect_ivl used for next call
|
|
||||||
// Returns the currently used interval
|
|
||||||
int get_new_reconnect_ivl ();
|
|
||||||
|
|
||||||
// Open TCP connecting socket. Returns -1 in case of error,
|
// Open TCP connecting socket. Returns -1 in case of error,
|
||||||
// 0 if connect was successful immediately. Returns -1 with
|
// 0 if connect was successful immediately. Returns -1 with
|
||||||
// EAGAIN errno if async connect was launched.
|
// EAGAIN errno if async connect was launched.
|
||||||
int open ();
|
int open ();
|
||||||
|
|
||||||
// Close the connecting socket.
|
|
||||||
void close ();
|
|
||||||
|
|
||||||
// Get the file descriptor of newly created connection. Returns
|
// Get the file descriptor of newly created connection. Returns
|
||||||
// retired_fd if the connection was unsuccessful.
|
// retired_fd if the connection was unsuccessful.
|
||||||
zmq::fd_t check_proxy_connection ();
|
zmq::fd_t check_proxy_connection ();
|
||||||
@@ -128,42 +104,11 @@ class socks_connecter_t : public own_t, public io_object_t
|
|||||||
socks_request_encoder_t _request_encoder;
|
socks_request_encoder_t _request_encoder;
|
||||||
socks_response_decoder_t _response_decoder;
|
socks_response_decoder_t _response_decoder;
|
||||||
|
|
||||||
// Address to connect to. Owned by session_base_t.
|
|
||||||
address_t *_addr;
|
|
||||||
|
|
||||||
// SOCKS address; owned by this connecter.
|
// SOCKS address; owned by this connecter.
|
||||||
address_t *_proxy_addr;
|
address_t *_proxy_addr;
|
||||||
|
|
||||||
int _status;
|
int _status;
|
||||||
|
|
||||||
// Underlying socket.
|
|
||||||
fd_t _s;
|
|
||||||
|
|
||||||
// Handle corresponding to the listening socket.
|
|
||||||
handle_t _handle;
|
|
||||||
|
|
||||||
// If true file descriptor is registered with the poller and 'handle'
|
|
||||||
// contains valid value.
|
|
||||||
bool _handle_valid;
|
|
||||||
|
|
||||||
// If true, connecter is waiting a while before trying to connect.
|
|
||||||
const bool _delayed_start;
|
|
||||||
|
|
||||||
// True iff a timer has been started.
|
|
||||||
bool _timer_started;
|
|
||||||
|
|
||||||
// Reference to the session we belong to.
|
|
||||||
zmq::session_base_t *_session;
|
|
||||||
|
|
||||||
// Current reconnect ivl, updated for backoff strategy
|
|
||||||
int _current_reconnect_ivl;
|
|
||||||
|
|
||||||
// String representation of endpoint to connect to
|
|
||||||
std::string _endpoint;
|
|
||||||
|
|
||||||
// Socket
|
|
||||||
zmq::socket_base_t *_socket;
|
|
||||||
|
|
||||||
socks_connecter_t (const socks_connecter_t &);
|
socks_connecter_t (const socks_connecter_t &);
|
||||||
const socks_connecter_t &operator= (const socks_connecter_t &);
|
const socks_connecter_t &operator= (const socks_connecter_t &);
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user