mirror of
				https://github.com/zeromq/libzmq.git
				synced 2025-10-22 08:02:09 +02:00 
			
		
		
		
	zmq: add support for TIPC transport
A ZeroMQ application can opt for TIPC based sockets
using the TIPC port name format:
zmq_bind(sb, "tipc://{type,lower,upper}");
zmq_connect(sc, "tipc://{type,inst}");
'type' is the service ID, and 'lower/upper' can be
used for service partitioning or basic load
balancing.
ZeroMQ TIPC transport requires a kernel >= 3.8
(nonblocking connect support for TIPC).
Signed-off-by: Erik Hugne <erik.hugne@ericsson.com>
			
			
This commit is contained in:
		| @@ -165,7 +165,14 @@ libzmq_la_SOURCES = \ | ||||
|     raw_encoder.hpp \ | ||||
|     raw_encoder.cpp \ | ||||
|     ypipe_conflate.hpp \ | ||||
|     dbuffer.hpp | ||||
|     dbuffer.hpp \ | ||||
|     tipc_address.cpp \ | ||||
|     tipc_address.hpp \ | ||||
|     tipc_listener.cpp \ | ||||
|     tipc_listener.hpp \ | ||||
|     tipc_connecter.cpp \ | ||||
|     tipc_connecter.hpp | ||||
|  | ||||
|  | ||||
| if ON_MINGW | ||||
| libzmq_la_LDFLAGS = -no-undefined -avoid-version -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAGS@ | ||||
|   | ||||
| @@ -17,10 +17,12 @@ | ||||
|     along with this program.  If not, see <http://www.gnu.org/licenses/>. | ||||
| */ | ||||
|  | ||||
| #include "platform.hpp" | ||||
| #include "address.hpp" | ||||
| #include "err.hpp" | ||||
| #include "tcp_address.hpp" | ||||
| #include "ipc_address.hpp" | ||||
| #include "tipc_address.hpp" | ||||
|  | ||||
| #include <string> | ||||
| #include <sstream> | ||||
| @@ -50,6 +52,14 @@ zmq::address_t::~address_t () | ||||
|         } | ||||
|     } | ||||
| #endif | ||||
| #if defined ZMQ_HAVE_LINUX | ||||
|     else if (protocol == "tipc") { | ||||
|         if (resolved.tipc_addr) { | ||||
|             delete resolved.tipc_addr; | ||||
|             resolved.tipc_addr = 0; | ||||
|         } | ||||
|     } | ||||
| #endif | ||||
| } | ||||
|  | ||||
| int zmq::address_t::to_string (std::string &addr_) const | ||||
| @@ -59,12 +69,19 @@ int zmq::address_t::to_string (std::string &addr_) const | ||||
|             return resolved.tcp_addr->to_string(addr_); | ||||
|     } | ||||
| #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS | ||||
|     else | ||||
|     else  | ||||
|     if (protocol == "ipc") { | ||||
|         if (resolved.ipc_addr) | ||||
|             return resolved.ipc_addr->to_string(addr_); | ||||
|     } | ||||
| #endif | ||||
| #if defined ZMQ_HAVE_LINUX | ||||
|     else if (protocol == "tipc") { | ||||
|         if (resolved.tipc_addr) { | ||||
|             return resolved.tipc_addr->to_string(addr_); | ||||
|         } | ||||
|     } | ||||
| #endif | ||||
|  | ||||
|     if (!protocol.empty () && !address.empty ()) { | ||||
|         std::stringstream s; | ||||
|   | ||||
| @@ -27,6 +27,9 @@ namespace zmq | ||||
|     class tcp_address_t; | ||||
| #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS | ||||
|     class ipc_address_t; | ||||
| #endif | ||||
| #if defined ZMQ_HAVE_LINUX | ||||
|     class tipc_address_t; | ||||
| #endif | ||||
|     struct address_t { | ||||
|         address_t (const std::string &protocol_, const std::string &address_); | ||||
| @@ -41,6 +44,9 @@ namespace zmq | ||||
|             tcp_address_t *tcp_addr; | ||||
| #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS | ||||
|             ipc_address_t *ipc_addr; | ||||
| #endif | ||||
| #if defined ZMQ_HAVE_LINUX | ||||
|             tipc_address_t *tipc_addr; | ||||
| #endif | ||||
|         } resolved; | ||||
|  | ||||
|   | ||||
| @@ -24,6 +24,7 @@ | ||||
| #include "likely.hpp" | ||||
| #include "tcp_connecter.hpp" | ||||
| #include "ipc_connecter.hpp" | ||||
| #include "tipc_connecter.hpp" | ||||
| #include "pgm_sender.hpp" | ||||
| #include "pgm_receiver.hpp" | ||||
| #include "address.hpp" | ||||
| @@ -500,6 +501,16 @@ void zmq::session_base_t::start_connecting (bool wait_) | ||||
|         return; | ||||
|     } | ||||
| #endif | ||||
| #if defined ZMQ_HAVE_LINUX | ||||
|     if (addr->protocol == "tipc") { | ||||
|         tipc_connecter_t *connecter = new (std::nothrow) tipc_connecter_t ( | ||||
|             io_thread, this, options, addr, wait_); | ||||
|         alloc_assert (connecter); | ||||
|         launch_child(connecter); | ||||
|         return; | ||||
|     } | ||||
| #endif | ||||
|  | ||||
|  | ||||
| #ifdef ZMQ_HAVE_OPENPGM | ||||
|  | ||||
|   | ||||
| @@ -39,6 +39,7 @@ | ||||
| #include "socket_base.hpp" | ||||
| #include "tcp_listener.hpp" | ||||
| #include "ipc_listener.hpp" | ||||
| #include "tipc_listener.hpp" | ||||
| #include "tcp_connecter.hpp" | ||||
| #include "io_thread.hpp" | ||||
| #include "session_base.hpp" | ||||
| @@ -52,6 +53,7 @@ | ||||
| #include "address.hpp" | ||||
| #include "ipc_address.hpp" | ||||
| #include "tcp_address.hpp" | ||||
| #include "tipc_address.hpp" | ||||
| #ifdef ZMQ_HAVE_OPENPGM | ||||
| #include "pgm_socket.hpp" | ||||
| #endif | ||||
| @@ -184,7 +186,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) | ||||
| { | ||||
|     //  First check out whether the protcol is something we are aware of. | ||||
|     if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && | ||||
|           protocol_ != "pgm" && protocol_ != "epgm") { | ||||
|           protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "tipc") { | ||||
|         errno = EPROTONOSUPPORT; | ||||
|         return -1; | ||||
|     } | ||||
| @@ -207,6 +209,14 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) | ||||
|     } | ||||
| #endif | ||||
|  | ||||
|     // TIPC transport is only available on Linux. | ||||
| #if !defined ZMQ_HAVE_LINUX | ||||
|     if (protocol_ = "tipc") { | ||||
|         errno = EPROTONOSUPPORT; | ||||
|         return -1; | ||||
|     } | ||||
| #endif | ||||
|  | ||||
|     //  Check whether socket type and transport protocol match. | ||||
|     //  Specifically, multicast protocols can't be combined with | ||||
|     //  bi-directional messaging patterns (socket types). | ||||
| @@ -399,6 +409,25 @@ int zmq::socket_base_t::bind (const char *addr_) | ||||
|         return 0; | ||||
|     } | ||||
| #endif | ||||
| #if defined ZMQ_HAVE_LINUX | ||||
|     if (protocol == "tipc") { | ||||
|          tipc_listener_t *listener = new (std::nothrow) tipc_listener_t ( | ||||
|               io_thread, this, options); | ||||
|          alloc_assert (listener); | ||||
|          int rc = listener->set_address (address.c_str ()); | ||||
|          if (rc != 0) { | ||||
|              delete listener; | ||||
|              event_bind_failed (address, zmq_errno()); | ||||
|              return -1; | ||||
|          } | ||||
|  | ||||
|         // Save last endpoint URI | ||||
|         listener->get_address (last_endpoint); | ||||
|  | ||||
|         add_endpoint (addr_, (own_t *) listener, NULL); | ||||
|         return 0; | ||||
|     } | ||||
| #endif | ||||
|  | ||||
|     zmq_assert (false); | ||||
|     return -1; | ||||
| @@ -560,6 +589,19 @@ int zmq::socket_base_t::connect (const char *addr_) | ||||
|             return -1; | ||||
|     } | ||||
| #endif | ||||
| #if defined ZMQ_HAVE_LINUX | ||||
|     else | ||||
|     if (protocol == "tipc") { | ||||
|         paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t (); | ||||
|         alloc_assert (paddr->resolved.tipc_addr); | ||||
|         int rc = paddr->resolved.tipc_addr->resolve (address.c_str()); | ||||
|         if (rc != 0) { | ||||
|             delete paddr; | ||||
|             return -1; | ||||
|         } | ||||
|     } | ||||
| #endif | ||||
|  | ||||
|     //  Create session. | ||||
|     session_base_t *session = session_base_t::create (io_thread, true, this, | ||||
|         options, paddr); | ||||
|   | ||||
							
								
								
									
										105
									
								
								src/tipc_address.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										105
									
								
								src/tipc_address.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,105 @@ | ||||
| /* | ||||
|     Copyright (c) 2013 Ericsson AB | ||||
|  | ||||
|     This file is part of 0MQ. | ||||
|  | ||||
|     0MQ is free software; you can redistribute it and/or modify it under | ||||
|     the terms of the GNU Lesser General Public License as published by | ||||
|     the Free Software Foundation; either version 3 of the License, or | ||||
|     (at your option) any later version. | ||||
|  | ||||
|     0MQ is distributed in the hope that it will be useful, | ||||
|     but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
|     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
|     GNU Lesser General Public License for more details. | ||||
|  | ||||
|     You should have received a copy of the GNU Lesser General Public License | ||||
|     along with this program.  If not, see <http://www.gnu.org/licenses/>. | ||||
| */ | ||||
|  | ||||
| #include "tipc_address.hpp" | ||||
|  | ||||
| #if defined ZMQ_HAVE_LINUX | ||||
|  | ||||
| #include "err.hpp" | ||||
|  | ||||
| #include <string> | ||||
| #include <sstream> | ||||
|  | ||||
| zmq::tipc_address_t::tipc_address_t () | ||||
| { | ||||
|     memset (&address, 0, sizeof (address)); | ||||
| } | ||||
|  | ||||
| zmq::tipc_address_t::tipc_address_t (const sockaddr *sa, socklen_t sa_len) | ||||
| { | ||||
|     zmq_assert(sa && sa_len > 0); | ||||
|  | ||||
|     memset (&address, 0, sizeof (address)); | ||||
|     if (sa->sa_family == AF_TIPC) { | ||||
|         memcpy(&address, sa, sa_len); | ||||
|     } | ||||
| } | ||||
|  | ||||
| zmq::tipc_address_t::~tipc_address_t () | ||||
| { | ||||
| } | ||||
|  | ||||
| int zmq::tipc_address_t::resolve (const char *name) | ||||
| { | ||||
|     int res; | ||||
|     unsigned int type = 0; | ||||
|     unsigned int lower = 0; | ||||
|     unsigned int upper = 0; | ||||
|  | ||||
|     res = sscanf(name, "{%u,%u,%u}", &type, &lower, &upper); | ||||
|     if (res == 3) | ||||
|         goto nameseq; | ||||
|     else if (res == 2 && type > TIPC_RESERVED_TYPES) { | ||||
|         address.family = AF_TIPC; | ||||
|         address.addrtype = TIPC_ADDR_NAME; | ||||
|         address.addr.name.name.type = type; | ||||
|         address.addr.name.name.instance = lower; | ||||
|         address.scope = 0; | ||||
|         return 0; | ||||
|     } | ||||
|     else | ||||
|         return EINVAL; | ||||
| nameseq: | ||||
|     if (type < TIPC_RESERVED_TYPES || upper < lower) | ||||
|                return EINVAL; | ||||
|     address.family = AF_TIPC; | ||||
|     address.addrtype = TIPC_ADDR_NAMESEQ; | ||||
|     address.addr.nameseq.type = type; | ||||
|     address.addr.nameseq.lower = lower; | ||||
|     address.addr.nameseq.upper = upper; | ||||
|     address.scope = TIPC_ZONE_SCOPE; | ||||
|     return 0; | ||||
| } | ||||
|  | ||||
| int zmq::tipc_address_t::to_string (std::string &addr_) | ||||
| { | ||||
|     if (address.family != AF_TIPC) { | ||||
|         addr_.clear (); | ||||
|         return -1; | ||||
|     } | ||||
|     std::stringstream s; | ||||
|     s << "tipc://" << "{" << address.addr.nameseq.type; | ||||
|     s << ", " << address.addr.nameseq.lower; | ||||
|     s << ", " << address.addr.nameseq.upper << "}"; | ||||
|     addr_ = s.str (); | ||||
|     return 0; | ||||
| } | ||||
|  | ||||
| const sockaddr *zmq::tipc_address_t::addr () const | ||||
| { | ||||
|     return (sockaddr*) &address; | ||||
| } | ||||
|  | ||||
| socklen_t zmq::tipc_address_t::addrlen () const | ||||
| { | ||||
|     return (socklen_t) sizeof (address); | ||||
| } | ||||
|  | ||||
| #endif | ||||
|  | ||||
							
								
								
									
										65
									
								
								src/tipc_address.hpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										65
									
								
								src/tipc_address.hpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,65 @@ | ||||
| /* | ||||
|     Copyright (c) 2013 Ericsson AB | ||||
|  | ||||
|     This file is part of 0MQ. | ||||
|  | ||||
|     0MQ is free software; you can redistribute it and/or modify it under | ||||
|     the terms of the GNU Lesser General Public License as published by | ||||
|     the Free Software Foundation; either version 3 of the License, or | ||||
|     (at your option) any later version. | ||||
|  | ||||
|     0MQ is distributed in the hope that it will be useful, | ||||
|     but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
|     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
|     GNU Lesser General Public License for more details. | ||||
|  | ||||
|     You should have received a copy of the GNU Lesser General Public License | ||||
|     along with this program.  If not, see <http://www.gnu.org/licenses/>. | ||||
| */ | ||||
|  | ||||
| #ifndef __ZMQ_TIPC_ADDRESS_HPP_INCLUDED__ | ||||
| #define __ZMQ_TIPC_ADDRESS_HPP_INCLUDED__ | ||||
|  | ||||
| #include <string> | ||||
|  | ||||
| #include "platform.hpp" | ||||
|  | ||||
| #if defined ZMQ_HAVE_LINUX | ||||
|  | ||||
| #include <sys/socket.h> | ||||
| #include <linux/tipc.h> | ||||
|  | ||||
| namespace zmq | ||||
| { | ||||
|  | ||||
|     class tipc_address_t | ||||
|     { | ||||
|     public: | ||||
|  | ||||
|         tipc_address_t (); | ||||
|         tipc_address_t (const sockaddr *sa, socklen_t sa_len); | ||||
|         ~tipc_address_t (); | ||||
|  | ||||
|         //  This function sets up the address "{type, lower, upper}" for TIPC transport | ||||
|         int resolve (const char* name); | ||||
|  | ||||
|         //  The opposite to resolve() | ||||
|         int to_string (std::string &addr_); | ||||
|  | ||||
|         const sockaddr *addr () const; | ||||
|         socklen_t addrlen () const; | ||||
|  | ||||
|     private: | ||||
|  | ||||
|         struct sockaddr_tipc address; | ||||
|  | ||||
|         tipc_address_t (const tipc_address_t&); | ||||
|         const tipc_address_t &operator = (const tipc_address_t&); | ||||
|     }; | ||||
|  | ||||
| } | ||||
|  | ||||
| #endif | ||||
|  | ||||
| #endif | ||||
|  | ||||
							
								
								
									
										256
									
								
								src/tipc_connecter.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										256
									
								
								src/tipc_connecter.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,256 @@ | ||||
| /* | ||||
|     Copyright (c) 2013 Ericsson AB | ||||
|  | ||||
|     This file is part of 0MQ. | ||||
|  | ||||
|     0MQ is free software; you can redistribute it and/or modify it under | ||||
|     the terms of the GNU Lesser General Public License as published by | ||||
|     the Free Software Foundation; either version 3 of the License, or | ||||
|     (at your option) any later version. | ||||
|  | ||||
|     0MQ is distributed in the hope that it will be useful, | ||||
|     but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
|     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
|     GNU Lesser General Public License for more details. | ||||
|  | ||||
|     You should have received a copy of the GNU Lesser General Public License | ||||
|     along with this program.  If not, see <http://www.gnu.org/licenses/>. | ||||
| */ | ||||
|  | ||||
| #include "tipc_connecter.hpp" | ||||
|  | ||||
| #if defined ZMQ_HAVE_LINUX | ||||
|  | ||||
| #include <new> | ||||
| #include <string> | ||||
|  | ||||
| #include "stream_engine.hpp" | ||||
| #include "io_thread.hpp" | ||||
| #include "platform.hpp" | ||||
| #include "random.hpp" | ||||
| #include "err.hpp" | ||||
| #include "ip.hpp" | ||||
| #include "address.hpp" | ||||
| #include "tipc_address.hpp" | ||||
| #include "session_base.hpp" | ||||
|  | ||||
| #include <unistd.h> | ||||
| #include <sys/types.h> | ||||
| #include <sys/socket.h> | ||||
|  | ||||
| zmq::tipc_connecter_t::tipc_connecter_t (class io_thread_t *io_thread_, | ||||
|       class session_base_t *session_, const options_t &options_, | ||||
|       const address_t *addr_, bool delayed_start_) : | ||||
|     own_t (io_thread_, options_), | ||||
|     io_object_t (io_thread_), | ||||
|     addr (addr_), | ||||
|     s (retired_fd), | ||||
|     handle_valid (false), | ||||
|     delayed_start (delayed_start_), | ||||
|     timer_started (false), | ||||
|     session (session_), | ||||
|     current_reconnect_ivl(options.reconnect_ivl) | ||||
| { | ||||
|     zmq_assert (addr); | ||||
|     zmq_assert (addr->protocol == "tipc"); | ||||
|     addr->to_string (endpoint); | ||||
|     socket = session-> get_socket(); | ||||
| } | ||||
|  | ||||
| zmq::tipc_connecter_t::~tipc_connecter_t () | ||||
| { | ||||
|     zmq_assert (!timer_started); | ||||
|     zmq_assert (!handle_valid); | ||||
|     zmq_assert (s == retired_fd); | ||||
| } | ||||
|  | ||||
| void zmq::tipc_connecter_t::process_plug () | ||||
| { | ||||
|     if (delayed_start) | ||||
|         add_reconnect_timer (); | ||||
|     else | ||||
|         start_connecting (); | ||||
| } | ||||
|  | ||||
| void zmq::tipc_connecter_t::process_term (int linger_) | ||||
| { | ||||
|     if (timer_started) { | ||||
|         cancel_timer (reconnect_timer_id); | ||||
|         timer_started = false; | ||||
|    } | ||||
|  | ||||
|     if (handle_valid) { | ||||
|         rm_fd (handle); | ||||
|         handle_valid = false; | ||||
|     } | ||||
|  | ||||
|     if (s != retired_fd) | ||||
|         close (); | ||||
|  | ||||
|     own_t::process_term (linger_); | ||||
| } | ||||
|  | ||||
| void zmq::tipc_connecter_t::in_event () | ||||
| { | ||||
|     //  We are not polling for incomming data, so we are actually called | ||||
|     //  because of error here. However, we can get error on out event as well | ||||
|     //  on some platforms, so we'll simply handle both events in the same way. | ||||
|     out_event (); | ||||
| } | ||||
|  | ||||
| void zmq::tipc_connecter_t::out_event () | ||||
| { | ||||
|     fd_t fd = connect (); | ||||
|     rm_fd (handle); | ||||
|     handle_valid = false; | ||||
|  | ||||
|     //  Handle the error condition by attempt to reconnect. | ||||
|     if (fd == retired_fd) { | ||||
|         close (); | ||||
|         add_reconnect_timer(); | ||||
|         return; | ||||
|     } | ||||
|     //  Create the engine object for this connection. | ||||
|     stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); | ||||
|     alloc_assert (engine); | ||||
|  | ||||
|     //  Attach the engine to the corresponding session object. | ||||
|     send_attach (session, engine); | ||||
|  | ||||
|     //  Shut the connecter down. | ||||
|     terminate (); | ||||
|  | ||||
|     socket->event_connected (endpoint, fd); | ||||
| } | ||||
|  | ||||
| void zmq::tipc_connecter_t::timer_event (int id_) | ||||
| { | ||||
|     zmq_assert (id_ == reconnect_timer_id); | ||||
|     timer_started = false; | ||||
|     start_connecting (); | ||||
| } | ||||
|  | ||||
| void zmq::tipc_connecter_t::start_connecting () | ||||
| { | ||||
|     //  Open the connecting socket. | ||||
|     int rc = open (); | ||||
|  | ||||
|     //  Connect may succeed in synchronous manner. | ||||
|     if (rc == 0) { | ||||
|         handle = add_fd (s); | ||||
|         handle_valid = true; | ||||
|         out_event (); | ||||
|     } | ||||
|  | ||||
|     //  Connection establishment may be delayed. Poll for its completion. | ||||
|     else | ||||
|     if (rc == -1 && errno == EINPROGRESS) { | ||||
|         handle = add_fd (s); | ||||
|         handle_valid = true; | ||||
|         set_pollout (handle); | ||||
|         socket->event_connect_delayed (endpoint, zmq_errno()); | ||||
|     } | ||||
|  | ||||
|     //  Handle any other error condition by eventual reconnect. | ||||
|     else { | ||||
|         if (s != retired_fd) | ||||
|             close (); | ||||
|         add_reconnect_timer (); | ||||
|     } | ||||
| } | ||||
|  | ||||
| void zmq::tipc_connecter_t::add_reconnect_timer() | ||||
| { | ||||
|     int rc_ivl = get_new_reconnect_ivl(); | ||||
|     add_timer (rc_ivl, reconnect_timer_id); | ||||
|     socket->event_connect_retried (endpoint, rc_ivl); | ||||
|     timer_started = true; | ||||
| } | ||||
|  | ||||
| int zmq::tipc_connecter_t::get_new_reconnect_ivl () | ||||
| { | ||||
|     //  The new interval is the current interval + random value. | ||||
|     int this_interval = current_reconnect_ivl + | ||||
|         (generate_random () % options.reconnect_ivl); | ||||
|  | ||||
|     //  Only change the current reconnect interval  if the maximum reconnect | ||||
|     //  interval was set and if it's larger than the reconnect interval. | ||||
|     if (options.reconnect_ivl_max > 0 && | ||||
|         options.reconnect_ivl_max > options.reconnect_ivl) { | ||||
|  | ||||
|         //  Calculate the next interval | ||||
|         current_reconnect_ivl = current_reconnect_ivl * 2; | ||||
|         if(current_reconnect_ivl >= options.reconnect_ivl_max) { | ||||
|             current_reconnect_ivl = options.reconnect_ivl_max; | ||||
|         } | ||||
|     } | ||||
|    return this_interval; | ||||
| } | ||||
|  | ||||
| int zmq::tipc_connecter_t::open () | ||||
| { | ||||
|     zmq_assert (s == retired_fd); | ||||
|  | ||||
|     //  Create the socket. | ||||
|     s = open_socket (AF_TIPC, SOCK_STREAM, 0); | ||||
|     if (s == -1) | ||||
|         return -1; | ||||
|  | ||||
|     //  Set the non-blocking flag. | ||||
|     unblock_socket (s); | ||||
|     //  Connect to the remote peer. | ||||
|     int rc = ::connect ( | ||||
|       s, addr->resolved.tipc_addr->addr (), | ||||
|       addr->resolved.tipc_addr->addrlen ()); | ||||
|  | ||||
|     //  Connect was successfull immediately. | ||||
|     if (rc == 0) | ||||
|         return 0; | ||||
|  | ||||
|     //  Translate other error codes indicating asynchronous connect has been | ||||
|     //  launched to a uniform EINPROGRESS. | ||||
|     if (rc == -1 && errno == EINTR) { | ||||
|         errno = EINPROGRESS; | ||||
|         return -1; | ||||
|     } | ||||
|     //  Forward the error. | ||||
|     return -1; | ||||
| } | ||||
|  | ||||
| void zmq::tipc_connecter_t::close () | ||||
| { | ||||
|     zmq_assert (s != retired_fd); | ||||
|     int rc = ::close (s); | ||||
|     errno_assert (rc == 0); | ||||
|     socket->event_closed (endpoint, s); | ||||
|     s = retired_fd; | ||||
| } | ||||
|  | ||||
| zmq::fd_t zmq::tipc_connecter_t::connect () | ||||
| { | ||||
|     //  Following code should handle both Berkeley-derived socket | ||||
|     //  implementations and Solaris. | ||||
|     int err = 0; | ||||
|     socklen_t len = sizeof (err); | ||||
|  | ||||
|     int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len); | ||||
|     if (rc == -1) | ||||
|         err = errno; | ||||
|     if (err != 0) { | ||||
|  | ||||
|         //  Assert if the error was caused by 0MQ bug. | ||||
|         //  Networking problems are OK. No need to assert. | ||||
|         errno = err; | ||||
|         errno_assert (errno == ECONNREFUSED || errno == ECONNRESET || | ||||
|             errno == ETIMEDOUT || errno == EHOSTUNREACH || | ||||
|             errno == ENETUNREACH || errno == ENETDOWN); | ||||
|  | ||||
|         return retired_fd; | ||||
|     } | ||||
|     fd_t result = s; | ||||
|     s = retired_fd; | ||||
|     return result; | ||||
| } | ||||
|  | ||||
| #endif | ||||
|  | ||||
							
								
								
									
										127
									
								
								src/tipc_connecter.hpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										127
									
								
								src/tipc_connecter.hpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,127 @@ | ||||
| /* | ||||
|     Copyright (c) 2013 Ericsson AB | ||||
|  | ||||
|     This file is part of 0MQ. | ||||
|  | ||||
|     0MQ is free software; you can redistribute it and/or modify it under | ||||
|     the terms of the GNU Lesser General Public License as published by | ||||
|     the Free Software Foundation; either version 3 of the License, or | ||||
|     (at your option) any later version. | ||||
|  | ||||
|     0MQ is distributed in the hope that it will be useful, | ||||
|     but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
|     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
|     GNU Lesser General Public License for more details. | ||||
|  | ||||
|     You should have received a copy of the GNU Lesser General Public License | ||||
|     along with this program.  If not, see <http://www.gnu.org/licenses/>. | ||||
| */ | ||||
|  | ||||
| #ifndef __TIPC_CONNECTER_HPP_INCLUDED__ | ||||
| #define __TIPC_CONNECTER_HPP_INCLUDED__ | ||||
|  | ||||
| #include "platform.hpp" | ||||
|  | ||||
| #if defined ZMQ_HAVE_LINUX | ||||
|  | ||||
| #include "fd.hpp" | ||||
| #include "own.hpp" | ||||
| #include "stdint.hpp" | ||||
| #include "io_object.hpp" | ||||
|  | ||||
| namespace zmq | ||||
| { | ||||
|  | ||||
|     class io_thread_t; | ||||
|     class session_base_t; | ||||
|     struct address_t; | ||||
|  | ||||
|     class tipc_connecter_t : public own_t, public io_object_t | ||||
|     { | ||||
|     public: | ||||
|  | ||||
|         //  If 'delayed_start' is true connecter first waits for a while, | ||||
|         //  then starts connection process. | ||||
|         tipc_connecter_t (zmq::io_thread_t *io_thread_, | ||||
|             zmq::session_base_t *session_, const options_t &options_, | ||||
|             const address_t *addr_, bool delayed_start_); | ||||
|         ~tipc_connecter_t (); | ||||
|  | ||||
|     private: | ||||
|  | ||||
|         //  ID of the timer used to delay the reconnection. | ||||
|         enum {reconnect_timer_id = 1}; | ||||
|  | ||||
|         //  Handlers for incoming commands. | ||||
|         void process_plug (); | ||||
|         void process_term (int linger_); | ||||
|  | ||||
|         //  Handlers for I/O events. | ||||
|         void in_event (); | ||||
|         void out_event (); | ||||
|         void timer_event (int id_); | ||||
|  | ||||
|         //  Internal function to start the actual connection establishment. | ||||
|         void start_connecting (); | ||||
|  | ||||
|         //  Internal function to add a reconnect timer | ||||
|         void add_reconnect_timer(); | ||||
|  | ||||
|         //  Close the connecting socket. | ||||
|         void close (); | ||||
|  | ||||
|         //  Get the file descriptor of newly created connection. Returns | ||||
|         //  retired_fd if the connection was unsuccessfull. | ||||
|         fd_t connect (); | ||||
|  | ||||
|         //  Address to connect to. Owned by session_base_t. | ||||
|         const address_t *addr; | ||||
|  | ||||
|         //  Underlying socket. | ||||
|         fd_t s; | ||||
|  | ||||
|         //  Handle corresponding to the listening socket. | ||||
|         handle_t handle; | ||||
|  | ||||
|         //  If true file descriptor is registered with the poller and 'handle' | ||||
|         //  contains valid value. | ||||
|         bool handle_valid; | ||||
|  | ||||
|         //  If true, connecter is waiting a while before trying to connect. | ||||
|         const bool delayed_start; | ||||
|  | ||||
|         //  True iff a timer has been started. | ||||
|         bool timer_started; | ||||
|  | ||||
|         //  Reference to the session we belong to. | ||||
|         zmq::session_base_t *session; | ||||
|  | ||||
|         //  Current reconnect ivl, updated for backoff strategy | ||||
|         int current_reconnect_ivl; | ||||
|  | ||||
|         // String representation of endpoint to connect to | ||||
|         std::string endpoint; | ||||
|  | ||||
|         // Socket | ||||
|         zmq::socket_base_t *socket; | ||||
|  | ||||
|         //  Internal function to return a reconnect backoff delay. | ||||
|         //  Will modify the current_reconnect_ivl used for next call | ||||
|         //  Returns the currently used interval | ||||
|         int get_new_reconnect_ivl (); | ||||
|  | ||||
|         //  Open IPC connecting socket. Returns -1 in case of error, | ||||
|         //  0 if connect was successfull immediately. Returns -1 with | ||||
|         //  EAGAIN errno if async connect was launched. | ||||
|         int open (); | ||||
|  | ||||
|         tipc_connecter_t (const tipc_connecter_t&); | ||||
|         const tipc_connecter_t &operator = (const tipc_connecter_t&); | ||||
|     }; | ||||
|  | ||||
| } | ||||
|  | ||||
| #endif | ||||
|  | ||||
| #endif | ||||
|  | ||||
							
								
								
									
										178
									
								
								src/tipc_listener.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										178
									
								
								src/tipc_listener.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,178 @@ | ||||
|  /* | ||||
|     Copyright (c) 2013 Ericsson AB | ||||
|  | ||||
|     This file is part of 0MQ. | ||||
|  | ||||
|     0MQ is free software; you can redistribute it and/or modify it under | ||||
|     the terms of the GNU Lesser General Public License as published by | ||||
|     the Free Software Foundation; either version 3 of the License, or | ||||
|     (at your option) any later version. | ||||
|  | ||||
|     0MQ is distributed in the hope that it will be useful, | ||||
|     but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
|     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
|     GNU Lesser General Public License for more details. | ||||
|  | ||||
|     You should have received a copy of the GNU Lesser General Public License | ||||
|     along with this program.  If not, see <http://www.gnu.org/licenses/>. | ||||
| */ | ||||
|  | ||||
| #include "tipc_listener.hpp" | ||||
|  | ||||
| #if defined ZMQ_HAVE_LINUX | ||||
|  | ||||
| #include <new> | ||||
|  | ||||
| #include <string.h> | ||||
|  | ||||
| #include "stream_engine.hpp" | ||||
| #include "tipc_address.hpp" | ||||
| #include "io_thread.hpp" | ||||
| #include "session_base.hpp" | ||||
| #include "config.hpp" | ||||
| #include "err.hpp" | ||||
| #include "ip.hpp" | ||||
| #include "socket_base.hpp" | ||||
|  | ||||
| #include <unistd.h> | ||||
| #include <sys/socket.h> | ||||
| #include <fcntl.h> | ||||
| #include <linux/tipc.h> | ||||
|  | ||||
| zmq::tipc_listener_t::tipc_listener_t (io_thread_t *io_thread_, | ||||
|       socket_base_t *socket_, const options_t &options_) : | ||||
|     own_t (io_thread_, options_), | ||||
|     io_object_t (io_thread_), | ||||
|     s (retired_fd), | ||||
|     socket (socket_) | ||||
| { | ||||
| } | ||||
|  | ||||
| zmq::tipc_listener_t::~tipc_listener_t () | ||||
| { | ||||
|     zmq_assert (s == retired_fd); | ||||
| } | ||||
|  | ||||
| void zmq::tipc_listener_t::process_plug () | ||||
| { | ||||
|     //  Start polling for incoming connections. | ||||
|     handle = add_fd (s); | ||||
|     set_pollin (handle); | ||||
| } | ||||
|  | ||||
| void zmq::tipc_listener_t::process_term (int linger_) | ||||
| { | ||||
|     rm_fd (handle); | ||||
|     close (); | ||||
|     own_t::process_term (linger_); | ||||
| } | ||||
|  | ||||
| void zmq::tipc_listener_t::in_event () | ||||
| { | ||||
|     fd_t fd = accept (); | ||||
|  | ||||
|     //  If connection was reset by the peer in the meantime, just ignore it. | ||||
|     //  TODO: Handle specific errors like ENFILE/EMFILE etc. | ||||
|     if (fd == retired_fd) { | ||||
|         socket->event_accept_failed (endpoint, zmq_errno()); | ||||
|         return; | ||||
|     } | ||||
|  | ||||
|     //  Create the engine object for this connection. | ||||
|     stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); | ||||
|     alloc_assert (engine); | ||||
|  | ||||
|     //  Choose I/O thread to run connecter in. Given that we are already | ||||
|     //  running in an I/O thread, there must be at least one available. | ||||
|     io_thread_t *io_thread = choose_io_thread (options.affinity); | ||||
|     zmq_assert (io_thread); | ||||
|  | ||||
|     //  Create and launch a session object. | ||||
|     session_base_t *session = session_base_t::create (io_thread, false, socket, | ||||
|         options, NULL); | ||||
|     errno_assert (session); | ||||
|     session->inc_seqnum (); | ||||
|     launch_child (session); | ||||
|     send_attach (session, engine, false); | ||||
|     socket->event_accepted (endpoint, fd); | ||||
| } | ||||
|  | ||||
| int zmq::tipc_listener_t::get_address (std::string &addr_) | ||||
| { | ||||
|     struct sockaddr_storage ss; | ||||
|     socklen_t sl = sizeof (ss); | ||||
|  | ||||
|     int rc = getsockname (s, (sockaddr *) &ss, &sl); | ||||
|     if (rc != 0) { | ||||
|         addr_.clear (); | ||||
|         return rc; | ||||
|     } | ||||
|  | ||||
|     tipc_address_t addr ((struct sockaddr *) &ss, sl); | ||||
|     return addr.to_string (addr_); | ||||
| } | ||||
|  | ||||
| int zmq::tipc_listener_t::set_address (const char *addr_) | ||||
| { | ||||
|     //convert str to address struct | ||||
|     int rc = address.resolve(addr_); | ||||
|     if (rc != 0) | ||||
|         return -1; | ||||
|     //  Create a listening socket. | ||||
|     s = open_socket (AF_TIPC, SOCK_STREAM, 0); | ||||
|     if (s == -1) | ||||
|         return -1; | ||||
|  | ||||
|     address.to_string (endpoint); | ||||
|  | ||||
|     //  Bind the socket to tipc name. | ||||
|     rc = bind (s, address.addr (), address.addrlen ()); | ||||
|     if (rc != 0) | ||||
|         goto error; | ||||
|  | ||||
|     //  Listen for incomming connections. | ||||
|     rc = listen (s, options.backlog); | ||||
|     if (rc != 0) | ||||
|         goto error; | ||||
|  | ||||
|     socket->event_listening (endpoint, s); | ||||
|     return 0; | ||||
|  | ||||
| error: | ||||
|     int err = errno; | ||||
|     close (); | ||||
|     errno = err; | ||||
|     return -1; | ||||
| } | ||||
|  | ||||
| void zmq::tipc_listener_t::close () | ||||
| { | ||||
|     zmq_assert (s != retired_fd); | ||||
|     int rc = ::close (s); | ||||
|     errno_assert (rc == 0); | ||||
|     s = retired_fd; | ||||
|     socket->event_closed (endpoint, s); | ||||
| } | ||||
|  | ||||
| zmq::fd_t zmq::tipc_listener_t::accept () | ||||
| { | ||||
|     //  Accept one connection and deal with different failure modes. | ||||
|     //  The situation where connection cannot be accepted due to insufficient | ||||
|     //  resources is considered valid and treated by ignoring the connection. | ||||
|     struct sockaddr_storage ss = {}; | ||||
|     socklen_t ss_len = sizeof(ss); | ||||
|  | ||||
|     zmq_assert (s != retired_fd); | ||||
|     fd_t sock = ::accept (s, (struct sockaddr *) &ss, &ss_len); | ||||
|     if (sock == -1) { | ||||
|         errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS || | ||||
|             errno == EINTR || errno == ECONNABORTED || errno == EPROTO || errno == EMFILE || | ||||
|             errno == ENFILE); | ||||
|         return retired_fd; | ||||
|     } | ||||
|     /*FIXME Accept filters?*/ | ||||
|     return sock; | ||||
| } | ||||
|  | ||||
| #endif | ||||
|  | ||||
							
								
								
									
										97
									
								
								src/tipc_listener.hpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										97
									
								
								src/tipc_listener.hpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,97 @@ | ||||
| /* | ||||
|     Copyright (c) 2013 Ericsson AB | ||||
|  | ||||
|     This file is part of 0MQ. | ||||
|  | ||||
|     0MQ is free software; you can redistribute it and/or modify it under | ||||
|     the terms of the GNU Lesser General Public License as published by | ||||
|     the Free Software Foundation; either version 3 of the License, or | ||||
|     (at your option) any later version. | ||||
|  | ||||
|     0MQ is distributed in the hope that it will be useful, | ||||
|     but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
|     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
|     GNU Lesser General Public License for more details. | ||||
|  | ||||
|     You should have received a copy of the GNU Lesser General Public License | ||||
|     along with this program.  If not, see <http://www.gnu.org/licenses/>. | ||||
| */ | ||||
|  | ||||
| #ifndef __ZMQ_TIPC_LISTENER_HPP_INCLUDED__ | ||||
| #define __ZMQ_TIPC_LISTENER_HPP_INCLUDED__ | ||||
|  | ||||
| #include "platform.hpp" | ||||
|  | ||||
| #if defined ZMQ_HAVE_LINUX | ||||
|  | ||||
| #include <string> | ||||
|  | ||||
| #include "fd.hpp" | ||||
| #include "own.hpp" | ||||
| #include "stdint.hpp" | ||||
| #include "io_object.hpp" | ||||
| #include "tipc_address.hpp" | ||||
|  | ||||
| namespace zmq | ||||
| { | ||||
|  | ||||
|     class io_thread_t; | ||||
|     class socket_base_t; | ||||
|  | ||||
|     class tipc_listener_t : public own_t, public io_object_t | ||||
|     { | ||||
|     public: | ||||
|  | ||||
|         tipc_listener_t (zmq::io_thread_t *io_thread_, | ||||
|             zmq::socket_base_t *socket_, const options_t &options_); | ||||
|         ~tipc_listener_t (); | ||||
|  | ||||
|         //  Set address to listen on. | ||||
|         int set_address (const char *addr_); | ||||
|  | ||||
|         // Get the bound address for use with wildcards | ||||
|         int get_address (std::string &addr_); | ||||
|  | ||||
|     private: | ||||
|  | ||||
|         //  Handlers for incoming commands. | ||||
|         void process_plug (); | ||||
|         void process_term (int linger_); | ||||
|  | ||||
|         //  Handlers for I/O events. | ||||
|         void in_event (); | ||||
|  | ||||
|         //  Close the listening socket. | ||||
|         void close (); | ||||
|  | ||||
|         //  Accept the new connection. Returns the file descriptor of the | ||||
|         //  newly created connection. The function may return retired_fd | ||||
|         //  if the connection was dropped while waiting in the listen backlog. | ||||
|         fd_t accept (); | ||||
|  | ||||
|        // Address to listen on | ||||
|        tipc_address_t address; | ||||
|  | ||||
|         //  Underlying socket. | ||||
|         fd_t s; | ||||
|  | ||||
|  | ||||
|         //  Handle corresponding to the listening socket. | ||||
|         handle_t handle; | ||||
|  | ||||
|         //  Socket the listerner belongs to. | ||||
|         zmq::socket_base_t *socket; | ||||
|  | ||||
|        // String representation of endpoint to bind to | ||||
|         std::string endpoint; | ||||
|  | ||||
|         tipc_listener_t (const tipc_listener_t&); | ||||
|         const tipc_listener_t &operator = (const tipc_listener_t&); | ||||
|     }; | ||||
|  | ||||
| } | ||||
|  | ||||
| #endif | ||||
|  | ||||
| #endif | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 Erik Hugne
					Erik Hugne