Improve FifoEvent, ActiveMethod, ActiveResult (#4211)

Co-authored-by: Alexander B <bas524@ya.ru>
This commit is contained in:
Alexander B 2023-11-22 02:59:24 +03:00 committed by GitHub
parent f30d759c08
commit 4a9285c997
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 748 additions and 16 deletions

View File

@ -1385,6 +1385,7 @@
<ClCompile Include="src\Thread.cpp" />
<ClCompile Include="src\ThreadLocal.cpp" />
<ClCompile Include="src\ThreadPool.cpp" />
<ClCompile Include="src\ActiveThreadPool.cpp" />
<ClCompile Include="src\ThreadTarget.cpp" />
<ClCompile Include="src\Thread_POSIX.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug_shared|Win32'">true</ExcludedFromBuild>

View File

@ -1385,6 +1385,7 @@
<ClCompile Include="src\Thread.cpp" />
<ClCompile Include="src\ThreadLocal.cpp" />
<ClCompile Include="src\ThreadPool.cpp" />
<ClCompile Include="src\ActiveThreadPool.cpp" />
<ClCompile Include="src\ThreadTarget.cpp" />
<ClCompile Include="src\Thread_POSIX.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug_shared|Win32'">true</ExcludedFromBuild>

View File

@ -1391,6 +1391,7 @@
<ClCompile Include="src\Thread.cpp" />
<ClCompile Include="src\ThreadLocal.cpp" />
<ClCompile Include="src\ThreadPool.cpp" />
<ClCompile Include="src\ActiveThreadPool.cpp" />
<ClCompile Include="src\ThreadTarget.cpp" />
<ClCompile Include="src\Thread_POSIX.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug_shared|Win32'">true</ExcludedFromBuild>

View File

@ -1950,6 +1950,7 @@
<ClCompile Include="src\Thread.cpp" />
<ClCompile Include="src\ThreadLocal.cpp" />
<ClCompile Include="src\ThreadPool.cpp" />
<ClCompile Include="src\ActiveThreadPool.cpp" />
<ClCompile Include="src\ThreadTarget.cpp" />
<ClCompile Include="src\Thread_POSIX.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='debug_shared|Win32'">true</ExcludedFromBuild>

View File

@ -6,7 +6,7 @@
include $(POCO_BASE)/build/rules/global
objects = ArchiveStrategy Ascii ASCIIEncoding AsyncChannel \
objects = ArchiveStrategy Ascii ASCIIEncoding AsyncChannel ActiveThreadPool\
Base32Decoder Base32Encoder Base64Decoder Base64Encoder \
BinaryReader BinaryWriter Bugcheck ByteOrder Channel Checksum Clock Configurable ConsoleChannel \
Condition CountingStream DateTime LocalDateTime DateTimeFormat DateTimeFormatter DateTimeParser \

View File

@ -339,9 +339,8 @@ protected:
}
NotifyAsyncParams params = par;
TArgs retArgs(params.args);
params.ptrStrat->notify(params.pSender, retArgs);
return retArgs;
params.ptrStrat->notify(params.pSender, params.args);
return params.args;
}
TStrategy _strategy; /// The strategy used to notify observers.

View File

@ -19,7 +19,7 @@
#include "Poco/Foundation.h"
#include "Poco/ThreadPool.h"
#include "Poco/ActiveThreadPool.h"
#include "Poco/ActiveRunnable.h"
@ -36,7 +36,7 @@ class ActiveStarter
public:
static void start(OwnerType* /*pOwner*/, ActiveRunnableBase::Ptr pRunnable)
{
ThreadPool::defaultPool().start(*pRunnable);
ActiveThreadPool::defaultPool().start(*pRunnable);
pRunnable->duplicate(); // The runnable will release itself.
}
};

View File

@ -0,0 +1,145 @@
//
// ActiveThreadPool.h
//
// Library: Foundation
// Package: Threading
// Module: ActiveThreadPool
//
// Definition of the ActiveThreadPool class.
//
// Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef Foundation_ActiveThreadPool_INCLUDED
#define Foundation_ActiveThreadPool_INCLUDED
#include "Poco/Foundation.h"
#include "Poco/Thread.h"
#include "Poco/Mutex.h"
#include "Poco/Environment.h"
#include <vector>
namespace Poco {
class Runnable;
class ActiveThread;
class Foundation_API ActiveThreadPool
/// A thread pool always keeps a number of threads running, ready
/// to accept work.
/// Threads in an active thread pool are re-used
/// Every thread in the pool has own notification-queue with Runnable
/// Every Runnable executes on next thread (round-robin model)
/// The thread pool always keeps fixed number of threads running.
/// Use case for this pool is running many (more than os-max-thread-count) short live tasks
/// Round-robin model allow efficiently utilize cpu cores
{
public:
ActiveThreadPool(int capacity = static_cast<int>(Environment::processorCount()) + 1,
int stackSize = POCO_THREAD_STACK_SIZE);
/// Creates a thread pool with fixed capacity threads.
/// Threads are created with given stack size.
ActiveThreadPool(std::string name,
int capacity = static_cast<int>(Environment::processorCount()) + 1,
int stackSize = POCO_THREAD_STACK_SIZE);
/// Creates a thread pool with the given name and fixed capacity threads.
/// Threads are created with given stack size.
~ActiveThreadPool();
/// Currently running threads will remain active
/// until they complete.
int capacity() const;
/// Returns the capacity of threads.
int getStackSize() const;
/// Returns the stack size used to create new threads.
void start(Runnable& target);
/// Obtains a thread and starts the target.
void start(Runnable& target, const std::string& name);
/// Obtains a thread and starts the target.
/// Assigns the given name to the thread.
void startWithPriority(Thread::Priority priority, Runnable& target);
/// Obtains a thread, adjusts the thread's priority, and starts the target.
void startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name);
/// Obtains a thread, adjusts the thread's priority, and starts the target.
/// Assigns the given name to the thread.
void stopAll();
/// Stops all running threads and waits for their completion.
///
/// Will also delete all thread objects.
/// If used, this method should be the last action before
/// the thread pool is deleted.
///
/// Note: If a thread fails to stop within 10 seconds
/// (due to a programming error, for example), the
/// underlying thread object will not be deleted and
/// this method will return anyway. This allows for a
/// more or less graceful shutdown in case of a misbehaving
/// thread.
void joinAll();
/// Waits for all threads to complete.
///
/// Note that this will join() underlying
/// threads and restart them for next tasks.
const std::string& name() const;
/// Returns the name of the thread pool,
/// or an empty string if no name has been
/// specified in the constructor.
static ActiveThreadPool& defaultPool();
/// Returns a reference to the default
/// thread pool.
protected:
ActiveThread* getThread();
ActiveThread* createThread();
private:
ActiveThreadPool(const ActiveThreadPool& pool);
ActiveThreadPool& operator = (const ActiveThreadPool& pool);
typedef std::vector<ActiveThread*> ThreadVec;
std::string _name;
int _capacity;
int _serial;
int _stackSize;
ThreadVec _threads;
mutable FastMutex _mutex;
std::atomic<size_t> _lastThreadIndex{0};
};
inline int ActiveThreadPool::getStackSize() const
{
return _stackSize;
}
inline const std::string& ActiveThreadPool::name() const
{
return _name;
}
} // namespace Poco
#endif // Foundation_ActiveThreadPool_INCLUDED

View File

@ -147,6 +147,11 @@ public:
{
}
DefaultStrategy(DefaultStrategy&& s):
_delegates(std::move(s._delegates))
{
}
~DefaultStrategy()
{
}
@ -201,6 +206,15 @@ public:
return *this;
}
DefaultStrategy& operator = (DefaultStrategy&& s)
{
if (this != &s)
{
_delegates = std::move(s._delegates);
}
return *this;
}
void clear()
{
for (Iterator it = _delegates.begin(); it != _delegates.end(); ++it)

View File

@ -41,6 +41,11 @@ public:
{
}
FIFOStrategy(FIFOStrategy&& s):
DefaultStrategy<TArgs, TDelegate>(std::move(s))
{
}
~FIFOStrategy()
{
}
@ -50,6 +55,12 @@ public:
DefaultStrategy<TArgs, TDelegate>::operator = (s);
return *this;
}
FIFOStrategy& operator = (FIFOStrategy&& s)
{
DefaultStrategy<TArgs, TDelegate>::operator = (s);
return *this;
}
};

View File

@ -0,0 +1,363 @@
//
// ActiveThreadPool.cpp
//
// Library: Foundation
// Package: Threading
// Module: ActiveThreadPool
//
// Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#include "Poco/ActiveThreadPool.h"
#include "Poco/Runnable.h"
#include "Poco/Thread.h"
#include "Poco/Event.h"
#include "Poco/ThreadLocal.h"
#include "Poco/ErrorHandler.h"
#include "Poco/NotificationQueue.h"
#include <sstream>
#include <ctime>
#include <utility>
namespace Poco {
class NewActionNotification: public Notification
{
public:
NewActionNotification(Thread::Priority priority, Runnable &runnable, std::string name) :
_priority(priority),
_runnable(runnable),
_name(std::move(name))
{ }
~NewActionNotification() override = default;
Runnable& runnable() const
{
return _runnable;
}
Thread::Priority priotity() const
{
return _priority;
}
const std::string &threadName() const
{
return _name;
}
std::string threadFullName() const
{
std::string fullName(_name);
if (_name.empty())
{
fullName = _name;
}
else
{
fullName.append(" (");
fullName.append(_name);
fullName.append(")");
}
return fullName;
}
private:
Thread::Priority _priority;
Runnable &_runnable;
std::string _name;
};
class ActiveThread: public Runnable
{
public:
ActiveThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE);
~ActiveThread() override = default;
void start();
void start(Thread::Priority priority, Runnable& target);
void start(Thread::Priority priority, Runnable& target, const std::string& name);
void join();
void release();
void run() override;
private:
NotificationQueue _pTargetQueue;
std::string _name;
Thread _thread;
Event _targetCompleted;
FastMutex _mutex;
const long JOIN_TIMEOUT = 10000;
std::atomic<bool> _needToStop{false};
};
ActiveThread::ActiveThread(const std::string& name, int stackSize):
_name(name),
_thread(name),
_targetCompleted(false)
{
poco_assert_dbg (stackSize >= 0);
_thread.setStackSize(stackSize);
}
void ActiveThread::start()
{
_needToStop = false;
_thread.start(*this);
}
void ActiveThread::start(Thread::Priority priority, Runnable& target)
{
_pTargetQueue.enqueueNotification(Poco::makeAuto<NewActionNotification>(priority, target, _name));
}
void ActiveThread::start(Thread::Priority priority, Runnable& target, const std::string& name)
{
_pTargetQueue.enqueueNotification(Poco::makeAuto<NewActionNotification>(priority, target, name));
}
void ActiveThread::join()
{
_pTargetQueue.wakeUpAll();
if (!_pTargetQueue.empty())
{
_targetCompleted.wait();
}
}
void ActiveThread::release()
{
// In case of a statically allocated thread pool (such
// as the default thread pool), Windows may have already
// terminated the thread before we got here.
if (_thread.isRunning())
{
_needToStop = true;
_pTargetQueue.wakeUpAll();
if (!_pTargetQueue.empty())
_targetCompleted.wait(JOIN_TIMEOUT);
}
if (_thread.tryJoin(JOIN_TIMEOUT))
{
delete this;
}
}
void ActiveThread::run()
{
do {
auto *_pTarget = dynamic_cast<NewActionNotification*>(_pTargetQueue.waitDequeueNotification());
while (_pTarget)
{
Runnable* pTarget = &_pTarget->runnable();
_thread.setPriority(_pTarget->priotity());
_thread.setName(_pTarget->name());
try
{
pTarget->run();
}
catch (Exception& exc)
{
ErrorHandler::handle(exc);
}
catch (std::exception& exc)
{
ErrorHandler::handle(exc);
}
catch (...)
{
ErrorHandler::handle();
}
_pTarget->release();
_thread.setName(_name);
_thread.setPriority(Thread::PRIO_NORMAL);
ThreadLocalStorage::clear();
_pTarget = dynamic_cast<NewActionNotification*>(_pTargetQueue.waitDequeueNotification(1000));
}
_targetCompleted.set();
}
while (_needToStop == false);
}
ActiveThreadPool::ActiveThreadPool(int capacity, int stackSize):
_capacity(capacity),
_serial(0),
_stackSize(stackSize),
_lastThreadIndex(0)
{
poco_assert (_capacity >= 1);
_threads.reserve(_capacity);
for (int i = 0; i < _capacity; i++)
{
ActiveThread* pThread = createThread();
_threads.push_back(pThread);
pThread->start();
}
}
ActiveThreadPool::ActiveThreadPool(std::string name, int capacity, int stackSize):
_name(std::move(name)),
_capacity(capacity),
_serial(0),
_stackSize(stackSize),
_lastThreadIndex(0)
{
poco_assert (_capacity >= 1);
_threads.reserve(_capacity);
for (int i = 0; i < _capacity; i++)
{
ActiveThread* pThread = createThread();
_threads.push_back(pThread);
pThread->start();
}
}
ActiveThreadPool::~ActiveThreadPool()
{
try
{
stopAll();
}
catch (...)
{
poco_unexpected();
}
}
int ActiveThreadPool::capacity() const
{
return _capacity;
}
void ActiveThreadPool::start(Runnable& target)
{
getThread()->start(Thread::PRIO_NORMAL, target);
}
void ActiveThreadPool::start(Runnable& target, const std::string& name)
{
getThread()->start(Thread::PRIO_NORMAL, target, name);
}
void ActiveThreadPool::startWithPriority(Thread::Priority priority, Runnable& target)
{
getThread()->start(priority, target);
}
void ActiveThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name)
{
getThread()->start(priority, target, name);
}
void ActiveThreadPool::stopAll()
{
FastMutex::ScopedLock lock(_mutex);
for (auto pThread: _threads)
{
pThread->release();
}
_threads.clear();
}
void ActiveThreadPool::joinAll()
{
FastMutex::ScopedLock lock(_mutex);
for (auto pThread: _threads)
{
pThread->join();
}
_threads.clear();
_threads.reserve(_capacity);
for (int i = 0; i < _capacity; i++)
{
ActiveThread* pThread = createThread();
_threads.push_back(pThread);
pThread->start();
}
}
ActiveThread* ActiveThreadPool::getThread()
{
auto thrSize = _threads.size();
auto i = (_lastThreadIndex++) % thrSize;
ActiveThread* pThread = _threads[i];
return pThread;
}
ActiveThread* ActiveThreadPool::createThread()
{
std::ostringstream name;
name << _name << "[#active-thread-" << ++_serial << "]";
return new ActiveThread(name.str(), _stackSize);
}
class ActiveThreadPoolSingletonHolder
{
public:
ActiveThreadPoolSingletonHolder() = default;
~ActiveThreadPoolSingletonHolder()
{
delete _pPool;
}
ActiveThreadPool* pool()
{
FastMutex::ScopedLock lock(_mutex);
if (!_pPool)
{
_pPool = new ActiveThreadPool("default-active");
}
return _pPool;
}
private:
ActiveThreadPool* _pPool{nullptr};
FastMutex _mutex;
};
namespace
{
static ActiveThreadPoolSingletonHolder sh;
}
ActiveThreadPool& ActiveThreadPool::defaultPool()
{
return *sh.pool();
}
} // namespace Poco

View File

@ -45,13 +45,13 @@ void NotificationQueue::enqueueNotification(Notification::Ptr pNotification)
FastMutex::ScopedLock lock(_mutex);
if (_waitQueue.empty())
{
_nfQueue.push_back(pNotification);
_nfQueue.push_back(std::move(pNotification));
}
else
{
WaitInfo* pWI = _waitQueue.front();
_waitQueue.pop_front();
pWI->pNf = pNotification;
pWI->pNf = std::move(pNotification);
pWI->nfAvailable.set();
}
}
@ -63,13 +63,13 @@ void NotificationQueue::enqueueUrgentNotification(Notification::Ptr pNotificatio
FastMutex::ScopedLock lock(_mutex);
if (_waitQueue.empty())
{
_nfQueue.push_front(pNotification);
_nfQueue.push_front(std::move(pNotification));
}
else
{
WaitInfo* pWI = _waitQueue.front();
_waitQueue.pop_front();
pWI->pNf = pNotification;
pWI->pNf = std::move(pNotification);
pWI->nfAvailable.set();
}
}

View File

@ -28,7 +28,7 @@ objects = ActiveMethodTest ActivityTest ActiveDispatcherTest \
StreamsTestSuite StringTest StringTokenizerTest TaskTestSuite TaskTest \
TaskManagerTest TestChannel TeeStreamTest UTF8StringTest \
TextConverterTest TextIteratorTest TextBufferIteratorTest TextTestSuite TextEncodingTest \
ThreadLocalTest ThreadPoolTest ThreadTest ThreadingTestSuite TimerTest \
ThreadLocalTest ThreadPoolTest ActiveThreadPoolTest ThreadTest ThreadingTestSuite TimerTest \
TimespanTest TimestampTest TimezoneTest URIStreamOpenerTest URITest \
URITestSuite UUIDGeneratorTest UUIDTest UUIDTestSuite ZLibTest \
TestPlugin DummyDelegate BasicEventTest FIFOEventTest PriorityEventTest EventTestSuite \

View File

@ -723,6 +723,7 @@
<ClCompile Include="src\ThreadingTestSuite.cpp" />
<ClCompile Include="src\ThreadLocalTest.cpp" />
<ClCompile Include="src\ThreadPoolTest.cpp" />
<ClCompile Include="src\ActiveThreadPoolTest.cpp" />
<ClCompile Include="src\ThreadTest.cpp" />
<ClCompile Include="src\TimedNotificationQueueTest.cpp" />
<ClCompile Include="src\TimerTest.cpp" />
@ -864,6 +865,7 @@
<ClInclude Include="src\ThreadingTestSuite.h" />
<ClInclude Include="src\ThreadLocalTest.h" />
<ClInclude Include="src\ThreadPoolTest.h" />
<ClInclude Include="src\ActiveThreadPoolTest.h" />
<ClInclude Include="src\ThreadTest.h" />
<ClInclude Include="src\TimedNotificationQueueTest.h" />
<ClInclude Include="src\TimerTest.h" />
@ -887,4 +889,4 @@
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets" />
</Project>
</Project>

View File

@ -723,6 +723,7 @@
<ClCompile Include="src\ThreadingTestSuite.cpp" />
<ClCompile Include="src\ThreadLocalTest.cpp" />
<ClCompile Include="src\ThreadPoolTest.cpp" />
<ClCompile Include="src\ActiveThreadPoolTest.cpp" />
<ClCompile Include="src\ThreadTest.cpp" />
<ClCompile Include="src\TimedNotificationQueueTest.cpp" />
<ClCompile Include="src\TimerTest.cpp" />
@ -864,6 +865,7 @@
<ClInclude Include="src\ThreadingTestSuite.h" />
<ClInclude Include="src\ThreadLocalTest.h" />
<ClInclude Include="src\ThreadPoolTest.h" />
<ClInclude Include="src\ActiveThreadPoolTest.h" />
<ClInclude Include="src\ThreadTest.h" />
<ClInclude Include="src\TimedNotificationQueueTest.h" />
<ClInclude Include="src\TimerTest.h" />
@ -887,4 +889,4 @@
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets" />
</Project>
</Project>

View File

@ -730,6 +730,7 @@
<ClCompile Include="src\ThreadingTestSuite.cpp" />
<ClCompile Include="src\ThreadLocalTest.cpp" />
<ClCompile Include="src\ThreadPoolTest.cpp" />
<ClCompile Include="src\ActiveThreadPoolTest.cpp" />
<ClCompile Include="src\ThreadTest.cpp" />
<ClCompile Include="src\TimedNotificationQueueTest.cpp" />
<ClCompile Include="src\TimerTest.cpp" />
@ -871,6 +872,7 @@
<ClInclude Include="src\ThreadingTestSuite.h" />
<ClInclude Include="src\ThreadLocalTest.h" />
<ClInclude Include="src\ThreadPoolTest.h" />
<ClInclude Include="src\ActiveThreadPoolTest.h" />
<ClInclude Include="src\ThreadTest.h" />
<ClInclude Include="src\TimedNotificationQueueTest.h" />
<ClInclude Include="src\TimerTest.h" />
@ -894,4 +896,4 @@
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets" />
</Project>
</Project>

View File

@ -1021,6 +1021,7 @@
<ClCompile Include="src\ThreadingTestSuite.cpp" />
<ClCompile Include="src\ThreadLocalTest.cpp" />
<ClCompile Include="src\ThreadPoolTest.cpp" />
<ClCompile Include="src\ActiveThreadPoolTest.cpp" />
<ClCompile Include="src\ThreadTest.cpp" />
<ClCompile Include="src\TimedNotificationQueueTest.cpp" />
<ClCompile Include="src\TimerTest.cpp" />
@ -1162,6 +1163,7 @@
<ClInclude Include="src\ThreadingTestSuite.h" />
<ClInclude Include="src\ThreadLocalTest.h" />
<ClInclude Include="src\ThreadPoolTest.h" />
<ClInclude Include="src\ActiveThreadPoolTest.h" />
<ClInclude Include="src\ThreadTest.h" />
<ClInclude Include="src\TimedNotificationQueueTest.h" />
<ClInclude Include="src\TimerTest.h" />
@ -1185,4 +1187,4 @@
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets" />
</Project>
</Project>

View File

@ -0,0 +1,103 @@
//
// ActiveThreadPoolTest.cpp
//
// Copyright (c) 2004-2023, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#include "ActiveThreadPoolTest.h"
#include "CppUnit/TestCaller.h"
#include "CppUnit/TestSuite.h"
#include "Poco/ActiveThreadPool.h"
#include "Poco/RunnableAdapter.h"
#include "Poco/Exception.h"
#include "Poco/Thread.h"
#include "Poco/Environment.h"
using Poco::ActiveThreadPool;
using Poco::RunnableAdapter;
using Poco::Thread;
using Poco::Environment;
ActiveThreadPoolTest::ActiveThreadPoolTest(const std::string& name): CppUnit::TestCase(name)
{
}
ActiveThreadPoolTest::~ActiveThreadPoolTest()
{
}
void ActiveThreadPoolTest::testActiveThreadPool()
{
ActiveThreadPool pool;
assertTrue (pool.capacity() == static_cast<int>(Environment::processorCount()) + 1);
RunnableAdapter<ActiveThreadPoolTest> ra(*this, &ActiveThreadPoolTest::count);
try
{
for (int i = 0; i < 2000; ++i)
{
pool.start(ra);
}
}
catch (...)
{
failmsg("wrong exception thrown");
}
pool.joinAll();
assertTrue (_count == 2000);
_count = 0;
try
{
for (int i = 0; i < 1000; ++i)
{
pool.start(ra);
}
}
catch (...)
{
failmsg("wrong exception thrown");
}
pool.joinAll();
assertTrue (_count == 1000);
}
void ActiveThreadPoolTest::setUp()
{
_count = 0;
}
void ActiveThreadPoolTest::tearDown()
{
}
void ActiveThreadPoolTest::count()
{
++_count;
}
CppUnit::Test* ActiveThreadPoolTest::suite()
{
CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("ActiveThreadPoolTest");
CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool);
return pSuite;
}

View File

@ -0,0 +1,44 @@
//
// ActiveThreadPoolTest.h
//
// Definition of the ActiveThreadPoolTest class.
//
// Copyright (c) 2004-2023, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef ActiveThreadPoolTest_INCLUDED
#define ActiveThreadPoolTest_INCLUDED
#include "Poco/Foundation.h"
#include "CppUnit/TestCase.h"
#include "Poco/Event.h"
#include "Poco/AtomicCounter.h"
class ActiveThreadPoolTest: public CppUnit::TestCase
{
public:
ActiveThreadPoolTest(const std::string& name);
~ActiveThreadPoolTest();
void testActiveThreadPool();
void setUp();
void tearDown();
static CppUnit::Test* suite();
protected:
void count();
private:
Poco::AtomicCounter _count;
};
#endif // ActiveThreadPoolTest_INCLUDED

View File

@ -16,6 +16,8 @@
#include "Poco/Expire.h"
#include "Poco/Thread.h"
#include "Poco/Exception.h"
#include "Poco/Stopwatch.h"
#include <iostream>
using namespace Poco;
@ -347,6 +349,35 @@ void FIFOEventTest::testAsyncNotify()
assertTrue (_count == LARGEINC);
}
void FIFOEventTest::testAsyncNotifyBenchmark()
{
Poco::FIFOEvent<int> simple;
simple += delegate(this, &FIFOEventTest::onAsyncBench);
assertTrue (_count == 0);
const int cnt = 10000;
int runCount = 1000;
const Poco::Int64 allCount = cnt * runCount;
Poco::Stopwatch sw;
sw.restart();
while (runCount-- > 0)
{
std::vector<Poco::ActiveResult<int>> vresult;
vresult.reserve(cnt);
for (int i = 0; i < cnt; ++i)
{
vresult.push_back(simple.notifyAsync(this, i));
}
for (int i = 0; i < cnt; ++i)
{
vresult[i].wait();
assertTrue (vresult[i].data() == (i*2));
}
}
sw.stop();
std::cout << "notify and wait time = " << sw.elapsed() / 1000 << std::endl;
assertTrue (_count == allCount);
}
void FIFOEventTest::onVoid(const void* pSender)
{
@ -402,6 +433,11 @@ void FIFOEventTest::onAsync(const void* pSender, int& i)
_count += LARGEINC ;
}
void FIFOEventTest::onAsyncBench(const void* pSender, int& i)
{
++_count;
i *= 2;
}
int FIFOEventTest::getCount() const
{
@ -446,5 +482,6 @@ CppUnit::Test* FIFOEventTest::suite()
CppUnit_addTest(pSuite, FIFOEventTest, testExpireReRegister);
CppUnit_addTest(pSuite, FIFOEventTest, testOverwriteDelegate);
CppUnit_addTest(pSuite, FIFOEventTest, testAsyncNotify);
CppUnit_addTest(pSuite, FIFOEventTest, testAsyncNotifyBenchmark);
return pSuite;
}

View File

@ -45,6 +45,7 @@ public:
void testReturnParams();
void testOverwriteDelegate();
void testAsyncNotify();
void testAsyncNotifyBenchmark();
void setUp();
void tearDown();
@ -60,10 +61,11 @@ protected:
void onConstComplex(const void* pSender, const Poco::EventArgs*& i);
void onConst2Complex(const void* pSender, const Poco::EventArgs * const & i);
void onAsync(const void* pSender, int& i);
void onAsyncBench(const void* pSender, int& i);
int getCount() const;
private:
std::atomic<int> _count;
std::atomic<Poco::Int64> _count;
};

View File

@ -19,6 +19,7 @@
#include "ActiveMethodTest.h"
#include "ActiveDispatcherTest.h"
#include "ConditionTest.h"
#include "ActiveThreadPoolTest.h"
CppUnit::Test* ThreadingTestSuite::suite()
@ -35,6 +36,7 @@ CppUnit::Test* ThreadingTestSuite::suite()
pSuite->addTest(ActiveMethodTest::suite());
pSuite->addTest(ActiveDispatcherTest::suite());
pSuite->addTest(ConditionTest::suite());
pSuite->addTest(ActiveThreadPoolTest::suite());
return pSuite;
}