diff --git a/include/zmq.h b/include/zmq.h index 66c158e4..9a6a3f5f 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -227,6 +227,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_TCP_KEEPALIVE_IDLE 36 #define ZMQ_TCP_KEEPALIVE_INTVL 37 #define ZMQ_TCP_ACCEPT_FILTER 38 +#define ZMQ_LAST_ENDPOINT_ID 39 /* Message options */ @@ -244,6 +245,8 @@ ZMQ_EXPORT int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen); ZMQ_EXPORT int zmq_bind (void *s, const char *addr); ZMQ_EXPORT int zmq_connect (void *s, const char *addr); +ZMQ_EXPORT int zmq_unbind (void *s, void *ep); +ZMQ_EXPORT int zmq_disconnect (void *s, void *ep); ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags); diff --git a/src/address.cpp b/src/address.cpp index a65ffa3d..3c5707c8 100644 --- a/src/address.cpp +++ b/src/address.cpp @@ -23,7 +23,8 @@ #include "tcp_address.hpp" #include "ipc_address.hpp" -#include +#include +#include zmq::address_t::address_t ( const std::string &protocol_, const std::string &address_) @@ -50,3 +51,28 @@ zmq::address_t::~address_t () } #endif } + +int zmq::address_t::to_string (std::string &addr_) +{ + if (protocol == "tcp") { + if (resolved.tcp_addr) { + return resolved.tcp_addr->to_string(addr_); + } + } +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS + else if (protocol == "ipc") { + if (resolved.ipc_addr) { + return resolved.tcp_addr->to_string(addr_); + } + } +#endif + + if (!protocol.empty () && !address.empty ()) { + std::stringstream s; + s << protocol << "://" << address; + addr_ = s.str (); + return 0; + } + addr_.clear (); + return -1; +} diff --git a/src/address.hpp b/src/address.hpp index ec6d4a28..4e4bdd59 100644 --- a/src/address.hpp +++ b/src/address.hpp @@ -44,6 +44,8 @@ namespace zmq ipc_address_t *ipc_addr; #endif } resolved; + + int to_string (std::string &addr_); }; } diff --git a/src/ipc_address.cpp b/src/ipc_address.cpp index a5ef7ab1..169f2366 100644 --- a/src/ipc_address.cpp +++ b/src/ipc_address.cpp @@ -24,13 +24,24 @@ #include "err.hpp" -#include +#include +#include zmq::ipc_address_t::ipc_address_t () { memset (&address, 0, sizeof (address)); } +zmq::ipc_address_t::ipc_address_t (const sockaddr *sa, socklen_t sa_len) +{ + zmq_assert(sa && sa_len > 0); + + memset (&address, 0, sizeof (address)); + if (sa->sa_family == AF_UNIX) { + memcpy(&address, sa, sa_len); + } +} + zmq::ipc_address_t::~ipc_address_t () { } @@ -47,6 +58,19 @@ int zmq::ipc_address_t::resolve (const char *path_) return 0; } +int zmq::ipc_address_t::to_string (std::string &addr_) +{ + if (address.sun_family != AF_UNIX) { + addr_.clear (); + return -1; + } + + std::stringstream s; + s << "ipc://" << address.sun_path; + addr_ = s.str (); + return 0; +} + const sockaddr *zmq::ipc_address_t::addr () const { return (sockaddr*) &address; diff --git a/src/ipc_address.hpp b/src/ipc_address.hpp index 7047b04b..081fc542 100644 --- a/src/ipc_address.hpp +++ b/src/ipc_address.hpp @@ -21,6 +21,8 @@ #ifndef __ZMQ_IPC_ADDRESS_HPP_INCLUDED__ #define __ZMQ_IPC_ADDRESS_HPP_INCLUDED__ +#include + #include "platform.hpp" #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS @@ -36,11 +38,15 @@ namespace zmq public: ipc_address_t (); + ipc_address_t (const sockaddr *sa, socklen_t sa_len); ~ipc_address_t (); // This function sets up the address for UNIX domain transport. int resolve (const char* path_); + // The opposite to resolve() + int to_string (std::string &addr_); + const sockaddr *addr () const; socklen_t addrlen () const; diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index 8600abcc..d2e0dfc1 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -98,22 +98,15 @@ void zmq::ipc_listener_t::in_event () int zmq::ipc_listener_t::get_address (std::string &addr_) { struct sockaddr_storage ss; - int rc; - - // Get the details of the IPC socket socklen_t sl = sizeof (ss); - rc = getsockname (s, (sockaddr *) &ss, &sl); + int rc = getsockname (s, (sockaddr *) &ss, &sl); if (rc != 0) { + addr_.clear (); return rc; } - // Store the address for retrieval by users using wildcards - addr_ = std::string ("ipc://"); - struct sockaddr_un saddr; - memcpy (&saddr, &ss, sizeof (saddr)); - - addr_.append (saddr.sun_path); - return 0; + ipc_address_t addr ((struct sockaddr *) &ss, sl); + return addr.to_string (addr_); } int zmq::ipc_listener_t::set_address (const char *addr_) diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp index 7f3047b3..fb04606b 100644 --- a/src/ipc_listener.hpp +++ b/src/ipc_listener.hpp @@ -48,7 +48,7 @@ namespace zmq // Set address to listen on. int set_address (const char *addr_); - + // Get the bound address for use with wildcards int get_address (std::string &addr_); diff --git a/src/options.cpp b/src/options.cpp index fdd5299d..9cb659ad 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -30,6 +30,7 @@ zmq::options_t::options_t () : rcvhwm (1000), affinity (0), identity_size (0), + last_endpoint_id(NULL), rate (100), recovery_ivl (10000), multicast_hops (1), @@ -529,6 +530,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) memcpy (optval_, last_endpoint.c_str(), last_endpoint.size()+1); *optvallen_ = last_endpoint.size()+1; return 0; + + case ZMQ_LAST_ENDPOINT_ID: + if (*optvallen_ < sizeof (void *)) { + errno = EINVAL; + return -1; + } + *((void **) optval_) = last_endpoint_id; + *optvallen_ = sizeof (void *); + return 0; } errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index 80ca13dc..f1442b78 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -53,6 +53,8 @@ namespace zmq // Last socket endpoint URI std::string last_endpoint; + // Last socket endpoint ID + void *last_endpoint_id; // Maximum tranfer rate [kb/s]. Default 100kb/s. int rate; diff --git a/src/own.cpp b/src/own.cpp index 719c116d..d7c78b31 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -80,6 +80,11 @@ void zmq::own_t::launch_child (own_t *object_) send_own (this, object_); } +void zmq::own_t::term_child (own_t *object_) +{ + process_term_req (object_); +} + void zmq::own_t::process_term_req (own_t *object_) { // When shutting down we can ignore termination requests from owned diff --git a/src/own.hpp b/src/own.hpp index 205631a2..9b884d41 100644 --- a/src/own.hpp +++ b/src/own.hpp @@ -70,6 +70,9 @@ namespace zmq // Launch the supplied object and become its owner. void launch_child (own_t *object_); + // Terminate owned object + void term_child (own_t *object_); + // Ask owner object to terminate this object. It may take a while // while actual termination is started. This function should not be // called more than once. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 78438ebd..f1f75956 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -318,11 +318,16 @@ int zmq::socket_base_t::bind (const char *addr_) if (protocol == "inproc") { endpoint_t endpoint = {this, options}; - return register_endpoint (addr_, endpoint); + int rc = register_endpoint (addr_, endpoint); + if (rc == 0) { + // Save last endpoint info + options.last_endpoint.clear (); + options.last_endpoint_id = NULL; + } + return rc; } if (protocol == "pgm" || protocol == "epgm") { - // For convenience's sake, bind can be used interchageable with // connect for PGM and EPGM transports. return connect (addr_); @@ -346,7 +351,10 @@ int zmq::socket_base_t::bind (const char *addr_) return -1; } - rc = listener->get_address (options.last_endpoint); + // Save last endpoint info + options.last_endpoint_id = (void *) ((own_t *) listener); + listener->get_address (options.last_endpoint); + launch_child (listener); return 0; } @@ -362,7 +370,10 @@ int zmq::socket_base_t::bind (const char *addr_) return -1; } - rc = listener->get_address (options.last_endpoint); + // Save last endpoint info + options.last_endpoint_id = (void *) ((own_t *) listener); + listener->get_address (options.last_endpoint); + launch_child (listener); return 0; } @@ -454,6 +465,10 @@ int zmq::socket_base_t::connect (const char *addr_) // increased here. send_bind (peer.socket, pipes [1], false); + // Save last endpoint info + options.last_endpoint.clear (); + options.last_endpoint_id = NULL; + return 0; } @@ -514,6 +529,10 @@ int zmq::socket_base_t::connect (const char *addr_) // Attach remote end of the pipe to the session object later on. session->attach_pipe (pipes [1]); + // Save last endpoint info + paddr->to_string (options.last_endpoint); + options.last_endpoint_id = (void *) ((own_t *) session); + // Activate the session. Make it a child of this socket. launch_child (session); @@ -586,6 +605,16 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) return 0; } +int zmq::socket_base_t::term_endpoint (void *ep_) +{ + if (unlikely (ctx_terminated)) { + errno = ETERM; + return -1; + } + term_child ((own_t *) ep_); + return 0; +} + int zmq::socket_base_t::recv (msg_t *msg_, int flags_) { // Check whether the library haven't been shut down yet. diff --git a/src/socket_base.hpp b/src/socket_base.hpp index eed43a6c..785c4026 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -71,6 +71,7 @@ namespace zmq int getsockopt (int option_, void *optval_, size_t *optvallen_); int bind (const char *addr_); int connect (const char *addr_); + int term_endpoint (void *ep_); int send (zmq::msg_t *msg_, int flags_); int recv (zmq::msg_t *msg_, int flags_); int close (); diff --git a/src/tcp_address.cpp b/src/tcp_address.cpp index a9ca5742..9a2f37db 100644 --- a/src/tcp_address.cpp +++ b/src/tcp_address.cpp @@ -19,8 +19,8 @@ along with this program. If not, see . */ -#include #include +#include #include "tcp_address.hpp" #include "platform.hpp" @@ -251,9 +251,8 @@ int zmq::tcp_address_t::resolve_interface (const char *interface_, int rc = resolve_nic_name (interface_, ipv4only_); if (rc != 0 && errno != ENODEV) return rc; - if (rc == 0) { + if (rc == 0) return 0; - } // There's no such interface name. Assume literal address. #if defined ZMQ_HAVE_OPENVMS && defined __ia64 @@ -367,6 +366,19 @@ zmq::tcp_address_t::tcp_address_t () memset (&address, 0, sizeof (address)); } +zmq::tcp_address_t::tcp_address_t (const sockaddr *sa, socklen_t sa_len) +{ + zmq_assert(sa && sa_len > 0); + + memset (&address, 0, sizeof (address)); + if (sa->sa_family == AF_INET && sa_len >= sizeof (address.ipv4)) { + memcpy(&address.ipv4, sa, sizeof (address.ipv4)); + } + else if (sa->sa_family == AF_INET6 && sa_len >= sizeof (address.ipv6)) { + memcpy(&address.ipv6, sa, sizeof (address.ipv6)); + } +} + zmq::tcp_address_t::~tcp_address_t () { } @@ -421,6 +433,34 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv4only_) return 0; } +int zmq::tcp_address_t::to_string (std::string &addr_) +{ + if (address.generic.sa_family != AF_INET && address.generic.sa_family != AF_INET6) { + addr_.clear (); + return -1; + } + + // not using service resolv because of https://github.com/zeromq/libzmq/commit/1824574f9b5a8ce786853320e3ea09fe1f822bc4 + char hbuf[NI_MAXHOST]; + int rc = getnameinfo (addr (), addrlen (), hbuf, sizeof (hbuf), NULL, 0, NI_NUMERICHOST); + if (rc != 0) { + addr_.clear (); + return rc; + } + + if (address.generic.sa_family == AF_INET6) { + std::stringstream s; + s << "tcp://[" << hbuf << "]:" << ntohs (address.ipv6.sin6_port); + addr_ = s.str (); + } + else { + std::stringstream s; + s << "tcp://" << hbuf << ":" << ntohs (address.ipv4.sin_port); + addr_ = s.str (); + }; + return 0; +} + const sockaddr *zmq::tcp_address_t::addr () const { return &address.generic; @@ -504,6 +544,37 @@ int zmq::tcp_address_mask_t::resolve (const char *name_, bool ipv4only_) return 0; } +int zmq::tcp_address_mask_t::to_string (std::string &addr_) +{ + if (address.generic.sa_family != AF_INET && address.generic.sa_family != AF_INET6) { + addr_.clear (); + return -1; + } + if (address_mask == -1) { + addr_.clear (); + return -1; + } + + char hbuf[NI_MAXHOST]; + int rc = getnameinfo (addr (), addrlen (), hbuf, sizeof (hbuf), NULL, 0, NI_NUMERICHOST); + if (rc != 0) { + addr_.clear (); + return rc; + } + + if (address.generic.sa_family == AF_INET6) { + std::stringstream s; + s << "[" << hbuf << "]/" << address_mask; + addr_ = s.str (); + } + else { + std::stringstream s; + s << hbuf << "/" << address_mask; + addr_ = s.str (); + }; + return 0; +} + const bool zmq::tcp_address_mask_t::match_address (const struct sockaddr *ss, const socklen_t ss_len) const { zmq_assert (address_mask != -1 && ss != NULL && ss_len >= sizeof(struct sockaddr)); diff --git a/src/tcp_address.hpp b/src/tcp_address.hpp index 604735ed..43a5cab0 100644 --- a/src/tcp_address.hpp +++ b/src/tcp_address.hpp @@ -39,6 +39,7 @@ namespace zmq public: tcp_address_t (); + tcp_address_t (const sockaddr *sa, socklen_t sa_len); ~tcp_address_t (); // This function translates textual TCP address into an address @@ -47,6 +48,9 @@ namespace zmq // If 'ipv4only' is true, the name will never resolve to IPv6 address. int resolve (const char* name_, bool local_, bool ipv4only_); + // The opposite to resolve() + virtual int to_string (std::string &addr_); + #if defined ZMQ_HAVE_WINDOWS unsigned short family () const; #else @@ -79,6 +83,9 @@ namespace zmq // Works only with remote hostnames. int resolve (const char* name_, bool ipv4only_); + // The opposite to resolve() + int to_string (std::string &addr_); + const int mask () const; const bool match_address (const struct sockaddr *ss, const socklen_t ss_len) const; diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index ee639ebe..aef2f46d 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -21,8 +21,7 @@ #include -#include -#include +#include #include "platform.hpp" #include "tcp_listener.hpp" @@ -121,37 +120,19 @@ void zmq::tcp_listener_t::close () } int zmq::tcp_listener_t::get_address (std::string &addr_) -{ - struct sockaddr_storage ss; - char host [NI_MAXHOST]; - int rc; - std::stringstream address; - +{ // Get the details of the TCP socket + struct sockaddr_storage ss; socklen_t sl = sizeof (ss); - rc = getsockname (s, (struct sockaddr *) &ss, &sl); + int rc = getsockname (s, (struct sockaddr *) &ss, &sl); + if (rc != 0) { + addr_.clear (); return rc; } - rc = getnameinfo ((struct sockaddr *) &ss, sl, host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST); - if (rc != 0) { - return rc; - } - - if (ss.ss_family == AF_INET) { - struct sockaddr_in sa = {0}; - memcpy (&sa, &ss, sizeof (sa)); - - address << "tcp://" << host << ":" << ntohs (sa.sin_port); - } else { - struct sockaddr_in6 sa = {0}; - memcpy (&sa, &ss, sizeof (sa)); - - address << "tcp://[" << host << "]:" << ntohs (sa.sin6_port); - } - addr_ = address.str (); - return 0; + tcp_address_t addr ((struct sockaddr *) &ss, sl); + return addr.to_string (addr_); } int zmq::tcp_listener_t::set_address (const char *addr_) @@ -203,7 +184,6 @@ int zmq::tcp_listener_t::set_address (const char *addr_) errno_assert (rc == 0); #endif - // Bind the socket to the network interface and port. rc = bind (s, address.addr (), address.addrlen ()); #ifdef ZMQ_HAVE_WINDOWS diff --git a/src/zmq.cpp b/src/zmq.cpp index 54ec9ae4..36c48cfa 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -293,6 +293,26 @@ int zmq_connect (void *s_, const char *addr_) return result; } +int zmq_unbind (void *s_, void *ep_) +{ + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; + return -1; + } + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + return s->term_endpoint (ep_); +} + +int zmq_disconnect (void *s_, void *ep_) +{ + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; + return -1; + } + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + return s->term_endpoint (ep_); +} + // Sending functions. static int