Problem: No type-safe alternatives when polling or needing a reference to a socket

Solution: Introduce a socket_ref that is a non-owning nullable reference to a socket
This commit is contained in:
Gudmundur Adalsteinsson 2019-04-07 21:10:04 +00:00
parent b0ac8acd60
commit c6a3529cd1
5 changed files with 443 additions and 209 deletions

View File

@ -20,6 +20,7 @@ add_executable(
message.cpp
context.cpp
socket.cpp
socket_ref.cpp
poller.cpp
active_poller.cpp
multipart.cpp

View File

@ -25,9 +25,11 @@ TEST_CASE("socket create assign", "[socket]")
{
zmq::context_t context;
zmq::socket_t socket(context, ZMQ_ROUTER);
CHECK(static_cast<void*>(socket));
CHECK(static_cast<bool>(socket));
CHECK(socket.handle() != nullptr);
socket = {};
CHECK(!static_cast<void*>(socket));
CHECK(!static_cast<bool>(socket));
CHECK(socket.handle() == nullptr);
}
TEST_CASE("socket create by enum and destroy", "[socket]")
@ -75,7 +77,7 @@ TEST_CASE("socket proxy", "[socket]")
auto s3 = std::move(capture);
try
{
zmq::proxy(s1, s2, &s3);
zmq::proxy(s1, s2, zmq::socket_ref(s3));
}
catch (const zmq::error_t& e)
{
@ -102,7 +104,7 @@ TEST_CASE("socket proxy steerable", "[socket]")
auto s3 = std::move(control);
try
{
zmq::proxy_steerable(s1, s2, ZMQ_NULLPTR, &s3);
zmq::proxy_steerable(s1, s2, zmq::socket_ref(), s3);
}
catch (const zmq::error_t& e)
{

116
tests/socket_ref.cpp Normal file
View File

@ -0,0 +1,116 @@
#include <catch.hpp>
#include <zmq.hpp>
#ifdef ZMQ_CPP11
#ifdef ZMQ_CPP17
static_assert(std::is_nothrow_swappable_v<zmq::socket_ref>);
#endif
static_assert(sizeof(zmq::socket_ref) == sizeof(void *), "size mismatch");
static_assert(alignof(zmq::socket_ref) == alignof(void *), "alignment mismatch");
static_assert(std::is_trivially_copyable<zmq::socket_ref>::value,
"needs to be trivially copyable");
TEST_CASE("socket_ref default init", "[socket_ref]")
{
zmq::socket_ref sr;
CHECK(!sr);
CHECK(sr == nullptr);
CHECK(nullptr == sr);
CHECK(sr.handle() == nullptr);
}
TEST_CASE("socket_ref create from nullptr", "[socket_ref]")
{
zmq::socket_ref sr = nullptr;
CHECK(sr == nullptr);
CHECK(sr.handle() == nullptr);
}
TEST_CASE("socket_ref create from handle", "[socket_ref]")
{
void *np = nullptr;
zmq::socket_ref sr{zmq::from_handle, np};
CHECK(sr == nullptr);
CHECK(sr.handle() == nullptr);
}
TEST_CASE("socket_ref compare", "[socket_ref]")
{
zmq::socket_ref sr1;
zmq::socket_ref sr2;
CHECK(sr1 == sr2);
CHECK(!(sr1 != sr2));
}
TEST_CASE("socket_ref compare from socket_t", "[socket_ref]")
{
zmq::context_t context;
zmq::socket_t s1(context, zmq::socket_type::router);
zmq::socket_t s2(context, zmq::socket_type::dealer);
zmq::socket_ref sr1 = s1;
zmq::socket_ref sr2 = s2;
CHECK(sr1);
CHECK(sr2);
CHECK(sr1 == s1);
CHECK(sr2 == s2);
CHECK(sr1.handle() == s1.handle());
CHECK(sr1 != sr2);
CHECK(sr1.handle() != sr2.handle());
CHECK(sr1 != nullptr);
CHECK(nullptr != sr1);
CHECK(sr2 != nullptr);
const bool comp1 = (sr1 < sr2) != (sr1 >= sr2);
CHECK(comp1);
const bool comp2 = (sr1 > sr2) != (sr1 <= sr2);
CHECK(comp2);
std::hash<zmq::socket_ref> hash;
CHECK(hash(sr1) != hash(sr2));
CHECK(hash(sr1) == hash(s1));
}
TEST_CASE("socket_ref assignment", "[socket_ref]")
{
zmq::context_t context;
zmq::socket_t s1(context, zmq::socket_type::router);
zmq::socket_t s2(context, zmq::socket_type::dealer);
zmq::socket_ref sr1 = s1;
zmq::socket_ref sr2 = s2;
sr1 = s2;
CHECK(sr1 == sr2);
CHECK(sr1.handle() == sr2.handle());
sr1 = std::move(sr2);
CHECK(sr1 == sr2);
CHECK(sr1.handle() == sr2.handle());
sr2 = nullptr;
CHECK(sr1 != sr2);
sr1 = nullptr;
CHECK(sr1 == sr2);
}
TEST_CASE("socket_ref swap", "[socket_ref]")
{
zmq::socket_ref sr1;
zmq::socket_ref sr2;
using std::swap;
swap(sr1, sr2);
}
TEST_CASE("socket_ref reinterpret as void*", "[socket_ref]")
{
struct SVP
{
void *p;
};
struct SSR
{
zmq::socket_ref sr;
} ssr;
zmq::context_t context;
zmq::socket_t socket(context, zmq::socket_type::router);
CHECK(socket.handle() != nullptr);
reinterpret_cast<SVP *>(&ssr)->p = socket.handle();
CHECK(ssr.sr == socket);
}
#endif

511
zmq.hpp
View File

@ -56,11 +56,15 @@
#define ZMQ_EXPLICIT explicit
#define ZMQ_OVERRIDE override
#define ZMQ_NULLPTR nullptr
#define ZMQ_CONSTEXPR_FN constexpr
#define ZMQ_CONSTEXPR_VAR constexpr
#else
#define ZMQ_NOTHROW throw()
#define ZMQ_EXPLICIT
#define ZMQ_OVERRIDE
#define ZMQ_NULLPTR 0
#define ZMQ_CONSTEXPR_FN
#define ZMQ_CONSTEXPR_VAR const
#endif
#include <zmq.h>
@ -230,8 +234,6 @@ inline std::tuple<int, int, int> version()
class message_t
{
friend class socket_t;
public:
message_t()
{
@ -346,14 +348,14 @@ class message_t
ZMQ_DEPRECATED("from 4.3.1, use move taking non-const reference instead")
void move(message_t const *msg_)
{
int rc = zmq_msg_move(&msg, const_cast<zmq_msg_t *>(&(msg_->msg)));
int rc = zmq_msg_move(&msg, const_cast<zmq_msg_t *>(msg_->handle()));
if (rc != 0)
throw error_t();
}
void move(message_t &msg_)
{
int rc = zmq_msg_move(&msg, &msg_.msg);
int rc = zmq_msg_move(&msg, msg_.handle());
if (rc != 0)
throw error_t();
}
@ -361,14 +363,14 @@ class message_t
ZMQ_DEPRECATED("from 4.3.1, use copy taking non-const reference instead")
void copy(message_t const *msg_)
{
int rc = zmq_msg_copy(&msg, const_cast<zmq_msg_t *>(&(msg_->msg)));
int rc = zmq_msg_copy(&msg, const_cast<zmq_msg_t *>(msg_->handle()));
if (rc != 0)
throw error_t();
}
void copy(message_t &msg_)
{
int rc = zmq_msg_copy(&msg, &msg_.msg);
int rc = zmq_msg_copy(&msg, msg_.handle());
if (rc != 0)
throw error_t();
}
@ -511,6 +513,9 @@ class message_t
std::swap(msg, other.msg);
}
ZMQ_NODISCARD zmq_msg_t *handle() ZMQ_NOTHROW { return &msg; }
ZMQ_NODISCARD const zmq_msg_t *handle() const ZMQ_NOTHROW { return &msg; }
private:
// The underlying message
zmq_msg_t msg;
@ -610,6 +615,158 @@ inline void swap(context_t &a, context_t &b) ZMQ_NOTHROW {
a.swap(b);
}
namespace detail
{
class socket_base
{
public:
socket_base() ZMQ_NOTHROW : _handle(ZMQ_NULLPTR) {}
ZMQ_EXPLICIT socket_base(void *handle) ZMQ_NOTHROW : _handle(handle) {}
template<typename T> void setsockopt(int option_, T const &optval)
{
setsockopt(option_, &optval, sizeof(T));
}
void setsockopt(int option_, const void *optval_, size_t optvallen_)
{
int rc = zmq_setsockopt(_handle, option_, optval_, optvallen_);
if (rc != 0)
throw error_t();
}
void getsockopt(int option_, void *optval_, size_t *optvallen_) const
{
int rc = zmq_getsockopt(_handle, option_, optval_, optvallen_);
if (rc != 0)
throw error_t();
}
template<typename T> T getsockopt(int option_) const
{
T optval;
size_t optlen = sizeof(T);
getsockopt(option_, &optval, &optlen);
return optval;
}
void bind(std::string const &addr) { bind(addr.c_str()); }
void bind(const char *addr_)
{
int rc = zmq_bind(_handle, addr_);
if (rc != 0)
throw error_t();
}
void unbind(std::string const &addr) { unbind(addr.c_str()); }
void unbind(const char *addr_)
{
int rc = zmq_unbind(_handle, addr_);
if (rc != 0)
throw error_t();
}
void connect(std::string const &addr) { connect(addr.c_str()); }
void connect(const char *addr_)
{
int rc = zmq_connect(_handle, addr_);
if (rc != 0)
throw error_t();
}
void disconnect(std::string const &addr) { disconnect(addr.c_str()); }
void disconnect(const char *addr_)
{
int rc = zmq_disconnect(_handle, addr_);
if (rc != 0)
throw error_t();
}
bool connected() const ZMQ_NOTHROW { return (_handle != ZMQ_NULLPTR); }
size_t send(const void *buf_, size_t len_, int flags_ = 0)
{
int nbytes = zmq_send(_handle, buf_, len_, flags_);
if (nbytes >= 0)
return (size_t) nbytes;
if (zmq_errno() == EAGAIN)
return 0;
throw error_t();
}
bool send(message_t &msg_, int flags_ = 0)
{
int nbytes = zmq_msg_send(msg_.handle(), _handle, flags_);
if (nbytes >= 0)
return true;
if (zmq_errno() == EAGAIN)
return false;
throw error_t();
}
template<typename T> bool send(T first, T last, int flags_ = 0)
{
zmq::message_t msg(first, last);
return send(msg, flags_);
}
#ifdef ZMQ_HAS_RVALUE_REFS
bool send(message_t &&msg_, int flags_ = 0) { return send(msg_, flags_); }
#endif
size_t recv(void *buf_, size_t len_, int flags_ = 0)
{
int nbytes = zmq_recv(_handle, buf_, len_, flags_);
if (nbytes >= 0)
return (size_t) nbytes;
if (zmq_errno() == EAGAIN)
return 0;
throw error_t();
}
bool recv(message_t *msg_, int flags_ = 0)
{
int nbytes = zmq_msg_recv(msg_->handle(), _handle, flags_);
if (nbytes >= 0)
return true;
if (zmq_errno() == EAGAIN)
return false;
throw error_t();
}
#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0)
void join(const char* group)
{
int rc = zmq_join(_handle, group);
if (rc != 0)
throw error_t();
}
void leave(const char* group)
{
int rc = zmq_leave(_handle, group);
if (rc != 0)
throw error_t();
}
#endif
ZMQ_NODISCARD void *handle() ZMQ_NOTHROW { return _handle; }
ZMQ_NODISCARD const void *handle() const ZMQ_NOTHROW { return _handle; }
ZMQ_EXPLICIT operator bool() const ZMQ_NOTHROW { return _handle != ZMQ_NULLPTR; }
// note: non-const operator bool can be removed once
// operator void* is removed from socket_t
ZMQ_EXPLICIT operator bool() ZMQ_NOTHROW { return _handle != ZMQ_NULLPTR; }
protected:
void *_handle;
};
} // namespace detail
#ifdef ZMQ_CPP11
enum class socket_type : int
{
@ -636,22 +793,106 @@ enum class socket_type : int
};
#endif
class socket_t
struct from_handle_t
{
struct _private {}; // disabling use other than with from_handle
ZMQ_CONSTEXPR_FN ZMQ_EXPLICIT from_handle_t(_private /*p*/) ZMQ_NOTHROW {}
};
ZMQ_CONSTEXPR_VAR from_handle_t from_handle = from_handle_t(from_handle_t::_private());
// A non-owning nullable reference to a socket.
// The reference is invalidated on socket close or destruction.
class socket_ref : public detail::socket_base
{
public:
socket_ref() ZMQ_NOTHROW : detail::socket_base() {}
#ifdef ZMQ_CPP11
socket_ref(std::nullptr_t) ZMQ_NOTHROW : detail::socket_base() {}
#endif
socket_ref(from_handle_t /*fh*/, void *handle) ZMQ_NOTHROW
: detail::socket_base(handle) {}
};
#ifdef ZMQ_CPP11
inline bool operator==(socket_ref sr, std::nullptr_t /*p*/) ZMQ_NOTHROW
{
return sr.handle() == nullptr;
}
inline bool operator==(std::nullptr_t /*p*/, socket_ref sr) ZMQ_NOTHROW
{
return sr.handle() == nullptr;
}
inline bool operator!=(socket_ref sr, std::nullptr_t /*p*/) ZMQ_NOTHROW
{
return !(sr == nullptr);
}
inline bool operator!=(std::nullptr_t /*p*/, socket_ref sr) ZMQ_NOTHROW
{
return !(sr == nullptr);
}
#endif
inline bool operator==(socket_ref a, socket_ref b) ZMQ_NOTHROW
{
return std::equal_to<void*>()(a.handle(), b.handle());
}
inline bool operator!=(socket_ref a, socket_ref b) ZMQ_NOTHROW
{
return !(a == b);
}
inline bool operator<(socket_ref a, socket_ref b) ZMQ_NOTHROW
{
return std::less<void*>()(a.handle(), b.handle());
}
inline bool operator>(socket_ref a, socket_ref b) ZMQ_NOTHROW
{
return b < a;
}
inline bool operator<=(socket_ref a, socket_ref b) ZMQ_NOTHROW
{
return !(a > b);
}
inline bool operator>=(socket_ref a, socket_ref b) ZMQ_NOTHROW
{
return !(a < b);
}
} // namespace zmq
#ifdef ZMQ_CPP11
namespace std
{
template<>
struct hash<zmq::socket_ref>
{
size_t operator()(zmq::socket_ref sr) const ZMQ_NOTHROW
{
return hash<void*>()(sr.handle());
}
};
} // namespace std
#endif
namespace zmq
{
class socket_t : public detail::socket_base
{
friend class monitor_t;
public:
socket_t() ZMQ_NOTHROW
: ptr(ZMQ_NULLPTR)
: detail::socket_base(ZMQ_NULLPTR)
, ctxptr(ZMQ_NULLPTR)
{
}
socket_t(context_t &context_, int type_)
: ptr(zmq_socket(static_cast<void*>(context_), type_))
: detail::socket_base(zmq_socket(static_cast<void*>(context_), type_))
, ctxptr(static_cast<void*>(context_))
{
if (ptr == ZMQ_NULLPTR)
if (_handle == ZMQ_NULLPTR)
throw error_t();
}
@ -663,177 +904,59 @@ class socket_t
#endif
#ifdef ZMQ_HAS_RVALUE_REFS
socket_t(socket_t &&rhs) ZMQ_NOTHROW : ptr(rhs.ptr), ctxptr(rhs.ctxptr)
socket_t(socket_t &&rhs) ZMQ_NOTHROW : detail::socket_base(rhs._handle), ctxptr(rhs.ctxptr)
{
rhs.ptr = ZMQ_NULLPTR;
rhs._handle = ZMQ_NULLPTR;
rhs.ctxptr = ZMQ_NULLPTR;
}
socket_t &operator=(socket_t &&rhs) ZMQ_NOTHROW
{
std::swap(ptr, rhs.ptr);
std::swap(_handle, rhs._handle);
return *this;
}
#endif
~socket_t() ZMQ_NOTHROW { close(); }
operator void *() ZMQ_NOTHROW { return ptr; }
operator void *() ZMQ_NOTHROW { return _handle; }
operator void const *() const ZMQ_NOTHROW { return ptr; }
operator void const *() const ZMQ_NOTHROW { return _handle; }
void close() ZMQ_NOTHROW
{
if (ptr == ZMQ_NULLPTR)
if (_handle == ZMQ_NULLPTR)
// already closed
return;
int rc = zmq_close(ptr);
int rc = zmq_close(_handle);
ZMQ_ASSERT(rc == 0);
ptr = ZMQ_NULLPTR;
_handle = ZMQ_NULLPTR;
}
template<typename T> void setsockopt(int option_, T const &optval)
{
setsockopt(option_, &optval, sizeof(T));
}
void setsockopt(int option_, const void *optval_, size_t optvallen_)
{
int rc = zmq_setsockopt(ptr, option_, optval_, optvallen_);
if (rc != 0)
throw error_t();
}
void getsockopt(int option_, void *optval_, size_t *optvallen_) const
{
int rc = zmq_getsockopt(ptr, option_, optval_, optvallen_);
if (rc != 0)
throw error_t();
}
template<typename T> T getsockopt(int option_) const
{
T optval;
size_t optlen = sizeof(T);
getsockopt(option_, &optval, &optlen);
return optval;
}
void bind(std::string const &addr) { bind(addr.c_str()); }
void bind(const char *addr_)
{
int rc = zmq_bind(ptr, addr_);
if (rc != 0)
throw error_t();
}
void unbind(std::string const &addr) { unbind(addr.c_str()); }
void unbind(const char *addr_)
{
int rc = zmq_unbind(ptr, addr_);
if (rc != 0)
throw error_t();
}
void connect(std::string const &addr) { connect(addr.c_str()); }
void connect(const char *addr_)
{
int rc = zmq_connect(ptr, addr_);
if (rc != 0)
throw error_t();
}
void disconnect(std::string const &addr) { disconnect(addr.c_str()); }
void disconnect(const char *addr_)
{
int rc = zmq_disconnect(ptr, addr_);
if (rc != 0)
throw error_t();
}
bool connected() const ZMQ_NOTHROW { return (ptr != ZMQ_NULLPTR); }
size_t send(const void *buf_, size_t len_, int flags_ = 0)
{
int nbytes = zmq_send(ptr, buf_, len_, flags_);
if (nbytes >= 0)
return (size_t) nbytes;
if (zmq_errno() == EAGAIN)
return 0;
throw error_t();
}
bool send(message_t &msg_, int flags_ = 0)
{
int nbytes = zmq_msg_send(&(msg_.msg), ptr, flags_);
if (nbytes >= 0)
return true;
if (zmq_errno() == EAGAIN)
return false;
throw error_t();
}
template<typename T> bool send(T first, T last, int flags_ = 0)
{
zmq::message_t msg(first, last);
return send(msg, flags_);
}
#ifdef ZMQ_HAS_RVALUE_REFS
bool send(message_t &&msg_, int flags_ = 0) { return send(msg_, flags_); }
#endif
size_t recv(void *buf_, size_t len_, int flags_ = 0)
{
int nbytes = zmq_recv(ptr, buf_, len_, flags_);
if (nbytes >= 0)
return (size_t) nbytes;
if (zmq_errno() == EAGAIN)
return 0;
throw error_t();
}
bool recv(message_t *msg_, int flags_ = 0)
{
int nbytes = zmq_msg_recv(&(msg_->msg), ptr, flags_);
if (nbytes >= 0)
return true;
if (zmq_errno() == EAGAIN)
return false;
throw error_t();
}
#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0)
void join(const char* group)
{
int rc = zmq_join(ptr, group);
if (rc != 0)
throw error_t();
}
void leave(const char* group)
{
int rc = zmq_leave(ptr, group);
if (rc != 0)
throw error_t();
}
#endif
void swap(socket_t &other) ZMQ_NOTHROW
{
std::swap(ptr, other.ptr);
std::swap(_handle, other._handle);
std::swap(ctxptr, other.ctxptr);
}
operator socket_ref() ZMQ_NOTHROW
{
return socket_ref(from_handle, _handle);
}
private:
void *ptr;
void *ctxptr;
socket_t(const socket_t &) ZMQ_DELETED_FUNCTION;
void operator=(const socket_t &) ZMQ_DELETED_FUNCTION;
// used by monitor_t
socket_t(void *context_, int type_)
: detail::socket_base(zmq_socket(context_, type_))
, ctxptr(context_)
{
if (_handle == ZMQ_NULLPTR)
throw error_t();
}
};
inline void swap(socket_t &a, socket_t &b) ZMQ_NOTHROW {
@ -848,13 +971,12 @@ inline void proxy(void *frontend, void *backend, void *capture)
throw error_t();
}
inline void proxy(socket_t &frontend, socket_t &backend, socket_t *capture = ZMQ_NULLPTR)
inline void
proxy(socket_ref frontend, socket_ref backend, socket_ref capture = socket_ref())
{
int rc = zmq_proxy(static_cast<void *>(frontend),
static_cast<void *>(backend),
capture ? static_cast<void *>(*capture) : ZMQ_NULLPTR);
int rc = zmq_proxy(frontend.handle(), backend.handle(), capture.handle());
if (rc != 0)
throw error_t();
throw error_t();
}
#ifdef ZMQ_HAS_PROXY_STEERABLE
@ -867,13 +989,13 @@ proxy_steerable(void *frontend, void *backend, void *capture, void *control)
throw error_t();
}
inline void
proxy_steerable(socket_t &frontend, socket_t &backend, socket_t *capture, socket_t *control)
inline void proxy_steerable(socket_ref frontend,
socket_ref backend,
socket_ref capture,
socket_ref control)
{
int rc = zmq_proxy_steerable(static_cast<void *>(frontend),
static_cast<void *>(backend),
capture ? static_cast<void *>(*capture) : ZMQ_NULLPTR,
control ? static_cast<void *>(*control) : ZMQ_NULLPTR);
int rc = zmq_proxy_steerable(frontend.handle(), backend.handle(),
capture.handle(), control.handle());
if (rc != 0)
throw error_t();
}
@ -882,7 +1004,7 @@ proxy_steerable(socket_t &frontend, socket_t &backend, socket_t *capture, socket
class monitor_t
{
public:
monitor_t() : socketPtr(ZMQ_NULLPTR), monitor_socket(ZMQ_NULLPTR) {}
monitor_t() : _socket(), _monitor_socket() {}
virtual ~monitor_t()
{
@ -890,20 +1012,18 @@ class monitor_t
}
#ifdef ZMQ_HAS_RVALUE_REFS
monitor_t(monitor_t &&rhs) ZMQ_NOTHROW : socketPtr(rhs.socketPtr),
monitor_socket(rhs.monitor_socket)
monitor_t(monitor_t &&rhs) ZMQ_NOTHROW : _socket(), _monitor_socket()
{
rhs.socketPtr = ZMQ_NULLPTR;
rhs.monitor_socket = ZMQ_NULLPTR;
std::swap(_socket, rhs._socket);
std::swap(_monitor_socket, rhs._monitor_socket);
}
monitor_t &operator=(monitor_t &&rhs) ZMQ_NOTHROW
{
close();
socketPtr = ZMQ_NULLPTR;
monitor_socket = ZMQ_NULLPTR;
std::swap(socketPtr, rhs.socketPtr);
std::swap(monitor_socket, rhs.monitor_socket);
_socket = socket_ref();
std::swap(_socket, rhs._socket);
std::swap(_monitor_socket, rhs._monitor_socket);
return *this;
}
#endif
@ -930,35 +1050,32 @@ class monitor_t
void init(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
{
int rc = zmq_socket_monitor(socket.ptr, addr_, events);
int rc = zmq_socket_monitor(socket.handle(), addr_, events);
if (rc != 0)
throw error_t();
socketPtr = socket.ptr;
monitor_socket = zmq_socket(socket.ctxptr, ZMQ_PAIR);
assert(monitor_socket);
rc = zmq_connect(monitor_socket, addr_);
assert(rc == 0);
_socket = socket;
_monitor_socket = socket_t(socket.ctxptr, ZMQ_PAIR);
_monitor_socket.connect(addr_);
on_monitor_started();
}
bool check_event(int timeout = 0)
{
assert(monitor_socket);
assert(_monitor_socket);
zmq_msg_t eventMsg;
zmq_msg_init(&eventMsg);
zmq::pollitem_t items[] = {
{monitor_socket, 0, ZMQ_POLLIN, 0},
{_monitor_socket.handle(), 0, ZMQ_POLLIN, 0},
};
zmq::poll(&items[0], 1, timeout);
if (items[0].revents & ZMQ_POLLIN) {
int rc = zmq_msg_recv(&eventMsg, monitor_socket, 0);
int rc = zmq_msg_recv(&eventMsg, _monitor_socket.handle(), 0);
if (rc == -1 && zmq_errno() == ETERM)
return false;
assert(rc != -1);
@ -982,7 +1099,7 @@ class monitor_t
#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
zmq_msg_t addrMsg;
zmq_msg_init(&addrMsg);
int rc = zmq_msg_recv(&addrMsg, monitor_socket, 0);
int rc = zmq_msg_recv(&addrMsg, _monitor_socket.handle(), 0);
if (rc == -1 && zmq_errno() == ETERM) {
zmq_msg_close(&eventMsg);
return false;
@ -1071,10 +1188,10 @@ class monitor_t
#ifdef ZMQ_EVENT_MONITOR_STOPPED
void abort()
{
if (socketPtr)
zmq_socket_monitor(socketPtr, ZMQ_NULLPTR, 0);
if (_socket)
zmq_socket_monitor(_socket.handle(), ZMQ_NULLPTR, 0);
socketPtr = ZMQ_NULLPTR;
_socket = socket_ref();
}
#endif
virtual void on_monitor_started() {}
@ -1179,16 +1296,14 @@ class monitor_t
monitor_t(const monitor_t &) ZMQ_DELETED_FUNCTION;
void operator=(const monitor_t &) ZMQ_DELETED_FUNCTION;
void *socketPtr;
void *monitor_socket;
socket_ref _socket;
socket_t _monitor_socket;
void close() ZMQ_NOTHROW
{
if (socketPtr)
zmq_socket_monitor(socketPtr, ZMQ_NULLPTR, 0);
if (monitor_socket)
zmq_close(monitor_socket);
if (_socket)
zmq_socket_monitor(_socket.handle(), ZMQ_NULLPTR, 0);
_monitor_socket.close();
}
};
@ -1198,26 +1313,26 @@ template<typename T = void> class poller_t
public:
poller_t() = default;
void add(zmq::socket_t &socket, short events, T *user_data)
void add(zmq::socket_ref socket, short events, T *user_data)
{
if (0
!= zmq_poller_add(poller_ptr.get(), static_cast<void *>(socket),
!= zmq_poller_add(poller_ptr.get(), socket.handle(),
user_data, events)) {
throw error_t();
}
}
void remove(zmq::socket_t &socket)
void remove(zmq::socket_ref socket)
{
if (0 != zmq_poller_remove(poller_ptr.get(), static_cast<void *>(socket))) {
if (0 != zmq_poller_remove(poller_ptr.get(), socket.handle())) {
throw error_t();
}
}
void modify(zmq::socket_t &socket, short events)
void modify(zmq::socket_ref socket, short events)
{
if (0
!= zmq_poller_modify(poller_ptr.get(), static_cast<void *>(socket),
!= zmq_poller_modify(poller_ptr.get(), socket.handle(),
events)) {
throw error_t();
}

View File

@ -373,12 +373,12 @@ class active_poller_t
using handler_t = std::function<void(short)>;
void add(zmq::socket_t &socket, short events, handler_t handler)
void add(zmq::socket_ref socket, short events, handler_t handler)
{
auto it = decltype(handlers)::iterator{};
auto inserted = bool{};
std::tie(it, inserted) =
handlers.emplace(static_cast<void *>(socket),
handlers.emplace(socket,
std::make_shared<handler_t>(std::move(handler)));
try {
base_poller.add(socket, events,
@ -388,20 +388,20 @@ class active_poller_t
catch (const zmq::error_t &) {
// rollback
if (inserted) {
handlers.erase(static_cast<void *>(socket));
handlers.erase(socket);
}
throw;
}
}
void remove(zmq::socket_t &socket)
void remove(zmq::socket_ref socket)
{
base_poller.remove(socket);
handlers.erase(static_cast<void *>(socket));
handlers.erase(socket);
need_rebuild = true;
}
void modify(zmq::socket_t &socket, short events)
void modify(zmq::socket_ref socket, short events)
{
base_poller.modify(socket, events);
}
@ -435,7 +435,7 @@ class active_poller_t
bool need_rebuild{false};
poller_t<handler_t> base_poller{};
std::unordered_map<void *, std::shared_ptr<handler_t>> handlers{};
std::unordered_map<socket_ref, std::shared_ptr<handler_t>> handlers{};
std::vector<zmq_poller_event_t> poller_events{};
std::vector<std::shared_ptr<handler_t>> poller_handlers{};
}; // class active_poller_t