mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-13 10:52:56 +01:00
problem: udp doesn't enforce correct usage of bind/connect
solution: enforce that dish and gram can only bind and radio can only connect
This commit is contained in:
parent
0db70e247c
commit
c4d0146f2c
@ -556,9 +556,9 @@ int zmq::socket_base_t::bind (const char *addr_)
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp") {
|
||||
if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
|
||||
// For convenience's sake, bind can be used interchangeable with
|
||||
// connect for PGM, EPGM, NORM and UDP transports.
|
||||
// connect for PGM, EPGM, NORM transports.
|
||||
EXIT_MUTEX ();
|
||||
rc = connect (addr_);
|
||||
if (rc != -1)
|
||||
@ -566,6 +566,64 @@ int zmq::socket_base_t::bind (const char *addr_)
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (protocol == "udp") {
|
||||
if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
|
||||
errno = ENOCOMPATPROTO;
|
||||
EXIT_MUTEX ();
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Choose the I/O thread to run the session in.
|
||||
io_thread_t *io_thread = choose_io_thread (options.affinity);
|
||||
if (!io_thread) {
|
||||
errno = EMTHREAD;
|
||||
EXIT_MUTEX ();
|
||||
return -1;
|
||||
}
|
||||
|
||||
address_t *paddr = new (std::nothrow) address_t (protocol, address, this->get_ctx ());
|
||||
alloc_assert (paddr);
|
||||
|
||||
paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
|
||||
alloc_assert (paddr->resolved.udp_addr);
|
||||
rc = paddr->resolved.udp_addr->resolve (address.c_str(), true);
|
||||
if (rc != 0) {
|
||||
LIBZMQ_DELETE(paddr);
|
||||
EXIT_MUTEX ();
|
||||
return -1;
|
||||
}
|
||||
|
||||
session_base_t *session = session_base_t::create (io_thread, true, this,
|
||||
options, paddr);
|
||||
errno_assert (session);
|
||||
|
||||
pipe_t *newpipe = NULL;
|
||||
|
||||
// Create a bi-directional pipe.
|
||||
object_t *parents [2] = {this, session};
|
||||
pipe_t *new_pipes [2] = {NULL, NULL};
|
||||
|
||||
int hwms [2] = {options.sndhwm, options.rcvhwm};
|
||||
bool conflates [2] = {false, false};
|
||||
rc = pipepair (parents, new_pipes, hwms, conflates);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// Attach local end of the pipe to the socket object.
|
||||
attach_pipe (new_pipes [0], true);
|
||||
newpipe = new_pipes [0];
|
||||
|
||||
// Attach remote end of the pipe to the session object later on.
|
||||
session->attach_pipe (new_pipes [1]);
|
||||
|
||||
// Save last endpoint URI
|
||||
paddr->to_string (last_endpoint);
|
||||
|
||||
add_endpoint (addr_, (own_t *) session, newpipe);
|
||||
|
||||
EXIT_MUTEX ();
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Remaining transports require to be run in an I/O thread, so at this
|
||||
// point we'll choose one.
|
||||
io_thread_t *io_thread = choose_io_thread (options.affinity);
|
||||
@ -881,9 +939,15 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
#endif
|
||||
|
||||
if (protocol == "udp") {
|
||||
if (options.type != ZMQ_RADIO) {
|
||||
errno = ENOCOMPATPROTO;
|
||||
EXIT_MUTEX ();
|
||||
return -1;
|
||||
}
|
||||
|
||||
paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
|
||||
alloc_assert (paddr->resolved.udp_addr);
|
||||
rc = paddr->resolved.udp_addr->resolve (address.c_str(), (options.type == ZMQ_DISH || options.type == ZMQ_DGRAM));
|
||||
rc = paddr->resolved.udp_addr->resolve (address.c_str(), false);
|
||||
if (rc != 0) {
|
||||
LIBZMQ_DELETE(paddr);
|
||||
EXIT_MUTEX ();
|
||||
@ -1284,7 +1348,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
|
||||
int zmq::socket_base_t::close ()
|
||||
{
|
||||
ENTER_MUTEX ();
|
||||
|
||||
|
||||
// Remove all existing signalers for thread safe sockets
|
||||
if (thread_safe)
|
||||
((mailbox_safe_t*)mailbox)->clear_signalers();
|
||||
|
@ -55,7 +55,7 @@ zmq::udp_address_t::~udp_address_t ()
|
||||
{
|
||||
}
|
||||
|
||||
int zmq::udp_address_t::resolve (const char *name_, bool receiver_)
|
||||
int zmq::udp_address_t::resolve (const char *name_, bool bind_)
|
||||
{
|
||||
// Find the ':' at end that separates address from the port number.
|
||||
const char *delimiter = strrchr (name_, ':');
|
||||
@ -78,8 +78,8 @@ int zmq::udp_address_t::resolve (const char *name_, bool receiver_)
|
||||
dest_address.sin_family = AF_INET;
|
||||
dest_address.sin_port = htons (port);
|
||||
|
||||
// Only when the udp is receiver we allow * as the address
|
||||
if (addr_str == "*" && receiver_)
|
||||
// Only when the udp should bind we allow * as the address
|
||||
if (addr_str == "*" && bind_)
|
||||
dest_address.sin_addr.s_addr = htons (INADDR_ANY);
|
||||
else
|
||||
dest_address.sin_addr.s_addr = inet_addr (addr_str.c_str ());
|
||||
@ -106,9 +106,9 @@ int zmq::udp_address_t::resolve (const char *name_, bool receiver_)
|
||||
return -1;
|
||||
}
|
||||
|
||||
// If a receiver and not a multicast, the dest address
|
||||
// If a should bind and not a multicast, the dest address
|
||||
// is actually the bind address
|
||||
if (receiver_ && !is_mutlicast)
|
||||
if (bind_ && !is_mutlicast)
|
||||
bind_address = dest_address;
|
||||
else {
|
||||
bind_address.sin_family = AF_INET;
|
||||
|
@ -60,7 +60,11 @@ int main (void)
|
||||
void *sender = zmq_socket (ctx, ZMQ_DGRAM);
|
||||
void *listener = zmq_socket (ctx, ZMQ_DGRAM);
|
||||
|
||||
int rc = zmq_bind (listener, "udp://*:5556");
|
||||
// Connecting dgram shoudl fail
|
||||
int rc = zmq_connect (listener, "udp://127.0.0.1:5556");
|
||||
assert (rc == -1);
|
||||
|
||||
rc = zmq_bind (listener, "udp://*:5556");
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_bind (sender, "udp://*:5557");
|
||||
@ -68,7 +72,7 @@ int main (void)
|
||||
|
||||
str_send_to (sender, "Is someone there ?", "127.0.0.1:5556");
|
||||
|
||||
str_recv_from (listener, &message_string, &address);
|
||||
str_recv_from (listener, &message_string, &address);
|
||||
assert (strcmp(message_string, "Is someone there ?") == 0);
|
||||
assert (strcmp(address, "127.0.0.1:5557") == 0);
|
||||
free (message_string);
|
||||
|
@ -95,9 +95,17 @@ int main (void)
|
||||
void *radio = zmq_socket (ctx, ZMQ_RADIO);
|
||||
void *dish = zmq_socket (ctx, ZMQ_DISH);
|
||||
|
||||
int rc = zmq_bind (dish, "udp://*:5556");
|
||||
// Connecting dish should fail
|
||||
int rc = zmq_connect (dish, "udp://127.0.0.1:5556");
|
||||
assert (rc == -1);
|
||||
|
||||
rc = zmq_bind (dish, "udp://*:5556");
|
||||
assert (rc == 0);
|
||||
|
||||
// Bind radio should fail
|
||||
rc = zmq_bind (radio, "udp://*:5556");
|
||||
assert (rc == -1);
|
||||
|
||||
rc = zmq_connect (radio, "udp://127.0.0.1:5556");
|
||||
assert (rc == 0);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user