(Parallel)SocketAcceptor ctor/dtor call virtual functions #608

This commit is contained in:
Alex Fabijanic 2014-11-19 22:25:28 -06:00
parent 1919322d00
commit fbb18d1aa4
4 changed files with 97 additions and 26 deletions

View File

@ -73,7 +73,7 @@ public:
SocketReactor& reactor, SocketReactor& reactor,
unsigned threads = Poco::Environment::processorCount()): unsigned threads = Poco::Environment::processorCount()):
_socket(socket), _socket(socket),
_pReactor(0), _pReactor(&reactor),
_threads(threads), _threads(threads),
_next(0) _next(0)
/// Creates a ParallelSocketAcceptor using the given ServerSocket, sets the /// Creates a ParallelSocketAcceptor using the given ServerSocket, sets the
@ -81,7 +81,9 @@ public:
/// with the given SocketReactor. /// with the given SocketReactor.
{ {
init(); init();
registerAcceptor(reactor); _pReactor->addEventHandler(_socket,
Poco::Observer<ParallelSocketAcceptor,
ReadableNotification>(*this, &ParallelSocketAcceptor::onAccept));
} }
virtual ~ParallelSocketAcceptor() virtual ~ParallelSocketAcceptor()
@ -89,22 +91,42 @@ public:
{ {
try try
{ {
unregisterAcceptor(); if (_pReactor)
{
_pReactor->removeEventHandler(_socket,
Poco::Observer<ParallelSocketAcceptor,
ReadableNotification>(*this, &ParallelSocketAcceptor::onAccept));
}
} }
catch (...) catch (...)
{ {
poco_unexpected(); poco_unexpected();
} }
} }
void setReactor(SocketReactor& reactor)
/// Sets the reactor for this acceptor.
{
_pReactor = &reactor;
if (!_pReactor->hasEventHandler(_socket, Poco::Observer<SocketAcceptor,
ReadableNotification>(*this, &SocketAcceptor::onAccept)))
{
registerAcceptor(reactor);
}
}
virtual void registerAcceptor(SocketReactor& reactor) virtual void registerAcceptor(SocketReactor& reactor)
/// Registers the ParallelSocketAcceptor with a SocketReactor. /// Registers the ParallelSocketAcceptor with a SocketReactor.
/// ///
/// A subclass can override this and, for example, also register /// A subclass can override this function to e.g.
/// an event handler for a timeout event. /// register an event handler for timeout event.
/// ///
/// The overriding method must call the baseclass implementation first. /// The overriding method must either call the base class
/// implementation or register the accept handler on its own.
{ {
if (_pReactor)
throw Poco::InvalidAccessException("Acceptor already registered.");
_pReactor = &reactor; _pReactor = &reactor;
_pReactor->addEventHandler(_socket, _pReactor->addEventHandler(_socket,
Poco::Observer<ParallelSocketAcceptor, Poco::Observer<ParallelSocketAcceptor,
@ -114,14 +136,18 @@ public:
virtual void unregisterAcceptor() virtual void unregisterAcceptor()
/// Unregisters the ParallelSocketAcceptor. /// Unregisters the ParallelSocketAcceptor.
/// ///
/// A subclass can override this and, for example, also unregister /// A subclass can override this function to e.g.
/// its event handler for a timeout event. /// unregister its event handler for a timeout event.
/// ///
/// The overriding method must call the baseclass implementation first. /// The overriding method must either call the base class
/// implementation or unregister the accept handler on its own.
{ {
_pReactor->removeEventHandler(_socket, if (_pReactor)
Poco::Observer<ParallelSocketAcceptor, {
ReadableNotification>(*this, &ParallelSocketAcceptor::onAccept)); _pReactor->removeEventHandler(_socket,
Poco::Observer<ParallelSocketAcceptor,
ReadableNotification>(*this, &ParallelSocketAcceptor::onAccept));
}
} }
void onAccept(ReadableNotification* pNotification) void onAccept(ReadableNotification* pNotification)

View File

@ -78,11 +78,12 @@ public:
SocketAcceptor(ServerSocket& socket, SocketReactor& reactor): SocketAcceptor(ServerSocket& socket, SocketReactor& reactor):
_socket(socket), _socket(socket),
_pReactor(0) _pReactor(&reactor)
/// Creates a SocketAcceptor, using the given ServerSocket. /// Creates a SocketAcceptor, using the given ServerSocket.
/// The SocketAcceptor registers itself with the given SocketReactor. /// The SocketAcceptor registers itself with the given SocketReactor.
{ {
registerAcceptor(reactor); _pReactor->addEventHandler(_socket, Poco::Observer<SocketAcceptor,
ReadableNotification>(*this, &SocketAcceptor::onAccept));
} }
virtual ~SocketAcceptor() virtual ~SocketAcceptor()
@ -90,22 +91,43 @@ public:
{ {
try try
{ {
unregisterAcceptor(); if (_pReactor)
{
_pReactor->removeEventHandler(_socket, Poco::Observer<SocketAcceptor,
ReadableNotification>(*this, &SocketAcceptor::onAccept));
}
} }
catch (...) catch (...)
{ {
poco_unexpected(); poco_unexpected();
} }
} }
void setReactor(SocketReactor& reactor)
/// Sets the reactor for this acceptor.
{
_pReactor = &reactor;
if (!_pReactor->hasEventHandler(_socket, Poco::Observer<SocketAcceptor,
ReadableNotification>(*this, &SocketAcceptor::onAccept)))
{
registerAcceptor(reactor);
}
}
virtual void registerAcceptor(SocketReactor& reactor) virtual void registerAcceptor(SocketReactor& reactor)
/// Registers the SocketAcceptor with a SocketReactor. /// Registers the SocketAcceptor with a SocketReactor.
/// ///
/// A subclass can override this and, for example, also register /// A subclass can override this function to e.g.
/// an event handler for a timeout event. /// register an event handler for timeout event.
/// ///
/// The overriding method must call the baseclass implementation first. /// If acceptor was constructed without providing reactor to it,
/// the override of this method must either call the base class
/// implementation or directly register the accept handler with
/// the reactor.
{ {
if (_pReactor)
throw Poco::InvalidAccessException("Acceptor already registered.");
_pReactor = &reactor; _pReactor = &reactor;
_pReactor->addEventHandler(_socket, Poco::Observer<SocketAcceptor, ReadableNotification>(*this, &SocketAcceptor::onAccept)); _pReactor->addEventHandler(_socket, Poco::Observer<SocketAcceptor, ReadableNotification>(*this, &SocketAcceptor::onAccept));
} }
@ -113,10 +135,12 @@ public:
virtual void unregisterAcceptor() virtual void unregisterAcceptor()
/// Unregisters the SocketAcceptor. /// Unregisters the SocketAcceptor.
/// ///
/// A subclass can override this and, for example, also unregister /// A subclass can override this function to e.g.
/// its event handler for a timeout event. /// unregister its event handler for a timeout event.
/// ///
/// The overriding method must call the baseclass implementation first. /// If the accept handler was registered with the reactor,
/// the overriding method must either call the base class
/// implementation or directly unregister the accept handler.
{ {
if (_pReactor) if (_pReactor)
{ {

View File

@ -327,6 +327,26 @@ void SocketReactorTest::testSocketReactor()
} }
void SocketReactorTest::testSetSocketReactor()
{
SocketAddress ssa;
ServerSocket ss(ssa);
SocketReactor reactor;
SocketAcceptor<EchoServiceHandler> acceptor(ss);
acceptor.setReactor(reactor);
SocketAddress sa("localhost", ss.address().port());
SocketConnector<ClientServiceHandler> connector(sa, reactor);
ClientServiceHandler::setOnce(true);
ClientServiceHandler::resetData();
reactor.run();
std::string data(ClientServiceHandler::data());
assert(data.size() == 1024);
assert(!ClientServiceHandler::readableError());
assert(!ClientServiceHandler::writableError());
assert(!ClientServiceHandler::timeoutError());
}
void SocketReactorTest::testParallelSocketReactor() void SocketReactorTest::testParallelSocketReactor()
{ {
SocketAddress ssa; SocketAddress ssa;

View File

@ -27,6 +27,7 @@ public:
~SocketReactorTest(); ~SocketReactorTest();
void testSocketReactor(); void testSocketReactor();
void testSetSocketReactor();
void testParallelSocketReactor(); void testParallelSocketReactor();
void testSocketConnectorFail(); void testSocketConnectorFail();
void testSocketConnectorTimeout(); void testSocketConnectorTimeout();