Background threads enhancements (#2778)

* Background thread scheduling 

- add ZMQ_THREAD_AFFINITY ctx option; set all thread scheduling options
from the context of the secondary thread instead of using the main
process thread context!
- change ZMQ_THREAD_PRIORITY to support setting NICE of the background
thread when using SCHED_OTHER
This commit is contained in:
f18m 2017-10-16 13:29:03 +02:00 committed by Luca Boccassi
parent 577e713e2c
commit bfbb4ff2e9
10 changed files with 226 additions and 18 deletions

View File

@ -332,6 +332,7 @@ if (NOT CMAKE_CROSSCOMPILING)
zmq_check_tcp_keepalive () zmq_check_tcp_keepalive ()
zmq_check_tcp_tipc () zmq_check_tcp_tipc ()
zmq_check_pthread_setname () zmq_check_pthread_setname ()
zmq_check_pthread_setaffinity ()
zmq_check_getrandom () zmq_check_getrandom ()
endif () endif ()

View File

@ -265,6 +265,25 @@ int main(int argc, char *argv [])
set(CMAKE_REQUIRED_FLAGS ${SAVE_CMAKE_REQUIRED_FLAGS}) set(CMAKE_REQUIRED_FLAGS ${SAVE_CMAKE_REQUIRED_FLAGS})
endmacro() endmacro()
macro(zmq_check_pthread_setaffinity)
message(STATUS "Checking pthread_setaffinity signature")
set(SAVE_CMAKE_REQUIRED_FLAGS ${CMAKE_REQUIRED_FLAGS})
set(CMAKE_REQUIRED_FLAGS "-D_GNU_SOURCE -Werror -pthread")
check_c_source_runs(
"
#include <pthread.h>
int main(int argc, char *argv [])
{
cpu_set_t test;
pthread_setaffinity_np (pthread_self(), sizeof(cpu_set_t), &test);
return 0;
}
"
ZMQ_HAVE_PTHREAD_SETAFFINITY)
set(CMAKE_REQUIRED_FLAGS ${SAVE_CMAKE_REQUIRED_FLAGS})
endmacro()
macro(zmq_check_getrandom) macro(zmq_check_getrandom)
message(STATUS "Checking whether getrandom is supported") message(STATUS "Checking whether getrandom is supported")

View File

@ -659,6 +659,22 @@ AC_COMPILE_IFELSE(
AC_MSG_RESULT([no]) AC_MSG_RESULT([no])
]) ])
# pthread_setaffinity_np is non-posix:
AC_MSG_CHECKING([whether pthread_setaffinity_np() exists])
AC_COMPILE_IFELSE(
[AC_LANG_PROGRAM(
[[#include <pthread.h>]],
[[cpu_set_t test; pthread_setaffinity_np (pthread_self(), sizeof(cpu_set_t), &test); return 0;]])
],[
AC_MSG_RESULT([yes])
AC_DEFINE(ZMQ_HAVE_PTHREAD_SET_AFFINITY, [1],
[Whether pthread_setaffinity_np() exists])
],[
AC_MSG_RESULT([no])
])
LIBZMQ_CHECK_SOCK_CLOEXEC([ LIBZMQ_CHECK_SOCK_CLOEXEC([
AC_DEFINE([ZMQ_HAVE_SOCK_CLOEXEC], AC_DEFINE([ZMQ_HAVE_SOCK_CLOEXEC],
[1], [1],

View File

@ -611,6 +611,8 @@ ZMQ_EXPORT void zmq_threadclose (void* thread);
/* DRAFT Context options */ /* DRAFT Context options */
#define ZMQ_MSG_T_SIZE 6 #define ZMQ_MSG_T_SIZE 6
#define ZMQ_THREAD_AFFINITY 7
#define ZMQ_THREAD_AFFINITY_DFLT -1
/* DRAFT Socket methods. */ /* DRAFT Socket methods. */
ZMQ_EXPORT int zmq_join (void *s, const char *group); ZMQ_EXPORT int zmq_join (void *s, const char *group);

View File

@ -76,7 +76,8 @@ zmq::ctx_t::ctx_t () :
blocky (true), blocky (true),
ipv6 (false), ipv6 (false),
thread_priority (ZMQ_THREAD_PRIORITY_DFLT), thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT) thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT),
thread_affinity (ZMQ_THREAD_AFFINITY_DFLT)
{ {
#ifdef HAVE_FORK #ifdef HAVE_FORK
pid = getpid(); pid = getpid();
@ -250,6 +251,11 @@ int zmq::ctx_t::set (int option_, int optval_)
thread_sched_policy = optval_; thread_sched_policy = optval_;
} }
else else
if (option_ == ZMQ_THREAD_AFFINITY && optval_ >= 0) {
scoped_lock_t locker(opt_sync);
thread_affinity = optval_;
}
else
if (option_ == ZMQ_BLOCKY && optval_ >= 0) { if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
scoped_lock_t locker(opt_sync); scoped_lock_t locker(opt_sync);
blocky = (optval_ != 0); blocky = (optval_ != 0);
@ -395,8 +401,8 @@ zmq::object_t *zmq::ctx_t::get_reaper ()
void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
{ {
thread_.setSchedulingParameters(thread_priority, thread_sched_policy, thread_affinity);
thread_.start(tfn_, arg_); thread_.start(tfn_, arg_);
thread_.setSchedulingParameters(thread_priority, thread_sched_policy);
#ifndef ZMQ_HAVE_ANDROID #ifndef ZMQ_HAVE_ANDROID
thread_.setThreadName ("ZMQ background"); thread_.setThreadName ("ZMQ background");
#endif #endif

View File

@ -214,6 +214,7 @@ namespace zmq
// Thread scheduling parameters. // Thread scheduling parameters.
int thread_priority; int thread_priority;
int thread_sched_policy; int thread_sched_policy;
int thread_affinity;
// Synchronisation of access to context options. // Synchronisation of access to context options.
mutex_t opt_sync; mutex_t opt_sync;

View File

@ -70,11 +70,12 @@ void zmq::thread_t::stop ()
win_assert (rc2 != 0); win_assert (rc2 != 0);
} }
void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_) void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_, int affinity_)
{ {
// not implemented // not implemented
LIBZMQ_UNUSED (priority_); LIBZMQ_UNUSED (priority_);
LIBZMQ_UNUSED (schedulingPolicy_); LIBZMQ_UNUSED (schedulingPolicy_);
LIBZMQ_UNUSED (affinity_);
} }
void zmq::thread_t::setThreadName(const char *name_) void zmq::thread_t::setThreadName(const char *name_)
@ -87,6 +88,8 @@ void zmq::thread_t::setThreadName(const char *name_)
#include <signal.h> #include <signal.h>
#include <unistd.h> #include <unistd.h>
#include <sys/time.h>
#include <sys/resource.h>
extern "C" extern "C"
{ {
@ -101,8 +104,8 @@ extern "C"
rc = pthread_sigmask (SIG_BLOCK, &signal_set, NULL); rc = pthread_sigmask (SIG_BLOCK, &signal_set, NULL);
posix_assert (rc); posix_assert (rc);
#endif #endif
zmq::thread_t *self = (zmq::thread_t*) arg_; zmq::thread_t *self = (zmq::thread_t*) arg_;
self->applySchedulingParameters();
self->tfn (self->arg); self->tfn (self->arg);
return NULL; return NULL;
} }
@ -122,7 +125,14 @@ void zmq::thread_t::stop ()
posix_assert (rc); posix_assert (rc);
} }
void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_) void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_, int affinity_)
{
thread_priority=priority_;
thread_sched_policy=schedulingPolicy_;
thread_affinity=affinity_;
}
void zmq::thread_t::applySchedulingParameters() // to be called in secondary thread context
{ {
#if defined _POSIX_THREAD_PRIORITY_SCHEDULING && _POSIX_THREAD_PRIORITY_SCHEDULING >= 0 #if defined _POSIX_THREAD_PRIORITY_SCHEDULING && _POSIX_THREAD_PRIORITY_SCHEDULING >= 0
int policy = 0; int policy = 0;
@ -136,14 +146,24 @@ void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_
int rc = pthread_getschedparam(descriptor, &policy, &param); int rc = pthread_getschedparam(descriptor, &policy, &param);
posix_assert (rc); posix_assert (rc);
if(priority_ != -1) if(thread_sched_policy != ZMQ_THREAD_SCHED_POLICY_DFLT)
{ {
param.sched_priority = priority_; policy = thread_sched_policy;
} }
if(schedulingPolicy_ != -1) /* Quoting docs:
"Linux allows the static priority range 1 to 99 for the SCHED_FIFO and
SCHED_RR policies, and the priority 0 for the remaining policies."
Other policies may use the "nice value" in place of the priority:
*/
bool use_nice_instead_priority = (policy != SCHED_FIFO) && (policy != SCHED_RR);
if(thread_priority != ZMQ_THREAD_PRIORITY_DFLT)
{ {
policy = schedulingPolicy_; if (use_nice_instead_priority)
param.sched_priority = 0; // this is the only supported priority for most scheduling policies
else
param.sched_priority = thread_priority; // user should provide a value between 1 and 99
} }
#ifdef __NetBSD__ #ifdef __NetBSD__
@ -158,10 +178,36 @@ void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_
#endif #endif
posix_assert (rc); posix_assert (rc);
#else
LIBZMQ_UNUSED (priority_); if (use_nice_instead_priority &&
LIBZMQ_UNUSED (schedulingPolicy_); thread_priority != ZMQ_THREAD_PRIORITY_DFLT)
{
// assume the user wants to decrease the thread's nice value
// i.e., increase the chance of this thread being scheduled: try setting that to
// maximum priority.
rc = nice(-20);
errno_assert (rc != -1);
// IMPORTANT: EPERM is typically returned for unprivileged processes: that's because
// CAP_SYS_NICE capability is required or RLIMIT_NICE resource limit should be changed to avoid EPERM!
}
#ifdef ZMQ_HAVE_PTHREAD_SET_AFFINITY
if (thread_affinity != ZMQ_THREAD_AFFINITY_DFLT)
{
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
for (unsigned int cpuidx=0; cpuidx<sizeof(int)*8; cpuidx++)
{
int cpubit = (1 << cpuidx);
if ( (thread_affinity & cpubit) != 0 )
CPU_SET( cpuidx , &cpuset );
}
rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
posix_assert (rc);
}
#endif
#endif #endif
} }

View File

@ -53,6 +53,9 @@ namespace zmq
inline thread_t () inline thread_t ()
: tfn(NULL) : tfn(NULL)
, arg(NULL) , arg(NULL)
, thread_priority(ZMQ_THREAD_PRIORITY_DFLT)
, thread_sched_policy(ZMQ_THREAD_SCHED_POLICY_DFLT)
, thread_affinity(ZMQ_THREAD_AFFINITY_DFLT)
{ {
} }
@ -65,7 +68,7 @@ namespace zmq
// Sets the thread scheduling parameters. Only implemented for // Sets the thread scheduling parameters. Only implemented for
// pthread. Has no effect on other platforms. // pthread. Has no effect on other platforms.
void setSchedulingParameters(int priority_, int schedulingPolicy_); void setSchedulingParameters(int priority_, int schedulingPolicy_, int affinity_);
// Sets the thread name, 16 characters max including terminating NUL. // Sets the thread name, 16 characters max including terminating NUL.
// Only implemented for pthread. Has no effect on other platforms. // Only implemented for pthread. Has no effect on other platforms.
@ -73,6 +76,7 @@ namespace zmq
// These are internal members. They should be private, however then // These are internal members. They should be private, however then
// they would not be accessible from the main C routine of the thread. // they would not be accessible from the main C routine of the thread.
void applySchedulingParameters();
thread_fn *tfn; thread_fn *tfn;
void *arg; void *arg;
@ -84,6 +88,11 @@ namespace zmq
pthread_t descriptor; pthread_t descriptor;
#endif #endif
// Thread scheduling parameters.
int thread_priority;
int thread_sched_policy;
int thread_affinity;
thread_t (const thread_t&); thread_t (const thread_t&);
const thread_t &operator = (const thread_t&); const thread_t &operator = (const thread_t&);
}; };

View File

@ -91,6 +91,8 @@
/* DRAFT Context options */ /* DRAFT Context options */
#define ZMQ_MSG_T_SIZE 6 #define ZMQ_MSG_T_SIZE 6
#define ZMQ_THREAD_AFFINITY 7
#define ZMQ_THREAD_AFFINITY_DFLT -1
/* DRAFT Socket methods. */ /* DRAFT Socket methods. */
int zmq_join (void *s, const char *group); int zmq_join (void *s, const char *group);

View File

@ -30,15 +30,112 @@
#include <limits> #include <limits>
#include "testutil.hpp" #include "testutil.hpp"
#define WAIT_FOR_BACKGROUND_THREAD_INSPECTION (0)
#ifdef ZMQ_HAVE_LINUX
#include <sys/time.h>
#include <sys/resource.h>
#include <unistd.h> // for sleep()
#define TEST_POLICY (SCHED_OTHER) // NOTE: SCHED_OTHER is the default Linux scheduler
bool is_allowed_to_raise_priority()
{
// NOTE1: if setrlimit() fails with EPERM, this means that current user has not enough permissions.
// NOTE2: even for privileged users (e.g., root) getrlimit() would usually return 0 as nice limit; the only way to
// discover if the user is able to increase the nice value is to actually try to change the rlimit:
struct rlimit rlim;
rlim.rlim_cur = 40;
rlim.rlim_max = 40;
if (setrlimit(RLIMIT_NICE, &rlim) == 0)
{
// rlim_cur == 40 means that this process is allowed to set a nice value of -20
if (WAIT_FOR_BACKGROUND_THREAD_INSPECTION)
printf ("This process has enough permissions to raise ZMQ background thread priority!\n");
return true;
}
if (WAIT_FOR_BACKGROUND_THREAD_INSPECTION)
printf ("This process has NOT enough permissions to raise ZMQ background thread priority.\n");
return false;
}
#else
#define TEST_POLICY (0)
bool is_allowed_to_raise_priority()
{
return false;
}
#endif
void test_ctx_thread_opts(void* ctx)
{
int rc;
// verify that setting negative values (e.g., default values) fail:
rc = zmq_ctx_set(ctx, ZMQ_THREAD_SCHED_POLICY, ZMQ_THREAD_SCHED_POLICY_DFLT);
assert (rc == -1 && errno == EINVAL);
rc = zmq_ctx_set(ctx, ZMQ_THREAD_PRIORITY, ZMQ_THREAD_PRIORITY_DFLT);
assert (rc == -1 && errno == EINVAL);
#if ZMQ_BUILD_DRAFT_API
rc = zmq_ctx_set(ctx, ZMQ_THREAD_AFFINITY, ZMQ_THREAD_AFFINITY_DFLT);
assert (rc == -1 && errno == EINVAL);
#endif
// test scheduling policy:
// set context options that alter the background thread CPU scheduling/affinity settings;
// as of ZMQ 4.2.3 this has an effect only on POSIX systems (nothing happens on Windows, but still it should return success):
rc = zmq_ctx_set(ctx, ZMQ_THREAD_SCHED_POLICY, TEST_POLICY);
assert (rc == 0);
// test priority:
// in theory SCHED_OTHER supports only the static priority 0 but quoting the docs
// http://man7.org/linux/man-pages/man7/sched.7.html
// "The thread to run is chosen from the static priority 0 list based on
// a dynamic priority that is determined only inside this list. The
// dynamic priority is based on the nice value [...]
// The nice value can be modified using nice(2), setpriority(2), or sched_setattr(2)."
// ZMQ will internally use nice(2) to set the nice value when using SCHED_OTHER.
// However changing the nice value of a process requires appropriate permissions...
// check that the current effective user is able to do that:
if (is_allowed_to_raise_priority())
{
rc = zmq_ctx_set(ctx, ZMQ_THREAD_PRIORITY, 1 /* any positive value different than the default will be ok */);
assert (rc == 0);
}
#if ZMQ_BUILD_DRAFT_API
// test affinity:
int cpu_affinity_test = (1 << 0);
// this should result in background threads being placed only on the
// first CPU available on this system; try experimenting with other values
// (e.g., 1<<5 to use CPU index 5) and use "top -H" or "taskset -pc" to see the result
rc = zmq_ctx_set(ctx, ZMQ_THREAD_AFFINITY, cpu_affinity_test);
assert (rc == 0);
#endif
}
int main (void) int main (void)
{ {
setup_test_environment(); setup_test_environment();
int rc; int rc;
// Set up our context and sockets // Set up our context and sockets
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
assert (zmq_ctx_get (ctx, ZMQ_MAX_SOCKETS) == ZMQ_MAX_SOCKETS_DFLT); assert (zmq_ctx_get (ctx, ZMQ_MAX_SOCKETS) == ZMQ_MAX_SOCKETS_DFLT);
#if defined(ZMQ_USE_SELECT) #if defined(ZMQ_USE_SELECT)
assert (zmq_ctx_get (ctx, ZMQ_SOCKET_LIMIT) == FD_SETSIZE - 1); assert (zmq_ctx_get (ctx, ZMQ_SOCKET_LIMIT) == FD_SETSIZE - 1);
@ -51,10 +148,12 @@ int main (void)
#if defined (ZMQ_BUILD_DRAFT_AP) #if defined (ZMQ_BUILD_DRAFT_AP)
assert (zmq_ctx_get (ctx, ZMQ_MSG_T_SIZE) == sizeof (zmq_msg_t)); assert (zmq_ctx_get (ctx, ZMQ_MSG_T_SIZE) == sizeof (zmq_msg_t));
#endif #endif
rc = zmq_ctx_set (ctx, ZMQ_IPV6, true); rc = zmq_ctx_set (ctx, ZMQ_IPV6, true);
assert (zmq_ctx_get (ctx, ZMQ_IPV6) == 1); assert (zmq_ctx_get (ctx, ZMQ_IPV6) == 1);
test_ctx_thread_opts(ctx);
void *router = zmq_socket (ctx, ZMQ_ROUTER); void *router = zmq_socket (ctx, ZMQ_ROUTER);
int value; int value;
size_t optsize = sizeof (int); size_t optsize = sizeof (int);
@ -66,7 +165,14 @@ int main (void)
assert (value == -1); assert (value == -1);
rc = zmq_close (router); rc = zmq_close (router);
assert (rc == 0); assert (rc == 0);
#if WAIT_FOR_BACKGROUND_THREAD_INSPECTION
// this is useful when you want to use an external tool (like top or taskset) to view
// properties of the background threads
printf ("Sleeping for 100sec. You can now use 'top -H -p $(pgrep -f test_ctx_options)' and 'taskset -pc <ZMQ background thread PID>' to view ZMQ background thread properties.\n");
sleep(100);
#endif
rc = zmq_ctx_set (ctx, ZMQ_BLOCKY, false); rc = zmq_ctx_set (ctx, ZMQ_BLOCKY, false);
assert (zmq_ctx_get (ctx, ZMQ_BLOCKY) == 0); assert (zmq_ctx_get (ctx, ZMQ_BLOCKY) == 0);
router = zmq_socket (ctx, ZMQ_ROUTER); router = zmq_socket (ctx, ZMQ_ROUTER);