Merge remote-tracking branch 'origin/ParallelReactor' into develop

This commit is contained in:
aleks-f 2013-01-05 14:33:43 -06:00
commit 0d9e18ba7c
18 changed files with 403 additions and 4 deletions

View File

@ -803,6 +803,8 @@
Name="Header Files">
<File
RelativePath=".\include\Poco\Net\SocketAcceptor.h"/>
<File
RelativePath=".\include\Poco\Net\ParallelSocketAcceptor.h"/>
<File
RelativePath=".\include\Poco\Net\SocketConnector.h"/>
<File
@ -811,6 +813,8 @@
RelativePath=".\include\Poco\Net\SocketNotifier.h"/>
<File
RelativePath=".\include\Poco\Net\SocketReactor.h"/>
<File
RelativePath=".\include\Poco\Net\ParallelSocketReactor.h"/>
</Filter>
<Filter
Name="Source Files">

View File

@ -273,6 +273,8 @@
<ClInclude Include="include\Poco\Net\Net.h" />
<ClInclude Include="include\Poco\Net\NetException.h" />
<ClInclude Include="include\Poco\Net\NetworkInterface.h" />
<ClInclude Include="include\Poco\Net\ParallelSocketAcceptor.h" />
<ClInclude Include="include\Poco\Net\ParallelSocketReactor.h" />
<ClInclude Include="include\Poco\Net\SocketAddress.h" />
<ClInclude Include="include\Poco\Net\SocketDefs.h" />
<ClInclude Include="include\Poco\Net\DatagramSocket.h" />

View File

@ -414,6 +414,12 @@
<ClInclude Include="include\Poco\Net\WebSocketImpl.h">
<Filter>WebSocket\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Net\ParallelSocketAcceptor.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Net\ParallelSocketReactor.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="src\DNS.cpp">

View File

@ -347,10 +347,12 @@
<ClInclude Include="include\Poco\Net\FTPClientSession.h"/>
<ClInclude Include="include\Poco\Net\FTPStreamFactory.h"/>
<ClInclude Include="include\Poco\Net\SocketAcceptor.h"/>
<ClInclude Include="include\Poco\Net\ParallelSocketAcceptor.h"/>
<ClInclude Include="include\Poco\Net\SocketConnector.h"/>
<ClInclude Include="include\Poco\Net\SocketNotification.h"/>
<ClInclude Include="include\Poco\Net\SocketNotifier.h"/>
<ClInclude Include="include\Poco\Net\SocketReactor.h"/>
<ClInclude Include="include\Poco\Net\ParallelSocketReactor.h"/>
<ClInclude Include="include\Poco\Net\MailMessage.h"/>
<ClInclude Include="include\Poco\Net\MailRecipient.h"/>
<ClInclude Include="include\Poco\Net\MailStream.h"/>

View File

@ -351,6 +351,9 @@
<ClInclude Include="include\Poco\Net\SocketAcceptor.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Net\ParallelSocketAcceptor.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Net\SocketConnector.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
@ -363,6 +366,9 @@
<ClInclude Include="include\Poco\Net\SocketReactor.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Net\ParallelSocketReactor.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Net\MailMessage.h">
<Filter>Mail\Header Files</Filter>
</ClInclude>

View File

@ -735,6 +735,8 @@
Name="Header Files">
<File
RelativePath=".\include\Poco\Net\SocketAcceptor.h"/>
<File
RelativePath=".\include\Poco\Net\ParallelSocketAcceptor.h"/>
<File
RelativePath=".\include\Poco\Net\SocketConnector.h"/>
<File
@ -743,6 +745,8 @@
RelativePath=".\include\Poco\Net\SocketNotifier.h"/>
<File
RelativePath=".\include\Poco\Net\SocketReactor.h"/>
<File
RelativePath=".\include\Poco\Net\ParallelSocketReactor.h"/>
</Filter>
<Filter
Name="Source Files">

View File

@ -756,6 +756,8 @@
Name="Header Files">
<File
RelativePath=".\include\Poco\Net\SocketAcceptor.h"/>
<File
RelativePath=".\include\Poco\Net\ParallelSocketAcceptor.h"/>
<File
RelativePath=".\include\Poco\Net\SocketConnector.h"/>
<File
@ -764,6 +766,8 @@
RelativePath=".\include\Poco\Net\SocketNotifier.h"/>
<File
RelativePath=".\include\Poco\Net\SocketReactor.h"/>
<File
RelativePath=".\include\Poco\Net\ParallelSocketReactor.h"/>
</Filter>
<Filter
Name="Source Files">

View File

@ -755,6 +755,8 @@
Name="Header Files">
<File
RelativePath=".\include\Poco\Net\SocketAcceptor.h"/>
<File
RelativePath=".\include\Poco\Net\ParallelSocketAcceptor.h"/>
<File
RelativePath=".\include\Poco\Net\SocketConnector.h"/>
<File
@ -763,6 +765,8 @@
RelativePath=".\include\Poco\Net\SocketNotifier.h"/>
<File
RelativePath=".\include\Poco\Net\SocketReactor.h"/>
<File
RelativePath=".\include\Poco\Net\ParallelSocketReactor.h"/>
</Filter>
<Filter
Name="Source Files">

View File

@ -339,10 +339,12 @@
<ClInclude Include="include\Poco\Net\FTPClientSession.h"/>
<ClInclude Include="include\Poco\Net\FTPStreamFactory.h"/>
<ClInclude Include="include\Poco\Net\SocketAcceptor.h"/>
<ClInclude Include="include\Poco\Net\ParallelSocketAcceptor.h"/>
<ClInclude Include="include\Poco\Net\SocketConnector.h"/>
<ClInclude Include="include\Poco\Net\SocketNotification.h"/>
<ClInclude Include="include\Poco\Net\SocketNotifier.h"/>
<ClInclude Include="include\Poco\Net\SocketReactor.h"/>
<ClInclude Include="include\Poco\Net\ParallelSocketReactor.h"/>
<ClInclude Include="include\Poco\Net\MailMessage.h"/>
<ClInclude Include="include\Poco\Net\MailRecipient.h"/>
<ClInclude Include="include\Poco\Net\MailStream.h"/>

View File

@ -351,6 +351,9 @@
<ClInclude Include="include\Poco\Net\SocketAcceptor.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Net\ParallelSocketAcceptor.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Net\SocketConnector.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
@ -363,6 +366,9 @@
<ClInclude Include="include\Poco\Net\SocketReactor.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Net\ParallelSocketReactor.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Net\MailMessage.h">
<Filter>Mail\Header Files</Filter>
</ClInclude>

View File

@ -345,10 +345,12 @@
<ClInclude Include="include\Poco\Net\FTPClientSession.h"/>
<ClInclude Include="include\Poco\Net\FTPStreamFactory.h"/>
<ClInclude Include="include\Poco\Net\SocketAcceptor.h"/>
<ClInclude Include="include\Poco\Net\ParallelSocketAcceptor.h"/>
<ClInclude Include="include\Poco\Net\SocketConnector.h"/>
<ClInclude Include="include\Poco\Net\SocketNotification.h"/>
<ClInclude Include="include\Poco\Net\SocketNotifier.h"/>
<ClInclude Include="include\Poco\Net\SocketReactor.h"/>
<ClInclude Include="include\Poco\Net\ParallelSocketReactor.h"/>
<ClInclude Include="include\Poco\Net\MailMessage.h"/>
<ClInclude Include="include\Poco\Net\MailRecipient.h"/>
<ClInclude Include="include\Poco\Net\MailStream.h"/>

View File

@ -351,6 +351,9 @@
<ClInclude Include="include\Poco\Net\SocketAcceptor.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Net\ParallelSocketAcceptor.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Net\SocketConnector.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
@ -363,6 +366,9 @@
<ClInclude Include="include\Poco\Net\SocketReactor.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Net\ParallelSocketReactor.h">
<Filter>Reactor\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Net\MailMessage.h">
<Filter>Mail\Header Files</Filter>
</ClInclude>

View File

@ -760,6 +760,8 @@
Name="Header Files">
<File
RelativePath=".\include\Poco\Net\SocketAcceptor.h"/>
<File
RelativePath=".\include\Poco\Net\ParallelSocketAcceptor.h"/>
<File
RelativePath=".\include\Poco\Net\SocketConnector.h"/>
<File
@ -768,6 +770,8 @@
RelativePath=".\include\Poco\Net\SocketNotifier.h"/>
<File
RelativePath=".\include\Poco\Net\SocketReactor.h"/>
<File
RelativePath=".\include\Poco\Net\ParallelSocketReactor.h"/>
</Filter>
<Filter
Name="Source Files">

View File

@ -0,0 +1,201 @@
//
// ParallelSocketAcceptor.h
//
// $Id: //poco/1.4/Net/include/Poco/Net/ParallelSocketAcceptor.h#1 $
//
// Library: Net
// Package: Reactor
// Module: ParallelSocketAcceptor
//
// Definition of the ParallelSocketAcceptor class.
//
// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// Permission is hereby granted, free of charge, to any person or organization
// obtaining a copy of the software and accompanying documentation covered by
// this license (the "Software") to use, reproduce, display, distribute,
// execute, and transmit the Software, and to prepare derivative works of the
// Software, and to permit third-parties to whom the Software is furnished to
// do so, all subject to the following:
//
// The copyright notices in the Software and this entire statement, including
// the above license grant, this restriction and the following disclaimer,
// must be included in all copies of the Software, in whole or in part, and
// all derivative works of the Software, unless such copies or derivative
// works are solely in the form of machine-executable object code generated by
// a source language processor.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
// SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
// FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//
#ifndef Net_ParallelSocketAcceptor_INCLUDED
#define Net_ParallelSocketAcceptor_INCLUDED
#include "Poco/Net/ParallelSocketReactor.h"
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/ServerSocket.h"
#include "Poco/Environment.h"
#include "Poco/NObserver.h"
#include "Poco/SharedPtr.h"
#include <vector>
using Poco::Net::Socket;
using Poco::Net::SocketReactor;
using Poco::Net::ServerSocket;
using Poco::Net::StreamSocket;
using Poco::NObserver;
using Poco::AutoPtr;
namespace Poco {
namespace Net {
template <class ServiceHandler, class SR>
class ParallelSocketAcceptor
/// This class implements the Acceptor part of the Acceptor-Connector design pattern.
/// Only the difference from single-threaded version is documented here, For full
/// description see Poco::Net::SocketAcceptor documentation.
///
/// This is a multi-threaded version of SocketAcceptor, it differs from the
/// single-threaded version in number of reactors (defaulting to number of processors)
/// that can be specified at construction time and is rotated in a round-robin fashion
/// by event handler. See ParallelSocketAcceptor::onAccept and
/// ParallelSocketAcceptor::createServiceHandler documentation and implementation for
/// details.
{
public:
typedef Poco::Net::ParallelSocketReactor<SR> ParallelReactor;
explicit ParallelSocketAcceptor(ServerSocket& socket,
unsigned threads = Poco::Environment::processorCount()):
_socket(socket),
_pReactor(0),
_threads(threads),
_next(0)
/// Creates a ParallelSocketAcceptor using the given ServerSocket,
/// sets number of threads and populates the reactors vector.
{
init();
}
ParallelSocketAcceptor(ServerSocket& socket,
SocketReactor& reactor,
unsigned threads = Poco::Environment::processorCount()):
_socket(socket),
_pReactor(0),
_threads(threads),
_next(0)
/// Creates a ParallelSocketAcceptor using the given ServerSocket, sets the
/// number of threads, populates the reactors vector and registers itself
/// with the given SocketReactor.
{
init();
registerAcceptor(reactor);
}
~ParallelSocketAcceptor()
/// Destroys the ParallelSocketAcceptor.
{
unregisterAcceptor();
}
void registerAcceptor(SocketReactor& reactor)
/// Registers the ParallelSocketAcceptor with a SocketReactor.
///
/// A subclass can override this and, for example, also register
/// an event handler for a timeout event.
///
/// The overriding method must call the baseclass implementation first.
{
_pReactor = &reactor;
_pReactor->addEventHandler(_socket,
Poco::Observer<ParallelSocketAcceptor,
ReadableNotification>(*this, &ParallelSocketAcceptor::onAccept));
}
void unregisterAcceptor()
/// Unregisters the ParallelSocketAcceptor.
///
/// A subclass can override this and, for example, also unregister
/// its event handler for a timeout event.
///
/// The overriding method must call the baseclass implementation first.
{
_pReactor->removeEventHandler(_socket,
Poco::Observer<ParallelSocketAcceptor,
ReadableNotification>(*this, &ParallelSocketAcceptor::onAccept));
}
void onAccept(ReadableNotification* pNotification)
/// Accepts connection and creates event handler.
{
pNotification->release();
StreamSocket sock = _socket.acceptConnection();
createServiceHandler(sock);
}
protected:
ServiceHandler* createServiceHandler(StreamSocket& socket)
/// Create and initialize a new ServiceHandler instance.
///
/// Subclasses can override this method.
{
std::size_t next = _next++;
if (_next == _reactors.size()) _next = 0;
return new ServiceHandler(socket, *_reactors[next]);
}
SocketReactor* reactor()
/// Returns a pointer to the SocketReactor where
/// this SocketAcceptor is registered.
///
/// The pointer may be null.
{
return _pReactor;
}
Socket& socket()
/// Returns a reference to the SocketAcceptor's socket.
{
return _socket;
}
void init()
/// Populates the reactors vector.
{
poco_assert (_threads > 0);
for (unsigned i = 0; i < _threads; ++i)
_reactors.push_back(new ParallelReactor);
}
private:
typedef std::vector<typename ParallelReactor::Ptr> ReactorVec;
ParallelSocketAcceptor();
ParallelSocketAcceptor(const ParallelSocketAcceptor&);
ParallelSocketAcceptor& operator = (const ParallelSocketAcceptor&);
ServerSocket _socket;
SocketReactor* _pReactor;
unsigned _threads;
ReactorVec _reactors;
std::size_t _next;
};
} } // namespace Poco::Net
#endif // Net_ParallelSocketAcceptor_INCLUDED

View File

@ -0,0 +1,105 @@
//
// ParallelSocketReactor.h
//
// $Id: //poco/1.4/Net/include/Poco/Net/ParallelSocketReactor.h#1 $
//
// Library: Net
// Package: Reactor
// Module: ParallelSocketReactor
//
// Definition of the ParallelSocketReactor class.
//
// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// Permission is hereby granted, free of charge, to any person or organization
// obtaining a copy of the software and accompanying documentation covered by
// this license (the "Software") to use, reproduce, display, distribute,
// execute, and transmit the Software, and to prepare derivative works of the
// Software, and to permit third-parties to whom the Software is furnished to
// do so, all subject to the following:
//
// The copyright notices in the Software and this entire statement, including
// the above license grant, this restriction and the following disclaimer,
// must be included in all copies of the Software, in whole or in part, and
// all derivative works of the Software, unless such copies or derivative
// works are solely in the form of machine-executable object code generated by
// a source language processor.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
// SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
// FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//
#ifndef Net_ParallelSocketReactor_INCLUDED
#define Net_ParallelSocketReactor_INCLUDED
#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/SocketNotification.h"
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/ServerSocket.h"
#include "Poco/NObserver.h"
#include "Poco/Thread.h"
#include "Poco/SharedPtr.h"
using Poco::Net::Socket;
using Poco::Net::SocketReactor;
using Poco::Net::ReadableNotification;
using Poco::Net::ShutdownNotification;
using Poco::Net::ServerSocket;
using Poco::Net::StreamSocket;
using Poco::NObserver;
using Poco::AutoPtr;
using Poco::Thread;
namespace Poco {
namespace Net {
template <class SR>
class ParallelSocketReactor: public SR
{
public:
typedef Poco::SharedPtr<ParallelSocketReactor> Ptr;
ParallelSocketReactor()
{
_thread.start(*this);
}
ParallelSocketReactor(const Poco::Timespan& timeout):
SR(timeout)
{
_thread.start(*this);
}
~ParallelSocketReactor()
{
this->stop();
_thread.join();
}
protected:
void onIdle()
{
SR::onIdle();
Poco::Thread::yield();
}
private:
Poco::Thread _thread;
};
} } // namespace Poco::Net
#endif // Net_ParallelSocketReactor_INCLUDED

View File

@ -92,14 +92,14 @@ public:
explicit SocketAcceptor(ServerSocket& socket):
_socket(socket),
_pReactor(0)
/// Creates an SocketAcceptor, using the given ServerSocket.
/// Creates a SocketAcceptor, using the given ServerSocket.
{
}
SocketAcceptor(ServerSocket& socket, SocketReactor& reactor):
_socket(socket),
_pReactor(0)
/// Creates an SocketAcceptor, using the given ServerSocket.
/// Creates a SocketAcceptor, using the given ServerSocket.
/// The SocketAcceptor registers itself with the given SocketReactor.
{
registerAcceptor(reactor);
@ -138,6 +138,7 @@ public:
}
void onAccept(ReadableNotification* pNotification)
/// Accepts connection and creates event handler.
{
pNotification->release();
StreamSocket sock = _socket.acceptConnection();

View File

@ -37,6 +37,7 @@
#include "Poco/Net/SocketNotification.h"
#include "Poco/Net/SocketConnector.h"
#include "Poco/Net/SocketAcceptor.h"
#include "Poco/Net/ParallelSocketAcceptor.h"
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/ServerSocket.h"
#include "Poco/Net/SocketAddress.h"
@ -48,6 +49,7 @@
using Poco::Net::SocketReactor;
using Poco::Net::SocketConnector;
using Poco::Net::SocketAcceptor;
using Poco::Net::ParallelSocketAcceptor;
using Poco::Net::StreamSocket;
using Poco::Net::ServerSocket;
using Poco::Net::SocketAddress;
@ -142,8 +144,8 @@ namespace
checkReadableObserverCount(1);
_reactor.removeEventHandler(_socket, Observer<ClientServiceHandler, ReadableNotification>(*this, &ClientServiceHandler::onReadable));
checkReadableObserverCount(0);
_reactor.stop();
_data = _str.str();
if (_once || _data.size() >= 3072) _reactor.stop();
_data += _str.str();
delete this;
}
}
@ -175,6 +177,11 @@ namespace
return _data;
}
static void resetData()
{
_data.clear();
}
static bool timeout()
{
return _timeout;
@ -205,6 +212,11 @@ namespace
return _timeoutError;
}
static void setOnce(bool once = true)
{
_once = once;
}
private:
void checkReadableObserverCount(std::size_t oro)
{
@ -245,6 +257,7 @@ namespace
static bool _timeoutError;
static bool _timeout;
static bool _closeOnTimeout;
static bool _once;
};
@ -254,6 +267,7 @@ namespace
bool ClientServiceHandler::_timeoutError = false;
bool ClientServiceHandler::_timeout = false;
bool ClientServiceHandler::_closeOnTimeout = false;
bool ClientServiceHandler::_once = false;
class FailConnector: public SocketConnector<ClientServiceHandler>
@ -322,6 +336,8 @@ void SocketReactorTest::testSocketReactor()
SocketAcceptor<EchoServiceHandler> acceptor(ss, reactor);
SocketAddress sa("localhost", ss.address().port());
SocketConnector<ClientServiceHandler> connector(sa, reactor);
ClientServiceHandler::setOnce(true);
ClientServiceHandler::resetData();
reactor.run();
std::string data(ClientServiceHandler::data());
assert (data.size() == 1024);
@ -331,6 +347,28 @@ void SocketReactorTest::testSocketReactor()
}
void SocketReactorTest::testParallelSocketReactor()
{
SocketAddress ssa;
ServerSocket ss(ssa);
SocketReactor reactor;
ParallelSocketAcceptor<EchoServiceHandler, SocketReactor> acceptor(ss, reactor);
SocketAddress sa("localhost", ss.address().port());
SocketConnector<ClientServiceHandler> connector1(sa, reactor);
SocketConnector<ClientServiceHandler> connector2(sa, reactor);
SocketConnector<ClientServiceHandler> connector3(sa, reactor);
SocketConnector<ClientServiceHandler> connector4(sa, reactor);
ClientServiceHandler::setOnce(false);
ClientServiceHandler::resetData();
reactor.run();
std::string data(ClientServiceHandler::data());
assert (data.size() == 4096);
assert (!ClientServiceHandler::readableError());
assert (!ClientServiceHandler::writableError());
assert (!ClientServiceHandler::timeoutError());
}
void SocketReactorTest::testSocketConnectorFail()
{
SocketReactor reactor;
@ -375,6 +413,7 @@ CppUnit::Test* SocketReactorTest::suite()
CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("SocketReactorTest");
CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactor);
CppUnit_addTest(pSuite, SocketReactorTest, testParallelSocketReactor);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorFail);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout);

View File

@ -47,6 +47,7 @@ public:
~SocketReactorTest();
void testSocketReactor();
void testParallelSocketReactor();
void testSocketConnectorFail();
void testSocketConnectorTimeout();