From 2d0609b10c3768f9dc65cfe66b29db71dba69190 Mon Sep 17 00:00:00 2001 From: Alex Fabijanic Date: Mon, 26 Apr 2021 18:17:23 +0200 Subject: [PATCH] feat(SocketReactor): extract and expose poll() as a member function --- Net/include/Poco/Net/SocketReactor.h | 6 ++ Net/src/SocketReactor.cpp | 67 ++++++++----- Net/testsuite/src/SocketReactorTest.cpp | 107 +++++++++++--------- Net/testsuite/src/SocketReactorTest.h | 128 ++++++++++++++++++++++++ 4 files changed, 233 insertions(+), 75 deletions(-) diff --git a/Net/include/Poco/Net/SocketReactor.h b/Net/include/Poco/Net/SocketReactor.h index 2644f946f..91c76e071 100644 --- a/Net/include/Poco/Net/SocketReactor.h +++ b/Net/include/Poco/Net/SocketReactor.h @@ -123,6 +123,12 @@ public: virtual ~SocketReactor(); /// Destroys the SocketReactor. + int poll(); + /// Polls all registered sockets and calls their respective handlers. + /// If there are no handlers, an idle notification is dispatched. + /// If there are no readable sockets, a timeout notification is dispatched. + /// Returns the total number of read/write/error handlers called. + void run(); /// Runs the SocketReactor. The reactor will run /// until stop() is called (in a separate thread). diff --git a/Net/src/SocketReactor.cpp b/Net/src/SocketReactor.cpp index 8972c5f37..c534e7246 100644 --- a/Net/src/SocketReactor.cpp +++ b/Net/src/SocketReactor.cpp @@ -61,6 +61,45 @@ SocketReactor::~SocketReactor() } +int SocketReactor::poll() +{ + int handled = 0; + if (!hasSocketHandlers()) onIdle(); + else + { + bool readable = false; + PollSet::SocketModeMap sm = _pollSet.poll(_timeout); + if (sm.size() > 0) + { + onBusy(); + PollSet::SocketModeMap::iterator it = sm.begin(); + PollSet::SocketModeMap::iterator end = sm.end(); + for (; it != end; ++it) + { + if (it->second & PollSet::POLL_READ) + { + dispatch(it->first, _pReadableNotification); + readable = true; + ++handled; + } + if (it->second & PollSet::POLL_WRITE) + { + dispatch(it->first, _pWritableNotification); + ++handled; + } + if (it->second & PollSet::POLL_ERROR) + { + dispatch(it->first, _pErrorNotification); + ++handled; + } + } + } + if (!readable) onTimeout(); + } + return handled; +} + + void SocketReactor::run() { _pThread = Thread::current(); @@ -68,32 +107,10 @@ void SocketReactor::run() { try { - if (!hasSocketHandlers()) + if (!poll()) { - onIdle(); - Thread::trySleep(static_cast(_timeout.totalMilliseconds())); - } - else - { - bool readable = false; - PollSet::SocketModeMap sm = _pollSet.poll(_timeout); - if (sm.size() > 0) - { - onBusy(); - PollSet::SocketModeMap::iterator it = sm.begin(); - PollSet::SocketModeMap::iterator end = sm.end(); - for (; it != end; ++it) - { - if (it->second & PollSet::POLL_READ) - { - dispatch(it->first, _pReadableNotification); - readable = true; - } - if (it->second & PollSet::POLL_WRITE) dispatch(it->first, _pWritableNotification); - if (it->second & PollSet::POLL_ERROR) dispatch(it->first, _pErrorNotification); - } - } - if (!readable) onTimeout(); + if (!hasSocketHandlers()) + Thread::trySleep(static_cast(_timeout.totalMilliseconds())); } } catch (Exception& exc) diff --git a/Net/testsuite/src/SocketReactorTest.cpp b/Net/testsuite/src/SocketReactorTest.cpp index 75a31d350..61ba840ed 100644 --- a/Net/testsuite/src/SocketReactorTest.cpp +++ b/Net/testsuite/src/SocketReactorTest.cpp @@ -9,19 +9,17 @@ #include "SocketReactorTest.h" +#include "EchoServer.h" +#include "UDPEchoServer.h" #include "CppUnit/TestCaller.h" #include "CppUnit/TestSuite.h" -#include "Poco/Net/SocketReactor.h" -#include "Poco/Net/SocketNotification.h" #include "Poco/Net/SocketConnector.h" #include "Poco/Net/SocketAcceptor.h" #include "Poco/Net/ParallelSocketAcceptor.h" #include "Poco/Net/StreamSocket.h" +#include "Poco/Net/DatagramSocket.h" #include "Poco/Net/ServerSocket.h" -#include "Poco/Net/SocketAddress.h" -#include "Poco/Observer.h" #include "Poco/Exception.h" -#include "Poco/Thread.h" #include @@ -30,6 +28,7 @@ using Poco::Net::SocketConnector; using Poco::Net::SocketAcceptor; using Poco::Net::ParallelSocketAcceptor; using Poco::Net::StreamSocket; +using Poco::Net::DatagramSocket; using Poco::Net::ServerSocket; using Poco::Net::SocketAddress; using Poco::Net::SocketNotification; @@ -239,20 +238,20 @@ namespace } } - StreamSocket _socket; - SocketReactor& _reactor; + StreamSocket _socket; + SocketReactor& _reactor; Observer _or; Observer _ow; - Observer _ot; + Observer _ot; Observer _os; - std::stringstream _str; - static std::string _data; - static bool _readableError; - static bool _writableError; - static bool _timeoutError; - static bool _timeout; - static bool _closeOnTimeout; - static bool _once; + std::stringstream _str; + static std::string _data; + static bool _readableError; + static bool _writableError; + static bool _timeoutError; + static bool _timeout; + static bool _closeOnTimeout; + static bool _once; }; @@ -369,9 +368,9 @@ namespace static Data _data; private: - StreamSocket _socket; + StreamSocket _socket; SocketReactor& _reactor; - int _pos; + int _pos; }; DataServiceHandler::Data DataServiceHandler::_data; @@ -407,6 +406,13 @@ void SocketReactorTest::testSocketReactor() } +void SocketReactorTest::testSocketReactorPoll() +{ + testIOHandler(); + testIOHandler(); +} + + void SocketReactorTest::testSetSocketReactor() { SocketAddress ssa; @@ -508,11 +514,11 @@ void SocketReactorTest::testDataCollection() " \"data\":" " [" " {" - " \"tag1\":" - " [" - " {\"val1\":123}," - " {\"val2\":\"abc\"}" - " ]" + " \"tag1\":" + " [" + " {\"val1\":123}," + " {\"val2\":\"abc\"}" + " ]" " }" " ]" "}\n"); @@ -524,33 +530,33 @@ void SocketReactorTest::testDataCollection() " \"ts\":\"1524864652654321\"," " \"data\":" " [" - " {" - " \"tag1\":" - " [" - " {" - " \"val1\":123," - " \"val2\":\"abc\"," - " \"val3\":42.123" - " }," - " {" - " \"val1\":987," - " \"val2\":\"xyz\"," - " \"val3\":24.321" - " }" - " ]," - " \"tag2\":" - " [" - " {" - " \"val1\":42.123," - " \"val2\":123," - " \"val3\":\"abc\"" - " }," - " {" - " \"val1\":24.321," - " \"val2\":987," - " \"val3\":\"xyz\"" - " }" - " ]" + " {" + " \"tag1\":" + " [" + " {" + " \"val1\":123," + " \"val2\":\"abc\"," + " \"val3\":42.123" + " }," + " {" + " \"val1\":987," + " \"val2\":\"xyz\"," + " \"val3\":24.321" + " }" + " ]," + " \"tag2\":" + " [" + " {" + " \"val1\":42.123," + " \"val2\":123," + " \"val3\":\"abc\"" + " }," + " {" + " \"val1\":24.321," + " \"val2\":987," + " \"val3\":\"xyz\"" + " }" + " ]" " }" " ]" "}\n"; @@ -586,6 +592,7 @@ CppUnit::Test* SocketReactorTest::suite() CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("SocketReactorTest"); CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactor); + CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactorPoll); CppUnit_addTest(pSuite, SocketReactorTest, testSetSocketReactor); CppUnit_addTest(pSuite, SocketReactorTest, testParallelSocketReactor); CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorFail); diff --git a/Net/testsuite/src/SocketReactorTest.h b/Net/testsuite/src/SocketReactorTest.h index 9fb5cf9c7..ede298f5c 100644 --- a/Net/testsuite/src/SocketReactorTest.h +++ b/Net/testsuite/src/SocketReactorTest.h @@ -15,6 +15,11 @@ #include "Poco/Net/Net.h" +#include "Poco/Thread.h" +#include "Poco/Observer.h" +#include "Poco/Net/SocketAddress.h" +#include "Poco/Net/SocketReactor.h" +#include "Poco/Net/SocketNotification.h" #include "CppUnit/TestCase.h" @@ -25,6 +30,7 @@ public: ~SocketReactorTest(); void testSocketReactor(); + void testSocketReactorPoll(); void testSetSocketReactor(); void testParallelSocketReactor(); void testSocketConnectorFail(); @@ -37,6 +43,128 @@ public: static CppUnit::Test* suite(); private: + + template + class IOHandler + { + public: + IOHandler(const SocketType& socket, Poco::Net::SocketReactor& reactor): + _socket(socket), + _reactor(reactor), + _or(*this, &IOHandler::onReadable), + _ow(*this, &IOHandler::onWritable), + _ot(*this, &IOHandler::onTimeout), + _os(*this, &IOHandler::onShutdown) + { + _reactor.addEventHandler(_socket, _or); + _reactor.addEventHandler(_socket, _ow); + _reactor.addEventHandler(_socket, _ot); + _reactor.addEventHandler(_socket, _os); + } + + ~IOHandler() + { + _reactor.removeEventHandler(_socket, _or); + _reactor.removeEventHandler(_socket, _ow); + _reactor.removeEventHandler(_socket, _ot); + _reactor.removeEventHandler(_socket, _os); + } + + void onReadable(Poco::Net::ReadableNotification* pNf) + { + pNf->release(); + char buffer[32]; + int n = _socket.receiveBytes(buffer, sizeof(buffer)); + if (n > 0) _data.append(buffer, n); + } + + void onWritable(Poco::Net::WritableNotification* pNf) + { + pNf->release(); + _writable = true; + } + + void onTimeout(Poco::Net::TimeoutNotification* pNf) + { + pNf->release(); + _timeout = true; + } + + void onShutdown(Poco::Net::ShutdownNotification* pNf) + { + pNf->release(); + _shutdown = true; + } + + const std::string& data() + { + return _data; + } + + bool writable() const + { + return _writable; + } + + bool timeout() const + { + return _timeout; + } + + bool shutdown() const + { + return _shutdown; + } + + private: + + SocketType _socket; + Poco::Net::SocketReactor& _reactor; + Poco::Observer _or; + Poco::Observer _ow; + Poco::Observer _ot; + Poco::Observer _os; + bool _writable = false; + bool _timeout = false; + bool _shutdown = false; + std::string _data; + }; + + template + void testIOHandler() + { + using Poco::Thread; + using Poco::Net::SocketReactor; + using Poco::Net::SocketAddress; + + ServerType echoServer; + SocketType ds; + char buffer[256]; + ds.connect(SocketAddress("127.0.0.1", echoServer.port())); + SocketReactor reactor; + IOHandler ih(ds, reactor); + int n = ds.sendBytes("hello", 5); + assertTrue (n == 5); + assertFalse(ih.writable()); + Thread t; t.start(reactor); + Thread::sleep(500); + assertTrue(ih.writable()); + assertFalse(ih.shutdown()); + reactor.stop(); t.join(); + assertTrue(ih.shutdown()); + assertTrue (ih.data() == "hello"); + n = ds.sendBytes("hello", 5); + assertTrue (n == 5); + while (ih.data().size() < 10) + { + reactor.poll(); + Thread::sleep(10); + } + assertTrue (ih.data() == "hellohello"); + reactor.poll(); + assertTrue (ih.timeout()); + ds.close(); + } };