Fix/tcp dispatcher (#1965)

* TCPServerDispatcher::run() issue #1884; make integral members atomic and minimize locking

* Update TCPServerDispatcher.cpp
This commit is contained in:
Aleksandar Fabijanic 2017-10-31 13:16:00 -05:00 committed by GitHub
parent 9288e89bfe
commit a91f9a0afa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 56 deletions

View File

@ -100,14 +100,38 @@ private:
TCPServerDispatcher(const TCPServerDispatcher&);
TCPServerDispatcher& operator = (const TCPServerDispatcher&);
int _rc;
class ThreadCountWatcher
{
public:
ThreadCountWatcher(TCPServerDispatcher* pDisp) : _pDisp(pDisp)
{
}
~ThreadCountWatcher()
{
FastMutex::ScopedLock lock(_pDisp->_mutex);
if (_pDisp->_currentThreads > 1 && _pDisp->_queue.empty())
{
--_pDisp->_currentThreads;
}
}
private:
ThreadCountWatcher();
ThreadCountWatcher(const ThreadCountWatcher&);
ThreadCountWatcher& operator=(const ThreadCountWatcher&);
TCPServerDispatcher* _pDisp;
};
std::atomic<int> _rc;
TCPServerParams::Ptr _pParams;
int _currentThreads;
int _totalConnections;
int _currentConnections;
int _maxConcurrentConnections;
int _refusedConnections;
bool _stopped;
std::atomic<int> _currentThreads;
std::atomic<int> _totalConnections;
std::atomic<int> _currentConnections;
std::atomic<int> _maxConcurrentConnections;
std::atomic<int> _refusedConnections;
std::atomic<bool> _stopped;
Poco::NotificationQueue _queue;
TCPServerConnectionFactory::Ptr _pConnectionFactory;
Poco::ThreadPool& _threadPool;

View File

@ -80,18 +80,13 @@ TCPServerDispatcher::~TCPServerDispatcher()
void TCPServerDispatcher::duplicate()
{
_mutex.lock();
++_rc;
_mutex.unlock();
}
void TCPServerDispatcher::release()
{
_mutex.lock();
int rc = --_rc;
_mutex.unlock();
if (rc == 0) delete this;
if (--_rc == 0) delete this;
}
@ -103,41 +98,29 @@ void TCPServerDispatcher::run()
for (;;)
{
try
{
AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
if (pNf)
ThreadCountWatcher tcw(this);
try
{
TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
if (pCNf)
AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
if (pNf)
{
std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
poco_check_ptr(pConnection.get());
beginConnection();
pConnection->start();
endConnection();
TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
if (pCNf)
{
std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
poco_check_ptr(pConnection.get());
beginConnection();
pConnection->start();
endConnection();
}
}
}
catch (Poco::Exception &exc) { ErrorHandler::handle(exc); }
catch (std::exception &exc) { ErrorHandler::handle(exc); }
catch (...) { ErrorHandler::handle(); }
}
catch (Poco::Exception &exc)
{
ErrorHandler::handle(exc);
}
catch (std::exception &exc)
{
ErrorHandler::handle(exc);
}
catch (...)
{
ErrorHandler::handle();
}
FastMutex::ScopedLock lock(_mutex);
if (_stopped || (_currentThreads > 1 && _queue.empty()))
{
--_currentThreads;
break;
}
if (_stopped || (_currentThreads > 1 && _queue.empty())) break;
}
}
@ -186,8 +169,6 @@ void TCPServerDispatcher::stop()
int TCPServerDispatcher::currentThreads() const
{
FastMutex::ScopedLock lock(_mutex);
return _currentThreads;
}
@ -201,24 +182,18 @@ int TCPServerDispatcher::maxThreads() const
int TCPServerDispatcher::totalConnections() const
{
FastMutex::ScopedLock lock(_mutex);
return _totalConnections;
}
int TCPServerDispatcher::currentConnections() const
{
FastMutex::ScopedLock lock(_mutex);
return _currentConnections;
}
int TCPServerDispatcher::maxConcurrentConnections() const
{
FastMutex::ScopedLock lock(_mutex);
return _maxConcurrentConnections;
}
@ -231,8 +206,6 @@ int TCPServerDispatcher::queuedConnections() const
int TCPServerDispatcher::refusedConnections() const
{
FastMutex::ScopedLock lock(_mutex);
return _refusedConnections;
}
@ -240,18 +213,16 @@ int TCPServerDispatcher::refusedConnections() const
void TCPServerDispatcher::beginConnection()
{
FastMutex::ScopedLock lock(_mutex);
++_totalConnections;
++_currentConnections;
if (_currentConnections > _maxConcurrentConnections)
_maxConcurrentConnections = _currentConnections;
_maxConcurrentConnections.store(_currentConnections);
}
void TCPServerDispatcher::endConnection()
{
FastMutex::ScopedLock lock(_mutex);
--_currentConnections;
}