Merge pull request #743 from bas524/affinity

add thread affinity policy to threadpool
This commit is contained in:
Aleksandar Fabijanic 2015-03-17 17:47:54 -05:00
commit 8b96fd4a33
8 changed files with 174 additions and 68 deletions

View File

@ -111,11 +111,14 @@ add_definitions( -DPCRE_STATIC)
if(UNIX AND NOT APPLE)
INCLUDE (CheckFunctionExists)
INCLUDE (CheckCXXSourceCompiles)
CHECK_FUNCTION_EXISTS(pthread_setaffinity_np HAVE_PTHREAD_SETAFFINITY_NP)
if(HAVE_PTHREAD_SETAFFINITY_NP)
message(STATUS "Platform has PTHREAD_SETAFFINITY_NP")
add_definitions(-DHAVE_PTHREAD_SETAFFINITY_NP)
INCLUDE (CheckLibraryExists)
CHECK_LIBRARY_EXISTS(pthread pthread_setaffinity_np "pthread.h" HAVE_PTHREAD_SETAFFINITY_NP)
#set(CMAKE_EXTRA_INCLUDE_FILES pthread.h)
#CHECK_FUNCTION_EXISTS(pthread_setaffinity_np HAVE_PTHREAD_SETAFFINITY_NP)
if(NOT HAVE_PTHREAD_SETAFFINITY_NP)
message(STATUS "Platform has not PTHREAD_SETAFFINITY_NP")
else(HAVE_PTHREAD_SETAFFINITY_NP)
add_definitions(-DHAVE_PTHREAD_SETAFFINITY_NP)
CHECK_CXX_SOURCE_COMPILES("
#include <sched.h>
int main() {
@ -142,7 +145,7 @@ if(UNIX AND NOT APPLE)
endif(HAVE_TWO_PARAM_SCHED_SETAFFINITY)
endif(HAVE_THREE_PARAM_SCHED_SETAFFINITY)
endif(HAVE_PTHREAD_SETAFFINITY_NP)
endif(NOT HAVE_PTHREAD_SETAFFINITY_NP)
endif(UNIX AND NOT APPLE)
add_library( "${LIBNAME}" ${LIB_MODE} ${SRCS})

View File

@ -26,6 +26,7 @@
#include "Poco/AutoPtr.h"
#include "Poco/NotificationCenter.h"
#include "Poco/Timestamp.h"
#include "Poco/ThreadPool.h"
#include <list>
@ -33,7 +34,6 @@ namespace Poco {
class Notification;
class ThreadPool;
class Exception;
@ -52,7 +52,7 @@ public:
typedef AutoPtr<Task> TaskPtr;
typedef std::list<TaskPtr> TaskList;
TaskManager();
TaskManager(ThreadPool::ThreadAffinityPolicy affinityPolicy = ThreadPool::OS_DEFAULT);
/// Creates the TaskManager, using the
/// default ThreadPool.
@ -63,10 +63,10 @@ public:
~TaskManager();
/// Destroys the TaskManager.
void start(Task* pTask);
void start(Task* pTask, int cpu = -1);
/// Starts the given task in a thread obtained
/// from the thread pool.
///
/// from the thread pool,
/// on specified cpu.
/// The TaskManager takes ownership of the Task object
/// and deletes it when it it finished.

View File

@ -32,7 +32,6 @@ namespace Poco {
class Runnable;
class PooledThread;
class Foundation_API ThreadPool
/// A thread pool always keeps a number of threads running, ready
/// to accept work.
@ -49,28 +48,39 @@ class Foundation_API ThreadPool
/// from the pool.
{
public:
enum ThreadAffinityPolicy
{
OS_DEFAULT = 0,
UNIFORM_DISTRIBUTION,
CUSTOM
};
ThreadPool(int minCapacity = 2,
int maxCapacity = 16,
int idleTime = 60,
int stackSize = POCO_THREAD_STACK_SIZE);
int stackSize = POCO_THREAD_STACK_SIZE,
ThreadAffinityPolicy affinityPolicy = OS_DEFAULT);
/// Creates a thread pool with minCapacity threads.
/// If required, up to maxCapacity threads are created
/// a NoThreadAvailableException exception is thrown.
/// If a thread is running idle for more than idleTime seconds,
/// and more than minCapacity threads are running, the thread
/// is killed. Threads are created with given stack size.
/// Threads are created with given affinity Policy
ThreadPool(const std::string& name,
int minCapacity = 2,
int maxCapacity = 16,
int idleTime = 60,
int stackSize = POCO_THREAD_STACK_SIZE);
int stackSize = POCO_THREAD_STACK_SIZE,
ThreadAffinityPolicy affinityPolicy = OS_DEFAULT);
/// Creates a thread pool with the given name and minCapacity threads.
/// If required, up to maxCapacity threads are created
/// a NoThreadAvailableException exception is thrown.
/// If a thread is running idle for more than idleTime seconds,
/// and more than minCapacity threads are running, the thread
/// is killed. Threads are created with given stack size.
/// Threads are created with given affinity Policy
~ThreadPool();
/// Currently running threads will remain active
@ -90,6 +100,12 @@ public:
int getStackSize() const;
/// Returns the stack size used to create new threads.
void setAffinityPolicy(ThreadAffinityPolicy affinityPolicy);
/// Sets the thread affinity policy for newly created threads
ThreadAffinityPolicy getAffinityPolicy();
/// Returns the thread affinity policy used to create new thread
int used() const;
/// Returns the number of currently used threads.
@ -99,24 +115,24 @@ public:
int available() const;
/// Returns the number available threads.
void start(Runnable& target);
/// Obtains a thread and starts the target.
void start(Runnable& target, int cpu = -1);
/// Obtains a thread and starts the target on specified cpu.
/// Throws a NoThreadAvailableException if no more
/// threads are available.
void start(Runnable& target, const std::string& name);
/// Obtains a thread and starts the target.
void start(Runnable& target, const std::string& name, int cpu = -1);
/// Obtains a thread and starts the target on specified cpu.
/// Assigns the given name to the thread.
/// Throws a NoThreadAvailableException if no more
/// threads are available.
void startWithPriority(Thread::Priority priority, Runnable& target);
/// Obtains a thread, adjusts the thread's priority, and starts the target.
void startWithPriority(Thread::Priority priority, Runnable& target, int cpu = -1);
/// Obtains a thread, adjusts the thread's priority, and starts the target on specified cpu.
/// Throws a NoThreadAvailableException if no more
/// threads are available.
void startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name);
/// Obtains a thread, adjusts the thread's priority, and starts the target.
void startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name, int cpu = -1);
/// Obtains a thread, adjusts the thread's priority, and starts the target on specified cpu.
/// Assigns the given name to the thread.
/// Throws a NoThreadAvailableException if no more
/// threads are available.
@ -155,7 +171,7 @@ public:
/// or an empty string if no name has been
/// specified in the constructor.
static ThreadPool& defaultPool();
static ThreadPool& defaultPool(ThreadAffinityPolicy affinityPolicy = OS_DEFAULT);
/// Returns a reference to the default
/// thread pool.
@ -168,7 +184,7 @@ protected:
private:
ThreadPool(const ThreadPool& pool);
ThreadPool& operator = (const ThreadPool& pool);
int getCorrectCpu(int cpu);
typedef std::vector<PooledThread*> ThreadVec;
std::string _name;
@ -180,6 +196,8 @@ private:
int _stackSize;
ThreadVec _threads;
mutable FastMutex _mutex;
ThreadAffinityPolicy _affinityPolicy;
AtomicCounter _lastCpu;
};
@ -197,6 +215,15 @@ inline int ThreadPool::getStackSize() const
return _stackSize;
}
inline void ThreadPool::setAffinityPolicy(ThreadPool::ThreadAffinityPolicy affinityPolicy)
{
_affinityPolicy = affinityPolicy;
}
inline ThreadPool::ThreadAffinityPolicy ThreadPool::getAffinityPolicy()
{
return _affinityPolicy;
}
inline const std::string& ThreadPool::name() const
{

View File

@ -16,7 +16,6 @@
#include "Poco/TaskManager.h"
#include "Poco/TaskNotification.h"
#include "Poco/ThreadPool.h"
namespace Poco {
@ -25,8 +24,8 @@ namespace Poco {
const int TaskManager::MIN_PROGRESS_NOTIFICATION_INTERVAL = 100000; // 100 milliseconds
TaskManager::TaskManager():
_threadPool(ThreadPool::defaultPool())
TaskManager::TaskManager(ThreadPool::ThreadAffinityPolicy affinityPolicy):
_threadPool(ThreadPool::defaultPool(affinityPolicy))
{
}
@ -42,7 +41,7 @@ TaskManager::~TaskManager()
}
void TaskManager::start(Task* pTask)
void TaskManager::start(Task* pTask, int cpu)
{
TaskPtr pAutoTask(pTask); // take ownership immediately
FastMutex::ScopedLock lock(_mutex);
@ -52,7 +51,7 @@ void TaskManager::start(Task* pTask)
_taskList.push_back(pAutoTask);
try
{
_threadPool.start(*pAutoTask, pAutoTask->name());
_threadPool.start(*pAutoTask, pAutoTask->name(), cpu);
}
catch (...)
{

View File

@ -36,9 +36,9 @@ public:
PooledThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE);
~PooledThread();
void start();
void start(Thread::Priority priority, Runnable& target);
void start(Thread::Priority priority, Runnable& target, const std::string& name);
void start(int cpu = -1);
void start(Thread::Priority priority, Runnable& target, int cpu = -1);
void start(Thread::Priority priority, Runnable& target, const std::string& name, int cpu = -1);
bool idle();
int idleTime();
void join();
@ -85,14 +85,17 @@ PooledThread::~PooledThread()
}
void PooledThread::start()
void PooledThread::start(int cpu)
{
_thread.start(*this);
_started.wait();
if (cpu >= 0) {
_thread.setAffinity(static_cast<unsigned>(cpu));
}
}
void PooledThread::start(Thread::Priority priority, Runnable& target)
void PooledThread::start(Thread::Priority priority, Runnable& target, int cpu)
{
FastMutex::ScopedLock lock(_mutex);
@ -101,10 +104,13 @@ void PooledThread::start(Thread::Priority priority, Runnable& target)
_pTarget = &target;
_thread.setPriority(priority);
_targetReady.set();
if (cpu >= 0) {
_thread.setAffinity(static_cast<unsigned>(cpu));
}
}
void PooledThread::start(Thread::Priority priority, Runnable& target, const std::string& name)
void PooledThread::start(Thread::Priority priority, Runnable& target, const std::string& name, int cpu)
{
FastMutex::ScopedLock lock(_mutex);
@ -126,6 +132,9 @@ void PooledThread::start(Thread::Priority priority, Runnable& target, const std:
_pTarget = &target;
_targetReady.set();
if (cpu >= 0) {
_thread.setAffinity(static_cast<unsigned>(cpu));
}
}
@ -240,21 +249,31 @@ void PooledThread::run()
ThreadPool::ThreadPool(int minCapacity,
int maxCapacity,
int idleTime,
int stackSize):
int stackSize,
ThreadAffinityPolicy affinityPolicy):
_minCapacity(minCapacity),
_maxCapacity(maxCapacity),
_idleTime(idleTime),
_serial(0),
_age(0),
_stackSize(stackSize)
_stackSize(stackSize),
_affinityPolicy(affinityPolicy),
_lastCpu(0)
{
poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
int cpu = -1;
int cpuCount = Poco::Environment::processorCount();
for (int i = 0; i < _minCapacity; i++)
{
if (_affinityPolicy == UNIFORM_DISTRIBUTION) {
cpu = _lastCpu.value() % cpuCount;
_lastCpu++;
}
PooledThread* pThread = createThread();
_threads.push_back(pThread);
pThread->start();
pThread->start(cpu);
}
}
@ -263,22 +282,31 @@ ThreadPool::ThreadPool(const std::string& name,
int minCapacity,
int maxCapacity,
int idleTime,
int stackSize):
int stackSize,
ThreadAffinityPolicy affinityPolicy):
_name(name),
_minCapacity(minCapacity),
_maxCapacity(maxCapacity),
_idleTime(idleTime),
_serial(0),
_age(0),
_stackSize(stackSize)
_stackSize(stackSize),
_affinityPolicy(affinityPolicy),
_lastCpu(0)
{
poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
int cpu = -1;
int cpuCount = Poco::Environment::processorCount();
for (int i = 0; i < _minCapacity; i++)
{
if (_affinityPolicy == UNIFORM_DISTRIBUTION) {
cpu = _lastCpu.value() % cpuCount;
_lastCpu++;
}
PooledThread* pThread = createThread();
_threads.push_back(pThread);
pThread->start();
pThread->start(cpu);
}
}
@ -347,27 +375,52 @@ int ThreadPool::allocated() const
}
void ThreadPool::start(Runnable& target)
int ThreadPool::getCorrectCpu(int cpu)
{
getThread()->start(Thread::PRIO_NORMAL, target);
switch (static_cast<int>(_affinityPolicy)) {
case UNIFORM_DISTRIBUTION:
{
cpu = _lastCpu.value() % Environment::processorCount();
_lastCpu++;
}
break;
case OS_DEFAULT:
{
cpu = -1;
}
break;
case CUSTOM:
{
if ((cpu < -1) || (cpu >= Environment::processorCount())) {
throw InvalidArgumentException("cpu argument is invalid");
}
}
break;
}
return cpu;
}
void ThreadPool::start(Runnable& target, int cpu)
{
getThread()->start(Thread::PRIO_NORMAL, target, getCorrectCpu(cpu));
}
void ThreadPool::start(Runnable& target, const std::string& name)
void ThreadPool::start(Runnable& target, const std::string& name, int cpu)
{
getThread()->start(Thread::PRIO_NORMAL, target, name);
getThread()->start(Thread::PRIO_NORMAL, target, name, getCorrectCpu(cpu));
}
void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target)
void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, int cpu)
{
getThread()->start(priority, target);
getThread()->start(priority, target, getCorrectCpu(cpu));
}
void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name)
void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name, int cpu)
{
getThread()->start(priority, target, name);
getThread()->start(priority, target, name, getCorrectCpu(cpu));
}
@ -498,13 +551,14 @@ public:
{
delete _pPool;
}
ThreadPool* pool()
ThreadPool* pool(ThreadPool::ThreadAffinityPolicy affinityPolicy = ThreadPool::OS_DEFAULT)
{
FastMutex::ScopedLock lock(_mutex);
if (!_pPool)
{
_pPool = new ThreadPool("default");
_pPool->setAffinityPolicy(affinityPolicy);
if (POCO_THREAD_STACK_SIZE > 0)
_pPool->setStackSize(POCO_THREAD_STACK_SIZE);
}
@ -523,9 +577,9 @@ namespace
}
ThreadPool& ThreadPool::defaultPool()
ThreadPool& ThreadPool::defaultPool(ThreadAffinityPolicy affinityPolicy)
{
return *sh.pool();
return *sh.pool(affinityPolicy);
}

View File

@ -246,7 +246,7 @@ TaskManagerTest::~TaskManagerTest()
void TaskManagerTest::testFinish()
{
TaskManager tm;
TaskManager tm(ThreadPool::UNIFORM_DISTRIBUTION);
TaskObserver to;
tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
@ -281,7 +281,7 @@ void TaskManagerTest::testFinish()
void TaskManagerTest::testCancel()
{
TaskManager tm;
TaskManager tm(ThreadPool::UNIFORM_DISTRIBUTION);
TaskObserver to;
tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
@ -315,7 +315,7 @@ void TaskManagerTest::testCancel()
void TaskManagerTest::testError()
{
TaskManager tm;
TaskManager tm(ThreadPool::UNIFORM_DISTRIBUTION);
TaskObserver to;
tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
@ -348,7 +348,7 @@ void TaskManagerTest::testError()
void TaskManagerTest::testCustom()
{
TaskManager tm;
TaskManager tm(ThreadPool::UNIFORM_DISTRIBUTION);
CustomTaskObserver<int> ti(0);
tm.addObserver(
@ -431,7 +431,7 @@ void TaskManagerTest::testCustom()
void TaskManagerTest::testMultiTasks()
{
TaskManager tm;
TaskManager tm(ThreadPool::UNIFORM_DISTRIBUTION);
tm.start(new SimpleTask);
tm.start(new SimpleTask);
tm.start(new SimpleTask);
@ -447,7 +447,7 @@ void TaskManagerTest::testMultiTasks()
void TaskManagerTest::testCustomThreadPool()
{
ThreadPool tp(2, 5, 120);
ThreadPool tp(2, 5, 120, POCO_THREAD_STACK_SIZE, ThreadPool::UNIFORM_DISTRIBUTION);
TaskManager tm(tp);
// fill up the thread pool

View File

@ -35,9 +35,13 @@ ThreadPoolTest::~ThreadPoolTest()
}
void ThreadPoolTest::testThreadPool()
void ThreadPoolTest::startThreadPoolTest(int affinityPolicy)
{
ThreadPool pool(2, 3, 3);
int cpu = -1;
if (affinityPolicy == static_cast<int>(ThreadPool::CUSTOM)) {
cpu = 0;
}
ThreadPool pool(2, 3, 3, POCO_THREAD_STACK_SIZE, static_cast<ThreadPool::ThreadAffinityPolicy>(affinityPolicy));
pool.setStackSize(1);
assert (pool.allocated() == 2);
@ -51,25 +55,25 @@ void ThreadPoolTest::testThreadPool()
assert (pool.available() == 4);
RunnableAdapter<ThreadPoolTest> ra(*this, &ThreadPoolTest::count);
pool.start(ra);
pool.start(ra, cpu);
assert (pool.allocated() == 2);
assert (pool.used() == 1);
assert (pool.capacity() == 4);
assert (pool.available() == 3);
pool.start(ra);
pool.start(ra, cpu);
assert (pool.allocated() == 2);
assert (pool.used() == 2);
assert (pool.capacity() == 4);
assert (pool.available() == 2);
pool.start(ra);
pool.start(ra, cpu);
assert (pool.allocated() == 3);
assert (pool.used() == 3);
assert (pool.capacity() == 4);
assert (pool.available() == 1);
pool.start(ra);
pool.start(ra, cpu);
assert (pool.allocated() == 4);
assert (pool.used() == 4);
assert (pool.capacity() == 4);
@ -77,7 +81,7 @@ void ThreadPoolTest::testThreadPool()
try
{
pool.start(ra);
pool.start(ra, cpu);
failmsg("thread pool exhausted - must throw exception");
}
catch (Poco::NoThreadAvailableException&)
@ -108,13 +112,13 @@ void ThreadPoolTest::testThreadPool()
_count = 0;
_event.reset();
pool.start(ra);
pool.start(ra, cpu);
assert (pool.allocated() == 2);
assert (pool.used() == 1);
assert (pool.capacity() == 4);
assert (pool.available() == 3);
pool.start(ra);
pool.start(ra, cpu);
assert (pool.allocated() == 2);
assert (pool.used() == 2);
assert (pool.capacity() == 4);
@ -130,6 +134,20 @@ void ThreadPoolTest::testThreadPool()
assert (pool.available() == 4);
}
void ThreadPoolTest::testThreadPool()
{
startThreadPoolTest(Poco::ThreadPool::OS_DEFAULT);
}
void ThreadPoolTest::testThreadPoolUniformDistribution()
{
startThreadPoolTest(Poco::ThreadPool::UNIFORM_DISTRIBUTION);
}
void ThreadPoolTest::testThreadPoolCustomDistribution()
{
startThreadPoolTest(Poco::ThreadPool::CUSTOM);
}
void ThreadPoolTest::setUp()
{
@ -160,6 +178,8 @@ CppUnit::Test* ThreadPoolTest::suite()
CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("ThreadPoolTest");
CppUnit_addTest(pSuite, ThreadPoolTest, testThreadPool);
CppUnit_addTest(pSuite, ThreadPoolTest, testThreadPoolUniformDistribution);
CppUnit_addTest(pSuite, ThreadPoolTest, testThreadPoolCustomDistribution);
return pSuite;
}

View File

@ -29,6 +29,8 @@ public:
~ThreadPoolTest();
void testThreadPool();
void testThreadPoolUniformDistribution();
void testThreadPoolCustomDistribution();
void setUp();
void tearDown();
@ -39,6 +41,7 @@ protected:
void count();
private:
void startThreadPoolTest(int affinityPolicy);
Poco::FastMutex _mutex;
Poco::Event _event;
int _count;