diff --git a/include/zmq.h b/include/zmq.h index 23770836..3c4aedfb 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -179,10 +179,14 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch); #define ZMQ_IO_THREADS 1 #define ZMQ_MAX_SOCKETS 2 #define ZMQ_SOCKET_LIMIT 3 +#define ZMQ_THREAD_PRIORITY 3 +#define ZMQ_THREAD_SCHED_POLICY 4 /* Default for new contexts */ #define ZMQ_IO_THREADS_DFLT 1 #define ZMQ_MAX_SOCKETS_DFLT 1023 +#define ZMQ_THREAD_PRIORITY_DFLT -1 +#define ZMQ_THREAD_SCHED_POLICY_DFLT -1 ZMQ_EXPORT void *zmq_ctx_new (void); ZMQ_EXPORT int zmq_ctx_term (void *context); diff --git a/src/ctx.cpp b/src/ctx.cpp index 0155ef23..929553ab 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -57,7 +57,9 @@ zmq::ctx_t::ctx_t () : slots (NULL), max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)), io_thread_count (ZMQ_IO_THREADS_DFLT), - ipv6 (false) + ipv6 (false), + thread_priority (ZMQ_THREAD_PRIORITY_DFLT), + thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT) { #ifdef HAVE_FORK pid = getpid(); @@ -194,6 +196,18 @@ int zmq::ctx_t::set (int option_, int optval_) ipv6 = (optval_ != 0); opt_sync.unlock (); } + else + if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) { + opt_sync.lock(); + thread_priority = optval_; + opt_sync.unlock(); + } + else + if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) { + opt_sync.lock(); + thread_sched_policy = optval_; + opt_sync.unlock(); + } else { errno = EINVAL; rc = -1; @@ -324,6 +338,12 @@ zmq::object_t *zmq::ctx_t::get_reaper () return reaper; } +void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const +{ + thread_.start(tfn_, arg_); + thread_.setSchedulingParameters(thread_priority, thread_sched_policy); +} + void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_) { slots [tid_]->send (command_); diff --git a/src/ctx.hpp b/src/ctx.hpp index 2327d34d..e4b9927f 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -32,6 +32,7 @@ #include "stdint.hpp" #include "options.hpp" #include "atomic_counter.hpp" +#include "thread.hpp" namespace zmq { @@ -87,6 +88,9 @@ namespace zmq zmq::socket_base_t *create_socket (int type_); void destroy_socket (zmq::socket_base_t *socket_); + // Start a new thread with proper scheduling parameters. + void start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const; + // Send command to the destination thread. void send_command (uint32_t tid_, const command_t &command_); @@ -185,6 +189,10 @@ namespace zmq // Is IPv6 enabled on this context? bool ipv6; + // Thread scheduling parameters. + int thread_priority; + int thread_sched_policy; + // Synchronisation of access to context options. mutex_t opt_sync; diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 352a43dd..ecb01dcf 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -35,7 +35,8 @@ #include "config.hpp" #include "i_poll_events.hpp" -zmq::devpoll_t::devpoll_t () : +zmq::devpoll_t::devpoll_t (const zmq::ctx_t &ctx_) : + ctx(ctx_), stopping (false) { devpoll_fd = open ("/dev/poll", O_RDWR); @@ -125,7 +126,7 @@ void zmq::devpoll_t::reset_pollout (handle_t handle_) void zmq::devpoll_t::start () { - worker.start (worker_routine, this); + ctx.start_thread (worker, worker_routine, this); } void zmq::devpoll_t::stop () diff --git a/src/devpoll.hpp b/src/devpoll.hpp index 707ba95e..3d7f2d84 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -26,6 +26,7 @@ #include +#include "ctx.hpp" #include "fd.hpp" #include "thread.hpp" #include "poller_base.hpp" @@ -43,7 +44,7 @@ namespace zmq typedef fd_t handle_t; - devpoll_t (); + devpoll_t (const ctx_t &ctx_); ~devpoll_t (); // "poller" concept. @@ -66,6 +67,9 @@ namespace zmq // Main event loop. void loop (); + // Reference to ZMQ context. + const ctx_t &ctx; + // File descriptor referring to "/dev/poll" pseudo-device. fd_t devpoll_fd; diff --git a/src/epoll.cpp b/src/epoll.cpp index 324d4bd8..13d970f2 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -32,7 +32,8 @@ #include "config.hpp" #include "i_poll_events.hpp" -zmq::epoll_t::epoll_t () : +zmq::epoll_t::epoll_t (const zmq::ctx_t &ctx_) : + ctx(ctx_), stopping (false) { epoll_fd = epoll_create (1); @@ -118,7 +119,7 @@ void zmq::epoll_t::reset_pollout (handle_t handle_) void zmq::epoll_t::start () { - worker.start (worker_routine, this); + ctx.start_thread (worker, worker_routine, this); } void zmq::epoll_t::stop () diff --git a/src/epoll.hpp b/src/epoll.hpp index fbf871c0..e48f2303 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -27,6 +27,7 @@ #include #include +#include "ctx.hpp" #include "fd.hpp" #include "thread.hpp" #include "poller_base.hpp" @@ -45,7 +46,7 @@ namespace zmq typedef void* handle_t; - epoll_t (); + epoll_t (const ctx_t &ctx_); ~epoll_t (); // "poller" concept. @@ -68,6 +69,9 @@ namespace zmq // Main event loop. void loop (); + // Reference to ZMQ context. + const ctx_t &ctx; + // Main epoll file descriptor fd_t epoll_fd; diff --git a/src/io_thread.cpp b/src/io_thread.cpp index b05b56bf..e7fb1417 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -27,7 +27,7 @@ zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) : object_t (ctx_, tid_) { - poller = new (std::nothrow) poller_t; + poller = new (std::nothrow) poller_t (*ctx_); alloc_assert (poller); mailbox_handle = poller->add_fd (mailbox.get_fd (), this); diff --git a/src/kqueue.cpp b/src/kqueue.cpp index 75cb4156..e77cf722 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -42,7 +42,8 @@ #define kevent_udata_t void * #endif -zmq::kqueue_t::kqueue_t () : +zmq::kqueue_t::kqueue_t (const zmq::ctx_t &ctx_) : + ctx(ctx_), stopping (false) { // Create event queue @@ -144,7 +145,7 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_) void zmq::kqueue_t::start () { - worker.start (worker_routine, this); + ctx.start_thread (worker, worker_routine, this); } void zmq::kqueue_t::stop () diff --git a/src/kqueue.hpp b/src/kqueue.hpp index 49ce13c2..b3e68d6c 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -27,6 +27,7 @@ #include #include +#include "ctx.hpp" #include "fd.hpp" #include "thread.hpp" #include "poller_base.hpp" @@ -45,7 +46,7 @@ namespace zmq typedef void* handle_t; - kqueue_t (); + kqueue_t (const ctx_t &ctx_); ~kqueue_t (); // "poller" concept. @@ -68,6 +69,9 @@ namespace zmq // Main event loop. void loop (); + // Reference to ZMQ context. + const ctx_t &ctx; + // File descriptor referring to the kernel event queue. fd_t kqueue_fd; diff --git a/src/poll.cpp b/src/poll.cpp index 49e82f94..18778500 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -30,7 +30,8 @@ #include "config.hpp" #include "i_poll_events.hpp" -zmq::poll_t::poll_t () : +zmq::poll_t::poll_t (const zmq::ctx_t &ctx_) : + ctx(ctx_), retired (false), stopping (false) { @@ -106,7 +107,7 @@ void zmq::poll_t::reset_pollout (handle_t handle_) void zmq::poll_t::start () { - worker.start (worker_routine, this); + ctx.start_thread (worker, worker_routine, this); } void zmq::poll_t::stop () diff --git a/src/poll.hpp b/src/poll.hpp index 66d5b4ab..c2d6c6eb 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -28,6 +28,7 @@ #include #include +#include "ctx.hpp" #include "fd.hpp" #include "thread.hpp" #include "poller_base.hpp" @@ -46,7 +47,7 @@ namespace zmq typedef fd_t handle_t; - poll_t (); + poll_t (const ctx_t &ctx_); ~poll_t (); // "poller" concept. @@ -69,6 +70,9 @@ namespace zmq // Main event loop. void loop (); + // Reference to ZMQ context. + const ctx_t &ctx; + struct fd_entry_t { fd_t index; diff --git a/src/reaper.cpp b/src/reaper.cpp index 379e46d1..015ae983 100644 --- a/src/reaper.cpp +++ b/src/reaper.cpp @@ -26,7 +26,7 @@ zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) : sockets (0), terminating (false) { - poller = new (std::nothrow) poller_t; + poller = new (std::nothrow) poller_t (*ctx_); alloc_assert (poller); mailbox_handle = poller->add_fd (mailbox.get_fd (), this); diff --git a/src/select.cpp b/src/select.cpp index 9b2e9774..f65f8e4d 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -41,7 +41,8 @@ #include "config.hpp" #include "i_poll_events.hpp" -zmq::select_t::select_t () : +zmq::select_t::select_t (const zmq::ctx_t &ctx_) : + ctx(ctx_), maxfd (retired_fd), retired (false), stopping (false) @@ -136,7 +137,7 @@ void zmq::select_t::reset_pollout (handle_t handle_) void zmq::select_t::start () { - worker.start (worker_routine, this); + ctx.start_thread (worker, worker_routine, this); } void zmq::select_t::stop () diff --git a/src/select.hpp b/src/select.hpp index 050889bb..a35448b4 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -38,6 +38,7 @@ #include #endif +#include "ctx.hpp" #include "fd.hpp" #include "thread.hpp" #include "poller_base.hpp" @@ -56,7 +57,7 @@ namespace zmq typedef fd_t handle_t; - select_t (); + select_t (const ctx_t &ctx_); ~select_t (); // "poller" concept. @@ -79,6 +80,9 @@ namespace zmq // Main event loop. void loop (); + // Reference to ZMQ context. + const ctx_t &ctx; + struct fd_entry_t { fd_t fd; diff --git a/src/thread.cpp b/src/thread.cpp index af010460..9848eb79 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -59,10 +59,14 @@ void zmq::thread_t::stop () win_assert (rc2 != 0); } +void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_) +{ + // not implemented +} + #else #include -#include extern "C" { @@ -84,33 +88,12 @@ extern "C" } } -bool getenvi (const char *env_, int &result_) -{ - char *str = getenv (env_); - if (str == NULL) - return false; - - std::stringstream ss(str); - return ss >> result_; -} - void zmq::thread_t::start (thread_fn *tfn_, void *arg_) { tfn = tfn_; arg = arg_; int rc = pthread_create (&descriptor, NULL, thread_routine, this); posix_assert (rc); - - int prio; - if (getenvi ("ZMQ_THREAD_PRIO", prio)) { - int policy = SCHED_RR; - getenvi ("ZMQ_THREAD_POLICY", policy); - - struct sched_param param; - param.sched_priority = prio; - rc = pthread_setschedparam (descriptor, policy, ¶m); - posix_assert (rc); - } } void zmq::thread_t::stop () @@ -119,6 +102,28 @@ void zmq::thread_t::stop () posix_assert (rc); } +void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_) +{ + int policy = 0; + struct sched_param param; + + int rc = pthread_getschedparam(descriptor, &policy, ¶m); + posix_assert (rc); + + if(priority_ != -1) + { + param.sched_priority = priority_; + } + + if(schedulingPolicy_ != -1) + { + policy = schedulingPolicy_; + } + + rc = pthread_setschedparam(descriptor, policy, ¶m); + posix_assert (rc); +} + #endif diff --git a/src/thread.hpp b/src/thread.hpp index bdf6fe2b..6c40e23b 100644 --- a/src/thread.hpp +++ b/src/thread.hpp @@ -55,6 +55,10 @@ namespace zmq // Waits for thread termination. void stop (); + // Sets the thread scheduling parameters. Only implemented for + // pthread. Has no effect on other platforms. + void setSchedulingParameters(int priority_, int schedulingPolicy_); + // These are internal members. They should be private, however then // they would not be accessible from the main C routine of the thread. thread_fn *tfn;