From ea38cdb7402b18f24422f5c2bd02df9d2d668a40 Mon Sep 17 00:00:00 2001 From: Alex Fabijanic Date: Thu, 26 Apr 2018 19:17:49 -0500 Subject: [PATCH] Use PollSet in SocketReactor #2092 (linux) --- Foundation/include/Poco/Observer.h | 2 +- Net/src/PollSet.cpp | 13 +++++-- Net/src/SocketReactor.cpp | 32 +++++++++--------- Net/testsuite/src/SocketReactorTest.cpp | 45 +++++++++++++------------ 4 files changed, 51 insertions(+), 41 deletions(-) diff --git a/Foundation/include/Poco/Observer.h b/Foundation/include/Poco/Observer.h index 98d94e062..0bec7412c 100644 --- a/Foundation/include/Poco/Observer.h +++ b/Foundation/include/Poco/Observer.h @@ -70,7 +70,7 @@ public: } return *this; } - + void notify(Notification* pNf) const { Poco::Mutex::ScopedLock lock(_mutex); diff --git a/Net/src/PollSet.cpp b/Net/src/PollSet.cpp index 576c475d0..e3230fd99 100644 --- a/Net/src/PollSet.cpp +++ b/Net/src/PollSet.cpp @@ -72,7 +72,8 @@ public: { Poco::FastMutex::ScopedLock lock(_mutex); - poco_socket_t fd = socket.impl()->sockfd(); + SocketImpl* sockImpl = socket.impl(); + poco_socket_t fd = sockImpl->sockfd(); struct epoll_event ev; ev.events = 0; if (mode & PollSet::POLL_READ) @@ -83,9 +84,15 @@ public: ev.events |= EPOLLERR; ev.data.ptr = socket.impl(); int err = epoll_ctl(_epollfd, EPOLL_CTL_ADD, fd, &ev); - if (err) SocketImpl::error(); - _socketMap[socket.impl()] = socket; + if (err) + { + if (errno == EEXIST) update(socket, mode); + else SocketImpl::error(); + } + + if (_socketMap.find(sockImpl) == _socketMap.end()) + _socketMap[sockImpl] = socket; } void remove(const Socket& socket) diff --git a/Net/src/SocketReactor.cpp b/Net/src/SocketReactor.cpp index 175d9d74a..6cc28a303 100644 --- a/Net/src/SocketReactor.cpp +++ b/Net/src/SocketReactor.cpp @@ -61,19 +61,6 @@ SocketReactor::~SocketReactor() } -bool SocketReactor::hasSocketHandlers() -{ - ScopedLock lock(_mutex); - for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it) - { - if (it->second->accepts(_pReadableNotification) || - it->second->accepts(_pWritableNotification) || - it->second->accepts(_pErrorNotification)) return true; - } - return false; -} - - void SocketReactor::run() { _pThread = Thread::current(); @@ -97,13 +84,13 @@ void SocketReactor::run() PollSet::SocketModeMap::iterator end = sm.end(); for (; it != end; ++it) { - if ((it->second & PollSet::POLL_READ) != 0) + if (it->second & PollSet::POLL_READ) { dispatch(it->first, _pReadableNotification); readable = true; } - if ((it->second & PollSet::POLL_WRITE) != 0) dispatch(it->first, _pWritableNotification); - if ((it->second & PollSet::POLL_ERROR) != 0) dispatch(it->first, _pErrorNotification); + if (it->second & PollSet::POLL_WRITE) dispatch(it->first, _pWritableNotification); + if (it->second & PollSet::POLL_ERROR) dispatch(it->first, _pErrorNotification); } } if (!readable) onTimeout(); @@ -126,6 +113,19 @@ void SocketReactor::run() } +bool SocketReactor::hasSocketHandlers() +{ + ScopedLock lock(_mutex); + for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it) + { + if (it->second->accepts(_pReadableNotification) || + it->second->accepts(_pWritableNotification) || + it->second->accepts(_pErrorNotification)) return true; + } + return false; +} + + void SocketReactor::stop() { _stop = true; diff --git a/Net/testsuite/src/SocketReactorTest.cpp b/Net/testsuite/src/SocketReactorTest.cpp index 26d23b7f2..3bba177ee 100644 --- a/Net/testsuite/src/SocketReactorTest.cpp +++ b/Net/testsuite/src/SocketReactorTest.cpp @@ -50,11 +50,13 @@ namespace _reactor(reactor) { _reactor.addEventHandler(_socket, Observer(*this, &EchoServiceHandler::onReadable)); + _reactor.addEventHandler(_socket, Observer(*this, &EchoServiceHandler::onShutdown)); } - + ~EchoServiceHandler() { _reactor.removeEventHandler(_socket, Observer(*this, &EchoServiceHandler::onReadable)); + _reactor.removeEventHandler(_socket, Observer(*this, &EchoServiceHandler::onShutdown)); } void onReadable(ReadableNotification* pNf) @@ -66,11 +68,12 @@ namespace { _socket.sendBytes(buffer, n); } - else - { - _socket.shutdownSend(); - delete this; - } + } + + void onShutdown(ShutdownNotification* pNf) + { + pNf->release(); + delete this; } private: @@ -125,7 +128,7 @@ namespace } } } - + void onWritable(WritableNotification* pNf) { pNf->release(); @@ -136,7 +139,7 @@ namespace _socket.sendBytes(data.data(), (int) data.length()); _socket.shutdownSend(); } - + void onTimeout(TimeoutNotification* pNf) { pNf->release(); @@ -147,7 +150,7 @@ namespace delete this; } } - + static std::string data() { return _data; @@ -167,22 +170,22 @@ namespace { return _closeOnTimeout; } - + static void setCloseOnTimeout(bool flag) { _closeOnTimeout = flag; } - + static bool readableError() { return _readableError; } - + static bool writableError() { return _writableError; } - + static bool timeoutError() { return _timeoutError; @@ -235,8 +238,8 @@ namespace static bool _closeOnTimeout; static bool _once; }; - - + + std::string ClientServiceHandler::_data; bool ClientServiceHandler::_readableError = false; bool ClientServiceHandler::_writableError = false; @@ -244,8 +247,8 @@ namespace bool ClientServiceHandler::_timeout = false; bool ClientServiceHandler::_closeOnTimeout = false; bool ClientServiceHandler::_once = false; - - + + class FailConnector: public SocketConnector { public: @@ -257,13 +260,13 @@ namespace reactor.addEventHandler(socket(), Observer(*this, &FailConnector::onTimeout)); reactor.addEventHandler(socket(), Observer(*this, &FailConnector::onShutdown)); } - + void onShutdown(ShutdownNotification* pNf) { pNf->release(); _shutdown = true; } - + void onTimeout(TimeoutNotification* pNf) { pNf->release(); @@ -276,7 +279,7 @@ namespace _failed = true; reactor()->stop(); } - + bool failed() const { return _failed; @@ -286,7 +289,7 @@ namespace { return _shutdown; } - + private: bool _failed; bool _shutdown;