Resolve addresses in the calling thread on connect.

This allows us to actually report an error to the caller on resolve
failure, rather than asserting later on in the io thread.

Signed-off-by: Staffan Gimåker <staffan@spotify.com>
This commit is contained in:
Staffan Gimåker
2012-02-02 14:56:51 +01:00
parent b2e2fa622d
commit b9fb48f47b
40 changed files with 292 additions and 130 deletions

View File

@@ -30,6 +30,7 @@
#include "ipc_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
#include "address.hpp"
#include "req.hpp"
#include "xreq.hpp"
@@ -45,52 +46,52 @@
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_)
const address_t *addr_)
{
session_base_t *s = NULL;
switch (options_.type) {
case ZMQ_REQ:
s = new (std::nothrow) req_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
socket_, options_, addr_);
break;
case ZMQ_XREQ:
s = new (std::nothrow) xreq_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
socket_, options_, addr_);
case ZMQ_REP:
s = new (std::nothrow) rep_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
socket_, options_, addr_);
break;
case ZMQ_XREP:
s = new (std::nothrow) xrep_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
socket_, options_, addr_);
break;
case ZMQ_PUB:
s = new (std::nothrow) pub_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
socket_, options_, addr_);
break;
case ZMQ_XPUB:
s = new (std::nothrow) xpub_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
socket_, options_, addr_);
break;
case ZMQ_SUB:
s = new (std::nothrow) sub_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
socket_, options_, addr_);
break;
case ZMQ_XSUB:
s = new (std::nothrow) xsub_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
socket_, options_, addr_);
break;
case ZMQ_PUSH:
s = new (std::nothrow) push_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
socket_, options_, addr_);
break;
case ZMQ_PULL:
s = new (std::nothrow) pull_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
socket_, options_, addr_);
break;
case ZMQ_PAIR:
s = new (std::nothrow) pair_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
socket_, options_, addr_);
break;
default:
errno = EINVAL;
@@ -102,7 +103,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
const address_t *addr_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
connect (connect_),
@@ -114,12 +115,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
io_thread (io_thread_),
has_linger_timer (false),
send_identity (options_.send_identity),
recv_identity (options_.recv_identity)
recv_identity (options_.recv_identity),
addr (addr_)
{
if (protocol_)
protocol = protocol_;
if (address_)
address = address_;
}
zmq::session_base_t::~session_base_t ()
@@ -135,6 +133,9 @@ zmq::session_base_t::~session_base_t ()
// Close the engine.
if (engine)
engine->terminate ();
if (addr)
delete addr;
}
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
@@ -393,18 +394,18 @@ void zmq::session_base_t::start_connecting (bool wait_)
// Create the connecter object.
if (protocol == "tcp") {
if (addr->protocol == "tcp") {
tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t (
io_thread, this, options, address.c_str (), wait_);
io_thread, this, options, addr, wait_);
alloc_assert (connecter);
launch_child (connecter);
return;
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
if (protocol == "ipc") {
if (addr->protocol == "ipc") {
ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
io_thread, this, options, address.c_str (), wait_);
io_thread, this, options, addr, wait_);
alloc_assert (connecter);
launch_child (connecter);
return;
@@ -414,10 +415,10 @@ void zmq::session_base_t::start_connecting (bool wait_)
#if defined ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure.
if (protocol == "pgm" || protocol == "epgm") {
if (addr->protocol == "pgm" || addr->protocol == "epgm") {
// For EPGM transport with UDP encapsulation of PGM is used.
bool udp_encapsulation = (protocol == "epgm");
bool udp_encapsulation = (addr->protocol == "epgm");
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
@@ -429,7 +430,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
io_thread, options);
alloc_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
zmq_assert (rc == 0);
send_attach (this, pgm_sender);
@@ -441,7 +442,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
io_thread, options);
alloc_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
zmq_assert (rc == 0);
send_attach (this, pgm_receiver);