diff --git a/src/signaler.cpp b/src/signaler.cpp index 30deb1ec..178a4c0e 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -80,13 +80,10 @@ zmq::signaler_t::signaler_t () { // Create the socketpair for signaling. - int rc = make_fdpair (&r, &w); - errno_assert (rc == 0); - - // Set both fds to non-blocking mode. - unblock_socket (w); - unblock_socket (r); - + if (make_fdpair (&r, &w) == 0) { + unblock_socket (w); + unblock_socket (r); + } #ifdef HAVE_FORK pid = getpid(); #endif @@ -273,7 +270,7 @@ void zmq::signaler_t::forked() // replace the file descriptors created in the parent with new // ones, and close the inherited ones - make_fdpair(&r, &w); + make_fdpair (&r, &w); #if defined ZMQ_HAVE_EVENTFD int rc = close (oldr); errno_assert (rc == 0); @@ -286,22 +283,23 @@ void zmq::signaler_t::forked() } #endif - - - +// Returns -1 if we could not make the socket pair successfully int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) { #if defined ZMQ_HAVE_EVENTFD - - // Create eventfd object. fd_t fd = eventfd (0, 0); - errno_assert (fd != -1); - *w_ = fd; - *r_ = fd; - return 0; + if (fd == -1) { + errno_assert (errno == ENFILE || errno == EMFILE); + *w_ = *r_ = -1; + return -1; + } + else { + *w_ = *r_ = fd; + return 0; + } #elif defined ZMQ_HAVE_WINDOWS -#if !defined _WIN32_WCE +# if !defined _WIN32_WCE // Windows CE does not manage security attributes SECURITY_DESCRIPTOR sd; SECURITY_ATTRIBUTES sa; @@ -313,7 +311,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) sa.nLength = sizeof(SECURITY_ATTRIBUTES); sa.lpSecurityDescriptor = &sd; -#endif +# endif // This function has to be in a system-wide critical section so that // two instances of the library don't accidentally create signaler @@ -322,13 +320,14 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) // Note that if the event object already exists, the CreateEvent requests // EVENT_ALL_ACCESS access right. If this fails, we try to open // the event object asking for SYNCHRONIZE access only. -#if !defined _WIN32_WCE +# if !defined _WIN32_WCE HANDLE sync = CreateEvent (&sa, FALSE, TRUE, TEXT ("Global\\zmq-signaler-port-sync")); -#else +# else HANDLE sync = CreateEvent (NULL, FALSE, TRUE, TEXT ("Global\\zmq-signaler-port-sync")); -#endif +# endif if (sync == NULL && GetLastError () == ERROR_ACCESS_DENIED) - sync = OpenEvent (SYNCHRONIZE | EVENT_MODIFY_STATE, FALSE, TEXT ("Global\\zmq-signaler-port-sync")); + sync = OpenEvent (SYNCHRONIZE | EVENT_MODIFY_STATE, + FALSE, TEXT ("Global\\zmq-signaler-port-sync")); win_assert (sync != NULL); @@ -373,13 +372,13 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); wsa_assert (*w_ != INVALID_SOCKET); -#if !defined _WIN32_WCE +# if !defined _WIN32_WCE // On Windows, preventing sockets to be inherited by child processes. BOOL brc = SetHandleInformation ((HANDLE) *w_, HANDLE_FLAG_INHERIT, 0); win_assert (brc); -#else +# else BOOL brc; -#endif +# endif // Set TCP_NODELAY on writer socket. rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, @@ -391,17 +390,14 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) // Save errno if connection fails int conn_errno = 0; - if (rc == SOCKET_ERROR) { + if (rc == SOCKET_ERROR) conn_errno = WSAGetLastError (); - } else { + else { // Accept connection from writer. *r_ = accept (listener, NULL, NULL); - - if (*r_ == INVALID_SOCKET) { + if (*r_ == INVALID_SOCKET) conn_errno = WSAGetLastError (); - } } - // We don't need the listening socket anymore. Close it. rc = closesocket (listener); wsa_assert (rc != SOCKET_ERROR); @@ -415,13 +411,14 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) win_assert (brc != 0); if (*r_ != INVALID_SOCKET) { -#if !defined _WIN32_WCE +# if !defined _WIN32_WCE // On Windows, preventing sockets to be inherited by child processes. brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0); win_assert (brc); -#endif +# endif return 0; - } else { + } + else { // Cleanup writer if connection failed rc = closesocket (*w_); wsa_assert (rc != SOCKET_ERROR); @@ -435,7 +432,6 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) // Unfortunately, it uses errno_assert() which gives "Unknown error" // We might as well assert here and print the actual error message wsa_assert_no (conn_errno); - return -1; } @@ -463,7 +459,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); errno_assert (rc != -1); - rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); + rc = bind (listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); errno_assert (rc != -1); socklen_t lcladdr_len = sizeof (lcladdr); @@ -493,15 +489,20 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) return 0; -#else // All other implementations support socketpair() - +#else + // All other implementations support socketpair() int sv [2]; int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); - errno_assert (rc == 0); - *w_ = sv [0]; - *r_ = sv [1]; - return 0; - + if (rc == -1) { + errno_assert (errno == ENFILE || errno == EMFILE); + sv [0] = sv [1] = -1; + return -1; + } + else { + *w_ = sv [0]; + *r_ = sv [1]; + return 0; + } #endif } diff --git a/src/signaler.hpp b/src/signaler.hpp index b951011f..3e0d4f6d 100644 --- a/src/signaler.hpp +++ b/src/signaler.hpp @@ -58,7 +58,8 @@ namespace zmq // to pass the signals. static int make_fdpair (fd_t *r_, fd_t *w_); - // Underlying write & read file descriptor. + // Underlying write & read file descriptor + // Will be -1 if we exceeded number of available handles fd_t w; fd_t r; @@ -74,7 +75,6 @@ namespace zmq void close_internal(); #endif }; - } #endif diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b294a27e..76762d2e 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -81,47 +81,49 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, { socket_base_t *s = NULL; switch (type_) { - - case ZMQ_PAIR: - s = new (std::nothrow) pair_t (parent_, tid_, sid_); - break; - case ZMQ_PUB: - s = new (std::nothrow) pub_t (parent_, tid_, sid_); - break; - case ZMQ_SUB: - s = new (std::nothrow) sub_t (parent_, tid_, sid_); - break; - case ZMQ_REQ: - s = new (std::nothrow) req_t (parent_, tid_, sid_); - break; - case ZMQ_REP: - s = new (std::nothrow) rep_t (parent_, tid_, sid_); - break; - case ZMQ_DEALER: - s = new (std::nothrow) dealer_t (parent_, tid_, sid_); - break; - case ZMQ_ROUTER: - s = new (std::nothrow) router_t (parent_, tid_, sid_); - break; - case ZMQ_PULL: - s = new (std::nothrow) pull_t (parent_, tid_, sid_); - break; - case ZMQ_PUSH: - s = new (std::nothrow) push_t (parent_, tid_, sid_); - break; - case ZMQ_XPUB: - s = new (std::nothrow) xpub_t (parent_, tid_, sid_); - break; - case ZMQ_XSUB: - s = new (std::nothrow) xsub_t (parent_, tid_, sid_); - break; - case ZMQ_STREAM: - s = new (std::nothrow) stream_t (parent_, tid_, sid_); - break; - default: - errno = EINVAL; - return NULL; + case ZMQ_PAIR: + s = new (std::nothrow) pair_t (parent_, tid_, sid_); + break; + case ZMQ_PUB: + s = new (std::nothrow) pub_t (parent_, tid_, sid_); + break; + case ZMQ_SUB: + s = new (std::nothrow) sub_t (parent_, tid_, sid_); + break; + case ZMQ_REQ: + s = new (std::nothrow) req_t (parent_, tid_, sid_); + break; + case ZMQ_REP: + s = new (std::nothrow) rep_t (parent_, tid_, sid_); + break; + case ZMQ_DEALER: + s = new (std::nothrow) dealer_t (parent_, tid_, sid_); + break; + case ZMQ_ROUTER: + s = new (std::nothrow) router_t (parent_, tid_, sid_); + break; + case ZMQ_PULL: + s = new (std::nothrow) pull_t (parent_, tid_, sid_); + break; + case ZMQ_PUSH: + s = new (std::nothrow) push_t (parent_, tid_, sid_); + break; + case ZMQ_XPUB: + s = new (std::nothrow) xpub_t (parent_, tid_, sid_); + break; + case ZMQ_XSUB: + s = new (std::nothrow) xsub_t (parent_, tid_, sid_); + break; + case ZMQ_STREAM: + s = new (std::nothrow) stream_t (parent_, tid_, sid_); + break; + default: + errno = EINVAL; + return NULL; } + if (s->mailbox.get_fd () == -1) + return NULL; + alloc_assert (s); return s; }