diff --git a/Net/include/Poco/Net/ParallelSocketAcceptor.h b/Net/include/Poco/Net/ParallelSocketAcceptor.h index bd986716c..0532065de 100644 --- a/Net/include/Poco/Net/ParallelSocketAcceptor.h +++ b/Net/include/Poco/Net/ParallelSocketAcceptor.h @@ -129,6 +129,7 @@ public: { pNotification->release(); StreamSocket sock = _socket.acceptConnection(); + _pReactor->wakeUp(); createServiceHandler(sock); } @@ -140,6 +141,7 @@ protected: { std::size_t next = _next++; if (_next == _reactors.size()) _next = 0; + _reactors[next]->wakeUp(); return new ServiceHandler(socket, *_reactors[next]); } @@ -167,9 +169,27 @@ protected: _reactors.push_back(new ParallelReactor); } -private: typedef std::vector ReactorVec; + ReactorVec& reactors() + /// Returns reference to vector of reactors. + { + return _reactors; + } + + SocketReactor* reactor(std::size_t idx) + /// Returns reference to the reactor at position idx. + { + return _reactors.at(idx).get(); + } + + std::size_t& next() + /// Returns reference to the next reactor index. + { + return _next; + } + +private: ParallelSocketAcceptor(); ParallelSocketAcceptor(const ParallelSocketAcceptor&); ParallelSocketAcceptor& operator = (const ParallelSocketAcceptor&); diff --git a/Net/include/Poco/Net/SocketAcceptor.h b/Net/include/Poco/Net/SocketAcceptor.h index 6d625f865..b6cbd3f89 100644 --- a/Net/include/Poco/Net/SocketAcceptor.h +++ b/Net/include/Poco/Net/SocketAcceptor.h @@ -129,6 +129,7 @@ public: { pNotification->release(); StreamSocket sock = _socket.acceptConnection(); + _pReactor->wakeUp(); createServiceHandler(sock); } diff --git a/Net/include/Poco/Net/SocketReactor.h b/Net/include/Poco/Net/SocketReactor.h index befdc9a0f..a39e9d25b 100644 --- a/Net/include/Poco/Net/SocketReactor.h +++ b/Net/include/Poco/Net/SocketReactor.h @@ -30,6 +30,11 @@ namespace Poco { + + +class Thread; + + namespace Net { @@ -128,6 +133,9 @@ public: /// The reactor will be stopped when the next event /// (including a timeout event) occurs. + void wakeUp(); + /// Wakes up idle reactor. + void setTimeout(const Poco::Timespan& timeout); /// Sets the timeout. /// @@ -166,7 +174,7 @@ protected: /// Can be overridden by subclasses. The default implementation /// dispatches the TimeoutNotification and thus should be called by overriding /// implementations. - + virtual void onIdle(); /// Called if no sockets are available to call select() on. /// @@ -194,7 +202,7 @@ protected: void dispatch(SocketNotification* pNotification); /// Dispatches the given notification to all observers. - + private: typedef Poco::AutoPtr NotifierPtr; typedef Poco::AutoPtr NotificationPtr; @@ -206,7 +214,7 @@ private: { DEFAULT_TIMEOUT = 250000 }; - + bool _stop; Poco::Timespan _timeout; EventHandlerMap _handlers; @@ -217,6 +225,7 @@ private: NotificationPtr _pIdleNotification; NotificationPtr _pShutdownNotification; Poco::FastMutex _mutex; + Poco::Thread* _pThread; friend class SocketNotifier; }; diff --git a/Net/src/SocketReactor.cpp b/Net/src/SocketReactor.cpp index 19aeb88d2..fb43589d1 100644 --- a/Net/src/SocketReactor.cpp +++ b/Net/src/SocketReactor.cpp @@ -39,7 +39,8 @@ SocketReactor::SocketReactor(): _pErrorNotification(new ErrorNotification(this)), _pTimeoutNotification(new TimeoutNotification(this)), _pIdleNotification(new IdleNotification(this)), - _pShutdownNotification(new ShutdownNotification(this)) + _pShutdownNotification(new ShutdownNotification(this)), + _pThread(0) { } @@ -52,7 +53,8 @@ SocketReactor::SocketReactor(const Poco::Timespan& timeout): _pErrorNotification(new ErrorNotification(this)), _pTimeoutNotification(new TimeoutNotification(this)), _pIdleNotification(new IdleNotification(this)), - _pShutdownNotification(new ShutdownNotification(this)) + _pShutdownNotification(new ShutdownNotification(this)), + _pThread(0) { } @@ -64,6 +66,8 @@ SocketReactor::~SocketReactor() void SocketReactor::run() { + _pThread = Thread::current(); + Socket::SocketList readable; Socket::SocketList writable; Socket::SocketList except; @@ -100,6 +104,7 @@ void SocketReactor::run() if (nSockets == 0) { onIdle(); + Thread::trySleep(_timeout.milliseconds()); } else if (Socket::select(readable, writable, except, _timeout)) { @@ -130,19 +135,25 @@ void SocketReactor::run() onShutdown(); } - + void SocketReactor::stop() { _stop = true; } +void SocketReactor::wakeUp() +{ + if (_pThread) _pThread->wakeUp(); +} + + void SocketReactor::setTimeout(const Poco::Timespan& timeout) { _timeout = timeout; } - + const Poco::Timespan& SocketReactor::getTimeout() const { return _timeout;