fix(Net): #4594: PollSet::poll on Windows always returns immediately after first connection, due to bad event socket implementation

This commit is contained in:
Günter Obiltschnig 2024-06-26 20:27:06 +02:00
parent 9d739176f6
commit 02b59b4d25

View File

@ -21,7 +21,7 @@
#if defined(POCO_HAVE_FD_EPOLL) #if defined(POCO_HAVE_FD_EPOLL)
#ifdef POCO_OS_FAMILY_WINDOWS #ifdef POCO_OS_FAMILY_WINDOWS
#include "Poco/Net/ServerSocket.h" #include "Poco/Net/DatagramSocket.h"
#include "Poco/Net/SocketAddress.h" #include "Poco/Net/SocketAddress.h"
#include "wepoll.h" #include "wepoll.h"
#else #else
@ -48,16 +48,13 @@ namespace Net {
#ifdef WEPOLL_H_ #ifdef WEPOLL_H_
namespace
namespace {
int close(HANDLE h)
{ {
return epoll_close(h); int close(HANDLE h)
{
return epoll_close(h);
}
} }
}
#endif // WEPOLL_H_ #endif // WEPOLL_H_
@ -72,11 +69,7 @@ public:
static const epoll_event EPOLL_NULL_EVENT; static const epoll_event EPOLL_NULL_EVENT;
PollSetImpl(): _events(FD_SETSIZE, EPOLL_NULL_EVENT), PollSetImpl(): _events(FD_SETSIZE, EPOLL_NULL_EVENT),
#if defined(WEPOLL_H_)
_eventfd(eventfd()),
#else
_eventfd(eventfd(0, 0)), _eventfd(eventfd(0, 0)),
#endif // WEPOLL_H_
_epollfd(epoll_create(1)) _epollfd(epoll_create(1))
{ {
int err = addFD(_eventfd, PollSet::POLL_READ, EPOLL_CTL_ADD); int err = addFD(_eventfd, PollSet::POLL_READ, EPOLL_CTL_ADD);
@ -174,8 +167,7 @@ public:
while (true) while (true)
{ {
Poco::Timestamp start; Poco::Timestamp start;
rc = epoll_wait(_epollfd, &_events[0], rc = epoll_wait(_epollfd, &_events[0], static_cast<int>(_events.size()), static_cast<int>(remainingTime.totalMilliseconds()));
static_cast<int>(_events.size()), static_cast<int>(remainingTime.totalMilliseconds()));
if (rc == 0) if (rc == 0)
{ {
if (keepWaiting(start, remainingTime)) continue; if (keepWaiting(start, remainingTime)) continue;
@ -185,7 +177,10 @@ public:
// if we are hitting the events limit, resize it; even without resizing, the subseqent // if we are hitting the events limit, resize it; even without resizing, the subseqent
// calls would round-robin through the remaining ready sockets, but it's better to give // calls would round-robin through the remaining ready sockets, but it's better to give
// the call enough room once we start hitting the boundary // the call enough room once we start hitting the boundary
if (rc >= _events.size()) _events.resize(_events.size()*2); if (rc >= _events.size())
{
_events.resize(_events.size()*2);
}
else if (rc < 0) else if (rc < 0)
{ {
// if interrupted and there's still time left, keep waiting // if interrupted and there's still time left, keep waiting
@ -217,11 +212,15 @@ public:
} }
else if (_events[i].events & EPOLLIN) // eventfd signaled else if (_events[i].events & EPOLLIN) // eventfd signaled
{ {
uint64_t val;
#ifdef WEPOLL_H_ #ifdef WEPOLL_H_
if (_pSocket && _pSocket->available()) if (_pEventSocket)
_pSocket->impl()->receiveBytes(&val, sizeof(val)); {
char val;
Poco::Net::SocketAddress sa;
_pEventSocket->receiveFrom(&val, sizeof(val), sa);
}
#else #else
std::uint64_t val;
read(_eventfd, &val, sizeof(val)); read(_eventfd, &val, sizeof(val));
#endif #endif
} }
@ -231,19 +230,14 @@ public:
void wakeUp() void wakeUp()
{ {
uint64_t val = 1;
#ifdef WEPOLL_H_ #ifdef WEPOLL_H_
#ifdef POCO_HAS_UNIX_SOCKET char val = 1;
poco_check_ptr (_pSockFile); _pEventSocket->sendTo(&val, sizeof(val), _pEventSocket->address());
StreamSocket ss(SocketAddress(_pSockFile->path()));
#else
StreamSocket ss(SocketAddress("127.0.0.1", _port));
#endif
ss.sendBytes(&val, sizeof(val));
#else #else
// This is guaranteed to write into a valid fd, // This is guaranteed to write into a valid fd,
// or 0 (meaning PollSet is being destroyed). // or 0 (meaning PollSet is being destroyed).
// Errors are ignored. // Errors are ignored.
std::uint64_t val = 1;
write(_eventfd, &val, sizeof(val)); write(_eventfd, &val, sizeof(val));
#endif #endif
} }
@ -319,31 +313,18 @@ private:
#else // WEPOLL_H_ #else // WEPOLL_H_
using EPollHandle = std::atomic<HANDLE>; using EPollHandle = std::atomic<HANDLE>;
#ifdef POCO_HAS_UNIX_SOCKET int eventfd(int, int)
int eventfd() {
static const SocketAddress eventSA("127.0.0.238"s, 0);
if (!_pEventSocket)
{ {
if (!_pSockFile) _pEventSocket.reset(new DatagramSocket(eventSA, true));
{ _pEventSocket->setBlocking(false);
_pSockFile.reset(new TemporaryFile);
_pSocket.reset(new ServerSocket(SocketAddress(_pSockFile->path())));
}
_pSocket->setBlocking(false);
return static_cast<int>(_pSocket->impl()->sockfd());
} }
std::unique_ptr<TemporaryFile> _pSockFile; return static_cast<int>(_pEventSocket->impl()->sockfd());
#else // no unix socket, listen on localhost }
int eventfd()
{
if (!_pSocket)
_pSocket.reset(new ServerSocket(SocketAddress("127.0.0.1", 0)));
_port = _pSocket->address().port();
_pSocket->setBlocking(false);
return static_cast<int>(_pSocket->impl()->sockfd());
}
int _port = 0;
#endif // POCO_HAS_UNIX_SOCKET
std::unique_ptr<ServerSocket> _pSocket; std::unique_ptr<DatagramSocket> _pEventSocket;
#endif // WEPOLL_H_ #endif // WEPOLL_H_
mutable Mutex _mutex; mutable Mutex _mutex;
@ -353,8 +334,10 @@ private:
EPollHandle _epollfd; EPollHandle _epollfd;
}; };
const epoll_event PollSetImpl::EPOLL_NULL_EVENT = {0, {0}}; const epoll_event PollSetImpl::EPOLL_NULL_EVENT = {0, {0}};
#elif defined(POCO_HAVE_FD_POLL) #elif defined(POCO_HAVE_FD_POLL)
@ -529,7 +512,6 @@ public:
} }
private: private:
void setMode(short& target, int mode) void setMode(short& target, int mode)
{ {
if (mode & PollSet::POLL_READ) if (mode & PollSet::POLL_READ)