This commit is contained in:
Alex Fabijanic 2021-04-21 07:47:57 +02:00
parent cd6422fde3
commit b27f440e36
5 changed files with 162 additions and 24 deletions

View File

@ -263,10 +263,8 @@ public:
if (it->fd == fd) if (it->fd == fd)
{ {
it->events = 0; it->events = 0;
if (mode & PollSet::POLL_READ) it->revents = 0;
it->events |= POLLIN; setMode(it->fd, it->events, mode);
if (mode & PollSet::POLL_WRITE)
it->events |= POLLOUT;
} }
} }
} }
@ -307,11 +305,7 @@ public:
pfd.fd = it->first; pfd.fd = it->first;
pfd.events = 0; pfd.events = 0;
pfd.revents = 0; pfd.revents = 0;
if (it->second & PollSet::POLL_READ) setMode(pfd.fd, pfd.events, it->second);
pfd.events |= POLLIN;
if (it->second & PollSet::POLL_WRITE)
pfd.events |= POLLOUT;
_pollfds.push_back(pfd); _pollfds.push_back(pfd);
} }
_addMap.clear(); _addMap.clear();
@ -325,9 +319,15 @@ public:
{ {
Poco::Timestamp start; Poco::Timestamp start;
#ifdef _WIN32 #ifdef _WIN32
rc = WSAPoll(&_pollfds[0], static_cast<ULONG>(_pollfds.size()), static_cast<INT>(timeout.totalMilliseconds())); rc = WSAPoll(&_pollfds[0], static_cast<ULONG>(_pollfds.size()), static_cast<INT>(remainingTime.totalMilliseconds()));
// see https://github.com/pocoproject/poco/issues/3248
if ((remainingTime > 0) && (rc > 0) && !hasSignaledFDs())
{
rc = -1;
WSASetLastError(WSAEINTR);
}
#else #else
rc = ::poll(&_pollfds[0], _pollfds.size(), timeout.totalMilliseconds()); rc = ::poll(&_pollfds[0], _pollfds.size(), remainingTime.totalMilliseconds());
#endif #endif
if (rc < 0 && SocketImpl::lastError() == POCO_EINTR) if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
{ {
@ -352,16 +352,20 @@ public:
std::map<poco_socket_t, Socket>::const_iterator its = _socketMap.find(it->fd); std::map<poco_socket_t, Socket>::const_iterator its = _socketMap.find(it->fd);
if (its != _socketMap.end()) if (its != _socketMap.end())
{ {
if (it->revents & POLLIN) if ((it->revents & POLLIN)
#ifdef _WIN32
|| (it->revents & POLLHUP)
#endif
)
result[its->second] |= PollSet::POLL_READ; result[its->second] |= PollSet::POLL_READ;
if (it->revents & POLLOUT) if ((it->revents & POLLOUT)
#ifdef _WIN32
&& (_wantPOLLOUT.find(it->fd) != _wantPOLLOUT.end())
#endif
)
result[its->second] |= PollSet::POLL_WRITE; result[its->second] |= PollSet::POLL_WRITE;
if (it->revents & POLLERR) if (it->revents & POLLERR)
result[its->second] |= PollSet::POLL_ERROR; result[its->second] |= PollSet::POLL_ERROR;
#ifdef _WIN32
if (it->revents & POLLHUP)
result[its->second] |= PollSet::POLL_READ;
#endif
} }
it->revents = 0; it->revents = 0;
} }
@ -372,8 +376,52 @@ public:
} }
private: private:
#ifdef _WIN32
void setMode(poco_socket_t fd, short& target, int mode)
{
if (mode & PollSet::POLL_READ)
target |= POLLIN;
if (mode & PollSet::POLL_WRITE)
_wantPOLLOUT.insert(fd);
else
_wantPOLLOUT.erase(fd);
target |= POLLOUT;
}
bool hasSignaledFDs()
{
for (const auto& pollfd : _pollfds)
{
if ((pollfd.revents | POLLOUT) &&
(_wantPOLLOUT.find(pollfd.fd) != _wantPOLLOUT.end()))
{
return true;
}
}
return false;
}
#else
void setMode(poco_socket_t fd, short& target, int mode)
{
if (mode & PollSet::POLL_READ)
target |= POLLIN;
if (mode & PollSet::POLL_WRITE)
target |= POLLOUT;
}
#endif
mutable Poco::FastMutex _mutex; mutable Poco::FastMutex _mutex;
std::map<poco_socket_t, Socket> _socketMap; std::map<poco_socket_t, Socket> _socketMap;
#ifdef _WIN32
std::set<poco_socket_t> _wantPOLLOUT;
#endif
std::map<poco_socket_t, int> _addMap; std::map<poco_socket_t, int> _addMap;
std::set<poco_socket_t> _removeSet; std::set<poco_socket_t> _removeSet;
std::vector<pollfd> _pollfds; std::vector<pollfd> _pollfds;

View File

@ -23,7 +23,8 @@ using Poco::Net::SocketAddress;
EchoServer::EchoServer(): EchoServer::EchoServer():
_socket(SocketAddress()), _socket(SocketAddress()),
_thread("EchoServer"), _thread("EchoServer"),
_stop(false) _stop(false),
_done(false)
{ {
_thread.start(*this); _thread.start(*this);
_ready.wait(); _ready.wait();
@ -33,7 +34,8 @@ EchoServer::EchoServer():
EchoServer::EchoServer(const Poco::Net::SocketAddress& address): EchoServer::EchoServer(const Poco::Net::SocketAddress& address):
_socket(address), _socket(address),
_thread("EchoServer"), _thread("EchoServer"),
_stop(false) _stop(false),
_done(false)
{ {
_thread.start(*this); _thread.start(*this);
_ready.wait(); _ready.wait();
@ -78,5 +80,18 @@ void EchoServer::run()
} }
} }
} }
_done = true;
}
void EchoServer::stop()
{
_stop = true;
}
bool EchoServer::done()
{
return _done;
} }

View File

@ -40,11 +40,18 @@ public:
void run(); void run();
/// Does the work. /// Does the work.
void stop();
/// Sets the stop flag.
bool done();
/// Retruns true if if server is done.
private: private:
Poco::Net::ServerSocket _socket; Poco::Net::ServerSocket _socket;
Poco::Thread _thread; Poco::Thread _thread;
Poco::Event _ready; Poco::Event _ready;
bool _stop; bool _stop;
bool _done;
}; };

View File

@ -28,6 +28,7 @@ using Poco::Net::ConnectionRefusedException;
using Poco::Net::PollSet; using Poco::Net::PollSet;
using Poco::Timespan; using Poco::Timespan;
using Poco::Stopwatch; using Poco::Stopwatch;
using Poco::Thread;
PollSetTest::PollSetTest(const std::string& name): CppUnit::TestCase(name) PollSetTest::PollSetTest(const std::string& name): CppUnit::TestCase(name)
@ -76,7 +77,7 @@ void PollSetTest::testPoll()
assertTrue (sm.find(ss1) != sm.end()); assertTrue (sm.find(ss1) != sm.end());
assertTrue (sm.find(ss2) == sm.end()); assertTrue (sm.find(ss2) == sm.end());
assertTrue (sm.find(ss1)->second == PollSet::POLL_WRITE); assertTrue (sm.find(ss1)->second == PollSet::POLL_WRITE);
assertTrue (sw.elapsed() < 100000); assertTrue (sw.elapsed() < 1100000);
ps.update(ss1, PollSet::POLL_READ); ps.update(ss1, PollSet::POLL_READ);
@ -87,7 +88,7 @@ void PollSetTest::testPoll()
assertTrue (sm.find(ss1) != sm.end()); assertTrue (sm.find(ss1) != sm.end());
assertTrue (sm.find(ss2) == sm.end()); assertTrue (sm.find(ss2) == sm.end());
assertTrue (sm.find(ss1)->second == PollSet::POLL_READ); assertTrue (sm.find(ss1)->second == PollSet::POLL_READ);
assertTrue (sw.elapsed() < 100000); assertTrue (sw.elapsed() < 1100000);
int n = ss1.receiveBytes(buffer, sizeof(buffer)); int n = ss1.receiveBytes(buffer, sizeof(buffer));
assertTrue (n == 5); assertTrue (n == 5);
@ -100,7 +101,7 @@ void PollSetTest::testPoll()
assertTrue (sm.find(ss1) == sm.end()); assertTrue (sm.find(ss1) == sm.end());
assertTrue (sm.find(ss2) != sm.end()); assertTrue (sm.find(ss2) != sm.end());
assertTrue (sm.find(ss2)->second == PollSet::POLL_READ); assertTrue (sm.find(ss2)->second == PollSet::POLL_READ);
assertTrue (sw.elapsed() < 100000); assertTrue (sw.elapsed() < 1100000);
n = ss2.receiveBytes(buffer, sizeof(buffer)); n = ss2.receiveBytes(buffer, sizeof(buffer));
assertTrue (n == 5); assertTrue (n == 5);
@ -125,6 +126,69 @@ void PollSetTest::testPoll()
} }
void PollSetTest::testPollNoServer()
{
StreamSocket ss1;
StreamSocket ss2;
ss1.connectNB(SocketAddress("127.0.0.1", 0xFEFE));
ss2.connectNB(SocketAddress("127.0.0.1", 0xFEFF));
PollSet ps;
assertTrue(ps.empty());
ps.add(ss1, PollSet::POLL_READ);
ps.add(ss2, PollSet::POLL_READ);
assertTrue(!ps.empty());
assertTrue(ps.has(ss1));
assertTrue(ps.has(ss2));
PollSet::SocketModeMap sm;
Stopwatch sw; sw.start();
do
{
sm = ps.poll(Timespan(1000000));
if (sw.elapsedSeconds() > 10) fail();
} while (sm.size() < 2);
assertTrue(sm.size() == 2);
for (auto s : sm)
assertTrue(0 != (s.second | PollSet::POLL_ERROR));
}
void PollSetTest::testPollClosedServer()
{
EchoServer echoServer1;
EchoServer echoServer2;
StreamSocket ss1;
StreamSocket ss2;
ss1.connectNB(SocketAddress("127.0.0.1", echoServer1.port()));
ss2.connectNB(SocketAddress("127.0.0.1", echoServer2.port()));
PollSet ps;
assertTrue(ps.empty());
ps.add(ss1, PollSet::POLL_READ);
ps.add(ss2, PollSet::POLL_READ);
assertTrue(!ps.empty());
assertTrue(ps.has(ss1));
assertTrue(ps.has(ss2));
echoServer1.stop();
ss1.sendBytes("HELLO", 5);
while (!echoServer1.done()) Thread::sleep(10);
echoServer2.stop();
ss2.sendBytes("HELLO", 5);
while (!echoServer2.done()) Thread::sleep(10);
PollSet::SocketModeMap sm;
Stopwatch sw; sw.start();
do
{
sm = ps.poll(Timespan(1000000));
if (sw.elapsedSeconds() > 10) fail();
} while (sm.size() < 2);
assertTrue(sm.size() == 2);
assertTrue(0 == ss1.receiveBytes(0, 0));
assertTrue(0 == ss2.receiveBytes(0, 0));
}
void PollSetTest::setUp() void PollSetTest::setUp()
{ {
} }
@ -140,6 +204,8 @@ CppUnit::Test* PollSetTest::suite()
CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("PollSetTest"); CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("PollSetTest");
CppUnit_addTest(pSuite, PollSetTest, testPoll); CppUnit_addTest(pSuite, PollSetTest, testPoll);
CppUnit_addTest(pSuite, PollSetTest, testPollNoServer);
CppUnit_addTest(pSuite, PollSetTest, testPollClosedServer);
return pSuite; return pSuite;
} }

View File

@ -25,6 +25,8 @@ public:
~PollSetTest(); ~PollSetTest();
void testPoll(); void testPoll();
void testPollNoServer();
void testPollClosedServer();
void setUp(); void setUp();
void tearDown(); void tearDown();