mirror of
https://github.com/pocoproject/poco.git
synced 2025-10-27 11:06:50 +01:00
AsyncObserver (#4444)
* feat(AsyncObserver): Improve NotificationCenter speed and usability #4414 * fix(Notification): add missing header * feat(Any): add checkers for holding nullptr #4447 * feat(NotificationCenter): g++ build and refactoring #4414 * fix(Observer): compile errors on some compilers #4414 * fix(NotificationCenter): compile errors #4414 * chore(ParallelSocketAcceptor): remove unnecessary include and using from header * feat(AsyncNotificationCenter): add #4414 * test(AsyncNotificationCenter): add mixed observer types to the test #4414 * fix(AsyncNotificationCenter): hangs on program exit #4414 * fix(dev): friend not honored, temporarily make private members public * fix(AsyncNotificationCenter); remove default #4414
This commit is contained in:
committed by
GitHub
parent
30a0a06bac
commit
88be66972a
177
Foundation/include/Poco/AsyncObserver.h
Normal file
177
Foundation/include/Poco/AsyncObserver.h
Normal file
@@ -0,0 +1,177 @@
|
||||
//
|
||||
// AsyncObserver.h
|
||||
//
|
||||
// Library: Foundation
|
||||
// Package: Notifications
|
||||
// Module: AsyncObserver
|
||||
//
|
||||
// Definition of the AsyncObserver class template.
|
||||
//
|
||||
// Copyright (c) 2006, Applied Informatics Software Engineering GmbH.
|
||||
// Aleph ONE Software Engineering d.o.o.,
|
||||
// and Contributors.
|
||||
//
|
||||
// SPDX-License-Identifier: BSL-1.0
|
||||
//
|
||||
|
||||
|
||||
#ifndef Foundation_AsyncObserver_INCLUDED
|
||||
#define Foundation_AsyncObserver_INCLUDED
|
||||
|
||||
|
||||
#include "Poco/Foundation.h"
|
||||
#include "Poco/NObserver.h"
|
||||
#include "Poco/Thread.h"
|
||||
#include "Poco/Stopwatch.h"
|
||||
#include "Poco/Debugger.h"
|
||||
#include "Poco/ErrorHandler.h"
|
||||
#include "Poco/Format.h"
|
||||
#include "Poco/RunnableAdapter.h"
|
||||
#include "Poco/NotificationQueue.h"
|
||||
|
||||
|
||||
namespace Poco {
|
||||
|
||||
|
||||
template <class C, class N>
|
||||
class AsyncObserver: public NObserver<C, N>
|
||||
/// AsyncObserver notifies subscribers in a dedicated thread (as opposed
|
||||
/// to (N)Observer classes, which notify subscribers synchronously).
|
||||
/// In order to become active and process notifications, the start()
|
||||
/// method must be called.
|
||||
///
|
||||
/// This class is meant to be used with the NotificationCenter only.
|
||||
/// Notification processing thread can be started only once, and copying
|
||||
/// should be done before `start()` is called.
|
||||
{
|
||||
public:
|
||||
using Type = AsyncObserver<C, N>;
|
||||
using Matcher = typename NObserver<C, N>::Matcher;
|
||||
using Handler = typename NObserver<C, N>::Handler;
|
||||
using NotificationPtr = typename NObserver<C, N>::NotificationPtr;
|
||||
|
||||
AsyncObserver() = delete;
|
||||
|
||||
AsyncObserver(C& object, Handler handler, Matcher matcher = nullptr):
|
||||
NObserver<C, N>(object, handler, matcher),
|
||||
_ra(*this, &AsyncObserver::dequeue),
|
||||
_started(false),
|
||||
_done(false)
|
||||
{
|
||||
}
|
||||
|
||||
AsyncObserver(const AsyncObserver& observer):
|
||||
NObserver<C, N>(observer),
|
||||
_ra(*this, &AsyncObserver::dequeue),
|
||||
_started(false),
|
||||
_done(false)
|
||||
{
|
||||
poco_assert(observer._nq.size() == 0);
|
||||
}
|
||||
|
||||
~AsyncObserver()
|
||||
{
|
||||
disable();
|
||||
}
|
||||
|
||||
AsyncObserver& operator = (const AsyncObserver& observer)
|
||||
{
|
||||
if (&observer != this)
|
||||
{
|
||||
poco_assert(observer._nq.size() == 0);
|
||||
setObject(observer._pObject);
|
||||
setHandler(observer._handler);
|
||||
setMatcher(observer._matcher);
|
||||
_started = false;
|
||||
_done =false;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
virtual void notify(Notification* pNf) const
|
||||
{
|
||||
_nq.enqueueNotification(NotificationPtr(static_cast<N*>(pNf), true));
|
||||
}
|
||||
|
||||
virtual AbstractObserver* clone() const
|
||||
{
|
||||
return new AsyncObserver(*this);
|
||||
}
|
||||
|
||||
virtual void start()
|
||||
{
|
||||
Poco::ScopedLock l(this->mutex());
|
||||
if (_started)
|
||||
{
|
||||
throw Poco::InvalidAccessException(
|
||||
Poco::format("thread already started %s", poco_src_loc));
|
||||
}
|
||||
|
||||
_thread.start(_ra);
|
||||
Poco::Stopwatch sw;
|
||||
sw.start();
|
||||
while (!_started)
|
||||
{
|
||||
if (sw.elapsedSeconds() > 5)
|
||||
throw Poco::TimeoutException(poco_src_loc);
|
||||
Thread::sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
virtual void disable()
|
||||
{
|
||||
if (!_started.exchange(false)) return;
|
||||
_nq.wakeUpAll();
|
||||
while (!_done) Thread::sleep(100);
|
||||
_thread.join();
|
||||
NObserver<C, N>::disable();
|
||||
}
|
||||
|
||||
virtual int backlog() const
|
||||
{
|
||||
return _nq.size();
|
||||
}
|
||||
|
||||
private:
|
||||
void dequeue()
|
||||
{
|
||||
Notification::Ptr pNf;
|
||||
_started = true;
|
||||
_done = false;
|
||||
while ((pNf = _nq.waitDequeueNotification()))
|
||||
{
|
||||
try
|
||||
{
|
||||
this->handle(pNf.unsafeCast<N>());
|
||||
}
|
||||
catch (Poco::Exception& ex)
|
||||
{
|
||||
Poco::ErrorHandler::handle(ex);
|
||||
}
|
||||
catch (std::exception& ex)
|
||||
{
|
||||
Poco::ErrorHandler::handle(ex);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
Poco::ErrorHandler::handle();
|
||||
}
|
||||
}
|
||||
_done = true;
|
||||
_started = false;
|
||||
}
|
||||
|
||||
using Adapter = RunnableAdapter<AsyncObserver<C, N>>;
|
||||
|
||||
Thread _thread;
|
||||
mutable NotificationQueue _nq;
|
||||
Adapter _ra;
|
||||
std::atomic<bool> _started;
|
||||
std::atomic<bool> _done;
|
||||
};
|
||||
|
||||
|
||||
} // namespace Poco
|
||||
|
||||
|
||||
#endif // Foundation_AsyncObserver_INCLUDED
|
||||
Reference in New Issue
Block a user