revert(SocketReactor): back to devel branch

This commit is contained in:
Alex Fabijanic 2021-07-21 13:47:48 +02:00
parent d47db5aecc
commit ac4f7fa8f7
4 changed files with 88 additions and 592 deletions

View File

@ -23,15 +23,10 @@
#include "Poco/Net/PollSet.h"
#include "Poco/Runnable.h"
#include "Poco/Timespan.h"
#include "Poco/Timestamp.h"
#include "Poco/Observer.h"
#include "Poco/AutoPtr.h"
#include "Poco/Mutex.h"
#include <map>
#include <atomic>
#include <functional>
#include <deque>
#include <utility>
namespace Poco {
@ -119,10 +114,6 @@ class Net_API SocketReactor: public Poco::Runnable
/// from event handlers.
{
public:
typedef std::function<void()> CompletionHandler;
static const Timestamp::TimeDiff PERMANENT_COMPLETION_HANDLER;
SocketReactor();
/// Creates the SocketReactor.
@ -132,54 +123,6 @@ public:
virtual ~SocketReactor();
/// Destroys the SocketReactor.
void addCompletionHandler(const CompletionHandler& ch, Timestamp::TimeDiff ms = PERMANENT_COMPLETION_HANDLER);
/// Adds a completion handler to the list of handlers
/// to be called after the next poll() completion.
/// Handler will be called until the specified expiration,
/// which defaults to immediately, ie. expiration after the
/// first invocation.
void addCompletionHandler(CompletionHandler&& ch, Timestamp::TimeDiff ms = PERMANENT_COMPLETION_HANDLER, int pos = -1);
/// Adds a completion handler to the list of handlers
/// to be called after the next poll() completion.
/// Handler will be called until the specified expiration,
/// which defaults to immediately, ie. expiration after the
/// first invocation.
void removeCompletionHandlers();
/// Removes all completion handlers.
int scheduledCompletionHandlers();
/// Returns the number of scheduled completion handlers.
int removeScheduledCompletionHandlers(int count = -1);
/// Removes the count scheduled completion handlers
/// from the front of the schedule queue.
/// Default is removal of all scheduled handlers.
int permanentCompletionHandlers();
/// Returns the number of scheduled completion handlers.
int removePermanentCompletionHandlers(int count = -1);
/// Removes the count permanent completion handlers
/// from the front of the schedule queue.
/// Default is removal of all scheduled handlers.
int poll(int* pHandled = 0);
/// Polls all registered sockets and calls their respective handlers.
/// If there are no handlers, an idle notification is dispatched.
/// If there are no readable sockets, a timeout notification is dispatched.
/// If pHandled is not null, after the call it contains the total number
/// of read/write/error socket handlers called.
/// Returns the number of completion handlers invoked.
int runOne();
/// Runs one handler, scheduled or permanent.
/// If there are no available handlers, it blocks
/// until the first handler is encountered and executed.
/// Returns 1 on successful handler invocation, 0 on
/// exception.
void run();
/// Runs the SocketReactor. The reactor will run
/// until stop() is called (in a separate thread).
@ -256,14 +199,6 @@ protected:
/// Can be overridden by subclasses to perform additional
/// periodic tasks. The default implementation does nothing.
int onComplete(bool handleOne = false, bool expiredOnly = false);
/// Calls completion handler(s) (after poll() completes processing
/// or on runOne() invocation). If handleOne is true, returns
/// after first handler invocation.
/// Scheduled completion handlers are deleted after
/// invocation.
/// Returns number of handlers invoked.
void dispatch(const Socket& socket, SocketNotification* pNotification);
/// Dispatches the given notification to all observers
/// registered for the given socket.
@ -275,45 +210,12 @@ private:
typedef Poco::AutoPtr<SocketNotifier> NotifierPtr;
typedef Poco::AutoPtr<SocketNotification> NotificationPtr;
typedef std::map<poco_socket_t, NotifierPtr> EventHandlerMap;
typedef Poco::FastMutex FastMutexType;
typedef FastMutexType::ScopedLock FastScopedLock;
#ifdef POCO_HAVE_STD_ATOMICS
typedef Poco::SpinlockMutex SpinMutexType;
typedef SpinMutexType::ScopedLock SpinScopedLock;
#else
typedef Poco::FastMutex SpinMutexType;
typedef SpinMutexType::ScopedLock SpinScopedLock;
#endif // POCO_HAVE_STD_ATOMICS
typedef std::pair<CompletionHandler, Poco::Timestamp> CompletionHandlerEntry;
typedef std::deque<CompletionHandlerEntry> HandlerList;
typedef Poco::FastMutex MutexType;
typedef MutexType::ScopedLock ScopedLock;
bool hasSocketHandlers();
void dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification);
NotifierPtr getNotifier(const Socket& socket, bool makeNew = false);
bool isPermanent(const Poco::Timestamp& entry) const;
template <typename F>
int removeCompletionHandlers(F isType, int count)
/// Removes `count` completion handlers of the specified
/// type; if count is -1, removes all completion handlers
/// of the specified type.
{
int removed = 0;
SpinScopedLock lock(_completionMutex);
int left = count > -1 ? count : static_cast<int>(_complHandlers.size());
HandlerList::iterator it = _complHandlers.begin();
while (left && it != _complHandlers.end())
{
if (isType(it->second))
{
++removed;
it = _complHandlers.erase((it));
--left;
}
else ++it;
}
return removed;
}
enum
{
@ -330,9 +232,7 @@ private:
NotificationPtr _pTimeoutNotification;
NotificationPtr _pIdleNotification;
NotificationPtr _pShutdownNotification;
HandlerList _complHandlers;
FastMutexType _ioMutex;
SpinMutexType _completionMutex;
MutexType _mutex;
Poco::Thread* _pThread;
friend class SocketNotifier;

View File

@ -18,11 +18,6 @@
#include "Poco/ErrorHandler.h"
#include "Poco/Thread.h"
#include "Poco/Exception.h"
#include <memory>
#ifdef POCO_OS_FAMILY_WINDOWS
#undef max
#endif
#include <limits>
using Poco::Exception;
@ -33,10 +28,6 @@ namespace Poco {
namespace Net {
const Timestamp::TimeDiff SocketReactor::PERMANENT_COMPLETION_HANDLER =
std::numeric_limits<Timestamp::TimeDiff>::max();
SocketReactor::SocketReactor():
_stop(false),
_timeout(DEFAULT_TIMEOUT),
@ -70,11 +61,18 @@ SocketReactor::~SocketReactor()
}
int SocketReactor::poll(int* pHandled)
void SocketReactor::run()
{
int handled = 0;
int completed = 0;
if (!hasSocketHandlers()) onIdle();
_pThread = Thread::current();
while (!_stop)
{
try
{
if (!hasSocketHandlers())
{
onIdle();
Thread::trySleep(static_cast<long>(_timeout.totalMilliseconds()));
}
else
{
bool readable = false;
@ -90,106 +88,13 @@ int SocketReactor::poll(int* pHandled)
{
dispatch(it->first, _pReadableNotification);
readable = true;
++handled;
}
if (it->second & PollSet::POLL_WRITE)
{
dispatch(it->first, _pWritableNotification);
++handled;
}
if (it->second & PollSet::POLL_ERROR)
{
dispatch(it->first, _pErrorNotification);
++handled;
}
if (it->second & PollSet::POLL_WRITE) dispatch(it->first, _pWritableNotification);
if (it->second & PollSet::POLL_ERROR) dispatch(it->first, _pErrorNotification);
}
}
if (!readable) onTimeout();
}
if (pHandled) *pHandled = handled;
if (hasSocketHandlers())
{
if (handled) completed = onComplete();
}
else completed = onComplete(false, true);
return completed;
}
int SocketReactor::onComplete(bool handleOne, bool expiredOnly)
{
std::unique_ptr<CompletionHandler> pCH;
int handled = 0;
{
HandlerList::iterator it = _complHandlers.begin();
while (it != _complHandlers.end())
{
std::size_t prevSize = 0;
// A completion handler may add new
// completion handler(s), so the mutex must
// be unlocked before the invocation.
{
SpinScopedLock lock(_completionMutex);
bool alwaysRun = isPermanent(it->second) && !expiredOnly;
bool isExpired = !alwaysRun && (Timestamp() > it->second);
if (isExpired)
{
pCH.reset(new CompletionHandler(std::move(it->first)));
it = _complHandlers.erase(it);
}
else if (alwaysRun)
{
pCH.reset(new CompletionHandler(it->first));
++it;
}
else ++it;
prevSize = _complHandlers.size();
}
if (pCH)
{
(*pCH)();
pCH.reset();
++handled;
if (handleOne) break;
}
// handler call may add or remove handlers;
// if so, we must start from the beginning
{
SpinScopedLock lock(_completionMutex);
if (prevSize != _complHandlers.size())
it = _complHandlers.begin();
}
}
}
return handled;
}
int SocketReactor::runOne()
{
try
{
while (0 == onComplete(true));
return 1;
}
catch(...) {}
return 0;
}
void SocketReactor::run()
{
_pThread = Thread::current();
while (!_stop)
{
try
{
if (!poll())
{
if (!hasSocketHandlers())
Thread::trySleep(static_cast<long>(_timeout.totalMilliseconds()));
}
}
catch (Exception& exc)
{
@ -212,7 +117,7 @@ bool SocketReactor::hasSocketHandlers()
{
if (!_pollSet.empty())
{
FastScopedLock lock(_ioMutex);
ScopedLock lock(_mutex);
for (auto& p: _handlers)
{
if (p.second->accepts(_pReadableNotification) ||
@ -249,92 +154,11 @@ const Poco::Timespan& SocketReactor::getTimeout() const
}
void SocketReactor::addCompletionHandler(const CompletionHandler& ch, Timestamp::TimeDiff ms)
{
addCompletionHandler(CompletionHandler(ch), ms);
}
void SocketReactor::addCompletionHandler(CompletionHandler&& ch, Timestamp::TimeDiff ms, int pos)
{
Poco::Timestamp expires = (ms != PERMANENT_COMPLETION_HANDLER) ? Timestamp() + ms*1000 : Timestamp(PERMANENT_COMPLETION_HANDLER);
if (pos == -1)
{
SpinScopedLock lock(_completionMutex);
_complHandlers.push_back({std::move(ch), expires});
}
else
{
if (pos < 0) throw Poco::InvalidArgumentException("SocketReactor::addCompletionHandler()");
SpinScopedLock lock(_completionMutex);
_complHandlers.insert(_complHandlers.begin() + pos, {std::move(ch), expires});
}
}
void SocketReactor::removeCompletionHandlers()
{
SpinScopedLock lock(_completionMutex);
_complHandlers.clear();
}
int SocketReactor::scheduledCompletionHandlers()
{
int cnt = 0;
SpinScopedLock lock(_completionMutex);
HandlerList::iterator it = _complHandlers.begin();
for (; it != _complHandlers.end(); ++it)
{
if (!isPermanent(it->second)/*it->second != Timestamp(0)*/) ++cnt;
}
return cnt;
}
int SocketReactor::removeScheduledCompletionHandlers(int count)
{
auto isScheduled = [this](const Timestamp& ts) { return !isPermanent(ts); };
return removeCompletionHandlers(isScheduled, count);
}
int SocketReactor::permanentCompletionHandlers()
{
int cnt = 0;
SpinScopedLock lock(_completionMutex);
HandlerList::iterator it = _complHandlers.begin();
for (; it != _complHandlers.end(); ++it)
{
if (isPermanent(it->second))
++cnt;
}
return cnt;
}
int SocketReactor::removePermanentCompletionHandlers(int count)
{
auto perm = [this](const Timestamp& ts) { return isPermanent(ts); };
return removeCompletionHandlers(perm, count);
}
bool SocketReactor::isPermanent(const Timestamp& entry) const
{
return entry == Timestamp(PERMANENT_COMPLETION_HANDLER);
}
void SocketReactor::addEventHandler(const Socket& socket, const Poco::AbstractObserver& observer)
{
NotifierPtr pNotifier = getNotifier(socket, true);
if (!pNotifier->hasObserver(observer))
{
pNotifier->addObserver(this, observer);
}
if (!pNotifier->hasObserver(observer)) pNotifier->addObserver(this, observer);
int mode = 0;
if (pNotifier->accepts(_pReadableNotification)) mode |= PollSet::POLL_READ;
@ -358,7 +182,7 @@ SocketReactor::NotifierPtr SocketReactor::getNotifier(const Socket& socket, bool
const SocketImpl* pImpl = socket.impl();
if (pImpl == nullptr) return 0;
poco_socket_t sockfd = pImpl->sockfd();
FastScopedLock lock(_ioMutex);
ScopedLock lock(_mutex);
EventHandlerMap::iterator it = _handlers.find(sockfd);
if (it != _handlers.end()) return it->second;
@ -371,14 +195,14 @@ SocketReactor::NotifierPtr SocketReactor::getNotifier(const Socket& socket, bool
void SocketReactor::removeEventHandler(const Socket& socket, const Poco::AbstractObserver& observer)
{
const SocketImpl* pImpl = socket.impl();
if (pImpl == nullptr) { return; }
if (pImpl == nullptr) return;
NotifierPtr pNotifier = getNotifier(socket);
if (pNotifier && pNotifier->hasObserver(observer))
{
if(pNotifier->countObservers() == 1)
{
{
FastScopedLock lock(_ioMutex);
ScopedLock lock(_mutex);
_handlers.erase(pImpl->sockfd());
}
_pollSet.remove(socket);
@ -429,7 +253,7 @@ void SocketReactor::dispatch(SocketNotification* pNotification)
{
std::vector<NotifierPtr> delegates;
{
FastScopedLock lock(_ioMutex);
ScopedLock lock(_mutex);
delegates.reserve(_handlers.size());
for (EventHandlerMap::iterator it = _handlers.begin(); it != _handlers.end(); ++it)
delegates.push_back(it->second);
@ -445,7 +269,6 @@ void SocketReactor::dispatch(NotifierPtr& pNotifier, SocketNotification* pNotifi
{
try
{
Socket s = pNotification->socket();
pNotifier->dispatch(pNotification);
}
catch (Exception& exc)

View File

@ -9,27 +9,27 @@
#include "SocketReactorTest.h"
#include "EchoServer.h"
#include "UDPEchoServer.h"
#include "CppUnit/TestCaller.h"
#include "CppUnit/TestSuite.h"
#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/SocketNotification.h"
#include "Poco/Net/SocketConnector.h"
#include "Poco/Net/SocketAcceptor.h"
#include "Poco/Net/ParallelSocketAcceptor.h"
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/DatagramSocket.h"
#include "Poco/Net/ServerSocket.h"
#include "Poco/Timestamp.h"
#include "Poco/Net/SocketAddress.h"
#include "Poco/Observer.h"
#include "Poco/Exception.h"
#include "Poco/Thread.h"
#include <sstream>
#include <iostream>
using Poco::Net::SocketReactor;
using Poco::Net::SocketConnector;
using Poco::Net::SocketAcceptor;
using Poco::Net::ParallelSocketAcceptor;
using Poco::Net::StreamSocket;
using Poco::Net::DatagramSocket;
using Poco::Net::ServerSocket;
using Poco::Net::SocketAddress;
using Poco::Net::SocketNotification;
@ -40,7 +40,6 @@ using Poco::Net::ShutdownNotification;
using Poco::Observer;
using Poco::IllegalStateException;
using Poco::Thread;
using Poco::Timestamp;
namespace
@ -376,49 +375,6 @@ namespace
};
DataServiceHandler::Data DataServiceHandler::_data;
class CompletionHandlerTestObject
{
public:
CompletionHandlerTestObject() = delete;
CompletionHandlerTestObject(SocketReactor& reactor, Timestamp::TimeDiff ms = SocketReactor::PERMANENT_COMPLETION_HANDLER):
_reactor(reactor),
_counter(0)
{
auto handler = [this] ()
{
++_counter;
};
_reactor.addCompletionHandler(handler, ms);
_reactor.addCompletionHandler(std::move(handler), ms);
}
void addRecursiveCompletionHandler(int count)
{
_count = count;
if (!_handler)
{
_handler = [&] ()
{
if (_counter++ < _count)
_reactor.addCompletionHandler(_handler);
};
}
_reactor.addCompletionHandler(_handler);
}
int counter()
{
return _counter;
}
private:
SocketReactor& _reactor;
int _counter;
int _count;
std::function<void()> _handler = nullptr;
};
}
@ -451,13 +407,6 @@ void SocketReactorTest::testSocketReactor()
}
void SocketReactorTest::testSocketReactorPoll()
{
testIOHandler<EchoServer, StreamSocket>();
testIOHandler<UDPEchoServer, DatagramSocket>();
}
void SocketReactorTest::testSetSocketReactor()
{
SocketAddress ssa;
@ -621,50 +570,6 @@ void SocketReactorTest::testDataCollection()
}
void SocketReactorTest::testCompletionHandler()
{/*
SocketReactor reactor;
CompletionHandlerTestObject ch(reactor);
assert (reactor.permanentCompletionHandlers() == 2);
assert (reactor.scheduledCompletionHandlers() == 0);
assertTrue(ch.counter() == 0);
assertTrue(reactor.poll() == 2);
assertTrue(ch.counter() == 2);
ch.addRecursiveCompletionHandler(5);
assertTrue (reactor.permanentCompletionHandlers() == 3);
assertTrue (reactor.scheduledCompletionHandlers() == 0);
assertTrue(reactor.poll() == 7);
assertTrue(ch.counter() == 9);
assertTrue (reactor.permanentCompletionHandlers() == 4);
assertTrue (reactor.scheduledCompletionHandlers() == 0);
reactor.removePermanentCompletionHandlers();
assertTrue (reactor.poll() == 0);
assertTrue(ch.counter() == 9);
*/}
void SocketReactorTest::testTimedCompletionHandler()
{
SocketReactor reactor;
CompletionHandlerTestObject ch(reactor, 500);
assert (reactor.permanentCompletionHandlers() == 0);
assert (reactor.scheduledCompletionHandlers() == 2);
assertTrue(ch.counter() == 0);
reactor.poll();
assertTrue(ch.counter() == 0);
reactor.poll();
Thread::sleep(500);
reactor.poll();
assertTrue(ch.counter() == 2);
assert (reactor.permanentCompletionHandlers() == 0);
assert (reactor.scheduledCompletionHandlers() == 0);
reactor.poll();
assertTrue(ch.counter() == 2);
}
void SocketReactorTest::setUp()
{
ClientServiceHandler::setCloseOnTimeout(false);
@ -681,14 +586,11 @@ CppUnit::Test* SocketReactorTest::suite()
CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("SocketReactorTest");
CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactor);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketReactorPoll);
CppUnit_addTest(pSuite, SocketReactorTest, testSetSocketReactor);
CppUnit_addTest(pSuite, SocketReactorTest, testParallelSocketReactor);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorFail);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout);
CppUnit_addTest(pSuite, SocketReactorTest, testDataCollection);
CppUnit_addTest(pSuite, SocketReactorTest, testCompletionHandler);
CppUnit_addTest(pSuite, SocketReactorTest, testTimedCompletionHandler);
return pSuite;
}

View File

@ -15,11 +15,6 @@
#include "Poco/Net/Net.h"
#include "Poco/Thread.h"
#include "Poco/Observer.h"
#include "Poco/Net/SocketAddress.h"
#include "Poco/Net/SocketReactor.h"
#include "Poco/Net/SocketNotification.h"
#include "CppUnit/TestCase.h"
@ -30,14 +25,11 @@ public:
~SocketReactorTest();
void testSocketReactor();
void testSocketReactorPoll();
void testSetSocketReactor();
void testParallelSocketReactor();
void testSocketConnectorFail();
void testSocketConnectorTimeout();
void testDataCollection();
void testCompletionHandler();
void testTimedCompletionHandler();
void setUp();
void tearDown();
@ -45,127 +37,6 @@ public:
static CppUnit::Test* suite();
private:
template <class SocketType>
class IOHandler
{
public:
IOHandler(const SocketType& socket, Poco::Net::SocketReactor& reactor):
_socket(socket),
_reactor(reactor),
_or(*this, &IOHandler::onReadable),
_ow(*this, &IOHandler::onWritable),
_ot(*this, &IOHandler::onTimeout),
_os(*this, &IOHandler::onShutdown)
{
_reactor.addEventHandler(_socket, _or);
_reactor.addEventHandler(_socket, _ow);
_reactor.addEventHandler(_socket, _ot);
_reactor.addEventHandler(_socket, _os);
}
~IOHandler()
{
_reactor.removeEventHandler(_socket, _or);
_reactor.removeEventHandler(_socket, _ow);
_reactor.removeEventHandler(_socket, _ot);
_reactor.removeEventHandler(_socket, _os);
}
void onReadable(Poco::Net::ReadableNotification* pNf)
{
pNf->release();
char buffer[32];
int n = _socket.receiveBytes(buffer, sizeof(buffer));
if (n > 0) _data.append(buffer, n);
}
void onWritable(Poco::Net::WritableNotification* pNf)
{
pNf->release();
_writable = true;
}
void onTimeout(Poco::Net::TimeoutNotification* pNf)
{
pNf->release();
_timeout = true;
}
void onShutdown(Poco::Net::ShutdownNotification* pNf)
{
pNf->release();
_shutdown = true;
}
const std::string& data()
{
return _data;
}
bool writable() const
{
return _writable;
}
bool timeout() const
{
return _timeout;
}
bool shutdown() const
{
return _shutdown;
}
private:
SocketType _socket;
Poco::Net::SocketReactor& _reactor;
Poco::Observer<IOHandler, Poco::Net::ReadableNotification> _or;
Poco::Observer<IOHandler, Poco::Net::WritableNotification> _ow;
Poco::Observer<IOHandler, Poco::Net::TimeoutNotification> _ot;
Poco::Observer<IOHandler, Poco::Net::ShutdownNotification> _os;
bool _writable = false;
bool _timeout = false;
bool _shutdown = false;
std::string _data;
};
template <typename ServerType, typename SocketType>
void testIOHandler()
{
using Poco::Thread;
using Poco::Net::SocketReactor;
using Poco::Net::SocketAddress;
ServerType echoServer;
SocketType ds;
ds.connect(SocketAddress("127.0.0.1", echoServer.port()));
SocketReactor reactor;
IOHandler<SocketType> ih(ds, reactor);
int n = ds.sendBytes("hello", 5);
assertTrue (n == 5);
assertFalse(ih.writable());
Thread t; t.start(reactor);
Thread::sleep(500);
assertTrue(ih.writable());
assertFalse(ih.shutdown());
reactor.stop(); t.join();
assertTrue(ih.shutdown());
assertTrue (ih.data() == "hello");
n = ds.sendBytes("hello", 5);
assertTrue (n == 5);
while (ih.data().size() < 10)
{
reactor.poll();
Thread::sleep(10);
}
assertTrue (ih.data() == "hellohello");
reactor.poll();
assertTrue (ih.timeout());
ds.close();
}
};