mirror of
https://github.com/pocoproject/poco.git
synced 2024-12-12 10:13:51 +01:00
enh(Poco::ActiveThreadPool): make it easy to use correctly (#4624)
* make Poco::ActiveThreadPool easy to use (#4544)
* code format
* Fix ThreadSanitizer thread leak error
* enh(ActivePooledThread): Change pointers to references
* enh(ActivePooledThread): remove unused method
* enh(Poco::ActiveThreadPool): Use std::unique_ptr instead of raw pointer
* enh(Poco::ActiveThreadPool): Use C++ static_cast instead of C casting
* enh(Poco::ActiveThreadPool): Use standard containers instead of implementing own
* enh(Poco::ActiveThreadPool): Change pointer to reference
* enh(Poco::ActiveThreadPool): Use smart pointers instead of bare pointers
* enh(Poco::ActiveThreadPool): Fix codeql warning: A stack address which arrived via a may be assigned to a non-local variable.
* enh(Poco::ActiveThreadPool): More test case
* enh(Poco::ActiveThreadPool): std::optional::value unavailable on earlier macOS versions
* enh(Poco::ActiveThreadPool): Fix compare function for make heap
* enh(Poco::ActiveThreadPool): Add more test case
* enh(Poco::ActiveThreadPool): Add more test case
* enh(Poco::ActiveThreadPool): Code style
* enh(Poco::ActiveThreadPool): Test case
* enh(Poco::ActiveThreadPool): Test case
* enh(Poco::ActiveThreadPool): Fix test case error
* Revert "enh(Poco::ActiveThreadPool): std::optional::value unavailable on earlier macOS versions"
This reverts commit cba4673b47
.
* enh(macOS): require min deployment macOS version 10.15 which has full support for C++17
* enh(Poco::ActiveThreadPool): Remove useless "{}"
* enh(Poco::ActiveThreadPool): Rename member variable m_impl to _impl
---------
Co-authored-by: Matej Kenda <matejken@gmail.com>
This commit is contained in:
parent
aa8084c6a0
commit
73df3689bf
@ -20,38 +20,35 @@
|
||||
|
||||
#include "Poco/Foundation.h"
|
||||
#include "Poco/Thread.h"
|
||||
#include "Poco/Mutex.h"
|
||||
#include "Poco/Environment.h"
|
||||
#include <vector>
|
||||
#include <memory>
|
||||
|
||||
|
||||
namespace Poco {
|
||||
|
||||
|
||||
class Runnable;
|
||||
class ActiveThread;
|
||||
class ActiveThreadPoolPrivate;
|
||||
|
||||
|
||||
class Foundation_API ActiveThreadPool
|
||||
/// A thread pool always keeps a number of threads running, ready
|
||||
/// to accept work.
|
||||
/// Threads in an active thread pool are re-used
|
||||
/// Every thread in the pool has own notification-queue with Runnable
|
||||
/// Every Runnable executes on next thread (round-robin model)
|
||||
/// The thread pool always keeps fixed number of threads running.
|
||||
/// A thread pool manages and recycles individual Poco::Thread objects
|
||||
/// to help reduce thread creation costs in programs that use threads.
|
||||
///
|
||||
/// The thread pool supports a task queue.
|
||||
/// When there are no idle threads, tasks are placed in the task queue to wait for execution.
|
||||
/// Use case for this pool is running many (more than os-max-thread-count) short live tasks
|
||||
/// Round-robin model allow efficiently utilize cpu cores
|
||||
{
|
||||
public:
|
||||
ActiveThreadPool(int capacity = static_cast<int>(Environment::processorCount()) + 1,
|
||||
int stackSize = POCO_THREAD_STACK_SIZE);
|
||||
/// Creates a thread pool with fixed capacity threads.
|
||||
/// Creates a thread pool with a maximum thread count of capacity.
|
||||
/// Threads are created with given stack size.
|
||||
|
||||
ActiveThreadPool(std::string name,
|
||||
ActiveThreadPool(const std::string& name,
|
||||
int capacity = static_cast<int>(Environment::processorCount()) + 1,
|
||||
int stackSize = POCO_THREAD_STACK_SIZE);
|
||||
/// Creates a thread pool with the given name and fixed capacity threads.
|
||||
/// Creates a thread pool with the given name and a maximum thread count of capacity.
|
||||
/// Threads are created with given stack size.
|
||||
|
||||
~ActiveThreadPool();
|
||||
@ -64,39 +61,19 @@ public:
|
||||
int getStackSize() const;
|
||||
/// Returns the stack size used to create new threads.
|
||||
|
||||
void start(Runnable& target);
|
||||
int expiryTimeout() const;
|
||||
/// Returns the thread expiry timeout value in milliseconds.
|
||||
/// The default expiryTimeout is 30000 milliseconds (30 seconds).
|
||||
|
||||
void setExpiryTimeout(int expiryTimeout);
|
||||
/// Set the thread expiry timeout value in milliseconds.
|
||||
/// The default expiryTimeout is 30000 milliseconds (30 seconds).
|
||||
|
||||
void start(Runnable& target, int priority = 0);
|
||||
/// Obtains a thread and starts the target.
|
||||
|
||||
void start(Runnable& target, const std::string& name);
|
||||
/// Obtains a thread and starts the target.
|
||||
/// Assigns the given name to the thread.
|
||||
|
||||
void startWithPriority(Thread::Priority priority, Runnable& target);
|
||||
/// Obtains a thread, adjusts the thread's priority, and starts the target.
|
||||
|
||||
void startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name);
|
||||
/// Obtains a thread, adjusts the thread's priority, and starts the target.
|
||||
/// Assigns the given name to the thread.
|
||||
|
||||
void stopAll();
|
||||
/// Stops all running threads and waits for their completion.
|
||||
///
|
||||
/// Will also delete all thread objects.
|
||||
/// If used, this method should be the last action before
|
||||
/// the thread pool is deleted.
|
||||
///
|
||||
/// Note: If a thread fails to stop within 10 seconds
|
||||
/// (due to a programming error, for example), the
|
||||
/// underlying thread object will not be deleted and
|
||||
/// this method will return anyway. This allows for a
|
||||
/// more or less graceful shutdown in case of a misbehaving
|
||||
/// thread.
|
||||
|
||||
void joinAll();
|
||||
/// Waits for all threads to complete.
|
||||
///
|
||||
/// Note that this will join() underlying
|
||||
/// threads and restart them for next tasks.
|
||||
/// Waits for all threads to exit and removes all threads from the thread pool.
|
||||
|
||||
const std::string& name() const;
|
||||
/// Returns the name of the thread pool,
|
||||
@ -107,38 +84,14 @@ public:
|
||||
/// Returns a reference to the default
|
||||
/// thread pool.
|
||||
|
||||
protected:
|
||||
ActiveThread* getThread();
|
||||
ActiveThread* createThread();
|
||||
|
||||
private:
|
||||
ActiveThreadPool(const ActiveThreadPool& pool);
|
||||
ActiveThreadPool& operator = (const ActiveThreadPool& pool);
|
||||
|
||||
typedef std::vector<ActiveThread*> ThreadVec;
|
||||
|
||||
std::string _name;
|
||||
int _capacity;
|
||||
int _serial;
|
||||
int _stackSize;
|
||||
ThreadVec _threads;
|
||||
mutable FastMutex _mutex;
|
||||
std::atomic<size_t> _lastThreadIndex{0};
|
||||
private:
|
||||
std::unique_ptr<ActiveThreadPoolPrivate> _impl;
|
||||
};
|
||||
|
||||
|
||||
inline int ActiveThreadPool::getStackSize() const
|
||||
{
|
||||
return _stackSize;
|
||||
}
|
||||
|
||||
|
||||
inline const std::string& ActiveThreadPool::name() const
|
||||
{
|
||||
return _name;
|
||||
}
|
||||
|
||||
|
||||
} // namespace Poco
|
||||
|
||||
|
||||
|
@ -15,347 +15,490 @@
|
||||
#include "Poco/ActiveThreadPool.h"
|
||||
#include "Poco/Runnable.h"
|
||||
#include "Poco/Thread.h"
|
||||
#include "Poco/Event.h"
|
||||
#include "Poco/ThreadLocal.h"
|
||||
#include "Poco/ErrorHandler.h"
|
||||
#include "Poco/NotificationQueue.h"
|
||||
#include "Poco/Condition.h"
|
||||
#include "Poco/RefCountedObject.h"
|
||||
#include "Poco/AutoPtr.h"
|
||||
#include <sstream>
|
||||
#include <utility>
|
||||
#include <set>
|
||||
#include <list>
|
||||
#include <queue>
|
||||
#include <optional>
|
||||
|
||||
namespace Poco {
|
||||
|
||||
class NewActionNotification: public Notification
|
||||
|
||||
class RunnableList
|
||||
/// A list of the same priority runnables
|
||||
{
|
||||
public:
|
||||
using Ptr = AutoPtr<NewActionNotification>;
|
||||
|
||||
NewActionNotification(Thread::Priority priority, Runnable& runnable, const std::string& name) :
|
||||
_priority(priority),
|
||||
_runnable(runnable),
|
||||
_name(name)
|
||||
RunnableList(Runnable& target, int priority):
|
||||
_priority(priority)
|
||||
{
|
||||
push(target);
|
||||
}
|
||||
|
||||
~NewActionNotification() override = default;
|
||||
|
||||
Runnable& runnable() const
|
||||
{
|
||||
return _runnable;
|
||||
}
|
||||
|
||||
Thread::Priority priority() const
|
||||
int priority() const
|
||||
{
|
||||
return _priority;
|
||||
}
|
||||
|
||||
const std::string &threadName() const
|
||||
void push(Runnable& r)
|
||||
{
|
||||
return _name;
|
||||
_runnables.push_back(std::ref(r));
|
||||
}
|
||||
|
||||
std::string threadFullName() const
|
||||
Runnable& pop()
|
||||
{
|
||||
std::string fullName(_name);
|
||||
if (_name.empty())
|
||||
{
|
||||
fullName = _name;
|
||||
}
|
||||
else
|
||||
{
|
||||
fullName.append(" (");
|
||||
fullName.append(_name);
|
||||
fullName.append(")");
|
||||
}
|
||||
return fullName;
|
||||
auto r = _runnables.front();
|
||||
_runnables.pop_front();
|
||||
return r;
|
||||
}
|
||||
|
||||
bool empty() const
|
||||
{
|
||||
return _runnables.empty();
|
||||
}
|
||||
|
||||
private:
|
||||
std::atomic<Thread::Priority> _priority;
|
||||
Runnable& _runnable;
|
||||
std::string _name;
|
||||
int _priority = 0;
|
||||
std::list<std::reference_wrapper<Runnable>> _runnables;
|
||||
};
|
||||
|
||||
class ActiveThread: public Runnable
|
||||
|
||||
struct RunnablePriorityCompare
|
||||
{
|
||||
// for make heap
|
||||
bool operator()(const std::shared_ptr<RunnableList>& left, const std::shared_ptr<RunnableList>& right) const
|
||||
{
|
||||
return left->priority() < right->priority();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
class RunnablePriorityQueue
|
||||
/// A priority queue of runnables
|
||||
{
|
||||
public:
|
||||
ActiveThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE);
|
||||
~ActiveThread() override = default;
|
||||
void push(Runnable& target, int priority)
|
||||
{
|
||||
for (auto& q : _queues)
|
||||
{
|
||||
if (q->priority() == priority)
|
||||
{
|
||||
q->push(target);
|
||||
return;
|
||||
}
|
||||
}
|
||||
auto q = std::make_shared<RunnableList>(std::ref(target), priority);
|
||||
_queues.push_back(q);
|
||||
std::push_heap(_queues.begin(), _queues.end(), _comp);
|
||||
}
|
||||
|
||||
void start();
|
||||
void start(Thread::Priority priority, Runnable& target);
|
||||
void start(Thread::Priority priority, Runnable& target, const std::string& name);
|
||||
void join();
|
||||
void release();
|
||||
void run() override;
|
||||
Runnable& pop()
|
||||
{
|
||||
auto q = _queues.front();
|
||||
auto& r = q->pop();
|
||||
if (q->empty())
|
||||
{
|
||||
std::pop_heap(_queues.begin(), _queues.end(), _comp);
|
||||
_queues.pop_back();
|
||||
}
|
||||
return r;
|
||||
}
|
||||
|
||||
bool empty() const
|
||||
{
|
||||
return _queues.empty();
|
||||
}
|
||||
|
||||
private:
|
||||
NotificationQueue _pTargetQueue;
|
||||
std::string _name;
|
||||
Thread _thread;
|
||||
Event _targetCompleted;
|
||||
FastMutex _mutex;
|
||||
const long JOIN_TIMEOUT = 10000;
|
||||
std::atomic<bool> _needToStop{false};
|
||||
std::vector<std::shared_ptr<RunnableList>> _queues;
|
||||
RunnablePriorityCompare _comp;
|
||||
};
|
||||
|
||||
|
||||
ActiveThread::ActiveThread(const std::string& name, int stackSize):
|
||||
_name(name),
|
||||
_thread(name),
|
||||
_targetCompleted(Event::EVENT_MANUALRESET)
|
||||
class ActivePooledThread: public Runnable, public RefCountedObject
|
||||
{
|
||||
poco_assert_dbg (stackSize >= 0);
|
||||
_thread.setStackSize(stackSize);
|
||||
public:
|
||||
using Ptr = Poco::AutoPtr<ActivePooledThread>;
|
||||
|
||||
explicit ActivePooledThread(ActiveThreadPoolPrivate& pool);
|
||||
|
||||
void start();
|
||||
void join();
|
||||
bool isRunning() const;
|
||||
|
||||
void setRunnable(Runnable& target);
|
||||
void notifyRunnableReady();
|
||||
void registerThreadInactive();
|
||||
|
||||
virtual void run() override;
|
||||
|
||||
private:
|
||||
ActiveThreadPoolPrivate& _pool;
|
||||
std::optional<std::reference_wrapper<Runnable>> _target;
|
||||
Condition _runnableReady;
|
||||
Thread _thread;
|
||||
};
|
||||
|
||||
|
||||
class ActiveThreadPoolPrivate
|
||||
{
|
||||
public:
|
||||
ActiveThreadPoolPrivate(int capacity, int stackSize);
|
||||
ActiveThreadPoolPrivate(int capacity, int stackSize, const std::string& name);
|
||||
~ActiveThreadPoolPrivate();
|
||||
|
||||
bool tryStart(Runnable& target);
|
||||
void enqueueTask(Runnable& target, int priority = 0);
|
||||
void startThread(Runnable& target);
|
||||
void joinAll();
|
||||
|
||||
int activeThreadCount() const;
|
||||
|
||||
public:
|
||||
mutable FastMutex mutex;
|
||||
std::string name;
|
||||
std::set<ActivePooledThread::Ptr> allThreads;
|
||||
std::list<ActivePooledThread::Ptr> waitingThreads;
|
||||
std::list<ActivePooledThread::Ptr> expiredThreads;
|
||||
RunnablePriorityQueue runnables;
|
||||
Condition noActiveThreads;
|
||||
|
||||
int expiryTimeout = 30000;
|
||||
int maxThreadCount;
|
||||
int stackSize;
|
||||
int activeThreads = 0;
|
||||
int serial = 0;
|
||||
};
|
||||
|
||||
|
||||
ActivePooledThread::ActivePooledThread(ActiveThreadPoolPrivate& pool):
|
||||
_pool(pool)
|
||||
{
|
||||
std::ostringstream name;
|
||||
name << _pool.name << "[#" << ++_pool.serial << "]";
|
||||
_thread.setName(name.str());
|
||||
_thread.setStackSize(_pool.stackSize);
|
||||
}
|
||||
|
||||
void ActiveThread::start()
|
||||
|
||||
void ActivePooledThread::start()
|
||||
{
|
||||
_needToStop = false;
|
||||
_thread.start(*this);
|
||||
}
|
||||
|
||||
|
||||
void ActiveThread::start(Thread::Priority priority, Runnable& target)
|
||||
void ActivePooledThread::setRunnable(Runnable& target)
|
||||
{
|
||||
_pTargetQueue.enqueueNotification(Poco::makeAuto<NewActionNotification>(priority, target, _name));
|
||||
poco_assert(_target.has_value() == false);
|
||||
_target = std::ref(target);
|
||||
}
|
||||
|
||||
|
||||
void ActiveThread::start(Thread::Priority priority, Runnable& target, const std::string& name)
|
||||
void ActivePooledThread::notifyRunnableReady()
|
||||
{
|
||||
_pTargetQueue.enqueueNotification(Poco::makeAuto<NewActionNotification>(priority, target, name));
|
||||
}
|
||||
|
||||
void ActiveThread::join()
|
||||
{
|
||||
_pTargetQueue.wakeUpAll();
|
||||
if (!_pTargetQueue.empty())
|
||||
{
|
||||
_targetCompleted.wait();
|
||||
}
|
||||
|
||||
_runnableReady.signal();
|
||||
}
|
||||
|
||||
|
||||
void ActiveThread::release()
|
||||
bool ActivePooledThread::isRunning() const
|
||||
{
|
||||
// In case of a statically allocated thread pool (such
|
||||
// as the default thread pool), Windows may have already
|
||||
// terminated the thread before we got here.
|
||||
if (_thread.isRunning())
|
||||
{
|
||||
_needToStop = true;
|
||||
_pTargetQueue.wakeUpAll();
|
||||
if (!_pTargetQueue.empty())
|
||||
_targetCompleted.wait(JOIN_TIMEOUT);
|
||||
}
|
||||
|
||||
if (_thread.tryJoin(JOIN_TIMEOUT))
|
||||
{
|
||||
delete this;
|
||||
}
|
||||
return _thread.isRunning();
|
||||
}
|
||||
|
||||
|
||||
void ActiveThread::run()
|
||||
void ActivePooledThread::join()
|
||||
{
|
||||
do
|
||||
_thread.join();
|
||||
}
|
||||
|
||||
|
||||
void ActivePooledThread::run()
|
||||
{
|
||||
FastMutex::ScopedLock lock(_pool.mutex);
|
||||
for (;;)
|
||||
{
|
||||
AutoPtr<Notification> pN = _pTargetQueue.waitDequeueNotification();
|
||||
while (pN)
|
||||
auto r = _target;
|
||||
_target.reset();
|
||||
|
||||
do
|
||||
{
|
||||
NewActionNotification::Ptr pNAN = pN.cast<NewActionNotification>();
|
||||
Runnable& target = pNAN->runnable();
|
||||
_thread.setPriority(pNAN->priority());
|
||||
_thread.setName(pNAN->name());
|
||||
try
|
||||
if (r.has_value())
|
||||
{
|
||||
target.run();
|
||||
_pool.mutex.unlock();
|
||||
try
|
||||
{
|
||||
r.value().get().run();
|
||||
}
|
||||
catch (Exception& exc)
|
||||
{
|
||||
ErrorHandler::handle(exc);
|
||||
}
|
||||
catch (std::exception& exc)
|
||||
{
|
||||
ErrorHandler::handle(exc);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
ErrorHandler::handle();
|
||||
}
|
||||
ThreadLocalStorage::clear();
|
||||
_pool.mutex.lock();
|
||||
}
|
||||
catch (Exception& exc)
|
||||
|
||||
if (_pool.runnables.empty())
|
||||
{
|
||||
ErrorHandler::handle(exc);
|
||||
r.reset();
|
||||
break;
|
||||
}
|
||||
catch (std::exception& exc)
|
||||
{
|
||||
ErrorHandler::handle(exc);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
ErrorHandler::handle();
|
||||
}
|
||||
_thread.setName(_name);
|
||||
_thread.setPriority(Thread::PRIO_NORMAL);
|
||||
ThreadLocalStorage::clear();
|
||||
pN = _pTargetQueue.waitDequeueNotification(1000);
|
||||
|
||||
r = std::ref(_pool.runnables.pop());
|
||||
} while (true);
|
||||
|
||||
_pool.waitingThreads.push_back(ActivePooledThread::Ptr{ this, true });
|
||||
registerThreadInactive();
|
||||
// wait for work, exiting after the expiry timeout is reached
|
||||
_runnableReady.tryWait(_pool.mutex, _pool.expiryTimeout);
|
||||
++_pool.activeThreads;
|
||||
|
||||
auto it = std::find(_pool.waitingThreads.begin(), _pool.waitingThreads.end(), ActivePooledThread::Ptr{ this, true });
|
||||
if (it != _pool.waitingThreads.end())
|
||||
{
|
||||
_pool.waitingThreads.erase(it);
|
||||
_pool.expiredThreads.push_back(ActivePooledThread::Ptr{ this, true });
|
||||
registerThreadInactive();
|
||||
break;
|
||||
}
|
||||
|
||||
if (!_pool.allThreads.count(ActivePooledThread::Ptr{ this, true }))
|
||||
{
|
||||
registerThreadInactive();
|
||||
break;
|
||||
}
|
||||
_targetCompleted.set();
|
||||
}
|
||||
while (_needToStop == false);
|
||||
}
|
||||
|
||||
|
||||
void ActivePooledThread::registerThreadInactive()
|
||||
{
|
||||
if (--_pool.activeThreads == 0)
|
||||
{
|
||||
_pool.noActiveThreads.broadcast();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ActiveThreadPoolPrivate::ActiveThreadPoolPrivate(int capacity, int stackSize_):
|
||||
maxThreadCount(capacity),
|
||||
stackSize(stackSize_)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
ActiveThreadPoolPrivate::ActiveThreadPoolPrivate(int capacity, int stackSize_, const std::string& name_):
|
||||
name(name_),
|
||||
maxThreadCount(capacity),
|
||||
stackSize(stackSize_)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
ActiveThreadPoolPrivate::~ActiveThreadPoolPrivate()
|
||||
{
|
||||
joinAll();
|
||||
}
|
||||
|
||||
|
||||
bool ActiveThreadPoolPrivate::tryStart(Runnable& target)
|
||||
{
|
||||
if (allThreads.empty())
|
||||
{
|
||||
startThread(target);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (activeThreadCount() >= maxThreadCount)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!waitingThreads.empty())
|
||||
{
|
||||
// recycle an available thread
|
||||
enqueueTask(target);
|
||||
auto pThread = waitingThreads.front();
|
||||
waitingThreads.pop_front();
|
||||
pThread->notifyRunnableReady();
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!expiredThreads.empty())
|
||||
{
|
||||
// restart an expired thread
|
||||
auto pThread = expiredThreads.front();
|
||||
expiredThreads.pop_front();
|
||||
|
||||
++activeThreads;
|
||||
|
||||
// an expired thread must call join() before restart it, or it will cost thread leak
|
||||
pThread->join();
|
||||
pThread->setRunnable(target);
|
||||
pThread->start();
|
||||
return true;
|
||||
}
|
||||
|
||||
// start a new thread
|
||||
startThread(target);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void ActiveThreadPoolPrivate::enqueueTask(Runnable& target, int priority)
|
||||
{
|
||||
runnables.push(target, priority);
|
||||
}
|
||||
|
||||
|
||||
int ActiveThreadPoolPrivate::activeThreadCount() const
|
||||
{
|
||||
std::size_t count = allThreads.size() - expiredThreads.size() - waitingThreads.size();
|
||||
return static_cast<int>(count);
|
||||
}
|
||||
|
||||
|
||||
void ActiveThreadPoolPrivate::startThread(Runnable& target)
|
||||
{
|
||||
ActivePooledThread::Ptr pThread = new ActivePooledThread(*this);
|
||||
allThreads.insert(pThread);
|
||||
++activeThreads;
|
||||
pThread->setRunnable(target);
|
||||
pThread->start();
|
||||
}
|
||||
|
||||
|
||||
void ActiveThreadPoolPrivate::joinAll()
|
||||
{
|
||||
FastMutex::ScopedLock lock(mutex);
|
||||
|
||||
do {
|
||||
while (!runnables.empty() || activeThreads != 0)
|
||||
{
|
||||
noActiveThreads.wait(mutex);
|
||||
}
|
||||
|
||||
// move the contents of the set out so that we can iterate without the lock
|
||||
std::set<ActivePooledThread::Ptr> allThreadsCopy;
|
||||
allThreadsCopy.swap(allThreads);
|
||||
expiredThreads.clear();
|
||||
waitingThreads.clear();
|
||||
mutex.unlock();
|
||||
|
||||
for (auto pThread : allThreadsCopy)
|
||||
{
|
||||
if (pThread->isRunning())
|
||||
{
|
||||
pThread->notifyRunnableReady();
|
||||
}
|
||||
|
||||
// we must call join() before thread destruction, or it will cost thread leak
|
||||
pThread->join();
|
||||
poco_assert(2 == pThread->referenceCount());
|
||||
}
|
||||
|
||||
mutex.lock();
|
||||
|
||||
// More threads can be started during reset(), in that case continue
|
||||
// waiting if we still have time left.
|
||||
} while (!runnables.empty() || activeThreads != 0);
|
||||
|
||||
while (!runnables.empty() || activeThreads != 0)
|
||||
{
|
||||
noActiveThreads.wait(mutex);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ActiveThreadPool::ActiveThreadPool(int capacity, int stackSize):
|
||||
_capacity(capacity),
|
||||
_serial(0),
|
||||
_stackSize(stackSize),
|
||||
_lastThreadIndex(0)
|
||||
_impl(new ActiveThreadPoolPrivate(capacity, stackSize))
|
||||
{
|
||||
poco_assert (_capacity >= 1);
|
||||
|
||||
_threads.reserve(_capacity);
|
||||
|
||||
for (int i = 0; i < _capacity; i++)
|
||||
{
|
||||
ActiveThread* pThread = createThread();
|
||||
_threads.push_back(pThread);
|
||||
pThread->start();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ActiveThreadPool::ActiveThreadPool(std::string name, int capacity, int stackSize):
|
||||
_name(std::move(name)),
|
||||
_capacity(capacity),
|
||||
_serial(0),
|
||||
_stackSize(stackSize),
|
||||
_lastThreadIndex(0)
|
||||
ActiveThreadPool::ActiveThreadPool(const std::string& name, int capacity, int stackSize):
|
||||
_impl(new ActiveThreadPoolPrivate(capacity, stackSize, name))
|
||||
{
|
||||
poco_assert (_capacity >= 1);
|
||||
|
||||
_threads.reserve(_capacity);
|
||||
|
||||
for (int i = 0; i < _capacity; i++)
|
||||
{
|
||||
ActiveThread* pThread = createThread();
|
||||
_threads.push_back(pThread);
|
||||
pThread->start();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ActiveThreadPool::~ActiveThreadPool()
|
||||
{
|
||||
try
|
||||
{
|
||||
stopAll();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
poco_unexpected();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int ActiveThreadPool::capacity() const
|
||||
{
|
||||
return _capacity;
|
||||
return _impl->maxThreadCount;
|
||||
}
|
||||
|
||||
|
||||
void ActiveThreadPool::start(Runnable& target)
|
||||
void ActiveThreadPool::start(Runnable& target, int priority)
|
||||
{
|
||||
getThread()->start(Thread::PRIO_NORMAL, target);
|
||||
}
|
||||
FastMutex::ScopedLock lock(_impl->mutex);
|
||||
|
||||
|
||||
void ActiveThreadPool::start(Runnable& target, const std::string& name)
|
||||
{
|
||||
getThread()->start(Thread::PRIO_NORMAL, target, name);
|
||||
}
|
||||
|
||||
|
||||
void ActiveThreadPool::startWithPriority(Thread::Priority priority, Runnable& target)
|
||||
{
|
||||
getThread()->start(priority, target);
|
||||
}
|
||||
|
||||
|
||||
void ActiveThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name)
|
||||
{
|
||||
getThread()->start(priority, target, name);
|
||||
}
|
||||
|
||||
|
||||
void ActiveThreadPool::stopAll()
|
||||
{
|
||||
FastMutex::ScopedLock lock(_mutex);
|
||||
|
||||
for (auto pThread: _threads)
|
||||
if (!_impl->tryStart(target))
|
||||
{
|
||||
pThread->release();
|
||||
_impl->enqueueTask(target, priority);
|
||||
|
||||
if (!_impl->waitingThreads.empty())
|
||||
{
|
||||
auto pThread = _impl->waitingThreads.front();
|
||||
_impl->waitingThreads.pop_front();
|
||||
pThread->notifyRunnableReady();
|
||||
}
|
||||
}
|
||||
_threads.clear();
|
||||
}
|
||||
|
||||
|
||||
void ActiveThreadPool::joinAll()
|
||||
{
|
||||
FastMutex::ScopedLock lock(_mutex);
|
||||
|
||||
for (auto pThread: _threads)
|
||||
{
|
||||
pThread->join();
|
||||
}
|
||||
|
||||
_threads.clear();
|
||||
_threads.reserve(_capacity);
|
||||
|
||||
for (int i = 0; i < _capacity; i++)
|
||||
{
|
||||
ActiveThread* pThread = createThread();
|
||||
_threads.push_back(pThread);
|
||||
pThread->start();
|
||||
}
|
||||
_impl->joinAll();
|
||||
}
|
||||
|
||||
ActiveThread* ActiveThreadPool::getThread()
|
||||
{
|
||||
auto thrSize = _threads.size();
|
||||
auto i = (_lastThreadIndex++) % thrSize;
|
||||
ActiveThread* pThread = _threads[i];
|
||||
return pThread;
|
||||
}
|
||||
|
||||
|
||||
ActiveThread* ActiveThreadPool::createThread()
|
||||
{
|
||||
std::ostringstream name;
|
||||
name << _name << "[#active-thread-" << ++_serial << "]";
|
||||
return new ActiveThread(name.str(), _stackSize);
|
||||
}
|
||||
|
||||
|
||||
class ActiveThreadPoolSingletonHolder
|
||||
{
|
||||
public:
|
||||
ActiveThreadPoolSingletonHolder() = default;
|
||||
~ActiveThreadPoolSingletonHolder()
|
||||
{
|
||||
delete _pPool;
|
||||
}
|
||||
ActiveThreadPool* pool()
|
||||
{
|
||||
FastMutex::ScopedLock lock(_mutex);
|
||||
|
||||
if (!_pPool)
|
||||
{
|
||||
_pPool = new ActiveThreadPool("default-active");
|
||||
}
|
||||
return _pPool;
|
||||
}
|
||||
|
||||
private:
|
||||
ActiveThreadPool* _pPool{nullptr};
|
||||
FastMutex _mutex;
|
||||
};
|
||||
|
||||
|
||||
ActiveThreadPool& ActiveThreadPool::defaultPool()
|
||||
{
|
||||
static ActiveThreadPoolSingletonHolder sh;
|
||||
return *sh.pool();
|
||||
static ActiveThreadPool thePool;
|
||||
return thePool;
|
||||
}
|
||||
|
||||
|
||||
int ActiveThreadPool::getStackSize() const
|
||||
{
|
||||
return _impl->stackSize;
|
||||
}
|
||||
|
||||
|
||||
int ActiveThreadPool::expiryTimeout() const
|
||||
{
|
||||
return _impl->expiryTimeout;
|
||||
}
|
||||
|
||||
|
||||
void ActiveThreadPool::setExpiryTimeout(int expiryTimeout)
|
||||
{
|
||||
if (_impl->expiryTimeout != expiryTimeout)
|
||||
{
|
||||
_impl->expiryTimeout = expiryTimeout;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
const std::string& ActiveThreadPool::name() const
|
||||
{
|
||||
return _impl->name;
|
||||
}
|
||||
|
||||
} // namespace Poco
|
||||
|
@ -16,12 +16,44 @@
|
||||
#include "Poco/Exception.h"
|
||||
#include "Poco/Thread.h"
|
||||
#include "Poco/Environment.h"
|
||||
#include "Poco/RefCountedObject.h"
|
||||
#include "Poco/AutoPtr.h"
|
||||
|
||||
|
||||
using Poco::ActiveThreadPool;
|
||||
using Poco::RunnableAdapter;
|
||||
using Poco::Thread;
|
||||
using Poco::Environment;
|
||||
using Poco::Runnable;
|
||||
using Poco::RefCountedObject;
|
||||
using Poco::AutoPtr;
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
class TestPriorityRunnable: public Runnable, public RefCountedObject
|
||||
{
|
||||
public:
|
||||
using Ptr = AutoPtr<TestPriorityRunnable>;
|
||||
|
||||
TestPriorityRunnable(int n, Poco::FastMutex& mutex, std::vector<int>& result):
|
||||
_n(n),
|
||||
_mutex(mutex),
|
||||
_result(result)
|
||||
{}
|
||||
|
||||
virtual void run() override
|
||||
{
|
||||
Poco::FastMutex::ScopedLock lock(_mutex);
|
||||
_result.push_back(_n);
|
||||
}
|
||||
|
||||
private:
|
||||
int _n;
|
||||
Poco::FastMutex& _mutex;
|
||||
std::vector<int>& _result;
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
ActiveThreadPoolTest::ActiveThreadPoolTest(const std::string& name): CppUnit::TestCase(name)
|
||||
@ -34,14 +66,13 @@ ActiveThreadPoolTest::~ActiveThreadPoolTest()
|
||||
}
|
||||
|
||||
|
||||
void ActiveThreadPoolTest::testActiveThreadPool()
|
||||
void ActiveThreadPoolTest::testActiveThreadPool1()
|
||||
{
|
||||
ActiveThreadPool pool;
|
||||
|
||||
assertTrue (pool.capacity() == static_cast<int>(Environment::processorCount()) + 1);
|
||||
|
||||
RunnableAdapter<ActiveThreadPoolTest> ra(*this, &ActiveThreadPoolTest::count);
|
||||
|
||||
_count = 0;
|
||||
try
|
||||
{
|
||||
for (int i = 0; i < 2000; ++i)
|
||||
@ -53,9 +84,7 @@ void ActiveThreadPoolTest::testActiveThreadPool()
|
||||
{
|
||||
failmsg("wrong exception thrown");
|
||||
}
|
||||
|
||||
pool.joinAll();
|
||||
|
||||
assertTrue (_count == 2000);
|
||||
|
||||
_count = 0;
|
||||
@ -71,11 +100,86 @@ void ActiveThreadPoolTest::testActiveThreadPool()
|
||||
failmsg("wrong exception thrown");
|
||||
}
|
||||
pool.joinAll();
|
||||
|
||||
assertTrue (_count == 1000);
|
||||
}
|
||||
|
||||
|
||||
void ActiveThreadPoolTest::testActiveThreadPool2()
|
||||
{
|
||||
ActiveThreadPool pool;
|
||||
RunnableAdapter<ActiveThreadPoolTest> ra(*this, &ActiveThreadPoolTest::count);
|
||||
|
||||
pool.setExpiryTimeout(10);
|
||||
assertTrue (pool.expiryTimeout() == 10);
|
||||
|
||||
_count = 0;
|
||||
try
|
||||
{
|
||||
for (int i = 0; i < pool.capacity(); ++i)
|
||||
{
|
||||
pool.start(ra);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
failmsg("wrong exception thrown");
|
||||
}
|
||||
|
||||
// wait for the threads to expire
|
||||
Thread::sleep(pool.expiryTimeout() * pool.capacity());
|
||||
|
||||
try
|
||||
{
|
||||
for (int i = 0; i < pool.capacity(); ++i)
|
||||
{
|
||||
pool.start(ra); // reuse expired threads
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
failmsg("wrong exception thrown");
|
||||
}
|
||||
|
||||
// wait for the threads to expire
|
||||
Thread::sleep(pool.expiryTimeout() * pool.capacity());
|
||||
pool.joinAll(); // join with no active threads
|
||||
assertTrue (_count == pool.capacity() * 2);
|
||||
}
|
||||
|
||||
void ActiveThreadPoolTest::testActiveThreadPool3()
|
||||
{
|
||||
Poco::FastMutex mutex;
|
||||
std::vector<int> result;
|
||||
ActiveThreadPool pool(1);
|
||||
std::vector<TestPriorityRunnable::Ptr> runnables;
|
||||
|
||||
mutex.lock(); // lock, to make sure runnables are queued
|
||||
try
|
||||
{
|
||||
for (int priority = 0; priority < 1000; ++priority)
|
||||
{
|
||||
TestPriorityRunnable::Ptr r = new TestPriorityRunnable(priority, mutex, result);
|
||||
runnables.push_back(r);
|
||||
pool.start(*r, priority);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
failmsg("wrong exception thrown");
|
||||
}
|
||||
mutex.unlock(); // unlock, to let runnables go
|
||||
|
||||
pool.joinAll();
|
||||
std::vector<int> mock;
|
||||
mock.push_back(0); // 0 is the first result
|
||||
for (int i = 999; i > 0; --i)
|
||||
{
|
||||
mock.push_back(i); // other results should sort by priority
|
||||
}
|
||||
|
||||
assertTrue (std::equal(result.begin(), result.end(), mock.begin(), mock.end()));
|
||||
}
|
||||
|
||||
void ActiveThreadPoolTest::setUp()
|
||||
{
|
||||
_count = 0;
|
||||
@ -97,7 +201,9 @@ CppUnit::Test* ActiveThreadPoolTest::suite()
|
||||
{
|
||||
CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("ActiveThreadPoolTest");
|
||||
|
||||
CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool);
|
||||
CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool1);
|
||||
CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool2);
|
||||
CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool3);
|
||||
|
||||
return pSuite;
|
||||
}
|
||||
|
@ -26,7 +26,9 @@ public:
|
||||
ActiveThreadPoolTest(const std::string& name);
|
||||
~ActiveThreadPoolTest();
|
||||
|
||||
void testActiveThreadPool();
|
||||
void testActiveThreadPool1();
|
||||
void testActiveThreadPool2();
|
||||
void testActiveThreadPool3();
|
||||
|
||||
void setUp();
|
||||
void tearDown();
|
||||
|
@ -1,7 +1,7 @@
|
||||
#
|
||||
# Darwin-clang-libc++
|
||||
#
|
||||
# Build settings for Mac OS X 10.11 and later (clang, libc++, x86_64/arm64)
|
||||
# Build settings for Mac OS X 10.13 and later (clang, libc++, x86_64/arm64)
|
||||
# The build settings defined in this file are compatible
|
||||
# with XCode C++ projects.
|
||||
#
|
||||
@ -13,7 +13,7 @@ LINKMODE ?= SHARED
|
||||
|
||||
ARCHFLAGS ?= -arch $(POCO_HOST_OSARCH)
|
||||
SANITIZEFLAGS ?=
|
||||
OSFLAGS ?= -mmacosx-version-min=10.11 -isysroot $(shell xcrun --show-sdk-path)
|
||||
OSFLAGS ?= -mmacosx-version-min=10.15 -isysroot $(shell xcrun --show-sdk-path)
|
||||
|
||||
ifeq ($(POCO_HOST_OSARCH),arm64)
|
||||
OPENSSL_DIR ?= /opt/homebrew/opt/openssl
|
||||
|
@ -77,6 +77,8 @@ else(BUILD_SHARED_LIBS)
|
||||
set(CMAKE_RELWITHDEBINFO_POSTFIX "${STATIC_POSTFIX}" CACHE STRING "Set RelWithDebInfo library postfix" FORCE)
|
||||
endif()
|
||||
|
||||
# MacOS version that has full support for C++17
|
||||
set(CMAKE_OSX_DEPLOYMENT_TARGET, 10.15)
|
||||
|
||||
# OS Detection
|
||||
include(CheckTypeSize)
|
||||
|
Loading…
Reference in New Issue
Block a user