diff --git a/Net/include/Poco/Net/SocketReactor.h b/Net/include/Poco/Net/SocketReactor.h index 84bf4ce30..2644f946f 100644 --- a/Net/include/Poco/Net/SocketReactor.h +++ b/Net/include/Poco/Net/SocketReactor.h @@ -23,15 +23,10 @@ #include "Poco/Net/PollSet.h" #include "Poco/Runnable.h" #include "Poco/Timespan.h" -#include "Poco/Timestamp.h" #include "Poco/Observer.h" #include "Poco/AutoPtr.h" -#include "Poco/Mutex.h" #include #include -#include -#include -#include namespace Poco { @@ -119,10 +114,6 @@ class Net_API SocketReactor: public Poco::Runnable /// from event handlers. { public: - typedef std::function CompletionHandler; - - static const Timestamp::TimeDiff PERMANENT_COMPLETION_HANDLER; - SocketReactor(); /// Creates the SocketReactor. @@ -132,54 +123,6 @@ public: virtual ~SocketReactor(); /// Destroys the SocketReactor. - void addCompletionHandler(const CompletionHandler& ch, Timestamp::TimeDiff ms = PERMANENT_COMPLETION_HANDLER); - /// Adds a completion handler to the list of handlers - /// to be called after the next poll() completion. - /// Handler will be called until the specified expiration, - /// which defaults to immediately, ie. expiration after the - /// first invocation. - - void addCompletionHandler(CompletionHandler&& ch, Timestamp::TimeDiff ms = PERMANENT_COMPLETION_HANDLER, int pos = -1); - /// Adds a completion handler to the list of handlers - /// to be called after the next poll() completion. - /// Handler will be called until the specified expiration, - /// which defaults to immediately, ie. expiration after the - /// first invocation. - - void removeCompletionHandlers(); - /// Removes all completion handlers. - - int scheduledCompletionHandlers(); - /// Returns the number of scheduled completion handlers. - - int removeScheduledCompletionHandlers(int count = -1); - /// Removes the count scheduled completion handlers - /// from the front of the schedule queue. - /// Default is removal of all scheduled handlers. - - int permanentCompletionHandlers(); - /// Returns the number of scheduled completion handlers. - - int removePermanentCompletionHandlers(int count = -1); - /// Removes the count permanent completion handlers - /// from the front of the schedule queue. - /// Default is removal of all scheduled handlers. - - int poll(int* pHandled = 0); - /// Polls all registered sockets and calls their respective handlers. - /// If there are no handlers, an idle notification is dispatched. - /// If there are no readable sockets, a timeout notification is dispatched. - /// If pHandled is not null, after the call it contains the total number - /// of read/write/error socket handlers called. - /// Returns the number of completion handlers invoked. - - int runOne(); - /// Runs one handler, scheduled or permanent. - /// If there are no available handlers, it blocks - /// until the first handler is encountered and executed. - /// Returns 1 on successful handler invocation, 0 on - /// exception. - void run(); /// Runs the SocketReactor. The reactor will run /// until stop() is called (in a separate thread). @@ -256,14 +199,6 @@ protected: /// Can be overridden by subclasses to perform additional /// periodic tasks. The default implementation does nothing. - int onComplete(bool handleOne = false, bool expiredOnly = false); - /// Calls completion handler(s) (after poll() completes processing - /// or on runOne() invocation). If handleOne is true, returns - /// after first handler invocation. - /// Scheduled completion handlers are deleted after - /// invocation. - /// Returns number of handlers invoked. - void dispatch(const Socket& socket, SocketNotification* pNotification); /// Dispatches the given notification to all observers /// registered for the given socket. @@ -272,48 +207,15 @@ protected: /// Dispatches the given notification to all observers. private: - typedef Poco::AutoPtr NotifierPtr; - typedef Poco::AutoPtr NotificationPtr; - typedef std::map EventHandlerMap; - typedef Poco::FastMutex FastMutexType; - typedef FastMutexType::ScopedLock FastScopedLock; -#ifdef POCO_HAVE_STD_ATOMICS - typedef Poco::SpinlockMutex SpinMutexType; - typedef SpinMutexType::ScopedLock SpinScopedLock; -#else - typedef Poco::FastMutex SpinMutexType; - typedef SpinMutexType::ScopedLock SpinScopedLock; -#endif // POCO_HAVE_STD_ATOMICS - typedef std::pair CompletionHandlerEntry; - typedef std::deque HandlerList; + typedef Poco::AutoPtr NotifierPtr; + typedef Poco::AutoPtr NotificationPtr; + typedef std::map EventHandlerMap; + typedef Poco::FastMutex MutexType; + typedef MutexType::ScopedLock ScopedLock; bool hasSocketHandlers(); void dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification); NotifierPtr getNotifier(const Socket& socket, bool makeNew = false); - bool isPermanent(const Poco::Timestamp& entry) const; - - template - int removeCompletionHandlers(F isType, int count) - /// Removes `count` completion handlers of the specified - /// type; if count is -1, removes all completion handlers - /// of the specified type. - { - int removed = 0; - SpinScopedLock lock(_completionMutex); - int left = count > -1 ? count : static_cast(_complHandlers.size()); - HandlerList::iterator it = _complHandlers.begin(); - while (left && it != _complHandlers.end()) - { - if (isType(it->second)) - { - ++removed; - it = _complHandlers.erase((it)); - --left; - } - else ++it; - } - return removed; - } enum { @@ -330,9 +232,7 @@ private: NotificationPtr _pTimeoutNotification; NotificationPtr _pIdleNotification; NotificationPtr _pShutdownNotification; - HandlerList _complHandlers; - FastMutexType _ioMutex; - SpinMutexType _completionMutex; + MutexType _mutex; Poco::Thread* _pThread; friend class SocketNotifier; diff --git a/Net/src/SocketReactor.cpp b/Net/src/SocketReactor.cpp index 39b6c75f7..8972c5f37 100644 --- a/Net/src/SocketReactor.cpp +++ b/Net/src/SocketReactor.cpp @@ -18,11 +18,6 @@ #include "Poco/ErrorHandler.h" #include "Poco/Thread.h" #include "Poco/Exception.h" -#include -#ifdef POCO_OS_FAMILY_WINDOWS -#undef max -#endif -#include using Poco::Exception; @@ -33,10 +28,6 @@ namespace Poco { namespace Net { -const Timestamp::TimeDiff SocketReactor::PERMANENT_COMPLETION_HANDLER = - std::numeric_limits::max(); - - SocketReactor::SocketReactor(): _stop(false), _timeout(DEFAULT_TIMEOUT), @@ -70,114 +61,6 @@ SocketReactor::~SocketReactor() } -int SocketReactor::poll(int* pHandled) -{ - int handled = 0; - int completed = 0; - if (!hasSocketHandlers()) onIdle(); - else - { - bool readable = false; - PollSet::SocketModeMap sm = _pollSet.poll(_timeout); - if (sm.size() > 0) - { - onBusy(); - PollSet::SocketModeMap::iterator it = sm.begin(); - PollSet::SocketModeMap::iterator end = sm.end(); - for (; it != end; ++it) - { - if (it->second & PollSet::POLL_READ) - { - dispatch(it->first, _pReadableNotification); - readable = true; - ++handled; - } - if (it->second & PollSet::POLL_WRITE) - { - dispatch(it->first, _pWritableNotification); - ++handled; - } - if (it->second & PollSet::POLL_ERROR) - { - dispatch(it->first, _pErrorNotification); - ++handled; - } - } - } - if (!readable) onTimeout(); - } - if (pHandled) *pHandled = handled; - if (hasSocketHandlers()) - { - if (handled) completed = onComplete(); - } - else completed = onComplete(false, true); - return completed; -} - - -int SocketReactor::onComplete(bool handleOne, bool expiredOnly) -{ - std::unique_ptr pCH; - int handled = 0; - { - HandlerList::iterator it = _complHandlers.begin(); - while (it != _complHandlers.end()) - { - std::size_t prevSize = 0; - // A completion handler may add new - // completion handler(s), so the mutex must - // be unlocked before the invocation. - { - SpinScopedLock lock(_completionMutex); - bool alwaysRun = isPermanent(it->second) && !expiredOnly; - bool isExpired = !alwaysRun && (Timestamp() > it->second); - if (isExpired) - { - pCH.reset(new CompletionHandler(std::move(it->first))); - it = _complHandlers.erase(it); - } - else if (alwaysRun) - { - pCH.reset(new CompletionHandler(it->first)); - ++it; - } - else ++it; - prevSize = _complHandlers.size(); - } - - if (pCH) - { - (*pCH)(); - pCH.reset(); - ++handled; - if (handleOne) break; - } - // handler call may add or remove handlers; - // if so, we must start from the beginning - { - SpinScopedLock lock(_completionMutex); - if (prevSize != _complHandlers.size()) - it = _complHandlers.begin(); - } - } - } - return handled; -} - - -int SocketReactor::runOne() -{ - try - { - while (0 == onComplete(true)); - return 1; - } - catch(...) {} - return 0; -} - - void SocketReactor::run() { _pThread = Thread::current(); @@ -185,10 +68,32 @@ void SocketReactor::run() { try { - if (!poll()) + if (!hasSocketHandlers()) { - if (!hasSocketHandlers()) - Thread::trySleep(static_cast(_timeout.totalMilliseconds())); + onIdle(); + Thread::trySleep(static_cast(_timeout.totalMilliseconds())); + } + else + { + bool readable = false; + PollSet::SocketModeMap sm = _pollSet.poll(_timeout); + if (sm.size() > 0) + { + onBusy(); + PollSet::SocketModeMap::iterator it = sm.begin(); + PollSet::SocketModeMap::iterator end = sm.end(); + for (; it != end; ++it) + { + if (it->second & PollSet::POLL_READ) + { + dispatch(it->first, _pReadableNotification); + readable = true; + } + if (it->second & PollSet::POLL_WRITE) dispatch(it->first, _pWritableNotification); + if (it->second & PollSet::POLL_ERROR) dispatch(it->first, _pErrorNotification); + } + } + if (!readable) onTimeout(); } } catch (Exception& exc) @@ -212,7 +117,7 @@ bool SocketReactor::hasSocketHandlers() { if (!_pollSet.empty()) { - FastScopedLock lock(_ioMutex); + ScopedLock lock(_mutex); for (auto& p: _handlers) { if (p.second->accepts(_pReadableNotification) || @@ -249,92 +154,11 @@ const Poco::Timespan& SocketReactor::getTimeout() const } -void SocketReactor::addCompletionHandler(const CompletionHandler& ch, Timestamp::TimeDiff ms) -{ - addCompletionHandler(CompletionHandler(ch), ms); -} - - -void SocketReactor::addCompletionHandler(CompletionHandler&& ch, Timestamp::TimeDiff ms, int pos) -{ - Poco::Timestamp expires = (ms != PERMANENT_COMPLETION_HANDLER) ? Timestamp() + ms*1000 : Timestamp(PERMANENT_COMPLETION_HANDLER); - - if (pos == -1) - { - SpinScopedLock lock(_completionMutex); - _complHandlers.push_back({std::move(ch), expires}); - } - else - { - if (pos < 0) throw Poco::InvalidArgumentException("SocketReactor::addCompletionHandler()"); - SpinScopedLock lock(_completionMutex); - _complHandlers.insert(_complHandlers.begin() + pos, {std::move(ch), expires}); - } -} - - -void SocketReactor::removeCompletionHandlers() -{ - SpinScopedLock lock(_completionMutex); - _complHandlers.clear(); -} - - -int SocketReactor::scheduledCompletionHandlers() -{ - int cnt = 0; - SpinScopedLock lock(_completionMutex); - HandlerList::iterator it = _complHandlers.begin(); - for (; it != _complHandlers.end(); ++it) - { - if (!isPermanent(it->second)/*it->second != Timestamp(0)*/) ++cnt; - } - return cnt; -} - - -int SocketReactor::removeScheduledCompletionHandlers(int count) -{ - auto isScheduled = [this](const Timestamp& ts) { return !isPermanent(ts); }; - return removeCompletionHandlers(isScheduled, count); -} - - -int SocketReactor::permanentCompletionHandlers() -{ - int cnt = 0; - SpinScopedLock lock(_completionMutex); - HandlerList::iterator it = _complHandlers.begin(); - for (; it != _complHandlers.end(); ++it) - { - if (isPermanent(it->second)) - ++cnt; - } - return cnt; -} - - -int SocketReactor::removePermanentCompletionHandlers(int count) -{ - auto perm = [this](const Timestamp& ts) { return isPermanent(ts); }; - return removeCompletionHandlers(perm, count); -} - - -bool SocketReactor::isPermanent(const Timestamp& entry) const -{ - return entry == Timestamp(PERMANENT_COMPLETION_HANDLER); -} - - void SocketReactor::addEventHandler(const Socket& socket, const Poco::AbstractObserver& observer) { NotifierPtr pNotifier = getNotifier(socket, true); - if (!pNotifier->hasObserver(observer)) - { - pNotifier->addObserver(this, observer); - } + if (!pNotifier->hasObserver(observer)) pNotifier->addObserver(this, observer); int mode = 0; if (pNotifier->accepts(_pReadableNotification)) mode |= PollSet::POLL_READ; @@ -358,7 +182,7 @@ SocketReactor::NotifierPtr SocketReactor::getNotifier(const Socket& socket, bool const SocketImpl* pImpl = socket.impl(); if (pImpl == nullptr) return 0; poco_socket_t sockfd = pImpl->sockfd(); - FastScopedLock lock(_ioMutex); + ScopedLock lock(_mutex); EventHandlerMap::iterator it = _handlers.find(sockfd); if (it != _handlers.end()) return it->second; @@ -371,14 +195,14 @@ SocketReactor::NotifierPtr SocketReactor::getNotifier(const Socket& socket, bool void SocketReactor::removeEventHandler(const Socket& socket, const Poco::AbstractObserver& observer) { const SocketImpl* pImpl = socket.impl(); - if (pImpl == nullptr) { return; } + if (pImpl == nullptr) return; NotifierPtr pNotifier = getNotifier(socket); if (pNotifier && pNotifier->hasObserver(observer)) { if(pNotifier->countObservers() == 1) { { - FastScopedLock lock(_ioMutex); + ScopedLock lock(_mutex); _handlers.erase(pImpl->sockfd()); } _pollSet.remove(socket); @@ -429,7 +253,7 @@ void SocketReactor::dispatch(SocketNotification* pNotification) { std::vector delegates; { - FastScopedLock lock(_ioMutex); + ScopedLock lock(_mutex); delegates.reserve(_handlers.size()); for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it) delegates.push_back(it->second); @@ -445,7 +269,6 @@ void SocketReactor::dispatch(NotifierPtr& pNotifier, SocketNotification* pNotifi { try { - Socket s = pNotification->socket(); pNotifier->dispatch(pNotification); } catch (Exception& exc) diff --git a/Net/testsuite/src/SocketReactorTest.cpp b/Net/testsuite/src/SocketReactorTest.cpp index 3575f3e66..75a31d350 100644 --- a/Net/testsuite/src/SocketReactorTest.cpp +++ b/Net/testsuite/src/SocketReactorTest.cpp @@ -9,27 +9,27 @@ #include "SocketReactorTest.h" -#include "EchoServer.h" -#include "UDPEchoServer.h" #include "CppUnit/TestCaller.h" #include "CppUnit/TestSuite.h" +#include "Poco/Net/SocketReactor.h" +#include "Poco/Net/SocketNotification.h" #include "Poco/Net/SocketConnector.h" #include "Poco/Net/SocketAcceptor.h" #include "Poco/Net/ParallelSocketAcceptor.h" #include "Poco/Net/StreamSocket.h" -#include "Poco/Net/DatagramSocket.h" #include "Poco/Net/ServerSocket.h" -#include "Poco/Timestamp.h" +#include "Poco/Net/SocketAddress.h" +#include "Poco/Observer.h" #include "Poco/Exception.h" +#include "Poco/Thread.h" #include -#include + using Poco::Net::SocketReactor; using Poco::Net::SocketConnector; using Poco::Net::SocketAcceptor; using Poco::Net::ParallelSocketAcceptor; using Poco::Net::StreamSocket; -using Poco::Net::DatagramSocket; using Poco::Net::ServerSocket; using Poco::Net::SocketAddress; using Poco::Net::SocketNotification; @@ -40,7 +40,6 @@ using Poco::Net::ShutdownNotification; using Poco::Observer; using Poco::IllegalStateException; using Poco::Thread; -using Poco::Timestamp; namespace @@ -240,20 +239,20 @@ namespace } } - StreamSocket _socket; - SocketReactor& _reactor; + StreamSocket _socket; + SocketReactor& _reactor; Observer _or; Observer _ow; - Observer _ot; + Observer _ot; Observer _os; - std::stringstream _str; - static std::string _data; - static bool _readableError; - static bool _writableError; - static bool _timeoutError; - static bool _timeout; - static bool _closeOnTimeout; - static bool _once; + std::stringstream _str; + static std::string _data; + static bool _readableError; + static bool _writableError; + static bool _timeoutError; + static bool _timeout; + static bool _closeOnTimeout; + static bool _once; }; @@ -370,55 +369,12 @@ namespace static Data _data; private: - StreamSocket _socket; + StreamSocket _socket; SocketReactor& _reactor; - int _pos; + int _pos; }; DataServiceHandler::Data DataServiceHandler::_data; - - class CompletionHandlerTestObject - { - public: - CompletionHandlerTestObject() = delete; - - CompletionHandlerTestObject(SocketReactor& reactor, Timestamp::TimeDiff ms = SocketReactor::PERMANENT_COMPLETION_HANDLER): - _reactor(reactor), - _counter(0) - { - auto handler = [this] () - { - ++_counter; - }; - _reactor.addCompletionHandler(handler, ms); - _reactor.addCompletionHandler(std::move(handler), ms); - } - - void addRecursiveCompletionHandler(int count) - { - _count = count; - if (!_handler) - { - _handler = [&] () - { - if (_counter++ < _count) - _reactor.addCompletionHandler(_handler); - }; - } - _reactor.addCompletionHandler(_handler); - } - - int counter() - { - return _counter; - } - - private: - SocketReactor& _reactor; - int _counter; - int _count; - std::function _handler = nullptr; - }; } @@ -451,13 +407,6 @@ void SocketReactorTest::testSocketReactor() } -void SocketReactorTest::testSocketReactorPoll() -{ - testIOHandler(); - testIOHandler(); -} - - void SocketReactorTest::testSetSocketReactor() { SocketAddress ssa; @@ -559,11 +508,11 @@ void SocketReactorTest::testDataCollection() " \"data\":" " [" " {" - " \"tag1\":" - " [" - " {\"val1\":123}," - " {\"val2\":\"abc\"}" - " ]" + " \"tag1\":" + " [" + " {\"val1\":123}," + " {\"val2\":\"abc\"}" + " ]" " }" " ]" "}\n"); @@ -575,33 +524,33 @@ void SocketReactorTest::testDataCollection() " \"ts\":\"1524864652654321\"," " \"data\":" " [" - " {" - " \"tag1\":" - " [" - " {" - " \"val1\":123," - " \"val2\":\"abc\"," - " \"val3\":42.123" - " }," - " {" - " \"val1\":987," - " \"val2\":\"xyz\"," - " \"val3\":24.321" - " }" - " ]," - " \"tag2\":" - " [" - " {" - " \"val1\":42.123," - " \"val2\":123," - " \"val3\":\"abc\"" - " }," - " {" - " \"val1\":24.321," - " \"val2\":987," - " \"val3\":\"xyz\"" - " }" - " ]" + " {" + " \"tag1\":" + " [" + " {" + " \"val1\":123," + " \"val2\":\"abc\"," + " \"val3\":42.123" + " }," + " {" + " \"val1\":987," + " \"val2\":\"xyz\"," + " \"val3\":24.321" + " }" + " ]," + " \"tag2\":" + " [" + " {" + " \"val1\":42.123," + " \"val2\":123," + " \"val3\":\"abc\"" + " }," + " {" + " \"val1\":24.321," + " \"val2\":987," + " \"val3\":\"xyz\"" + " }" + " ]" " }" " ]" "}\n"; @@ -621,50 +570,6 @@ void SocketReactorTest::testDataCollection() } -void SocketReactorTest::testCompletionHandler() -{/* - SocketReactor reactor; - CompletionHandlerTestObject ch(reactor); - assert (reactor.permanentCompletionHandlers() == 2); - assert (reactor.scheduledCompletionHandlers() == 0); - assertTrue(ch.counter() == 0); - assertTrue(reactor.poll() == 2); - assertTrue(ch.counter() == 2); - ch.addRecursiveCompletionHandler(5); - assertTrue (reactor.permanentCompletionHandlers() == 3); - assertTrue (reactor.scheduledCompletionHandlers() == 0); - assertTrue(reactor.poll() == 7); - assertTrue(ch.counter() == 9); - assertTrue (reactor.permanentCompletionHandlers() == 4); - assertTrue (reactor.scheduledCompletionHandlers() == 0); - - reactor.removePermanentCompletionHandlers(); - assertTrue (reactor.poll() == 0); - assertTrue(ch.counter() == 9); -*/} - - -void SocketReactorTest::testTimedCompletionHandler() -{ - SocketReactor reactor; - CompletionHandlerTestObject ch(reactor, 500); - assert (reactor.permanentCompletionHandlers() == 0); - assert (reactor.scheduledCompletionHandlers() == 2); - assertTrue(ch.counter() == 0); - reactor.poll(); - assertTrue(ch.counter() == 0); - reactor.poll(); - - Thread::sleep(500); - reactor.poll(); - assertTrue(ch.counter() == 2); - assert (reactor.permanentCompletionHandlers() == 0); - assert (reactor.scheduledCompletionHandlers() == 0); - reactor.poll(); - assertTrue(ch.counter() == 2); -} - - void SocketReactorTest::setUp() { ClientServiceHandler::setCloseOnTimeout(false); @@ -681,14 +586,11 @@ CppUnit::Test* SocketReactorTest::suite() CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("SocketReactorTest"); CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactor); - CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactorPoll); CppUnit_addTest(pSuite, SocketReactorTest, testSetSocketReactor); CppUnit_addTest(pSuite, SocketReactorTest, testParallelSocketReactor); CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorFail); CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout); CppUnit_addTest(pSuite, SocketReactorTest, testDataCollection); - CppUnit_addTest(pSuite, SocketReactorTest, testCompletionHandler); - CppUnit_addTest(pSuite, SocketReactorTest, testTimedCompletionHandler); return pSuite; } diff --git a/Net/testsuite/src/SocketReactorTest.h b/Net/testsuite/src/SocketReactorTest.h index 1300bbc2d..9fb5cf9c7 100644 --- a/Net/testsuite/src/SocketReactorTest.h +++ b/Net/testsuite/src/SocketReactorTest.h @@ -15,11 +15,6 @@ #include "Poco/Net/Net.h" -#include "Poco/Thread.h" -#include "Poco/Observer.h" -#include "Poco/Net/SocketAddress.h" -#include "Poco/Net/SocketReactor.h" -#include "Poco/Net/SocketNotification.h" #include "CppUnit/TestCase.h" @@ -30,14 +25,11 @@ public: ~SocketReactorTest(); void testSocketReactor(); - void testSocketReactorPoll(); void testSetSocketReactor(); void testParallelSocketReactor(); void testSocketConnectorFail(); void testSocketConnectorTimeout(); void testDataCollection(); - void testCompletionHandler(); - void testTimedCompletionHandler(); void setUp(); void tearDown(); @@ -45,127 +37,6 @@ public: static CppUnit::Test* suite(); private: - - template - class IOHandler - { - public: - IOHandler(const SocketType& socket, Poco::Net::SocketReactor& reactor): - _socket(socket), - _reactor(reactor), - _or(*this, &IOHandler::onReadable), - _ow(*this, &IOHandler::onWritable), - _ot(*this, &IOHandler::onTimeout), - _os(*this, &IOHandler::onShutdown) - { - _reactor.addEventHandler(_socket, _or); - _reactor.addEventHandler(_socket, _ow); - _reactor.addEventHandler(_socket, _ot); - _reactor.addEventHandler(_socket, _os); - } - - ~IOHandler() - { - _reactor.removeEventHandler(_socket, _or); - _reactor.removeEventHandler(_socket, _ow); - _reactor.removeEventHandler(_socket, _ot); - _reactor.removeEventHandler(_socket, _os); - } - - void onReadable(Poco::Net::ReadableNotification* pNf) - { - pNf->release(); - char buffer[32]; - int n = _socket.receiveBytes(buffer, sizeof(buffer)); - if (n > 0) _data.append(buffer, n); - } - - void onWritable(Poco::Net::WritableNotification* pNf) - { - pNf->release(); - _writable = true; - } - - void onTimeout(Poco::Net::TimeoutNotification* pNf) - { - pNf->release(); - _timeout = true; - } - - void onShutdown(Poco::Net::ShutdownNotification* pNf) - { - pNf->release(); - _shutdown = true; - } - - const std::string& data() - { - return _data; - } - - bool writable() const - { - return _writable; - } - - bool timeout() const - { - return _timeout; - } - - bool shutdown() const - { - return _shutdown; - } - - private: - - SocketType _socket; - Poco::Net::SocketReactor& _reactor; - Poco::Observer _or; - Poco::Observer _ow; - Poco::Observer _ot; - Poco::Observer _os; - bool _writable = false; - bool _timeout = false; - bool _shutdown = false; - std::string _data; - }; - - template - void testIOHandler() - { - using Poco::Thread; - using Poco::Net::SocketReactor; - using Poco::Net::SocketAddress; - - ServerType echoServer; - SocketType ds; - ds.connect(SocketAddress("127.0.0.1", echoServer.port())); - SocketReactor reactor; - IOHandler ih(ds, reactor); - int n = ds.sendBytes("hello", 5); - assertTrue (n == 5); - assertFalse(ih.writable()); - Thread t; t.start(reactor); - Thread::sleep(500); - assertTrue(ih.writable()); - assertFalse(ih.shutdown()); - reactor.stop(); t.join(); - assertTrue(ih.shutdown()); - assertTrue (ih.data() == "hello"); - n = ds.sendBytes("hello", 5); - assertTrue (n == 5); - while (ih.data().size() < 10) - { - reactor.poll(); - Thread::sleep(10); - } - assertTrue (ih.data() == "hellohello"); - reactor.poll(); - assertTrue (ih.timeout()); - ds.close(); - } };