diff --git a/src/session_base.cpp b/src/session_base.cpp index 35580c8c..48676921 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -550,51 +550,6 @@ void zmq::session_base_t::reconnect () _pipe->hiccup (); } -zmq::session_base_t::connecter_factory_entry_t - zmq::session_base_t::_connecter_factories[] = { - std::make_pair (protocol_name::tcp, - &zmq::session_base_t::create_connecter_tcp), -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ - && !defined ZMQ_HAVE_VXWORKS - std::make_pair (protocol_name::ipc, - &zmq::session_base_t::create_connecter_ipc), -#endif -#if defined ZMQ_HAVE_TIPC - std::make_pair (protocol_name::tipc, - &zmq::session_base_t::create_connecter_tipc), -#endif -#if defined ZMQ_HAVE_VMCI - std::make_pair (protocol_name::vmci, - &zmq::session_base_t::create_connecter_vmci), -#endif -}; - -zmq::session_base_t::connecter_factory_map_t - zmq::session_base_t::_connecter_factories_map ( - _connecter_factories, - _connecter_factories - + sizeof (_connecter_factories) / sizeof (_connecter_factories[0])); - -zmq::session_base_t::start_connecting_entry_t - zmq::session_base_t::_start_connecting_entries[] = { - std::make_pair (protocol_name::udp, - &zmq::session_base_t::start_connecting_udp), -#if defined ZMQ_HAVE_OPENPGM - std::make_pair ("pgm", &zmq::session_base_t::start_connecting_pgm), - std::make_pair ("epgm", &zmq::session_base_t::start_connecting_pgm), -#endif -#if defined ZMQ_HAVE_NORM - std::make_pair ("norm", &zmq::session_base_t::start_connecting_norm), -#endif -}; - -zmq::session_base_t::start_connecting_map_t - zmq::session_base_t::_start_connecting_map ( - _start_connecting_entries, - _start_connecting_entries - + sizeof (_start_connecting_entries) - / sizeof (_start_connecting_entries[0])); - void zmq::session_base_t::start_connecting (bool wait_) { zmq_assert (_active); @@ -605,160 +560,145 @@ void zmq::session_base_t::start_connecting (bool wait_) zmq_assert (io_thread); // Create the connecter object. - const connecter_factory_map_t::const_iterator connecter_factories_it = - _connecter_factories_map.find (_addr->protocol); - if (connecter_factories_it != _connecter_factories_map.end ()) { - own_t *connecter = - (this->*connecter_factories_it->second) (io_thread, wait_); - + own_t *connecter = NULL; + if (_addr->protocol == protocol_name::tcp) { + if (!options.socks_proxy_address.empty ()) { + address_t *proxy_address = new (std::nothrow) + address_t (protocol_name::tcp, options.socks_proxy_address, + this->get_ctx ()); + alloc_assert (proxy_address); + connecter = new (std::nothrow) socks_connecter_t ( + io_thread, this, options, _addr, proxy_address, wait_); + } else { + connecter = new (std::nothrow) + tcp_connecter_t (io_thread, this, options, _addr, wait_); + } + } +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ + && !defined ZMQ_HAVE_VXWORKS + else if (_addr->protocol == protocol_name::ipc) { + connecter = new (std::nothrow) + ipc_connecter_t (io_thread, this, options, _addr, wait_); + } +#endif +#if defined ZMQ_HAVE_TIPC + else if (_addr->protocol == protocol_name::tipc) { + connecter = new (std::nothrow) + tipc_connecter_t (io_thread, this, options, _addr, wait_); + } +#endif +#if defined ZMQ_HAVE_VMCI + else if (_addr->protocol == protocol_name::vmci) { + connecter = new (std::nothrow) + vmci_connecter_t (io_thread, this, options, _addr, wait_); + } +#endif + if (connecter != NULL) { alloc_assert (connecter); launch_child (connecter); return; } - const start_connecting_map_t::const_iterator start_connecting_it = - _start_connecting_map.find (_addr->protocol); - if (start_connecting_it != _start_connecting_map.end ()) { - (this->*start_connecting_it->second) (io_thread); + + if (_addr->protocol == protocol_name::udp) { + zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO + || options.type == ZMQ_DGRAM); + + udp_engine_t *engine = new (std::nothrow) udp_engine_t (options); + alloc_assert (engine); + + bool recv = false; + bool send = false; + + if (options.type == ZMQ_RADIO) { + send = true; + recv = false; + } else if (options.type == ZMQ_DISH) { + send = false; + recv = true; + } else if (options.type == ZMQ_DGRAM) { + send = true; + recv = true; + } + + int rc = engine->init (_addr, send, recv); + errno_assert (rc == 0); + + send_attach (this, engine); + return; } - zmq_assert (false); -} - -#if defined ZMQ_HAVE_VMCI -zmq::own_t *zmq::session_base_t::create_connecter_vmci (io_thread_t *io_thread_, - bool wait_) -{ - return new (std::nothrow) - vmci_connecter_t (io_thread_, this, options, _addr, wait_); -} -#endif - -#if defined ZMQ_HAVE_TIPC -zmq::own_t *zmq::session_base_t::create_connecter_tipc (io_thread_t *io_thread_, - bool wait_) -{ - return new (std::nothrow) - tipc_connecter_t (io_thread_, this, options, _addr, wait_); -} -#endif - -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ - && !defined ZMQ_HAVE_VXWORKS -zmq::own_t *zmq::session_base_t::create_connecter_ipc (io_thread_t *io_thread_, - bool wait_) -{ - return new (std::nothrow) - ipc_connecter_t (io_thread_, this, options, _addr, wait_); -} -#endif - -zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_, - bool wait_) -{ - if (!options.socks_proxy_address.empty ()) { - address_t *proxy_address = new (std::nothrow) address_t ( - protocol_name::tcp, options.socks_proxy_address, this->get_ctx ()); - alloc_assert (proxy_address); - return new (std::nothrow) socks_connecter_t ( - io_thread_, this, options, _addr, proxy_address, wait_); - } - return new (std::nothrow) - tcp_connecter_t (io_thread_, this, options, _addr, wait_); -} - #ifdef ZMQ_HAVE_OPENPGM -void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_) -{ - zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB - || options.type == ZMQ_SUB || options.type == ZMQ_XSUB); - // For EPGM transport with UDP encapsulation of PGM is used. - bool const udp_encapsulation = _addr->protocol == "epgm"; + // Both PGM and EPGM transports are using the same infrastructure. + if (_addr->protocol == "pgm" || _addr->protocol == "epgm") { + zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB + || options.type == ZMQ_SUB || options.type == ZMQ_XSUB); - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with PGM anyway. - if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { - // PGM sender. - pgm_sender_t *pgm_sender = - new (std::nothrow) pgm_sender_t (io_thread_, options); - alloc_assert (pgm_sender); + // For EPGM transport with UDP encapsulation of PGM is used. + bool const udp_encapsulation = _addr->protocol == "epgm"; - int rc = pgm_sender->init (udp_encapsulation, _addr->address.c_str ()); - errno_assert (rc == 0); + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with PGM anyway. + if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { + // PGM sender. + pgm_sender_t *pgm_sender = + new (std::nothrow) pgm_sender_t (io_thread, options); + alloc_assert (pgm_sender); - send_attach (this, pgm_sender); - } else { - // PGM receiver. - pgm_receiver_t *pgm_receiver = - new (std::nothrow) pgm_receiver_t (io_thread_, options); - alloc_assert (pgm_receiver); + int rc = + pgm_sender->init (udp_encapsulation, _addr->address.c_str ()); + errno_assert (rc == 0); - int rc = - pgm_receiver->init (udp_encapsulation, _addr->address.c_str ()); - errno_assert (rc == 0); + send_attach (this, pgm_sender); + } else { + // PGM receiver. + pgm_receiver_t *pgm_receiver = + new (std::nothrow) pgm_receiver_t (io_thread, options); + alloc_assert (pgm_receiver); - send_attach (this, pgm_receiver); + int rc = + pgm_receiver->init (udp_encapsulation, _addr->address.c_str ()); + errno_assert (rc == 0); + + send_attach (this, pgm_receiver); + } + + return; } -} #endif #ifdef ZMQ_HAVE_NORM -void zmq::session_base_t::start_connecting_norm (io_thread_t *io_thread_) -{ - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with NORM anyway. - if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { - // NORM sender. - norm_engine_t *norm_sender = - new (std::nothrow) norm_engine_t (io_thread_, options); - alloc_assert (norm_sender); + if (_addr->protocol == "norm") { + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with NORM anyway. + if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { + // NORM sender. + norm_engine_t *norm_sender = + new (std::nothrow) norm_engine_t (io_thread, options); + alloc_assert (norm_sender); - int rc = norm_sender->init (_addr->address.c_str (), true, false); - errno_assert (rc == 0); + int rc = norm_sender->init (_addr->address.c_str (), true, false); + errno_assert (rc == 0); - send_attach (this, norm_sender); - } else { // ZMQ_SUB or ZMQ_XSUB + send_attach (this, norm_sender); + } else { // ZMQ_SUB or ZMQ_XSUB - // NORM receiver. - norm_engine_t *norm_receiver = - new (std::nothrow) norm_engine_t (io_thread_, options); - alloc_assert (norm_receiver); + // NORM receiver. + norm_engine_t *norm_receiver = + new (std::nothrow) norm_engine_t (io_thread, options); + alloc_assert (norm_receiver); - int rc = norm_receiver->init (_addr->address.c_str (), false, true); - errno_assert (rc == 0); + int rc = norm_receiver->init (_addr->address.c_str (), false, true); + errno_assert (rc == 0); - send_attach (this, norm_receiver); + send_attach (this, norm_receiver); + } + return; } -} -#endif - -void zmq::session_base_t::start_connecting_udp (io_thread_t * /*io_thread_*/) -{ - zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO - || options.type == ZMQ_DGRAM); - - udp_engine_t *engine = new (std::nothrow) udp_engine_t (options); - alloc_assert (engine); - - bool recv = false; - bool send = false; - - if (options.type == ZMQ_RADIO) { - send = true; - recv = false; - } else if (options.type == ZMQ_DISH) { - send = false; - recv = true; - } else if (options.type == ZMQ_DGRAM) { - send = true; - recv = true; - } - - const int rc = engine->init (_addr, send, recv); - errno_assert (rc == 0); - - send_attach (this, engine); +#endif // ZMQ_HAVE_NORM + + zmq_assert (false); } diff --git a/src/session_base.hpp b/src/session_base.hpp index 20292a85..74e31810 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -105,33 +105,6 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events private: void start_connecting (bool wait_); - typedef own_t *(session_base_t::*connecter_factory_fun_t) ( - io_thread_t *io_thread, bool wait_); - typedef std::pair - connecter_factory_entry_t; - static connecter_factory_entry_t _connecter_factories[]; - typedef std::map - connecter_factory_map_t; - static connecter_factory_map_t _connecter_factories_map; - - own_t *create_connecter_vmci (io_thread_t *io_thread_, bool wait_); - own_t *create_connecter_tipc (io_thread_t *io_thread_, bool wait_); - own_t *create_connecter_ipc (io_thread_t *io_thread_, bool wait_); - own_t *create_connecter_tcp (io_thread_t *io_thread_, bool wait_); - - typedef void (session_base_t::*start_connecting_fun_t) ( - io_thread_t *io_thread); - typedef std::pair - start_connecting_entry_t; - static start_connecting_entry_t _start_connecting_entries[]; - typedef std::map - start_connecting_map_t; - static start_connecting_map_t _start_connecting_map; - - void start_connecting_pgm (io_thread_t *io_thread_); - void start_connecting_norm (io_thread_t *io_thread_); - void start_connecting_udp (io_thread_t *io_thread_); - void reconnect (); // Handlers for incoming commands.