fix(SocketReactor): Protect stop() and wakeUp() from reentrance #4217

This commit is contained in:
Aleksandar Fabijanic
2023-10-23 22:42:28 +02:00
parent c163237221
commit 47ddacd004
2 changed files with 70 additions and 9 deletions

View File

@@ -160,13 +160,14 @@ void SocketReactor::sleep()
void SocketReactor::stop()
{
_stop = true;
if (_stop.exchange(true)) return;
wakeUp();
}
void SocketReactor::wakeUp()
{
if (_stop) return;
_pollSet.wakeUp();
_event.set();
}

View File

@@ -395,6 +395,45 @@ namespace
Poco::Thread::sleep(500);
}
};
class DummyServiceHandler
{
public:
DummyServiceHandler(StreamSocket& socket, SocketReactor& reactor) : _socket(socket),
_reactor(reactor)
{
_reactor.addEventHandler(_socket, Observer<DummyServiceHandler, ReadableNotification>(*this, &DummyServiceHandler::onReadable));
_reactor.addEventHandler(_socket, Observer<DummyServiceHandler, ShutdownNotification>(*this, &DummyServiceHandler::onShutdown));
_socket.setBlocking(false);
}
~DummyServiceHandler()
{
_reactor.removeEventHandler(_socket, Observer<DummyServiceHandler, ReadableNotification>(*this, &DummyServiceHandler::onReadable));
_reactor.removeEventHandler(_socket, Observer<DummyServiceHandler, ShutdownNotification>(*this, &DummyServiceHandler::onShutdown));
}
void onReadable(ReadableNotification* pNf)
{
pNf->release();
char buffer[64];
do
{
if (0 == _socket.receiveBytes(&buffer[0], sizeof(buffer)))
break;
} while (true);
}
void onShutdown(ShutdownNotification* pNf)
{
pNf->release();
delete this;
}
private:
StreamSocket _socket;
SocketReactor& _reactor;
};
}
@@ -589,19 +628,40 @@ void SocketReactorTest::testDataCollection()
void SocketReactorTest::testSocketConnectorDeadlock()
{
SocketAddress ssa;
ServerSocket ss(ssa);
SocketAddress sa("127.0.0.1", ss.address().port());
SocketReactor reactor;
Thread thread;
{
SocketAddress ssa;
ServerSocket ss(ssa);
SocketAddress sa("127.0.0.1", ss.address().port());
SocketReactor reactor;
Thread thread;
int i = 0;
while (++i < 10)
{
auto sc = new SocketConnector<SleepClientServiceHandler>(sa, reactor);
thread.startFunc([&reactor]() { reactor.run(); });
reactor.stop();
thread.join();
delete sc;
}
}
int i = 0;
while (++i < 10)
{
auto sc = new SocketConnector<SleepClientServiceHandler>(sa, reactor);
thread.startFunc([&reactor]() { reactor.run(); });
SocketAddress ssa;
ServerSocket ss(ssa);
SocketReactor reactor;
SocketAcceptor<DummyServiceHandler> acceptor(ss, reactor);
Thread thread;
thread.start(reactor);
SocketAddress sa("127.0.0.1", ss.address().port());
StreamSocket sock(sa);
std::string data("HELLO");
sock.sendBytes(data.data(), static_cast<int>(data.size()));
reactor.stop();
thread.join();
delete sc;
}
}