style and interface fixes for thread affinity

This commit is contained in:
Guenter Obiltschnig
2015-03-18 16:40:22 +01:00
parent 8b96fd4a33
commit 2b1301b3e3
12 changed files with 217 additions and 174 deletions

View File

@@ -89,7 +89,8 @@ void PooledThread::start(int cpu)
{
_thread.start(*this);
_started.wait();
if (cpu >= 0) {
if (cpu >= 0)
{
_thread.setAffinity(static_cast<unsigned>(cpu));
}
}
@@ -104,7 +105,8 @@ void PooledThread::start(Thread::Priority priority, Runnable& target, int cpu)
_pTarget = &target;
_thread.setPriority(priority);
_targetReady.set();
if (cpu >= 0) {
if (cpu >= 0)
{
_thread.setAffinity(static_cast<unsigned>(cpu));
}
}
@@ -132,7 +134,8 @@ void PooledThread::start(Thread::Priority priority, Runnable& target, const std:
_pTarget = &target;
_targetReady.set();
if (cpu >= 0) {
if (cpu >= 0)
{
_thread.setAffinity(static_cast<unsigned>(cpu));
}
}
@@ -267,7 +270,8 @@ ThreadPool::ThreadPool(int minCapacity,
for (int i = 0; i < _minCapacity; i++)
{
if (_affinityPolicy == UNIFORM_DISTRIBUTION) {
if (_affinityPolicy == TAP_UNIFORM_DISTRIBUTION)
{
cpu = _lastCpu.value() % cpuCount;
_lastCpu++;
}
@@ -300,7 +304,8 @@ ThreadPool::ThreadPool(const std::string& name,
int cpuCount = Poco::Environment::processorCount();
for (int i = 0; i < _minCapacity; i++)
{
if (_affinityPolicy == UNIFORM_DISTRIBUTION) {
if (_affinityPolicy == TAP_UNIFORM_DISTRIBUTION)
{
cpu = _lastCpu.value() % cpuCount;
_lastCpu++;
}
@@ -375,23 +380,25 @@ int ThreadPool::allocated() const
}
int ThreadPool::getCorrectCpu(int cpu)
int ThreadPool::affinity(int cpu)
{
switch (static_cast<int>(_affinityPolicy)) {
case UNIFORM_DISTRIBUTION:
switch (static_cast<int>(_affinityPolicy))
{
case TAP_UNIFORM_DISTRIBUTION:
{
cpu = _lastCpu.value() % Environment::processorCount();
_lastCpu++;
}
break;
case OS_DEFAULT:
case TAP_DEFAULT:
{
cpu = -1;
}
break;
case CUSTOM:
case TAP_CUSTOM:
{
if ((cpu < -1) || (cpu >= Environment::processorCount())) {
if ((cpu < -1) || (cpu >= Environment::processorCount()))
{
throw InvalidArgumentException("cpu argument is invalid");
}
}
@@ -400,27 +407,28 @@ int ThreadPool::getCorrectCpu(int cpu)
return cpu;
}
void ThreadPool::start(Runnable& target, int cpu)
{
getThread()->start(Thread::PRIO_NORMAL, target, getCorrectCpu(cpu));
getThread()->start(Thread::PRIO_NORMAL, target, affinity(cpu));
}
void ThreadPool::start(Runnable& target, const std::string& name, int cpu)
{
getThread()->start(Thread::PRIO_NORMAL, target, name, getCorrectCpu(cpu));
getThread()->start(Thread::PRIO_NORMAL, target, name, affinity(cpu));
}
void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, int cpu)
{
getThread()->start(priority, target, getCorrectCpu(cpu));
getThread()->start(priority, target, affinity(cpu));
}
void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name, int cpu)
{
getThread()->start(priority, target, name, getCorrectCpu(cpu));
getThread()->start(priority, target, name, affinity(cpu));
}
@@ -518,14 +526,14 @@ PooledThread* ThreadPool::getThread()
{
pThread->start();
_threads.push_back(pThread);
} catch (...)
}
catch (...)
{
delete pThread;
throw;
}
}
else
throw NoThreadAvailableException();
else throw NoThreadAvailableException();
}
pThread->activate();
return pThread;
@@ -547,11 +555,13 @@ public:
{
_pPool = 0;
}
~ThreadPoolSingletonHolder()
{
delete _pPool;
}
ThreadPool* pool(ThreadPool::ThreadAffinityPolicy affinityPolicy = ThreadPool::OS_DEFAULT)
ThreadPool* pool(ThreadPool::ThreadAffinityPolicy affinityPolicy = ThreadPool::TAP_DEFAULT)
{
FastMutex::ScopedLock lock(_mutex);

View File

@@ -35,6 +35,8 @@
# include <mach/task.h>
# include <mach/thread_policy.h>
#endif
//
// Block SIGPIPE in main thread.
//
@@ -55,6 +57,7 @@ public:
}
};
static SignalBlocker signalBlocker;
}
#endif
@@ -183,7 +186,8 @@ void ThreadImpl::setStackSizeImpl(int size)
#endif
}
void ThreadImpl::setAffinityImpl(unsigned cpu)
void ThreadImpl::setAffinityImpl(int cpu)
{
#if defined (POCO_OS_FAMILY_UNIX) && POCO_OS != POCO_OS_MAC_OS_X
#ifdef HAVE_PTHREAD_SETAFFINITY_NP
@@ -203,8 +207,8 @@ void ThreadImpl::setAffinityImpl(unsigned cpu)
#endif // defined unix & !defined mac os x
#if POCO_OS == POCO_OS_MAC_OS_X
kern_return_t ret;
thread_affinity_policy policy;
kern_return_t ret;
thread_affinity_policy policy;
policy.affinity_tag = cpu;
ret = thread_policy_set(pthread_mach_thread_np(_pData->thread),
@@ -219,10 +223,11 @@ void ThreadImpl::setAffinityImpl(unsigned cpu)
yieldImpl();
}
unsigned ThreadImpl::getAffinityImpl() const
int ThreadImpl::getAffinityImpl() const
{
unsigned cpuSet = 0;
unsigned cpuCount = Environment::processorCount();
int cpuSet = -1;
int cpuCount = Environment::processorCount();
#if defined (POCO_OS_FAMILY_UNIX) && POCO_OS != POCO_OS_MAC_OS_X
#ifdef HAVE_PTHREAD_SETAFFINITY_NP
cpu_set_t cpuset;
@@ -234,7 +239,7 @@ unsigned ThreadImpl::getAffinityImpl() const
if (pthread_getaffinity_np(_pData->thread, &cpuset) != 0)
throw SystemException("Failed to get affinity", errno);
#endif
for (unsigned i = 0; i < cpuCount; i++)
for (int i = 0; i < cpuCount; i++)
{
if (CPU_ISSET(i, &cpuset))
{
@@ -248,10 +253,10 @@ unsigned ThreadImpl::getAffinityImpl() const
#endif // defined unix & !defined mac os x
#if POCO_OS == POCO_OS_MAC_OS_X
kern_return_t ret;
thread_affinity_policy policy;
kern_return_t ret;
thread_affinity_policy policy;
mach_msg_type_number_t count = THREAD_AFFINITY_POLICY_COUNT;
boolean_t get_default = FALSE;
boolean_t get_default = false;
ret = thread_policy_get(pthread_mach_thread_np(_pData->thread),
THREAD_AFFINITY_POLICY,
(thread_policy_t)&policy,
@@ -263,12 +268,13 @@ unsigned ThreadImpl::getAffinityImpl() const
}
cpuSet = policy.affinity_tag;
if (cpuSet >= cpuCount)
cpuSet = 0;
cpuSet = -1;
#endif
return cpuSet;
}
void ThreadImpl::startImpl(SharedPtr<Runnable> pTarget)
{
if (_pData->pRunnableTarget)

View File

@@ -72,7 +72,8 @@ ThreadImpl::ThreadImpl():
_thread(0),
_threadId(0),
_prio(PRIO_NORMAL_IMPL),
_stackSize(POCO_THREAD_STACK_SIZE)
_stackSize(POCO_THREAD_STACK_SIZE),
_cpu(-1)
{
}
@@ -102,7 +103,8 @@ void ThreadImpl::setOSPriorityImpl(int prio, int /* policy */)
setPriorityImpl(prio);
}
void ThreadImpl::setAffinityImpl(unsigned cpu)
void ThreadImpl::setAffinityImpl(int cpu)
{
DWORD mask = 1;
mask <<= cpu;
@@ -110,13 +112,16 @@ void ThreadImpl::setAffinityImpl(unsigned cpu)
{
throw SystemException("Failed to set affinity");
}
_cpu = cpu;
}
unsigned ThreadImpl::getAffinityImpl() const
int ThreadImpl::getAffinityImpl() const
{
throw Poco::NotImplementedException("Get thread affinity not supported on this system");
return _cpu;
}
void ThreadImpl::startImpl(SharedPtr<Runnable> pTarget)
{
if (isRunningImpl())