Async notification center (dispatch notifications asynchronously) (#4957)

This commit is contained in:
Matej Kenda
2025-06-12 21:12:57 +02:00
committed by GitHub
parent 1782761914
commit 735e174d90
23 changed files with 866 additions and 323 deletions

View File

@@ -236,7 +236,7 @@ jobs:
ctest --output-on-failure -E "(DataMySQL)|(DataODBC)|(PostgreSQL)|(MongoDB)"
linux-emscripten-cmake:
runs-on: ubuntu-22.04
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v4
- run: sudo apt -y update && sudo apt -y install cmake ninja-build emscripten
@@ -372,12 +372,12 @@ jobs:
# PWD=`pwd`
# ctest --output-on-failure -E "(DataMySQL)|(DataODBC)|(PostgreSQL)|(MongoDB)|(Redis)"
macos-clang-cmake-openssl3:
runs-on: macos-latest
macos-clang-18-cmake-openssl3:
runs-on: macos-15
steps:
- uses: actions/checkout@v4
- run: brew install openssl@3 mysql-client unixodbc libpq
- run: cmake -S. -Bcmake-build -DENABLE_PDF=OFF -DENABLE_TESTS=ON -DOPENSSL_ROOT_DIR=/usr/local/opt/openssl@3 -DMYSQL_ROOT_DIR=/usr/local/opt/mysql-client && cmake --build cmake-build --target all
- run: CXX=$(brew --prefix llvm@18)/bin/clang++ CC=$(brew --prefix llvm@18)/bin/clang cmake -S. -Bcmake-build -DENABLE_PDF=OFF -DENABLE_TESTS=ON -DOPENSSL_ROOT_DIR=/usr/local/opt/openssl@3 -DMYSQL_ROOT_DIR=/usr/local/opt/mysql-client && cmake --build cmake-build --target all
- uses: ./.github/actions/retry-action
with:
timeout_minutes: 90
@@ -398,7 +398,7 @@ jobs:
ctest --output-on-failure -E "(DataMySQL)|(DataODBC)|(PostgreSQL)|(MongoDB)|(Redis)"
macos-clang-cmake-openssl3-visibility-hidden:
runs-on: macos-latest
runs-on: macos-15
steps:
- uses: actions/checkout@v4
- run: brew install openssl@3 mysql-client unixodbc libpq
@@ -423,7 +423,7 @@ jobs:
ctest --output-on-failure -E "(DataMySQL)|(DataODBC)|(PostgreSQL)|(MongoDB)|(Redis)"
macos-clang-make-openssl3-tsan:
runs-on: macos-latest
runs-on: macos-15
steps:
- uses: actions/checkout@v4
- run: brew install openssl@3
@@ -785,7 +785,7 @@ jobs:
./ci/runtests.sh
linux-gcc-make-mongodb:
runs-on: ubuntu-22.04
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v4
- uses: supercharge/mongodb-github-action@1.10.0

View File

@@ -50,22 +50,6 @@ option(POCO_ENABLE_CPP20 "Build Poco with C++20 standard" ON)
if (EMSCRIPTEN)
set(POCO_ENABLE_CPP20 OFF CACHE BOOL "Build Poco with C++20 standard" FORCE)
else()
# https://libcxx.llvm.org/Status/Cxx20.html
# https://en.wikipedia.org/wiki/Xcode#Toolchain_versions
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
if (CMAKE_CXX_COMPILER_VERSION VERSION_LESS 18.0)
# Does not fully support C++20 yet
set(POCO_ENABLE_CPP20 OFF CACHE BOOL "Build Poco with C++20 standard" FORCE)
endif ()
elseif (CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang")
if (CMAKE_CXX_COMPILER_VERSION VERSION_LESS 17.0)
# Does not fully support C++20 yet
set(POCO_ENABLE_CPP20 OFF CACHE BOOL "Build Poco with C++20 standard" FORCE)
endif ()
endif()
endif()
if (POCO_ENABLE_CPP20)

View File

@@ -30,21 +30,31 @@ class Foundation_API AbstractObserver
/// the Observer and NObserver template classes.
{
public:
AbstractObserver();
AbstractObserver(const AbstractObserver& observer);
AbstractObserver(AbstractObserver&& observer);
virtual ~AbstractObserver();
AbstractObserver& operator = (const AbstractObserver& observer);
AbstractObserver& operator = (AbstractObserver&& observer);
virtual void notify(Notification* pNf) const = 0;
virtual NotificationResult notifySync(Notification* pNf) const;
/// Synchronous notification processing. Blocks and returns a result.
/// Default implementation throws NotImplementedException.
virtual bool equals(const AbstractObserver& observer) const = 0;
POCO_DEPRECATED("use `Poco::Any accepts(Notification*)` instead")
POCO_DEPRECATED("use `bool accepts(Notification::Ptr&)` instead")
virtual bool accepts(Notification* pNf, const char* pName) const = 0;
virtual bool accepts(const Notification::Ptr& pNf) const = 0;
virtual bool acceptsSync() const;
/// Returns true if this observer supports synchronous notification processing.
virtual AbstractObserver* clone() const = 0;
virtual void start();

View File

@@ -18,17 +18,25 @@
#ifndef Foundation_AsyncNotificationCenter_INCLUDED
#define Foundation_AsyncNotificationCenter_INCLUDED
#include "Poco/Foundation.h"
#include "Poco/NotificationCenter.h"
#include "Poco/Thread.h"
#include "Poco/Stopwatch.h"
#include "Poco/Debugger.h"
#include "Poco/ErrorHandler.h"
#include "Poco/Format.h"
#include "Poco/RunnableAdapter.h"
#include "Poco/NotificationQueue.h"
#if (POCO_HAVE_CPP20_COMPILER)
#if !defined(POCO_HAVE_JTHREAD)
#pragma message ("NOTE: std::jthread is expected but is not available. Please check your compiler version and settings.")
#endif
#endif
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <list>
#include <map>
#include <optional>
namespace Poco {
@@ -37,13 +45,52 @@ class Foundation_API AsyncNotificationCenter: public NotificationCenter
/// AsyncNotificationCenter decouples posting of notifications
/// from notifying subscribers by calling observers' notification
/// handler in a dedicated thread.
///
/// It supports multiple modes of operation:
///
/// - ENQUEUE: Notifications are added to a queue, separate single thread
/// asynchronously dispatches them to observers sequentially
///
/// - NOTIFY: Notifications are added to a list for each observer, multiple
/// worker threads process notifications in parallel
///
/// - BOTH: Combination of both modes, notifications are enqueued and worker
/// threads dispatch them to observers in parallel.
///
/// NOTIFY and BOTH mode:
///
/// These modes are only available if the compiler supports C++20 std::jthread.
///
/// Notifications can be delivered to observers in a different order than they
/// were posted, as they are processed by multiple worker threads. Observer
/// handlers must also be thread-safe as multiple notifications can be dispatched
/// to the same observer in parallel.
///
/// Note about using AsyncObserver
///
/// Although it is possible to use them with AsyncNotificationCenter,
/// it is more efficient to use NObserver.
{
public:
enum class AsyncMode { ENQUEUE, NOTIFY, BOTH };
/// ENQUEUE: Notifications are enqueued in a separate thread.
/// NOTIFY: Notifications are dispatched to observers from worker threads
/// BOTH: Notifications are enqueued and dispatched to observers in worker threads.
#if (POCO_HAVE_JTHREAD)
AsyncNotificationCenter(AsyncMode mode = AsyncMode::ENQUEUE, std::size_t workersCount = AsyncNotificationCenter::DEFAULT_WORKERS_COUNT);
/// Creates the AsyncNotificationCenter and starts the notifying thread and workers.
#else
AsyncNotificationCenter();
/// Creates the AsyncNotificationCenter and starts the notifying thread.
#endif
~AsyncNotificationCenter() override;
/// Stops the notifying thread and destroys the AsyncNotificationCenter.
/// Stops the notifying thread and destroys the AsyncNotificationCenter.
AsyncNotificationCenter& operator = (const AsyncNotificationCenter&) = delete;
AsyncNotificationCenter(const AsyncNotificationCenter&) = delete;
@@ -51,10 +98,20 @@ public:
AsyncNotificationCenter(AsyncNotificationCenter&&) = delete;
void postNotification(Notification::Ptr pNotification) override;
/// Enqueues notification into the notification queue.
/// Enqueues notification into the notification queue.
int backlog() const override;
/// Returns the numbner of notifications in the notification queue.
/// Returns the number of notifications in the notification queue.
std::vector<NotificationResult> synchronousDispatch(Notification::Ptr pNotification);
/// Dispatches the notification synchronously to all observers that have a function
/// for synchronous notification processing and accept the notification.
/// This method blocks until the notification is processed by
/// all observers. Returns results from all observers that accepted the notification.
protected:
void notifyObservers(Notification::Ptr& pNotification) override;
private:
void start();
@@ -63,11 +120,58 @@ private:
using Adapter = RunnableAdapter<AsyncNotificationCenter>;
Thread _thread;
const AsyncMode _mode { AsyncMode::ENQUEUE };
// Async enqueue for notifications
Thread _enqueueThread;
NotificationQueue _nq;
Adapter _ra;
std::atomic<bool> _started;
std::atomic<bool> _done;
std::atomic<bool> _enqueueThreadStarted;
std::atomic<bool> _enqueueThreadDone;
#if (POCO_HAVE_JTHREAD)
// Async notification dispatching
using NotificationList = std::list<Notification::Ptr>;
using ObserversMap = std::map<AbstractObserverPtr, NotificationList>;
using NotificationTuple = std::tuple<AbstractObserverPtr, Notification::Ptr>;
std::optional<NotificationTuple> nextNotification();
void dispatchNotifications(std::stop_token& stopToken, int workerId);
/// Dispatching function executed by each worker thread.
constexpr static std::size_t DEFAULT_WORKERS_COUNT { 5 };
/// Default number of worker threads to process notifications.
/// This can be configured to a different value if needed.
const std::size_t _workersCount { DEFAULT_WORKERS_COUNT };
/// Number of worker threads to process notifications.
/// This can be configured to a different value if needed.
std::vector<std::jthread> _workers;
/// Workers pop notifications from the lists and call the observers' notification handlers.
/// If an observer is not registered anymore (hasObserver), it is removed from the map.
/// Workers pick observers in a round robin fashion.
ObserversMap _lists;
/// Each observer has its own list of pending notifications.
/// Observers are identifed by their pointers.
/// When adding to the queue, observersToNotify is used to get the observers
/// that are registered for such notification.
ObserversMap::iterator _workerIterator;
/// Iterator to the current observer list being processed by the worker threads.
/// It is used to ensure that workers process observers in a round robin fashion.
std::mutex _listsMutex;
/// Mutex to protect access to the lists of notifications.
/// It is used to ensure that workers can safely access the lists.
bool _listsEmpty { true };
std::condition_variable _listsEmptyCondition;
// Condition variable to notify workers when new notifications are added to lists.
#endif
};

View File

@@ -189,6 +189,13 @@
#define POCO_HAVE_CPP20_COMPILER (__cplusplus >= 202002L)
#define POCO_HAVE_CPP23_COMPILER (__cplusplus >= 202302L)
#if defined(POCO_HAVE_CPP20_COMPILER)
#include <version>
#if defined(__cpp_lib_jthread)
#define POCO_HAVE_JTHREAD 1
#endif
#endif
// Option to silence deprecation warnings.
#ifndef POCO_SILENCE_DEPRECATED
#define POCO_DEPRECATED(reason) [[deprecated(reason)]]

View File

@@ -20,8 +20,10 @@
#include "Poco/Foundation.h"
#include "Poco/AbstractObserver.h"
#include "Poco/AutoPtr.h"
#include "Poco/Mutex.h"
#include <functional>
namespace Poco {
@@ -52,16 +54,27 @@ class NObserver: public AbstractObserver
public:
using Type = NObserver<C, N>;
using NotificationPtr = AutoPtr<N>;
using Callback = void (C::*)(const NotificationPtr&);
using Handler = Callback;
using Handler = void (C::*)(const NotificationPtr&);
using SyncHandler = NotificationResult (C::*)(const NotificationPtr&);
using Matcher = bool (C::*)(const std::string&) const;
using MatcherFunc = std::function<bool(const std::string&)>;
NObserver() = delete;
NObserver(C& object, Handler method, Matcher matcher = nullptr):
NObserver(C& object, Handler method, Matcher matcher = nullptr, SyncHandler syncMethod = nullptr):
_pObject(&object),
_handler(method),
_matcher(matcher)
_matcher(matcher),
_syncHandler(syncMethod)
{
}
NObserver(C& object, Handler method, MatcherFunc matcherFunc, SyncHandler syncMethod = nullptr):
_pObject(&object),
_handler(method),
_syncHandler(syncMethod),
_matcher(nullptr),
_matcherFunc(matcherFunc)
{
}
@@ -69,7 +82,19 @@ public:
AbstractObserver(observer),
_pObject(observer._pObject),
_handler(observer._handler),
_matcher(observer._matcher)
_syncHandler(observer._syncHandler),
_matcher(observer._matcher),
_matcherFunc(observer._matcherFunc)
{
}
NObserver(NObserver&& observer):
AbstractObserver(observer),
_pObject(std::move(observer._pObject)),
_handler(std::move(observer._handler)),
_syncHandler(std::move(observer._syncHandler)),
_matcher(std::move(observer._matcher)),
_matcherFunc(std::move(observer._matcherFunc))
{
}
@@ -80,8 +105,23 @@ public:
if (&observer != this)
{
_pObject = observer._pObject;
_handler = observer._handler;
_matcher = observer._matcher;
_handler = observer._handler;
_syncHandler = observer._syncHandler;
_matcher = observer._matcher;
_matcherFunc = observer._matcherFunc;
}
return *this;
}
NObserver& operator = (NObserver&& observer)
{
if (&observer != this)
{
_pObject = std::move(observer._pObject);
_handler = std::move(observer._handler);
_syncHandler = std::move(observer._syncHandler);
_matcher = std::move(observer._matcher);
_matcherFunc = std::move(observer._matcherFunc);
}
return *this;
}
@@ -91,13 +131,18 @@ public:
handle(NotificationPtr(static_cast<N*>(pNf), true));
}
bool equals(const AbstractObserver& abstractObserver) const override
NotificationResult notifySync(Notification* pNf) const override
{
const NObserver* pObs = dynamic_cast<const NObserver*>(&abstractObserver);
return pObs && pObs->_pObject == _pObject && pObs->_handler == _handler && pObs->_matcher == _matcher;
return handleSync(NotificationPtr(static_cast<N*>(pNf), true));
}
POCO_DEPRECATED("use `bool accepts(const Notification::Ptr&)` instead")
bool equals(const AbstractObserver& abstractObserver) const override
{
const auto* pObs = dynamic_cast<const NObserver*>(&abstractObserver);
return pObs && pObs->_pObject == _pObject && pObs->_handler == _handler;
}
POCO_DEPRECATED("use `bool accepts(const Notification::Ptr&)` instead with matcher function if needed")
bool accepts(Notification* pNf, const char* pName) const override
{
return (!pName || pNf->name() == pName) && dynamic_cast<N*>(pNf) != nullptr;
@@ -105,7 +150,15 @@ public:
bool accepts(const Notification::Ptr& pNf) const override
{
return (match(pNf) && (pNf.template cast<N>() != nullptr));
if (hasMatcher())
return match(pNf);
else
return pNf.template cast<N>() != nullptr;
}
bool acceptsSync() const override
{
return _pObject != nullptr && _syncHandler != nullptr;
}
AbstractObserver* clone() const override
@@ -126,15 +179,39 @@ protected:
{
Mutex::ScopedLock lock(_mutex);
if (_pObject)
if (_pObject != nullptr)
(_pObject->*_handler)(ptr);
}
NotificationResult handleSync(const NotificationPtr& ptr) const
{
Mutex::ScopedLock lock(_mutex);
if (_pObject == nullptr || _syncHandler == nullptr)
return {};
return (_pObject->*_syncHandler)(ptr);
}
bool hasMatcher() const
{
return _pObject != nullptr &&
(_matcher != nullptr || _matcherFunc != nullptr);
}
bool match(const Notification::Ptr& ptr) const
{
Mutex::ScopedLock l(_mutex);
if (_pObject == nullptr)
return false;
return _pObject && (!_matcher || (_pObject->*_matcher)(ptr->name()));
if (_matcher)
return (_pObject->*_matcher)(ptr->name());
if (_matcherFunc)
return _matcherFunc(ptr->name());
return false;
}
Mutex& mutex() const
@@ -143,9 +220,12 @@ protected:
}
private:
C* _pObject;
Callback _handler;
Matcher _matcher;
C* _pObject {nullptr};
Handler _handler {nullptr};
SyncHandler _syncHandler {nullptr};
Matcher _matcher {nullptr};
MatcherFunc _matcherFunc;
mutable Poco::Mutex _mutex;
};

View File

@@ -19,14 +19,20 @@
#include "Poco/Foundation.h"
#include "Poco/Mutex.h"
#include "Poco/RefCountedObject.h"
#include "Poco/AutoPtr.h"
#include "Poco/Any.h"
#include <memory>
namespace Poco {
using NotificationResult = std::pair<std::string, Poco::Any>;
/// Used for synchronous notification processing.
/// Observer shall return a pair containing a string identifier
/// that is interpreted by the caller and an Any object for
/// the payload (result).
class Foundation_API Notification: public RefCountedObject
/// The base class for all notification classes used

View File

@@ -141,7 +141,7 @@ protected:
}
ObserverList observersToNotify(const Notification::Ptr& pNotification) const;
void notifyObservers(Notification::Ptr& pNotification);
virtual void notifyObservers(Notification::Ptr& pNotification);
private:

View File

@@ -27,7 +27,7 @@ namespace Poco {
template <class C, class N>
class Observer: public AbstractObserver
class POCO_DEPRECATED("use `NObserver` instead") Observer: public AbstractObserver
/// This template class implements an adapter that sits between
/// a NotificationCenter and an object receiving notifications
/// from it. It is quite similar in concept to the

View File

@@ -13,30 +13,26 @@
#include "Poco/AbstractObserver.h"
#include "Poco/Exception.h"
namespace Poco {
AbstractObserver::AbstractObserver() = default;
AbstractObserver::AbstractObserver(const AbstractObserver& /*observer*/) = default;
AbstractObserver::AbstractObserver(AbstractObserver&& /*observer*/) = default;
AbstractObserver::~AbstractObserver() = default;
AbstractObserver& AbstractObserver::operator = (const AbstractObserver& /*observer*/) = default;
AbstractObserver& AbstractObserver::operator = (AbstractObserver&& /*observer*/) = default;
AbstractObserver::AbstractObserver()
NotificationResult AbstractObserver::notifySync(Notification* pNf) const
{
throw Poco::NotImplementedException("Synchronous notification not implemented.");
}
AbstractObserver::AbstractObserver(const AbstractObserver& /*observer*/)
bool AbstractObserver::acceptsSync() const
{
return false;
}
AbstractObserver::~AbstractObserver()
{
}
AbstractObserver& AbstractObserver::operator = (const AbstractObserver& /*observer*/)
{
return *this;
}
} // namespace Poco

View File

@@ -15,17 +15,38 @@
#include "Poco/AsyncNotificationCenter.h"
#include "Poco/AbstractObserver.h"
#include "Poco/Stopwatch.h"
#include "Poco/Debugger.h"
#include "Poco/ErrorHandler.h"
#include "Poco/Format.h"
#include <vector>
namespace Poco {
#if (POCO_HAVE_JTHREAD)
AsyncNotificationCenter::AsyncNotificationCenter(): _ra(*this, &AsyncNotificationCenter::dequeue),
_started(false),
_done(false)
AsyncNotificationCenter::AsyncNotificationCenter(AsyncMode mode, std::size_t workersCount) :
_mode(mode),
_ra(*this, &AsyncNotificationCenter::dequeue),
_enqueueThreadStarted(false),
_enqueueThreadDone(false),
_workersCount(workersCount)
{
start();
}
#else
AsyncNotificationCenter::AsyncNotificationCenter() :
_ra(*this, &AsyncNotificationCenter::dequeue),
_enqueueThreadStarted(false),
_enqueueThreadDone(false)
{
start();
}
#endif
AsyncNotificationCenter::~AsyncNotificationCenter()
{
@@ -35,7 +56,19 @@ AsyncNotificationCenter::~AsyncNotificationCenter()
void AsyncNotificationCenter::postNotification(Notification::Ptr pNotification)
{
#if (POCO_HAVE_JTHREAD)
if (_mode == AsyncMode::ENQUEUE || _mode == AsyncMode::BOTH)
{
_nq.enqueueNotification(pNotification);
}
else
{
// Notification enqueue is synchronous
notifyObservers(pNotification);
}
#else
_nq.enqueueNotification(pNotification);
#endif
}
@@ -45,40 +78,131 @@ int AsyncNotificationCenter::backlog() const
}
std::vector<NotificationResult> AsyncNotificationCenter::synchronousDispatch(Notification::Ptr pNotification)
{
poco_check_ptr (pNotification);
auto observers = observersToNotify(pNotification);
if (observers.empty())
return {};
std::vector<NotificationResult> results;
results.reserve(observers.size());
for (auto& o : observers)
{
if (!o->acceptsSync())
continue;
results.push_back(o->notifySync(pNotification));
}
return results;
}
void AsyncNotificationCenter::notifyObservers(Notification::Ptr& pNotification)
{
poco_check_ptr (pNotification);
#if (POCO_HAVE_JTHREAD)
if (_mode == AsyncMode::NOTIFY || _mode == AsyncMode::BOTH)
{
// Notification is asynchronous, add it to the lists
// for appropriate observers.
std::unique_lock<std::mutex> lock(_listsMutex);
auto observers = observersToNotify(pNotification);
if (observers.empty())
return;
for (auto& o : observers)
{
auto& list = _lists[o];
list.push_back(pNotification);
}
_listsEmpty = false;
_listsEmptyCondition.notify_all();
}
else
{
// Notification is synchronous
NotificationCenter::notifyObservers(pNotification);
}
#else
NotificationCenter::notifyObservers(pNotification);
#endif
}
void AsyncNotificationCenter::start()
{
Poco::ScopedLock l(mutex());
if (_started)
if (_mode == AsyncMode::ENQUEUE || _mode == AsyncMode::BOTH)
{
throw Poco::InvalidAccessException(
Poco::format("thread already started %s", poco_src_loc));
if (_enqueueThreadStarted)
{
throw Poco::InvalidAccessException(
Poco::format("thread already started %s", poco_src_loc));
}
_enqueueThread.start(_ra);
Poco::Stopwatch sw;
sw.start();
while (!_enqueueThreadStarted)
{
if (sw.elapsedSeconds() > 5)
throw Poco::TimeoutException(poco_src_loc);
Thread::sleep(20);
}
}
_thread.start(_ra);
Poco::Stopwatch sw;
sw.start();
while (!_started)
#if (POCO_HAVE_JTHREAD)
_workerIterator = _lists.begin();
if (_mode == AsyncMode::NOTIFY || _mode == AsyncMode::BOTH)
{
if (sw.elapsedSeconds() > 5)
throw Poco::TimeoutException(poco_src_loc);
Thread::sleep(100);
auto dispatch = [this](std::stop_token stopToken, int id) {
this->dispatchNotifications(stopToken, id);
};
for (std::size_t i {0}; i < _workersCount; ++i)
{
auto worker = std::jthread(dispatch, i);
_workers.push_back(std::move(worker));
}
}
#endif
}
void AsyncNotificationCenter::stop()
{
if (!_started.exchange(false)) return;
_nq.wakeUpAll();
while (!_done) Thread::sleep(100);
_thread.join();
if (_mode == AsyncMode::ENQUEUE || _mode == AsyncMode::BOTH)
{
if (_enqueueThreadStarted.exchange(false))
{
_nq.wakeUpAll();
while (!_enqueueThreadDone) Thread::sleep(100);
_enqueueThread.join();
}
}
#if (POCO_HAVE_JTHREAD)
for (auto& t: _workers)
{
t.request_stop();
}
// TODO: Should the observer lists be cleared here or
// shall the workers send all of them to observers and then finish?
#endif
}
void AsyncNotificationCenter::dequeue()
{
Notification::Ptr pNf;
_started = true;
_done = false;
_enqueueThreadStarted = true;
_enqueueThreadDone = false;
while ((pNf = _nq.waitDequeueNotification()))
{
try
@@ -98,9 +222,68 @@ void AsyncNotificationCenter::dequeue()
Poco::ErrorHandler::handle();
}
}
_done = true;
_started = false;
_enqueueThreadDone = true;
_enqueueThreadStarted = false;
}
#if (POCO_HAVE_JTHREAD)
std::optional<AsyncNotificationCenter::NotificationTuple> AsyncNotificationCenter::nextNotification()
{
std::unique_lock<std::mutex> lock(_listsMutex);
for (std::size_t i {0}; i < _lists.size(); ++i)
{
if (_lists.empty())
break;
if (_workerIterator == _lists.end())
_workerIterator = _lists.begin();
auto& o = _workerIterator->first;
auto& l = _workerIterator->second;
if (l.empty())
{
++_workerIterator;
continue;
};
if (!hasObserver(*o))
{
// Observer is not registered anymore, remove its list
_workerIterator = _lists.erase(_workerIterator);
continue;
}
auto n = l.front();
l.pop_front();
return { {o, n} };
}
_listsEmpty = true;
return {};
}
void AsyncNotificationCenter::dispatchNotifications(std::stop_token& stopToken, int workerId)
{
while (!stopToken.stop_requested())
{
// get next observer and notification
auto no = nextNotification();
if (no.has_value())
{
auto [o, n] = *no;
o->notify(n);
continue;
}
// No notification available, wait for a while
std::unique_lock<std::mutex> lock(_listsMutex);
_listsEmptyCondition.wait_for(
lock, 1s,
[&stopToken, this] { return !_listsEmpty || stopToken.stop_requested(); }
);
if (stopToken.stop_requested())
break;
}
}
#endif
} // namespace Poco

View File

@@ -54,7 +54,7 @@ void NotificationCenter::addObserver(const AbstractObserver& observer)
void NotificationCenter::removeObserver(const AbstractObserver& observer)
{
Mutex::ScopedLock lock(_mutex);
for (ObserverList::iterator it = _observers.begin(); it != _observers.end(); ++it)
for (auto it = _observers.begin(); it != _observers.end(); ++it)
{
if (observer.equals(**it))
{

View File

@@ -20,9 +20,7 @@
namespace Poco {
NotificationQueue::NotificationQueue()
{
}
NotificationQueue::NotificationQueue() = default;
NotificationQueue::~NotificationQueue()
@@ -84,7 +82,7 @@ Notification* NotificationQueue::dequeueNotification()
Notification* NotificationQueue::waitDequeueNotification()
{
Notification::Ptr pNf;
WaitInfo* pWI = 0;
WaitInfo* pWI = nullptr;
{
FastMutex::ScopedLock lock(_mutex);
pNf = dequeueOne();
@@ -102,7 +100,7 @@ Notification* NotificationQueue::waitDequeueNotification()
Notification* NotificationQueue::waitDequeueNotification(long milliseconds)
{
Notification::Ptr pNf;
WaitInfo* pWI = 0;
WaitInfo* pWI = nullptr;
{
FastMutex::ScopedLock lock(_mutex);
pNf = dequeueOne();
@@ -118,7 +116,7 @@ Notification* NotificationQueue::waitDequeueNotification(long milliseconds)
{
FastMutex::ScopedLock lock(_mutex);
pNf = pWI->pNf;
for (WaitQueue::iterator it = _waitQueue.begin(); it != _waitQueue.end(); ++it)
for (auto it = _waitQueue.begin(); it != _waitQueue.end(); ++it)
{
if (*it == pWI)
{
@@ -179,7 +177,7 @@ void NotificationQueue::clear()
bool NotificationQueue::remove(Notification::Ptr pNotification)
{
FastMutex::ScopedLock lock(_mutex);
NfQueue::iterator it = std::find(_nfQueue.begin(), _nfQueue.end(), pNotification);
auto it = std::find(_nfQueue.begin(), _nfQueue.end(), pNotification);
if (it == _nfQueue.end())
{
return false;

View File

@@ -20,9 +20,7 @@
namespace Poco {
PriorityNotificationQueue::PriorityNotificationQueue()
{
}
PriorityNotificationQueue::PriorityNotificationQueue() = default;
PriorityNotificationQueue::~PriorityNotificationQueue()
@@ -67,7 +65,7 @@ Notification* PriorityNotificationQueue::dequeueNotification()
Notification* PriorityNotificationQueue::waitDequeueNotification()
{
Notification::Ptr pNf;
WaitInfo* pWI = 0;
WaitInfo* pWI = nullptr;
{
FastMutex::ScopedLock lock(_mutex);
pNf = dequeueOne();
@@ -85,7 +83,7 @@ Notification* PriorityNotificationQueue::waitDequeueNotification()
Notification* PriorityNotificationQueue::waitDequeueNotification(long milliseconds)
{
Notification::Ptr pNf;
WaitInfo* pWI = 0;
WaitInfo* pWI = nullptr;
{
FastMutex::ScopedLock lock(_mutex);
pNf = dequeueOne();
@@ -101,7 +99,7 @@ Notification* PriorityNotificationQueue::waitDequeueNotification(long millisecon
{
FastMutex::ScopedLock lock(_mutex);
pNf = pWI->pNf;
for (WaitQueue::iterator it = _waitQueue.begin(); it != _waitQueue.end(); ++it)
for (auto it = _waitQueue.begin(); it != _waitQueue.end(); ++it)
{
if (*it == pWI)
{
@@ -169,7 +167,7 @@ bool PriorityNotificationQueue::hasIdleThreads() const
Notification::Ptr PriorityNotificationQueue::dequeueOne()
{
Notification::Ptr pNf;
NfQueue::iterator it = _nfQueue.begin();
auto it = _nfQueue.begin();
if (it != _nfQueue.end())
{
pNf = it->second;

View File

@@ -20,9 +20,7 @@
namespace Poco {
TimedNotificationQueue::TimedNotificationQueue()
{
}
TimedNotificationQueue::TimedNotificationQueue() = default;
TimedNotificationQueue::~TimedNotificationQueue()
@@ -67,7 +65,7 @@ Notification* TimedNotificationQueue::dequeueNotification()
{
FastMutex::ScopedLock lock(_mutex);
NfQueue::iterator it = _nfQueue.begin();
auto it = _nfQueue.begin();
if (it != _nfQueue.end())
{
Clock::ClockDiff sleep = -it->first.elapsed();
@@ -78,7 +76,7 @@ Notification* TimedNotificationQueue::dequeueNotification()
return pNf.duplicate();
}
}
return 0;
return nullptr;
}
@@ -86,14 +84,14 @@ Notification* TimedNotificationQueue::dequeueNextNotification()
{
FastMutex::ScopedLock lock(_mutex);
NfQueue::iterator it = _nfQueue.begin();
auto it = _nfQueue.begin();
if (it != _nfQueue.end())
{
Notification::Ptr pNf = it->second;
_nfQueue.erase(it);
return pNf.duplicate();
}
return 0;
return nullptr;
}
Notification* TimedNotificationQueue::waitDequeueNotification()
@@ -101,7 +99,7 @@ Notification* TimedNotificationQueue::waitDequeueNotification()
for (;;)
{
_mutex.lock();
NfQueue::iterator it = _nfQueue.begin();
auto it = _nfQueue.begin();
if (it != _nfQueue.end())
{
_mutex.unlock();
@@ -130,7 +128,7 @@ Notification* TimedNotificationQueue::waitDequeueNotification(long milliseconds)
while (milliseconds >= 0)
{
_mutex.lock();
NfQueue::iterator it = _nfQueue.begin();
auto it = _nfQueue.begin();
if (it != _nfQueue.end())
{
_mutex.unlock();
@@ -163,9 +161,9 @@ Notification* TimedNotificationQueue::waitDequeueNotification(long milliseconds)
_nfAvailable.tryWait(milliseconds);
milliseconds -= static_cast<long>((now.elapsed() + 999)/1000);
}
else return 0;
else return nullptr;
}
return 0;
return nullptr;
}

View File

@@ -10,10 +10,11 @@
#include "NotificationCenterTest.h"
#include "CppUnit/TestCaller.h"
#include "CppUnit/TestCase.h"
#include "CppUnit/TestSuite.h"
#include "Poco/Notification.h"
#include "Poco/NotificationCenter.h"
#include "Poco/AsyncNotificationCenter.h"
#include "Poco/Observer.h"
#include "Poco/NObserver.h"
#include "Poco/AsyncObserver.h"
#include "Poco/AutoPtr.h"
@@ -21,38 +22,31 @@
using Poco::NotificationCenter;
using Poco::AsyncNotificationCenter;
using Poco::Observer;
using Poco::NObserver;
using Poco::AsyncObserver;
using Poco::Notification;
using Poco::AutoPtr;
class TestNotification: public Notification
{
public:
TestNotification()
TestNotification() = default;
TestNotification(const std::string& name, int n = 0):
Notification(name), num(n)
{}
TestNotification(const std::string& name):
Notification(name)
{}
int num {0};
};
NotificationCenterTest::NotificationCenterTest(const std::string& name):
CppUnit::TestCase(name),
_handle1Done(false),
_handleAuto1Done(false),
_handleAsync1Done(false),
_handleAsync2Done(false)
CppUnit::TestCase(name)
{
}
NotificationCenterTest::~NotificationCenterTest()
{
}
NotificationCenterTest::~NotificationCenterTest() = default;
void NotificationCenterTest::testNotificationCenter1()
@@ -65,7 +59,7 @@ void NotificationCenterTest::testNotificationCenter1()
void NotificationCenterTest::testNotificationCenter2()
{
NotificationCenter nc;
Observer<NotificationCenterTest, Notification> o(*this, &NotificationCenterTest::handle1);
NObserver<NotificationCenterTest, Notification> o(*this, &NotificationCenterTest::handle1);
nc.addObserver(o);
assertTrue (nc.hasObserver(o));
assertTrue (nc.hasObservers());
@@ -73,7 +67,7 @@ void NotificationCenterTest::testNotificationCenter2()
nc.postNotification(new Notification);
assertTrue (_set.size() == 1);
assertTrue (_set.find("handle1") != _set.end());
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
nc.removeObserver(NObserver<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
assertTrue (!nc.hasObserver(o));
assertTrue (!nc.hasObservers());
assertTrue (nc.countObservers() == 0);
@@ -83,8 +77,8 @@ void NotificationCenterTest::testNotificationCenter2()
void NotificationCenterTest::testNotificationCenter3()
{
NotificationCenter nc;
Observer<NotificationCenterTest, Notification> o1(*this, &NotificationCenterTest::handle1);
Observer<NotificationCenterTest, Notification> o2(*this, &NotificationCenterTest::handle2);
NObserver<NotificationCenterTest, Notification> o1(*this, &NotificationCenterTest::handle1);
NObserver<NotificationCenterTest, Notification> o2(*this, &NotificationCenterTest::handle2);
nc.addObserver(o1);
assertTrue (nc.hasObserver(o1));
nc.addObserver(o2);
@@ -95,9 +89,9 @@ void NotificationCenterTest::testNotificationCenter3()
assertTrue (_set.size() == 2);
assertTrue (_set.find("handle1") != _set.end());
assertTrue (_set.find("handle2") != _set.end());
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
nc.removeObserver(NObserver<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
assertTrue (!nc.hasObserver(o1));
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle2));
nc.removeObserver(NObserver<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle2));
assertTrue (!nc.hasObserver(o2));
assertTrue (!nc.hasObservers());
assertTrue (nc.countObservers() == 0);
@@ -107,8 +101,8 @@ void NotificationCenterTest::testNotificationCenter3()
void NotificationCenterTest::testNotificationCenter4()
{
NotificationCenter nc;
Observer<NotificationCenterTest, Notification> o1(*this, &NotificationCenterTest::handle1);
Observer<NotificationCenterTest, Notification> o2(*this, &NotificationCenterTest::handle2);
NObserver<NotificationCenterTest, Notification> o1(*this, &NotificationCenterTest::handle1);
NObserver<NotificationCenterTest, Notification> o2(*this, &NotificationCenterTest::handle2);
nc.addObserver(o1);
assertTrue (nc.hasObserver(o1));
nc.addObserver(o2);
@@ -117,20 +111,20 @@ void NotificationCenterTest::testNotificationCenter4()
assertTrue (_set.size() == 2);
assertTrue (_set.find("handle1") != _set.end());
assertTrue (_set.find("handle2") != _set.end());
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
nc.removeObserver(NObserver<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
assertTrue (!nc.hasObserver(o1));
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle2));
nc.removeObserver(NObserver<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle2));
assertTrue (!nc.hasObserver(o2));
_set.clear();
nc.postNotification(new Notification);
assertTrue (_set.empty());
Observer<NotificationCenterTest, Notification> o3(*this, &NotificationCenterTest::handle3);
NObserver<NotificationCenterTest, Notification> o3(*this, &NotificationCenterTest::handle3);
nc.addObserver(o3);
assertTrue (nc.hasObserver(o3));
nc.postNotification(new Notification);
assertTrue (_set.size() == 1);
assertTrue (_set.find("handle3") != _set.end());
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle3));
nc.removeObserver(NObserver<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle3));
assertTrue (!nc.hasObserver(o3));
}
@@ -138,8 +132,8 @@ void NotificationCenterTest::testNotificationCenter4()
void NotificationCenterTest::testNotificationCenter5()
{
NotificationCenter nc;
nc.addObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
nc.addObserver(Observer<NotificationCenterTest, TestNotification>(*this, &NotificationCenterTest::handleTest));
nc.addObserver(NObserver<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
nc.addObserver(NObserver<NotificationCenterTest, TestNotification>(*this, &NotificationCenterTest::handleTest));
nc.postNotification(new Notification);
assertTrue (_set.size() == 1);
assertTrue (_set.find("handle1") != _set.end());
@@ -148,8 +142,8 @@ void NotificationCenterTest::testNotificationCenter5()
assertTrue (_set.size() == 2);
assertTrue (_set.find("handle1") != _set.end());
assertTrue (_set.find("handleTest") != _set.end());
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
nc.removeObserver(Observer<NotificationCenterTest, TestNotification>(*this, &NotificationCenterTest::handleTest));
nc.removeObserver(NObserver<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
nc.removeObserver(NObserver<NotificationCenterTest, TestNotification>(*this, &NotificationCenterTest::handleTest));
}
@@ -192,7 +186,7 @@ void NotificationCenterTest::testAsyncObserver()
void NotificationCenterTest::testAsyncNotificationCenter()
{
using ObserverT = AsyncObserver<NotificationCenterTest, TestNotification>::Type;
using ObserverT = NObserver<NotificationCenterTest, TestNotification>::Type;
AsyncNotificationCenter nc;
@@ -216,14 +210,203 @@ void NotificationCenterTest::testAsyncNotificationCenter()
}
void NotificationCenterTest::testAsyncNotificationCenter2()
{
using ObserverT = NObserver<NotificationCenterTest, TestNotification>::Type;
AsyncNotificationCenter nc;
const auto matchAsync = [](const std::string& s) -> bool
{
return s.find("asyncNotification") == 0;
};
nc.addObserver(ObserverT(*this, &NotificationCenterTest::handleAsync1, matchAsync));
nc.addObserver(ObserverT(*this, &NotificationCenterTest::handleAsync2, matchAsync));
nc.postNotification(new TestNotification("asyncNotification"));
nc.postNotification(new TestNotification("anotherNotification"));
nc.postNotification(new Notification);
while (!_handleAsync1Done || !_handleAsync2Done)
Poco::Thread::sleep(100);
Poco::Mutex::ScopedLock l(_mutex);
assertTrue(_set.size() == 2);
assertTrue(_set.find("handleAsync1") != _set.end());
assertTrue(_set.find("handleAsync2") != _set.end());
}
void NotificationCenterTest::testAsyncNotificationCenterSyncNotify()
{
using ObserverT = NObserver<NotificationCenterTest, TestNotification>::Type;
AsyncNotificationCenter nc;
const auto matchAsync = [](const std::string& s) -> bool
{
return s.find("asyncNotification") == 0;
};
nc.addObserver(ObserverT(*this, &NotificationCenterTest::handleAsync1, matchAsync, &NotificationCenterTest::handleSync));
nc.addObserver(ObserverT(*this, &NotificationCenterTest::handleAsync2, matchAsync));
const auto res = nc.synchronousDispatch(new TestNotification("asyncNotification"));
assertFalse(res.empty());
assertEquals(res.size(), 1);
assertEquals(res[0].first, "handleAsync1");
Poco::Mutex::ScopedLock l(_mutex);
assertTrue(_set.size() == 1);
assertTrue(_set.find("handleAsync1") != _set.end());
assertTrue(_handleAsync1Counter == 1);
}
void NotificationCenterTest::testAsyncNotificationCenterAsyncNotify()
{
#if (POCO_HAVE_JTHREAD)
using ObserverT = NObserver<NotificationCenterTest, TestNotification>::Type;
AsyncNotificationCenter nc(AsyncNotificationCenter::AsyncMode::NOTIFY);
nc.addObserver(ObserverT(*this, &NotificationCenterTest::handleAsync1, &NotificationCenterTest::matchAsync));
nc.addObserver(ObserverT(*this, &NotificationCenterTest::handleAsync2, &NotificationCenterTest::matchAsync));
nc.postNotification(new TestNotification("asyncNotification"));
nc.postNotification(new TestNotification("anotherNotification"));
nc.postNotification(new Notification);
while (!_handleAsync1Done || !_handleAsync2Done)
Poco::Thread::sleep(100);
nc.removeObserver(ObserverT(*this, &NotificationCenterTest::handleAsync1, &NotificationCenterTest::matchAsync));
nc.removeObserver(ObserverT(*this, &NotificationCenterTest::handleAsync2, &NotificationCenterTest::matchAsync));
Poco::Mutex::ScopedLock l(_mutex);
assertTrue(_set.size() == 2);
assertTrue(_set.find("handleAsync1") != _set.end());
assertTrue(_set.find("handleAsync2") != _set.end());
#endif
}
void NotificationCenterTest::testAsyncNotificationCenterAsyncBoth()
{
#if (POCO_HAVE_JTHREAD)
using ObserverT = NObserver<NotificationCenterTest, TestNotification>::Type;
AsyncNotificationCenter nc(AsyncNotificationCenter::AsyncMode::BOTH);
nc.addObserver(ObserverT(*this, &NotificationCenterTest::handleAsync1, &NotificationCenterTest::matchAsync));
nc.addObserver(ObserverT(*this, &NotificationCenterTest::handleAsync2, &NotificationCenterTest::matchAsync));
nc.postNotification(new TestNotification("asyncNotification"));
nc.postNotification(new TestNotification("anotherNotification"));
nc.postNotification(new Notification);
while (!_handleAsync1Done || !_handleAsync2Done)
Poco::Thread::sleep(100);
nc.removeObserver(ObserverT(*this, &NotificationCenterTest::handleAsync1, &NotificationCenterTest::matchAsync));
nc.removeObserver(ObserverT(*this, &NotificationCenterTest::handleAsync2, &NotificationCenterTest::matchAsync));
Poco::Mutex::ScopedLock l(_mutex);
assertTrue(_set.size() == 2);
assertTrue(_set.find("handleAsync1") != _set.end());
assertTrue(_set.find("handleAsync2") != _set.end());
#endif
}
void NotificationCenterTest::testAsyncNotificationCenterAsyncNotifyStress()
{
#if (POCO_HAVE_JTHREAD)
using ObserverT = NObserver<NotificationCenterTest, TestNotification>::Type;
AsyncNotificationCenter nc(AsyncNotificationCenter::AsyncMode::NOTIFY);
nc.addObserver(ObserverT(*this, &NotificationCenterTest::handleAsync1, &NotificationCenterTest::matchAsync));
nc.addObserver(ObserverT(*this, &NotificationCenterTest::handleAsync2, &NotificationCenterTest::matchAsync));
for (int i {0}; i < 1000; ++i)
{
nc.postNotification(new TestNotification("asyncNotification", i));
nc.postNotification(new TestNotification("anotherNotification", i));
nc.postNotification(new Notification);
}
// Give enough time for the notifications to be delivered
Poco::Thread::sleep(2000);
nc.removeObserver(ObserverT(*this, &NotificationCenterTest::handleAsync1, &NotificationCenterTest::matchAsync));
nc.removeObserver(ObserverT(*this, &NotificationCenterTest::handleAsync2, &NotificationCenterTest::matchAsync));
Poco::Mutex::ScopedLock l(_mutex);
assertTrue(_set.size() == 2);
assertTrue(_set.find("handleAsync1") != _set.end());
assertTrue(_set.find("handleAsync2") != _set.end());
assertTrue(_handleAsync1Counter == 1000ul);
assertTrue(_handleAsync2Counter == 1000ul);
#endif
}
void NotificationCenterTest::testAsyncNotificationCenterAsyncRemoveObserver()
{
#if (POCO_HAVE_JTHREAD)
using ObserverT = NObserver<NotificationCenterTest, TestNotification>::Type;
AsyncNotificationCenter nc(AsyncNotificationCenter::AsyncMode::NOTIFY);
nc.addObserver(ObserverT(*this, &NotificationCenterTest::handleAsync1, &NotificationCenterTest::matchAsync));
nc.addObserver(ObserverT(*this, &NotificationCenterTest::handleAsync2, &NotificationCenterTest::matchAsync));
for (int i {0}; i < 1000; ++i)
{
nc.postNotification(new TestNotification("asyncNotification", i));
nc.postNotification(new TestNotification("anotherNotification", i));
if (i == 100)
{
while (!_handleAsync2Done)
Poco::Thread::sleep(50);
// Remove one observer while notifications are still being posted
nc.removeObserver(ObserverT(*this, &NotificationCenterTest::handleAsync2, &NotificationCenterTest::matchAsync));
}
}
// Give enough time for the notifications to be delivered
Poco::Thread::sleep(2000);
nc.removeObserver(ObserverT(*this, &NotificationCenterTest::handleAsync1, &NotificationCenterTest::matchAsync));
Poco::Mutex::ScopedLock l(_mutex);
assertTrue(_set.size() == 2);
assertTrue(_set.find("handleAsync1") != _set.end());
assertTrue(_set.find("handleAsync2") != _set.end());
assertTrue(_handleAsync1Counter == 1000ul);
assertTrue(_handleAsync2Counter < 1000ul);
#endif
}
void NotificationCenterTest::testDefaultNotificationCenter()
{
NotificationCenter& nc = NotificationCenter::defaultCenter();
nc.addObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
nc.addObserver(NObserver<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
nc.postNotification(new Notification);
assertTrue (_set.size() == 1);
assertTrue (_set.find("handle1") != _set.end());
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
nc.removeObserver(NObserver<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
}
@@ -232,7 +415,7 @@ void NotificationCenterTest::testMixedObservers()
using AObserverT = AsyncObserver<NotificationCenterTest, TestNotification>::Type;
AsyncNotificationCenter nc;
nc.addObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
nc.addObserver(NObserver<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
nc.addObserver(NObserver<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handleAuto));
nc.addObserver(AObserverT(*this, &NotificationCenterTest::handleAsync1, &NotificationCenterTest::matchAsync));
nc.postNotification(new Notification);
@@ -243,7 +426,7 @@ void NotificationCenterTest::testMixedObservers()
nc.removeObserver(AObserverT(*this, &NotificationCenterTest::handleAsync1, &NotificationCenterTest::matchAsync));
nc.removeObserver(NObserver<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handleAuto));
nc.removeObserver(Observer<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
nc.removeObserver(NObserver<NotificationCenterTest, Notification>(*this, &NotificationCenterTest::handle1));
Poco::Mutex::ScopedLock l(_mutex);
assertTrue (_set.size() == 3);
assertTrue (_set.find("handle1") != _set.end());
@@ -252,36 +435,32 @@ void NotificationCenterTest::testMixedObservers()
}
void NotificationCenterTest::handle1(Poco::Notification* pNf)
void NotificationCenterTest::handle1(const AutoPtr<Notification>& pNf)
{
Poco::Mutex::ScopedLock l(_mutex);
poco_check_ptr (pNf);
AutoPtr<Notification> nf = pNf;
_set.insert("handle1");
_handle1Done = true;
}
void NotificationCenterTest::handle2(Poco::Notification* pNf)
void NotificationCenterTest::handle2(const AutoPtr<Notification>& pNf)
{
poco_check_ptr (pNf);
AutoPtr<Notification> nf = pNf;
_set.insert("handle2");
}
void NotificationCenterTest::handle3(Poco::Notification* pNf)
void NotificationCenterTest::handle3(const AutoPtr<Notification>& pNf)
{
poco_check_ptr (pNf);
AutoPtr<Notification> nf = pNf;
_set.insert("handle3");
}
void NotificationCenterTest::handleTest(TestNotification* pNf)
void NotificationCenterTest::handleTest(const AutoPtr<TestNotification>& pNf)
{
poco_check_ptr (pNf);
AutoPtr<TestNotification> nf = pNf;
_set.insert("handleTest");
}
@@ -299,6 +478,15 @@ void NotificationCenterTest::handleAsync1(const AutoPtr<TestNotification>& pNf)
Poco::Mutex::ScopedLock l(_mutex);
_set.insert("handleAsync1");
_handleAsync1Done = true;
_handleAsync1Counter++;
}
Poco::NotificationResult NotificationCenterTest::handleSync(const AutoPtr<TestNotification>& pNf)
{
Poco::Mutex::ScopedLock l(_mutex);
_set.insert("handleAsync1");
return std::make_pair("handleAsync1", Poco::Any(++_handleAsync1Counter));
}
@@ -307,6 +495,7 @@ void NotificationCenterTest::handleAsync2(const AutoPtr<TestNotification>& pNf)
Poco::Mutex::ScopedLock l(_mutex);
_set.insert("handleAsync2");
_handleAsync2Done = true;
_handleAsync2Counter++;
}
@@ -319,6 +508,13 @@ bool NotificationCenterTest::matchAsync(const std::string& name) const
void NotificationCenterTest::setUp()
{
_set.clear();
_handle1Done = false;
_handleAuto1Done = false;
_handleAsync1Done = false;
_handleAsync2Done = false;
_handleAsync1Counter = 0;
_handleAsync2Counter = 0;
}
@@ -339,6 +535,12 @@ CppUnit::Test* NotificationCenterTest::suite()
CppUnit_addTest(pSuite, NotificationCenterTest, testNotificationCenterAuto);
CppUnit_addTest(pSuite, NotificationCenterTest, testAsyncObserver);
CppUnit_addTest(pSuite, NotificationCenterTest, testAsyncNotificationCenter);
CppUnit_addTest(pSuite, NotificationCenterTest, testAsyncNotificationCenter2);
CppUnit_addTest(pSuite, NotificationCenterTest, testAsyncNotificationCenterSyncNotify);
CppUnit_addTest(pSuite, NotificationCenterTest, testAsyncNotificationCenterAsyncNotify);
CppUnit_addTest(pSuite, NotificationCenterTest, testAsyncNotificationCenterAsyncBoth);
CppUnit_addTest(pSuite, NotificationCenterTest, testAsyncNotificationCenterAsyncNotifyStress);
CppUnit_addTest(pSuite, NotificationCenterTest, testAsyncNotificationCenterAsyncRemoveObserver);
CppUnit_addTest(pSuite, NotificationCenterTest, testDefaultNotificationCenter);
CppUnit_addTest(pSuite, NotificationCenterTest, testMixedObservers);

View File

@@ -29,7 +29,7 @@ class NotificationCenterTest: public CppUnit::TestCase
{
public:
NotificationCenterTest(const std::string& name);
~NotificationCenterTest();
~NotificationCenterTest() override;
void testNotificationCenter1();
void testNotificationCenter2();
@@ -39,30 +39,41 @@ public:
void testNotificationCenterAuto();
void testAsyncObserver();
void testAsyncNotificationCenter();
void testAsyncNotificationCenter2();
void testAsyncNotificationCenterSyncNotify();
void testAsyncNotificationCenterAsyncNotify();
void testAsyncNotificationCenterAsyncBoth();
void testAsyncNotificationCenterAsyncNotifyStress();
void testAsyncNotificationCenterAsyncRemoveObserver();
void testDefaultNotificationCenter();
void testMixedObservers();
void setUp();
void tearDown();
void setUp() override;
void tearDown() override;
static CppUnit::Test* suite();
protected:
void handle1(Poco::Notification* pNf);
void handle2(Poco::Notification* pNf);
void handle3(Poco::Notification* pNf);
void handleTest(TestNotification* pNf);
void handle1(const Poco::AutoPtr<Poco::Notification>& pNf);
void handle2(const Poco::AutoPtr<Poco::Notification>& pNf);
void handle3(const Poco::AutoPtr<Poco::Notification>& pNf);
void handleTest(const Poco::AutoPtr<TestNotification>& pNf);
void handleAuto(const Poco::AutoPtr<Poco::Notification>& pNf);
void handleAsync1(const Poco::AutoPtr<TestNotification>& pNf);
void handleAsync2(const Poco::AutoPtr<TestNotification>& pNf);
Poco::NotificationResult handleSync(const Poco::AutoPtr<TestNotification>& pNf);
bool matchAsync(const std::string& name) const;
private:
std::set<std::string> _set;
std::atomic<bool> _handle1Done;
std::atomic<bool> _handleAuto1Done;
std::atomic<bool> _handleAsync1Done;
std::atomic<bool> _handleAsync2Done;
std::atomic<bool> _handle1Done {false};
std::atomic<bool> _handleAuto1Done {false};
std::atomic<bool> _handleAsync1Done {false};
std::atomic<bool> _handleAsync2Done {false};
std::atomic<std::size_t> _handleAsync1Counter {0};
std::atomic<std::size_t> _handleAsync2Counter {0};
Poco::Mutex _mutex;
};

View File

@@ -19,7 +19,7 @@
#include "Poco/Thread.h"
#include "Poco/ThreadPool.h"
#include "Poco/Event.h"
#include "Poco/Observer.h"
#include "Poco/NObserver.h"
#include "Poco/Exception.h"
#include "Poco/AutoPtr.h"
#include <iostream>
@@ -37,7 +37,7 @@ using Poco::TaskCustomNotification;
using Poco::Thread;
using Poco::ThreadPool;
using Poco::Event;
using Poco::Observer;
using Poco::NObserver;
using Poco::Exception;
using Poco::NoThreadAvailableException;
using Poco::SystemException;
@@ -57,7 +57,7 @@ namespace
{
}
void runTask()
void runTask() override
{
_started = true;
_event.wait();
@@ -112,7 +112,7 @@ namespace
_started(false),
_cancelled(false),
_finished(false),
_pException(0),
_pException(nullptr),
_progress(0.0)
{
}
@@ -122,34 +122,29 @@ namespace
delete _pException;
}
void taskStarted(TaskStartedNotification* pNf)
void taskStarted(const AutoPtr<TaskStartedNotification>& pNf)
{
_started = true;
pNf->release();
}
void taskCancelled(TaskCancelledNotification* pNf)
void taskCancelled(const AutoPtr<TaskCancelledNotification>& pNf)
{
_cancelled = true;
pNf->release();
}
void taskFinished(TaskFinishedNotification* pNf)
void taskFinished(const AutoPtr<TaskFinishedNotification>& pNf)
{
_finished = true;
pNf->release();
}
void taskFailed(TaskFailedNotification* pNf)
void taskFailed(const AutoPtr<TaskFailedNotification>& pNf)
{
_pException = pNf->reason().clone();
pNf->release();
}
void taskProgress(TaskProgressNotification* pNf)
void taskProgress(const AutoPtr<TaskProgressNotification>& pNf)
{
_progress = pNf->progress();
pNf->release();
}
bool started() const
@@ -196,7 +191,7 @@ namespace
{
}
void runTask()
void runTask() override
{
sleep(10000);
}
@@ -220,14 +215,11 @@ namespace
{
}
~CustomTaskObserver()
{
}
~CustomTaskObserver() = default;
void taskCustom(TaskCustomNotification<C>* pNf)
void taskCustom(const AutoPtr<TaskCustomNotification<C>>& pNf)
{
_custom = pNf->custom();
pNf->release();
}
const C& custom() const
@@ -246,20 +238,18 @@ TaskManagerTest::TaskManagerTest(const std::string& name): CppUnit::TestCase(nam
}
TaskManagerTest::~TaskManagerTest()
{
}
TaskManagerTest::~TaskManagerTest() = default;
void TaskManagerTest::testFinish()
{
TaskManager tm;
TaskObserver to;
tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.addObserver(Observer<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.addObserver(Observer<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
tm.addObserver(NObserver<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.addObserver(NObserver<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.addObserver(NObserver<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.addObserver(NObserver<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.addObserver(NObserver<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
AutoPtr<TestTask> pTT = new TestTask;
tm.start(pTT.duplicate());
while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50);
@@ -286,11 +276,11 @@ void TaskManagerTest::testFinish()
assertTrue (!to.error());
tm.cancelAll();
tm.joinAll();
tm.removeObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.removeObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.removeObserver(Observer<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.removeObserver(Observer<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.removeObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
tm.removeObserver(NObserver<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.removeObserver(NObserver<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.removeObserver(NObserver<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.removeObserver(NObserver<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.removeObserver(NObserver<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
}
@@ -298,11 +288,11 @@ void TaskManagerTest::testCancel()
{
TaskManager tm;
TaskObserver to;
tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.addObserver(Observer<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.addObserver(Observer<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
tm.addObserver(NObserver<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.addObserver(NObserver<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.addObserver(NObserver<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.addObserver(NObserver<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.addObserver(NObserver<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
AutoPtr<TestTask> pTT = new TestTask;
tm.start(pTT.duplicate());
while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50);
@@ -331,11 +321,11 @@ void TaskManagerTest::testCancel()
assertTrue (!to.error());
tm.cancelAll();
tm.joinAll();
tm.removeObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.removeObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.removeObserver(Observer<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.removeObserver(Observer<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.removeObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
tm.removeObserver(NObserver<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.removeObserver(NObserver<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.removeObserver(NObserver<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.removeObserver(NObserver<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.removeObserver(NObserver<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
}
@@ -343,11 +333,11 @@ void TaskManagerTest::testError()
{
TaskManager tm;
TaskObserver to;
tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.addObserver(Observer<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.addObserver(Observer<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
tm.addObserver(NObserver<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.addObserver(NObserver<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.addObserver(NObserver<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.addObserver(NObserver<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.addObserver(NObserver<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
AutoPtr<TestTask> pTT = new TestTask;
assertTrue (tm.start(pTT.duplicate()));
while (pTT->state() < Task::TASK_RUNNING) Thread::sleep(50);
@@ -369,17 +359,17 @@ void TaskManagerTest::testError()
assertTrue (pTT->state() == Task::TASK_FINISHED);
while (!to.finished()) Thread::sleep(50);
assertTrue (to.finished());
assertTrue (to.error() != 0);
assertTrue (to.error() != nullptr);
while (tm.count() == 1) Thread::sleep(50);
list = tm.taskList();
assertTrue (list.empty());
tm.cancelAll();
tm.joinAll();
tm.removeObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.removeObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.removeObserver(Observer<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.removeObserver(Observer<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.removeObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
tm.removeObserver(NObserver<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.removeObserver(NObserver<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.removeObserver(NObserver<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.removeObserver(NObserver<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.removeObserver(NObserver<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
}
@@ -389,7 +379,7 @@ void TaskManagerTest::testCustom()
CustomTaskObserver<int> ti(0);
tm.addObserver(
Observer<CustomTaskObserver<int>, TaskCustomNotification<int> >
NObserver<CustomTaskObserver<int>, TaskCustomNotification<int> >
(ti, &CustomTaskObserver<int>::taskCustom));
AutoPtr<CustomNotificationTask<int> > pCNT1 = new CustomNotificationTask<int>(0);
@@ -404,7 +394,7 @@ void TaskManagerTest::testCustom()
CustomTaskObserver<std::string> ts("");
tm.addObserver(
Observer<CustomTaskObserver<std::string>, TaskCustomNotification<std::string> >
NObserver<CustomTaskObserver<std::string>, TaskCustomNotification<std::string> >
(ts, &CustomTaskObserver<std::string>::taskCustom));
AutoPtr<CustomNotificationTask<std::string> > pCNT2 = new CustomNotificationTask<std::string>("");
@@ -422,7 +412,7 @@ void TaskManagerTest::testCustom()
CustomTaskObserver<S*> ptst(&s);
tm.addObserver(
Observer<CustomTaskObserver<S*>, TaskCustomNotification<S*> >
NObserver<CustomTaskObserver<S*>, TaskCustomNotification<S*> >
(ptst, &CustomTaskObserver<S*>::taskCustom));
AutoPtr<CustomNotificationTask<S*> > pCNT3 = new CustomNotificationTask<S*>(&s);
@@ -442,7 +432,7 @@ void TaskManagerTest::testCustom()
CustomTaskObserver<S> tst(s);
tm.addObserver(
Observer<CustomTaskObserver<S>, TaskCustomNotification<S> >
NObserver<CustomTaskObserver<S>, TaskCustomNotification<S> >
(tst, &CustomTaskObserver<S>::taskCustom));
AutoPtr<CustomNotificationTask<S> > pCNT4 = new CustomNotificationTask<S>(s);
@@ -471,11 +461,11 @@ void TaskManagerTest::testCancelNoStart()
{
TaskManager tm;
TaskObserver to;
tm.addObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.addObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.addObserver(Observer<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.addObserver(Observer<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.addObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
tm.addObserver(NObserver<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.addObserver(NObserver<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.addObserver(NObserver<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.addObserver(NObserver<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.addObserver(NObserver<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
AutoPtr<TestTask> pTT = new TestTask;
pTT->cancel();
assertTrue (pTT->isCancelled());
@@ -483,11 +473,11 @@ void TaskManagerTest::testCancelNoStart()
assertTrue (pTT->progress() == 0);
assertTrue (pTT->isCancelled());
assertFalse (pTT->hasOwner());
tm.removeObserver(Observer<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.removeObserver(Observer<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.removeObserver(Observer<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.removeObserver(Observer<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.removeObserver(Observer<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
tm.removeObserver(NObserver<TaskObserver, TaskStartedNotification>(to, &TaskObserver::taskStarted));
tm.removeObserver(NObserver<TaskObserver, TaskCancelledNotification>(to, &TaskObserver::taskCancelled));
tm.removeObserver(NObserver<TaskObserver, TaskFailedNotification>(to, &TaskObserver::taskFailed));
tm.removeObserver(NObserver<TaskObserver, TaskFinishedNotification>(to, &TaskObserver::taskFinished));
tm.removeObserver(NObserver<TaskObserver, TaskProgressNotification>(to, &TaskObserver::taskProgress));
}

View File

@@ -45,7 +45,7 @@ class ParallelSocketAcceptor
{
public:
using ParallelReactor = Poco::Net::ParallelSocketReactor<SR>;
using Observer = Poco::Observer<ParallelSocketAcceptor, ReadableNotification>;
using Observer = Poco::NObserver<ParallelSocketAcceptor, ReadableNotification>;
explicit ParallelSocketAcceptor(ServerSocket& socket,
unsigned threads = Poco::Environment::processorCount(),
@@ -134,10 +134,9 @@ public:
}
}
void onAccept(ReadableNotification* pNotification)
void onAccept(const AutoPtr<ReadableNotification>& pNotification)
/// Accepts connection and creates event handler.
{
pNotification->release();
StreamSocket sock = _socket.acceptConnection();
_pReactor->wakeUp();
createServiceHandler(sock);

View File

@@ -23,7 +23,7 @@
#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/ServerSocket.h"
#include "Poco/Net/StreamSocket.h"
#include "Poco/Observer.h"
#include "Poco/NObserver.h"
namespace Poco {
@@ -68,7 +68,7 @@ class SocketAcceptor
/// if special steps are necessary to create a ServiceHandler object.
{
public:
using Observer = Poco::Observer<SocketAcceptor, ReadableNotification>;
using Observer = Poco::NObserver<SocketAcceptor, ReadableNotification>;
explicit SocketAcceptor(ServerSocket& socket):
_socket(socket),
@@ -146,10 +146,9 @@ public:
}
}
void onAccept(ReadableNotification* pNotification)
void onAccept(const AutoPtr<ReadableNotification>& pNotification)
/// Accepts connection and creates event handler.
{
pNotification->release();
StreamSocket sock = _socket.acceptConnection();
_pReactor->wakeUp();
createServiceHandler(sock);

View File

@@ -115,9 +115,9 @@ public:
/// The overriding method must call the baseclass implementation first.
{
_pReactor = &reactor;
_pReactor->addEventHandler(_socket, Poco::Observer<SocketConnector, ReadableNotification>(*this, &SocketConnector::onReadable));
_pReactor->addEventHandler(_socket, Poco::Observer<SocketConnector, WritableNotification>(*this, &SocketConnector::onWritable));
_pReactor->addEventHandler(_socket, Poco::Observer<SocketConnector, ErrorNotification>(*this, &SocketConnector::onError));
_pReactor->addEventHandler(_socket, Poco::NObserver<SocketConnector, ReadableNotification>(*this, &SocketConnector::onReadable));
_pReactor->addEventHandler(_socket, Poco::NObserver<SocketConnector, WritableNotification>(*this, &SocketConnector::onWritable));
_pReactor->addEventHandler(_socket, Poco::NObserver<SocketConnector, ErrorNotification>(*this, &SocketConnector::onError));
}
virtual void unregisterConnector()
@@ -130,32 +130,29 @@ public:
{
if (_pReactor)
{
_pReactor->removeEventHandler(_socket, Poco::Observer<SocketConnector, ReadableNotification>(*this, &SocketConnector::onReadable));
_pReactor->removeEventHandler(_socket, Poco::Observer<SocketConnector, WritableNotification>(*this, &SocketConnector::onWritable));
_pReactor->removeEventHandler(_socket, Poco::Observer<SocketConnector, ErrorNotification>(*this, &SocketConnector::onError));
_pReactor->removeEventHandler(_socket, Poco::NObserver<SocketConnector, ReadableNotification>(*this, &SocketConnector::onReadable));
_pReactor->removeEventHandler(_socket, Poco::NObserver<SocketConnector, WritableNotification>(*this, &SocketConnector::onWritable));
_pReactor->removeEventHandler(_socket, Poco::NObserver<SocketConnector, ErrorNotification>(*this, &SocketConnector::onError));
}
}
void onReadable(ReadableNotification* pNotification)
void onReadable(const AutoPtr<ReadableNotification>& pNotification)
{
unregisterConnector();
pNotification->release();
int err = _socket.impl()->socketError();
if (err) onError(err);
else onConnect();
}
void onWritable(WritableNotification* pNotification)
void onWritable(const AutoPtr<WritableNotification>& pNotification)
{
unregisterConnector();
pNotification->release();
onConnect();
}
void onError(ErrorNotification* pNotification)
void onError(const AutoPtr<ErrorNotification>& pNotification)
{
unregisterConnector();
pNotification->release();
onError(_socket.impl()->socketError());
}

View File

@@ -18,7 +18,7 @@
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/ServerSocket.h"
#include "Poco/Net/SocketAddress.h"
#include "Poco/Observer.h"
#include "Poco/NObserver.h"
#include <iostream>
@@ -33,7 +33,7 @@ using Poco::Net::ReadableNotification;
using Poco::Net::WritableNotification;
using Poco::Net::TimeoutNotification;
using Poco::Net::ShutdownNotification;
using Poco::Observer;
using Poco::NObserver;
namespace
@@ -45,17 +45,16 @@ namespace
_socket(socket),
_reactor(reactor)
{
_reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
_reactor.addEventHandler(_socket, NObserver<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
}
~EchoServiceHandler()
{
_reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
_reactor.removeEventHandler(_socket, NObserver<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
}
void onReadable(ReadableNotification* pNf)
void onReadable(const AutoPtr<ReadableNotification>& pNf)
{
pNf->release();
char buffer[8];
int n = _socket.receiveBytes(buffer, sizeof(buffer));
if (n > 0) _socket.sendBytes(buffer, n);
@@ -96,24 +95,21 @@ namespace
Thread::sleep(100);
}
void onShutdown(ShutdownNotification* pNf)
void onShutdown(const AutoPtr<ShutdownNotification>& pNf)
{
if (pNf) pNf->release();
_reactor.removeEventHandler(_socket, _os);
delete this;
}
void onReadable(ReadableNotification* pNf)
void onReadable(const AutoPtr<ReadableNotification>& pNf)
{
if (pNf) pNf->release();
char buffer[32];
int n = _socket.receiveBytes(buffer, sizeof(buffer));
if (n <= 0) onShutdown(0);
}
void onWritable(WritableNotification* pNf)
void onWritable(const AutoPtr<WritableNotification>& pNf)
{
if (pNf) pNf->release();
_reactor.removeEventHandler(_socket, _ow);
std::string data(5, 'x');
_socket.sendBytes(data.data(), (int) data.length());
@@ -122,9 +118,9 @@ namespace
StreamSocket _socket;
SocketReactor& _reactor;
Observer<ClientServiceHandler, ReadableNotification> _or;
Observer<ClientServiceHandler, WritableNotification> _ow;
Observer<ClientServiceHandler, ShutdownNotification> _os;
NObserver<ClientServiceHandler, ReadableNotification> _or;
NObserver<ClientServiceHandler, WritableNotification> _ow;
NObserver<ClientServiceHandler, ShutdownNotification> _os;
};
}

View File

@@ -19,7 +19,7 @@
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/ServerSocket.h"
#include "Poco/Net/SocketAddress.h"
#include "Poco/Observer.h"
#include "Poco/NObserver.h"
#include "Poco/Stopwatch.h"
#include "Poco/Exception.h"
#include "Poco/Thread.h"
@@ -39,7 +39,7 @@ using Poco::Net::WritableNotification;
using Poco::Net::TimeoutNotification;
using Poco::Net::ErrorNotification;
using Poco::Net::ShutdownNotification;
using Poco::Observer;
using Poco::NObserver;
using Poco::Stopwatch;
using Poco::IllegalStateException;
using Poco::Thread;
@@ -58,16 +58,13 @@ namespace
_socket(socket),
_reactor(reactor)
{
_reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
_reactor.addEventHandler(_socket, NObserver<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
}
~EchoServiceHandler()
{
}
~EchoServiceHandler() = default;
void onReadable(ReadableNotification* pNf)
void onReadable(const AutoPtr<ReadableNotification>& pNf)
{
pNf->release();
char buffer[8];
int n = _socket.receiveBytes(buffer, sizeof(buffer));
if (n > 0)
@@ -76,7 +73,7 @@ namespace
}
else
{
_reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
_reactor.removeEventHandler(_socket, NObserver<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
delete this;
}
}
@@ -121,9 +118,8 @@ namespace
_reactor.removeEventHandler(_socket, _os);
}
void onReadable(ReadableNotification* pNf)
void onReadable(const AutoPtr<ReadableNotification>& pNf)
{
pNf->release();
char buffer[32];
int n = _socket.receiveBytes(buffer, sizeof(buffer));
if (n > 0)
@@ -135,7 +131,7 @@ namespace
else
{
checkReadableObserverCount(1);
_reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable));
_reactor.removeEventHandler(_socket, NObserver<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable));
checkReadableObserverCount(0);
if (_once)
{
@@ -151,20 +147,18 @@ namespace
}
}
void onWritable(WritableNotification* pNf)
void onWritable(const AutoPtr<WritableNotification>& pNf)
{
pNf->release();
checkWritableObserverCount(1);
_reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, WritableNotification>(*this, &ClientServiceHandler::onWritable));
_reactor.removeEventHandler(_socket, NObserver<ClientServiceHandler, WritableNotification>(*this, &ClientServiceHandler::onWritable));
checkWritableObserverCount(0);
std::string data(DATA_SIZE, 'x');
_socket.sendBytes(data.data(), (int) data.length());
_socket.shutdownSend();
}
void onTimeout(TimeoutNotification* pNf)
void onTimeout(const AutoPtr<TimeoutNotification>& pNf)
{
pNf->release();
_timeout = true;
if (_closeOnTimeout)
{
@@ -173,9 +167,8 @@ namespace
}
}
void onShutdown(ShutdownNotification* pNf)
void onShutdown(const AutoPtr<ShutdownNotification>& pNf)
{
pNf->release();
delete this;
}
@@ -254,10 +247,10 @@ namespace
StreamSocket _socket;
SocketReactor& _reactor;
Observer<ClientServiceHandler, ReadableNotification> _or;
Observer<ClientServiceHandler, WritableNotification> _ow;
Observer<ClientServiceHandler, TimeoutNotification> _ot;
Observer<ClientServiceHandler, ShutdownNotification> _os;
NObserver<ClientServiceHandler, ReadableNotification> _or;
NObserver<ClientServiceHandler, WritableNotification> _ow;
NObserver<ClientServiceHandler, TimeoutNotification> _ot;
NObserver<ClientServiceHandler, ShutdownNotification> _os;
std::stringstream _str;
static std::string _data;
static bool _readableError;
@@ -285,19 +278,17 @@ namespace
_failed(false),
_shutdown(false)
{
reactor.addEventHandler(socket(), Observer<FailConnector, TimeoutNotification>(*this, &FailConnector::onTimeout));
reactor.addEventHandler(socket(), Observer<FailConnector, ShutdownNotification>(*this, &FailConnector::onShutdown));
reactor.addEventHandler(socket(), NObserver<FailConnector, TimeoutNotification>(*this, &FailConnector::onTimeout));
reactor.addEventHandler(socket(), NObserver<FailConnector, ShutdownNotification>(*this, &FailConnector::onShutdown));
}
void onShutdown(ShutdownNotification* pNf)
void onShutdown(const AutoPtr<ShutdownNotification>& pNf)
{
pNf->release();
_shutdown = true;
}
void onTimeout(TimeoutNotification* pNf)
void onTimeout(const AutoPtr<TimeoutNotification>& pNf)
{
pNf->release();
_failed = true;
reactor()->stop();
}
@@ -320,7 +311,7 @@ namespace
class DataServiceHandler
{
public:
typedef std::vector<std::string> Data;
using Data = std::vector<std::string>;
DataServiceHandler(StreamSocket& socket, SocketReactor& reactor):
_socket(socket),
@@ -328,20 +319,19 @@ namespace
_pos(0)
{
_data.resize(1);
_reactor.addEventHandler(_socket, Observer<DataServiceHandler, ReadableNotification>(*this, &DataServiceHandler::onReadable));
_reactor.addEventHandler(_socket, Observer<DataServiceHandler, ShutdownNotification>(*this, &DataServiceHandler::onShutdown));
_reactor.addEventHandler(_socket, NObserver<DataServiceHandler, ReadableNotification>(*this, &DataServiceHandler::onReadable));
_reactor.addEventHandler(_socket, NObserver<DataServiceHandler, ShutdownNotification>(*this, &DataServiceHandler::onShutdown));
_socket.setBlocking(false);
}
~DataServiceHandler()
{
_reactor.removeEventHandler(_socket, Observer<DataServiceHandler, ReadableNotification>(*this, &DataServiceHandler::onReadable));
_reactor.removeEventHandler(_socket, Observer<DataServiceHandler, ShutdownNotification>(*this, &DataServiceHandler::onShutdown));
_reactor.removeEventHandler(_socket, NObserver<DataServiceHandler, ReadableNotification>(*this, &DataServiceHandler::onReadable));
_reactor.removeEventHandler(_socket, NObserver<DataServiceHandler, ShutdownNotification>(*this, &DataServiceHandler::onShutdown));
}
void onReadable(ReadableNotification* pNf)
void onReadable(const AutoPtr<ReadableNotification>& pNf)
{
pNf->release();
char buffer[64];
int n = 0;
do
@@ -371,9 +361,8 @@ namespace
} while (true);
}
void onShutdown(ShutdownNotification* pNf)
void onShutdown(const AutoPtr<ShutdownNotification>& pNf)
{
pNf->release();
delete this;
}
@@ -402,20 +391,19 @@ namespace
DummyServiceHandler(StreamSocket& socket, SocketReactor& reactor) : _socket(socket),
_reactor(reactor)
{
_reactor.addEventHandler(_socket, Observer<DummyServiceHandler, ReadableNotification>(*this, &DummyServiceHandler::onReadable));
_reactor.addEventHandler(_socket, Observer<DummyServiceHandler, ShutdownNotification>(*this, &DummyServiceHandler::onShutdown));
_reactor.addEventHandler(_socket, NObserver<DummyServiceHandler, ReadableNotification>(*this, &DummyServiceHandler::onReadable));
_reactor.addEventHandler(_socket, NObserver<DummyServiceHandler, ShutdownNotification>(*this, &DummyServiceHandler::onShutdown));
_socket.setBlocking(false);
}
~DummyServiceHandler()
{
_reactor.removeEventHandler(_socket, Observer<DummyServiceHandler, ReadableNotification>(*this, &DummyServiceHandler::onReadable));
_reactor.removeEventHandler(_socket, Observer<DummyServiceHandler, ShutdownNotification>(*this, &DummyServiceHandler::onShutdown));
_reactor.removeEventHandler(_socket, NObserver<DummyServiceHandler, ReadableNotification>(*this, &DummyServiceHandler::onReadable));
_reactor.removeEventHandler(_socket, NObserver<DummyServiceHandler, ShutdownNotification>(*this, &DummyServiceHandler::onShutdown));
}
void onReadable(ReadableNotification* pNf)
void onReadable(const AutoPtr<ReadableNotification>& pNf)
{
pNf->release();
std::vector<char> buffer;
int n = 0;
while ((n = _socket.available()))
@@ -426,9 +414,8 @@ namespace
}
}
void onShutdown(ShutdownNotification* pNf)
void onShutdown(const AutoPtr<ShutdownNotification>& pNf)
{
pNf->release();
delete this;
}
@@ -444,9 +431,7 @@ SocketReactorTest::SocketReactorTest(const std::string& name): CppUnit::TestCase
}
SocketReactorTest::~SocketReactorTest()
{
}
SocketReactorTest::~SocketReactorTest() = default;
void SocketReactorTest::testSocketReactor()