From 71a3a79ec979310683da853399d8f5153506a5af Mon Sep 17 00:00:00 2001 From: Alex Fabijanic Date: Fri, 27 May 2022 21:57:34 -0500 Subject: [PATCH] fix(UDPHandler): data race #3613; clean up all Net tsan warnings --- .vscode/settings.json | 7 ++- Foundation/include/Poco/Activity.h | 4 +- Foundation/testsuite/src/ThreadTest.cpp | 6 +-- Net/include/Poco/Net/SocketProactor.h | 1 + Net/include/Poco/Net/TCPServer.h | 3 +- Net/include/Poco/Net/UDPClient.h | 2 +- Net/include/Poco/Net/UDPHandler.h | 60 ++++++++++++++---------- Net/include/Poco/Net/UDPServer.h | 4 +- Net/src/RemoteSyslogListener.cpp | 4 +- Net/testsuite/src/DialogServer.h | 6 +-- Net/testsuite/src/EchoServer.h | 5 +- Net/testsuite/src/HTTPTestServer.h | 2 +- Net/testsuite/src/MulticastEchoServer.h | 2 +- Net/testsuite/src/PollSetTest.cpp | 5 +- Net/testsuite/src/SocketProactorTest.cpp | 10 ++-- Net/testsuite/src/SyslogTest.cpp | 1 + Net/testsuite/src/UDPEchoServer.h | 2 +- Net/testsuite/src/UDPServerTest.cpp | 5 +- 18 files changed, 75 insertions(+), 54 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 0c7d752e4..bc8e0af14 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -89,12 +89,15 @@ "ranges": "cpp", "cfenv": "cpp", "__bits": "cpp", - "variant": "cpp" + "variant": "cpp", + "condition_variable": "cpp", + "valarray": "cpp" }, "files.exclude": { "**/.dep": true, "**/bin": true, "**/obj": true }, - "git.ignoreLimitWarning": true + "git.ignoreLimitWarning": true, + "cmake.configureOnOpen": false } diff --git a/Foundation/include/Poco/Activity.h b/Foundation/include/Poco/Activity.h index 8329e6d5c..3f79b431b 100644 --- a/Foundation/include/Poco/Activity.h +++ b/Foundation/include/Poco/Activity.h @@ -143,6 +143,7 @@ public: if (_running) { _done.wait(); + _running = false; } } @@ -154,6 +155,7 @@ public: if (_running) { _done.wait(milliseconds); + _running = false; } } @@ -178,11 +180,9 @@ protected: } catch (...) { - _running = false; _done.set(); throw; } - _running = false; _done.set(); } diff --git a/Foundation/testsuite/src/ThreadTest.cpp b/Foundation/testsuite/src/ThreadTest.cpp index 06b422c58..e1346fe90 100644 --- a/Foundation/testsuite/src/ThreadTest.cpp +++ b/Foundation/testsuite/src/ThreadTest.cpp @@ -108,7 +108,7 @@ public: } private: - bool _finished; + std::atomic _finished; }; @@ -140,8 +140,8 @@ public: } private: - int _counter; - bool _sleepy; + std::atomic _counter; + std::atomic _sleepy; }; diff --git a/Net/include/Poco/Net/SocketProactor.h b/Net/include/Poco/Net/SocketProactor.h index 2d3258f6a..403010b90 100644 --- a/Net/include/Poco/Net/SocketProactor.h +++ b/Net/include/Poco/Net/SocketProactor.h @@ -319,6 +319,7 @@ private: /// Runs the next I/O completion handler in the queue. { IONotification* pNf = dynamic_cast(_nq.waitDequeueNotification()); + if (_activity.isStopped()) return false; if (pNf) { try diff --git a/Net/include/Poco/Net/TCPServer.h b/Net/include/Poco/Net/TCPServer.h index c0b1cae01..7176fffb6 100644 --- a/Net/include/Poco/Net/TCPServer.h +++ b/Net/include/Poco/Net/TCPServer.h @@ -27,6 +27,7 @@ #include "Poco/Runnable.h" #include "Poco/Thread.h" #include "Poco/ThreadPool.h" +#include namespace Poco { @@ -227,7 +228,7 @@ private: TCPServerDispatcher* _pDispatcher; TCPServerConnectionFilter::Ptr _pConnectionFilter; Poco::Thread _thread; - bool _stopped; + std::atomic _stopped; }; diff --git a/Net/include/Poco/Net/UDPClient.h b/Net/include/Poco/Net/UDPClient.h index ecd8d076b..98889e45d 100644 --- a/Net/include/Poco/Net/UDPClient.h +++ b/Net/include/Poco/Net/UDPClient.h @@ -86,7 +86,7 @@ private: DatagramSocket _socket; SocketAddress _address; Thread* _pThread; - bool _stop; + std::atomic _stop; Poco::AtomicCounter _dataBacklog; Poco::AtomicCounter _errorBacklog; }; diff --git a/Net/include/Poco/Net/UDPHandler.h b/Net/include/Poco/Net/UDPHandler.h index 02795e4a5..09730f521 100644 --- a/Net/include/Poco/Net/UDPHandler.h +++ b/Net/include/Poco/Net/UDPHandler.h @@ -30,6 +30,7 @@ #include "Poco/StringTokenizer.h" #include #include +#include namespace Poco { @@ -44,11 +45,12 @@ template class UDPHandlerImpl: public Runnable, public RefCountedObject /// UDP handler handles the data that arrives to the UDP server. /// The class is thread-safe and runs in its own thread, so many handlers - /// can be used in parallel.Handler manages and provides the storage + /// can be used in parallel. Handler manages and provides the storage /// (fixed-size memory blocks of S size) to the reader, which signals back /// to the handler when there is data or error ready for processing. /// Typically, user will inherit from this class and override processData() - /// and processError() members to do the actual work. + /// and processError() members to do the actual work. To auto-start the handler, + /// the inheriting class can call start() in the constructor. { public: typedef UDPMsgSizeT MsgSizeT; @@ -76,7 +78,6 @@ public: _pErr(pErr) /// Creates the UDPHandlerImpl. { - _thread.start(*this); } ~UDPHandlerImpl() @@ -143,9 +144,9 @@ public: } void notify() - /// Sets the ready event. + /// Sets the data ready event. { - _ready.set(); + _dataReady.set(); } void run() @@ -153,28 +154,31 @@ public: { while (!_stop) { - _ready.wait(); + _dataReady.wait(); if (_stop) break; if (_mutex.tryLock(10)) { - BufMap::iterator it = _buffers.begin(); - BufMap::iterator end = _buffers.end(); - for (; it != end; ++it) + if (!_stop) { - BufList::iterator lIt = it->second.begin(); - BufList::iterator lEnd = it->second.end(); - for (; lIt != lEnd; ++lIt) + BufMap::iterator it = _buffers.begin(); + BufMap::iterator end = _buffers.end(); + for (; it != end; ++it) { - if (hasData(*lIt)) + BufList::iterator lIt = it->second.begin(); + BufList::iterator lEnd = it->second.end(); + for (; lIt != lEnd; ++lIt) { - processData(*lIt); - --_dataBacklog; - setIdle(*lIt); - } - else if (isError(*lIt)) - { - processError(*lIt); - ++_errorBacklog; + if (hasData(*lIt)) + { + processData(*lIt); + --_dataBacklog; + setIdle(*lIt); + } + else if (isError(*lIt)) + { + processError(*lIt); + ++_errorBacklog; + } } } } @@ -188,7 +192,7 @@ public: /// Signals the handler to stop. { _stop = true; - _ready.set(); + _dataReady.set(); } bool stopped() const @@ -329,6 +333,12 @@ public: setIdle(buf); } + void start() + /// Stars the handler run in thread. + { + _thread.start(*this); + } + private: typedef std::deque BufList; typedef std::map BufMap; @@ -355,10 +365,10 @@ private: *ret = _buffers[sock].back(); } - Poco::Event _ready; + Poco::Event _dataReady; Poco::Thread _thread; - bool _stop; - bool _done; + std::atomic _stop; + std::atomic _done; BufMap _buffers; BufIt _bufIt; std::size_t _bufListSize; diff --git a/Net/include/Poco/Net/UDPServer.h b/Net/include/Poco/Net/UDPServer.h index 1742dedc9..a65036745 100644 --- a/Net/include/Poco/Net/UDPServer.h +++ b/Net/include/Poco/Net/UDPServer.h @@ -90,9 +90,9 @@ public: } private: - P _poller; + P _poller; Poco::Thread _thread; - bool _stop; + std::atomic _stop; }; diff --git a/Net/src/RemoteSyslogListener.cpp b/Net/src/RemoteSyslogListener.cpp index ca2afed1f..2685c8ab9 100644 --- a/Net/src/RemoteSyslogListener.cpp +++ b/Net/src/RemoteSyslogListener.cpp @@ -96,7 +96,7 @@ public: private: Poco::NotificationQueue& _queue; DatagramSocket _socket; - bool _stopped; + std::atomic _stopped; }; @@ -190,7 +190,7 @@ private: private: Poco::NotificationQueue& _queue; - bool _stopped; + std::atomic _stopped; RemoteSyslogListener* _pListener; }; diff --git a/Net/testsuite/src/DialogServer.h b/Net/testsuite/src/DialogServer.h index 8c1e0e0ba..462d1a8fa 100644 --- a/Net/testsuite/src/DialogServer.h +++ b/Net/testsuite/src/DialogServer.h @@ -70,11 +70,11 @@ private: Poco::Thread _thread; Poco::Event _ready; mutable Poco::FastMutex _mutex; - bool _stop; + std::atomic _stop; std::vector _nextResponses; std::vector _lastCommands; - bool _acceptCommands; - bool _log; + std::atomic _acceptCommands; + std::atomic _log; }; diff --git a/Net/testsuite/src/EchoServer.h b/Net/testsuite/src/EchoServer.h index 4ff35baee..3df637450 100644 --- a/Net/testsuite/src/EchoServer.h +++ b/Net/testsuite/src/EchoServer.h @@ -18,6 +18,7 @@ #include "Poco/Net/ServerSocket.h" #include "Poco/Thread.h" #include "Poco/Event.h" +#include class EchoServer: public Poco::Runnable @@ -50,8 +51,8 @@ private: Poco::Net::ServerSocket _socket; Poco::Thread _thread; Poco::Event _ready; - bool _stop; - bool _done; + std::atomic _stop; + std::atomic _done; }; diff --git a/Net/testsuite/src/HTTPTestServer.h b/Net/testsuite/src/HTTPTestServer.h index d3a0fd632..ea0960bc2 100644 --- a/Net/testsuite/src/HTTPTestServer.h +++ b/Net/testsuite/src/HTTPTestServer.h @@ -51,7 +51,7 @@ private: Poco::Net::ServerSocket _socket; Poco::Thread _thread; Poco::Event _ready; - bool _stop; + std::atomic _stop; std::string _lastRequest; }; diff --git a/Net/testsuite/src/MulticastEchoServer.h b/Net/testsuite/src/MulticastEchoServer.h index 03f1f2be3..e262525e6 100644 --- a/Net/testsuite/src/MulticastEchoServer.h +++ b/Net/testsuite/src/MulticastEchoServer.h @@ -61,7 +61,7 @@ private: Poco::Net::NetworkInterface _if; Poco::Thread _thread; Poco::Event _ready; - bool _stop; + std::atomic _stop; }; diff --git a/Net/testsuite/src/PollSetTest.cpp b/Net/testsuite/src/PollSetTest.cpp index 9c2c41e7a..4cb58f83e 100644 --- a/Net/testsuite/src/PollSetTest.cpp +++ b/Net/testsuite/src/PollSetTest.cpp @@ -38,7 +38,8 @@ class Poller : public Poco::Runnable { public: Poller(PollSet& pollSet, const Timespan& timeout): _pollSet(pollSet), - _timeout(timeout) + _timeout(timeout), + _running(false) { } @@ -57,7 +58,7 @@ public: private: PollSet& _pollSet; Timespan _timeout; - bool _running = false; + std::atomic _running; }; diff --git a/Net/testsuite/src/SocketProactorTest.cpp b/Net/testsuite/src/SocketProactorTest.cpp index 6e4a1c5d2..cf8e75a50 100644 --- a/Net/testsuite/src/SocketProactorTest.cpp +++ b/Net/testsuite/src/SocketProactorTest.cpp @@ -56,7 +56,7 @@ void SocketProactorTest::testTCPSocketProactor() }; proactor.addSend(s, SocketProactor::Buffer(hello.begin(), hello.end()), onSendCompletion); SocketProactor::Buffer buf(hello.size(), 0); - bool received = false, receivePassed = false; + std::atomic received(false), receivePassed(false); auto onRecvCompletion = [&](std::error_code err, int bytes) { receivePassed = (err.value() == 0) && @@ -107,8 +107,8 @@ void SocketProactorTest::testTCPSocketProactor() assertFalse (received); assertFalse (receivePassed); - bool error = false; - bool errorPassed = false; + std::atomic error(false); + std::atomic errorPassed(false); auto onError = [&](std::error_code err, int bytes) { errorPassed = (err.value() != 0) && (bytes == 0); @@ -151,7 +151,7 @@ void SocketProactorTest::testUDPSocketProactor() SocketAddress("127.0.0.1", echoServer.port()), onSendCompletion); Poco::Net::SocketProactor::Buffer buf(hello.size(), 0); - bool received = false, receivePassed = false; + std::atomic received(false), receivePassed(false); SocketAddress sa; auto onRecvCompletion = [&](std::error_code err, int bytes) { @@ -229,7 +229,7 @@ void SocketProactorTest::testSocketProactorStartStop() SocketAddress("127.0.0.1", echoServer.port()), onSendCompletion); Poco::Net::SocketProactor::Buffer buf(hello.size(), 0); - bool received = false, receivePassed = false; + std::atomic received(false), receivePassed(false); SocketAddress sa; auto onRecvCompletion = [&](std::error_code err, int bytes) { diff --git a/Net/testsuite/src/SyslogTest.cpp b/Net/testsuite/src/SyslogTest.cpp index fe3b7659e..49e2769b1 100644 --- a/Net/testsuite/src/SyslogTest.cpp +++ b/Net/testsuite/src/SyslogTest.cpp @@ -63,6 +63,7 @@ std::size_t CachingChannel::getMaxSize() const std::size_t CachingChannel::getCurrentSize() const { + Poco::FastMutex::ScopedLock lock(_mutex); return _size; } diff --git a/Net/testsuite/src/UDPEchoServer.h b/Net/testsuite/src/UDPEchoServer.h index ec8885fac..f92967028 100644 --- a/Net/testsuite/src/UDPEchoServer.h +++ b/Net/testsuite/src/UDPEchoServer.h @@ -49,7 +49,7 @@ private: Poco::Net::DatagramSocket _socket; Poco::Thread _thread; Poco::Event _ready; - bool _stop; + std::atomic _stop; }; diff --git a/Net/testsuite/src/UDPServerTest.cpp b/Net/testsuite/src/UDPServerTest.cpp index 40cec1f84..1e82cc33a 100644 --- a/Net/testsuite/src/UDPServerTest.cpp +++ b/Net/testsuite/src/UDPServerTest.cpp @@ -45,7 +45,10 @@ namespace { struct TestUDPHandler : public Poco::Net::UDPHandler { - TestUDPHandler() : counter(0), errCounter(0) {} + TestUDPHandler() : counter(0), errCounter(0) + { + start(); + } void processData(char *buf) {