mirror of
https://github.com/zeromq/libzmq.git
synced 2025-11-05 12:47:49 +01:00
Problem: socks_connecter_t duplicates code with stream_connecter_base_t
Solution: let socks_connecter_t derive from stream_connecter_base_t and remove duplicate code
This commit is contained in:
@@ -58,64 +58,23 @@ zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_,
|
||||
address_t *addr_,
|
||||
address_t *proxy_addr_,
|
||||
bool delayed_start_) :
|
||||
own_t (io_thread_, options_),
|
||||
io_object_t (io_thread_),
|
||||
_addr (addr_),
|
||||
stream_connecter_base_t (
|
||||
io_thread_, session_, options_, addr_, delayed_start_),
|
||||
_proxy_addr (proxy_addr_),
|
||||
_status (unplugged),
|
||||
_s (retired_fd),
|
||||
_handle (static_cast<handle_t> (NULL)),
|
||||
_handle_valid (false),
|
||||
_delayed_start (delayed_start_),
|
||||
_timer_started (false),
|
||||
_session (session_),
|
||||
_current_reconnect_ivl (options.reconnect_ivl)
|
||||
_status (unplugged)
|
||||
{
|
||||
zmq_assert (_addr);
|
||||
zmq_assert (_addr->protocol == protocol_name::tcp);
|
||||
_proxy_addr->to_string (_endpoint);
|
||||
_socket = _session->get_socket ();
|
||||
}
|
||||
|
||||
zmq::socks_connecter_t::~socks_connecter_t ()
|
||||
{
|
||||
zmq_assert (_s == retired_fd);
|
||||
LIBZMQ_DELETE (_proxy_addr);
|
||||
}
|
||||
|
||||
void zmq::socks_connecter_t::process_plug ()
|
||||
{
|
||||
if (_delayed_start)
|
||||
start_timer ();
|
||||
else
|
||||
initiate_connect ();
|
||||
}
|
||||
|
||||
void zmq::socks_connecter_t::process_term (int linger_)
|
||||
{
|
||||
switch (_status) {
|
||||
case unplugged:
|
||||
break;
|
||||
case waiting_for_reconnect_time:
|
||||
cancel_timer (reconnect_timer_id);
|
||||
break;
|
||||
case waiting_for_proxy_connection:
|
||||
case sending_greeting:
|
||||
case waiting_for_choice:
|
||||
case sending_request:
|
||||
case waiting_for_response:
|
||||
rm_fd (_handle);
|
||||
if (_s != retired_fd)
|
||||
close ();
|
||||
break;
|
||||
}
|
||||
|
||||
own_t::process_term (linger_);
|
||||
}
|
||||
|
||||
void zmq::socks_connecter_t::in_event ()
|
||||
{
|
||||
zmq_assert (_status != unplugged && _status != waiting_for_reconnect_time);
|
||||
zmq_assert (_status != unplugged);
|
||||
|
||||
if (_status == waiting_for_choice) {
|
||||
int rc = _choice_decoder.input (_s);
|
||||
@@ -127,7 +86,7 @@ void zmq::socks_connecter_t::in_event ()
|
||||
if (rc == -1)
|
||||
error ();
|
||||
else {
|
||||
std::string hostname = "";
|
||||
std::string hostname;
|
||||
uint16_t port = 0;
|
||||
if (parse_address (_addr->address, hostname, port) == -1)
|
||||
error ();
|
||||
@@ -150,26 +109,11 @@ void zmq::socks_connecter_t::in_event ()
|
||||
if (rc == -1)
|
||||
error ();
|
||||
else {
|
||||
const endpoint_uri_pair_t endpoint_pair = endpoint_uri_pair_t (
|
||||
get_socket_name<tcp_address_t> (_s, socket_end_local),
|
||||
_endpoint, endpoint_type_connect);
|
||||
|
||||
// Create the engine object for this connection.
|
||||
stream_engine_t *engine = new (std::nothrow)
|
||||
stream_engine_t (_s, options, endpoint_pair);
|
||||
alloc_assert (engine);
|
||||
|
||||
// Attach the engine to the corresponding session object.
|
||||
send_attach (_session, engine);
|
||||
|
||||
_socket->event_connected (endpoint_pair, _s);
|
||||
|
||||
rm_fd (_handle);
|
||||
rm_handle ();
|
||||
create_engine (
|
||||
_s, get_socket_name<tcp_address_t> (_s, socket_end_local));
|
||||
_s = -1;
|
||||
_status = unplugged;
|
||||
|
||||
// Shut the connecter down.
|
||||
terminate ();
|
||||
}
|
||||
}
|
||||
} else
|
||||
@@ -213,8 +157,10 @@ void zmq::socks_connecter_t::out_event ()
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::socks_connecter_t::initiate_connect ()
|
||||
void zmq::socks_connecter_t::start_connecting ()
|
||||
{
|
||||
zmq_assert (_status == unplugged);
|
||||
|
||||
// Open the connecting socket.
|
||||
const int rc = connect_to_proxy ();
|
||||
|
||||
@@ -236,7 +182,7 @@ void zmq::socks_connecter_t::initiate_connect ()
|
||||
else {
|
||||
if (_s != retired_fd)
|
||||
close ();
|
||||
start_timer ();
|
||||
add_reconnect_timer ();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -253,13 +199,6 @@ int zmq::socks_connecter_t::process_server_response (
|
||||
return response_.response_code == 0 ? 0 : -1;
|
||||
}
|
||||
|
||||
void zmq::socks_connecter_t::timer_event (int id_)
|
||||
{
|
||||
zmq_assert (_status == waiting_for_reconnect_time);
|
||||
zmq_assert (id_ == reconnect_timer_id);
|
||||
initiate_connect ();
|
||||
}
|
||||
|
||||
void zmq::socks_connecter_t::error ()
|
||||
{
|
||||
rm_fd (_handle);
|
||||
@@ -268,34 +207,7 @@ void zmq::socks_connecter_t::error ()
|
||||
_choice_decoder.reset ();
|
||||
_request_encoder.reset ();
|
||||
_response_decoder.reset ();
|
||||
start_timer ();
|
||||
}
|
||||
|
||||
void zmq::socks_connecter_t::start_timer ()
|
||||
{
|
||||
if (options.reconnect_ivl != -1) {
|
||||
const int interval = get_new_reconnect_ivl ();
|
||||
add_timer (interval, reconnect_timer_id);
|
||||
_status = waiting_for_reconnect_time;
|
||||
_socket->event_connect_retried (
|
||||
make_unconnected_connect_endpoint_pair (_endpoint), interval);
|
||||
}
|
||||
}
|
||||
|
||||
int zmq::socks_connecter_t::get_new_reconnect_ivl ()
|
||||
{
|
||||
// The new interval is the current interval + random value.
|
||||
const int interval =
|
||||
_current_reconnect_ivl + generate_random () % options.reconnect_ivl;
|
||||
|
||||
// Only change the current reconnect interval if the maximum reconnect
|
||||
// interval was set and if it's larger than the reconnect interval.
|
||||
if (options.reconnect_ivl_max > 0
|
||||
&& options.reconnect_ivl_max > options.reconnect_ivl)
|
||||
// Calculate the next interval
|
||||
_current_reconnect_ivl =
|
||||
std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max);
|
||||
return interval;
|
||||
add_reconnect_timer ();
|
||||
}
|
||||
|
||||
int zmq::socks_connecter_t::connect_to_proxy ()
|
||||
@@ -439,21 +351,6 @@ zmq::fd_t zmq::socks_connecter_t::check_proxy_connection ()
|
||||
return 0;
|
||||
}
|
||||
|
||||
void zmq::socks_connecter_t::close ()
|
||||
{
|
||||
zmq_assert (_s != retired_fd);
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
const int rc = closesocket (_s);
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
#else
|
||||
const int rc = ::close (_s);
|
||||
errno_assert (rc == 0);
|
||||
#endif
|
||||
_socket->event_closed (make_unconnected_connect_endpoint_pair (_endpoint),
|
||||
_s);
|
||||
_s = retired_fd;
|
||||
}
|
||||
|
||||
int zmq::socks_connecter_t::parse_address (const std::string &address_,
|
||||
std::string &hostname_,
|
||||
uint16_t &port_)
|
||||
|
||||
@@ -31,8 +31,7 @@
|
||||
#define __SOCKS_CONNECTER_HPP_INCLUDED__
|
||||
|
||||
#include "fd.hpp"
|
||||
#include "io_object.hpp"
|
||||
#include "own.hpp"
|
||||
#include "stream_connecter_base.hpp"
|
||||
#include "stdint.hpp"
|
||||
#include "socks.hpp"
|
||||
|
||||
@@ -42,8 +41,7 @@ class io_thread_t;
|
||||
class session_base_t;
|
||||
struct address_t;
|
||||
|
||||
// TODO consider refactoring this to derive from stream_connecter_base_t
|
||||
class socks_connecter_t : public own_t, public io_object_t
|
||||
class socks_connecter_t : public stream_connecter_base_t
|
||||
{
|
||||
public:
|
||||
// If 'delayed_start' is true connecter first waits for a while,
|
||||
@@ -68,29 +66,18 @@ class socks_connecter_t : public own_t, public io_object_t
|
||||
waiting_for_response
|
||||
};
|
||||
|
||||
// ID of the timer used to delay the reconnection.
|
||||
enum
|
||||
{
|
||||
reconnect_timer_id = 1
|
||||
};
|
||||
|
||||
// Method ID
|
||||
enum
|
||||
{
|
||||
socks_no_auth_required = 0
|
||||
};
|
||||
|
||||
// Handlers for incoming commands.
|
||||
virtual void process_plug ();
|
||||
virtual void process_term (int linger_);
|
||||
|
||||
// Handlers for I/O events.
|
||||
virtual void in_event ();
|
||||
virtual void out_event ();
|
||||
virtual void timer_event (int id_);
|
||||
|
||||
// Internal function to start the actual connection establishment.
|
||||
void initiate_connect ();
|
||||
void start_connecting ();
|
||||
|
||||
int process_server_response (const socks_choice_t &response_);
|
||||
int process_server_response (const socks_response_t &response_);
|
||||
@@ -103,22 +90,11 @@ class socks_connecter_t : public own_t, public io_object_t
|
||||
|
||||
void error ();
|
||||
|
||||
// Internal function to start reconnect timer
|
||||
void start_timer ();
|
||||
|
||||
// Internal function to return a reconnect backoff delay.
|
||||
// Will modify the current_reconnect_ivl used for next call
|
||||
// Returns the currently used interval
|
||||
int get_new_reconnect_ivl ();
|
||||
|
||||
// Open TCP connecting socket. Returns -1 in case of error,
|
||||
// 0 if connect was successful immediately. Returns -1 with
|
||||
// EAGAIN errno if async connect was launched.
|
||||
int open ();
|
||||
|
||||
// Close the connecting socket.
|
||||
void close ();
|
||||
|
||||
// Get the file descriptor of newly created connection. Returns
|
||||
// retired_fd if the connection was unsuccessful.
|
||||
zmq::fd_t check_proxy_connection ();
|
||||
@@ -128,42 +104,11 @@ class socks_connecter_t : public own_t, public io_object_t
|
||||
socks_request_encoder_t _request_encoder;
|
||||
socks_response_decoder_t _response_decoder;
|
||||
|
||||
// Address to connect to. Owned by session_base_t.
|
||||
address_t *_addr;
|
||||
|
||||
// SOCKS address; owned by this connecter.
|
||||
address_t *_proxy_addr;
|
||||
|
||||
int _status;
|
||||
|
||||
// Underlying socket.
|
||||
fd_t _s;
|
||||
|
||||
// Handle corresponding to the listening socket.
|
||||
handle_t _handle;
|
||||
|
||||
// If true file descriptor is registered with the poller and 'handle'
|
||||
// contains valid value.
|
||||
bool _handle_valid;
|
||||
|
||||
// If true, connecter is waiting a while before trying to connect.
|
||||
const bool _delayed_start;
|
||||
|
||||
// True iff a timer has been started.
|
||||
bool _timer_started;
|
||||
|
||||
// Reference to the session we belong to.
|
||||
zmq::session_base_t *_session;
|
||||
|
||||
// Current reconnect ivl, updated for backoff strategy
|
||||
int _current_reconnect_ivl;
|
||||
|
||||
// String representation of endpoint to connect to
|
||||
std::string _endpoint;
|
||||
|
||||
// Socket
|
||||
zmq::socket_base_t *_socket;
|
||||
|
||||
socks_connecter_t (const socks_connecter_t &);
|
||||
const socks_connecter_t &operator= (const socks_connecter_t &);
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user