From a91f9a0afa9e920005aa73abc2b65864adf70770 Mon Sep 17 00:00:00 2001 From: Aleksandar Fabijanic Date: Tue, 31 Oct 2017 13:16:00 -0500 Subject: [PATCH] Fix/tcp dispatcher (#1965) * TCPServerDispatcher::run() issue #1884; make integral members atomic and minimize locking * Update TCPServerDispatcher.cpp --- Net/include/Poco/Net/TCPServerDispatcher.h | 38 +++++++++--- Net/src/TCPServerDispatcher.cpp | 69 +++++++--------------- 2 files changed, 51 insertions(+), 56 deletions(-) diff --git a/Net/include/Poco/Net/TCPServerDispatcher.h b/Net/include/Poco/Net/TCPServerDispatcher.h index 3c46d0f26..920a07576 100644 --- a/Net/include/Poco/Net/TCPServerDispatcher.h +++ b/Net/include/Poco/Net/TCPServerDispatcher.h @@ -100,14 +100,38 @@ private: TCPServerDispatcher(const TCPServerDispatcher&); TCPServerDispatcher& operator = (const TCPServerDispatcher&); - int _rc; + class ThreadCountWatcher + { + public: + ThreadCountWatcher(TCPServerDispatcher* pDisp) : _pDisp(pDisp) + { + } + + ~ThreadCountWatcher() + { + FastMutex::ScopedLock lock(_pDisp->_mutex); + if (_pDisp->_currentThreads > 1 && _pDisp->_queue.empty()) + { + --_pDisp->_currentThreads; + } + } + + private: + ThreadCountWatcher(); + ThreadCountWatcher(const ThreadCountWatcher&); + ThreadCountWatcher& operator=(const ThreadCountWatcher&); + + TCPServerDispatcher* _pDisp; + }; + + std::atomic _rc; TCPServerParams::Ptr _pParams; - int _currentThreads; - int _totalConnections; - int _currentConnections; - int _maxConcurrentConnections; - int _refusedConnections; - bool _stopped; + std::atomic _currentThreads; + std::atomic _totalConnections; + std::atomic _currentConnections; + std::atomic _maxConcurrentConnections; + std::atomic _refusedConnections; + std::atomic _stopped; Poco::NotificationQueue _queue; TCPServerConnectionFactory::Ptr _pConnectionFactory; Poco::ThreadPool& _threadPool; diff --git a/Net/src/TCPServerDispatcher.cpp b/Net/src/TCPServerDispatcher.cpp index 1c71ee243..058924324 100644 --- a/Net/src/TCPServerDispatcher.cpp +++ b/Net/src/TCPServerDispatcher.cpp @@ -80,18 +80,13 @@ TCPServerDispatcher::~TCPServerDispatcher() void TCPServerDispatcher::duplicate() { - _mutex.lock(); ++_rc; - _mutex.unlock(); } void TCPServerDispatcher::release() { - _mutex.lock(); - int rc = --_rc; - _mutex.unlock(); - if (rc == 0) delete this; + if (--_rc == 0) delete this; } @@ -103,41 +98,29 @@ void TCPServerDispatcher::run() for (;;) { - try { - AutoPtr pNf = _queue.waitDequeueNotification(idleTime); - if (pNf) + ThreadCountWatcher tcw(this); + try { - TCPConnectionNotification* pCNf = dynamic_cast(pNf.get()); - if (pCNf) + AutoPtr pNf = _queue.waitDequeueNotification(idleTime); + if (pNf) { - std::unique_ptr pConnection(_pConnectionFactory->createConnection(pCNf->socket())); - poco_check_ptr(pConnection.get()); - beginConnection(); - pConnection->start(); - endConnection(); + TCPConnectionNotification* pCNf = dynamic_cast(pNf.get()); + if (pCNf) + { + std::unique_ptr pConnection(_pConnectionFactory->createConnection(pCNf->socket())); + poco_check_ptr(pConnection.get()); + beginConnection(); + pConnection->start(); + endConnection(); + } } } + catch (Poco::Exception &exc) { ErrorHandler::handle(exc); } + catch (std::exception &exc) { ErrorHandler::handle(exc); } + catch (...) { ErrorHandler::handle(); } } - catch (Poco::Exception &exc) - { - ErrorHandler::handle(exc); - } - catch (std::exception &exc) - { - ErrorHandler::handle(exc); - } - catch (...) - { - ErrorHandler::handle(); - } - - FastMutex::ScopedLock lock(_mutex); - if (_stopped || (_currentThreads > 1 && _queue.empty())) - { - --_currentThreads; - break; - } + if (_stopped || (_currentThreads > 1 && _queue.empty())) break; } } @@ -186,8 +169,6 @@ void TCPServerDispatcher::stop() int TCPServerDispatcher::currentThreads() const { - FastMutex::ScopedLock lock(_mutex); - return _currentThreads; } @@ -201,24 +182,18 @@ int TCPServerDispatcher::maxThreads() const int TCPServerDispatcher::totalConnections() const { - FastMutex::ScopedLock lock(_mutex); - return _totalConnections; } int TCPServerDispatcher::currentConnections() const { - FastMutex::ScopedLock lock(_mutex); - return _currentConnections; } int TCPServerDispatcher::maxConcurrentConnections() const { - FastMutex::ScopedLock lock(_mutex); - return _maxConcurrentConnections; } @@ -231,8 +206,6 @@ int TCPServerDispatcher::queuedConnections() const int TCPServerDispatcher::refusedConnections() const { - FastMutex::ScopedLock lock(_mutex); - return _refusedConnections; } @@ -240,18 +213,16 @@ int TCPServerDispatcher::refusedConnections() const void TCPServerDispatcher::beginConnection() { FastMutex::ScopedLock lock(_mutex); - + ++_totalConnections; ++_currentConnections; if (_currentConnections > _maxConcurrentConnections) - _maxConcurrentConnections = _currentConnections; + _maxConcurrentConnections.store(_currentConnections); } void TCPServerDispatcher::endConnection() { - FastMutex::ScopedLock lock(_mutex); - --_currentConnections; }