fix(UDPHandler): data race #3613; clean up all Net tsan warnings

This commit is contained in:
Alex Fabijanic 2022-05-27 21:57:34 -05:00
parent 216d5ae3a4
commit 71a3a79ec9
18 changed files with 75 additions and 54 deletions

View File

@ -89,12 +89,15 @@
"ranges": "cpp", "ranges": "cpp",
"cfenv": "cpp", "cfenv": "cpp",
"__bits": "cpp", "__bits": "cpp",
"variant": "cpp" "variant": "cpp",
"condition_variable": "cpp",
"valarray": "cpp"
}, },
"files.exclude": { "files.exclude": {
"**/.dep": true, "**/.dep": true,
"**/bin": true, "**/bin": true,
"**/obj": true "**/obj": true
}, },
"git.ignoreLimitWarning": true "git.ignoreLimitWarning": true,
"cmake.configureOnOpen": false
} }

View File

@ -143,6 +143,7 @@ public:
if (_running) if (_running)
{ {
_done.wait(); _done.wait();
_running = false;
} }
} }
@ -154,6 +155,7 @@ public:
if (_running) if (_running)
{ {
_done.wait(milliseconds); _done.wait(milliseconds);
_running = false;
} }
} }
@ -178,11 +180,9 @@ protected:
} }
catch (...) catch (...)
{ {
_running = false;
_done.set(); _done.set();
throw; throw;
} }
_running = false;
_done.set(); _done.set();
} }

View File

@ -108,7 +108,7 @@ public:
} }
private: private:
bool _finished; std::atomic<bool> _finished;
}; };
@ -140,8 +140,8 @@ public:
} }
private: private:
int _counter; std::atomic<int> _counter;
bool _sleepy; std::atomic<bool> _sleepy;
}; };

View File

@ -319,6 +319,7 @@ private:
/// Runs the next I/O completion handler in the queue. /// Runs the next I/O completion handler in the queue.
{ {
IONotification* pNf = dynamic_cast<IONotification*>(_nq.waitDequeueNotification()); IONotification* pNf = dynamic_cast<IONotification*>(_nq.waitDequeueNotification());
if (_activity.isStopped()) return false;
if (pNf) if (pNf)
{ {
try try

View File

@ -27,6 +27,7 @@
#include "Poco/Runnable.h" #include "Poco/Runnable.h"
#include "Poco/Thread.h" #include "Poco/Thread.h"
#include "Poco/ThreadPool.h" #include "Poco/ThreadPool.h"
#include <atomic>
namespace Poco { namespace Poco {
@ -227,7 +228,7 @@ private:
TCPServerDispatcher* _pDispatcher; TCPServerDispatcher* _pDispatcher;
TCPServerConnectionFilter::Ptr _pConnectionFilter; TCPServerConnectionFilter::Ptr _pConnectionFilter;
Poco::Thread _thread; Poco::Thread _thread;
bool _stopped; std::atomic<bool> _stopped;
}; };

View File

@ -86,7 +86,7 @@ private:
DatagramSocket _socket; DatagramSocket _socket;
SocketAddress _address; SocketAddress _address;
Thread* _pThread; Thread* _pThread;
bool _stop; std::atomic<bool> _stop;
Poco::AtomicCounter _dataBacklog; Poco::AtomicCounter _dataBacklog;
Poco::AtomicCounter _errorBacklog; Poco::AtomicCounter _errorBacklog;
}; };

View File

@ -30,6 +30,7 @@
#include "Poco/StringTokenizer.h" #include "Poco/StringTokenizer.h"
#include <deque> #include <deque>
#include <cstring> #include <cstring>
#include <atomic>
namespace Poco { namespace Poco {
@ -44,11 +45,12 @@ template <std::size_t S = POCO_UDP_BUF_SIZE>
class UDPHandlerImpl: public Runnable, public RefCountedObject class UDPHandlerImpl: public Runnable, public RefCountedObject
/// UDP handler handles the data that arrives to the UDP server. /// UDP handler handles the data that arrives to the UDP server.
/// The class is thread-safe and runs in its own thread, so many handlers /// The class is thread-safe and runs in its own thread, so many handlers
/// can be used in parallel.Handler manages and provides the storage /// can be used in parallel. Handler manages and provides the storage
/// (fixed-size memory blocks of S size) to the reader, which signals back /// (fixed-size memory blocks of S size) to the reader, which signals back
/// to the handler when there is data or error ready for processing. /// to the handler when there is data or error ready for processing.
/// Typically, user will inherit from this class and override processData() /// Typically, user will inherit from this class and override processData()
/// and processError() members to do the actual work. /// and processError() members to do the actual work. To auto-start the handler,
/// the inheriting class can call start() in the constructor.
{ {
public: public:
typedef UDPMsgSizeT MsgSizeT; typedef UDPMsgSizeT MsgSizeT;
@ -76,7 +78,6 @@ public:
_pErr(pErr) _pErr(pErr)
/// Creates the UDPHandlerImpl. /// Creates the UDPHandlerImpl.
{ {
_thread.start(*this);
} }
~UDPHandlerImpl() ~UDPHandlerImpl()
@ -143,9 +144,9 @@ public:
} }
void notify() void notify()
/// Sets the ready event. /// Sets the data ready event.
{ {
_ready.set(); _dataReady.set();
} }
void run() void run()
@ -153,28 +154,31 @@ public:
{ {
while (!_stop) while (!_stop)
{ {
_ready.wait(); _dataReady.wait();
if (_stop) break; if (_stop) break;
if (_mutex.tryLock(10)) if (_mutex.tryLock(10))
{ {
BufMap::iterator it = _buffers.begin(); if (!_stop)
BufMap::iterator end = _buffers.end();
for (; it != end; ++it)
{ {
BufList::iterator lIt = it->second.begin(); BufMap::iterator it = _buffers.begin();
BufList::iterator lEnd = it->second.end(); BufMap::iterator end = _buffers.end();
for (; lIt != lEnd; ++lIt) for (; it != end; ++it)
{ {
if (hasData(*lIt)) BufList::iterator lIt = it->second.begin();
BufList::iterator lEnd = it->second.end();
for (; lIt != lEnd; ++lIt)
{ {
processData(*lIt); if (hasData(*lIt))
--_dataBacklog; {
setIdle(*lIt); processData(*lIt);
} --_dataBacklog;
else if (isError(*lIt)) setIdle(*lIt);
{ }
processError(*lIt); else if (isError(*lIt))
++_errorBacklog; {
processError(*lIt);
++_errorBacklog;
}
} }
} }
} }
@ -188,7 +192,7 @@ public:
/// Signals the handler to stop. /// Signals the handler to stop.
{ {
_stop = true; _stop = true;
_ready.set(); _dataReady.set();
} }
bool stopped() const bool stopped() const
@ -329,6 +333,12 @@ public:
setIdle(buf); setIdle(buf);
} }
void start()
/// Stars the handler run in thread.
{
_thread.start(*this);
}
private: private:
typedef std::deque<char*> BufList; typedef std::deque<char*> BufList;
typedef std::map<poco_socket_t, BufList> BufMap; typedef std::map<poco_socket_t, BufList> BufMap;
@ -355,10 +365,10 @@ private:
*ret = _buffers[sock].back(); *ret = _buffers[sock].back();
} }
Poco::Event _ready; Poco::Event _dataReady;
Poco::Thread _thread; Poco::Thread _thread;
bool _stop; std::atomic<bool> _stop;
bool _done; std::atomic<bool> _done;
BufMap _buffers; BufMap _buffers;
BufIt _bufIt; BufIt _bufIt;
std::size_t _bufListSize; std::size_t _bufListSize;

View File

@ -90,9 +90,9 @@ public:
} }
private: private:
P _poller; P _poller;
Poco::Thread _thread; Poco::Thread _thread;
bool _stop; std::atomic<bool> _stop;
}; };

View File

@ -96,7 +96,7 @@ public:
private: private:
Poco::NotificationQueue& _queue; Poco::NotificationQueue& _queue;
DatagramSocket _socket; DatagramSocket _socket;
bool _stopped; std::atomic<bool> _stopped;
}; };
@ -190,7 +190,7 @@ private:
private: private:
Poco::NotificationQueue& _queue; Poco::NotificationQueue& _queue;
bool _stopped; std::atomic<bool> _stopped;
RemoteSyslogListener* _pListener; RemoteSyslogListener* _pListener;
}; };

View File

@ -70,11 +70,11 @@ private:
Poco::Thread _thread; Poco::Thread _thread;
Poco::Event _ready; Poco::Event _ready;
mutable Poco::FastMutex _mutex; mutable Poco::FastMutex _mutex;
bool _stop; std::atomic<bool> _stop;
std::vector<std::string> _nextResponses; std::vector<std::string> _nextResponses;
std::vector<std::string> _lastCommands; std::vector<std::string> _lastCommands;
bool _acceptCommands; std::atomic<bool> _acceptCommands;
bool _log; std::atomic<bool> _log;
}; };

View File

@ -18,6 +18,7 @@
#include "Poco/Net/ServerSocket.h" #include "Poco/Net/ServerSocket.h"
#include "Poco/Thread.h" #include "Poco/Thread.h"
#include "Poco/Event.h" #include "Poco/Event.h"
#include <atomic>
class EchoServer: public Poco::Runnable class EchoServer: public Poco::Runnable
@ -50,8 +51,8 @@ private:
Poco::Net::ServerSocket _socket; Poco::Net::ServerSocket _socket;
Poco::Thread _thread; Poco::Thread _thread;
Poco::Event _ready; Poco::Event _ready;
bool _stop; std::atomic<bool> _stop;
bool _done; std::atomic<bool> _done;
}; };

View File

@ -51,7 +51,7 @@ private:
Poco::Net::ServerSocket _socket; Poco::Net::ServerSocket _socket;
Poco::Thread _thread; Poco::Thread _thread;
Poco::Event _ready; Poco::Event _ready;
bool _stop; std::atomic<bool> _stop;
std::string _lastRequest; std::string _lastRequest;
}; };

View File

@ -61,7 +61,7 @@ private:
Poco::Net::NetworkInterface _if; Poco::Net::NetworkInterface _if;
Poco::Thread _thread; Poco::Thread _thread;
Poco::Event _ready; Poco::Event _ready;
bool _stop; std::atomic<bool> _stop;
}; };

View File

@ -38,7 +38,8 @@ class Poller : public Poco::Runnable
{ {
public: public:
Poller(PollSet& pollSet, const Timespan& timeout): _pollSet(pollSet), Poller(PollSet& pollSet, const Timespan& timeout): _pollSet(pollSet),
_timeout(timeout) _timeout(timeout),
_running(false)
{ {
} }
@ -57,7 +58,7 @@ public:
private: private:
PollSet& _pollSet; PollSet& _pollSet;
Timespan _timeout; Timespan _timeout;
bool _running = false; std::atomic<bool> _running;
}; };

View File

@ -56,7 +56,7 @@ void SocketProactorTest::testTCPSocketProactor()
}; };
proactor.addSend(s, SocketProactor::Buffer(hello.begin(), hello.end()), onSendCompletion); proactor.addSend(s, SocketProactor::Buffer(hello.begin(), hello.end()), onSendCompletion);
SocketProactor::Buffer buf(hello.size(), 0); SocketProactor::Buffer buf(hello.size(), 0);
bool received = false, receivePassed = false; std::atomic<bool> received(false), receivePassed(false);
auto onRecvCompletion = [&](std::error_code err, int bytes) auto onRecvCompletion = [&](std::error_code err, int bytes)
{ {
receivePassed = (err.value() == 0) && receivePassed = (err.value() == 0) &&
@ -107,8 +107,8 @@ void SocketProactorTest::testTCPSocketProactor()
assertFalse (received); assertFalse (received);
assertFalse (receivePassed); assertFalse (receivePassed);
bool error = false; std::atomic<bool> error(false);
bool errorPassed = false; std::atomic<bool> errorPassed(false);
auto onError = [&](std::error_code err, int bytes) auto onError = [&](std::error_code err, int bytes)
{ {
errorPassed = (err.value() != 0) && (bytes == 0); errorPassed = (err.value() != 0) && (bytes == 0);
@ -151,7 +151,7 @@ void SocketProactorTest::testUDPSocketProactor()
SocketAddress("127.0.0.1", echoServer.port()), SocketAddress("127.0.0.1", echoServer.port()),
onSendCompletion); onSendCompletion);
Poco::Net::SocketProactor::Buffer buf(hello.size(), 0); Poco::Net::SocketProactor::Buffer buf(hello.size(), 0);
bool received = false, receivePassed = false; std::atomic<bool> received(false), receivePassed(false);
SocketAddress sa; SocketAddress sa;
auto onRecvCompletion = [&](std::error_code err, int bytes) auto onRecvCompletion = [&](std::error_code err, int bytes)
{ {
@ -229,7 +229,7 @@ void SocketProactorTest::testSocketProactorStartStop()
SocketAddress("127.0.0.1", echoServer.port()), SocketAddress("127.0.0.1", echoServer.port()),
onSendCompletion); onSendCompletion);
Poco::Net::SocketProactor::Buffer buf(hello.size(), 0); Poco::Net::SocketProactor::Buffer buf(hello.size(), 0);
bool received = false, receivePassed = false; std::atomic<bool> received(false), receivePassed(false);
SocketAddress sa; SocketAddress sa;
auto onRecvCompletion = [&](std::error_code err, int bytes) auto onRecvCompletion = [&](std::error_code err, int bytes)
{ {

View File

@ -63,6 +63,7 @@ std::size_t CachingChannel::getMaxSize() const
std::size_t CachingChannel::getCurrentSize() const std::size_t CachingChannel::getCurrentSize() const
{ {
Poco::FastMutex::ScopedLock lock(_mutex);
return _size; return _size;
} }

View File

@ -49,7 +49,7 @@ private:
Poco::Net::DatagramSocket _socket; Poco::Net::DatagramSocket _socket;
Poco::Thread _thread; Poco::Thread _thread;
Poco::Event _ready; Poco::Event _ready;
bool _stop; std::atomic<bool> _stop;
}; };

View File

@ -45,7 +45,10 @@ namespace
{ {
struct TestUDPHandler : public Poco::Net::UDPHandler struct TestUDPHandler : public Poco::Net::UDPHandler
{ {
TestUDPHandler() : counter(0), errCounter(0) {} TestUDPHandler() : counter(0), errCounter(0)
{
start();
}
void processData(char *buf) void processData(char *buf)
{ {