added ParallelSocketAcceptor and Reactor w/ tests

ParallelSocketAcceptor and Reactor provide parallel (threaded) data
handling
This commit is contained in:
aleks-f
2013-01-03 23:41:38 -06:00
parent 7094df540f
commit f0cefd2976
18 changed files with 403 additions and 4 deletions

View File

@@ -37,6 +37,7 @@
#include "Poco/Net/SocketNotification.h"
#include "Poco/Net/SocketConnector.h"
#include "Poco/Net/SocketAcceptor.h"
#include "Poco/Net/ParallelSocketAcceptor.h"
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/ServerSocket.h"
#include "Poco/Net/SocketAddress.h"
@@ -48,6 +49,7 @@
using Poco::Net::SocketReactor;
using Poco::Net::SocketConnector;
using Poco::Net::SocketAcceptor;
using Poco::Net::ParallelSocketAcceptor;
using Poco::Net::StreamSocket;
using Poco::Net::ServerSocket;
using Poco::Net::SocketAddress;
@@ -142,8 +144,8 @@ namespace
checkReadableObserverCount(1);
_reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable));
checkReadableObserverCount(0);
_reactor.stop();
_data = _str.str();
if (_once || _data.size() >= 3072) _reactor.stop();
_data += _str.str();
delete this;
}
}
@@ -175,6 +177,11 @@ namespace
return _data;
}
static void resetData()
{
_data.clear();
}
static bool timeout()
{
return _timeout;
@@ -205,6 +212,11 @@ namespace
return _timeoutError;
}
static void setOnce(bool once = true)
{
_once = once;
}
private:
void checkReadableObserverCount(std::size_t oro)
{
@@ -245,6 +257,7 @@ namespace
static bool _timeoutError;
static bool _timeout;
static bool _closeOnTimeout;
static bool _once;
};
@@ -254,6 +267,7 @@ namespace
bool ClientServiceHandler::_timeoutError = false;
bool ClientServiceHandler::_timeout = false;
bool ClientServiceHandler::_closeOnTimeout = false;
bool ClientServiceHandler::_once = false;
class FailConnector: public SocketConnector<ClientServiceHandler>
@@ -322,6 +336,8 @@ void SocketReactorTest::testSocketReactor()
SocketAcceptor<EchoServiceHandler> acceptor(ss, reactor);
SocketAddress sa("localhost", ss.address().port());
SocketConnector<ClientServiceHandler> connector(sa, reactor);
ClientServiceHandler::setOnce(true);
ClientServiceHandler::resetData();
reactor.run();
std::string data(ClientServiceHandler::data());
assert (data.size() == 1024);
@@ -331,6 +347,28 @@ void SocketReactorTest::testSocketReactor()
}
void SocketReactorTest::testParallelSocketReactor()
{
SocketAddress ssa;
ServerSocket ss(ssa);
SocketReactor reactor;
ParallelSocketAcceptor<EchoServiceHandler, SocketReactor> acceptor(ss, reactor);
SocketAddress sa("localhost", ss.address().port());
SocketConnector<ClientServiceHandler> connector1(sa, reactor);
SocketConnector<ClientServiceHandler> connector2(sa, reactor);
SocketConnector<ClientServiceHandler> connector3(sa, reactor);
SocketConnector<ClientServiceHandler> connector4(sa, reactor);
ClientServiceHandler::setOnce(false);
ClientServiceHandler::resetData();
reactor.run();
std::string data(ClientServiceHandler::data());
assert (data.size() == 4096);
assert (!ClientServiceHandler::readableError());
assert (!ClientServiceHandler::writableError());
assert (!ClientServiceHandler::timeoutError());
}
void SocketReactorTest::testSocketConnectorFail()
{
SocketReactor reactor;
@@ -375,6 +413,7 @@ CppUnit::Test* SocketReactorTest::suite()
CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("SocketReactorTest");
CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactor);
CppUnit_addTest(pSuite, SocketReactorTest, testParallelSocketReactor);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorFail);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout);

View File

@@ -47,6 +47,7 @@ public:
~SocketReactorTest();
void testSocketReactor();
void testParallelSocketReactor();
void testSocketConnectorFail();
void testSocketConnectorTimeout();