diff --git a/Net/include/Poco/Net/SocketReactor.h b/Net/include/Poco/Net/SocketReactor.h index 043444b6b..ae11b13c7 100644 --- a/Net/include/Poco/Net/SocketReactor.h +++ b/Net/include/Poco/Net/SocketReactor.h @@ -23,12 +23,14 @@ #include "Poco/Net/PollSet.h" #include "Poco/Runnable.h" #include "Poco/Timespan.h" +#include "Poco/Timestamp.h" #include "Poco/Observer.h" #include "Poco/AutoPtr.h" #include #include #include #include +#include namespace Poco { @@ -118,6 +120,8 @@ class Net_API SocketReactor: public Poco::Runnable public: typedef std::function CompletionHandler; + static const Timestamp::TimeDiff PERMANENT_COMPLETION_HANDLER; + SocketReactor(); /// Creates the SocketReactor. @@ -127,13 +131,35 @@ public: virtual ~SocketReactor(); /// Destroys the SocketReactor. - void addCompletionHandler(const CompletionHandler& ch); + void addCompletionHandler(const CompletionHandler& ch, Timestamp::TimeDiff ms = PERMANENT_COMPLETION_HANDLER); /// Adds a completion handler to the list of handlers /// to be called after the next poll() completion. + /// Handler will be called until the specified expiration, + /// which defaults to immediately, ie. expiration after the + /// first invocation. - void addCompletionHandler(CompletionHandler&& ch); + void addCompletionHandler(CompletionHandler&& ch, Timestamp::TimeDiff ms = PERMANENT_COMPLETION_HANDLER, int pos = -1); /// Adds a completion handler to the list of handlers /// to be called after the next poll() completion. + /// Handler will be called until the specified expiration, + /// which defaults to immediately, ie. expiration after the + /// first invocation. + + void removeCompletionHandlers(); + /// Removes all completion handlers. + + int scheduledCompletionHandlers(); + /// Returns the number of scheduled completion handlers. + + int removeScheduledCompletionHandlers(int count = -1); + /// Removes the count scheduled completion handlers + /// from the front of the schedule queue. + /// Default is removal of all scheduled handlers. + + int removePermanentCompletionHandlers(int count = -1); + /// Removes the count permanent completion handlers + /// from the front of the schedule queue. + /// Default is removal of all scheduled handlers. int poll(); /// Polls all registered sockets and calls their respective handlers. @@ -141,6 +167,13 @@ public: /// If there are no readable sockets, a timeout notification is dispatched. /// Returns the total number of read/write/error handlers called. + int runOne(); + /// Runs one handler, scheduled or permanent. + /// If there are no available handlers, it blocks + /// until the first handler is encountered and executed. + /// Returns 1 on successful handler invocation, 0 on + /// exception. + void run(); /// Runs the SocketReactor. The reactor will run /// until stop() is called (in a separate thread). @@ -217,9 +250,13 @@ protected: /// Can be overridden by subclasses to perform additional /// periodic tasks. The default implementation does nothing. - void onComplete(); - /// Called after poll() completes processing. - /// All completion handlers are called and deleted. + int onComplete(bool handleOne = false); + /// Calls completion handler(s) (after poll() completes processing + /// or on runOne() invocation). If handleOne is true, returns + /// after first handler invocation. + /// Scheduled completion handlers are deleted after + /// invocation. + /// Returns number of handlers invoked. void dispatch(const Socket& socket, SocketNotification* pNotification); /// Dispatches the given notification to all observers @@ -229,16 +266,38 @@ protected: /// Dispatches the given notification to all observers. private: - typedef Poco::AutoPtr NotifierPtr; - typedef Poco::AutoPtr NotificationPtr; - typedef std::map EventHandlerMap; - typedef Poco::FastMutex MutexType; - typedef MutexType::ScopedLock ScopedLock; - typedef std::deque HandlerList; + typedef Poco::AutoPtr NotifierPtr; + typedef Poco::AutoPtr NotificationPtr; + typedef std::map EventHandlerMap; + typedef Poco::FastMutex MutexType; + typedef MutexType::ScopedLock ScopedLock; + typedef std::pair CompletionHandlerEntry; + typedef std::deque HandlerList; bool hasSocketHandlers(); void dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification); NotifierPtr getNotifier(const Socket& socket, bool makeNew = false); + bool isPermanentCompletionHandler(const CompletionHandlerEntry& entry) const; + + template + int removeCompletionHandlers(F isType, int count) + { + int removed = 0; + ScopedLock lock(_mutex); + int left = count > -1 ? count : _complHandlers.size(); + HandlerList::iterator it = _complHandlers.begin(); + for (; it != _complHandlers.end();) + { + if (isType(it->second)) + { + ++removed; + it = _complHandlers.erase((it)); + if (--left == 0) break; + } + else ++it; + } + return removed; + } enum { diff --git a/Net/src/SocketReactor.cpp b/Net/src/SocketReactor.cpp index 2c5c04c88..eea3805d1 100644 --- a/Net/src/SocketReactor.cpp +++ b/Net/src/SocketReactor.cpp @@ -19,6 +19,7 @@ #include "Poco/Thread.h" #include "Poco/Exception.h" #include +#include using Poco::Exception; @@ -29,6 +30,10 @@ namespace Poco { namespace Net { +const Timestamp::TimeDiff SocketReactor::PERMANENT_COMPLETION_HANDLER = + std::numeric_limits::max(); + + SocketReactor::SocketReactor(): _stop(false), _timeout(DEFAULT_TIMEOUT), @@ -102,21 +107,50 @@ int SocketReactor::poll() } -void SocketReactor::onComplete() +int SocketReactor::onComplete(bool handleOne) { std::unique_ptr pCH; - while (!_complHandlers.empty()) + int handled = 0; + for (HandlerList::iterator it = _complHandlers.begin(); it != _complHandlers.end();) { // A completion handler may add new // completion handler(s), so the mutex must - // be unlocked before the call execution. + // be unlocked before the invocation. { ScopedLock lock(_mutex); - pCH.reset(new CompletionHandler(std::move(_complHandlers.front()))); - _complHandlers.pop_front(); + bool alwaysRun = isPermanentCompletionHandler(*it); + bool isExpired = !alwaysRun && (Timestamp() > it->second); + if (isExpired) + { + pCH.reset(new CompletionHandler(std::move(it->first))); + it = _complHandlers.erase(it); + } + else if (alwaysRun) + { + pCH.reset(new CompletionHandler(it->first)); + ++it; + } else ++it; + } + if (pCH) + { + (*pCH)(); + ++handled; + if (handleOne) break; } - if (pCH) (*pCH)(); } + return handled; +} + + +int SocketReactor::runOne() +{ + try + { + while (0 == onComplete(true)); + return 1; + } + catch(...) {} + return 0; } @@ -191,16 +225,66 @@ const Poco::Timespan& SocketReactor::getTimeout() const } -void SocketReactor::addCompletionHandler(const CompletionHandler& ch) +void SocketReactor::addCompletionHandler(const CompletionHandler& ch, Timestamp::TimeDiff ms) { - addCompletionHandler(CompletionHandler(ch)); + addCompletionHandler(CompletionHandler(ch), ms); } -void SocketReactor::addCompletionHandler(CompletionHandler&& ch) +void SocketReactor::addCompletionHandler(CompletionHandler&& ch, Timestamp::TimeDiff ms, int pos) +{ + Poco::Timestamp expires = (ms != PERMANENT_COMPLETION_HANDLER) ? Timestamp() + ms*1000 : Timestamp(0); + if (pos == -1) + { + ScopedLock lock(_mutex); + _complHandlers.push_back({std::move(ch), expires}); + } + else + { + if (pos < 0) throw Poco::InvalidArgumentException("SocketReactor::addCompletionHandler()"); + ScopedLock lock(_mutex); + _complHandlers.insert(_complHandlers.begin() + pos, {std::move(ch), expires}); + } +} + + +void SocketReactor::removeCompletionHandlers() { ScopedLock lock(_mutex); - _complHandlers.push_back(std::move(ch)); + _complHandlers.clear(); +} + + +int SocketReactor::scheduledCompletionHandlers() +{ + int cnt = 0; + ScopedLock lock(_mutex); + HandlerList::iterator it = _complHandlers.begin(); + for (; it != _complHandlers.end(); ++it) + { + if (it->second != Timestamp(0)) ++cnt; + } + return cnt; +} + + +int SocketReactor::removeScheduledCompletionHandlers(int count) +{ + auto isScheduled = [](const Timestamp& ts) { return ts != Timestamp(0); }; + return removeCompletionHandlers(isScheduled, count); +} + + +int SocketReactor::removePermanentCompletionHandlers(int count) +{ + auto isPermanent = [](const Timestamp& ts) { return ts == Timestamp(0); }; + return removeCompletionHandlers(isPermanent, count); +} + + +bool SocketReactor::isPermanentCompletionHandler(const CompletionHandlerEntry& entry) const +{ + return entry.second == Timestamp(0); } diff --git a/Net/testsuite/src/SocketReactorTest.cpp b/Net/testsuite/src/SocketReactorTest.cpp index c4276e04a..6719cf1db 100644 --- a/Net/testsuite/src/SocketReactorTest.cpp +++ b/Net/testsuite/src/SocketReactorTest.cpp @@ -19,6 +19,7 @@ #include "Poco/Net/StreamSocket.h" #include "Poco/Net/DatagramSocket.h" #include "Poco/Net/ServerSocket.h" +#include "Poco/Timestamp.h" #include "Poco/Exception.h" #include @@ -38,6 +39,7 @@ using Poco::Net::ShutdownNotification; using Poco::Observer; using Poco::IllegalStateException; using Poco::Thread; +using Poco::Timestamp; namespace @@ -379,13 +381,13 @@ namespace public: CompletionHandlerTestObject() = delete; - CompletionHandlerTestObject(SocketReactor& reactor): + CompletionHandlerTestObject(SocketReactor& reactor, int ms = SocketReactor::PERMANENT_COMPLETION_HANDLER): _reactor(reactor), _counter(0) { auto handler = [this] () { ++_counter; }; - _reactor.addCompletionHandler(handler); - _reactor.addCompletionHandler(std::move(handler)); + _reactor.addCompletionHandler(handler, ms); + _reactor.addCompletionHandler(std::move(handler), ms); } void addRecursiveCompletionHandler(int count) @@ -620,6 +622,26 @@ void SocketReactorTest::testCompletionHandler() ch.addRecursiveCompletionHandler(5); reactor.poll(); assertTrue(ch.counter() == 5); + reactor.removePermanentCompletionHandlers(); + reactor.poll(); + assertTrue(ch.counter() == 5); +} + + +void SocketReactorTest::testTimedCompletionHandler() +{ + SocketReactor reactor; + CompletionHandlerTestObject ch(reactor, 500); + assertTrue(ch.counter() == 0); + reactor.poll(); + assertTrue(ch.counter() == 0); + reactor.poll(); + + Thread::sleep(500); + reactor.poll(); + assertTrue(ch.counter() == 2); + reactor.poll(); + assertTrue(ch.counter() == 2); } @@ -646,6 +668,7 @@ CppUnit::Test* SocketReactorTest::suite() CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout); CppUnit_addTest(pSuite, SocketReactorTest, testDataCollection); CppUnit_addTest(pSuite, SocketReactorTest, testCompletionHandler); + CppUnit_addTest(pSuite, SocketReactorTest, testTimedCompletionHandler); return pSuite; } diff --git a/Net/testsuite/src/SocketReactorTest.h b/Net/testsuite/src/SocketReactorTest.h index 8f989e560..1300bbc2d 100644 --- a/Net/testsuite/src/SocketReactorTest.h +++ b/Net/testsuite/src/SocketReactorTest.h @@ -37,6 +37,7 @@ public: void testSocketConnectorTimeout(); void testDataCollection(); void testCompletionHandler(); + void testTimedCompletionHandler(); void setUp(); void tearDown(); @@ -140,7 +141,6 @@ private: ServerType echoServer; SocketType ds; - char buffer[256]; ds.connect(SocketAddress("127.0.0.1", echoServer.port())); SocketReactor reactor; IOHandler ih(ds, reactor);