Fix thread counter leak (#3992)

Cherry-picked from https://github.com/ClickHouse/poco/pull/28, see the
mentioned PR for more details.

Co-authored-by: alexey-milovidov <milovidov@yandex-team.ru>
This commit is contained in:
Alexander Kernozhitsky 2023-06-10 06:05:19 +03:00 committed by GitHub
parent 2a6434a86d
commit c6fd0db4b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 41 deletions

View File

@ -101,30 +101,6 @@ private:
TCPServerDispatcher(const TCPServerDispatcher&); TCPServerDispatcher(const TCPServerDispatcher&);
TCPServerDispatcher& operator = (const TCPServerDispatcher&); TCPServerDispatcher& operator = (const TCPServerDispatcher&);
class ThreadCountWatcher
{
public:
ThreadCountWatcher(TCPServerDispatcher* pDisp) : _pDisp(pDisp)
{
}
~ThreadCountWatcher()
{
FastMutex::ScopedLock lock(_pDisp->_mutex);
if (_pDisp->_currentThreads > 1 && _pDisp->_queue.empty())
{
--_pDisp->_currentThreads;
}
}
private:
ThreadCountWatcher();
ThreadCountWatcher(const ThreadCountWatcher&);
ThreadCountWatcher& operator=(const ThreadCountWatcher&);
TCPServerDispatcher* _pDisp;
};
std::atomic<int> _rc; std::atomic<int> _rc;
TCPServerParams::Ptr _pParams; TCPServerParams::Ptr _pParams;
std::atomic<int> _currentThreads; std::atomic<int> _currentThreads;

View File

@ -103,29 +103,31 @@ void TCPServerDispatcher::run()
for (;;) for (;;)
{ {
try
{ {
ThreadCountWatcher tcw(this); AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
try if (pNf)
{ {
AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime); TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
if (pNf) if (pCNf)
{ {
TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get()); std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
if (pCNf) poco_check_ptr(pConnection.get());
{ beginConnection();
std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket())); pConnection->start();
poco_check_ptr(pConnection.get()); endConnection();
beginConnection();
pConnection->start();
endConnection();
}
} }
} }
catch (Poco::Exception &exc) { ErrorHandler::handle(exc); }
catch (std::exception &exc) { ErrorHandler::handle(exc); }
catch (...) { ErrorHandler::handle(); }
} }
if (_stopped || (_currentThreads > 1 && _queue.empty())) break; catch (Poco::Exception &exc) { ErrorHandler::handle(exc); }
catch (std::exception &exc) { ErrorHandler::handle(exc); }
catch (...) { ErrorHandler::handle(); }
FastMutex::ScopedLock lock(_mutex);
if (_stopped || (_currentThreads > 1 && _queue.empty()))
{
--_currentThreads;
break;
}
} }
} }