diff --git a/src/Makefile.am b/src/Makefile.am index 4d3cba36..59357470 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -6,6 +6,7 @@ pkgconfig_DATA = libzmq.pc include_HEADERS = ../include/zmq.h ../include/zmq_utils.h libzmq_la_SOURCES = \ + address.hpp \ array.hpp \ atomic_counter.hpp \ atomic_ptr.hpp \ @@ -76,6 +77,7 @@ libzmq_la_SOURCES = \ xsub.hpp \ ypipe.hpp \ yqueue.hpp \ + address.cpp \ clock.cpp \ ctx.cpp \ decoder.cpp \ diff --git a/src/address.cpp b/src/address.cpp new file mode 100644 index 00000000..693c1610 --- /dev/null +++ b/src/address.cpp @@ -0,0 +1,50 @@ +/* + Copyright (c) 2012 Spotify AB + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "address.hpp" +#include "err.hpp" +#include "tcp_address.hpp" +#include "ipc_address.hpp" + +#include + +zmq::address_t::address_t ( + const std::string &protocol_, const std::string &address_) + : protocol (protocol_), + address (address_) +{ + memset (&resolved, 0, sizeof (resolved)); +} + +zmq::address_t::~address_t () +{ + if (protocol == "tcp") { + if (resolved.tcp_addr) { + delete resolved.tcp_addr; + resolved.tcp_addr = 0; + } + } + else if (protocol == "ipc") { + if (resolved.ipc_addr) { + delete resolved.ipc_addr; + resolved.ipc_addr = 0; + } + } +} diff --git a/src/address.hpp b/src/address.hpp new file mode 100644 index 00000000..acb6ef10 --- /dev/null +++ b/src/address.hpp @@ -0,0 +1,48 @@ +/* + Copyright (c) 2012 Spotify AB + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_ADDRESS_HPP_INCLUDED__ +#define __ZMQ_ADDRESS_HPP_INCLUDED__ + +#include + +namespace zmq +{ + class tcp_address_t; + class ipc_address_t; + + struct address_t { + address_t (const std::string &protocol_, const std::string &address_); + + ~address_t (); + + const std::string protocol; + const std::string address; + + // Protocol specific resolved address + union { + tcp_address_t *tcp_addr; + ipc_address_t *ipc_addr; + } resolved; + }; + +} + +#endif diff --git a/src/ipc_address.cpp b/src/ipc_address.cpp index d601c565..a5ef7ab1 100644 --- a/src/ipc_address.cpp +++ b/src/ipc_address.cpp @@ -47,12 +47,12 @@ int zmq::ipc_address_t::resolve (const char *path_) return 0; } -sockaddr *zmq::ipc_address_t::addr () +const sockaddr *zmq::ipc_address_t::addr () const { return (sockaddr*) &address; } -socklen_t zmq::ipc_address_t::addrlen () +socklen_t zmq::ipc_address_t::addrlen () const { return (socklen_t) sizeof (address); } diff --git a/src/ipc_address.hpp b/src/ipc_address.hpp index 4a7f2309..7047b04b 100644 --- a/src/ipc_address.hpp +++ b/src/ipc_address.hpp @@ -41,8 +41,8 @@ namespace zmq // This function sets up the address for UNIX domain transport. int resolve (const char* path_); - sockaddr *addr (); - socklen_t addrlen (); + const sockaddr *addr () const; + socklen_t addrlen () const; private: diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index eeb070d4..63689446 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -31,6 +31,8 @@ #include "random.hpp" #include "err.hpp" #include "ip.hpp" +#include "address.hpp" +#include "ipc_address.hpp" #include #include @@ -39,19 +41,18 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, class session_base_t *session_, const options_t &options_, - const char *address_, bool wait_) : + const address_t *addr_, bool wait_) : own_t (io_thread_, options_), io_object_t (io_thread_), + addr (addr_), s (retired_fd), handle_valid (false), wait (wait_), session (session_), current_reconnect_ivl(options.reconnect_ivl) { - // TODO: set_addess should be called separately, so that the error - // can be propagated. - int rc = set_address (address_); - zmq_assert (rc == 0); + zmq_assert (addr); + zmq_assert (addr->protocol == "ipc"); } zmq::ipc_connecter_t::~ipc_connecter_t () @@ -165,11 +166,6 @@ int zmq::ipc_connecter_t::get_new_reconnect_ivl () return this_interval; } -int zmq::ipc_connecter_t::set_address (const char *addr_) -{ - return address.resolve (addr_); -} - int zmq::ipc_connecter_t::open () { zmq_assert (s == retired_fd); @@ -183,7 +179,9 @@ int zmq::ipc_connecter_t::open () unblock_socket (s); // Connect to the remote peer. - int rc = ::connect (s, address.addr (), address.addrlen ()); + int rc = ::connect ( + s, addr->resolved.ipc_addr->addr (), + addr->resolved.ipc_addr->addrlen ()); // Connect was successfull immediately. if (rc == 0) diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index a2cb4db6..e3ca6f8b 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -29,13 +29,13 @@ #include "own.hpp" #include "stdint.hpp" #include "io_object.hpp" -#include "ipc_address.hpp" namespace zmq { class io_thread_t; class session_base_t; + struct address_t; class ipc_connecter_t : public own_t, public io_object_t { @@ -45,7 +45,7 @@ namespace zmq // connection process. ipc_connecter_t (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_, const options_t &options_, - const char *address_, bool delay_); + const address_t *addr_, bool delay_); ~ipc_connecter_t (); private: @@ -72,9 +72,6 @@ namespace zmq // Returns the currently used interval int get_new_reconnect_ivl (); - // Set address to connect to. - int set_address (const char *addr_); - // Open IPC connecting socket. Returns -1 in case of error, // 0 if connect was successfull immediately. Returns -1 with // EAGAIN errno if async connect was launched. @@ -87,8 +84,8 @@ namespace zmq // retired_fd if the connection was unsuccessfull. fd_t connect (); - // Address to connect to. - ipc_address_t address; + // Address to connect to. Owned by session_base_t. + const address_t *addr; // Underlying socket. fd_t s; diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index 668464a3..a9a33d19 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -88,7 +88,7 @@ void zmq::ipc_listener_t::in_event () // Create and launch a session object. session_base_t *session = session_base_t::create (io_thread, false, socket, - options, NULL, NULL); + options, NULL); errno_assert (session); session->inc_seqnum (); launch_child (session); diff --git a/src/pair.cpp b/src/pair.cpp index 82fd39b5..c066b293 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -119,9 +119,8 @@ bool zmq::pair_t::xhas_out () zmq::pair_session_t::pair_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - session_base_t (io_thread_, connect_, socket_, options_, protocol_, - address_) + const address_t *addr_) : + session_base_t (io_thread_, connect_, socket_, options_, addr_) { } diff --git a/src/pair.hpp b/src/pair.hpp index 5deea184..1c0859e6 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -65,7 +65,7 @@ namespace zmq pair_session_t (zmq::io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); + const address_t *addr_); ~pair_session_t (); private: diff --git a/src/pub.cpp b/src/pub.cpp index 7458d5f0..e2ad8a68 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -46,9 +46,8 @@ bool zmq::pub_t::xhas_in () zmq::pub_session_t::pub_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - xpub_session_t (io_thread_, connect_, socket_, options_, protocol_, - address_) + const address_t *addr_) : + xpub_session_t (io_thread_, connect_, socket_, options_, addr_) { } diff --git a/src/pub.hpp b/src/pub.hpp index 46726000..fb481730 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -55,7 +55,7 @@ namespace zmq pub_session_t (zmq::io_thread_t *io_thread_, bool connect_, zmq::socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); + const address_t *addr_); ~pub_session_t (); private: diff --git a/src/pull.cpp b/src/pull.cpp index ee840d91..f6538e6f 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -62,9 +62,8 @@ bool zmq::pull_t::xhas_in () zmq::pull_session_t::pull_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - session_base_t (io_thread_, connect_, socket_, options_, protocol_, - address_) + const address_t *addr_) : + session_base_t (io_thread_, connect_, socket_, options_, addr_) { } diff --git a/src/pull.hpp b/src/pull.hpp index 11c16b86..2bdc5d0d 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -67,7 +67,7 @@ namespace zmq pull_session_t (zmq::io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); + const address_t *addr_); ~pull_session_t (); private: diff --git a/src/push.cpp b/src/push.cpp index 2a29d353..80e8f4cf 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -62,9 +62,8 @@ bool zmq::push_t::xhas_out () zmq::push_session_t::push_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - session_base_t (io_thread_, connect_, socket_, options_, protocol_, - address_) + const address_t *addr_) : + session_base_t (io_thread_, connect_, socket_, options_, addr_) { } diff --git a/src/push.hpp b/src/push.hpp index 18eaf095..1012ce28 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -66,7 +66,7 @@ namespace zmq push_session_t (zmq::io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); + const address_t *addr_); ~push_session_t (); private: diff --git a/src/rep.cpp b/src/rep.cpp index 02a825c1..fbc981cd 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -114,9 +114,8 @@ bool zmq::rep_t::xhas_out () zmq::rep_session_t::rep_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - xrep_session_t (io_thread_, connect_, socket_, options_, protocol_, - address_) + const address_t *addr_) : + xrep_session_t (io_thread_, connect_, socket_, options_, addr_) { } diff --git a/src/rep.hpp b/src/rep.hpp index 04b3d862..04ac5c0b 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -66,7 +66,7 @@ namespace zmq rep_session_t (zmq::io_thread_t *io_thread_, bool connect_, zmq::socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); + const address_t *addr_); ~rep_session_t (); private: diff --git a/src/req.cpp b/src/req.cpp index cf7dd455..3d2a6bc8 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -139,9 +139,8 @@ bool zmq::req_t::xhas_out () zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - xreq_session_t (io_thread_, connect_, socket_, options_, protocol_, - address_), + const address_t *addr_) : + xreq_session_t (io_thread_, connect_, socket_, options_, addr_), state (identity) { } diff --git a/src/req.hpp b/src/req.hpp index e743cb8e..0edcb046 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -67,7 +67,7 @@ namespace zmq req_session_t (zmq::io_thread_t *io_thread_, bool connect_, zmq::socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); + const address_t *addr_); ~req_session_t (); // Overloads of the functions from session_base_t. diff --git a/src/session_base.cpp b/src/session_base.cpp index f2ee7139..6dc5166f 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -30,6 +30,7 @@ #include "ipc_connecter.hpp" #include "pgm_sender.hpp" #include "pgm_receiver.hpp" +#include "address.hpp" #include "req.hpp" #include "xreq.hpp" @@ -45,52 +46,52 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, bool connect_, class socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) + const address_t *addr_) { session_base_t *s = NULL; switch (options_.type) { case ZMQ_REQ: s = new (std::nothrow) req_session_t (io_thread_, connect_, - socket_, options_, protocol_, address_); + socket_, options_, addr_); break; case ZMQ_XREQ: s = new (std::nothrow) xreq_session_t (io_thread_, connect_, - socket_, options_, protocol_, address_); + socket_, options_, addr_); case ZMQ_REP: s = new (std::nothrow) rep_session_t (io_thread_, connect_, - socket_, options_, protocol_, address_); + socket_, options_, addr_); break; case ZMQ_XREP: s = new (std::nothrow) xrep_session_t (io_thread_, connect_, - socket_, options_, protocol_, address_); + socket_, options_, addr_); break; case ZMQ_PUB: s = new (std::nothrow) pub_session_t (io_thread_, connect_, - socket_, options_, protocol_, address_); + socket_, options_, addr_); break; case ZMQ_XPUB: s = new (std::nothrow) xpub_session_t (io_thread_, connect_, - socket_, options_, protocol_, address_); + socket_, options_, addr_); break; case ZMQ_SUB: s = new (std::nothrow) sub_session_t (io_thread_, connect_, - socket_, options_, protocol_, address_); + socket_, options_, addr_); break; case ZMQ_XSUB: s = new (std::nothrow) xsub_session_t (io_thread_, connect_, - socket_, options_, protocol_, address_); + socket_, options_, addr_); break; case ZMQ_PUSH: s = new (std::nothrow) push_session_t (io_thread_, connect_, - socket_, options_, protocol_, address_); + socket_, options_, addr_); break; case ZMQ_PULL: s = new (std::nothrow) pull_session_t (io_thread_, connect_, - socket_, options_, protocol_, address_); + socket_, options_, addr_); break; case ZMQ_PAIR: s = new (std::nothrow) pair_session_t (io_thread_, connect_, - socket_, options_, protocol_, address_); + socket_, options_, addr_); break; default: errno = EINVAL; @@ -102,7 +103,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, bool connect_, class socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : + const address_t *addr_) : own_t (io_thread_, options_), io_object_t (io_thread_), connect (connect_), @@ -114,12 +115,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, io_thread (io_thread_), has_linger_timer (false), send_identity (options_.send_identity), - recv_identity (options_.recv_identity) + recv_identity (options_.recv_identity), + addr (addr_) { - if (protocol_) - protocol = protocol_; - if (address_) - address = address_; } zmq::session_base_t::~session_base_t () @@ -135,6 +133,9 @@ zmq::session_base_t::~session_base_t () // Close the engine. if (engine) engine->terminate (); + + if (addr) + delete addr; } void zmq::session_base_t::attach_pipe (pipe_t *pipe_) @@ -393,18 +394,18 @@ void zmq::session_base_t::start_connecting (bool wait_) // Create the connecter object. - if (protocol == "tcp") { + if (addr->protocol == "tcp") { tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t ( - io_thread, this, options, address.c_str (), wait_); + io_thread, this, options, addr, wait_); alloc_assert (connecter); launch_child (connecter); return; } #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS - if (protocol == "ipc") { + if (addr->protocol == "ipc") { ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t ( - io_thread, this, options, address.c_str (), wait_); + io_thread, this, options, addr, wait_); alloc_assert (connecter); launch_child (connecter); return; @@ -414,10 +415,10 @@ void zmq::session_base_t::start_connecting (bool wait_) #if defined ZMQ_HAVE_OPENPGM // Both PGM and EPGM transports are using the same infrastructure. - if (protocol == "pgm" || protocol == "epgm") { + if (addr->protocol == "pgm" || addr->protocol == "epgm") { // For EPGM transport with UDP encapsulation of PGM is used. - bool udp_encapsulation = (protocol == "epgm"); + bool udp_encapsulation = (addr->protocol == "epgm"); // At this point we'll create message pipes to the session straight // away. There's no point in delaying it as no concept of 'connect' @@ -429,7 +430,7 @@ void zmq::session_base_t::start_connecting (bool wait_) io_thread, options); alloc_assert (pgm_sender); - int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); + int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ()); zmq_assert (rc == 0); send_attach (this, pgm_sender); @@ -441,7 +442,7 @@ void zmq::session_base_t::start_connecting (bool wait_) io_thread, options); alloc_assert (pgm_receiver); - int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); + int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ()); zmq_assert (rc == 0); send_attach (this, pgm_receiver); diff --git a/src/session_base.hpp b/src/session_base.hpp index 6be110bc..bc0ecd62 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -36,6 +36,7 @@ namespace zmq class io_thread_t; class socket_base_t; struct i_engine; + struct address_t; class session_base_t : public own_t, @@ -47,8 +48,7 @@ namespace zmq // Create a session of the particular type. static session_base_t *create (zmq::io_thread_t *io_thread_, bool connect_, zmq::socket_base_t *socket_, - const options_t &options_, const char *protocol_, - const char *address_); + const options_t &options_, const address_t *addr_); // To be used once only, when creating the session. void attach_pipe (zmq::pipe_t *pipe_); @@ -69,8 +69,8 @@ namespace zmq session_base_t (zmq::io_thread_t *io_thread_, bool connect_, zmq::socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); - ~session_base_t (); + const address_t *addr_); + virtual ~session_base_t (); private: @@ -129,8 +129,7 @@ namespace zmq bool recv_identity; // Protocol and address to use when connecting. - std::string protocol; - std::string address; + const address_t *addr; session_base_t (const session_base_t&); const session_base_t &operator = (const session_base_t&); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index d9bfd484..14b24c8b 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -49,6 +49,9 @@ #include "platform.hpp" #include "likely.hpp" #include "msg.hpp" +#include "address.hpp" +#include "ipc_address.hpp" +#include "tcp_address.hpp" #include "pair.hpp" #include "pub.hpp" @@ -447,9 +450,33 @@ int zmq::socket_base_t::connect (const char *addr_) return -1; } + address_t *paddr = new (std::nothrow) address_t (protocol, address); + zmq_assert (paddr); + + // Resolve address (if needed by the protocol) + if (protocol == "tcp") { + paddr->resolved.tcp_addr = new (std::nothrow) tcp_address_t (); + zmq_assert (paddr->resolved.tcp_addr); + int rc = paddr->resolved.tcp_addr->resolve ( + address.c_str (), false, options.ipv4only ? true : false); + if (rc != 0) { + delete paddr; + return -1; + } + } + else if(protocol == "ipc") { + paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t (); + zmq_assert (paddr->resolved.ipc_addr); + int rc = paddr->resolved.ipc_addr->resolve (address.c_str ()); + if (rc != 0) { + delete paddr; + return -1; + } + } + // Create session. session_base_t *session = session_base_t::create (io_thread, true, this, - options, protocol.c_str (), address.c_str ()); + options, paddr); errno_assert (session); // Create a bi-directional pipe. diff --git a/src/sub.cpp b/src/sub.cpp index 3249aeab..12422dbf 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -82,9 +82,8 @@ bool zmq::sub_t::xhas_out () zmq::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - xsub_session_t (io_thread_, connect_, socket_, options_, protocol_, - address_) + const address_t *addr_) : + xsub_session_t (io_thread_, connect_, socket_, options_, addr_) { } diff --git a/src/sub.hpp b/src/sub.hpp index 1c69b06f..778786d6 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -57,7 +57,7 @@ namespace zmq sub_session_t (zmq::io_thread_t *io_thread_, bool connect_, zmq::socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); + const address_t *addr_); ~sub_session_t (); private: diff --git a/src/tcp_address.cpp b/src/tcp_address.cpp index 9fe6083c..b1e3c174 100644 --- a/src/tcp_address.cpp +++ b/src/tcp_address.cpp @@ -419,12 +419,12 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv4only_) return 0; } -sockaddr *zmq::tcp_address_t::addr () +const sockaddr *zmq::tcp_address_t::addr () const { return &address.generic; } -socklen_t zmq::tcp_address_t::addrlen () +socklen_t zmq::tcp_address_t::addrlen () const { if (address.generic.sa_family == AF_INET6) return (socklen_t) sizeof (address.ipv6); @@ -433,9 +433,9 @@ socklen_t zmq::tcp_address_t::addrlen () } #if defined ZMQ_HAVE_WINDOWS -unsigned short zmq::tcp_address_t::family () +unsigned short zmq::tcp_address_t::family () const #else -sa_family_t zmq::tcp_address_t::family () +sa_family_t zmq::tcp_address_t::family () const #endif { return address.generic.sa_family; diff --git a/src/tcp_address.hpp b/src/tcp_address.hpp index d4768c7e..2d369751 100644 --- a/src/tcp_address.hpp +++ b/src/tcp_address.hpp @@ -48,12 +48,12 @@ namespace zmq int resolve (const char* name_, bool local_, bool ipv4only_); #if defined ZMQ_HAVE_WINDOWS - unsigned short family (); + unsigned short family () const; #else - sa_family_t family (); + sa_family_t family () const; #endif - sockaddr *addr (); - socklen_t addrlen (); + const sockaddr *addr () const; + socklen_t addrlen () const; private: diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 1d05b1e7..6ad2f808 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -29,6 +29,8 @@ #include "random.hpp" #include "err.hpp" #include "ip.hpp" +#include "address.hpp" +#include "tcp_address.hpp" #if defined ZMQ_HAVE_WINDOWS #include "windows.hpp" @@ -48,19 +50,18 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, class session_base_t *session_, const options_t &options_, - const char *address_, bool wait_) : + const address_t *addr_, bool wait_) : own_t (io_thread_, options_), io_object_t (io_thread_), + addr (addr_), s (retired_fd), handle_valid (false), wait (wait_), session (session_), current_reconnect_ivl(options.reconnect_ivl) { - // TODO: set_addess should be called separately, so that the error - // can be propagated. - int rc = set_address (address_); - errno_assert (rc == 0); + zmq_assert (addr); + zmq_assert (addr->protocol == "tcp"); } zmq::tcp_connecter_t::~tcp_connecter_t () @@ -176,17 +177,12 @@ int zmq::tcp_connecter_t::get_new_reconnect_ivl () return this_interval; } -int zmq::tcp_connecter_t::set_address (const char *addr_) -{ - return address.resolve (addr_, false, options.ipv4only ? true : false); -} - int zmq::tcp_connecter_t::open () { zmq_assert (s == retired_fd); // Create the socket. - s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP); + s = open_socket (addr->resolved.tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP); #ifdef ZMQ_HAVE_WINDOWS if (s == INVALID_SOCKET) { wsa_error_to_errno (); @@ -199,14 +195,16 @@ int zmq::tcp_connecter_t::open () // On some systems, IPv4 mapping in IPv6 sockets is disabled by default. // Switch it on in such cases. - if (address.family () == AF_INET6) + if (addr->resolved.tcp_addr->family () == AF_INET6) enable_ipv4_mapping (s); // Set the socket to non-blocking mode so that we get async connect(). unblock_socket (s); // Connect to the remote peer. - int rc = ::connect (s, address.addr (), address.addrlen ()); + int rc = ::connect ( + s, addr->resolved.tcp_addr->addr (), + addr->resolved.tcp_addr->addrlen ()); // Connect was successfull immediately. if (rc == 0) diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 19b6ec59..0891710c 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -26,13 +26,13 @@ #include "own.hpp" #include "stdint.hpp" #include "io_object.hpp" -#include "tcp_address.hpp" namespace zmq { class io_thread_t; class session_base_t; + struct address_t; class tcp_connecter_t : public own_t, public io_object_t { @@ -42,7 +42,7 @@ namespace zmq // connection process. tcp_connecter_t (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_, const options_t &options_, - const char *address_, bool delay_); + const address_t *addr_, bool delay_); ~tcp_connecter_t (); private: @@ -69,9 +69,6 @@ namespace zmq // Returns the currently used interval int get_new_reconnect_ivl (); - // Set address to connect to. - int set_address (const char *addr_); - // Open TCP connecting socket. Returns -1 in case of error, // 0 if connect was successfull immediately. Returns -1 with // EAGAIN errno if async connect was launched. @@ -84,8 +81,8 @@ namespace zmq // retired_fd if the connection was unsuccessfull. fd_t connect (); - // Address to connect to. - tcp_address_t address; + // Address to connect to. Owned by session_base_t. + const address_t *addr; // Underlying socket. fd_t s; diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 8c65ff7f..50054402 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -100,7 +100,7 @@ void zmq::tcp_listener_t::in_event () // Create and launch a session object. session_base_t *session = session_base_t::create (io_thread, false, socket, - options, NULL, NULL); + options, NULL); errno_assert (session); session->inc_seqnum (); launch_child (session); diff --git a/src/xpub.cpp b/src/xpub.cpp index 717e26c4..f0b28922 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -176,9 +176,8 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, zmq::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - session_base_t (io_thread_, connect_, socket_, options_, protocol_, - address_) + const address_t *addr_) : + session_base_t (io_thread_, connect_, socket_, options_, addr_) { } diff --git a/src/xpub.hpp b/src/xpub.hpp index 3e6721b7..9c8b40f0 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -91,7 +91,7 @@ namespace zmq xpub_session_t (zmq::io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); + const address_t *addr_); ~xpub_session_t (); private: diff --git a/src/xrep.cpp b/src/xrep.cpp index 4d9e5937..e5845061 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -309,9 +309,8 @@ bool zmq::xrep_t::xhas_out () zmq::xrep_session_t::xrep_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - session_base_t (io_thread_, connect_, socket_, options_, protocol_, - address_) + const address_t *addr_) : + session_base_t (io_thread_, connect_, socket_, options_, addr_) { } diff --git a/src/xrep.hpp b/src/xrep.hpp index 9fb19b7c..414ce7a4 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -110,7 +110,7 @@ namespace zmq xrep_session_t (zmq::io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); + const address_t *addr_); ~xrep_session_t (); private: diff --git a/src/xreq.cpp b/src/xreq.cpp index 13809d9f..5c30558b 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -117,9 +117,8 @@ void zmq::xreq_t::xterminated (pipe_t *pipe_) zmq::xreq_session_t::xreq_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - session_base_t (io_thread_, connect_, socket_, options_, protocol_, - address_) + const address_t *addr_) : + session_base_t (io_thread_, connect_, socket_, options_, addr_) { } diff --git a/src/xreq.hpp b/src/xreq.hpp index 4a213446..678e50c1 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -78,7 +78,7 @@ namespace zmq xreq_session_t (zmq::io_thread_t *io_thread_, bool connect_, zmq::socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); + const address_t *addr_); ~xreq_session_t (); private: diff --git a/src/xsub.cpp b/src/xsub.cpp index 80ca1f2d..245b346d 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -220,9 +220,8 @@ void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_, zmq::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - session_base_t (io_thread_, connect_, socket_, options_, protocol_, - address_) + const address_t *addr_) : + session_base_t (io_thread_, connect_, socket_, options_, addr_) { } diff --git a/src/xsub.hpp b/src/xsub.hpp index fd199367..003034d9 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -93,7 +93,7 @@ namespace zmq xsub_session_t (class io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); + const address_t *addr_); ~xsub_session_t (); private: diff --git a/tests/Makefile.am b/tests/Makefile.am index bcc3c5e1..439fbeb3 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -11,7 +11,8 @@ noinst_PROGRAMS = test_pair_inproc \ test_reqrep_device \ test_sub_forward \ test_invalid_rep \ - test_msg_flags + test_msg_flags \ + test_connect_resolve if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -30,6 +31,7 @@ test_reqrep_device_SOURCES = test_reqrep_device.cpp test_sub_forward_SOURCES = test_sub_forward.cpp test_invalid_rep_SOURCES = test_invalid_rep.cpp test_msg_flags_SOURCES = test_msg_flags.cpp +test_connect_resolve_SOURCES = test_connect_resolve.cpp if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp diff --git a/tests/test_connect_resolve.cpp b/tests/test_connect_resolve.cpp new file mode 100644 index 00000000..cec85bb5 --- /dev/null +++ b/tests/test_connect_resolve.cpp @@ -0,0 +1,54 @@ +/* + Copyright (c) 2012 Spotify AB + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + + +#include +#include +#include + +#include "../include/zmq.h" + +int main (int argc, char *argv []) +{ + fprintf (stderr, "test_connect_resolve running...\n"); + + void *ctx = zmq_init (1); + assert (ctx); + + // Create pair of socket, each with high watermark of 2. Thus the total + // buffer space should be 4 messages. + void *sock = zmq_socket (ctx, ZMQ_PUB); + assert (sock); + + int rc = zmq_connect (sock, "tcp://localhost:1234"); + assert (rc == 0); + + rc = zmq_connect (sock, "tcp://foobar123xyz:1234"); + assert (rc == -1); + assert (errno == EINVAL); + + rc = zmq_close (sock); + assert (rc == 0); + + rc = zmq_term (ctx); + assert (rc == 0); + + return 0; +}