feat(SocketReactor): Add completion handling to SocketReactor #3290 (add scheduled handlers and runOne())

This commit is contained in:
Alex Fabijanic 2021-06-01 21:31:43 +02:00
parent d412c0e62b
commit 564b4d0688
4 changed files with 191 additions and 25 deletions

View File

@ -23,12 +23,14 @@
#include "Poco/Net/PollSet.h"
#include "Poco/Runnable.h"
#include "Poco/Timespan.h"
#include "Poco/Timestamp.h"
#include "Poco/Observer.h"
#include "Poco/AutoPtr.h"
#include <map>
#include <atomic>
#include <functional>
#include <deque>
#include <utility>
namespace Poco {
@ -118,6 +120,8 @@ class Net_API SocketReactor: public Poco::Runnable
public:
typedef std::function<void()> CompletionHandler;
static const Timestamp::TimeDiff PERMANENT_COMPLETION_HANDLER;
SocketReactor();
/// Creates the SocketReactor.
@ -127,13 +131,35 @@ public:
virtual ~SocketReactor();
/// Destroys the SocketReactor.
void addCompletionHandler(const CompletionHandler& ch);
void addCompletionHandler(const CompletionHandler& ch, Timestamp::TimeDiff ms = PERMANENT_COMPLETION_HANDLER);
/// Adds a completion handler to the list of handlers
/// to be called after the next poll() completion.
/// Handler will be called until the specified expiration,
/// which defaults to immediately, ie. expiration after the
/// first invocation.
void addCompletionHandler(CompletionHandler&& ch);
void addCompletionHandler(CompletionHandler&& ch, Timestamp::TimeDiff ms = PERMANENT_COMPLETION_HANDLER, int pos = -1);
/// Adds a completion handler to the list of handlers
/// to be called after the next poll() completion.
/// Handler will be called until the specified expiration,
/// which defaults to immediately, ie. expiration after the
/// first invocation.
void removeCompletionHandlers();
/// Removes all completion handlers.
int scheduledCompletionHandlers();
/// Returns the number of scheduled completion handlers.
int removeScheduledCompletionHandlers(int count = -1);
/// Removes the count scheduled completion handlers
/// from the front of the schedule queue.
/// Default is removal of all scheduled handlers.
int removePermanentCompletionHandlers(int count = -1);
/// Removes the count permanent completion handlers
/// from the front of the schedule queue.
/// Default is removal of all scheduled handlers.
int poll();
/// Polls all registered sockets and calls their respective handlers.
@ -141,6 +167,13 @@ public:
/// If there are no readable sockets, a timeout notification is dispatched.
/// Returns the total number of read/write/error handlers called.
int runOne();
/// Runs one handler, scheduled or permanent.
/// If there are no available handlers, it blocks
/// until the first handler is encountered and executed.
/// Returns 1 on successful handler invocation, 0 on
/// exception.
void run();
/// Runs the SocketReactor. The reactor will run
/// until stop() is called (in a separate thread).
@ -217,9 +250,13 @@ protected:
/// Can be overridden by subclasses to perform additional
/// periodic tasks. The default implementation does nothing.
void onComplete();
/// Called after poll() completes processing.
/// All completion handlers are called and deleted.
int onComplete(bool handleOne = false);
/// Calls completion handler(s) (after poll() completes processing
/// or on runOne() invocation). If handleOne is true, returns
/// after first handler invocation.
/// Scheduled completion handlers are deleted after
/// invocation.
/// Returns number of handlers invoked.
void dispatch(const Socket& socket, SocketNotification* pNotification);
/// Dispatches the given notification to all observers
@ -229,16 +266,38 @@ protected:
/// Dispatches the given notification to all observers.
private:
typedef Poco::AutoPtr<SocketNotifier> NotifierPtr;
typedef Poco::AutoPtr<SocketNotification> NotificationPtr;
typedef std::map<poco_socket_t, NotifierPtr> EventHandlerMap;
typedef Poco::FastMutex MutexType;
typedef MutexType::ScopedLock ScopedLock;
typedef std::deque<CompletionHandler> HandlerList;
typedef Poco::AutoPtr<SocketNotifier> NotifierPtr;
typedef Poco::AutoPtr<SocketNotification> NotificationPtr;
typedef std::map<poco_socket_t, NotifierPtr> EventHandlerMap;
typedef Poco::FastMutex MutexType;
typedef MutexType::ScopedLock ScopedLock;
typedef std::pair<CompletionHandler, Poco::Timestamp> CompletionHandlerEntry;
typedef std::deque<CompletionHandlerEntry> HandlerList;
bool hasSocketHandlers();
void dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification);
NotifierPtr getNotifier(const Socket& socket, bool makeNew = false);
bool isPermanentCompletionHandler(const CompletionHandlerEntry& entry) const;
template <typename F>
int removeCompletionHandlers(F isType, int count)
{
int removed = 0;
ScopedLock lock(_mutex);
int left = count > -1 ? count : _complHandlers.size();
HandlerList::iterator it = _complHandlers.begin();
for (; it != _complHandlers.end();)
{
if (isType(it->second))
{
++removed;
it = _complHandlers.erase((it));
if (--left == 0) break;
}
else ++it;
}
return removed;
}
enum
{

View File

@ -19,6 +19,7 @@
#include "Poco/Thread.h"
#include "Poco/Exception.h"
#include <memory>
#include <limits>
using Poco::Exception;
@ -29,6 +30,10 @@ namespace Poco {
namespace Net {
const Timestamp::TimeDiff SocketReactor::PERMANENT_COMPLETION_HANDLER =
std::numeric_limits<Timestamp::TimeDiff>::max();
SocketReactor::SocketReactor():
_stop(false),
_timeout(DEFAULT_TIMEOUT),
@ -102,21 +107,50 @@ int SocketReactor::poll()
}
void SocketReactor::onComplete()
int SocketReactor::onComplete(bool handleOne)
{
std::unique_ptr<CompletionHandler> pCH;
while (!_complHandlers.empty())
int handled = 0;
for (HandlerList::iterator it = _complHandlers.begin(); it != _complHandlers.end();)
{
// A completion handler may add new
// completion handler(s), so the mutex must
// be unlocked before the call execution.
// be unlocked before the invocation.
{
ScopedLock lock(_mutex);
pCH.reset(new CompletionHandler(std::move(_complHandlers.front())));
_complHandlers.pop_front();
bool alwaysRun = isPermanentCompletionHandler(*it);
bool isExpired = !alwaysRun && (Timestamp() > it->second);
if (isExpired)
{
pCH.reset(new CompletionHandler(std::move(it->first)));
it = _complHandlers.erase(it);
}
else if (alwaysRun)
{
pCH.reset(new CompletionHandler(it->first));
++it;
} else ++it;
}
if (pCH)
{
(*pCH)();
++handled;
if (handleOne) break;
}
if (pCH) (*pCH)();
}
return handled;
}
int SocketReactor::runOne()
{
try
{
while (0 == onComplete(true));
return 1;
}
catch(...) {}
return 0;
}
@ -191,16 +225,66 @@ const Poco::Timespan& SocketReactor::getTimeout() const
}
void SocketReactor::addCompletionHandler(const CompletionHandler& ch)
void SocketReactor::addCompletionHandler(const CompletionHandler& ch, Timestamp::TimeDiff ms)
{
addCompletionHandler(CompletionHandler(ch));
addCompletionHandler(CompletionHandler(ch), ms);
}
void SocketReactor::addCompletionHandler(CompletionHandler&& ch)
void SocketReactor::addCompletionHandler(CompletionHandler&& ch, Timestamp::TimeDiff ms, int pos)
{
Poco::Timestamp expires = (ms != PERMANENT_COMPLETION_HANDLER) ? Timestamp() + ms*1000 : Timestamp(0);
if (pos == -1)
{
ScopedLock lock(_mutex);
_complHandlers.push_back({std::move(ch), expires});
}
else
{
if (pos < 0) throw Poco::InvalidArgumentException("SocketReactor::addCompletionHandler()");
ScopedLock lock(_mutex);
_complHandlers.insert(_complHandlers.begin() + pos, {std::move(ch), expires});
}
}
void SocketReactor::removeCompletionHandlers()
{
ScopedLock lock(_mutex);
_complHandlers.push_back(std::move(ch));
_complHandlers.clear();
}
int SocketReactor::scheduledCompletionHandlers()
{
int cnt = 0;
ScopedLock lock(_mutex);
HandlerList::iterator it = _complHandlers.begin();
for (; it != _complHandlers.end(); ++it)
{
if (it->second != Timestamp(0)) ++cnt;
}
return cnt;
}
int SocketReactor::removeScheduledCompletionHandlers(int count)
{
auto isScheduled = [](const Timestamp& ts) { return ts != Timestamp(0); };
return removeCompletionHandlers(isScheduled, count);
}
int SocketReactor::removePermanentCompletionHandlers(int count)
{
auto isPermanent = [](const Timestamp& ts) { return ts == Timestamp(0); };
return removeCompletionHandlers(isPermanent, count);
}
bool SocketReactor::isPermanentCompletionHandler(const CompletionHandlerEntry& entry) const
{
return entry.second == Timestamp(0);
}

View File

@ -19,6 +19,7 @@
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/DatagramSocket.h"
#include "Poco/Net/ServerSocket.h"
#include "Poco/Timestamp.h"
#include "Poco/Exception.h"
#include <sstream>
@ -38,6 +39,7 @@ using Poco::Net::ShutdownNotification;
using Poco::Observer;
using Poco::IllegalStateException;
using Poco::Thread;
using Poco::Timestamp;
namespace
@ -379,13 +381,13 @@ namespace
public:
CompletionHandlerTestObject() = delete;
CompletionHandlerTestObject(SocketReactor& reactor):
CompletionHandlerTestObject(SocketReactor& reactor, int ms = SocketReactor::PERMANENT_COMPLETION_HANDLER):
_reactor(reactor),
_counter(0)
{
auto handler = [this] () { ++_counter; };
_reactor.addCompletionHandler(handler);
_reactor.addCompletionHandler(std::move(handler));
_reactor.addCompletionHandler(handler, ms);
_reactor.addCompletionHandler(std::move(handler), ms);
}
void addRecursiveCompletionHandler(int count)
@ -620,6 +622,26 @@ void SocketReactorTest::testCompletionHandler()
ch.addRecursiveCompletionHandler(5);
reactor.poll();
assertTrue(ch.counter() == 5);
reactor.removePermanentCompletionHandlers();
reactor.poll();
assertTrue(ch.counter() == 5);
}
void SocketReactorTest::testTimedCompletionHandler()
{
SocketReactor reactor;
CompletionHandlerTestObject ch(reactor, 500);
assertTrue(ch.counter() == 0);
reactor.poll();
assertTrue(ch.counter() == 0);
reactor.poll();
Thread::sleep(500);
reactor.poll();
assertTrue(ch.counter() == 2);
reactor.poll();
assertTrue(ch.counter() == 2);
}
@ -646,6 +668,7 @@ CppUnit::Test* SocketReactorTest::suite()
CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout);
CppUnit_addTest(pSuite, SocketReactorTest, testDataCollection);
CppUnit_addTest(pSuite, SocketReactorTest, testCompletionHandler);
CppUnit_addTest(pSuite, SocketReactorTest, testTimedCompletionHandler);
return pSuite;
}

View File

@ -37,6 +37,7 @@ public:
void testSocketConnectorTimeout();
void testDataCollection();
void testCompletionHandler();
void testTimedCompletionHandler();
void setUp();
void tearDown();
@ -140,7 +141,6 @@ private:
ServerType echoServer;
SocketType ds;
char buffer[256];
ds.connect(SocketAddress("127.0.0.1", echoServer.port()));
SocketReactor reactor;
IOHandler<SocketType> ih(ds, reactor);