mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-15 15:16:52 +02:00
Adding ZMQ_LAST_ENDPOINT for wildcard support on TCP and IPC sockets
This commit is contained in:
@@ -193,6 +193,7 @@ ZMQ_EXPORT int zmq_term (void *context);
|
|||||||
#define ZMQ_RCVTIMEO 27
|
#define ZMQ_RCVTIMEO 27
|
||||||
#define ZMQ_SNDTIMEO 28
|
#define ZMQ_SNDTIMEO 28
|
||||||
#define ZMQ_IPV4ONLY 31
|
#define ZMQ_IPV4ONLY 31
|
||||||
|
#define ZMQ_LAST_ENDPOINT 32
|
||||||
|
|
||||||
/* Message options */
|
/* Message options */
|
||||||
#define ZMQ_MORE 1
|
#define ZMQ_MORE 1
|
||||||
@@ -201,6 +202,9 @@ ZMQ_EXPORT int zmq_term (void *context);
|
|||||||
#define ZMQ_DONTWAIT 1
|
#define ZMQ_DONTWAIT 1
|
||||||
#define ZMQ_SNDMORE 2
|
#define ZMQ_SNDMORE 2
|
||||||
|
|
||||||
|
/* Wildcard endpoint support. */
|
||||||
|
#define ZMQ_ENDPOINT_MAX 256
|
||||||
|
|
||||||
ZMQ_EXPORT void *zmq_socket (void *context, int type);
|
ZMQ_EXPORT void *zmq_socket (void *context, int type);
|
||||||
ZMQ_EXPORT int zmq_close (void *s);
|
ZMQ_EXPORT int zmq_close (void *s);
|
||||||
ZMQ_EXPORT int zmq_setsockopt (void *s, int option, const void *optval,
|
ZMQ_EXPORT int zmq_setsockopt (void *s, int option, const void *optval,
|
||||||
|
@@ -95,8 +95,25 @@ void zmq::ipc_listener_t::in_event ()
|
|||||||
send_attach (session, engine, false);
|
send_attach (session, engine, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int zmq::ipc_listener_t::get_address (unsigned char *addr, size_t *len)
|
||||||
|
{
|
||||||
|
if (bound_addr_len == 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy (addr, bound_addr, bound_addr_len + 1);
|
||||||
|
*len = bound_addr_len + 1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int zmq::ipc_listener_t::set_address (const char *addr_)
|
int zmq::ipc_listener_t::set_address (const char *addr_)
|
||||||
{
|
{
|
||||||
|
|
||||||
|
// Allow wildcard file
|
||||||
|
if(*addr_ == '*') {
|
||||||
|
addr_ = tempnam(NULL, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
// Get rid of the file associated with the UNIX domain socket that
|
// Get rid of the file associated with the UNIX domain socket that
|
||||||
// may have been left behind by the previous run of the application.
|
// may have been left behind by the previous run of the application.
|
||||||
::unlink (addr_);
|
::unlink (addr_);
|
||||||
@@ -125,6 +142,9 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
|
|||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
|
// Return the bound address
|
||||||
|
bound_addr_len = sprintf(bound_addr, "ipc://%s", addr_);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -49,6 +49,9 @@ namespace zmq
|
|||||||
// Set address to listen on.
|
// Set address to listen on.
|
||||||
int set_address (const char *addr_);
|
int set_address (const char *addr_);
|
||||||
|
|
||||||
|
// Get the bound address for use with wildcards
|
||||||
|
int get_address(unsigned char *addr, size_t *len);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
// Handlers for incoming commands.
|
// Handlers for incoming commands.
|
||||||
@@ -66,6 +69,10 @@ namespace zmq
|
|||||||
// if the connection was dropped while waiting in the listen backlog.
|
// if the connection was dropped while waiting in the listen backlog.
|
||||||
fd_t accept ();
|
fd_t accept ();
|
||||||
|
|
||||||
|
// Store the connected endpoint for binds to port 0
|
||||||
|
char bound_addr[256];
|
||||||
|
size_t bound_addr_len;
|
||||||
|
|
||||||
// True, if the undelying file for UNIX domain socket exists.
|
// True, if the undelying file for UNIX domain socket exists.
|
||||||
bool has_file;
|
bool has_file;
|
||||||
|
|
||||||
|
@@ -30,6 +30,7 @@ zmq::options_t::options_t () :
|
|||||||
rcvhwm (1000),
|
rcvhwm (1000),
|
||||||
affinity (0),
|
affinity (0),
|
||||||
identity_size (0),
|
identity_size (0),
|
||||||
|
last_endpoint_size(0),
|
||||||
rate (100),
|
rate (100),
|
||||||
recovery_ivl (10000),
|
recovery_ivl (10000),
|
||||||
multicast_hops (1),
|
multicast_hops (1),
|
||||||
@@ -213,7 +214,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
|
|||||||
ipv4only = val;
|
ipv4only = val;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
@@ -386,6 +386,14 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
|
|||||||
*optvallen_ = sizeof (int);
|
*optvallen_ = sizeof (int);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
case ZMQ_LAST_ENDPOINT:
|
||||||
|
if (*optvallen_ < last_endpoint_size) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
memcpy (optval_, last_endpoint, last_endpoint_size);
|
||||||
|
*optvallen_ = last_endpoint_size;
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
|
@@ -47,6 +47,10 @@ namespace zmq
|
|||||||
unsigned char identity_size;
|
unsigned char identity_size;
|
||||||
unsigned char identity [256];
|
unsigned char identity [256];
|
||||||
|
|
||||||
|
// Last socket endpoint URI
|
||||||
|
unsigned char last_endpoint [256];
|
||||||
|
size_t last_endpoint_size;
|
||||||
|
|
||||||
// Maximum tranfer rate [kb/s]. Default 100kb/s.
|
// Maximum tranfer rate [kb/s]. Default 100kb/s.
|
||||||
int rate;
|
int rate;
|
||||||
|
|
||||||
|
@@ -160,6 +160,7 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
|
|||||||
}
|
}
|
||||||
protocol_ = uri.substr (0, pos);
|
protocol_ = uri.substr (0, pos);
|
||||||
address_ = uri.substr (pos + 3);
|
address_ = uri.substr (pos + 3);
|
||||||
|
|
||||||
if (protocol_.empty () || address_.empty ()) {
|
if (protocol_.empty () || address_.empty ()) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
@@ -339,6 +340,8 @@ int zmq::socket_base_t::bind (const char *addr_)
|
|||||||
delete listener;
|
delete listener;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rc = listener->get_address (options.last_endpoint, &(options.last_endpoint_size));
|
||||||
launch_child (listener);
|
launch_child (listener);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@@ -353,6 +356,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
|||||||
delete listener;
|
delete listener;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
rc = listener->get_address (options.last_endpoint, &(options.last_endpoint_size));
|
||||||
launch_child (listener);
|
launch_child (listener);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@@ -387,11 +387,17 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv4only_)
|
|||||||
addr_str [addr_str.size () - 1] == ']')
|
addr_str [addr_str.size () - 1] == ']')
|
||||||
addr_str = addr_str.substr (1, addr_str.size () - 2);
|
addr_str = addr_str.substr (1, addr_str.size () - 2);
|
||||||
|
|
||||||
// Parse the port number (0 is not a valid port).
|
uint16_t port;
|
||||||
uint16_t port = (uint16_t) atoi (port_str.c_str());
|
if (port_str[0] == '*') {
|
||||||
if (port == 0) {
|
// Resolve wildcard to 0 to allow autoselection of port
|
||||||
errno = EINVAL;
|
port = 0;
|
||||||
return -1;
|
} else {
|
||||||
|
// Parse the port number (0 is not a valid port).
|
||||||
|
port = (uint16_t) atoi (port_str.c_str());
|
||||||
|
if (port == 0) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Resolve the IP address.
|
// Resolve the IP address.
|
||||||
|
@@ -52,6 +52,7 @@ zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
|
|||||||
socket_base_t *socket_, const options_t &options_) :
|
socket_base_t *socket_, const options_t &options_) :
|
||||||
own_t (io_thread_, options_),
|
own_t (io_thread_, options_),
|
||||||
io_object_t (io_thread_),
|
io_object_t (io_thread_),
|
||||||
|
bound_addr_len (0),
|
||||||
has_file (false),
|
has_file (false),
|
||||||
s (retired_fd),
|
s (retired_fd),
|
||||||
socket (socket_)
|
socket (socket_)
|
||||||
@@ -119,6 +120,17 @@ void zmq::tcp_listener_t::close ()
|
|||||||
s = retired_fd;
|
s = retired_fd;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int zmq::tcp_listener_t::get_address (unsigned char *addr, size_t *len)
|
||||||
|
{
|
||||||
|
if (bound_addr_len == 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy (addr, bound_addr, bound_addr_len + 1);
|
||||||
|
*len = bound_addr_len + 1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int zmq::tcp_listener_t::set_address (const char *addr_)
|
int zmq::tcp_listener_t::set_address (const char *addr_)
|
||||||
{
|
{
|
||||||
// Convert the textual address into address structure.
|
// Convert the textual address into address structure.
|
||||||
@@ -168,6 +180,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
|
|||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
// Bind the socket to the network interface and port.
|
// Bind the socket to the network interface and port.
|
||||||
rc = bind (s, address.addr (), address.addrlen ());
|
rc = bind (s, address.addr (), address.addrlen ());
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
@@ -180,6 +193,25 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
|
|||||||
return -1;
|
return -1;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
struct sockaddr sa;
|
||||||
|
socklen_t sl = sizeof(sockaddr);
|
||||||
|
rc = getsockname (s, &sa, &sl);
|
||||||
|
if (rc == 0) {
|
||||||
|
char host[INET6_ADDRSTRLEN];
|
||||||
|
int port;
|
||||||
|
|
||||||
|
if ( sa.sa_family == AF_INET ) {
|
||||||
|
inet_ntop(AF_INET, &(((struct sockaddr_in *)&sa)->sin_addr), host, INET6_ADDRSTRLEN);
|
||||||
|
port = ntohs( ((struct sockaddr_in *)&sa)->sin_port);
|
||||||
|
} else {
|
||||||
|
inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)&sa)->sin6_addr), host, INET6_ADDRSTRLEN);
|
||||||
|
port = ntohs( ((struct sockaddr_in6 *)&sa)->sin6_port);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store the address for retrieval by users using wildcards
|
||||||
|
bound_addr_len = sprintf(bound_addr, "tcp://%s:%d", host, port);
|
||||||
|
}
|
||||||
|
|
||||||
// Listen for incomming connections.
|
// Listen for incomming connections.
|
||||||
rc = listen (s, options.backlog);
|
rc = listen (s, options.backlog);
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
|
@@ -45,6 +45,9 @@ namespace zmq
|
|||||||
// Set address to listen on.
|
// Set address to listen on.
|
||||||
int set_address (const char *addr_);
|
int set_address (const char *addr_);
|
||||||
|
|
||||||
|
// Get the bound address for use with wildcards
|
||||||
|
int get_address(unsigned char *addr, size_t *len);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
// Handlers for incoming commands.
|
// Handlers for incoming commands.
|
||||||
@@ -65,6 +68,10 @@ namespace zmq
|
|||||||
// Address to listen on.
|
// Address to listen on.
|
||||||
tcp_address_t address;
|
tcp_address_t address;
|
||||||
|
|
||||||
|
// Store the connected endpoint for binds to port 0
|
||||||
|
char bound_addr[256];
|
||||||
|
size_t bound_addr_len;
|
||||||
|
|
||||||
// True, if the undelying file for UNIX domain socket exists.
|
// True, if the undelying file for UNIX domain socket exists.
|
||||||
bool has_file;
|
bool has_file;
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user