mirror of
https://github.com/pocoproject/poco.git
synced 2025-01-18 00:15:27 +01:00
Add support of epoll and kqueue into Socket::select and SocketImpl::poll
This commit is contained in:
parent
ee893eb764
commit
acba77b3ce
@ -184,6 +184,15 @@
|
||||
#define POCO_ARCH_BIG_ENDIAN 1
|
||||
#endif
|
||||
|
||||
//TODO: need to determine Linux > 2.6.0
|
||||
#if (POCO_OS == POCO_OS_LINUX)
|
||||
#define POCO_HAVE_FD_EPOLL 1
|
||||
#endif
|
||||
|
||||
//TODO: need to determine which of FreeBSD have kqueue
|
||||
#if (POCO_OS == POCO_OS_FREE_BSD)
|
||||
#define POCO_HAVE_FD_KQUEUE 1
|
||||
#endif
|
||||
|
||||
//TODO: determine all platforms having poll() call
|
||||
#if (POCO_OS == POCO_OS_SOLARIS) || (POCO_OS == POCO_OS_LINUX)
|
||||
|
@ -39,7 +39,15 @@
|
||||
#include "Poco/Timestamp.h"
|
||||
#include <algorithm>
|
||||
#include <string.h> // FD_SET needs memset on some platforms, so we can't use <cstring>
|
||||
#if defined(POCO_HAVE_FD_POLL)
|
||||
|
||||
|
||||
#if defined(POCO_HAVE_FD_EPOLL)
|
||||
#include <sys/epoll.h>
|
||||
#elif defined(POCO_HAVE_FD_KQUEUE)
|
||||
#include <sys/types.h>
|
||||
#include <sys/event.h>
|
||||
#include <sys/time.h>
|
||||
#elif defined(POCO_HAVE_FD_POLL)
|
||||
#include "Poco/SharedPtr.h"
|
||||
#include <poll.h>
|
||||
typedef Poco::SharedPtr<pollfd,
|
||||
@ -47,6 +55,7 @@
|
||||
Poco::ReleaseArrayPolicy<pollfd> > SharedPollArray;
|
||||
#endif
|
||||
|
||||
|
||||
namespace Poco {
|
||||
namespace Net {
|
||||
|
||||
@ -93,7 +102,303 @@ Socket::~Socket()
|
||||
|
||||
int Socket::select(SocketList& readList, SocketList& writeList, SocketList& exceptList, const Poco::Timespan& timeout)
|
||||
{
|
||||
#if defined(POCO_HAVE_FD_POLL)
|
||||
#if defined(POCO_HAVE_FD_EPOLL)
|
||||
#warning "Poco use EPOLL for Socket::select"
|
||||
//
|
||||
// Size of epoll queue
|
||||
//
|
||||
int epoll_size = readList.size() + writeList.size() + exceptList.size();
|
||||
|
||||
//
|
||||
// If nothing to do, return 0
|
||||
//
|
||||
if (epoll_size == 0)
|
||||
return 0;
|
||||
|
||||
//
|
||||
// Fill epoll queue
|
||||
//
|
||||
int epollfd = -1;
|
||||
{
|
||||
//
|
||||
// Epoll events to be filled
|
||||
//
|
||||
struct epoll_event events_in[epoll_size];
|
||||
memset(events_in, 0, sizeof (events_in));
|
||||
|
||||
//
|
||||
// Current epoll event to be filled
|
||||
//
|
||||
struct epoll_event* event_last = events_in;
|
||||
|
||||
for (SocketList::iterator it = readList.begin(); it != readList.end(); ++it)
|
||||
{
|
||||
if (it->sockfd() != POCO_INVALID_SOCKET)
|
||||
{
|
||||
//
|
||||
// Try to find file descriptor in epoll events
|
||||
//
|
||||
struct epoll_event* e = events_in;
|
||||
for (; e != event_last; ++e)
|
||||
{
|
||||
if (reinterpret_cast<Socket*> (e->data.ptr)->sockfd() == it->sockfd())
|
||||
break;
|
||||
}
|
||||
|
||||
//
|
||||
// If not found allocate new epoll event
|
||||
//
|
||||
if (e == event_last)
|
||||
{
|
||||
e->data.ptr = &(*it);
|
||||
|
||||
++event_last;
|
||||
}
|
||||
|
||||
e->events |= EPOLLIN;
|
||||
}
|
||||
}
|
||||
|
||||
for (SocketList::iterator it = writeList.begin(); it != writeList.end(); ++it)
|
||||
{
|
||||
if (it->sockfd() != POCO_INVALID_SOCKET)
|
||||
{
|
||||
//
|
||||
// Try to find file descriptor in epoll events
|
||||
//
|
||||
struct epoll_event* e = events_in;
|
||||
for (; e != event_last; ++e)
|
||||
{
|
||||
if (reinterpret_cast<Socket*> (e->data.ptr)->sockfd() == it->sockfd())
|
||||
break;
|
||||
}
|
||||
|
||||
//
|
||||
// If not found allocate new epoll event
|
||||
//
|
||||
if (e == event_last)
|
||||
{
|
||||
e->data.ptr = &(*it);
|
||||
|
||||
++event_last;
|
||||
}
|
||||
|
||||
e->events |= EPOLLOUT;
|
||||
}
|
||||
}
|
||||
|
||||
for (SocketList::iterator it = exceptList.begin(); it != exceptList.end(); ++it)
|
||||
{
|
||||
if (it->sockfd() != POCO_INVALID_SOCKET)
|
||||
{
|
||||
//
|
||||
// Try to find file descriptor in epoll events
|
||||
//
|
||||
struct epoll_event* e = events_in;
|
||||
for (; e != event_last; ++e)
|
||||
{
|
||||
if (reinterpret_cast<Socket*> (e->data.ptr)->sockfd() == it->sockfd())
|
||||
break;
|
||||
}
|
||||
|
||||
//
|
||||
// If not found allocate new epoll event
|
||||
//
|
||||
if (e == event_last)
|
||||
{
|
||||
e->data.ptr = &(*it);
|
||||
|
||||
++event_last;
|
||||
}
|
||||
|
||||
e->events |= EPOLLERR;
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Recalculate real epoll queue size
|
||||
//
|
||||
epoll_size = event_last - events_in;
|
||||
|
||||
//
|
||||
// Allocate epoll queue
|
||||
//
|
||||
epollfd = epoll_create(epoll_size);
|
||||
if (epollfd < 0)
|
||||
{
|
||||
char buf[4000];
|
||||
strerror_r(errno, buf, sizeof(buf));
|
||||
|
||||
SocketImpl::error(std::string("Can't create epoll - ") + buf);
|
||||
}
|
||||
|
||||
//
|
||||
// Place epoll events into epoll queue
|
||||
//
|
||||
for (struct epoll_event* e = events_in; e != event_last; ++e)
|
||||
{
|
||||
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, reinterpret_cast<Socket*> (e->data.ptr)->sockfd(), e) < 0)
|
||||
{
|
||||
char buf[4000];
|
||||
strerror_r(errno, buf, sizeof(buf));
|
||||
|
||||
::close(epollfd);
|
||||
SocketImpl::error(std::string("Can't insert socket to epoll - ") + buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct epoll_event events_out[epoll_size];
|
||||
memset(events_out, 0, sizeof (events_out));
|
||||
|
||||
Poco::Timespan remainingTime(timeout);
|
||||
int rc;
|
||||
do
|
||||
{
|
||||
Poco::Timestamp start;
|
||||
rc = epoll_wait(epollfd, events_out, epoll_size, remainingTime.totalMilliseconds());
|
||||
if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
|
||||
{
|
||||
Poco::Timestamp end;
|
||||
Poco::Timespan waited = end - start;
|
||||
if (waited < remainingTime)
|
||||
remainingTime -= waited;
|
||||
else
|
||||
remainingTime = 0;
|
||||
}
|
||||
}
|
||||
while (rc < 0 && SocketImpl::lastError() == POCO_EINTR);
|
||||
|
||||
//
|
||||
// Close epoll queue
|
||||
//
|
||||
::close(epollfd);
|
||||
|
||||
if (rc < 0) SocketImpl::error();
|
||||
|
||||
SocketList readyReadList;
|
||||
SocketList readyWriteList;
|
||||
SocketList readyExceptList;
|
||||
for (int n = 0; n < rc; ++n)
|
||||
{
|
||||
if (events_out[n].events & EPOLLERR)
|
||||
readyExceptList.push_back(*reinterpret_cast<Socket*>(events_out[n].data.ptr));
|
||||
if (events_out[n].events & EPOLLIN)
|
||||
readyReadList.push_back(*reinterpret_cast<Socket*>(events_out[n].data.ptr));
|
||||
if (events_out[n].events & EPOLLOUT)
|
||||
readyWriteList.push_back(*reinterpret_cast<Socket*>(events_out[n].data.ptr));
|
||||
}
|
||||
std::swap(readList, readyReadList);
|
||||
std::swap(writeList, readyWriteList);
|
||||
std::swap(exceptList, readyExceptList);
|
||||
return readList.size() + writeList.size() + exceptList.size();
|
||||
|
||||
#elif defined(POCO_HAVE_FD_KQUEUE)
|
||||
#warning "Poco use KQUEUE for Socket::select"
|
||||
//
|
||||
// Size of kqueue queue
|
||||
//
|
||||
int kqueue_size = readList.size() + writeList.size() + exceptList.size();
|
||||
|
||||
//
|
||||
// If nothing to do, return 0
|
||||
//
|
||||
if (kqueue_size == 0)
|
||||
return 0;
|
||||
|
||||
//
|
||||
// Create kevent queue
|
||||
//
|
||||
int kqueuefd = kqueue();
|
||||
if (kqueuefd < 0)
|
||||
{
|
||||
char buf[4000];
|
||||
strerror_r(errno, buf, sizeof(buf));
|
||||
|
||||
SocketImpl::error(std::string("Can't create kqueue - ") + buf);
|
||||
}
|
||||
|
||||
//
|
||||
// Allocate in/out kevent queues
|
||||
//
|
||||
struct kevent events_in[kqueue_size];
|
||||
struct kevent events_out[kqueue_size];
|
||||
memset(&events_in , 0, sizeof(events_in));
|
||||
memset(&events_out, 0, sizeof(events_out));
|
||||
|
||||
//
|
||||
// Add sockets to events_in list for appropriate event
|
||||
//
|
||||
for (size_t i = 0; i < readList.size(); ++i)
|
||||
{
|
||||
if (readList[i].sockfd () != POCO_INVALID_SOCKET)
|
||||
{
|
||||
EV_SET(events_in + i, readList[i].sockfd (), EVFILT_READ, EV_ADD|EV_CLEAR, 0, 0, &readList[i]);
|
||||
}
|
||||
}
|
||||
for (size_t i = 0; i < writeList.size(); ++i)
|
||||
{
|
||||
if (writeList[i].sockfd () != POCO_INVALID_SOCKET)
|
||||
{
|
||||
EV_SET(events_in + readList.size () + i, writeList[i].sockfd (), EVFILT_WRITE, EV_ADD|EV_CLEAR, 0, 0, &writeList[i]);
|
||||
}
|
||||
}
|
||||
for (size_t i = 0; i < exceptList.size(); ++i)
|
||||
{
|
||||
if (exceptList[i].sockfd () != POCO_INVALID_SOCKET)
|
||||
{
|
||||
EV_SET(events_in + readList.size () + writeList.size () + i, exceptList[i].sockfd (), EVFILT_READ/*FIXME*/, EV_ADD|EV_CLEAR, 0, 0, &exceptList[i]);
|
||||
}
|
||||
}
|
||||
|
||||
Poco::Timespan remainingTime(timeout);
|
||||
int rc;
|
||||
do
|
||||
{
|
||||
struct timespec ts;
|
||||
ts.tv_sec = (long)remainingTime.totalSeconds();
|
||||
ts.tv_nsec = (long)remainingTime.useconds();
|
||||
|
||||
Poco::Timestamp start;
|
||||
rc = kevent(kqueuefd, events_in, kqueue_size, events_out, kqueue_size, &ts);
|
||||
if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
|
||||
{
|
||||
Poco::Timestamp end;
|
||||
Poco::Timespan waited = end - start;
|
||||
if (waited < remainingTime)
|
||||
remainingTime -= waited;
|
||||
else
|
||||
remainingTime = 0;
|
||||
}
|
||||
}
|
||||
while (rc < 0 && SocketImpl::lastError() == POCO_EINTR);
|
||||
|
||||
//
|
||||
// Close kqueue
|
||||
//
|
||||
::close(kqueuefd);
|
||||
|
||||
if (rc < 0) SocketImpl::error();
|
||||
|
||||
SocketList readyReadList;
|
||||
SocketList readyWriteList;
|
||||
SocketList readyExceptList;
|
||||
for (int n = 0; n < rc; ++n)
|
||||
{
|
||||
if (events_out[n].flags & EV_ERROR)
|
||||
readyExceptList.push_back(*reinterpret_cast<Socket*>(events_out[n].udata));
|
||||
else if (events_out[n].filter == EVFILT_READ)
|
||||
readyReadList.push_back(*reinterpret_cast<Socket*>(events_out[n].udata));
|
||||
else if (events_out[n].filter == EVFILT_WRITE)
|
||||
readyWriteList.push_back(*reinterpret_cast<Socket*>(events_out[n].udata));
|
||||
}
|
||||
std::swap(readList, readyReadList);
|
||||
std::swap(writeList, readyWriteList);
|
||||
std::swap(exceptList, readyExceptList);
|
||||
return readList.size() + writeList.size() + exceptList.size();
|
||||
|
||||
#elif defined(POCO_HAVE_FD_POLL)
|
||||
#warning "Poco use POLL for Socket::select"
|
||||
nfds_t nfd = readList.size() + writeList.size() + exceptList.size();
|
||||
if (0 == nfd) return 0;
|
||||
|
||||
@ -177,6 +482,7 @@ int Socket::select(SocketList& readList, SocketList& writeList, SocketList& exce
|
||||
return readList.size() + writeList.size() + exceptList.size();
|
||||
|
||||
#else
|
||||
#warning "Poco use SELECT for Socket::select"
|
||||
fd_set fdRead;
|
||||
fd_set fdWrite;
|
||||
fd_set fdExcept;
|
||||
|
@ -40,9 +40,19 @@
|
||||
#include "Poco/NumberFormatter.h"
|
||||
#include "Poco/Timestamp.h"
|
||||
#include <string.h> // FD_SET needs memset on some platforms, so we can't use <cstring>
|
||||
#if defined(POCO_HAVE_FD_POLL)
|
||||
#include <poll.h>
|
||||
|
||||
|
||||
#if defined(POCO_HAVE_FD_EPOLL)
|
||||
#include <sys/epoll.h>
|
||||
#elif defined(POCO_HAVE_FD_KQUEUE)
|
||||
#include <sys/types.h>
|
||||
#include <sys/event.h>
|
||||
#include <sys/time.h>
|
||||
#elif defined(POCO_HAVE_FD_POLL)
|
||||
#include <poll.h>
|
||||
#endif
|
||||
|
||||
|
||||
#if defined(sun) || defined(__sun) || defined(__sun__)
|
||||
#include <unistd.h>
|
||||
#include <stropts.h>
|
||||
@ -349,7 +359,146 @@ bool SocketImpl::poll(const Poco::Timespan& timeout, int mode)
|
||||
{
|
||||
poco_assert (_sockfd != POCO_INVALID_SOCKET);
|
||||
|
||||
#if defined(POCO_HAVE_FD_POLL)
|
||||
#if defined(POCO_HAVE_FD_EPOLL)
|
||||
#warning "Poco use EPOLL for SocketImpl::poll"
|
||||
//
|
||||
// Allocate epoll queue
|
||||
//
|
||||
int epollfd = epoll_create(1);
|
||||
if (epollfd < 0)
|
||||
{
|
||||
char buf[4000];
|
||||
strerror_r(errno, buf, sizeof(buf));
|
||||
|
||||
error(std::string("Can't create epoll - ") + buf);
|
||||
}
|
||||
|
||||
//
|
||||
// Fill epoll event
|
||||
//
|
||||
struct epoll_event ev_in;
|
||||
memset(&ev_in, 0, sizeof(ev_in));
|
||||
|
||||
if (mode & SELECT_READ)
|
||||
ev_in.events |= EPOLLIN;
|
||||
if (mode & SELECT_WRITE)
|
||||
ev_in.events |= EPOLLOUT;
|
||||
if (mode & SELECT_ERROR)
|
||||
ev_in.events |= EPOLLERR;
|
||||
|
||||
//
|
||||
// Add epoll event to epoll queue
|
||||
//
|
||||
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, _sockfd, &ev_in) < 0)
|
||||
{
|
||||
char buf[4000];
|
||||
strerror_r(errno, buf, sizeof(buf));
|
||||
|
||||
::close(epollfd);
|
||||
error(std::string("Can't insert socket to epoll - ") + buf);
|
||||
}
|
||||
|
||||
Poco::Timespan remainingTime(timeout);
|
||||
int rc;
|
||||
do
|
||||
{
|
||||
struct epoll_event ev_out;
|
||||
memset(&ev_out, 0, sizeof(ev_out));
|
||||
|
||||
Poco::Timestamp start;
|
||||
rc = epoll_wait(epollfd, &ev_out, 1, remainingTime.totalMilliseconds());
|
||||
if (rc < 0 && lastError() == POCO_EINTR)
|
||||
{
|
||||
Poco::Timestamp end;
|
||||
Poco::Timespan waited = end - start;
|
||||
if (waited < remainingTime)
|
||||
remainingTime -= waited;
|
||||
else
|
||||
remainingTime = 0;
|
||||
}
|
||||
}
|
||||
while (rc < 0 && lastError() == POCO_EINTR);
|
||||
|
||||
//
|
||||
// Close epoll
|
||||
//
|
||||
::close(epollfd);
|
||||
|
||||
if (rc < 0) error();
|
||||
return rc > 0;
|
||||
|
||||
#elif defined(POCO_HAVE_FD_KQUEUE)
|
||||
#warning "Poco use KQUEUE for SocketImpl::poll"
|
||||
//
|
||||
// Allocate kevent queue
|
||||
//
|
||||
int kqueuefd = kqueue();
|
||||
if (kqueuefd < 0)
|
||||
{
|
||||
char buf[4000];
|
||||
strerror_r(errno, buf, sizeof(buf));
|
||||
|
||||
error(std::string("Can't create kqueue - ") + buf);
|
||||
}
|
||||
|
||||
//
|
||||
// Fill kevent queue
|
||||
//
|
||||
struct kevent events_in[3];
|
||||
memset(events_in, 0, sizeof(events_in));
|
||||
|
||||
int kqueue_size = 0;
|
||||
if (mode & SELECT_READ)
|
||||
{
|
||||
EV_SET(&events_in[kqueue_size], _sockfd, EVFILT_READ, EV_ADD|EV_CLEAR, 0, 0, &events_in[kqueue_size]);
|
||||
++kqueue_size;
|
||||
}
|
||||
if (mode & SELECT_WRITE)
|
||||
{
|
||||
EV_SET(&events_in[kqueue_size], _sockfd, EVFILT_WRITE, EV_ADD|EV_CLEAR, 0, 0, &events_in[kqueue_size]);
|
||||
++kqueue_size;
|
||||
}
|
||||
if (mode & SELECT_ERROR)
|
||||
{
|
||||
EV_SET(&events_in[kqueue_size], _sockfd, EVFILT_READ/*FIXME:*/, EV_ADD|EV_CLEAR, 0, 0, &events_in[kqueue_size]);
|
||||
++kqueue_size;
|
||||
}
|
||||
|
||||
Poco::Timespan remainingTime(timeout);
|
||||
int rc;
|
||||
do
|
||||
{
|
||||
struct kevent events_out[kqueue_size];
|
||||
memset(events_out, 0, sizeof(events_out));
|
||||
|
||||
struct timespec ts;
|
||||
ts.tv_sec = (long)remainingTime.totalSeconds();
|
||||
ts.tv_nsec = (long)remainingTime.useconds();
|
||||
|
||||
Poco::Timestamp start;
|
||||
rc = kevent(kqueuefd, events_in, kqueue_size, events_out, kqueue_size, &ts);
|
||||
if (rc < 0 && lastError() == POCO_EINTR)
|
||||
{
|
||||
Poco::Timestamp end;
|
||||
Poco::Timespan waited = end - start;
|
||||
if (waited < remainingTime)
|
||||
remainingTime -= waited;
|
||||
else
|
||||
remainingTime = 0;
|
||||
}
|
||||
}
|
||||
while (rc < 0 && lastError() == POCO_EINTR);
|
||||
|
||||
//
|
||||
// Close kqueue
|
||||
//
|
||||
::close(kqueuefd);
|
||||
|
||||
if (rc < 0) error();
|
||||
return rc > 0;
|
||||
|
||||
#elif defined(POCO_HAVE_FD_POLL)
|
||||
#warning "Poco use POLL for SocketImpl::poll"
|
||||
pollfd pollBuf;
|
||||
|
||||
memset(&pollBuf, 0, sizeof(pollfd));
|
||||
@ -375,7 +524,9 @@ bool SocketImpl::poll(const Poco::Timespan& timeout, int mode)
|
||||
}
|
||||
}
|
||||
while (rc < 0 && lastError() == POCO_EINTR);
|
||||
|
||||
#else
|
||||
#warning "Poco use SELECT for SocketImpl::poll"
|
||||
fd_set fdRead;
|
||||
fd_set fdWrite;
|
||||
fd_set fdExcept;
|
||||
|
Loading…
x
Reference in New Issue
Block a user