add PollSet::has/empty(); ParallelAcceptor: always use same reactor for a socket, if registered

This commit is contained in:
Alex Fabijanic 2018-05-01 13:23:21 -05:00 committed by Alex Fabijanic
parent f6e6bec32d
commit df46368413
8 changed files with 139 additions and 31 deletions

View File

@ -159,15 +159,39 @@ public:
} }
protected: protected:
typedef std::vector<typename ParallelReactor::Ptr> ReactorVec;
virtual ServiceHandler* createServiceHandler(StreamSocket& socket) virtual ServiceHandler* createServiceHandler(StreamSocket& socket)
/// Create and initialize a new ServiceHandler instance. /// Create and initialize a new ServiceHandler instance.
/// If socket is already registered with a reactor, the new
/// ServiceHandler instance is given that reactor; otherwise,
/// the next reactor is used. Reactors are rotated in round-robin
/// fashion.
/// ///
/// Subclasses can override this method. /// Subclasses can override this method.
{
SocketReactor* pReactor = reactor(socket);
if (!pReactor)
{ {
std::size_t next = _next++; std::size_t next = _next++;
if (_next == _reactors.size()) _next = 0; if (_next == _reactors.size()) _next = 0;
_reactors[next]->wakeUp(); pReactor = _reactors[next];
return new ServiceHandler(socket, *_reactors[next]); }
pReactor->wakeUp();
return new ServiceHandler(socket, *pReactor);
}
SocketReactor* reactor(const Socket& socket)
/// Returns reactor where this socket is already registered
/// for polling, if found; otherwise returns null pointer.
{
ReactorVec::iterator it = _reactors.begin();
ReactorVec::iterator end = _reactors.end();
for (; it != end; ++it)
{
if ((*it)->has(socket)) return it->get();
}
return 0;
} }
SocketReactor* reactor() SocketReactor* reactor()
@ -194,8 +218,6 @@ protected:
_reactors.push_back(new ParallelReactor); _reactors.push_back(new ParallelReactor);
} }
typedef std::vector<typename ParallelReactor::Ptr> ReactorVec;
ReactorVec& reactors() ReactorVec& reactors()
/// Returns reference to vector of reactors. /// Returns reference to vector of reactors.
{ {
@ -208,8 +230,8 @@ protected:
return _reactors.at(idx).get(); return _reactors.at(idx).get();
} }
std::size_t& next() std::size_t next()
/// Returns reference to the next reactor index. /// Returns the next reactor index.
{ {
return _next; return _next;
} }

View File

@ -63,6 +63,12 @@ public:
void update(const Poco::Net::Socket& socket, int mode); void update(const Poco::Net::Socket& socket, int mode);
/// Updates the mode of the given socket. /// Updates the mode of the given socket.
bool has(const Socket& socket) const;
/// Returns true if socket is registered for polling.
bool empty() const;
/// Returns true if no socket is registered for polling.
void clear(); void clear();
/// Removes all sockets from the PollSet. /// Removes all sockets from the PollSet.

View File

@ -20,6 +20,11 @@
#include "Poco/Net/Net.h" #include "Poco/Net/Net.h"
#include "Poco/Net/SocketNotification.h" #include "Poco/Net/SocketNotification.h"
<<<<<<< HEAD
=======
#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/ParallelSocketAcceptor.h"
>>>>>>> 0d7f39f... add PollSet::has/empty(); ParallelAcceptor: always use same reactor for a socket, if registered
#include "Poco/Net/SocketAddress.h" #include "Poco/Net/SocketAddress.h"
#include "Poco/Net/StreamSocket.h" #include "Poco/Net/StreamSocket.h"
#include "Poco/Observer.h" #include "Poco/Observer.h"
@ -78,13 +83,13 @@ public:
_socket.connectNB(address); _socket.connectNB(address);
} }
SocketConnector(SocketAddress& address, SocketReactor& reactor): SocketConnector(SocketAddress& address, SocketReactor& reactor, bool doRegister = true) :
_pReactor(0) _pReactor(0)
/// Creates an acceptor, using the given ServerSocket. /// Creates an acceptor, using the given ServerSocket.
/// The SocketConnector registers itself with the given SocketReactor. /// The SocketConnector registers itself with the given SocketReactor.
{ {
_socket.connectNB(address); _socket.connectNB(address);
registerConnector(reactor); if (doRegister) registerConnector(reactor);
} }
virtual ~SocketConnector() virtual ~SocketConnector()

View File

@ -166,6 +166,9 @@ public:
/// Poco::Observer<MyEventHandler, SocketNotification> obs(*this, &MyEventHandler::handleMyEvent); /// Poco::Observer<MyEventHandler, SocketNotification> obs(*this, &MyEventHandler::handleMyEvent);
/// reactor.removeEventHandler(obs); /// reactor.removeEventHandler(obs);
bool has(const Socket& socket) const;
/// Returns true if socket is registered with this rector.
protected: protected:
virtual void onTimeout(); virtual void onTimeout();
/// Called if the timeout expires and no other events are available. /// Called if the timeout expires and no other events are available.

View File

@ -109,6 +109,20 @@ public:
_socketMap.erase(socket.impl()); _socketMap.erase(socket.impl());
} }
bool has(const Socket& socket) const
{
Poco::FastMutex::ScopedLock lock(_mutex);
SocketImpl* sockImpl = socket.impl();
return sockImpl &&
(_socketMap.find(sockImpl->sockfd()) != _socketMap.end());
}
bool empty() const
{
Poco::FastMutex::ScopedLock lock(_mutex);
return _socketMap.empty();
}
void update(const Socket& socket, int mode) void update(const Socket& socket, int mode)
{ {
poco_socket_t fd = socket.impl()->sockfd(); poco_socket_t fd = socket.impl()->sockfd();
@ -186,7 +200,7 @@ public:
} }
private: private:
Poco::FastMutex _mutex; mutable Poco::FastMutex _mutex;
int _epollfd; int _epollfd;
std::map<void*, Socket> _socketMap; std::map<void*, Socket> _socketMap;
std::vector<struct epoll_event> _events; std::vector<struct epoll_event> _events;
@ -222,6 +236,20 @@ public:
_socketMap.erase(fd); _socketMap.erase(fd);
} }
bool has(const Socket& socket) const
{
Poco::FastMutex::ScopedLock lock(_mutex);
SocketImpl* sockImpl = socket.impl();
return sockImpl &&
(_socketMap.find(sockImpl->sockfd()) != _socketMap.end());
}
bool empty() const
{
Poco::FastMutex::ScopedLock lock(_mutex);
return _socketMap.empty();
}
void update(const Socket& socket, int mode) void update(const Socket& socket, int mode)
{ {
Poco::FastMutex::ScopedLock lock(_mutex); Poco::FastMutex::ScopedLock lock(_mutex);
@ -337,7 +365,7 @@ public:
} }
private: private:
Poco::FastMutex _mutex; mutable Poco::FastMutex _mutex;
std::map<poco_socket_t, Socket> _socketMap; std::map<poco_socket_t, Socket> _socketMap;
std::map<poco_socket_t, int> _addMap; std::map<poco_socket_t, int> _addMap;
std::set<poco_socket_t> _removeSet; std::set<poco_socket_t> _removeSet;
@ -357,28 +385,36 @@ public:
void add(const Socket& socket, int mode) void add(const Socket& socket, int mode)
{ {
Poco::FastMutex::ScopedLock lock(_mutex); Poco::FastMutex::ScopedLock lock(_mutex);
_map[socket] = mode; _map[socket] = mode;
} }
void remove(const Socket& socket) void remove(const Socket& socket)
{ {
Poco::FastMutex::ScopedLock lock(_mutex); Poco::FastMutex::ScopedLock lock(_mutex);
_map.erase(socket); _map.erase(socket);
} }
bool has(const Socket& socket) const
{
Poco::FastMutex::ScopedLock lock(_mutex);
return _map.find(socket) != _map.end();
}
bool empty() const
{
Poco::FastMutex::ScopedLock lock(_mutex);
return _map.empty();
}
void update(const Socket& socket, int mode) void update(const Socket& socket, int mode)
{ {
Poco::FastMutex::ScopedLock lock(_mutex); Poco::FastMutex::ScopedLock lock(_mutex);
_map[socket] = mode; _map[socket] = mode;
} }
void clear() void clear()
{ {
Poco::FastMutex::ScopedLock lock(_mutex); Poco::FastMutex::ScopedLock lock(_mutex);
_map.clear(); _map.clear();
} }
@ -472,7 +508,7 @@ public:
} }
private: private:
Poco::FastMutex _mutex; mutable Poco::FastMutex _mutex;
PollSet::SocketModeMap _map; PollSet::SocketModeMap _map;
}; };
@ -510,6 +546,18 @@ void PollSet::update(const Socket& socket, int mode)
} }
bool PollSet::has(const Socket& socket) const
{
return _pImpl->has(socket);
}
bool PollSet::empty() const
{
return _pImpl->empty();
}
void PollSet::clear() void PollSet::clear()
{ {
_pImpl->clear(); _pImpl->clear();

View File

@ -116,6 +116,9 @@ void SocketReactor::run()
bool SocketReactor::hasSocketHandlers() bool SocketReactor::hasSocketHandlers()
{ {
ScopedLock lock(_mutex); ScopedLock lock(_mutex);
if (_pollSet.empty()) return false;
for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it) for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it)
{ {
if (it->second->accepts(_pReadableNotification) || if (it->second->accepts(_pReadableNotification) ||
@ -219,6 +222,12 @@ void SocketReactor::removeEventHandler(const Socket& socket, const Poco::Abstrac
} }
bool SocketReactor::has(const Socket& socket) const
{
return _pollSet.has(socket);
}
void SocketReactor::onTimeout() void SocketReactor::onTimeout()
{ {
dispatch(_pTimeoutNotification); dispatch(_pTimeoutNotification);

View File

@ -51,7 +51,11 @@ void PollSetTest::testPoll()
ss2.connect(SocketAddress("127.0.0.1", echoServer2.port())); ss2.connect(SocketAddress("127.0.0.1", echoServer2.port()));
PollSet ps; PollSet ps;
assertTrue(ps.empty());
ps.add(ss1, PollSet::POLL_READ); ps.add(ss1, PollSet::POLL_READ);
assertTrue(!ps.empty());
assertTrue(ps.has(ss1));
assertTrue(!ps.has(ss2));
// nothing readable // nothing readable
Stopwatch sw; Stopwatch sw;
@ -62,6 +66,9 @@ void PollSetTest::testPoll()
sw.restart(); sw.restart();
ps.add(ss2, PollSet::POLL_READ); ps.add(ss2, PollSet::POLL_READ);
assertTrue(!ps.empty());
assertTrue(ps.has(ss1));
assertTrue(ps.has(ss2));
// ss1 must be writable, if polled for // ss1 must be writable, if polled for
ps.update(ss1, PollSet::POLL_READ | PollSet::POLL_WRITE); ps.update(ss1, PollSet::POLL_READ | PollSet::POLL_WRITE);
@ -100,6 +107,9 @@ void PollSetTest::testPoll()
assertTrue (std::string(buffer, n) == "HELLO"); assertTrue (std::string(buffer, n) == "HELLO");
ps.remove(ss2); ps.remove(ss2);
assertTrue(!ps.empty());
assertTrue(ps.has(ss1));
assertTrue(!ps.has(ss2));
ss2.sendBytes("HELLO", 5); ss2.sendBytes("HELLO", 5);
sw.restart(); sw.restart();

View File

@ -124,7 +124,7 @@ namespace
_data += _str.str(); _data += _str.str();
_str.str(""); _str.str("");
if ((_once && _data.size() == 1024) || if ((_once && _data.size() == 1024) ||
(!_once && _data.size() == 4096)) (!_once && _data.size() == 8192))
{ {
_reactor.stop(); _reactor.stop();
delete this; delete this;
@ -135,7 +135,7 @@ namespace
checkReadableObserverCount(1); checkReadableObserverCount(1);
_reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable)); _reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable));
checkReadableObserverCount(0); checkReadableObserverCount(0);
if (_once || _data.size() == 4096) _reactor.stop(); if (_once || _data.size() == 8192) _reactor.stop();
delete this; delete this;
} }
} }
@ -433,11 +433,16 @@ void SocketReactorTest::testParallelSocketReactor()
SocketConnector<ClientServiceHandler> connector2(sa, reactor); SocketConnector<ClientServiceHandler> connector2(sa, reactor);
SocketConnector<ClientServiceHandler> connector3(sa, reactor); SocketConnector<ClientServiceHandler> connector3(sa, reactor);
SocketConnector<ClientServiceHandler> connector4(sa, reactor); SocketConnector<ClientServiceHandler> connector4(sa, reactor);
SocketConnector<ClientServiceHandler> connector5(sa, reactor);
SocketConnector<ClientServiceHandler> connector6(sa, reactor);
SocketConnector<ClientServiceHandler> connector7(sa, reactor);
SocketConnector<ClientServiceHandler> connector8(sa, reactor);
ClientServiceHandler::setOnce(false); ClientServiceHandler::setOnce(false);
ClientServiceHandler::resetData(); ClientServiceHandler::resetData();
reactor.run(); reactor.run();
//acceptor.run();
std::string data(ClientServiceHandler::data()); std::string data(ClientServiceHandler::data());
assertTrue (data.size() == 4096); assertTrue (data.size() == 8192);
assertTrue (!ClientServiceHandler::readableError()); assertTrue (!ClientServiceHandler::readableError());
assertTrue (!ClientServiceHandler::writableError()); assertTrue (!ClientServiceHandler::writableError());
assertTrue (!ClientServiceHandler::timeoutError()); assertTrue (!ClientServiceHandler::timeoutError());