add thread affinity policy to threadpool

there are OS_DEFAULT, UNIFORM_DISTRIBUTION and CUSTOM
With custom policy we can run thread on specified cpu
This commit is contained in:
ale_bychuk 2015-03-11 23:39:21 +03:00
parent 70eabf06e6
commit 4c1fe9ef02
4 changed files with 131 additions and 43 deletions

View File

@ -32,7 +32,6 @@ namespace Poco {
class Runnable;
class PooledThread;
class Foundation_API ThreadPool
/// A thread pool always keeps a number of threads running, ready
/// to accept work.
@ -49,28 +48,39 @@ class Foundation_API ThreadPool
/// from the pool.
{
public:
typedef enum _ThreadAffinityPolicy
{
OS_DEFAULT = 0,
UNIFORM_DISTRIBUTION,
CUSTOM
} ThreadAffinityPolicy;
ThreadPool(int minCapacity = 2,
int maxCapacity = 16,
int idleTime = 60,
int stackSize = POCO_THREAD_STACK_SIZE);
int stackSize = POCO_THREAD_STACK_SIZE,
ThreadAffinityPolicy affinityPolicy = OS_DEFAULT);
/// Creates a thread pool with minCapacity threads.
/// If required, up to maxCapacity threads are created
/// a NoThreadAvailableException exception is thrown.
/// If a thread is running idle for more than idleTime seconds,
/// and more than minCapacity threads are running, the thread
/// is killed. Threads are created with given stack size.
/// Threads are created with given affinity Policy
ThreadPool(const std::string& name,
int minCapacity = 2,
int maxCapacity = 16,
int idleTime = 60,
int stackSize = POCO_THREAD_STACK_SIZE);
int stackSize = POCO_THREAD_STACK_SIZE,
ThreadAffinityPolicy affinityPolicy = OS_DEFAULT);
/// Creates a thread pool with the given name and minCapacity threads.
/// If required, up to maxCapacity threads are created
/// a NoThreadAvailableException exception is thrown.
/// If a thread is running idle for more than idleTime seconds,
/// and more than minCapacity threads are running, the thread
/// is killed. Threads are created with given stack size.
/// Threads are created with given affinity Policy
~ThreadPool();
/// Currently running threads will remain active
@ -99,24 +109,24 @@ public:
int available() const;
/// Returns the number available threads.
void start(Runnable& target);
/// Obtains a thread and starts the target.
void start(Runnable& target, int cpu = -1);
/// Obtains a thread and starts the target on specified cpu.
/// Throws a NoThreadAvailableException if no more
/// threads are available.
void start(Runnable& target, const std::string& name);
/// Obtains a thread and starts the target.
void start(Runnable& target, const std::string& name, int cpu = -1);
/// Obtains a thread and starts the target on specified cpu.
/// Assigns the given name to the thread.
/// Throws a NoThreadAvailableException if no more
/// threads are available.
void startWithPriority(Thread::Priority priority, Runnable& target);
/// Obtains a thread, adjusts the thread's priority, and starts the target.
void startWithPriority(Thread::Priority priority, Runnable& target, int cpu = -1);
/// Obtains a thread, adjusts the thread's priority, and starts the target on specified cpu.
/// Throws a NoThreadAvailableException if no more
/// threads are available.
void startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name);
/// Obtains a thread, adjusts the thread's priority, and starts the target.
void startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name, int cpu = -1);
/// Obtains a thread, adjusts the thread's priority, and starts the target on specified cpu.
/// Assigns the given name to the thread.
/// Throws a NoThreadAvailableException if no more
/// threads are available.
@ -168,7 +178,7 @@ protected:
private:
ThreadPool(const ThreadPool& pool);
ThreadPool& operator = (const ThreadPool& pool);
int getCorrectCpu(int cpu);
typedef std::vector<PooledThread*> ThreadVec;
std::string _name;
@ -180,6 +190,8 @@ private:
int _stackSize;
ThreadVec _threads;
mutable FastMutex _mutex;
ThreadAffinityPolicy _affinityPolicy;
AtomicCounter _lastCpu;
};

View File

@ -36,9 +36,9 @@ public:
PooledThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE);
~PooledThread();
void start();
void start(Thread::Priority priority, Runnable& target);
void start(Thread::Priority priority, Runnable& target, const std::string& name);
void start(int cpu = -1);
void start(Thread::Priority priority, Runnable& target, int cpu = -1);
void start(Thread::Priority priority, Runnable& target, const std::string& name, int cpu = -1);
bool idle();
int idleTime();
void join();
@ -85,14 +85,17 @@ PooledThread::~PooledThread()
}
void PooledThread::start()
void PooledThread::start(int cpu)
{
_thread.start(*this);
_started.wait();
if (cpu >= 0) {
_thread.setAffinity(static_cast<unsigned>(cpu));
}
}
void PooledThread::start(Thread::Priority priority, Runnable& target)
void PooledThread::start(Thread::Priority priority, Runnable& target, int cpu)
{
FastMutex::ScopedLock lock(_mutex);
@ -101,10 +104,13 @@ void PooledThread::start(Thread::Priority priority, Runnable& target)
_pTarget = &target;
_thread.setPriority(priority);
_targetReady.set();
if (cpu >= 0) {
_thread.setAffinity(static_cast<unsigned>(cpu));
}
}
void PooledThread::start(Thread::Priority priority, Runnable& target, const std::string& name)
void PooledThread::start(Thread::Priority priority, Runnable& target, const std::string& name, int cpu)
{
FastMutex::ScopedLock lock(_mutex);
@ -126,6 +132,9 @@ void PooledThread::start(Thread::Priority priority, Runnable& target, const std:
_pTarget = &target;
_targetReady.set();
if (cpu >= 0) {
_thread.setAffinity(static_cast<unsigned>(cpu));
}
}
@ -239,21 +248,31 @@ void PooledThread::run()
ThreadPool::ThreadPool(int minCapacity,
int maxCapacity,
int idleTime,
int stackSize):
int stackSize,
ThreadAffinityPolicy affinityPolicy):
_minCapacity(minCapacity),
_maxCapacity(maxCapacity),
_idleTime(idleTime),
_serial(0),
_age(0),
_stackSize(stackSize)
_stackSize(stackSize),
_affinityPolicy(affinityPolicy),
_lastCpu(0)
{
poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
int cpu = -1;
int cpuCount = Poco::Environment::processorCount();
for (int i = 0; i < _minCapacity; i++)
{
if (_affinityPolicy == UNIFORM_DISTRIBUTION) {
cpu = _lastCpu.value() % cpuCount;
_lastCpu++;
}
PooledThread* pThread = createThread();
_threads.push_back(pThread);
pThread->start();
pThread->start(cpu);
}
}
@ -262,22 +281,31 @@ ThreadPool::ThreadPool(const std::string& name,
int minCapacity,
int maxCapacity,
int idleTime,
int stackSize):
int stackSize,
ThreadAffinityPolicy affinityPolicy):
_name(name),
_minCapacity(minCapacity),
_maxCapacity(maxCapacity),
_idleTime(idleTime),
_serial(0),
_age(0),
_stackSize(stackSize)
_stackSize(stackSize),
_affinityPolicy(affinityPolicy),
_lastCpu(0)
{
poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
int cpu = -1;
int cpuCount = Poco::Environment::processorCount();
for (int i = 0; i < _minCapacity; i++)
{
if (_affinityPolicy == UNIFORM_DISTRIBUTION) {
cpu = _lastCpu.value() % cpuCount;
_lastCpu++;
}
PooledThread* pThread = createThread();
_threads.push_back(pThread);
pThread->start();
pThread->start(cpu);
}
}
@ -346,27 +374,52 @@ int ThreadPool::allocated() const
}
void ThreadPool::start(Runnable& target)
int ThreadPool::getCorrectCpu(int cpu)
{
getThread()->start(Thread::PRIO_NORMAL, target);
switch (static_cast<int>(_affinityPolicy)) {
case UNIFORM_DISTRIBUTION:
{
cpu = _lastCpu.value() % Environment::processorCount();
_lastCpu++;
}
break;
case OS_DEFAULT:
{
cpu = -1;
}
break;
case CUSTOM:
{
if ((cpu < -1) || (cpu >= Environment::processorCount())) {
throw InvalidArgumentException("cpu argument is invalid");
}
}
break;
}
return cpu;
}
void ThreadPool::start(Runnable& target, int cpu)
{
getThread()->start(Thread::PRIO_NORMAL, target, getCorrectCpu(cpu));
}
void ThreadPool::start(Runnable& target, const std::string& name)
void ThreadPool::start(Runnable& target, const std::string& name, int cpu)
{
getThread()->start(Thread::PRIO_NORMAL, target, name);
getThread()->start(Thread::PRIO_NORMAL, target, name, getCorrectCpu(cpu));
}
void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target)
void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, int cpu)
{
getThread()->start(priority, target);
getThread()->start(priority, target, getCorrectCpu(cpu));
}
void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name)
void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name, int cpu)
{
getThread()->start(priority, target, name);
getThread()->start(priority, target, name, getCorrectCpu(cpu));
}

View File

@ -35,9 +35,13 @@ ThreadPoolTest::~ThreadPoolTest()
}
void ThreadPoolTest::testThreadPool()
void ThreadPoolTest::startThreadPoolTest(int affinityPolicy)
{
ThreadPool pool(2, 3, 3);
int cpu = -1;
if (affinityPolicy == static_cast<int>(ThreadPool::CUSTOM)) {
cpu = 0;
}
ThreadPool pool(2, 3, 3, POCO_THREAD_STACK_SIZE, static_cast<ThreadPool::ThreadAffinityPolicy>(affinityPolicy));
pool.setStackSize(1);
assert (pool.allocated() == 2);
@ -51,25 +55,25 @@ void ThreadPoolTest::testThreadPool()
assert (pool.available() == 4);
RunnableAdapter<ThreadPoolTest> ra(*this, &ThreadPoolTest::count);
pool.start(ra);
pool.start(ra, cpu);
assert (pool.allocated() == 2);
assert (pool.used() == 1);
assert (pool.capacity() == 4);
assert (pool.available() == 3);
pool.start(ra);
pool.start(ra, cpu);
assert (pool.allocated() == 2);
assert (pool.used() == 2);
assert (pool.capacity() == 4);
assert (pool.available() == 2);
pool.start(ra);
pool.start(ra, cpu);
assert (pool.allocated() == 3);
assert (pool.used() == 3);
assert (pool.capacity() == 4);
assert (pool.available() == 1);
pool.start(ra);
pool.start(ra, cpu);
assert (pool.allocated() == 4);
assert (pool.used() == 4);
assert (pool.capacity() == 4);
@ -77,7 +81,7 @@ void ThreadPoolTest::testThreadPool()
try
{
pool.start(ra);
pool.start(ra, cpu);
failmsg("thread pool exhausted - must throw exception");
}
catch (Poco::NoThreadAvailableException&)
@ -108,13 +112,13 @@ void ThreadPoolTest::testThreadPool()
_count = 0;
_event.reset();
pool.start(ra);
pool.start(ra, cpu);
assert (pool.allocated() == 2);
assert (pool.used() == 1);
assert (pool.capacity() == 4);
assert (pool.available() == 3);
pool.start(ra);
pool.start(ra, cpu);
assert (pool.allocated() == 2);
assert (pool.used() == 2);
assert (pool.capacity() == 4);
@ -127,9 +131,23 @@ void ThreadPoolTest::testThreadPool()
assert (pool.allocated() == 2);
assert (pool.used() == 0);
assert (pool.capacity() == 4);
assert (pool.available() == 4);
assert (pool.available() == 4);
}
void ThreadPoolTest::testThreadPool()
{
startThreadPoolTest(Poco::ThreadPool::OS_DEFAULT);
}
void ThreadPoolTest::testThreadPoolUniformDistribution()
{
startThreadPoolTest(Poco::ThreadPool::UNIFORM_DISTRIBUTION);
}
void ThreadPoolTest::testThreadPoolCustomDistribution()
{
startThreadPoolTest(Poco::ThreadPool::CUSTOM);
}
void ThreadPoolTest::setUp()
{
@ -160,6 +178,8 @@ CppUnit::Test* ThreadPoolTest::suite()
CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("ThreadPoolTest");
CppUnit_addTest(pSuite, ThreadPoolTest, testThreadPool);
CppUnit_addTest(pSuite, ThreadPoolTest, testThreadPoolUniformDistribution);
CppUnit_addTest(pSuite, ThreadPoolTest, testThreadPoolCustomDistribution);
return pSuite;
}

View File

@ -29,7 +29,9 @@ public:
~ThreadPoolTest();
void testThreadPool();
void testThreadPoolUniformDistribution();
void testThreadPoolCustomDistribution();
void setUp();
void tearDown();
@ -39,6 +41,7 @@ protected:
void count();
private:
void startThreadPoolTest(int affinityPolicy);
Poco::FastMutex _mutex;
Poco::Event _event;
int _count;