Add thread affinity support to taskmanager

This commit is contained in:
ale_bychuk
2015-03-12 16:12:33 +03:00
parent 4c1fe9ef02
commit 9d4d3e41dd
5 changed files with 35 additions and 20 deletions

View File

@@ -26,6 +26,7 @@
#include "Poco/AutoPtr.h" #include "Poco/AutoPtr.h"
#include "Poco/NotificationCenter.h" #include "Poco/NotificationCenter.h"
#include "Poco/Timestamp.h" #include "Poco/Timestamp.h"
#include "Poco/ThreadPool.h"
#include <list> #include <list>
@@ -33,7 +34,6 @@ namespace Poco {
class Notification; class Notification;
class ThreadPool;
class Exception; class Exception;
@@ -52,7 +52,7 @@ public:
typedef AutoPtr<Task> TaskPtr; typedef AutoPtr<Task> TaskPtr;
typedef std::list<TaskPtr> TaskList; typedef std::list<TaskPtr> TaskList;
TaskManager(); TaskManager(ThreadPool::ThreadAffinityPolicy affinityPolicy = ThreadPool::OS_DEFAULT);
/// Creates the TaskManager, using the /// Creates the TaskManager, using the
/// default ThreadPool. /// default ThreadPool.
@@ -63,10 +63,10 @@ public:
~TaskManager(); ~TaskManager();
/// Destroys the TaskManager. /// Destroys the TaskManager.
void start(Task* pTask); void start(Task* pTask, int cpu = -1);
/// Starts the given task in a thread obtained /// Starts the given task in a thread obtained
/// from the thread pool. /// from the thread pool,
/// /// on specified cpu.
/// The TaskManager takes ownership of the Task object /// The TaskManager takes ownership of the Task object
/// and deletes it when it it finished. /// and deletes it when it it finished.

View File

@@ -100,6 +100,12 @@ public:
int getStackSize() const; int getStackSize() const;
/// Returns the stack size used to create new threads. /// Returns the stack size used to create new threads.
void setAffinityPolicy(ThreadAffinityPolicy affinityPolicy);
/// Sets the thread affinity policy for newly created threads
ThreadAffinityPolicy getAffinityPolicy();
/// Returns the thread affinity policy used to create new thread
int used() const; int used() const;
/// Returns the number of currently used threads. /// Returns the number of currently used threads.
@@ -165,7 +171,7 @@ public:
/// or an empty string if no name has been /// or an empty string if no name has been
/// specified in the constructor. /// specified in the constructor.
static ThreadPool& defaultPool(); static ThreadPool& defaultPool(ThreadAffinityPolicy affinityPolicy = OS_DEFAULT);
/// Returns a reference to the default /// Returns a reference to the default
/// thread pool. /// thread pool.
@@ -209,6 +215,15 @@ inline int ThreadPool::getStackSize() const
return _stackSize; return _stackSize;
} }
inline void ThreadPool::setAffinityPolicy(ThreadPool::ThreadAffinityPolicy affinityPolicy)
{
_affinityPolicy = affinityPolicy;
}
inline ThreadPool::ThreadAffinityPolicy ThreadPool::getAffinityPolicy()
{
return _affinityPolicy;
}
inline const std::string& ThreadPool::name() const inline const std::string& ThreadPool::name() const
{ {

View File

@@ -16,7 +16,6 @@
#include "Poco/TaskManager.h" #include "Poco/TaskManager.h"
#include "Poco/TaskNotification.h" #include "Poco/TaskNotification.h"
#include "Poco/ThreadPool.h"
namespace Poco { namespace Poco {
@@ -25,8 +24,8 @@ namespace Poco {
const int TaskManager::MIN_PROGRESS_NOTIFICATION_INTERVAL = 100000; // 100 milliseconds const int TaskManager::MIN_PROGRESS_NOTIFICATION_INTERVAL = 100000; // 100 milliseconds
TaskManager::TaskManager(): TaskManager::TaskManager(ThreadPool::ThreadAffinityPolicy affinityPolicy):
_threadPool(ThreadPool::defaultPool()) _threadPool(ThreadPool::defaultPool(affinityPolicy))
{ {
} }
@@ -42,7 +41,7 @@ TaskManager::~TaskManager()
} }
void TaskManager::start(Task* pTask) void TaskManager::start(Task* pTask, int cpu)
{ {
TaskPtr pAutoTask(pTask); // take ownership immediately TaskPtr pAutoTask(pTask); // take ownership immediately
FastMutex::ScopedLock lock(_mutex); FastMutex::ScopedLock lock(_mutex);
@@ -52,7 +51,7 @@ void TaskManager::start(Task* pTask)
_taskList.push_back(pAutoTask); _taskList.push_back(pAutoTask);
try try
{ {
_threadPool.start(*pAutoTask, pAutoTask->name()); _threadPool.start(*pAutoTask, pAutoTask->name(), cpu);
} }
catch (...) catch (...)
{ {

View File

@@ -550,13 +550,14 @@ public:
{ {
delete _pPool; delete _pPool;
} }
ThreadPool* pool() ThreadPool* pool(ThreadPool::ThreadAffinityPolicy affinityPolicy = ThreadPool::OS_DEFAULT)
{ {
FastMutex::ScopedLock lock(_mutex); FastMutex::ScopedLock lock(_mutex);
if (!_pPool) if (!_pPool)
{ {
_pPool = new ThreadPool("default"); _pPool = new ThreadPool("default");
_pPool->setAffinityPolicy(affinityPolicy);
if (POCO_THREAD_STACK_SIZE > 0) if (POCO_THREAD_STACK_SIZE > 0)
_pPool->setStackSize(POCO_THREAD_STACK_SIZE); _pPool->setStackSize(POCO_THREAD_STACK_SIZE);
} }
@@ -575,9 +576,9 @@ namespace
} }
ThreadPool& ThreadPool::defaultPool() ThreadPool& ThreadPool::defaultPool(ThreadAffinityPolicy affinityPolicy)
{ {
return *sh.pool(); return *sh.pool(affinityPolicy);
} }

View File

@@ -246,7 +246,7 @@ TaskManagerTest::~TaskManagerTest()
void TaskManagerTest::testFinish() void TaskManagerTest::testFinish()
{ {
TaskManager tm; TaskManager tm(ThreadPool::UNIFORM_DISTRIBUTION);
TaskObserver to; TaskObserver to;
tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted)); tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled)); tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
@@ -281,7 +281,7 @@ void TaskManagerTest::testFinish()
void TaskManagerTest::testCancel() void TaskManagerTest::testCancel()
{ {
TaskManager tm; TaskManager tm(ThreadPool::UNIFORM_DISTRIBUTION);
TaskObserver to; TaskObserver to;
tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted)); tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled)); tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
@@ -315,7 +315,7 @@ void TaskManagerTest::testCancel()
void TaskManagerTest::testError() void TaskManagerTest::testError()
{ {
TaskManager tm; TaskManager tm(ThreadPool::UNIFORM_DISTRIBUTION);
TaskObserver to; TaskObserver to;
tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted)); tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled)); tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
@@ -348,7 +348,7 @@ void TaskManagerTest::testError()
void TaskManagerTest::testCustom() void TaskManagerTest::testCustom()
{ {
TaskManager tm; TaskManager tm(ThreadPool::UNIFORM_DISTRIBUTION);
CustomTaskObserver<int> ti(0); CustomTaskObserver<int> ti(0);
tm.addObserver( tm.addObserver(
@@ -431,7 +431,7 @@ void TaskManagerTest::testCustom()
void TaskManagerTest::testMultiTasks() void TaskManagerTest::testMultiTasks()
{ {
TaskManager tm; TaskManager tm(ThreadPool::UNIFORM_DISTRIBUTION);
tm.start(new SimpleTask); tm.start(new SimpleTask);
tm.start(new SimpleTask); tm.start(new SimpleTask);
tm.start(new SimpleTask); tm.start(new SimpleTask);
@@ -447,7 +447,7 @@ void TaskManagerTest::testMultiTasks()
void TaskManagerTest::testCustomThreadPool() void TaskManagerTest::testCustomThreadPool()
{ {
ThreadPool tp(2, 5, 120); ThreadPool tp(2, 5, 120, POCO_THREAD_STACK_SIZE, ThreadPool::UNIFORM_DISTRIBUTION);
TaskManager tm(tp); TaskManager tm(tp);
// fill up the thread pool // fill up the thread pool