Use PollSet in SocketReactor #2092 (linux)

This commit is contained in:
Alex Fabijanic
2018-04-26 19:17:49 -05:00
parent 6912384422
commit ea38cdb740
4 changed files with 51 additions and 41 deletions

View File

@@ -70,7 +70,7 @@ public:
} }
return *this; return *this;
} }
void notify(Notification* pNf) const void notify(Notification* pNf) const
{ {
Poco::Mutex::ScopedLock lock(_mutex); Poco::Mutex::ScopedLock lock(_mutex);

View File

@@ -72,7 +72,8 @@ public:
{ {
Poco::FastMutex::ScopedLock lock(_mutex); Poco::FastMutex::ScopedLock lock(_mutex);
poco_socket_t fd = socket.impl()->sockfd(); SocketImpl* sockImpl = socket.impl();
poco_socket_t fd = sockImpl->sockfd();
struct epoll_event ev; struct epoll_event ev;
ev.events = 0; ev.events = 0;
if (mode & PollSet::POLL_READ) if (mode & PollSet::POLL_READ)
@@ -83,9 +84,15 @@ public:
ev.events |= EPOLLERR; ev.events |= EPOLLERR;
ev.data.ptr = socket.impl(); ev.data.ptr = socket.impl();
int err = epoll_ctl(_epollfd, EPOLL_CTL_ADD, fd, &ev); int err = epoll_ctl(_epollfd, EPOLL_CTL_ADD, fd, &ev);
if (err) SocketImpl::error();
_socketMap[socket.impl()] = socket; if (err)
{
if (errno == EEXIST) update(socket, mode);
else SocketImpl::error();
}
if (_socketMap.find(sockImpl) == _socketMap.end())
_socketMap[sockImpl] = socket;
} }
void remove(const Socket& socket) void remove(const Socket& socket)

View File

@@ -61,19 +61,6 @@ SocketReactor::~SocketReactor()
} }
bool SocketReactor::hasSocketHandlers()
{
ScopedLock lock(_mutex);
for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it)
{
if (it->second->accepts(_pReadableNotification) ||
it->second->accepts(_pWritableNotification) ||
it->second->accepts(_pErrorNotification)) return true;
}
return false;
}
void SocketReactor::run() void SocketReactor::run()
{ {
_pThread = Thread::current(); _pThread = Thread::current();
@@ -97,13 +84,13 @@ void SocketReactor::run()
PollSet::SocketModeMap::iterator end = sm.end(); PollSet::SocketModeMap::iterator end = sm.end();
for (; it != end; ++it) for (; it != end; ++it)
{ {
if ((it->second & PollSet::POLL_READ) != 0) if (it->second & PollSet::POLL_READ)
{ {
dispatch(it->first, _pReadableNotification); dispatch(it->first, _pReadableNotification);
readable = true; readable = true;
} }
if ((it->second & PollSet::POLL_WRITE) != 0) dispatch(it->first, _pWritableNotification); if (it->second & PollSet::POLL_WRITE) dispatch(it->first, _pWritableNotification);
if ((it->second & PollSet::POLL_ERROR) != 0) dispatch(it->first, _pErrorNotification); if (it->second & PollSet::POLL_ERROR) dispatch(it->first, _pErrorNotification);
} }
} }
if (!readable) onTimeout(); if (!readable) onTimeout();
@@ -126,6 +113,19 @@ void SocketReactor::run()
} }
bool SocketReactor::hasSocketHandlers()
{
ScopedLock lock(_mutex);
for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it)
{
if (it->second->accepts(_pReadableNotification) ||
it->second->accepts(_pWritableNotification) ||
it->second->accepts(_pErrorNotification)) return true;
}
return false;
}
void SocketReactor::stop() void SocketReactor::stop()
{ {
_stop = true; _stop = true;

View File

@@ -50,11 +50,13 @@ namespace
_reactor(reactor) _reactor(reactor)
{ {
_reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable)); _reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
_reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ShutdownNotification>(*this, &EchoServiceHandler::onShutdown));
} }
~EchoServiceHandler() ~EchoServiceHandler()
{ {
_reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable)); _reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
_reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ShutdownNotification>(*this, &EchoServiceHandler::onShutdown));
} }
void onReadable(ReadableNotification* pNf) void onReadable(ReadableNotification* pNf)
@@ -66,11 +68,12 @@ namespace
{ {
_socket.sendBytes(buffer, n); _socket.sendBytes(buffer, n);
} }
else }
{
_socket.shutdownSend(); void onShutdown(ShutdownNotification* pNf)
delete this; {
} pNf->release();
delete this;
} }
private: private:
@@ -125,7 +128,7 @@ namespace
} }
} }
} }
void onWritable(WritableNotification* pNf) void onWritable(WritableNotification* pNf)
{ {
pNf->release(); pNf->release();
@@ -136,7 +139,7 @@ namespace
_socket.sendBytes(data.data(), (int) data.length()); _socket.sendBytes(data.data(), (int) data.length());
_socket.shutdownSend(); _socket.shutdownSend();
} }
void onTimeout(TimeoutNotification* pNf) void onTimeout(TimeoutNotification* pNf)
{ {
pNf->release(); pNf->release();
@@ -147,7 +150,7 @@ namespace
delete this; delete this;
} }
} }
static std::string data() static std::string data()
{ {
return _data; return _data;
@@ -167,22 +170,22 @@ namespace
{ {
return _closeOnTimeout; return _closeOnTimeout;
} }
static void setCloseOnTimeout(bool flag) static void setCloseOnTimeout(bool flag)
{ {
_closeOnTimeout = flag; _closeOnTimeout = flag;
} }
static bool readableError() static bool readableError()
{ {
return _readableError; return _readableError;
} }
static bool writableError() static bool writableError()
{ {
return _writableError; return _writableError;
} }
static bool timeoutError() static bool timeoutError()
{ {
return _timeoutError; return _timeoutError;
@@ -235,8 +238,8 @@ namespace
static bool _closeOnTimeout; static bool _closeOnTimeout;
static bool _once; static bool _once;
}; };
std::string ClientServiceHandler::_data; std::string ClientServiceHandler::_data;
bool ClientServiceHandler::_readableError = false; bool ClientServiceHandler::_readableError = false;
bool ClientServiceHandler::_writableError = false; bool ClientServiceHandler::_writableError = false;
@@ -244,8 +247,8 @@ namespace
bool ClientServiceHandler::_timeout = false; bool ClientServiceHandler::_timeout = false;
bool ClientServiceHandler::_closeOnTimeout = false; bool ClientServiceHandler::_closeOnTimeout = false;
bool ClientServiceHandler::_once = false; bool ClientServiceHandler::_once = false;
class FailConnector: public SocketConnector<ClientServiceHandler> class FailConnector: public SocketConnector<ClientServiceHandler>
{ {
public: public:
@@ -257,13 +260,13 @@ namespace
reactor.addEventHandler(socket(), Observer<FailConnector, TimeoutNotification>(*this, &FailConnector::onTimeout)); reactor.addEventHandler(socket(), Observer<FailConnector, TimeoutNotification>(*this, &FailConnector::onTimeout));
reactor.addEventHandler(socket(), Observer<FailConnector, ShutdownNotification>(*this, &FailConnector::onShutdown)); reactor.addEventHandler(socket(), Observer<FailConnector, ShutdownNotification>(*this, &FailConnector::onShutdown));
} }
void onShutdown(ShutdownNotification* pNf) void onShutdown(ShutdownNotification* pNf)
{ {
pNf->release(); pNf->release();
_shutdown = true; _shutdown = true;
} }
void onTimeout(TimeoutNotification* pNf) void onTimeout(TimeoutNotification* pNf)
{ {
pNf->release(); pNf->release();
@@ -276,7 +279,7 @@ namespace
_failed = true; _failed = true;
reactor()->stop(); reactor()->stop();
} }
bool failed() const bool failed() const
{ {
return _failed; return _failed;
@@ -286,7 +289,7 @@ namespace
{ {
return _shutdown; return _shutdown;
} }
private: private:
bool _failed; bool _failed;
bool _shutdown; bool _shutdown;