Use PollSet in SocketReactor #2092 (windows tested)

This commit is contained in:
Alex Fabijanic
2018-04-26 17:42:11 -05:00
committed by Alex Fabijanic
parent 479bde1e46
commit 6912384422
4 changed files with 69 additions and 65 deletions

View File

@@ -20,6 +20,7 @@
#include "Poco/Net/Net.h" #include "Poco/Net/Net.h"
#include "Poco/Net/Socket.h" #include "Poco/Net/Socket.h"
#include "Poco/Net/PollSet.h"
#include "Poco/Runnable.h" #include "Poco/Runnable.h"
#include "Poco/Timespan.h" #include "Poco/Timespan.h"
#include "Poco/Observer.h" #include "Poco/Observer.h"
@@ -205,7 +206,10 @@ private:
typedef Poco::AutoPtr<SocketNotifier> NotifierPtr; typedef Poco::AutoPtr<SocketNotifier> NotifierPtr;
typedef Poco::AutoPtr<SocketNotification> NotificationPtr; typedef Poco::AutoPtr<SocketNotification> NotificationPtr;
typedef std::map<Socket, NotifierPtr> EventHandlerMap; typedef std::map<Socket, NotifierPtr> EventHandlerMap;
typedef Poco::FastMutex MutexType;
typedef MutexType::ScopedLock ScopedLock;
bool hasSocketHandlers();
void dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification); void dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification);
enum enum
@@ -216,13 +220,14 @@ private:
bool _stop; bool _stop;
Poco::Timespan _timeout; Poco::Timespan _timeout;
EventHandlerMap _handlers; EventHandlerMap _handlers;
PollSet _pollSet;
NotificationPtr _pReadableNotification; NotificationPtr _pReadableNotification;
NotificationPtr _pWritableNotification; NotificationPtr _pWritableNotification;
NotificationPtr _pErrorNotification; NotificationPtr _pErrorNotification;
NotificationPtr _pTimeoutNotification; NotificationPtr _pTimeoutNotification;
NotificationPtr _pIdleNotification; NotificationPtr _pIdleNotification;
NotificationPtr _pShutdownNotification; NotificationPtr _pShutdownNotification;
Poco::FastMutex _mutex; MutexType _mutex;
Poco::Thread* _pThread; Poco::Thread* _pThread;
friend class SocketNotifier; friend class SocketNotifier;

View File

@@ -190,7 +190,7 @@ private:
// //
// BSD implementation using poll // BSD/Windows implementation using poll
// //
class PollSetImpl class PollSetImpl
{ {

View File

@@ -20,7 +20,6 @@
#include "Poco/Exception.h" #include "Poco/Exception.h"
using Poco::FastMutex;
using Poco::Exception; using Poco::Exception;
using Poco::ErrorHandler; using Poco::ErrorHandler;
@@ -62,60 +61,53 @@ SocketReactor::~SocketReactor()
} }
bool SocketReactor::hasSocketHandlers()
{
ScopedLock lock(_mutex);
for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it)
{
if (it->second->accepts(_pReadableNotification) ||
it->second->accepts(_pWritableNotification) ||
it->second->accepts(_pErrorNotification)) return true;
}
return false;
}
void SocketReactor::run() void SocketReactor::run()
{ {
_pThread = Thread::current(); _pThread = Thread::current();
Socket::SocketList readable;
Socket::SocketList writable;
Socket::SocketList except;
while (!_stop) while (!_stop)
{ {
try try
{ {
readable.clear(); if (!hasSocketHandlers())
writable.clear();
except.clear();
int nSockets = 0;
{
FastMutex::ScopedLock lock(_mutex);
for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it)
{
if (it->second->accepts(_pReadableNotification))
{
readable.push_back(it->first);
nSockets++;
}
if (it->second->accepts(_pWritableNotification))
{
writable.push_back(it->first);
nSockets++;
}
if (it->second->accepts(_pErrorNotification))
{
except.push_back(it->first);
nSockets++;
}
}
}
if (nSockets == 0)
{ {
onIdle(); onIdle();
Thread::trySleep(static_cast<long>(_timeout.totalMilliseconds())); Thread::trySleep(static_cast<long>(_timeout.totalMilliseconds()));
} }
else if (Socket::select(readable, writable, except, _timeout)) else
{ {
onBusy(); bool readable = false;
PollSet::SocketModeMap sm = _pollSet.poll(_timeout);
for (Socket::SocketList::iterator it = readable.begin(); it != readable.end(); ++it) if (sm.size() > 0)
dispatch(*it, _pReadableNotification); {
for (Socket::SocketList::iterator it = writable.begin(); it != writable.end(); ++it) onBusy();
dispatch(*it, _pWritableNotification); PollSet::SocketModeMap::iterator it = sm.begin();
for (Socket::SocketList::iterator it = except.begin(); it != except.end(); ++it) PollSet::SocketModeMap::iterator end = sm.end();
dispatch(*it, _pErrorNotification); for (; it != end; ++it)
{
if ((it->second & PollSet::POLL_READ) != 0)
{
dispatch(it->first, _pReadableNotification);
readable = true;
}
if ((it->second & PollSet::POLL_WRITE) != 0) dispatch(it->first, _pWritableNotification);
if ((it->second & PollSet::POLL_ERROR) != 0) dispatch(it->first, _pErrorNotification);
}
}
if (!readable) onTimeout();
} }
else onTimeout();
} }
catch (Exception& exc) catch (Exception& exc)
{ {
@@ -162,7 +154,7 @@ void SocketReactor::addEventHandler(const Socket& socket, const Poco::AbstractOb
{ {
NotifierPtr pNotifier; NotifierPtr pNotifier;
{ {
FastMutex::ScopedLock lock(_mutex); ScopedLock lock(_mutex);
EventHandlerMap::iterator it = _handlers.find(socket); EventHandlerMap::iterator it = _handlers.find(socket);
if (it == _handlers.end()) if (it == _handlers.end())
@@ -171,9 +163,16 @@ void SocketReactor::addEventHandler(const Socket& socket, const Poco::AbstractOb
_handlers[socket] = pNotifier; _handlers[socket] = pNotifier;
} }
else pNotifier = it->second; else pNotifier = it->second;
if (!pNotifier->hasObserver(observer))
pNotifier->addObserver(this, observer);
} }
if (!pNotifier->hasObserver(observer))
pNotifier->addObserver(this, observer); int mode = 0;
if (pNotifier->accepts(_pReadableNotification)) mode |= PollSet::POLL_READ;
if (pNotifier->accepts(_pWritableNotification)) mode |= PollSet::POLL_WRITE;
if (pNotifier->accepts(_pErrorNotification)) mode |= PollSet::POLL_ERROR;
if (mode) _pollSet.add(socket, mode);
} }
@@ -181,7 +180,7 @@ bool SocketReactor::hasEventHandler(const Socket& socket, const Poco::AbstractOb
{ {
NotifierPtr pNotifier; NotifierPtr pNotifier;
{ {
FastMutex::ScopedLock lock(_mutex); ScopedLock lock(_mutex);
EventHandlerMap::iterator it = _handlers.find(socket); EventHandlerMap::iterator it = _handlers.find(socket);
if (it != _handlers.end()) if (it != _handlers.end())
@@ -199,7 +198,7 @@ void SocketReactor::removeEventHandler(const Socket& socket, const Poco::Abstrac
{ {
NotifierPtr pNotifier; NotifierPtr pNotifier;
{ {
FastMutex::ScopedLock lock(_mutex); ScopedLock lock(_mutex);
EventHandlerMap::iterator it = _handlers.find(socket); EventHandlerMap::iterator it = _handlers.find(socket);
if (it != _handlers.end()) if (it != _handlers.end())
@@ -208,6 +207,7 @@ void SocketReactor::removeEventHandler(const Socket& socket, const Poco::Abstrac
if (pNotifier->hasObserver(observer) && pNotifier->countObservers() == 1) if (pNotifier->hasObserver(observer) && pNotifier->countObservers() == 1)
{ {
_handlers.erase(it); _handlers.erase(it);
_pollSet.remove(socket);
} }
} }
} }
@@ -246,7 +246,7 @@ void SocketReactor::dispatch(const Socket& socket, SocketNotification* pNotifica
{ {
NotifierPtr pNotifier; NotifierPtr pNotifier;
{ {
FastMutex::ScopedLock lock(_mutex); ScopedLock lock(_mutex);
EventHandlerMap::iterator it = _handlers.find(socket); EventHandlerMap::iterator it = _handlers.find(socket);
if (it != _handlers.end()) if (it != _handlers.end())
pNotifier = it->second; pNotifier = it->second;
@@ -262,7 +262,7 @@ void SocketReactor::dispatch(SocketNotification* pNotification)
std::vector<NotifierPtr> delegates; std::vector<NotifierPtr> delegates;
delegates.reserve(_handlers.size()); delegates.reserve(_handlers.size());
{ {
FastMutex::ScopedLock lock(_mutex); ScopedLock lock(_mutex);
for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it) for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it)
delegates.push_back(it->second); delegates.push_back(it->second);
} }

View File

@@ -116,15 +116,13 @@ namespace
if (n > 0) if (n > 0)
{ {
_str.write(buffer, n); _str.write(buffer, n);
}
else
{
checkReadableObserverCount(1);
_reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable));
checkReadableObserverCount(0);
if (_once || _data.size() >= 3072) _reactor.stop();
_data += _str.str(); _data += _str.str();
delete this; if ((_once && _data.size() >= 1024) ||
(!_once && _data.size() >= 4096))
{
_reactor.stop();
delete this;
}
} }
} }
@@ -318,7 +316,7 @@ void SocketReactorTest::testSocketReactor()
ClientServiceHandler::resetData(); ClientServiceHandler::resetData();
reactor.run(); reactor.run();
std::string data(ClientServiceHandler::data()); std::string data(ClientServiceHandler::data());
assertTrue (data.size() == 1024); assertTrue (data.size() >= 1024);
assertTrue (!ClientServiceHandler::readableError()); assertTrue (!ClientServiceHandler::readableError());
assertTrue (!ClientServiceHandler::writableError()); assertTrue (!ClientServiceHandler::writableError());
assertTrue (!ClientServiceHandler::timeoutError()); assertTrue (!ClientServiceHandler::timeoutError());
@@ -338,7 +336,7 @@ void SocketReactorTest::testSetSocketReactor()
ClientServiceHandler::resetData(); ClientServiceHandler::resetData();
reactor.run(); reactor.run();
std::string data(ClientServiceHandler::data()); std::string data(ClientServiceHandler::data());
assertTrue (data.size() == 1024); assertTrue (data.size() >= 1024);
assertTrue (!ClientServiceHandler::readableError()); assertTrue (!ClientServiceHandler::readableError());
assertTrue (!ClientServiceHandler::writableError()); assertTrue (!ClientServiceHandler::writableError());
assertTrue (!ClientServiceHandler::timeoutError()); assertTrue (!ClientServiceHandler::timeoutError());
@@ -360,7 +358,7 @@ void SocketReactorTest::testParallelSocketReactor()
ClientServiceHandler::resetData(); ClientServiceHandler::resetData();
reactor.run(); reactor.run();
std::string data(ClientServiceHandler::data()); std::string data(ClientServiceHandler::data());
assertTrue (data.size() == 4096); assertTrue (data.size() >= 4096);
assertTrue (!ClientServiceHandler::readableError()); assertTrue (!ClientServiceHandler::readableError());
assertTrue (!ClientServiceHandler::writableError()); assertTrue (!ClientServiceHandler::writableError());
assertTrue (!ClientServiceHandler::timeoutError()); assertTrue (!ClientServiceHandler::timeoutError());
@@ -411,6 +409,7 @@ CppUnit::Test* SocketReactorTest::suite()
CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("SocketReactorTest"); CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("SocketReactorTest");
CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactor); CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactor);
CppUnit_addTest(pSuite, SocketReactorTest, testSetSocketReactor);
CppUnit_addTest(pSuite, SocketReactorTest, testParallelSocketReactor); CppUnit_addTest(pSuite, SocketReactorTest, testParallelSocketReactor);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorFail); CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorFail);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout); CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout);