Problem: Socket options lack type-safety (#393)

Solution: Implement a type for each socket option.

Each option has an associated type, therefore they
can't be simply defined as an enum class.

Use new sockopt getter in test util

Add socket option get function for strings and tests
This commit is contained in:
Gudmundur Adalsteinsson
2020-03-24 08:33:50 +00:00
committed by GitHub
parent 10431084bb
commit a3e5b54c3c
3 changed files with 846 additions and 37 deletions

429
zmq.hpp
View File

@@ -80,6 +80,11 @@
#define ZMQ_CONSTEXPR_VAR const
#define ZMQ_CPP11_DEPRECATED(msg)
#endif
#if defined(ZMQ_CPP17)
#define ZMQ_INLINE_VAR inline
#else
#define ZMQ_INLINE_VAR
#endif
#include <zmq.h>
@@ -1268,6 +1273,297 @@ constexpr const_buffer operator"" _zbuf(const char32_t *str, size_t len) noexcep
#endif // ZMQ_CPP11
#ifdef ZMQ_CPP11
namespace sockopt
{
// There are two types of options,
// integral type with known compiler time size (int, bool, int64_t, uint64_t)
// and arrays with dynamic size (strings, binary data).
// BoolUnit: if true accepts values of type bool (but passed as T into libzmq)
template<int Opt, class T, bool BoolUnit = false> struct integral_option
{
};
// NullTerm:
// 0: binary data
// 1: null-terminated string (`getsockopt` size includes null)
// 2: binary (size 32) or Z85 encoder string of size 41 (null included)
template<int Opt, int NullTerm = 1> struct array_option
{
};
#define ZMQ_DEFINE_INTEGRAL_OPT(OPT, NAME, TYPE) \
using NAME##_t = integral_option<OPT, TYPE, false>; \
ZMQ_INLINE_VAR ZMQ_CONSTEXPR_VAR NAME##_t NAME
#define ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(OPT, NAME, TYPE) \
using NAME##_t = integral_option<OPT, TYPE, true>; \
ZMQ_INLINE_VAR ZMQ_CONSTEXPR_VAR NAME##_t NAME
#define ZMQ_DEFINE_ARRAY_OPT(OPT, NAME) \
using NAME##_t = array_option<OPT>; \
ZMQ_INLINE_VAR ZMQ_CONSTEXPR_VAR NAME##_t NAME
#define ZMQ_DEFINE_ARRAY_OPT_BINARY(OPT, NAME) \
using NAME##_t = array_option<OPT, 0>; \
ZMQ_INLINE_VAR ZMQ_CONSTEXPR_VAR NAME##_t NAME
#define ZMQ_DEFINE_ARRAY_OPT_BIN_OR_Z85(OPT, NAME) \
using NAME##_t = array_option<OPT, 2>; \
ZMQ_INLINE_VAR ZMQ_CONSTEXPR_VAR NAME##_t NAME
// duplicate definition from libzmq 4.3.3
#if defined _WIN32
#if defined _WIN64
typedef unsigned __int64 cppzmq_fd_t;
#else
typedef unsigned int cppzmq_fd_t;
#endif
#else
typedef int cppzmq_fd_t;
#endif
#ifdef ZMQ_AFFINITY
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_AFFINITY, affinity, uint64_t);
#endif
#ifdef ZMQ_BACKLOG
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_BACKLOG, backlog, int);
#endif
#ifdef ZMQ_BINDTODEVICE
ZMQ_DEFINE_ARRAY_OPT_BINARY(ZMQ_BINDTODEVICE, bindtodevice);
#endif
#ifdef ZMQ_CONFLATE
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_CONFLATE, conflate, int);
#endif
#ifdef ZMQ_CONNECT_ROUTING_ID
ZMQ_DEFINE_ARRAY_OPT(ZMQ_CONNECT_ROUTING_ID, connect_routing_id);
#endif
#ifdef ZMQ_CONNECT_TIMEOUT
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_CONNECT_TIMEOUT, connect_timeout, int);
#endif
#ifdef ZMQ_CURVE_PUBLICKEY
ZMQ_DEFINE_ARRAY_OPT_BIN_OR_Z85(ZMQ_CURVE_PUBLICKEY, curve_publickey);
#endif
#ifdef ZMQ_CURVE_SECRETKEY
ZMQ_DEFINE_ARRAY_OPT_BIN_OR_Z85(ZMQ_CURVE_SECRETKEY, curve_secretkey);
#endif
#ifdef ZMQ_CURVE_SERVER
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_CURVE_SERVER, curve_server, int);
#endif
#ifdef ZMQ_CURVE_SERVERKEY
ZMQ_DEFINE_ARRAY_OPT_BIN_OR_Z85(ZMQ_CURVE_SERVERKEY, curve_serverkey);
#endif
#ifdef ZMQ_EVENTS
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_EVENTS, events, int);
#endif
#ifdef ZMQ_FD
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_FD, fd, cppzmq_fd_t);
#endif
#ifdef ZMQ_GSSAPI_PLAINTEXT
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_GSSAPI_PLAINTEXT, gssapi_plaintext, int);
#endif
#ifdef ZMQ_GSSAPI_SERVER
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_GSSAPI_SERVER, gssapi_server, int);
#endif
#ifdef ZMQ_GSSAPI_SERVICE_PRINCIPAL
ZMQ_DEFINE_ARRAY_OPT(ZMQ_GSSAPI_SERVICE_PRINCIPAL, gssapi_service_principal);
#endif
#ifdef ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE,
gssapi_service_principal_nametype,
int);
#endif
#ifdef ZMQ_GSSAPI_PRINCIPAL
ZMQ_DEFINE_ARRAY_OPT(ZMQ_GSSAPI_PRINCIPAL, gssapi_principal);
#endif
#ifdef ZMQ_GSSAPI_PRINCIPAL_NAMETYPE
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_GSSAPI_PRINCIPAL_NAMETYPE,
gssapi_principal_nametype,
int);
#endif
#ifdef ZMQ_HANDSHAKE_IVL
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_HANDSHAKE_IVL, handshake_ivl, int);
#endif
#ifdef ZMQ_HEARTBEAT_IVL
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_HEARTBEAT_IVL, heartbeat_ivl, int);
#endif
#ifdef ZMQ_HEARTBEAT_TIMEOUT
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_HEARTBEAT_TIMEOUT, heartbeat_timeout, int);
#endif
#ifdef ZMQ_HEARTBEAT_TTL
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_HEARTBEAT_TTL, heartbeat_ttl, int);
#endif
#ifdef ZMQ_IMMEDIATE
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_IMMEDIATE, immediate, int);
#endif
#ifdef ZMQ_INVERT_MATCHING
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_INVERT_MATCHING, invert_matching, int);
#endif
#ifdef ZMQ_IPV6
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_IPV6, ipv6, int);
#endif
#ifdef ZMQ_LAST_ENDPOINT
ZMQ_DEFINE_ARRAY_OPT(ZMQ_LAST_ENDPOINT, last_endpoint);
#endif
#ifdef ZMQ_LINGER
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_LINGER, linger, int);
#endif
#ifdef ZMQ_MAXMSGSIZE
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_MAXMSGSIZE, maxmsgsize, int64_t);
#endif
#ifdef ZMQ_MECHANISM
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_MECHANISM, mechanism, int);
#endif
#ifdef ZMQ_METADATA
ZMQ_DEFINE_ARRAY_OPT(ZMQ_METADATA, metadata);
#endif
#ifdef ZMQ_MULTICAST_HOPS
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_MULTICAST_HOPS, multicast_hops, int);
#endif
#ifdef ZMQ_MULTICAST_LOOP
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_MULTICAST_LOOP, multicast_loop, int);
#endif
#ifdef ZMQ_MULTICAST_MAXTPDU
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_MULTICAST_MAXTPDU, multicast_maxtpdu, int);
#endif
#ifdef ZMQ_PLAIN_SERVER
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_PLAIN_SERVER, plain_server, int);
#endif
#ifdef ZMQ_PLAIN_PASSWORD
ZMQ_DEFINE_ARRAY_OPT(ZMQ_PLAIN_PASSWORD, plain_password);
#endif
#ifdef ZMQ_PLAIN_USERNAME
ZMQ_DEFINE_ARRAY_OPT(ZMQ_PLAIN_USERNAME, plain_username);
#endif
#ifdef ZMQ_USE_FD
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_USE_FD, use_fd, int);
#endif
#ifdef ZMQ_PROBE_ROUTER
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_PROBE_ROUTER, probe_router, int);
#endif
#ifdef ZMQ_RATE
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_RATE, rate, int);
#endif
#ifdef ZMQ_RCVBUF
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_RCVBUF, rcvbuf, int);
#endif
#ifdef ZMQ_RCVHWM
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_RCVHWM, rcvhwm, int);
#endif
#ifdef ZMQ_RCVMORE
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_RCVMORE, rcvmore, int);
#endif
#ifdef ZMQ_RCVTIMEO
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_RCVTIMEO, rcvtimeo, int);
#endif
#ifdef ZMQ_RECONNECT_IVL
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_RECONNECT_IVL, reconnect_ivl, int);
#endif
#ifdef ZMQ_RECONNECT_IVL_MAX
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_RECONNECT_IVL_MAX, reconnect_ivl_max, int);
#endif
#ifdef ZMQ_RECOVERY_IVL
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_RECOVERY_IVL, recovery_ivl, int);
#endif
#ifdef ZMQ_REQ_CORRELATE
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_REQ_CORRELATE, req_correlate, int);
#endif
#ifdef ZMQ_REQ_RELAXED
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_REQ_RELAXED, req_relaxed, int);
#endif
#ifdef ZMQ_ROUTER_HANDOVER
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_ROUTER_HANDOVER, router_handover, int);
#endif
#ifdef ZMQ_ROUTER_MANDATORY
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_ROUTER_MANDATORY, router_mandatory, int);
#endif
#ifdef ZMQ_ROUTER_NOTIFY
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_ROUTER_NOTIFY, router_notify, int);
#endif
#ifdef ZMQ_ROUTING_ID
ZMQ_DEFINE_ARRAY_OPT_BINARY(ZMQ_ROUTING_ID, routing_id);
#endif
#ifdef ZMQ_SNDBUF
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_SNDBUF, sndbuf, int);
#endif
#ifdef ZMQ_SNDHWM
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_SNDHWM, sndhwm, int);
#endif
#ifdef ZMQ_SNDTIMEO
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_SNDTIMEO, sndtimeo, int);
#endif
#ifdef ZMQ_SOCKS_PROXY
ZMQ_DEFINE_ARRAY_OPT(ZMQ_SOCKS_PROXY, socks_proxy);
#endif
#ifdef ZMQ_STREAM_NOTIFY
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_STREAM_NOTIFY, stream_notify, int);
#endif
#ifdef ZMQ_SUBSCRIBE
ZMQ_DEFINE_ARRAY_OPT(ZMQ_SUBSCRIBE, subscribe);
#endif
#ifdef ZMQ_TCP_KEEPALIVE
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_TCP_KEEPALIVE, tcp_keepalive, int);
#endif
#ifdef ZMQ_TCP_KEEPALIVE_CNT
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_TCP_KEEPALIVE_CNT, tcp_keepalive_cnt, int);
#endif
#ifdef ZMQ_TCP_KEEPALIVE_IDLE
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_TCP_KEEPALIVE_IDLE, tcp_keepalive_idle, int);
#endif
#ifdef ZMQ_TCP_KEEPALIVE_INTVL
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_TCP_KEEPALIVE_INTVL, tcp_keepalive_intvl, int);
#endif
#ifdef ZMQ_TCP_MAXRT
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_TCP_MAXRT, tcp_maxrt, int);
#endif
#ifdef ZMQ_THREAD_SAFE
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_THREAD_SAFE, thread_safe, int);
#endif
#ifdef ZMQ_TOS
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_TOS, tos, int);
#endif
#ifdef ZMQ_TYPE
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_TYPE, type, int);
#endif
#ifdef ZMQ_UNSUBSCRIBE
ZMQ_DEFINE_ARRAY_OPT(ZMQ_UNSUBSCRIBE, unsubscribe);
#endif
#ifdef ZMQ_VMCI_BUFFER_SIZE
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_VMCI_BUFFER_SIZE, vmci_buffer_size, uint64_t);
#endif
#ifdef ZMQ_VMCI_BUFFER_MIN_SIZE
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_VMCI_BUFFER_MIN_SIZE, vmci_buffer_min_size, uint64_t);
#endif
#ifdef ZMQ_VMCI_BUFFER_MAX_SIZE
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_VMCI_BUFFER_MAX_SIZE, vmci_buffer_max_size, uint64_t);
#endif
#ifdef ZMQ_VMCI_CONNECT_TIMEOUT
ZMQ_DEFINE_INTEGRAL_OPT(ZMQ_VMCI_CONNECT_TIMEOUT, vmci_connect_timeout, int);
#endif
#ifdef ZMQ_XPUB_VERBOSE
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_XPUB_VERBOSE, xpub_verbose, int);
#endif
#ifdef ZMQ_XPUB_VERBOSER
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_XPUB_VERBOSER, xpub_verboser, int);
#endif
#ifdef ZMQ_XPUB_MANUAL
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_XPUB_MANUAL, xpub_manual, int);
#endif
#ifdef ZMQ_XPUB_NODROP
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_XPUB_NODROP, xpub_nodrop, int);
#endif
#ifdef ZMQ_XPUB_WELCOME_MSG
ZMQ_DEFINE_ARRAY_OPT(ZMQ_XPUB_WELCOME_MSG, xpub_welcome_msg);
#endif
#ifdef ZMQ_ZAP_ENFORCE_DOMAIN
ZMQ_DEFINE_INTEGRAL_BOOL_UNIT_OPT(ZMQ_ZAP_ENFORCE_DOMAIN, zap_enforce_domain, int);
#endif
#ifdef ZMQ_ZAP_DOMAIN
ZMQ_DEFINE_ARRAY_OPT(ZMQ_ZAP_DOMAIN, zap_domain);
#endif
} // namespace sockopt
#endif // ZMQ_CPP11
namespace detail
{
class socket_base
@@ -1276,11 +1572,14 @@ class socket_base
socket_base() ZMQ_NOTHROW : _handle(ZMQ_NULLPTR) {}
ZMQ_EXPLICIT socket_base(void *handle) ZMQ_NOTHROW : _handle(handle) {}
template<typename T> void setsockopt(int option_, T const &optval)
template<typename T>
ZMQ_CPP11_DEPRECATED("from 4.7.0, use `set` taking option from zmq::sockopt")
void setsockopt(int option_, T const &optval)
{
setsockopt(option_, &optval, sizeof(T));
}
ZMQ_CPP11_DEPRECATED("from 4.7.0, use `set` taking option from zmq::sockopt")
void setsockopt(int option_, const void *optval_, size_t optvallen_)
{
int rc = zmq_setsockopt(_handle, option_, optval_, optvallen_);
@@ -1288,6 +1587,7 @@ class socket_base
throw error_t();
}
ZMQ_CPP11_DEPRECATED("from 4.7.0, use `get` taking option from zmq::sockopt")
void getsockopt(int option_, void *optval_, size_t *optvallen_) const
{
int rc = zmq_getsockopt(_handle, option_, optval_, optvallen_);
@@ -1295,7 +1595,9 @@ class socket_base
throw error_t();
}
template<typename T> T getsockopt(int option_) const
template<typename T>
ZMQ_CPP11_DEPRECATED("from 4.7.0, use `get` taking option from zmq::sockopt")
T getsockopt(int option_) const
{
T optval;
size_t optlen = sizeof(T);
@@ -1303,6 +1605,114 @@ class socket_base
return optval;
}
#ifdef ZMQ_CPP11
// Set integral socket option, e.g.
// `socket.set(zmq::sockopt::linger, 0)`
template<int Opt, class T, bool BoolUnit>
void set(sockopt::integral_option<Opt, T, BoolUnit>, const T &val)
{
static_assert(std::is_integral<T>::value, "T must be integral");
set_option(Opt, &val, sizeof val);
}
// Set integral socket option from boolean, e.g.
// `socket.set(zmq::sockopt::immediate, false)`
template<int Opt, class T>
void set(sockopt::integral_option<Opt, T, true>, bool val)
{
static_assert(std::is_integral<T>::value, "T must be integral");
T rep_val = val;
set_option(Opt, &rep_val, sizeof rep_val);
}
// Set array socket option, e.g.
// `socket.set(zmq::sockopt::plain_username, "foo123")`
template<int Opt, int NullTerm>
void set(sockopt::array_option<Opt, NullTerm>, const char *buf)
{
set_option(Opt, buf, std::strlen(buf));
}
// Set array socket option, e.g.
// `socket.set(zmq::sockopt::routing_id, zmq::buffer(id))`
template<int Opt, int NullTerm>
void set(sockopt::array_option<Opt, NullTerm>, const_buffer buf)
{
set_option(Opt, buf.data(), buf.size());
}
// Set array socket option, e.g.
// `socket.set(zmq::sockopt::routing_id, id_str)`
template<int Opt, int NullTerm>
void set(sockopt::array_option<Opt, NullTerm>, const std::string &buf)
{
set_option(Opt, buf.data(), buf.size());
}
#ifdef ZMQ_CPP17
// Set array socket option, e.g.
// `socket.set(zmq::sockopt::routing_id, id_str)`
template<int Opt, int NullTerm>
void set(sockopt::array_option<Opt, NullTerm>, std::string_view buf)
{
set_option(Opt, buf.data(), buf.size());
}
#endif
// Get scalar socket option, e.g.
// `auto opt = socket.get(zmq::sockopt::linger)`
template<int Opt, class T, bool BoolUnit>
ZMQ_NODISCARD T get(sockopt::integral_option<Opt, T, BoolUnit>) const
{
static_assert(std::is_integral<T>::value, "T must be integral");
T val;
size_t size = sizeof val;
get_option(Opt, &val, &size);
assert(size == sizeof val);
return val;
}
// Get array socket option, writes to buf, returns option size in bytes, e.g.
// `size_t optsize = socket.get(zmq::sockopt::routing_id, zmq::buffer(id))`
template<int Opt, int NullTerm>
ZMQ_NODISCARD size_t get(sockopt::array_option<Opt, NullTerm>,
mutable_buffer buf) const
{
size_t size = buf.size();
get_option(Opt, buf.data(), &size);
return size;
}
// Get array socket option as string (initializes the string buffer size to init_size) e.g.
// `auto s = socket.get(zmq::sockopt::routing_id)`
// Note: removes the null character from null-terminated string options,
// i.e. the string size excludes the null character.
template<int Opt, int NullTerm>
ZMQ_NODISCARD std::string get(sockopt::array_option<Opt, NullTerm>,
size_t init_size = 1024) const
{
if (NullTerm == 2 && init_size == 1024) {
init_size = 41; // get as Z85 string
}
std::string str(init_size, '\0');
size_t size = get(sockopt::array_option<Opt>{}, buffer(str));
if (NullTerm == 1) {
if (size > 0) {
assert(str[size - 1] == '\0');
--size;
}
} else if (NullTerm == 2) {
assert(size == 32 || size == 41);
if (size == 41) {
assert(str[size - 1] == '\0');
--size;
}
}
str.resize(size);
return str;
}
#endif
void bind(std::string const &addr) { bind(addr.c_str()); }
void bind(const char *addr_)
@@ -1502,6 +1912,21 @@ class socket_base
protected:
void *_handle;
private:
void set_option(int option_, const void *optval_, size_t optvallen_)
{
int rc = zmq_setsockopt(_handle, option_, optval_, optvallen_);
if (rc != 0)
throw error_t();
}
void get_option(int option_, void *optval_, size_t *optvallen_) const
{
int rc = zmq_getsockopt(_handle, option_, optval_, optvallen_);
if (rc != 0)
throw error_t();
}
};
} // namespace detail