modify(SocketProactor): wait for completion handlers availability #3357

This commit is contained in:
Alex Fabijanic 2021-07-24 18:00:42 +02:00
parent 5b444efc53
commit 4bbfa757ff

View File

@ -256,16 +256,17 @@ private:
public:
IOCompletion() = delete;
explicit IOCompletion(int maxTimeout): _timeout(0),
_maxTimeout(static_cast<long>(maxTimeout)),
_activity(this, &IOCompletion::run),
_pThread(nullptr)
explicit IOCompletion(int maxTimeout):
_activity(this, &IOCompletion::run)
/// Creates IOCompletion.
{
_activity.start();
}
~IOCompletion() = default;
~IOCompletion()
{
wakeUp();
}
void stop()
/// Stops the I/O completion execution.
@ -289,14 +290,14 @@ private:
void wakeUp()
/// Wakes up the I/O completion execution loop.
{
if (_pThread) _pThread->wakeUp();
_nq.wakeUpAll();
}
private:
bool runOne()
/// Runs the next I/O completion handler in the queue.
{
IONotification* pNf = dynamic_cast<IONotification*>(_nq.dequeueNotification());
IONotification* pNf = dynamic_cast<IONotification*>(_nq.waitDequeueNotification());
if (pNf)
{
pNf->call();
@ -309,18 +310,11 @@ private:
void run()
/// Continuously runs enqueued completion handlers.
{
_pThread = Thread::current();
while(!_activity.isStopped())
{
SocketProactor::runImpl(!_nq.empty() && runOne(), _timeout, _maxTimeout);
}
while(!_activity.isStopped()) runOne();
}
long _timeout;
long _maxTimeout;
Activity<IOCompletion> _activity;
NotificationQueue _nq;
Thread* _pThread;
};
using IOHandlerList = std::deque<std::unique_ptr<Handler>>;