diff --git a/src/decoder.cpp b/src/decoder.cpp index 30714da5..06fb35b7 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -23,6 +23,11 @@ #include #include +#include "platform.hpp" +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#endif + #include "decoder.hpp" #include "session_base.hpp" #include "likely.hpp" diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 82dee0d3..c25ee8ca 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -59,16 +59,14 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : { } -// Create, bind and connect PGM socket. +// Resolve PGM socket address. // network_ of the form : // e.g. eth0;239.192.0.1:7500 // link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000 // ;[fe80::1%en0]:7500 -int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) +int zmq::pgm_socket_t::init_address (const char *network_, + struct pgm_addrinfo_t **addr, uint16_t *port_number) { - // Can not open transport before destroying old one. - zmq_assert (sock == NULL); - // Parse port number, start from end for IPv6 const char *port_delim = strrchr (network_, ':'); if (!port_delim) { @@ -76,7 +74,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) return -1; } - uint16_t port_number = atoi (port_delim + 1); + *port_number = atoi (port_delim + 1); char network [256]; if (port_delim - network_ >= (int) sizeof (network) - 1) { @@ -86,13 +84,6 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) memset (network, '\0', sizeof (network)); memcpy (network, network_, port_delim - network_); - zmq_assert (options.rate > 0); - - // Zero counter used in msgrecv. - nbytes_rec = 0; - nbytes_processed = 0; - pgm_msgv_processed = 0; - pgm_error_t *pgm_error = NULL; struct pgm_addrinfo_t hints, *res = NULL; sa_family_t sa_family; @@ -103,18 +94,45 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) // Invalid parameters don't set pgm_error_t. zmq_assert (pgm_error != NULL); - if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && ( + if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && // NB: cannot catch EAI_BADFLAGS. - pgm_error->code != PGM_ERROR_SERVICE && - pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) + ( pgm_error->code != PGM_ERROR_SERVICE && + pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) { // User, host, or network configuration or transient error. - goto err_abort; + pgm_error_free (pgm_error); + errno = EINVAL; + return -1; + } // Fatal OpenPGM internal error. zmq_assert (false); } + return 0; +} + +// Create, bind and connect PGM socket. +int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) +{ + // Can not open transport before destroying old one. + zmq_assert (sock == NULL); + zmq_assert (options.rate > 0); + + // Zero counter used in msgrecv. + nbytes_rec = 0; + nbytes_processed = 0; + pgm_msgv_processed = 0; + + uint16_t port_number; + struct pgm_addrinfo_t *res = NULL; + sa_family_t sa_family; + + pgm_error_t *pgm_error = NULL; + + if (init_address(network_, &res, &port_number) < 0) { + goto err_abort; + } zmq_assert (res != NULL); diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index 5a5ef99d..80fcb4af 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -57,6 +57,9 @@ namespace zmq // Initialize PGM network structures (GSI, GSRs). int init (bool udp_encapsulation_, const char *network_); + + // Resolve PGM socket address. + static int init_address(const char *network_, struct pgm_addrinfo_t **addr, uint16_t *port_number); // Get receiver fds and store them into user allocated memory. void get_receiver_fds (fd_t *receive_fd_, fd_t *waiting_pipe_fd_); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 994f2a64..eaddced6 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -55,6 +55,9 @@ #include "address.hpp" #include "ipc_address.hpp" #include "tcp_address.hpp" +#ifdef ZMQ_HAVE_OPENPGM +#include "pgm_socket.hpp" +#endif #include "pair.hpp" #include "pub.hpp" @@ -510,6 +513,17 @@ int zmq::socket_base_t::connect (const char *addr_) return -1; } } +#endif +#ifdef ZMQ_HAVE_OPENPGM + if (protocol == "pgm" || protocol == "epgm") { + struct pgm_addrinfo_t *res = NULL; + uint16_t port_number = 0; + int rc = pgm_socket_t::init_address(address.c_str(), &res, &port_number); + if (res != NULL) + pgm_freeaddrinfo (res); + if (rc != 0 || port_number == 0) + return -1; + } #endif // Create session. session_base_t *session = session_base_t::create (io_thread, true, this,