libzmq/src/zmq.cpp
KIU Shueng Chuan d11f501dc1 problem: not using official api FD_ZERO to init fd_set
solution: fix it

In particular, on Windows, using FD_ZERO is much more efficient than
zeroing out the whole structure.
2017-05-06 08:03:09 +08:00

1532 lines
42 KiB
C++

/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// "Tell them I was a writer.
// A maker of software.
// A humanist. A father.
// And many things.
// But above all, a writer.
// Thank You. :)"
// - Pieter Hintjens
#include "precompiled.hpp"
#define ZMQ_TYPE_UNSAFE
#include "macros.hpp"
#include "poller.hpp"
// On AIX platform, poll.h has to be included first to get consistent
// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
// instead of 'events' and 'revents' and defines macros to map from POSIX-y
// names to AIX-specific names).
#if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS
#include <poll.h>
#endif
// TODO: determine if this is an issue, since zmq.h is being loaded from pch.
// zmq.h must be included *after* poll.h for AIX to build properly
//#include "../include/zmq.h"
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#endif
// XSI vector I/O
#if defined ZMQ_HAVE_UIO
#include <sys/uio.h>
#else
// Fallback scatter/gather element, defined here only when ZMQ_HAVE_UIO is
// not set and <sys/uio.h> is therefore not included.
struct iovec {
// Start of the buffer.
void *iov_base;
// Size of the buffer in bytes.
size_t iov_len;
};
#endif
#include <string.h>
#include <stdlib.h>
#include <new>
#include <climits>
#include "proxy.hpp"
#include "socket_base.hpp"
#include "stdint.hpp"
#include "config.hpp"
#include "likely.hpp"
#include "clock.hpp"
#include "ctx.hpp"
#include "err.hpp"
#include "msg.hpp"
#include "fd.hpp"
#include "metadata.hpp"
#include "signaler.hpp"
#include "socket_poller.hpp"
#include "timers.hpp"
#if defined ZMQ_HAVE_OPENPGM
#define __PGM_WININT_H__
#include <pgm/pgm.h>
#endif
// Compile time check whether msg_t fits into zmq_msg_t.
// The array size evaluates to -1 when the two sizes differ, which makes the
// typedef ill-formed and stops the build.
typedef char check_msg_t_size
[sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];
//  Stores the compile-time library version into the three output
//  parameters. All three pointers are dereferenced unconditionally.
void zmq_version (int *major_, int *minor_, int *patch_)
{
    *major_ = ZMQ_VERSION_MAJOR;

    *minor_ = ZMQ_VERSION_MINOR;

    *patch_ = ZMQ_VERSION_PATCH;
}
//  Returns the textual description of errnum_ as produced by
//  zmq::errno_to_string.
const char *zmq_strerror (int errnum_)
{
    const char *const description = zmq::errno_to_string (errnum_);
    return description;
}
//  Returns the current value of errno for the calling thread.
int zmq_errno (void)
{
    const int current_error = errno;
    return current_error;
}
// New context API
// Creates a new 0MQ context and returns it as an opaque pointer.
// Platform-specific transports are initialised first (OpenPGM, Winsock).
// Returns NULL with errno set to EINVAL only on the OpenPGM timer-failure
// path below; every other failure trips an assertion.
void *zmq_ctx_new (void)
{
#if defined ZMQ_HAVE_OPENPGM
// Init PGM transport. Ensure threading and timer are enabled. Find PGM
// protocol ID. Note that if you want to use gettimeofday and sleep for
// openPGM timing, set environment variables PGM_TIMER to "GTOD" and
// PGM_SLEEP to "USLEEP".
pgm_error_t *pgm_error = NULL;
const bool ok = pgm_init (&pgm_error);
if (ok != TRUE) {
// Invalid parameters don't set pgm_error_t
zmq_assert (pgm_error != NULL);
if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
pgm_error->code == PGM_ERROR_FAILED)) {
// Failed to access RTC or HPET device.
pgm_error_free (pgm_error);
errno = EINVAL;
return NULL;
}
// PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
zmq_assert (false);
}
#endif
#ifdef ZMQ_HAVE_WINDOWS
// Initialise Windows sockets. Note that WSAStartup can be called multiple
// times given that WSACleanup will be called for each WSAStartup.
// We do this before the ctx constructor since its embedded mailbox_t
// object needs Winsock to be up and running.
WORD version_requested = MAKEWORD (2, 2);
WSADATA wsa_data;
int rc = WSAStartup (version_requested, &wsa_data);
zmq_assert (rc == 0);
// Require exactly Winsock 2.2.
zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
HIBYTE (wsa_data.wVersion) == 2);
#endif
// Create 0MQ context.
// nothrow new returns NULL on allocation failure, which alloc_assert checks.
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
alloc_assert (ctx);
return ctx;
}
int zmq_ctx_term (void *ctx_)
{
if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
int rc = ((zmq::ctx_t *) ctx_)->terminate ();
int en = errno;
// Shut down only if termination was not interrupted by a signal.
if (!rc || en != EINTR) {
#ifdef ZMQ_HAVE_WINDOWS
// On Windows, uninitialise socket layer.
rc = WSACleanup ();
wsa_assert (rc != SOCKET_ERROR);
#endif
#if defined ZMQ_HAVE_OPENPGM
// Shut down the OpenPGM library.
if (pgm_shutdown () != TRUE)
zmq_assert (false);
#endif
}
errno = en;
return rc;
}
int zmq_ctx_shutdown (void *ctx_)
{
if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t *) ctx_)->shutdown ();
}
int zmq_ctx_set (void *ctx_, int option_, int optval_)
{
if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t *) ctx_)->set (option_, optval_);
}
int zmq_ctx_get (void *ctx_, int option_)
{
if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t *) ctx_)->get (option_);
}
// Stable/legacy context API
//  Legacy context constructor: creates a context and sets ZMQ_IO_THREADS.
//
//  Returns the new context, or NULL with errno set to EINVAL when
//  io_threads_ is negative. If zmq_ctx_new () itself returns NULL, that NULL
//  is returned with the errno zmq_ctx_new () left behind.
void *zmq_init (int io_threads_)
{
    if (io_threads_ < 0) {
        errno = EINVAL;
        return NULL;
    }

    void *ctx = zmq_ctx_new ();
    //  Only configure a context that was actually created; calling
    //  zmq_ctx_set () on NULL would replace the original errno with EFAULT.
    if (ctx)
        zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
    return ctx;
}
//  Legacy name; forwards to zmq_ctx_term ().
int zmq_term (void *ctx_)
{
    const int term_result = zmq_ctx_term (ctx_);
    return term_result;
}
//  Alternative name; forwards to zmq_ctx_term ().
int zmq_ctx_destroy (void *ctx_)
{
    const int term_result = zmq_ctx_term (ctx_);
    return term_result;
}
// Sockets
void *zmq_socket (void *ctx_, int type_)
{
if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) {
errno = EFAULT;
return NULL;
}
zmq::ctx_t *ctx = (zmq::ctx_t *) ctx_;
zmq::socket_base_t *s = ctx->create_socket (type_);
return (void *) s;
}
int zmq_close (void *s_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
((zmq::socket_base_t*) s_)->close ();
return 0;
}
int zmq_setsockopt (void *s_, int option_, const void *optval_,
size_t optvallen_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->setsockopt (option_, optval_, optvallen_);
return result;
}
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->getsockopt (option_, optval_, optvallen_);
return result;
}
int zmq_socket_monitor (void *s_, const char *addr_, int events_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->monitor (addr_, events_);
return result;
}
int zmq_join (void *s_, const char* group_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->join (group_);
return result;
}
int zmq_leave (void *s_, const char* group_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->leave (group_);
return result;
}
int zmq_bind (void *s_, const char *addr_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->bind (addr_);
return result;
}
int zmq_connect (void *s_, const char *addr_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->connect (addr_);
return result;
}
int zmq_unbind (void *s_, const char *addr_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
return s->term_endpoint (addr_);
}
int zmq_disconnect (void *s_, const char *addr_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
return s->term_endpoint (addr_);
}
// Sending functions.
//  Sends msg_ on s_ and returns the message size clamped to INT_MAX, or -1
//  if the underlying send fails.
static inline int
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
    //  The size is captured before the send call, as the value to report.
    const size_t payload_size = zmq_msg_size (msg_);

    const int send_rc = s_->send (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
    if (unlikely (send_rc < 0))
        return -1;

    //  The upper bound is fixed at INT_MAX; the original author noted a wish
    //  to use the context's ZMQ_MAX_MSGSZ setting here instead.
    const size_t size_cap = INT_MAX;

    //  Clamp so that the int return value can never wrap to a negative number.
    if (payload_size < size_cap)
        return (int) payload_size;
    return (int) size_cap;
}
/* To be deprecated once zmq_msg_send() is stable */
//  Same operation as zmq_msg_send () with the socket as the first argument.
int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
    const int send_result = zmq_msg_send (msg_, s_, flags_);
    return send_result;
}
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq_msg_t msg;
if (zmq_msg_init_size (&msg, len_))
return -1;
// We explicitly allow a send from NULL, size zero
if (len_) {
assert (buf_);
memcpy (zmq_msg_data (&msg), buf_, len_);
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int rc = s_sendmsg (s, &msg, flags_);
if (unlikely (rc < 0)) {
int err = errno;
int rc2 = zmq_msg_close (&msg);
errno_assert (rc2 == 0);
errno = err;
return -1;
}
// Note the optimisation here. We don't close the msg object as it is
// empty anyway. This may change when implementation of zmq_msg_t changes.
return rc;
}
int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq_msg_t msg;
int rc = zmq_msg_init_data (&msg, (void *)buf_, len_, NULL, NULL);
if (rc != 0)
return -1;
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
rc = s_sendmsg (s, &msg, flags_);
if (unlikely (rc < 0)) {
int err = errno;
int rc2 = zmq_msg_close (&msg);
errno_assert (rc2 == 0);
errno = err;
return -1;
}
// Note the optimisation here. We don't close the msg object as it is
// empty anyway. This may change when implementation of zmq_msg_t changes.
return rc;
}
// Send multiple messages.
// TODO: this function has no man page
//
// If flag bit ZMQ_SNDMORE is set the vector is treated as
// a single multi-part message, i.e. the last message has
// ZMQ_SNDMORE bit switched off.
//
int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
if (unlikely (count_ <= 0 || !a_)) {
errno = EINVAL;
return -1;
}
int rc = 0;
zmq_msg_t msg;
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
for (size_t i = 0; i < count_; ++i) {
rc = zmq_msg_init_size (&msg, a_[i].iov_len);
if (rc != 0) {
rc = -1;
break;
}
memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
if (i == count_ - 1)
flags_ = flags_ & ~ZMQ_SNDMORE;
rc = s_sendmsg (s, &msg, flags_);
if (unlikely (rc < 0)) {
int err = errno;
int rc2 = zmq_msg_close (&msg);
errno_assert (rc2 == 0);
errno = err;
rc = -1;
break;
}
}
return rc;
}
// Receiving functions.
//  Receives one message from s_ into msg_ and returns its size clamped to
//  INT_MAX, or -1 if the underlying recv fails.
static int
s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{
    const int recv_rc = s_->recv (reinterpret_cast<zmq::msg_t *> (msg_), flags_);
    if (unlikely (recv_rc < 0))
        return -1;

    //  Clamp so that the int return value can never wrap to a negative number.
    const size_t received = zmq_msg_size (msg_);
    if (received < INT_MAX)
        return (int) received;
    return INT_MAX;
}
/* To be deprecated once zmq_msg_recv() is stable */
//  Same operation as zmq_msg_recv () with the socket as the first argument.
int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
    const int recv_result = zmq_msg_recv (msg_, s_, flags_);
    return recv_result;
}
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
errno_assert (rc == 0);
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int nbytes = s_recvmsg (s, &msg, flags_);
if (unlikely (nbytes < 0)) {
int err = errno;
rc = zmq_msg_close (&msg);
errno_assert (rc == 0);
errno = err;
return -1;
}
// An oversized message is silently truncated.
size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
// We explicitly allow a null buffer argument if len is zero
if (to_copy) {
assert (buf_);
memcpy (buf_, zmq_msg_data (&msg), to_copy);
}
rc = zmq_msg_close (&msg);
errno_assert (rc == 0);
return nbytes;
}
// Receive a multi-part message
//
// Receives up to *count_ parts of a multi-part message.
// Sets *count_ to the actual number of parts read.
// ZMQ_RCVMORE is set to indicate if a complete multi-part message was read.
// Returns number of message parts read, or -1 on error.
//
// Note: even if -1 is returned, some parts of the message
// may have been read. Therefore the client must consult
// *count_ to retrieve message parts successfully read,
// even if -1 is returned.
//
// The iov_base* buffers of each iovec *a_ filled in by this
// function may be freed using free().
// TODO: this function has no man page
//
//  Implementation notes:
//  - each received part is copied into a malloc'ed buffer stored in a_[i];
//  - the received message is always closed before returning, including on
//    the out-of-memory path;
//  - a zero-length part is not treated as an allocation failure when
//    malloc (0) returns NULL; in that case iov_base is NULL and iov_len is 0.
int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
{
    if (!s_ || !((zmq::socket_base_t *) s_)->check_tag ()) {
        errno = ENOTSOCK;
        return -1;
    }
    if (unlikely (!count_ || *count_ <= 0 || !a_)) {
        errno = EINVAL;
        return -1;
    }
    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;

    const size_t count = *count_;
    int nread = 0;
    bool recvmore = true;

    //  *count_ is reused as the running number of parts delivered so far.
    *count_ = 0;

    for (size_t i = 0; recvmore && i < count; ++i) {
        zmq_msg_t msg;
        int rc = zmq_msg_init (&msg);
        errno_assert (rc == 0);

        int nbytes = s_recvmsg (s, &msg, flags_);
        if (unlikely (nbytes < 0)) {
            //  Release the message but report the errno of the failed receive.
            int err = errno;
            rc = zmq_msg_close (&msg);
            errno_assert (rc == 0);
            errno = err;
            nread = -1;
            break;
        }

        const size_t len = zmq_msg_size (&msg);
        a_[i].iov_len = len;
        a_[i].iov_base = static_cast<char *> (malloc (len));

        //  malloc (0) is allowed to return NULL, so only a failed non-empty
        //  allocation counts as out of memory.
        if (unlikely (!a_[i].iov_base && len > 0)) {
            //  Close the message before bailing out so it is not leaked.
            rc = zmq_msg_close (&msg);
            errno_assert (rc == 0);
            errno = ENOMEM;
            return -1;
        }
        if (len > 0)
            memcpy (a_[i].iov_base, static_cast<char *> (zmq_msg_data (&msg)),
                    len);

        //  Assume zmq_socket ZMQ_RCVMORE is properly set.
        zmq::msg_t *p_msg = reinterpret_cast<zmq::msg_t *> (&msg);
        recvmore = p_msg->flags () & zmq::msg_t::more;

        rc = zmq_msg_close (&msg);
        errno_assert (rc == 0);
        ++*count_;
        ++nread;
    }
    return nread;
}
// Message manipulators.
//  Forwards to msg_t::init () on the message behind msg_.
int zmq_msg_init (zmq_msg_t *msg_)
{
    zmq::msg_t *const frame = reinterpret_cast<zmq::msg_t *> (msg_);
    return frame->init ();
}
//  Forwards to msg_t::init_size (size_) on the message behind msg_.
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
    zmq::msg_t *const frame = reinterpret_cast<zmq::msg_t *> (msg_);
    return frame->init_size (size_);
}
//  Forwards to msg_t::init_data with the caller's buffer, size, free
//  function and hint.
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
                       zmq_free_fn *ffn_, void *hint_)
{
    zmq::msg_t *const frame = reinterpret_cast<zmq::msg_t *> (msg_);
    return frame->init_data (data_, size_, ffn_, hint_);
}
//  Sends msg_ on s_ via s_sendmsg. Fails with ENOTSOCK when s_ is NULL or
//  does not pass the tag check.
int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
{
    zmq::socket_base_t *const sock = static_cast<zmq::socket_base_t *> (s_);
    if (sock && sock->check_tag ())
        return s_sendmsg (sock, msg_, flags_);

    errno = ENOTSOCK;
    return -1;
}
//  Receives into msg_ from s_ via s_recvmsg. Fails with ENOTSOCK when s_ is
//  NULL or does not pass the tag check.
int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
{
    zmq::socket_base_t *const sock = static_cast<zmq::socket_base_t *> (s_);
    if (sock && sock->check_tag ())
        return s_recvmsg (sock, msg_, flags_);

    errno = ENOTSOCK;
    return -1;
}
//  Forwards to msg_t::close () on the message behind msg_.
int zmq_msg_close (zmq_msg_t *msg_)
{
    zmq::msg_t *const frame = reinterpret_cast<zmq::msg_t *> (msg_);
    return frame->close ();
}
//  Forwards to msg_t::move on dest_, with src_ as the source message.
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    zmq::msg_t *const target = reinterpret_cast<zmq::msg_t *> (dest_);
    zmq::msg_t *const source = reinterpret_cast<zmq::msg_t *> (src_);
    return target->move (*source);
}
//  Forwards to msg_t::copy on dest_, with src_ as the source message.
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
    zmq::msg_t *const target = reinterpret_cast<zmq::msg_t *> (dest_);
    zmq::msg_t *const source = reinterpret_cast<zmq::msg_t *> (src_);
    return target->copy (*source);
}
//  Returns msg_t::data () for the message behind msg_.
void *zmq_msg_data (zmq_msg_t *msg_)
{
    zmq::msg_t *const frame = reinterpret_cast<zmq::msg_t *> (msg_);
    return frame->data ();
}
size_t zmq_msg_size (const zmq_msg_t *msg_)
{
return ((zmq::msg_t*) msg_)->size ();
}
//  Shorthand for zmq_msg_get (msg_, ZMQ_MORE).
int zmq_msg_more (const zmq_msg_t *msg_)
{
    const int more_flag = zmq_msg_get (msg_, ZMQ_MORE);
    return more_flag;
}
int zmq_msg_get (const zmq_msg_t *msg_, int property_)
{
const char* fd_string;
switch (property_) {
case ZMQ_MORE:
return (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more)? 1: 0;
case ZMQ_SRCFD:
fd_string = zmq_msg_gets(msg_, "__fd");
if (fd_string == NULL)
return (int)-1;
return atoi(fd_string);
case ZMQ_SHARED:
return (((zmq::msg_t*) msg_)->is_cmsg ()) ||
(((zmq::msg_t*) msg_)->flags () & zmq::msg_t::shared)? 1: 0;
default:
errno = EINVAL;
return -1;
}
}
//  There are currently no settable message properties, so every call fails
//  with EINVAL regardless of its arguments.
int zmq_msg_set (zmq_msg_t *, int, int)
{
    const int failure = -1;
    errno = EINVAL;
    return failure;
}
//  Forwards to msg_t::set_routing_id (routing_id_).
int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
{
    zmq::msg_t *const frame = reinterpret_cast<zmq::msg_t *> (msg_);
    return frame->set_routing_id (routing_id_);
}
//  Returns msg_t::get_routing_id () for the message behind msg_.
uint32_t zmq_msg_routing_id (zmq_msg_t *msg_)
{
    zmq::msg_t *const frame = reinterpret_cast<zmq::msg_t *> (msg_);
    return frame->get_routing_id ();
}
//  Forwards to msg_t::set_group (group_).
int zmq_msg_set_group (zmq_msg_t *msg_, const char *group_)
{
    zmq::msg_t *const frame = reinterpret_cast<zmq::msg_t *> (msg_);
    return frame->set_group (group_);
}
//  Returns msg_t::group () for the message behind msg_.
const char *zmq_msg_group (zmq_msg_t *msg_)
{
    zmq::msg_t *const frame = reinterpret_cast<zmq::msg_t *> (msg_);
    return frame->group ();
}
// Get message metadata string
//  Looks up the metadata property named property_ on the message.
//
//  Returns the property value, or NULL with errno set to EINVAL when
//  property_ is NULL, the message carries no metadata, or the metadata has
//  no such property.
const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_)
{
    //  Constructing a std::string from a NULL pointer is undefined
    //  behaviour, so a NULL name is rejected up front.
    if (property_) {
        zmq::metadata_t *metadata = ((zmq::msg_t *) msg_)->metadata ();
        if (metadata) {
            const char *value = metadata->get (std::string (property_));
            if (value)
                return value;
        }
    }

    errno = EINVAL;
    return NULL;
}
// Polling.
#if defined ZMQ_HAVE_POLLER
//  Implements zmq_poll on top of the socket_poller_t API.
//
//  items_   - array of nitems_ poll items; revents of each is overwritten.
//  nitems_  - number of items; a negative value fails with EINVAL.
//  timeout_ - passed through to zmq_poller_wait_all.
//
//  Returns the number of events reported by zmq_poller_wait_all, 0 if the
//  wait failed with ETIMEDOUT, or -1 with errno set (EINVAL for negative
//  nitems_, EFAULT for a NULL items_ with nitems_ > 0, otherwise the errno of
//  the failing poller call).
inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
    //  Argument validation uses the same error codes as the poll ()-based
    //  zmq_poll path. Without it a negative count reaches operator new[].
    if (unlikely (nitems_ < 0)) {
        errno = EINVAL;
        return -1;
    }
    if (unlikely (nitems_ > 0 && !items_)) {
        errno = EFAULT;
        return -1;
    }

    int rc;
    zmq::socket_poller_t poller;
    zmq_poller_event_t *events =
      new (std::nothrow) zmq_poller_event_t[nitems_];
    alloc_assert (events);

    bool repeat_items = false;

    //  Register sockets with poller
    for (int i = 0; i < nitems_; i++) {
        items_[i].revents = 0;

        bool modify = false;
        short e = items_[i].events;

        if (items_[i].socket) {
            //  Poll item is a 0MQ socket.
            for (int j = 0; j < i; ++j) {
                //  Check for repeat entries; merge their event masks.
                if (items_[j].socket == items_[i].socket) {
                    repeat_items = true;
                    modify = true;
                    e |= items_[j].events;
                }
            }
            if (modify)
                rc = zmq_poller_modify (&poller, items_[i].socket, e);
            else
                rc = zmq_poller_add (&poller, items_[i].socket, NULL, e);
        } else {
            //  Poll item is a raw file descriptor.
            for (int j = 0; j < i; ++j) {
                //  Check for repeat entries; merge their event masks.
                if (!items_[j].socket && items_[j].fd == items_[i].fd) {
                    repeat_items = true;
                    modify = true;
                    e |= items_[j].events;
                }
            }
            if (modify)
                rc = zmq_poller_modify_fd (&poller, items_[i].fd, e);
            else
                rc = zmq_poller_add_fd (&poller, items_[i].fd, NULL, e);
        }

        if (rc < 0) {
            //  Keep the errno of the failed registration across the cleanup.
            const int err = errno;
            delete [] events;
            errno = err;
            return rc;
        }
    }

    //  Wait for events
    rc = zmq_poller_wait_all (&poller, events, nitems_, timeout_);
    if (rc < 0) {
        //  Capture errno before releasing memory so the cleanup cannot
        //  influence the timeout check or the value reported to the caller.
        const int err = errno;
        delete [] events;
        if (err == ETIMEDOUT)
            return 0;
        errno = err;
        return rc;
    }

    //  Transform poller events into zmq_pollitem events.
    //  items_ contains all items, while events only contains fired events.
    //  If no sockets are repeated (likely), the two are still co-ordered, so
    //  step through the items checking for matches only on the first event.
    //  If there are repeat items, they cannot be assumed to be co-ordered,
    //  so each pollitem must check fired events from the beginning.
    int j_start = 0, found_events = rc;
    for (int i = 0; i < nitems_; i++) {
        for (int j = j_start; j < found_events; ++j) {
            if ((items_[i].socket && items_[i].socket == events[j].socket)
                || (!(items_[i].socket || events[j].socket)
                    && items_[i].fd == events[j].fd)) {
                items_[i].revents = events[j].events & items_[i].events;
                if (!repeat_items) {
                    //  no repeats, we can ignore events we've already seen
                    j_start++;
                }
                break;
            }
            if (!repeat_items) {
                //  no repeats, never have to look at j > j_start
                break;
            }
        }
    }

    //  Cleanup
    delete [] events;
    return rc;
}
#endif // ZMQ_HAVE_POLLER
// Polls nitems_ items for at most timeout_ milliseconds (zero: return
// immediately, negative: wait indefinitely) and fills in revents of each
// item. Returns the number of items with non-zero revents, or -1 with errno
// set. One of three implementations is selected at compile time:
// zmq_poller_poll (ZMQ_HAVE_POLLER), poll () (ZMQ_POLL_BASED_ON_POLL) or
// select () (ZMQ_POLL_BASED_ON_SELECT); otherwise the call fails with ENOTSUP.
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
// TODO: the function implementation can just call zmq_pollfd_poll with
// pollfd as NULL, however pollfd is not yet stable.
#if defined ZMQ_HAVE_POLLER
// if poller is present, use that.
return zmq_poller_poll(items_, nitems_, timeout_);
#else
#if defined ZMQ_POLL_BASED_ON_POLL
if (unlikely (nitems_ < 0)) {
errno = EINVAL;
return -1;
}
// With no items the call degenerates into a plain sleep for timeout_.
if (unlikely (nitems_ == 0)) {
if (timeout_ == 0)
return 0;
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return 0;
#elif defined ZMQ_HAVE_ANDROID
usleep (timeout_ * 1000);
return 0;
#else
// NOTE(review): a negative (infinite) timeout_ is passed to usleep as a
// negative product here — confirm this is intended.
return usleep (timeout_ * 1000);
#endif
}
if (!items_) {
errno = EFAULT;
return -1;
}
zmq::clock_t clock;
uint64_t now = 0;
uint64_t end = 0;
// Use the on-stack array for small poll sets and fall back to the heap
// only when more than ZMQ_POLLITEMS_DFLT items are requested.
pollfd spollfds[ZMQ_POLLITEMS_DFLT];
pollfd *pollfds = spollfds;
if (nitems_ > ZMQ_POLLITEMS_DFLT) {
pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
alloc_assert (pollfds);
}
// Build pollset for poll () system call.
for (int i = 0; i != nitems_; i++) {
// If the poll item is a 0MQ socket, we poll on the file descriptor
// retrieved by the ZMQ_FD socket option.
if (items_ [i].socket) {
size_t zmq_fd_size = sizeof (zmq::fd_t);
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
&zmq_fd_size) == -1) {
if (pollfds != spollfds)
free (pollfds);
return -1;
}
// Any requested event on a 0MQ socket is watched as POLLIN on its
// notification descriptor.
pollfds [i].events = items_ [i].events ? POLLIN : 0;
}
// Else, the poll item is a raw file descriptor. Just convert the
// events to normal POLLIN/POLLOUT for poll ().
else {
pollfds [i].fd = items_ [i].fd;
pollfds [i].events =
(items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
(items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0) |
(items_ [i].events & ZMQ_POLLPRI ? POLLPRI : 0);
}
}
bool first_pass = true;
int nevents = 0;
while (true) {
// Compute the timeout for the subsequent poll.
// The first pass never blocks; later passes use the remaining time.
int timeout;
if (first_pass)
timeout = 0;
else
if (timeout_ < 0)
timeout = -1;
else
timeout = end - now;
// Wait for events.
{
int rc = poll (pollfds, nitems_, timeout);
// An interrupted poll () is reported to the caller with errno == EINTR.
if (rc == -1 && errno == EINTR) {
if (pollfds != spollfds)
free (pollfds);
return -1;
}
errno_assert (rc >= 0);
}
// Check for the events.
for (int i = 0; i != nitems_; i++) {
items_ [i].revents = 0;
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
if (items_ [i].socket) {
size_t zmq_events_size = sizeof (uint32_t);
uint32_t zmq_events;
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
&zmq_events_size) == -1) {
if (pollfds != spollfds)
free (pollfds);
return -1;
}
if ((items_ [i].events & ZMQ_POLLOUT) &&
(zmq_events & ZMQ_POLLOUT))
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) &&
(zmq_events & ZMQ_POLLIN))
items_ [i].revents |= ZMQ_POLLIN;
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
if (pollfds [i].revents & POLLIN)
items_ [i].revents |= ZMQ_POLLIN;
if (pollfds [i].revents & POLLOUT)
items_ [i].revents |= ZMQ_POLLOUT;
if (pollfds [i].revents & POLLPRI)
items_ [i].revents |= ZMQ_POLLPRI;
// Any other returned bit is reported as an error condition.
if (pollfds [i].revents & ~(POLLIN | POLLOUT | POLLPRI))
items_ [i].revents |= ZMQ_POLLERR;
}
if (items_ [i].revents)
nevents++;
}
// If timeout is zero, exit immediately whether there are events or not.
if (timeout_ == 0)
break;
// If there are events to return, we can exit immediately.
if (nevents)
break;
// At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events.
if (timeout_ < 0) {
if (first_pass)
first_pass = false;
continue;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling began. (We assume that the
// first pass has taken negligible time). We also compute the time
// when the polling should time out.
if (first_pass) {
now = clock.now_ms ();
end = now + timeout_;
if (now == end)
break;
first_pass = false;
continue;
}
// Find out whether the timeout has expired.
now = clock.now_ms ();
if (now >= end)
break;
}
if (pollfds != spollfds)
free (pollfds);
return nevents;
#elif defined ZMQ_POLL_BASED_ON_SELECT
if (unlikely (nitems_ < 0)) {
errno = EINVAL;
return -1;
}
// With no items the call degenerates into a plain sleep for timeout_.
if (unlikely (nitems_ == 0)) {
if (timeout_ == 0)
return 0;
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return 0;
#else
return usleep (timeout_ * 1000);
#endif
}
// NOTE(review): unlike the poll ()-based branch, this branch does not
// reject a NULL items_ with EFAULT before dereferencing it.
zmq::clock_t clock;
uint64_t now = 0;
uint64_t end = 0;
// Ensure we do not attempt to select () on more than FD_SETSIZE
// file descriptors.
zmq_assert (nitems_ <= FD_SETSIZE);
fd_set pollset_in;
FD_ZERO (&pollset_in);
fd_set pollset_out;
FD_ZERO (&pollset_out);
fd_set pollset_err;
FD_ZERO (&pollset_err);
zmq::fd_t maxfd = 0;
// Build the fd_sets for passing to select ().
for (int i = 0; i != nitems_; i++) {
// If the poll item is a 0MQ socket we are interested in input on the
// notification file descriptor retrieved by the ZMQ_FD socket option.
if (items_ [i].socket) {
size_t zmq_fd_size = sizeof (zmq::fd_t);
zmq::fd_t notify_fd;
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
&zmq_fd_size) == -1)
return -1;
if (items_ [i].events) {
FD_SET (notify_fd, &pollset_in);
if (maxfd < notify_fd)
maxfd = notify_fd;
}
}
// Else, the poll item is a raw file descriptor. Convert the poll item
// events to the appropriate fd_sets.
else {
if (items_ [i].events & ZMQ_POLLIN)
FD_SET (items_ [i].fd, &pollset_in);
if (items_ [i].events & ZMQ_POLLOUT)
FD_SET (items_ [i].fd, &pollset_out);
if (items_ [i].events & ZMQ_POLLERR)
FD_SET (items_ [i].fd, &pollset_err);
if (maxfd < items_ [i].fd)
maxfd = items_ [i].fd;
}
}
bool first_pass = true;
int nevents = 0;
// Working copies handed to select (), refreshed from the pollset_* masters
// before every call.
fd_set inset, outset, errset;
while (true) {
// Compute the timeout for the subsequent poll.
// The first pass never blocks; later passes use the remaining time.
timeval timeout;
timeval *ptimeout;
if (first_pass) {
timeout.tv_sec = 0;
timeout.tv_usec = 0;
ptimeout = &timeout;
}
else
if (timeout_ < 0)
ptimeout = NULL;
else {
timeout.tv_sec = (long) ((end - now) / 1000);
timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
ptimeout = &timeout;
}
// Wait for events. This inner loop runs exactly once: any select ()
// failure (including an interrupt) returns -1 to the caller, and success
// breaks out.
while (true) {
#if defined ZMQ_HAVE_WINDOWS
// On Windows we don't need to copy the whole fd_set.
// SOCKETS are continuous from the beginning of fd_array in fd_set.
// We just need to copy fd_count elements of fd_array.
// We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
memcpy (&inset, &pollset_in , (char *) (pollset_in.fd_array + pollset_in.fd_count ) - (char *) &pollset_in );
memcpy (&outset, &pollset_out, (char *) (pollset_out.fd_array + pollset_out.fd_count) - (char *) &pollset_out);
memcpy (&errset, &pollset_err, (char *) (pollset_err.fd_array + pollset_err.fd_count) - (char *) &pollset_err);
int rc = select (0, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == SOCKET_ERROR)) {
errno = zmq::wsa_error_to_errno (WSAGetLastError ());
wsa_assert (errno == ENOTSOCK);
return -1;
}
#else
memcpy (&inset, &pollset_in, sizeof (fd_set));
memcpy (&outset, &pollset_out, sizeof (fd_set));
memcpy (&errset, &pollset_err, sizeof (fd_set));
int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == -1)) {
errno_assert (errno == EINTR || errno == EBADF);
return -1;
}
#endif
break;
}
// Check for the events.
for (int i = 0; i != nitems_; i++) {
items_ [i].revents = 0;
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
if (items_ [i].socket) {
size_t zmq_events_size = sizeof (uint32_t);
uint32_t zmq_events;
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
&zmq_events_size) == -1)
return -1;
if ((items_ [i].events & ZMQ_POLLOUT) &&
(zmq_events & ZMQ_POLLOUT))
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) &&
(zmq_events & ZMQ_POLLIN))
items_ [i].revents |= ZMQ_POLLIN;
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
if (FD_ISSET (items_ [i].fd, &inset))
items_ [i].revents |= ZMQ_POLLIN;
if (FD_ISSET (items_ [i].fd, &outset))
items_ [i].revents |= ZMQ_POLLOUT;
if (FD_ISSET (items_ [i].fd, &errset))
items_ [i].revents |= ZMQ_POLLERR;
}
if (items_ [i].revents)
nevents++;
}
// If timeout is zero, exit immediately whether there are events or not.
if (timeout_ == 0)
break;
// If there are events to return, we can exit immediately.
if (nevents)
break;
// At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events.
if (timeout_ < 0) {
if (first_pass)
first_pass = false;
continue;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling began. (We assume that the
// first pass has taken negligible time). We also compute the time
// when the polling should time out.
if (first_pass) {
now = clock.now_ms ();
end = now + timeout_;
if (now == end)
break;
first_pass = false;
continue;
}
// Find out whether the timeout has expired.
now = clock.now_ms ();
if (now >= end)
break;
}
return nevents;
#else
// Exotic platforms that support neither poll() nor select().
errno = ENOTSUP;
return -1;
#endif
#endif // ZMQ_HAVE_POLLER
}
// The poller functionality
void *zmq_poller_new (void)
{
zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t;
alloc_assert (poller);
return poller;
}
int zmq_poller_destroy (void **poller_p_)
{
void *poller;
if (!poller_p_ || !(poller = *poller_p_) ||
!((zmq::socket_poller_t*) poller)->check_tag ()) {
errno = EFAULT;
return -1;
}
delete ((zmq::socket_poller_t*) poller);
*poller_p_ = NULL;
return 0;
}
int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
return ((zmq::socket_poller_t*)poller_)->add (socket, user_data_, events_);
}
#if defined _WIN32
int zmq_poller_add_fd (void *poller_, SOCKET fd_, void *user_data_, short events_)
#else
int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_, short events_)
#endif
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::socket_poller_t*)poller_)->add_fd (fd_, user_data_, events_);
}
int zmq_poller_modify (void *poller_, void *s_, short events_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
return ((zmq::socket_poller_t*)poller_)->modify (socket, events_);
}
#if defined _WIN32
int zmq_poller_modify_fd (void *poller_, SOCKET fd_, short events_)
#else
int zmq_poller_modify_fd (void *poller_, int fd_, short events_)
#endif
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::socket_poller_t*)poller_)->modify_fd (fd_, events_);
}
int zmq_poller_remove (void *poller_, void *s_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
return ((zmq::socket_poller_t*)poller_)->remove (socket);
}
#if defined _WIN32
int zmq_poller_remove_fd (void *poller_, SOCKET fd_)
#else
int zmq_poller_remove_fd (void *poller_, int fd_)
#endif
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::socket_poller_t*)poller_)->remove_fd (fd_);
}
int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
zmq_assert (event != NULL);
int rc = zmq_poller_wait_all(poller_, event, 1, timeout_);
if (rc < 0) {
memset (event, 0, sizeof(zmq_poller_event_t));
}
// wait_all returns number of events, but we return 0 for any success
return rc >= 0 ? 0 : rc;
}
int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events, int n_events, long timeout_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
if (n_events < 0) {
errno = EINVAL;
return -1;
}
zmq_assert (events != NULL);
int rc = ((zmq::socket_poller_t*)poller_)->wait ((zmq::socket_poller_t::event_t *)events, n_events, timeout_);
return rc;
}
// Timers
void *zmq_timers_new (void)
{
zmq::timers_t *timers = new (std::nothrow) zmq::timers_t;
alloc_assert (timers);
return timers;
}
//  Deletes the timers object referenced by *timers_p_ and resets the
//  caller's pointer to NULL.
//
//  Returns 0 on success, or -1 with errno set to EFAULT when timers_p_ or
//  *timers_p_ is NULL, or when the object does not pass the tag check.
int zmq_timers_destroy (void **timers_p_)
{
    //  Validate the outer pointer before dereferencing it, in the same way
    //  zmq_poller_destroy does.
    void *timers;
    if (!timers_p_ || !(timers = *timers_p_)
        || !((zmq::timers_t *) timers)->check_tag ()) {
        errno = EFAULT;
        return -1;
    }

    delete ((zmq::timers_t *) timers);
    *timers_p_ = NULL;
    return 0;
}
int zmq_timers_add (void *timers_, size_t interval_, zmq_timer_fn handler_, void *arg_)
{
if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::timers_t*)timers_)->add (interval_, handler_, arg_);
}
int zmq_timers_cancel (void *timers_, int timer_id_)
{
if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::timers_t*)timers_)->cancel (timer_id_);
}
int zmq_timers_set_interval (void *timers_, int timer_id_, size_t interval_)
{
if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::timers_t*)timers_)->set_interval (timer_id_, interval_);
}
int zmq_timers_reset (void *timers_, int timer_id_)
{
if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::timers_t*)timers_)->reset (timer_id_);
}
long zmq_timers_timeout (void *timers_)
{
if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::timers_t*)timers_)->timeout ();
}
int zmq_timers_execute (void *timers_)
{
if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::timers_t*)timers_)->execute ();
}
// The proxy functionality
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
{
if (!frontend_ || !backend_) {
errno = EFAULT;
return -1;
}
return zmq::proxy (
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_,
(zmq::socket_base_t*) capture_);
}
int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *control_)
{
if (!frontend_ || !backend_) {
errno = EFAULT;
return -1;
}
return zmq::proxy (
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_,
(zmq::socket_base_t*) capture_,
(zmq::socket_base_t*) control_);
}
// The deprecated device functionality
//  Deprecated entry point: runs zmq::proxy between frontend_ and backend_
//  without a capture socket. The device type argument is ignored.
//
//  Returns the result of zmq::proxy, or -1 with errno set to EFAULT when
//  frontend_ or backend_ is NULL (the same validation zmq_proxy performs).
int zmq_device (int /* type */, void *frontend_, void *backend_)
{
    if (!frontend_ || !backend_) {
        errno = EFAULT;
        return -1;
    }

    return zmq::proxy (
        (zmq::socket_base_t *) frontend_,
        (zmq::socket_base_t *) backend_, NULL);
}
// Probe library capabilities; for now, reports on transport and security
//  Reports whether this build provides the named capability.
//
//  capability - one of "ipc", "pgm", "tipc", "norm", "curve", "gssapi",
//               "vmci" or "draft"; each is recognised only when the
//               corresponding build macro is defined.
//
//  Returns non-zero (true) if the capability is compiled in, zero (false)
//  otherwise, including for unknown names and for a NULL argument.
int zmq_has (const char *capability)
{
    //  strcmp must not be handed a NULL pointer; a missing name simply
    //  matches no capability.
    if (!capability)
        return false;

#if !defined (ZMQ_HAVE_WINDOWS) && !defined (ZMQ_HAVE_OPENVMS)
    if (strcmp (capability, "ipc") == 0)
        return true;
#endif
#if defined (ZMQ_HAVE_OPENPGM)
    if (strcmp (capability, "pgm") == 0)
        return true;
#endif
#if defined (ZMQ_HAVE_TIPC)
    if (strcmp (capability, "tipc") == 0)
        return true;
#endif
#if defined (ZMQ_HAVE_NORM)
    if (strcmp (capability, "norm") == 0)
        return true;
#endif
#if defined (ZMQ_HAVE_CURVE)
    if (strcmp (capability, "curve") == 0)
        return true;
#endif
#if defined (HAVE_LIBGSSAPI_KRB5)
    if (strcmp (capability, "gssapi") == 0)
        return true;
#endif
#if defined (ZMQ_HAVE_VMCI)
    if (strcmp (capability, "vmci") == 0)
        return true;
#endif
#if defined (ZMQ_BUILD_DRAFT_API)
    if (strcmp (capability, "draft") == 0)
        return true;
#endif
    //  Whatever the application asked for, we don't have
    return false;
}