diff --git a/src/epoll.cpp b/src/epoll.cpp index cee48f6c..7f3f9c1a 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include @@ -45,8 +46,7 @@ #include "i_poll_events.hpp" zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) : - ctx (ctx_), - stopping (false) + worker_poller_base_t (ctx_) { #ifdef ZMQ_USE_EPOLL_CLOEXEC // Setting this option result in sane behaviour when exec() functions @@ -62,7 +62,7 @@ zmq::epoll_t::epoll_t (const zmq::thread_ctx_t &ctx_) : zmq::epoll_t::~epoll_t () { // Wait till the worker thread exits. - worker.stop (); + stop_worker (); close (epoll_fd); for (retired_t::iterator it = retired.begin (); it != retired.end (); @@ -73,6 +73,7 @@ zmq::epoll_t::~epoll_t () zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) { + check_thread (); poll_entry_t *pe = new (std::nothrow) poll_entry_t; alloc_assert (pe); @@ -96,6 +97,7 @@ zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) void zmq::epoll_t::rm_fd (handle_t handle_) { + check_thread (); poll_entry_t *pe = (poll_entry_t *) handle_; int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev); errno_assert (rc != -1); @@ -110,6 +112,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_) void zmq::epoll_t::set_pollin (handle_t handle_) { + check_thread (); poll_entry_t *pe = (poll_entry_t *) handle_; pe->ev.events |= EPOLLIN; int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); @@ -118,6 +121,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_) void zmq::epoll_t::reset_pollin (handle_t handle_) { + check_thread (); poll_entry_t *pe = (poll_entry_t *) handle_; pe->ev.events &= ~((short) EPOLLIN); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); @@ -126,6 +130,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_) void zmq::epoll_t::set_pollout (handle_t handle_) { + check_thread (); poll_entry_t *pe = (poll_entry_t *) handle_; pe->ev.events |= EPOLLOUT; 
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); @@ -134,20 +139,16 @@ void zmq::epoll_t::set_pollout (handle_t handle_) void zmq::epoll_t::reset_pollout (handle_t handle_) { + check_thread (); poll_entry_t *pe = (poll_entry_t *) handle_; pe->ev.events &= ~((short) EPOLLOUT); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); errno_assert (rc != -1); } -void zmq::epoll_t::start () -{ - ctx.start_thread (worker, worker_routine, this); -} - void zmq::epoll_t::stop () { - stopping = true; + check_thread (); } int zmq::epoll_t::max_fds () @@ -159,10 +160,18 @@ void zmq::epoll_t::loop () { epoll_event ev_buf[max_io_events]; - while (!stopping) { + while (true) { // Execute any due timers. int timeout = (int) execute_timers (); + if (get_load () == 0) { + if (timeout == 0) + break; + + // TODO sleep for timeout + continue; + } + // Wait for events. int n = epoll_wait (epoll_fd, &ev_buf[0], max_io_events, timeout ? timeout : -1); @@ -199,9 +208,4 @@ void zmq::epoll_t::loop () } } -void zmq::epoll_t::worker_routine (void *arg_) -{ - ((epoll_t *) arg_)->loop (); -} - #endif diff --git a/src/epoll.hpp b/src/epoll.hpp index fc878abe..d320764f 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -50,7 +50,7 @@ struct i_poll_events; // This class implements socket polling mechanism using the Linux-specific // epoll mechanism. -class epoll_t : public poller_base_t +class epoll_t : public worker_poller_base_t { public: typedef void *handle_t; @@ -65,21 +65,14 @@ class epoll_t : public poller_base_t void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void start (); void stop (); static int max_fds (); private: - // Main worker thread routine. - static void worker_routine (void *arg_); - // Main event loop. void loop (); - // Reference to ZMQ context. 
- const thread_ctx_t &ctx; - // Main epoll file descriptor fd_t epoll_fd; @@ -94,9 +87,6 @@ class epoll_t : public poller_base_t typedef std::vector retired_t; retired_t retired; - // If true, thread is in the process of shutting down. - bool stopping; - // Handle of the physical thread doing the I/O work. thread_t worker; diff --git a/src/ip.cpp b/src/ip.cpp index c2a8f8a2..ab253b96 100644 --- a/src/ip.cpp +++ b/src/ip.cpp @@ -31,6 +31,7 @@ #include "ip.hpp" #include "err.hpp" #include "macros.hpp" +#include "config.hpp" #if !defined ZMQ_HAVE_WINDOWS #include @@ -39,12 +40,26 @@ #include #include #include +#else +#include "tcp.hpp" #endif #if defined ZMQ_HAVE_OPENVMS #include #endif +#if defined ZMQ_HAVE_EVENTFD +#include +#endif + +#if defined ZMQ_HAVE_OPENPGM +#ifdef ZMQ_HAVE_WINDOWS +#define __PGM_WININT_H__ +#endif + +#include +#endif + zmq::fd_t zmq::open_socket (int domain_, int type_, int protocol_) { int rc; @@ -229,3 +244,368 @@ void zmq::bind_to_device (fd_t s_, std::string &bound_device_) LIBZMQ_UNUSED (bound_device_); #endif } + +bool zmq::initialize_network () +{ +#if defined ZMQ_HAVE_OPENPGM + + // Init PGM transport. Ensure threading and timer are enabled. Find PGM + // protocol ID. Note that if you want to use gettimeofday and sleep for + // openPGM timing, set environment variables PGM_TIMER to "GTOD" and + // PGM_SLEEP to "USLEEP". + pgm_error_t *pgm_error = NULL; + const bool ok = pgm_init (&pgm_error); + if (ok != TRUE) { + // Invalid parameters don't set pgm_error_t + zmq_assert (pgm_error != NULL); + if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME + && (pgm_error->code == PGM_ERROR_FAILED)) { + // Failed to access RTC or HPET device. + pgm_error_free (pgm_error); + errno = EINVAL; + return false; + } + + // PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg. + zmq_assert (false); + } +#endif + +#ifdef ZMQ_HAVE_WINDOWS + // Initialise Windows sockets.
Note that WSAStartup can be called multiple + // times given that WSACleanup will be called for each WSAStartup. + + WORD version_requested = MAKEWORD (2, 2); + WSADATA wsa_data; + int rc = WSAStartup (version_requested, &wsa_data); + zmq_assert (rc == 0); + zmq_assert (LOBYTE (wsa_data.wVersion) == 2 + && HIBYTE (wsa_data.wVersion) == 2); +#endif + + return true; +} + +void zmq::shutdown_network () +{ +#ifdef ZMQ_HAVE_WINDOWS + // On Windows, uninitialise socket layer. + int rc = WSACleanup (); + wsa_assert (rc != SOCKET_ERROR); +#endif + +#if defined ZMQ_HAVE_OPENPGM + // Shut down the OpenPGM library. + if (pgm_shutdown () != TRUE) + zmq_assert (false); +#endif +} + +#if defined ZMQ_HAVE_WINDOWS +static void tune_socket (const SOCKET socket) +{ + BOOL tcp_nodelay = 1; + int rc = setsockopt (socket, IPPROTO_TCP, TCP_NODELAY, + (char *) &tcp_nodelay, sizeof tcp_nodelay); + wsa_assert (rc != SOCKET_ERROR); + + zmq::tcp_tune_loopback_fast_path (socket); +} +#endif + +int zmq::make_fdpair (fd_t *r_, fd_t *w_) +{ +#if defined ZMQ_HAVE_EVENTFD + int flags = 0; +#if defined ZMQ_HAVE_EVENTFD_CLOEXEC + // Setting this option result in sane behaviour when exec() functions + // are used. Old sockets are closed and don't block TCP ports, avoid + // leaks, etc. 
+ flags |= EFD_CLOEXEC; +#endif + fd_t fd = eventfd (0, flags); + if (fd == -1) { + errno_assert (errno == ENFILE || errno == EMFILE); + *w_ = *r_ = -1; + return -1; + } else { + *w_ = *r_ = fd; + return 0; + } + +#elif defined ZMQ_HAVE_WINDOWS +#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP + // Windows CE does not manage security attributes + SECURITY_DESCRIPTOR sd; + SECURITY_ATTRIBUTES sa; + memset (&sd, 0, sizeof sd); + memset (&sa, 0, sizeof sa); + + InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION); + SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE); + + sa.nLength = sizeof (SECURITY_ATTRIBUTES); + sa.lpSecurityDescriptor = &sd; +#endif + + // This function has to be in a system-wide critical section so that + // two instances of the library don't accidentally create signaler + // crossing the process boundary. + // We'll use named event object to implement the critical section. + // Note that if the event object already exists, the CreateEvent requests + // EVENT_ALL_ACCESS access right. If this fails, we try to open + // the event object asking for SYNCHRONIZE access only. + HANDLE sync = NULL; + + // Create critical section only if using fixed signaler port + // Use problematic Event implementation for compatibility if using old port 5905. + // Otherwise use Mutex implementation. 
+ int event_signaler_port = 5905; + + if (signaler_port == event_signaler_port) { +#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP + sync = + CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync"); +#else + sync = + CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync"); +#endif + if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED) + sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE, FALSE, + L"Global\\zmq-signaler-port-sync"); + + win_assert (sync != NULL); + } else if (signaler_port != 0) { + wchar_t mutex_name[MAX_PATH]; +#ifdef __MINGW32__ + _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", + signaler_port); +#else + swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", + signaler_port); +#endif + +#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP + sync = CreateMutexW (&sa, FALSE, mutex_name); +#else + sync = CreateMutexW (NULL, FALSE, mutex_name); +#endif + if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED) + sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name); + + win_assert (sync != NULL); + } + + // Windows has no 'socketpair' function. CreatePipe is no good as pipe + // handles cannot be polled on. Here we create the socketpair by hand. + *w_ = INVALID_SOCKET; + *r_ = INVALID_SOCKET; + + // Create listening socket. + SOCKET listener; + listener = open_socket (AF_INET, SOCK_STREAM, 0); + wsa_assert (listener != INVALID_SOCKET); + + // Set SO_REUSEADDR and TCP_NODELAY on listening socket. + BOOL so_reuseaddr = 1; + int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR, + (char *) &so_reuseaddr, sizeof so_reuseaddr); + wsa_assert (rc != SOCKET_ERROR); + + tune_socket (listener); + + // Init sockaddr to signaler port. + struct sockaddr_in addr; + memset (&addr, 0, sizeof addr); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); + addr.sin_port = htons (signaler_port); + + // Create the writer socket. 
+ *w_ = open_socket (AF_INET, SOCK_STREAM, 0); + wsa_assert (*w_ != INVALID_SOCKET); + + // Set TCP_NODELAY on writer socket. + tune_socket (*w_); + + if (sync != NULL) { + // Enter the critical section. + DWORD dwrc = WaitForSingleObject (sync, INFINITE); + zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED); + } + + // Bind listening socket to signaler port. + rc = bind (listener, (const struct sockaddr *) &addr, sizeof addr); + + if (rc != SOCKET_ERROR && signaler_port == 0) { + // Retrieve ephemeral port number + int addrlen = sizeof addr; + rc = getsockname (listener, (struct sockaddr *) &addr, &addrlen); + } + + // Listen for incoming connections. + if (rc != SOCKET_ERROR) + rc = listen (listener, 1); + + // Connect writer to the listener. + if (rc != SOCKET_ERROR) + rc = connect (*w_, (struct sockaddr *) &addr, sizeof addr); + + // Accept connection from writer. + if (rc != SOCKET_ERROR) + *r_ = accept (listener, NULL, NULL); + + // Send/receive large chunk to work around TCP slow start + // This code is a workaround for #1608 + if (*r_ != INVALID_SOCKET) { + size_t dummy_size = + 1024 * 1024; // 1M to overload default receive buffer + unsigned char *dummy = (unsigned char *) malloc (dummy_size); + wsa_assert (dummy); + + int still_to_send = (int) dummy_size; + int still_to_recv = (int) dummy_size; + while (still_to_send || still_to_recv) { + int nbytes; + if (still_to_send > 0) { + nbytes = + ::send (*w_, (char *) (dummy + dummy_size - still_to_send), + still_to_send, 0); + wsa_assert (nbytes != SOCKET_ERROR); + still_to_send -= nbytes; + } + nbytes = ::recv (*r_, (char *) (dummy + dummy_size - still_to_recv), + still_to_recv, 0); + wsa_assert (nbytes != SOCKET_ERROR); + still_to_recv -= nbytes; + } + free (dummy); + } + + // Save errno if error occurred in bind/listen/connect/accept. + int saved_errno = 0; + if (*r_ == INVALID_SOCKET) + saved_errno = WSAGetLastError (); + + // We don't need the listening socket anymore. Close it. 
+ rc = closesocket (listener); + wsa_assert (rc != SOCKET_ERROR); + + if (sync != NULL) { + // Exit the critical section. + BOOL brc; + if (signaler_port == event_signaler_port) + brc = SetEvent (sync); + else + brc = ReleaseMutex (sync); + win_assert (brc != 0); + + // Release the kernel object + brc = CloseHandle (sync); + win_assert (brc != 0); + } + + if (*r_ != INVALID_SOCKET) { +#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP + // On Windows, prevent sockets from being inherited by child processes. + BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0); + win_assert (brc); +#endif + return 0; + } else { + // Cleanup writer if connection failed + if (*w_ != INVALID_SOCKET) { + rc = closesocket (*w_); + wsa_assert (rc != SOCKET_ERROR); + *w_ = INVALID_SOCKET; + } + // Set errno from saved value + errno = wsa_error_to_errno (saved_errno); + return -1; + } + +#elif defined ZMQ_HAVE_OPENVMS + + // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further, + // it does not set the socket options TCP_NODELAY and TCP_NODELACK which + // can lead to performance problems. + // + // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll + // create the socket pair manually.
+ struct sockaddr_in lcladdr; + memset (&lcladdr, 0, sizeof lcladdr); + lcladdr.sin_family = AF_INET; + lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); + lcladdr.sin_port = 0; + + int listener = open_socket (AF_INET, SOCK_STREAM, 0); + errno_assert (listener != -1); + + int on = 1; + int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on); + errno_assert (rc != -1); + + rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on); + errno_assert (rc != -1); + + rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr); + errno_assert (rc != -1); + + socklen_t lcladdr_len = sizeof lcladdr; + + rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len); + errno_assert (rc != -1); + + rc = listen (listener, 1); + errno_assert (rc != -1); + + *w_ = open_socket (AF_INET, SOCK_STREAM, 0); + errno_assert (*w_ != -1); + + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on); + errno_assert (rc != -1); + + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on); + errno_assert (rc != -1); + + rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr); + errno_assert (rc != -1); + + *r_ = accept (listener, NULL, NULL); + errno_assert (*r_ != -1); + + close (listener); + + return 0; + +#else + // All other implementations support socketpair() + int sv[2]; + int type = SOCK_STREAM; + // Setting this option result in sane behaviour when exec() functions + // are used. Old sockets are closed and don't block TCP ports, avoid + // leaks, etc. +#if defined ZMQ_HAVE_SOCK_CLOEXEC + type |= SOCK_CLOEXEC; +#endif + int rc = socketpair (AF_UNIX, type, 0, sv); + if (rc == -1) { + errno_assert (errno == ENFILE || errno == EMFILE); + *w_ = *r_ = -1; + return -1; + } else { + // If there's no SOCK_CLOEXEC, let's try the second best option. Note that + // race condition can cause socket not to be closed (if fork happens + // between socket creation and this point). 
+#if !defined ZMQ_HAVE_SOCK_CLOEXEC && defined FD_CLOEXEC + rc = fcntl (sv[0], F_SETFD, FD_CLOEXEC); + errno_assert (rc != -1); + rc = fcntl (sv[1], F_SETFD, FD_CLOEXEC); + errno_assert (rc != -1); +#endif + *w_ = sv[0]; + *r_ = sv[1]; + return 0; + } +#endif +} diff --git a/src/ip.hpp b/src/ip.hpp index 19cd186b..25fa944b 100644 --- a/src/ip.hpp +++ b/src/ip.hpp @@ -57,6 +57,16 @@ int set_nosigpipe (fd_t s_); // Binds the underlying socket to the given device, eg. VRF or interface void bind_to_device (fd_t s_, std::string &bound_device_); + +// Initialize network subsystem. May be called multiple times. Each call must be matched by a call to shutdown_network. +bool initialize_network (); + +// Shutdown network subsystem. Must be called once for each call to initialize_network before terminating. +void shutdown_network (); + +// Creates a pair of sockets (using signaler_port on OS using TCP sockets). +// Returns -1 if we could not make the socket pair successfully +int make_fdpair (fd_t *r_, fd_t *w_); } #endif diff --git a/src/signaler.cpp b/src/signaler.cpp index 7c396c23..fb8c5e61 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -67,10 +67,6 @@ #include "ip.hpp" #include "tcp.hpp" -#if defined ZMQ_HAVE_EVENTFD -#include -#endif - #if !defined ZMQ_HAVE_WINDOWS #include #include @@ -384,313 +380,3 @@ void zmq::signaler_t::forked () make_fdpair (&r, &w); } #endif - -#if defined ZMQ_HAVE_WINDOWS -static void tune_socket (const SOCKET socket) -{ - BOOL tcp_nodelay = 1; - int rc = setsockopt (socket, IPPROTO_TCP, TCP_NODELAY, - (char *) &tcp_nodelay, sizeof tcp_nodelay); - wsa_assert (rc != SOCKET_ERROR); - - zmq::tcp_tune_loopback_fast_path (socket); -} -#endif - -// Returns -1 if we could not make the socket pair successfully -int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) -{ -#if defined ZMQ_HAVE_EVENTFD - int flags = 0; -#if defined ZMQ_HAVE_EVENTFD_CLOEXEC - // Setting this option result in sane behaviour when exec() functions - // are used. 
Old sockets are closed and don't block TCP ports, avoid - // leaks, etc. - flags |= EFD_CLOEXEC; -#endif - fd_t fd = eventfd (0, flags); - if (fd == -1) { - errno_assert (errno == ENFILE || errno == EMFILE); - *w_ = *r_ = -1; - return -1; - } else { - *w_ = *r_ = fd; - return 0; - } - -#elif defined ZMQ_HAVE_WINDOWS -#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP - // Windows CE does not manage security attributes - SECURITY_DESCRIPTOR sd; - SECURITY_ATTRIBUTES sa; - memset (&sd, 0, sizeof sd); - memset (&sa, 0, sizeof sa); - - InitializeSecurityDescriptor (&sd, SECURITY_DESCRIPTOR_REVISION); - SetSecurityDescriptorDacl (&sd, TRUE, 0, FALSE); - - sa.nLength = sizeof (SECURITY_ATTRIBUTES); - sa.lpSecurityDescriptor = &sd; -#endif - - // This function has to be in a system-wide critical section so that - // two instances of the library don't accidentally create signaler - // crossing the process boundary. - // We'll use named event object to implement the critical section. - // Note that if the event object already exists, the CreateEvent requests - // EVENT_ALL_ACCESS access right. If this fails, we try to open - // the event object asking for SYNCHRONIZE access only. - HANDLE sync = NULL; - - // Create critical section only if using fixed signaler port - // Use problematic Event implementation for compatibility if using old port 5905. - // Otherwise use Mutex implementation. 
- int event_signaler_port = 5905; - - if (signaler_port == event_signaler_port) { -#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP - sync = - CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync"); -#else - sync = - CreateEventW (NULL, FALSE, TRUE, L"Global\\zmq-signaler-port-sync"); -#endif - if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED) - sync = OpenEventW (SYNCHRONIZE | EVENT_MODIFY_STATE, FALSE, - L"Global\\zmq-signaler-port-sync"); - - win_assert (sync != NULL); - } else if (signaler_port != 0) { - wchar_t mutex_name[MAX_PATH]; -#ifdef __MINGW32__ - _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", - signaler_port); -#else - swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", - signaler_port); -#endif - -#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP - sync = CreateMutexW (&sa, FALSE, mutex_name); -#else - sync = CreateMutexW (NULL, FALSE, mutex_name); -#endif - if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED) - sync = OpenMutexW (SYNCHRONIZE, FALSE, mutex_name); - - win_assert (sync != NULL); - } - - // Windows has no 'socketpair' function. CreatePipe is no good as pipe - // handles cannot be polled on. Here we create the socketpair by hand. - *w_ = INVALID_SOCKET; - *r_ = INVALID_SOCKET; - - // Create listening socket. - SOCKET listener; - listener = open_socket (AF_INET, SOCK_STREAM, 0); - wsa_assert (listener != INVALID_SOCKET); - - // Set SO_REUSEADDR and TCP_NODELAY on listening socket. - BOOL so_reuseaddr = 1; - int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR, - (char *) &so_reuseaddr, sizeof so_reuseaddr); - wsa_assert (rc != SOCKET_ERROR); - - tune_socket (listener); - - // Init sockaddr to signaler port. - struct sockaddr_in addr; - memset (&addr, 0, sizeof addr); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); - addr.sin_port = htons (signaler_port); - - // Create the writer socket. 
- *w_ = open_socket (AF_INET, SOCK_STREAM, 0); - wsa_assert (*w_ != INVALID_SOCKET); - - // Set TCP_NODELAY on writer socket. - tune_socket (*w_); - - if (sync != NULL) { - // Enter the critical section. - DWORD dwrc = WaitForSingleObject (sync, INFINITE); - zmq_assert (dwrc == WAIT_OBJECT_0 || dwrc == WAIT_ABANDONED); - } - - // Bind listening socket to signaler port. - rc = bind (listener, (const struct sockaddr *) &addr, sizeof addr); - - if (rc != SOCKET_ERROR && signaler_port == 0) { - // Retrieve ephemeral port number - int addrlen = sizeof addr; - rc = getsockname (listener, (struct sockaddr *) &addr, &addrlen); - } - - // Listen for incoming connections. - if (rc != SOCKET_ERROR) - rc = listen (listener, 1); - - // Connect writer to the listener. - if (rc != SOCKET_ERROR) - rc = connect (*w_, (struct sockaddr *) &addr, sizeof addr); - - // Accept connection from writer. - if (rc != SOCKET_ERROR) - *r_ = accept (listener, NULL, NULL); - - // Send/receive large chunk to work around TCP slow start - // This code is a workaround for #1608 - if (*r_ != INVALID_SOCKET) { - size_t dummy_size = - 1024 * 1024; // 1M to overload default receive buffer - unsigned char *dummy = (unsigned char *) malloc (dummy_size); - wsa_assert (dummy); - - int still_to_send = (int) dummy_size; - int still_to_recv = (int) dummy_size; - while (still_to_send || still_to_recv) { - int nbytes; - if (still_to_send > 0) { - nbytes = - ::send (*w_, (char *) (dummy + dummy_size - still_to_send), - still_to_send, 0); - wsa_assert (nbytes != SOCKET_ERROR); - still_to_send -= nbytes; - } - nbytes = ::recv (*r_, (char *) (dummy + dummy_size - still_to_recv), - still_to_recv, 0); - wsa_assert (nbytes != SOCKET_ERROR); - still_to_recv -= nbytes; - } - free (dummy); - } - - // Save errno if error occurred in bind/listen/connect/accept. - int saved_errno = 0; - if (*r_ == INVALID_SOCKET) - saved_errno = WSAGetLastError (); - - // We don't need the listening socket anymore. Close it. 
- rc = closesocket (listener); - wsa_assert (rc != SOCKET_ERROR); - - if (sync != NULL) { - // Exit the critical section. - BOOL brc; - if (signaler_port == event_signaler_port) - brc = SetEvent (sync); - else - brc = ReleaseMutex (sync); - win_assert (brc != 0); - - // Release the kernel object - brc = CloseHandle (sync); - win_assert (brc != 0); - } - - if (*r_ != INVALID_SOCKET) { -#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP - // On Windows, preventing sockets to be inherited by child processes. - BOOL brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0); - win_assert (brc); -#endif - return 0; - } else { - // Cleanup writer if connection failed - if (*w_ != INVALID_SOCKET) { - rc = closesocket (*w_); - wsa_assert (rc != SOCKET_ERROR); - *w_ = INVALID_SOCKET; - } - // Set errno from saved value - errno = wsa_error_to_errno (saved_errno); - return -1; - } - -#elif defined ZMQ_HAVE_OPENVMS - - // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further, - // it does not set the socket options TCP_NODELAY and TCP_NODELACK which - // can lead to performance problems. - // - // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll - // create the socket pair manually. 
- struct sockaddr_in lcladdr; - memset (&lcladdr, 0, sizeof lcladdr); - lcladdr.sin_family = AF_INET; - lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); - lcladdr.sin_port = 0; - - int listener = open_socket (AF_INET, SOCK_STREAM, 0); - errno_assert (listener != -1); - - int on = 1; - int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on); - errno_assert (rc != -1); - - rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on); - errno_assert (rc != -1); - - rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr); - errno_assert (rc != -1); - - socklen_t lcladdr_len = sizeof lcladdr; - - rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len); - errno_assert (rc != -1); - - rc = listen (listener, 1); - errno_assert (rc != -1); - - *w_ = open_socket (AF_INET, SOCK_STREAM, 0); - errno_assert (*w_ != -1); - - rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof on); - errno_assert (rc != -1); - - rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on); - errno_assert (rc != -1); - - rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr); - errno_assert (rc != -1); - - *r_ = accept (listener, NULL, NULL); - errno_assert (*r_ != -1); - - close (listener); - - return 0; - -#else - // All other implementations support socketpair() - int sv[2]; - int type = SOCK_STREAM; - // Setting this option result in sane behaviour when exec() functions - // are used. Old sockets are closed and don't block TCP ports, avoid - // leaks, etc. -#if defined ZMQ_HAVE_SOCK_CLOEXEC - type |= SOCK_CLOEXEC; -#endif - int rc = socketpair (AF_UNIX, type, 0, sv); - if (rc == -1) { - errno_assert (errno == ENFILE || errno == EMFILE); - *w_ = *r_ = -1; - return -1; - } else { - // If there's no SOCK_CLOEXEC, let's try the second best option. Note that - // race condition can cause socket not to be closed (if fork happens - // between socket creation and this point). 
-#if !defined ZMQ_HAVE_SOCK_CLOEXEC && defined FD_CLOEXEC - rc = fcntl (sv[0], F_SETFD, FD_CLOEXEC); - errno_assert (rc != -1); - rc = fcntl (sv[1], F_SETFD, FD_CLOEXEC); - errno_assert (rc != -1); -#endif - *w_ = sv[0]; - *r_ = sv[1]; - return 0; - } -#endif -} diff --git a/src/signaler.hpp b/src/signaler.hpp index f44b7e20..e8797c7f 100644 --- a/src/signaler.hpp +++ b/src/signaler.hpp @@ -66,10 +66,6 @@ class signaler_t #endif private: - // Creates a pair of file descriptors that will be used - // to pass the signals. - static int make_fdpair (fd_t *r_, fd_t *w_); - // Underlying write & read file descriptor // Will be -1 if an error occurred during initialization, e.g. we // exceeded the number of available handles diff --git a/src/zmq.cpp b/src/zmq.cpp index 421241dc..59dfc287 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -87,6 +87,7 @@ struct iovec #include "signaler.hpp" #include "socket_poller.hpp" #include "timers.hpp" +#include "ip.hpp" #if defined ZMQ_HAVE_OPENPGM #define __PGM_WININT_H__ @@ -121,42 +122,11 @@ int zmq_errno (void) void *zmq_ctx_new (void) { -#if defined ZMQ_HAVE_OPENPGM - - // Init PGM transport. Ensure threading and timer are enabled. Find PGM - // protocol ID. Note that if you want to use gettimeofday and sleep for - // openPGM timing, set environment variables PGM_TIMER to "GTOD" and - // PGM_SLEEP to "USLEEP". - pgm_error_t *pgm_error = NULL; - const bool ok = pgm_init (&pgm_error); - if (ok != TRUE) { - // Invalid parameters don't set pgm_error_t - zmq_assert (pgm_error != NULL); - if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME - && (pgm_error->code == PGM_ERROR_FAILED)) { - // Failed to access RTC or HPET device. - pgm_error_free (pgm_error); - errno = EINVAL; - return NULL; - } - - // PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg. - zmq_assert (false); - } -#endif - -#ifdef ZMQ_HAVE_WINDOWS - // Intialise Windows sockets. 
Note that WSAStartup can be called multiple - // times given that WSACleanup will be called for each WSAStartup. // We do this before the ctx constructor since its embedded mailbox_t - // object needs Winsock to be up and running. - WORD version_requested = MAKEWORD (2, 2); - WSADATA wsa_data; - int rc = WSAStartup (version_requested, &wsa_data); - zmq_assert (rc == 0); - zmq_assert (LOBYTE (wsa_data.wVersion) == 2 - && HIBYTE (wsa_data.wVersion) == 2); -#endif + // object needs the network to be up and running (at least on Windows). + if (!zmq::initialize_network ()) { + return NULL; + } // Create 0MQ context. zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t; @@ -181,17 +151,7 @@ int zmq_ctx_term (void *ctx_) // Shut down only if termination was not interrupted by a signal. if (!rc || en != EINTR) { -#ifdef ZMQ_HAVE_WINDOWS - // On Windows, uninitialise socket layer. - rc = WSACleanup (); - wsa_assert (rc != SOCKET_ERROR); -#endif - -#if defined ZMQ_HAVE_OPENPGM - // Shut down the OpenPGM library. - if (pgm_shutdown () != TRUE) - zmq_assert (false); -#endif + zmq::shutdown_network (); } errno = en; @@ -722,7 +682,7 @@ const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_) } } - // Polling. +// Polling. #if defined ZMQ_HAVE_POLLER inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) diff --git a/unittests/unittest_poller.cpp b/unittests/unittest_poller.cpp index 4df2f9b8..58e2c3b1 100644 --- a/unittests/unittest_poller.cpp +++ b/unittests/unittest_poller.cpp @@ -20,9 +20,15 @@ along with this program. If not, see . #include "../tests/testutil.hpp" #include +#include +#include #include +#ifndef _WIN32 +#define closesocket close +#endif + void setUp () { } @@ -36,12 +42,177 @@ void test_create () zmq::poller_t poller (thread_ctx); } +#if 0 +// TODO this triggers an assertion. should it be a valid use case? 
+void test_start_empty () +{ + zmq::thread_ctx_t thread_ctx; + zmq::poller_t poller (thread_ctx); + poller.start (); + msleep (SETTLE_TIME); +} +#endif + +struct test_events_t : zmq::i_poll_events +{ + test_events_t (zmq::fd_t fd_, zmq::poller_t &poller_) : + fd (fd_), + poller (poller_) + { + } + + virtual void in_event () + { + in_events.add (1); + poller.rm_fd (handle); + handle = (zmq::poller_t::handle_t) NULL; + } + + + virtual void out_event () + { + // TODO + } + + + virtual void timer_event (int id_) + { + LIBZMQ_UNUSED (id_); + timer_events.add (1); + poller.rm_fd (handle); + handle = (zmq::poller_t::handle_t) NULL; + } + + void set_handle (zmq::poller_t::handle_t handle_) { handle = handle_; } + + zmq::atomic_counter_t in_events, timer_events; + + private: + zmq::fd_t fd; + zmq::poller_t &poller; + zmq::poller_t::handle_t handle; +}; + +void wait_in_events (test_events_t &events) +{ + void *watch = zmq_stopwatch_start (); + while (events.in_events.get () < 1) { +#ifdef ZMQ_BUILD_DRAFT + TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (SETTLE_TIME, + zmq_stopwatch_intermediate (watch), + "Timeout waiting for in event"); +#endif + } + zmq_stopwatch_stop (watch); +} + +void wait_timer_events (test_events_t &events) +{ + void *watch = zmq_stopwatch_start (); + while (events.timer_events.get () < 1) { +#ifdef ZMQ_BUILD_DRAFT + TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (SETTLE_TIME, + zmq_stopwatch_intermediate (watch), + "Timeout waiting for timer event"); +#endif + } + zmq_stopwatch_stop (watch); +} + +void create_nonblocking_fdpair (zmq::fd_t *r, zmq::fd_t *w) +{ + int rc = zmq::make_fdpair (r, w); + TEST_ASSERT_EQUAL_INT (0, rc); + TEST_ASSERT_NOT_EQUAL (zmq::retired_fd, *r); + TEST_ASSERT_NOT_EQUAL (zmq::retired_fd, *w); + zmq::unblock_socket (*r); + zmq::unblock_socket (*w); +} + +void send_signal (zmq::fd_t w) +{ +#if defined ZMQ_HAVE_EVENTFD + const uint64_t inc = 1; + ssize_t sz = write (w, &inc, sizeof (inc)); + assert (sz == sizeof (inc)); +#else + { + char msg[] = 
"test"; + int rc = send (w, msg, sizeof (msg), 0); + assert (rc == sizeof (msg)); + } +#endif +} + +void close_fdpair (zmq::fd_t w, zmq::fd_t r) +{ + int rc = closesocket (w); + TEST_ASSERT_EQUAL_INT (0, rc); +#if !defined ZMQ_HAVE_EVENTFD + rc = closesocket (r); + TEST_ASSERT_EQUAL_INT (0, rc); +#else + LIBZMQ_UNUSED (r); +#endif +} + +void test_add_fd_and_start_and_receive_data () +{ + zmq::thread_ctx_t thread_ctx; + zmq::poller_t poller (thread_ctx); + + zmq::fd_t r, w; + create_nonblocking_fdpair (&r, &w); + + test_events_t events (r, poller); + + zmq::poller_t::handle_t handle = poller.add_fd (r, &events); + events.set_handle (handle); + poller.set_pollin (handle); + poller.start (); + + send_signal (w); + + wait_in_events (events); + + // required cleanup + close_fdpair (w, r); +} + +void test_add_fd_and_remove_by_timer () +{ + zmq::fd_t r, w; + create_nonblocking_fdpair (&r, &w); + + zmq::thread_ctx_t thread_ctx; + zmq::poller_t poller (thread_ctx); + + test_events_t events (r, poller); + + zmq::poller_t::handle_t handle = poller.add_fd (r, &events); + events.set_handle (handle); + + poller.add_timer (50, &events, 0); + poller.start (); + + wait_timer_events (events); + + // required cleanup + close_fdpair (w, r); +} + int main (void) { + UNITY_BEGIN (); + + zmq::initialize_network (); setup_test_environment (); - UNITY_BEGIN (); RUN_TEST (test_create); + RUN_TEST (test_add_fd_and_start_and_receive_data); + RUN_TEST (test_add_fd_and_remove_by_timer); + + zmq::shutdown_network (); return UNITY_END (); } diff --git a/unittests/unittest_ypipe.cpp b/unittests/unittest_ypipe.cpp index 75c91d23..c99717fb 100644 --- a/unittests/unittest_ypipe.cpp +++ b/unittests/unittest_ypipe.cpp @@ -35,12 +35,53 @@ void test_create () zmq::ypipe_t ypipe; } +void test_check_read_empty () +{ + zmq::ypipe_t ypipe; + TEST_ASSERT_FALSE (ypipe.check_read ()); +} + +void test_read_empty () +{ + zmq::ypipe_t ypipe; + int read_value = -1; + TEST_ASSERT_FALSE (ypipe.read (&read_value)); + 
TEST_ASSERT_EQUAL (-1, read_value); +} + +void test_write_complete_and_check_read_and_read () +{ + const int value = 42; + zmq::ypipe_t ypipe; + ypipe.write (value, false); + TEST_ASSERT_FALSE (ypipe.check_read ()); + int read_value = -1; + TEST_ASSERT_FALSE (ypipe.read (&read_value)); + TEST_ASSERT_EQUAL_INT (-1, read_value); +} + +void test_write_complete_and_flush_and_check_read_and_read () +{ + const int value = 42; + zmq::ypipe_t ypipe; + ypipe.write (value, false); + ypipe.flush (); + TEST_ASSERT_TRUE (ypipe.check_read ()); + int read_value = -1; + TEST_ASSERT_TRUE (ypipe.read (&read_value)); + TEST_ASSERT_EQUAL_INT (value, read_value); +} + int main (void) { setup_test_environment (); UNITY_BEGIN (); RUN_TEST (test_create); + RUN_TEST (test_check_read_empty); + RUN_TEST (test_read_empty); + RUN_TEST (test_write_complete_and_check_read_and_read); + RUN_TEST (test_write_complete_and_flush_and_check_read_and_read); return UNITY_END (); }