From ded0e5a6d93198e480a1f44301a252e7c9054188 Mon Sep 17 00:00:00 2001 From: somdoron Date: Mon, 16 May 2016 12:04:08 +0300 Subject: [PATCH] problem: udp_engine didn't work with dgram socket type --- src/udp_engine.cpp | 122 ++++++++++++++++++++++++++++++++++----------- src/udp_engine.hpp | 10 +++- 2 files changed, 103 insertions(+), 29 deletions(-) diff --git a/src/udp_engine.cpp b/src/udp_engine.cpp index 8f44a02a..1d0b385c 100644 --- a/src/udp_engine.cpp +++ b/src/udp_engine.cpp @@ -101,8 +101,18 @@ void zmq::udp_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_) io_object_t::plug (io_thread_); handle = add_fd (fd); - if (send_enabled) + if (send_enabled) { + if (!options.raw_socket) { + out_address = address->resolved.udp_addr->dest_addr (); + out_addrlen = address->resolved.udp_addr->dest_addrlen (); + } + else { + out_address = (sockaddr *) &raw_address; + out_addrlen = sizeof (sockaddr_in); + } + set_pollout (handle); + } if (recv_enabled) { int on = 1; @@ -152,6 +162,54 @@ void zmq::udp_engine_t::terminate() delete this; } +void zmq::udp_engine_t::sockaddr_to_msg (zmq::msg_t *msg, sockaddr_in* addr) +{ + char* name = inet_ntoa(addr->sin_addr); + + char port[6]; + snprintf (port, 6, "%d", (int)ntohs (addr->sin_port)); + + int size = strlen (name) + strlen (port) + 1 + 1; // Colon + NULL + int rc = msg->init_size (size); + errno_assert (rc == 0); + msg->set_flags (msg_t::more); + char *address = (char*)msg->data (); + + strcpy (address, name); + strcat (address, ":"); + strcat (address, port); +} + +int zmq::udp_engine_t::resolve_raw_address (char *name_, int length_) +{ + const char *delimiter = strrchr (name_, ':'); + if (!delimiter) { + errno = EINVAL; + return -1; + } + + std::string addr_str (name_, delimiter - name_); + std::string port_str (delimiter + 1); + + // Parse the port number (0 is not a valid port). + uint16_t port = (uint16_t) atoi (port_str.c_str ()); + if (port == 0) { + errno = EINVAL; + return -1; + } + + raw_address.sin_family = AF_INET; + raw_address.sin_port = htons (port); + raw_address.sin_addr.s_addr = inet_addr (addr_str.c_str ()); + + if (raw_address.sin_addr.s_addr == INADDR_NONE) { + errno = EINVAL; + return -1; + } + + return 0; +} + void zmq::udp_engine_t::out_event() { msg_t group_msg; @@ -164,20 +222,29 @@ void zmq::udp_engine_t::out_event() size_t group_size = group_msg.size (); size_t body_size = body_msg.size (); - size_t size = group_size + body_size + 1; + size_t size; - struct sockaddr* out_address = (struct sockaddr*) address->resolved.udp_addr->dest_addr (); - socklen_t out_addrlen = address->resolved.udp_addr->dest_addrlen (); - if (options.raw_socket) { - if (group_size > 0) { - out_address = (struct sockaddr*) group_msg.data(); - out_addrlen = group_size; - size = body_size; + rc = resolve_raw_address ((char*) group_msg.data(), group_size); + + // We discard the message if address is not valid + if (rc != 0) { + rc = group_msg.close (); + errno_assert (rc == 0); + + body_msg.close (); + errno_assert (rc == 0); + + return; } + + size = body_size; + memcpy (out_buffer, body_msg.data (), body_size); } else { + size = group_size + body_size + 1; + // TODO: check if larger than maximum size out_buffer[0] = (unsigned char) group_size; memcpy (out_buffer + 1, group_msg.data (), group_size); @@ -192,8 +259,7 @@ void zmq::udp_engine_t::out_event() #ifdef ZMQ_HAVE_WINDOWS rc = sendto (fd, (const char *) out_buffer, (int) size, 0, - address->resolved.udp_addr->dest_addr (), - (int) address->resolved.udp_addr->dest_addrlen ()); + out_address, (int) out_addrlen); wsa_assert (rc != SOCKET_ERROR); #else rc = sendto (fd, out_buffer, size, 0, out_address, out_addrlen); @@ -242,34 +308,34 @@ void zmq::udp_engine_t::in_event() return; } #endif - - void* group_buffer; - int group_size; + int rc; int body_size; + int body_offset; msg_t msg; - + if (options.raw_socket) { - group_buffer = (void*) &(in_address); - group_size = in_addrlen; - - body_size = nbytes - 1; + sockaddr_to_msg (&msg, &in_address); + + body_size = nbytes; + body_offset = 0; } else { - group_buffer = in_buffer + 1; - group_size = in_buffer[0]; + char* group_buffer = (char *)in_buffer + 1; + int group_size = in_buffer[0]; + + rc = msg.init_size (group_size); + errno_assert (rc == 0); + msg.set_flags (msg_t::more); + memcpy (msg.data (), group_buffer, group_size); // This doesn't fit, just ingore if (nbytes - 1 < group_size) return; body_size = nbytes - 1 - group_size; + body_offset = 1 + group_size; } - - int rc = msg.init_size (group_size); - errno_assert (rc == 0); - msg.set_flags (msg_t::more); - memcpy (msg.data (), group_buffer, group_size); - + rc = session->push_msg (&msg); errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN)); @@ -286,7 +352,7 @@ void zmq::udp_engine_t::in_event() errno_assert (rc == 0); rc = msg.init_size (body_size); errno_assert (rc == 0); - memcpy (msg.data (), in_buffer + 1 + group_size, body_size); + memcpy (msg.data (), in_buffer + body_offset, body_size); rc = session->push_msg (&msg); errno_assert (rc == 0); rc = msg.close (); diff --git a/src/udp_engine.hpp b/src/udp_engine.hpp index 44b63c5d..151abd35 100644 --- a/src/udp_engine.hpp +++ b/src/udp_engine.hpp @@ -6,6 +6,7 @@ #include "i_engine.hpp" #include "address.hpp" #include "udp_address.hpp" +#include "msg.hpp" #define MAX_UDP_MSG 8192 @@ -45,15 +46,22 @@ namespace zmq private: + int resolve_raw_address (char *addr_, int length_); + void sockaddr_to_msg (zmq::msg_t *msg, sockaddr_in* addr); + bool plugged; fd_t fd; session_base_t* session; handle_t handle; address_t *address; - + options_t options; + sockaddr_in raw_address; + const struct sockaddr* out_address; + socklen_t out_addrlen; + unsigned char out_buffer[MAX_UDP_MSG]; unsigned char in_buffer[MAX_UDP_MSG]; bool send_enabled;