diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 88d6e41c..691764e4 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -556,9 +556,9 @@ int zmq::socket_base_t::bind (const char *addr_) return rc; } - if (protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp") { + if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") { // For convenience's sake, bind can be used interchangeable with - // connect for PGM, EPGM, NORM and UDP transports. + // connect for PGM, EPGM, NORM transports. EXIT_MUTEX (); rc = connect (addr_); if (rc != -1) @@ -566,6 +566,64 @@ int zmq::socket_base_t::bind (const char *addr_) return rc; } + if (protocol == "udp") { + if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) { + errno = ENOCOMPATPROTO; + EXIT_MUTEX (); + return -1; + } + + // Choose the I/O thread to run the session in. + io_thread_t *io_thread = choose_io_thread (options.affinity); + if (!io_thread) { + errno = EMTHREAD; + EXIT_MUTEX (); + return -1; + } + + address_t *paddr = new (std::nothrow) address_t (protocol, address, this->get_ctx ()); + alloc_assert (paddr); + + paddr->resolved.udp_addr = new (std::nothrow) udp_address_t (); + alloc_assert (paddr->resolved.udp_addr); + rc = paddr->resolved.udp_addr->resolve (address.c_str(), true); + if (rc != 0) { + LIBZMQ_DELETE(paddr); + EXIT_MUTEX (); + return -1; + } + + session_base_t *session = session_base_t::create (io_thread, true, this, + options, paddr); + errno_assert (session); + + pipe_t *newpipe = NULL; + + // Create a bi-directional pipe. + object_t *parents [2] = {this, session}; + pipe_t *new_pipes [2] = {NULL, NULL}; + + int hwms [2] = {options.sndhwm, options.rcvhwm}; + bool conflates [2] = {false, false}; + rc = pipepair (parents, new_pipes, hwms, conflates); + errno_assert (rc == 0); + + // Attach local end of the pipe to the socket object. + attach_pipe (new_pipes [0], true); + newpipe = new_pipes [0]; + + // Attach remote end of the pipe to the session object later on. + session->attach_pipe (new_pipes [1]); + + // Save last endpoint URI + paddr->to_string (last_endpoint); + + add_endpoint (addr_, (own_t *) session, newpipe); + + EXIT_MUTEX (); + return 0; + } + // Remaining transports require to be run in an I/O thread, so at this // point we'll choose one. io_thread_t *io_thread = choose_io_thread (options.affinity); @@ -881,9 +939,15 @@ int zmq::socket_base_t::connect (const char *addr_) #endif if (protocol == "udp") { + if (options.type != ZMQ_RADIO) { + errno = ENOCOMPATPROTO; + EXIT_MUTEX (); + return -1; + } + paddr->resolved.udp_addr = new (std::nothrow) udp_address_t (); alloc_assert (paddr->resolved.udp_addr); - rc = paddr->resolved.udp_addr->resolve (address.c_str(), (options.type == ZMQ_DISH || options.type == ZMQ_DGRAM)); + rc = paddr->resolved.udp_addr->resolve (address.c_str(), false); if (rc != 0) { LIBZMQ_DELETE(paddr); EXIT_MUTEX (); @@ -1284,7 +1348,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) int zmq::socket_base_t::close () { ENTER_MUTEX (); - + // Remove all existing signalers for thread safe sockets if (thread_safe) ((mailbox_safe_t*)mailbox)->clear_signalers(); diff --git a/src/udp_address.cpp b/src/udp_address.cpp index ced27882..260e9427 100644 --- a/src/udp_address.cpp +++ b/src/udp_address.cpp @@ -55,7 +55,7 @@ zmq::udp_address_t::~udp_address_t () { } -int zmq::udp_address_t::resolve (const char *name_, bool receiver_) +int zmq::udp_address_t::resolve (const char *name_, bool bind_) { // Find the ':' at end that separates address from the port number. const char *delimiter = strrchr (name_, ':'); @@ -78,8 +78,8 @@ int zmq::udp_address_t::resolve (const char *name_, bool receiver_) dest_address.sin_family = AF_INET; dest_address.sin_port = htons (port); - // Only when the udp is receiver we allow * as the address - if (addr_str == "*" && receiver_) + // Only when the udp should bind we allow * as the address + if (addr_str == "*" && bind_) dest_address.sin_addr.s_addr = htons (INADDR_ANY); else dest_address.sin_addr.s_addr = inet_addr (addr_str.c_str ()); @@ -106,9 +106,9 @@ int zmq::udp_address_t::resolve (const char *name_, bool receiver_) return -1; } - // If a receiver and not a multicast, the dest address + // If a should bind and not a multicast, the dest address // is actually the bind address - if (receiver_ && !is_mutlicast) + if (bind_ && !is_mutlicast) bind_address = dest_address; else { bind_address.sin_family = AF_INET; diff --git a/tests/test_dgram.cpp b/tests/test_dgram.cpp index 98516358..8dd94add 100644 --- a/tests/test_dgram.cpp +++ b/tests/test_dgram.cpp @@ -60,7 +60,11 @@ int main (void) void *sender = zmq_socket (ctx, ZMQ_DGRAM); void *listener = zmq_socket (ctx, ZMQ_DGRAM); - int rc = zmq_bind (listener, "udp://*:5556"); + // Connecting dgram shoudl fail + int rc = zmq_connect (listener, "udp://127.0.0.1:5556"); + assert (rc == -1); + + rc = zmq_bind (listener, "udp://*:5556"); assert (rc == 0); rc = zmq_bind (sender, "udp://*:5557"); @@ -68,7 +72,7 @@ int main (void) str_send_to (sender, "Is someone there ?", "127.0.0.1:5556"); - str_recv_from (listener, &message_string, &address); + str_recv_from (listener, &message_string, &address); assert (strcmp(message_string, "Is someone there ?") == 0); assert (strcmp(address, "127.0.0.1:5557") == 0); free (message_string); diff --git a/tests/test_udp.cpp b/tests/test_udp.cpp index 1394e091..265972f5 100644 --- a/tests/test_udp.cpp +++ b/tests/test_udp.cpp @@ -95,9 +95,17 @@ int main (void) void *radio = zmq_socket (ctx, ZMQ_RADIO); void *dish = zmq_socket (ctx, ZMQ_DISH); - int rc = zmq_bind (dish, "udp://*:5556"); + // Connecting dish should fail + int rc = zmq_connect (dish, "udp://127.0.0.1:5556"); + assert (rc == -1); + + rc = zmq_bind (dish, "udp://*:5556"); assert (rc == 0); + // Bind radio should fail + rc = zmq_bind (radio, "udp://*:5556"); + assert (rc == -1); + rc = zmq_connect (radio, "udp://127.0.0.1:5556"); assert (rc == 0);