feat(SocketReactor): extract and expose poll() as a member function

This commit is contained in:
Alex Fabijanic
2021-04-26 18:17:23 +02:00
parent 54667890eb
commit 2d0609b10c
4 changed files with 233 additions and 75 deletions

View File

@@ -123,6 +123,12 @@ public:
virtual ~SocketReactor();
/// Destroys the SocketReactor.
int poll();
/// Polls all registered sockets and calls their respective handlers.
/// If there are no handlers, an idle notification is dispatched.
/// If there are no readable sockets, a timeout notification is dispatched.
/// Returns the total number of read/write/error handlers called.
void run();
/// Runs the SocketReactor. The reactor will run
/// until stop() is called (in a separate thread).

View File

@@ -61,6 +61,45 @@ SocketReactor::~SocketReactor()
}
int SocketReactor::poll()
{
int handled = 0;
if (!hasSocketHandlers()) onIdle();
else
{
bool readable = false;
PollSet::SocketModeMap sm = _pollSet.poll(_timeout);
if (sm.size() > 0)
{
onBusy();
PollSet::SocketModeMap::iterator it = sm.begin();
PollSet::SocketModeMap::iterator end = sm.end();
for (; it != end; ++it)
{
if (it->second & PollSet::POLL_READ)
{
dispatch(it->first, _pReadableNotification);
readable = true;
++handled;
}
if (it->second & PollSet::POLL_WRITE)
{
dispatch(it->first, _pWritableNotification);
++handled;
}
if (it->second & PollSet::POLL_ERROR)
{
dispatch(it->first, _pErrorNotification);
++handled;
}
}
}
if (!readable) onTimeout();
}
return handled;
}
void SocketReactor::run()
{
_pThread = Thread::current();
@@ -68,32 +107,10 @@ void SocketReactor::run()
{
try
{
if (!hasSocketHandlers())
if (!poll())
{
onIdle();
Thread::trySleep(static_cast<long>(_timeout.totalMilliseconds()));
}
else
{
bool readable = false;
PollSet::SocketModeMap sm = _pollSet.poll(_timeout);
if (sm.size() > 0)
{
onBusy();
PollSet::SocketModeMap::iterator it = sm.begin();
PollSet::SocketModeMap::iterator end = sm.end();
for (; it != end; ++it)
{
if (it->second & PollSet::POLL_READ)
{
dispatch(it->first, _pReadableNotification);
readable = true;
}
if (it->second & PollSet::POLL_WRITE) dispatch(it->first, _pWritableNotification);
if (it->second & PollSet::POLL_ERROR) dispatch(it->first, _pErrorNotification);
}
}
if (!readable) onTimeout();
if (!hasSocketHandlers())
Thread::trySleep(static_cast<long>(_timeout.totalMilliseconds()));
}
}
catch (Exception& exc)

View File

@@ -9,19 +9,17 @@
#include "SocketReactorTest.h"
#include "EchoServer.h"
#include "UDPEchoServer.h"
#include "CppUnit/TestCaller.h"
#include "CppUnit/TestSuite.h"
#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/SocketNotification.h"
#include "Poco/Net/SocketConnector.h"
#include "Poco/Net/SocketAcceptor.h"
#include "Poco/Net/ParallelSocketAcceptor.h"
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/DatagramSocket.h"
#include "Poco/Net/ServerSocket.h"
#include "Poco/Net/SocketAddress.h"
#include "Poco/Observer.h"
#include "Poco/Exception.h"
#include "Poco/Thread.h"
#include <sstream>
@@ -30,6 +28,7 @@ using Poco::Net::SocketConnector;
using Poco::Net::SocketAcceptor;
using Poco::Net::ParallelSocketAcceptor;
using Poco::Net::StreamSocket;
using Poco::Net::DatagramSocket;
using Poco::Net::ServerSocket;
using Poco::Net::SocketAddress;
using Poco::Net::SocketNotification;
@@ -239,20 +238,20 @@ namespace
}
}
StreamSocket _socket;
SocketReactor& _reactor;
StreamSocket _socket;
SocketReactor& _reactor;
Observer<ClientServiceHandler, ReadableNotification> _or;
Observer<ClientServiceHandler, WritableNotification> _ow;
Observer<ClientServiceHandler, TimeoutNotification> _ot;
Observer<ClientServiceHandler, TimeoutNotification> _ot;
Observer<ClientServiceHandler, ShutdownNotification> _os;
std::stringstream _str;
static std::string _data;
static bool _readableError;
static bool _writableError;
static bool _timeoutError;
static bool _timeout;
static bool _closeOnTimeout;
static bool _once;
std::stringstream _str;
static std::string _data;
static bool _readableError;
static bool _writableError;
static bool _timeoutError;
static bool _timeout;
static bool _closeOnTimeout;
static bool _once;
};
@@ -369,9 +368,9 @@ namespace
static Data _data;
private:
StreamSocket _socket;
StreamSocket _socket;
SocketReactor& _reactor;
int _pos;
int _pos;
};
DataServiceHandler::Data DataServiceHandler::_data;
@@ -407,6 +406,13 @@ void SocketReactorTest::testSocketReactor()
}
void SocketReactorTest::testSocketReactorPoll()
{
testIOHandler<EchoServer, StreamSocket>();
testIOHandler<UDPEchoServer, DatagramSocket>();
}
void SocketReactorTest::testSetSocketReactor()
{
SocketAddress ssa;
@@ -508,11 +514,11 @@ void SocketReactorTest::testDataCollection()
" \"data\":"
" ["
" {"
" \"tag1\":"
" ["
" {\"val1\":123},"
" {\"val2\":\"abc\"}"
" ]"
" \"tag1\":"
" ["
" {\"val1\":123},"
" {\"val2\":\"abc\"}"
" ]"
" }"
" ]"
"}\n");
@@ -524,33 +530,33 @@ void SocketReactorTest::testDataCollection()
" \"ts\":\"1524864652654321\","
" \"data\":"
" ["
" {"
" \"tag1\":"
" ["
" {"
" \"val1\":123,"
" \"val2\":\"abc\","
" \"val3\":42.123"
" },"
" {"
" \"val1\":987,"
" \"val2\":\"xyz\","
" \"val3\":24.321"
" }"
" ],"
" \"tag2\":"
" ["
" {"
" \"val1\":42.123,"
" \"val2\":123,"
" \"val3\":\"abc\""
" },"
" {"
" \"val1\":24.321,"
" \"val2\":987,"
" \"val3\":\"xyz\""
" }"
" ]"
" {"
" \"tag1\":"
" ["
" {"
" \"val1\":123,"
" \"val2\":\"abc\","
" \"val3\":42.123"
" },"
" {"
" \"val1\":987,"
" \"val2\":\"xyz\","
" \"val3\":24.321"
" }"
" ],"
" \"tag2\":"
" ["
" {"
" \"val1\":42.123,"
" \"val2\":123,"
" \"val3\":\"abc\""
" },"
" {"
" \"val1\":24.321,"
" \"val2\":987,"
" \"val3\":\"xyz\""
" }"
" ]"
" }"
" ]"
"}\n";
@@ -586,6 +592,7 @@ CppUnit::Test* SocketReactorTest::suite()
CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("SocketReactorTest");
CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactor);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactorPoll);
CppUnit_addTest(pSuite, SocketReactorTest, testSetSocketReactor);
CppUnit_addTest(pSuite, SocketReactorTest, testParallelSocketReactor);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorFail);

View File

@@ -15,6 +15,11 @@
#include "Poco/Net/Net.h"
#include "Poco/Thread.h"
#include "Poco/Observer.h"
#include "Poco/Net/SocketAddress.h"
#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/SocketNotification.h"
#include "CppUnit/TestCase.h"
@@ -25,6 +30,7 @@ public:
~SocketReactorTest();
void testSocketReactor();
void testSocketReactorPoll();
void testSetSocketReactor();
void testParallelSocketReactor();
void testSocketConnectorFail();
@@ -37,6 +43,128 @@ public:
static CppUnit::Test* suite();
private:
template <class SocketType>
class IOHandler
{
public:
IOHandler(const SocketType& socket, Poco::Net::SocketReactor& reactor):
_socket(socket),
_reactor(reactor),
_or(*this, &IOHandler::onReadable),
_ow(*this, &IOHandler::onWritable),
_ot(*this, &IOHandler::onTimeout),
_os(*this, &IOHandler::onShutdown)
{
_reactor.addEventHandler(_socket, _or);
_reactor.addEventHandler(_socket, _ow);
_reactor.addEventHandler(_socket, _ot);
_reactor.addEventHandler(_socket, _os);
}
~IOHandler()
{
_reactor.removeEventHandler(_socket, _or);
_reactor.removeEventHandler(_socket, _ow);
_reactor.removeEventHandler(_socket, _ot);
_reactor.removeEventHandler(_socket, _os);
}
void onReadable(Poco::Net::ReadableNotification* pNf)
{
pNf->release();
char buffer[32];
int n = _socket.receiveBytes(buffer, sizeof(buffer));
if (n > 0) _data.append(buffer, n);
}
void onWritable(Poco::Net::WritableNotification* pNf)
{
pNf->release();
_writable = true;
}
void onTimeout(Poco::Net::TimeoutNotification* pNf)
{
pNf->release();
_timeout = true;
}
void onShutdown(Poco::Net::ShutdownNotification* pNf)
{
pNf->release();
_shutdown = true;
}
const std::string& data()
{
return _data;
}
bool writable() const
{
return _writable;
}
bool timeout() const
{
return _timeout;
}
bool shutdown() const
{
return _shutdown;
}
private:
SocketType _socket;
Poco::Net::SocketReactor& _reactor;
Poco::Observer<IOHandler, Poco::Net::ReadableNotification> _or;
Poco::Observer<IOHandler, Poco::Net::WritableNotification> _ow;
Poco::Observer<IOHandler, Poco::Net::TimeoutNotification> _ot;
Poco::Observer<IOHandler, Poco::Net::ShutdownNotification> _os;
bool _writable = false;
bool _timeout = false;
bool _shutdown = false;
std::string _data;
};
template <typename ServerType, typename SocketType>
void testIOHandler()
{
using Poco::Thread;
using Poco::Net::SocketReactor;
using Poco::Net::SocketAddress;
ServerType echoServer;
SocketType ds;
char buffer[256];
ds.connect(SocketAddress("127.0.0.1", echoServer.port()));
SocketReactor reactor;
IOHandler<SocketType> ih(ds, reactor);
int n = ds.sendBytes("hello", 5);
assertTrue (n == 5);
assertFalse(ih.writable());
Thread t; t.start(reactor);
Thread::sleep(500);
assertTrue(ih.writable());
assertFalse(ih.shutdown());
reactor.stop(); t.join();
assertTrue(ih.shutdown());
assertTrue (ih.data() == "hello");
n = ds.sendBytes("hello", 5);
assertTrue (n == 5);
while (ih.data().size() < 10)
{
reactor.poll();
Thread::sleep(10);
}
assertTrue (ih.data() == "hellohello");
reactor.poll();
assertTrue (ih.timeout());
ds.close();
}
};