chore (SocketProactor): add some state members

This commit is contained in:
Alex Fabijanic 2021-12-19 17:48:56 +01:00
parent da70f91796
commit 79affbbe6e
2 changed files with 38 additions and 5 deletions

View File

@ -35,6 +35,7 @@
#include <deque>
#include <utility>
#include <memory>
#include <iostream>
namespace Poco {
@ -104,7 +105,7 @@ public:
/// Default is removal of all scheduled functions.
int permanentWork();
/// Returns the number of scheduled functions.
/// Returns the number of permanent functions.
int removePermanentWork(int count = -1);
/// Removes the count permanent functions
@ -187,9 +188,18 @@ public:
void addSend(Socket sock, Buffer&& message, Callback&& onCompletion);
/// Adds the stream socket and the completion handler to the I/O send queue.
bool hasSocketHandlers() const;
/// Returns true if proactor had at least one I/O completion handler.
bool has(const Socket& sock) const;
/// Returns true if socket is registered with this proactor.
bool isRunning() const;
/// Returns true if this proactor is running
bool ioCompletionInProgress() const;
/// Returns true if there are not executed handlers from last IO..
private:
void onShutdown();
/// Called when the SocketProactor is about to terminate.
@ -204,7 +214,6 @@ private:
typedef Poco::Mutex MutexType;
typedef MutexType::ScopedLock ScopedLock;
bool hasSocketHandlers();
static const long DEFAULT_MAX_TIMEOUT_MS = 250;
struct Handler
@ -300,6 +309,11 @@ private:
_nq.wakeUpAll();
}
int queueSize() const
{
return _nq.size();
}
private:
bool runOne()
/// Runs the next I/O completion handler in the queue.
@ -325,7 +339,6 @@ private:
{
ErrorHandler::handle();
}
}
return false;
}
@ -423,6 +436,7 @@ private:
Worker& worker();
std::atomic<bool> _isRunning;
std::atomic<bool> _isStopped;
std::atomic<bool> _stop;
long _timeout;
@ -473,6 +487,12 @@ inline void SocketProactor::enqueueIONotification(Callback&& onCompletion, int n
}
inline bool SocketProactor::isRunning() const
{
return _isRunning;
}
} } // namespace Poco::Net

View File

@ -230,6 +230,7 @@ const Timestamp::TimeDiff SocketProactor::PERMANENT_COMPLETION_HANDLER =
SocketProactor::SocketProactor(bool worker):
_isRunning(false),
_isStopped(false),
_stop(false),
_timeout(0),
@ -242,6 +243,7 @@ SocketProactor::SocketProactor(bool worker):
SocketProactor::SocketProactor(const Poco::Timespan& timeout, bool worker):
_isRunning(false),
_isStopped(false),
_stop(false),
_timeout(0),
@ -569,6 +571,7 @@ void SocketProactor::receiveFrom(SocketImpl& sock, IOHandlerIt& it, int availabl
{
Buffer *pBuf = (*it)->_pBuf;
SocketAddress *pAddr = (*it)->_pAddr;
SocketAddress addr = *pAddr;
poco_check_ptr(pBuf);
if (pBuf->size() < available) pBuf->resize(available);
int n = 0, err = 0;
@ -652,13 +655,17 @@ void SocketProactor::run()
int handled = 0;
if (!_isStopped) _stop = false;
_isStopped = false;
while(!_stop)
while (!_stop)
{
this->sleep(poll(&handled) || handled);
_isRunning = true;
}
_isRunning = false;
onShutdown();
}
bool SocketProactor::hasSocketHandlers()
bool SocketProactor::hasSocketHandlers() const
{
if (_readHandlers.size() || _writeHandlers.size())
return true;
@ -750,6 +757,12 @@ bool SocketProactor::has(const Socket& sock) const
}
bool SocketProactor::ioCompletionInProgress() const
{
return _ioCompletion.queueSize();
}
void SocketProactor::onShutdown()
{
_pollSet.wakeUp();