Idle Reactor high CPU usage #607

This commit is contained in:
Alex Fabijanic 2014-11-17 00:40:00 -06:00
parent 8769ef01bc
commit 2f20f801c5
4 changed files with 49 additions and 8 deletions

View File

@ -129,6 +129,7 @@ public:
{ {
pNotification->release(); pNotification->release();
StreamSocket sock = _socket.acceptConnection(); StreamSocket sock = _socket.acceptConnection();
_pReactor->wakeUp();
createServiceHandler(sock); createServiceHandler(sock);
} }
@ -140,6 +141,7 @@ protected:
{ {
std::size_t next = _next++; std::size_t next = _next++;
if (_next == _reactors.size()) _next = 0; if (_next == _reactors.size()) _next = 0;
_reactors[next]->wakeUp();
return new ServiceHandler(socket, *_reactors[next]); return new ServiceHandler(socket, *_reactors[next]);
} }
@ -167,9 +169,27 @@ protected:
_reactors.push_back(new ParallelReactor); _reactors.push_back(new ParallelReactor);
} }
private:
typedef std::vector<typename ParallelReactor::Ptr> ReactorVec; typedef std::vector<typename ParallelReactor::Ptr> ReactorVec;
ReactorVec& reactors()
/// Returns reference to vector of reactors.
{
return _reactors;
}
SocketReactor* reactor(std::size_t idx)
/// Returns reference to the reactor at position idx.
{
return _reactors.at(idx).get();
}
std::size_t& next()
/// Returns reference to the next reactor index.
{
return _next;
}
private:
ParallelSocketAcceptor(); ParallelSocketAcceptor();
ParallelSocketAcceptor(const ParallelSocketAcceptor&); ParallelSocketAcceptor(const ParallelSocketAcceptor&);
ParallelSocketAcceptor& operator = (const ParallelSocketAcceptor&); ParallelSocketAcceptor& operator = (const ParallelSocketAcceptor&);

View File

@ -129,6 +129,7 @@ public:
{ {
pNotification->release(); pNotification->release();
StreamSocket sock = _socket.acceptConnection(); StreamSocket sock = _socket.acceptConnection();
_pReactor->wakeUp();
createServiceHandler(sock); createServiceHandler(sock);
} }

View File

@ -30,6 +30,11 @@
namespace Poco { namespace Poco {
class Thread;
namespace Net { namespace Net {
@ -128,6 +133,9 @@ public:
/// The reactor will be stopped when the next event /// The reactor will be stopped when the next event
/// (including a timeout event) occurs. /// (including a timeout event) occurs.
void wakeUp();
/// Wakes up idle reactor.
void setTimeout(const Poco::Timespan& timeout); void setTimeout(const Poco::Timespan& timeout);
/// Sets the timeout. /// Sets the timeout.
/// ///
@ -166,7 +174,7 @@ protected:
/// Can be overridden by subclasses. The default implementation /// Can be overridden by subclasses. The default implementation
/// dispatches the TimeoutNotification and thus should be called by overriding /// dispatches the TimeoutNotification and thus should be called by overriding
/// implementations. /// implementations.
virtual void onIdle(); virtual void onIdle();
/// Called if no sockets are available to call select() on. /// Called if no sockets are available to call select() on.
/// ///
@ -194,7 +202,7 @@ protected:
void dispatch(SocketNotification* pNotification); void dispatch(SocketNotification* pNotification);
/// Dispatches the given notification to all observers. /// Dispatches the given notification to all observers.
private: private:
typedef Poco::AutoPtr<SocketNotifier> NotifierPtr; typedef Poco::AutoPtr<SocketNotifier> NotifierPtr;
typedef Poco::AutoPtr<SocketNotification> NotificationPtr; typedef Poco::AutoPtr<SocketNotification> NotificationPtr;
@ -206,7 +214,7 @@ private:
{ {
DEFAULT_TIMEOUT = 250000 DEFAULT_TIMEOUT = 250000
}; };
bool _stop; bool _stop;
Poco::Timespan _timeout; Poco::Timespan _timeout;
EventHandlerMap _handlers; EventHandlerMap _handlers;
@ -217,6 +225,7 @@ private:
NotificationPtr _pIdleNotification; NotificationPtr _pIdleNotification;
NotificationPtr _pShutdownNotification; NotificationPtr _pShutdownNotification;
Poco::FastMutex _mutex; Poco::FastMutex _mutex;
Poco::Thread* _pThread;
friend class SocketNotifier; friend class SocketNotifier;
}; };

View File

@ -39,7 +39,8 @@ SocketReactor::SocketReactor():
_pErrorNotification(new ErrorNotification(this)), _pErrorNotification(new ErrorNotification(this)),
_pTimeoutNotification(new TimeoutNotification(this)), _pTimeoutNotification(new TimeoutNotification(this)),
_pIdleNotification(new IdleNotification(this)), _pIdleNotification(new IdleNotification(this)),
_pShutdownNotification(new ShutdownNotification(this)) _pShutdownNotification(new ShutdownNotification(this)),
_pThread(0)
{ {
} }
@ -52,7 +53,8 @@ SocketReactor::SocketReactor(const Poco::Timespan& timeout):
_pErrorNotification(new ErrorNotification(this)), _pErrorNotification(new ErrorNotification(this)),
_pTimeoutNotification(new TimeoutNotification(this)), _pTimeoutNotification(new TimeoutNotification(this)),
_pIdleNotification(new IdleNotification(this)), _pIdleNotification(new IdleNotification(this)),
_pShutdownNotification(new ShutdownNotification(this)) _pShutdownNotification(new ShutdownNotification(this)),
_pThread(0)
{ {
} }
@ -64,6 +66,8 @@ SocketReactor::~SocketReactor()
void SocketReactor::run() void SocketReactor::run()
{ {
_pThread = Thread::current();
Socket::SocketList readable; Socket::SocketList readable;
Socket::SocketList writable; Socket::SocketList writable;
Socket::SocketList except; Socket::SocketList except;
@ -100,6 +104,7 @@ void SocketReactor::run()
if (nSockets == 0) if (nSockets == 0)
{ {
onIdle(); onIdle();
Thread::trySleep(_timeout.milliseconds());
} }
else if (Socket::select(readable, writable, except, _timeout)) else if (Socket::select(readable, writable, except, _timeout))
{ {
@ -130,19 +135,25 @@ void SocketReactor::run()
onShutdown(); onShutdown();
} }
void SocketReactor::stop() void SocketReactor::stop()
{ {
_stop = true; _stop = true;
} }
void SocketReactor::wakeUp()
{
if (_pThread) _pThread->wakeUp();
}
void SocketReactor::setTimeout(const Poco::Timespan& timeout) void SocketReactor::setTimeout(const Poco::Timespan& timeout)
{ {
_timeout = timeout; _timeout = timeout;
} }
const Poco::Timespan& SocketReactor::getTimeout() const const Poco::Timespan& SocketReactor::getTimeout() const
{ {
return _timeout; return _timeout;