Introduce extended set/get methods for ZMQ contexts (#3642)

* Introduce DRAFT zmq_ctx_set_ext() and zmq_ctx_get_ext() methods. Change
ZMQ_THREAD_NAME_PREFIX to allow for non-numeric thread name prefixes.
This commit is contained in:
Francesco Montorsi 2019-08-28 00:41:23 +02:00 committed by Luca Boccassi
parent 2aa87c94cc
commit b3582da8fb
8 changed files with 465 additions and 97 deletions

82
doc/zmq_ctx_get_ext.txt Normal file
View File

@ -0,0 +1,82 @@
zmq_ctx_get_ext(3)
==================
NAME
----
zmq_ctx_get_ext - get extended context options
SYNOPSIS
--------
*int zmq_ctx_get_ext (void '*context', int 'option_name', void '*option_value', size_t '*option_len');*
DESCRIPTION
-----------
The _zmq_ctx_get()_ function shall retrieve the value for the option
specified by the 'option_name' argument and store it in the buffer pointed to
by the 'option_value' argument.
The 'option_len' argument is the size in bytes of the buffer pointed
to by 'option_value'; upon successful completion _zmq_ctx_get_ext()_ shall
modify the 'option_len' argument to indicate the actual size of the option
value stored in the buffer.
The _zmq_ctx_get_ext()_ function accepts all the option names accepted by
_zmq_ctx_get()_.
Options that make most sense to retrieve using _zmq_ctx_get_ext()_ instead of
_zmq_ctx_get()_ are:
ZMQ_THREAD_NAME_PREFIX: Get name prefix for I/O threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_THREAD_NAME_PREFIX' argument gets the string prefix of each thread
created for the internal context's thread pool.
[horizontal]
Option value type:: character string
Option value unit:: N/A
Default value:: empty string
RETURN VALUE
------------
The _zmq_ctx_get_ext()_ function returns a value of 0 or greater if successful.
Otherwise it returns `-1` and sets 'errno' to one of the values defined
below.
ERRORS
------
*EINVAL*::
The requested option _option_name_ is unknown.
EXAMPLE
-------
.Setting a prefix on internal ZMQ threda names:
----
void *context = zmq_ctx_new ();
const char prefix[] = "MyApp";
size_t prefixLen = sizeof(prefix);
zmq_ctx_set (context, ZMQ_THREAD_NAME_PREFIX, &prefix, &prefixLen);
char buff[256];
size_t buffLen = sizeof(buff);
int rc = zmq_ctx_get (context, ZMQ_THREAD_NAME_PREFIX, &buff, &buffLen);
assert (rc == 0);
assert (buffLen == prefixLen);
----
SEE ALSO
--------
linkzmq:zmq_ctx_get[3]
linkzmq:zmq[7]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.

86
doc/zmq_ctx_set_ext.txt Normal file
View File

@ -0,0 +1,86 @@
zmq_ctx_set_ext(3)
==================
NAME
----
zmq_ctx_set_ext - set extended context options
SYNOPSIS
--------
*int zmq_ctx_set_ext (void '*context', int 'option_name', const void '*option_value', size_t 'option_len');*
DESCRIPTION
-----------
The _zmq_ctx_set_ext()_ function shall set the option specified by the
'option_name' argument to the value pointed to by the 'option_value' argument
for the 0MQ context pointed to by the 'context' argument. The 'option_len'
argument is the size of the option value in bytes. For options taking a value of
type "character string", the provided byte data should either contain no zero
bytes, or end in a single zero byte (terminating ASCII NUL character).
The _zmq_ctx_set_ext()_ function accepts all the option names accepted by
_zmq_ctx_set()_.
Options that make most sense to set using _zmq_ctx_set_ext()_ instead of
_zmq_ctx_set()_ are the following options:
ZMQ_THREAD_NAME_PREFIX: Set name prefix for I/O threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_THREAD_NAME_PREFIX' argument sets a string prefix to each thread
created for the internal context's thread pool. This option is only supported on Linux.
This option is useful to help debugging done via "top -H" or "gdb"; in case
multiple processes on the system are using ZeroMQ it is useful to provide through
this context option an application-specific prefix to distinguish ZeroMQ background
threads that belong to different processes.
This option only applies before creating any sockets on the context.
[horizontal]
Option value type:: character string
Option value unit:: N/A
Default value:: empty string
RETURN VALUE
------------
The _zmq_ctx_set_ext()_ function returns zero if successful. Otherwise it
returns `-1` and sets 'errno' to one of the values defined below.
ERRORS
------
*EINVAL*::
The requested option _option_name_ is unknown.
EXAMPLE
-------
.Setting a prefix on internal ZMQ threda names:
----
void *context = zmq_ctx_new ();
const char prefix[] = "MyApp";
size_t prefixLen = sizeof(prefix);
zmq_ctx_set (context, ZMQ_THREAD_NAME_PREFIX, &prefix, &prefixLen);
char buff[256];
size_t buffLen = sizeof(buff);
int rc = zmq_ctx_get (context, ZMQ_THREAD_NAME_PREFIX, &buff, &buffLen);
assert (rc == 0);
assert (buffLen == prefixLen);
----
SEE ALSO
--------
linkzmq:zmq_ctx_set[3]
linkzmq:zmq[7]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.

View File

@ -670,6 +670,16 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
/* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10
/* DRAFT Context methods. */
ZMQ_EXPORT int zmq_ctx_set_ext (void *context_,
int option_,
const void *optval_,
size_t optvallen_);
ZMQ_EXPORT int zmq_ctx_get_ext (void *context_,
int option_,
void *optval_,
size_t *optvallen_);
/* DRAFT Socket methods. */
ZMQ_EXPORT int zmq_join (void *s, const char *group);
ZMQ_EXPORT int zmq_leave (void *s, const char *group);

View File

@ -223,57 +223,152 @@ int zmq::ctx_t::shutdown ()
return 0;
}
int zmq::ctx_t::set (int option_, int optval_)
int zmq::ctx_t::set (int option_, const void *optval_, size_t optvallen_)
{
int rc = 0;
if (option_ == ZMQ_MAX_SOCKETS && optval_ >= 1
&& optval_ == clipped_maxsocket (optval_)) {
bool is_int = (optvallen_ == sizeof (int));
int value = 0;
if (is_int)
memcpy (&value, optval_, sizeof (int));
switch (option_) {
case ZMQ_MAX_SOCKETS:
if (is_int && value >= 1 && value == clipped_maxsocket (value)) {
scoped_lock_t locker (_opt_sync);
_max_sockets = optval_;
} else if (option_ == ZMQ_IO_THREADS && optval_ >= 0) {
scoped_lock_t locker (_opt_sync);
_io_thread_count = optval_;
} else if (option_ == ZMQ_IPV6 && optval_ >= 0) {
scoped_lock_t locker (_opt_sync);
_ipv6 = (optval_ != 0);
} else if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
scoped_lock_t locker (_opt_sync);
_blocky = (optval_ != 0);
} else if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) {
scoped_lock_t locker (_opt_sync);
_max_msgsz = optval_ < INT_MAX ? optval_ : INT_MAX;
} else if (option_ == ZMQ_ZERO_COPY_RECV && optval_ >= 0) {
scoped_lock_t locker (_opt_sync);
_zero_copy = (optval_ != 0);
} else {
rc = thread_ctx_t::set (option_, optval_);
_max_sockets = value;
return 0;
}
return rc;
break;
case ZMQ_IO_THREADS:
if (is_int && value >= 0) {
scoped_lock_t locker (_opt_sync);
_io_thread_count = value;
return 0;
}
break;
case ZMQ_IPV6:
if (is_int && value >= 0) {
scoped_lock_t locker (_opt_sync);
_ipv6 = (value != 0);
return 0;
}
break;
case ZMQ_BLOCKY:
if (is_int && value >= 0) {
scoped_lock_t locker (_opt_sync);
_blocky = (value != 0);
return 0;
}
break;
case ZMQ_MAX_MSGSZ:
if (is_int && value >= 0) {
scoped_lock_t locker (_opt_sync);
_max_msgsz = value < INT_MAX ? value : INT_MAX;
return 0;
}
break;
case ZMQ_ZERO_COPY_RECV:
if (is_int && value >= 0) {
scoped_lock_t locker (_opt_sync);
_zero_copy = (value != 0);
return 0;
}
break;
default: {
return thread_ctx_t::set (option_, optval_, optvallen_);
}
}
errno = EINVAL;
return -1;
}
int zmq::ctx_t::get (int option_, void *optval_, size_t *optvallen_)
{
const bool is_int = (*optvallen_ == sizeof (int));
int *value = static_cast<int *> (optval_);
switch (option_) {
case ZMQ_MAX_SOCKETS:
if (is_int) {
*value = _max_sockets;
return 0;
}
break;
case ZMQ_SOCKET_LIMIT:
if (is_int) {
*value = clipped_maxsocket (65535);
return 0;
}
break;
case ZMQ_IO_THREADS:
if (is_int) {
*value = _io_thread_count;
return 0;
}
break;
case ZMQ_IPV6:
if (is_int) {
*value = _ipv6;
return 0;
}
break;
case ZMQ_BLOCKY:
if (is_int) {
*value = _blocky;
return 0;
}
break;
case ZMQ_MAX_MSGSZ:
if (is_int) {
*value = _max_msgsz;
return 0;
}
break;
case ZMQ_MSG_T_SIZE:
if (is_int) {
*value = sizeof (zmq_msg_t);
return 0;
}
break;
case ZMQ_ZERO_COPY_RECV:
if (is_int) {
*value = _zero_copy;
return 0;
}
break;
default: {
return thread_ctx_t::get (option_, optval_, optvallen_);
}
}
errno = EINVAL;
return -1;
}
int zmq::ctx_t::get (int option_)
{
int rc = 0;
if (option_ == ZMQ_MAX_SOCKETS)
rc = _max_sockets;
else if (option_ == ZMQ_SOCKET_LIMIT)
rc = clipped_maxsocket (65535);
else if (option_ == ZMQ_IO_THREADS)
rc = _io_thread_count;
else if (option_ == ZMQ_IPV6)
rc = _ipv6;
else if (option_ == ZMQ_BLOCKY)
rc = _blocky;
else if (option_ == ZMQ_MAX_MSGSZ)
rc = _max_msgsz;
else if (option_ == ZMQ_MSG_T_SIZE)
rc = sizeof (zmq_msg_t);
else if (option_ == ZMQ_ZERO_COPY_RECV) {
rc = _zero_copy;
} else {
rc = thread_ctx_t::get (option_);
}
return rc;
int optval_ = 0;
size_t optvallen_ = sizeof (int);
if (get (option_, &optval_, &optvallen_) == 0)
return optval_;
errno = EINVAL;
return -1;
}
bool zmq::ctx_t::start ()
@ -433,50 +528,100 @@ void zmq::thread_ctx_t::start_thread (thread_t &thread_,
thread_.start (tfn_, arg_, namebuf);
}
int zmq::thread_ctx_t::set (int option_, int optval_)
int zmq::thread_ctx_t::set (int option_, const void *optval_, size_t optvallen_)
{
int rc = 0;
if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
bool is_int = (optvallen_ == sizeof (int));
int value = 0;
if (is_int)
memcpy (&value, optval_, sizeof (int));
switch (option_) {
case ZMQ_THREAD_SCHED_POLICY:
if (is_int && value >= 0) {
scoped_lock_t locker (_opt_sync);
_thread_sched_policy = optval_;
} else if (option_ == ZMQ_THREAD_AFFINITY_CPU_ADD && optval_ >= 0) {
scoped_lock_t locker (_opt_sync);
_thread_affinity_cpus.insert (optval_);
} else if (option_ == ZMQ_THREAD_AFFINITY_CPU_REMOVE && optval_ >= 0) {
scoped_lock_t locker (_opt_sync);
if (0 == _thread_affinity_cpus.erase (optval_)) {
errno = EINVAL;
rc = -1;
_thread_sched_policy = value;
return 0;
}
} else if (option_ == ZMQ_THREAD_NAME_PREFIX && optval_ >= 0) {
break;
case ZMQ_THREAD_AFFINITY_CPU_ADD:
if (is_int && value >= 0) {
scoped_lock_t locker (_opt_sync);
_thread_affinity_cpus.insert (value);
return 0;
}
break;
case ZMQ_THREAD_AFFINITY_CPU_REMOVE:
if (is_int && value >= 0) {
scoped_lock_t locker (_opt_sync);
if (0 == _thread_affinity_cpus.erase (value)) {
errno = EINVAL;
return -1;
}
return 0;
}
break;
case ZMQ_THREAD_PRIORITY:
if (is_int && value >= 0) {
scoped_lock_t locker (_opt_sync);
_thread_priority = value;
return 0;
}
break;
case ZMQ_THREAD_NAME_PREFIX:
// start_thread() allows max 16 chars for thread name
if (is_int) {
std::ostringstream s;
s << optval_;
s << value;
scoped_lock_t locker (_opt_sync);
_thread_name_prefix = s.str ();
} else if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
return 0;
} else if (optvallen_ > 0 && optvallen_ <= 16) {
scoped_lock_t locker (_opt_sync);
_thread_priority = optval_;
} else {
errno = EINVAL;
rc = -1;
_thread_name_prefix.assign (static_cast<const char *> (optval_),
optvallen_);
return 0;
}
return rc;
break;
}
errno = EINVAL;
return -1;
}
int zmq::thread_ctx_t::get (int option_)
int zmq::thread_ctx_t::get (int option_, void *optval_, size_t *optvallen_)
{
int rc = 0;
if (option_ == ZMQ_THREAD_SCHED_POLICY) {
const bool is_int = (*optvallen_ == sizeof (int));
int *value = static_cast<int *> (optval_);
switch (option_) {
case ZMQ_THREAD_SCHED_POLICY:
if (is_int) {
scoped_lock_t locker (_opt_sync);
rc = _thread_sched_policy;
} else if (option_ == ZMQ_THREAD_NAME_PREFIX) {
scoped_lock_t locker (_opt_sync);
rc = atoi (_thread_name_prefix.c_str ());
} else {
errno = EINVAL;
rc = -1;
*value = _thread_sched_policy;
return 0;
}
return rc;
break;
case ZMQ_THREAD_NAME_PREFIX:
if (is_int) {
scoped_lock_t locker (_opt_sync);
*value = atoi (_thread_name_prefix.c_str ());
return 0;
} else if (*optvallen_ >= _thread_name_prefix.size ()) {
scoped_lock_t locker (_opt_sync);
memcpy (optval_, _thread_name_prefix.data (),
_thread_name_prefix.size ());
return 0;
}
break;
}
errno = EINVAL;
return -1;
}
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)

View File

@ -72,8 +72,8 @@ class thread_ctx_t
void *arg_,
const char *name_ = NULL) const;
int set (int option_, int optval_);
int get (int option_);
int set (int option_, const void *optval_, size_t optvallen_);
int get (int option_, void *optval_, size_t *optvallen_);
protected:
// Synchronisation of access to context options.
@ -115,7 +115,8 @@ class ctx_t : public thread_ctx_t
int shutdown ();
// Set and get context properties.
int set (int option_, int optval_);
int set (int option_, const void *optval_, size_t optvallen_);
int get (int option_, void *optval_, size_t *optvallen_);
int get (int option_);
// Create and destroy a socket.

View File

@ -176,22 +176,45 @@ int zmq_ctx_shutdown (void *ctx_)
int zmq_ctx_set (void *ctx_, int option_, int optval_)
{
if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
errno = EFAULT;
return -1;
}
return (static_cast<zmq::ctx_t *> (ctx_))->set (option_, optval_);
return zmq_ctx_set_ext (ctx_, option_, &optval_, sizeof (int));
}
int zmq_ctx_get (void *ctx_, int option_)
int zmq_ctx_set_ext (void *ctx_,
int option_,
const void *optval_,
size_t optvallen_)
{
if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
errno = EFAULT;
return -1;
}
return (static_cast<zmq::ctx_t *> (ctx_))->get (option_);
return (static_cast<zmq::ctx_t *> (ctx_))
->set (option_, optval_, optvallen_);
}
int zmq_ctx_get (void *ctx_, int option_)
{
int optval_ = 0;
size_t optvallen_ = sizeof (int);
if (zmq_ctx_get_ext (ctx_, option_, &optval_, &optvallen_) == 0) {
return optval_;
}
errno = EFAULT;
return -1;
}
int zmq_ctx_get_ext (void *ctx_, int option_, void *optval_, size_t *optvallen_)
{
if (!ctx_ || !(static_cast<zmq::ctx_t *> (ctx_))->check_tag ()) {
errno = EFAULT;
return -1;
}
return (static_cast<zmq::ctx_t *> (ctx_))
->get (option_, optval_, optvallen_);
}
// Stable/legacy context API
void *zmq_init (int io_threads_)

View File

@ -61,6 +61,16 @@
/* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10
/* DRAFT Context methods. */
int zmq_ctx_set_ext (void *context_,
int option_,
const void *optval_,
size_t optvallen_);
int zmq_ctx_get_ext (void *context_,
int option_,
void *optval_,
size_t *optvallen_);
/* DRAFT Socket methods. */
int zmq_join (void *s_, const char *group_);
int zmq_leave (void *s_, const char *group_);

View File

@ -116,7 +116,6 @@ void test_ctx_thread_opts ()
}
#ifdef ZMQ_THREAD_AFFINITY_CPU_ADD
// test affinity:
// this should result in background threads being placed only on the
@ -138,16 +137,28 @@ void test_ctx_thread_opts ()
ZMQ_THREAD_AFFINITY_CPU_REMOVE,
cpus_remove[idx]));
}
#endif
#ifdef ZMQ_THREAD_NAME_PREFIX
// test thread name prefix:
// test INTEGER thread name prefix:
TEST_ASSERT_SUCCESS_ERRNO (
zmq_ctx_set (get_test_context (), ZMQ_THREAD_NAME_PREFIX, 1234));
TEST_ASSERT_EQUAL_INT (
1234, zmq_ctx_get (get_test_context (), ZMQ_THREAD_NAME_PREFIX));
#ifdef ZMQ_BUILD_DRAFT_API
// test STRING thread name prefix:
const char prefix[] = "MyPrefix9012345"; // max len is 16 chars
TEST_ASSERT_SUCCESS_ERRNO (
zmq_ctx_set_ext (get_test_context (), ZMQ_THREAD_NAME_PREFIX, prefix,
sizeof (prefix) / sizeof (char)));
char buf[16];
size_t buflen = sizeof (buf) / sizeof (char);
zmq_ctx_get_ext (get_test_context (), ZMQ_THREAD_NAME_PREFIX, buf, &buflen);
TEST_ASSERT_EQUAL_STRING (prefix, buf);
#endif
}