trunk: backport changes from 1.4.3

This commit is contained in:
Marian Krivos 2012-02-04 21:15:22 +00:00
parent 73426e13e7
commit 1d101207f9
3 changed files with 245 additions and 224 deletions

View File

@ -14,6 +14,7 @@ if(CMAKE_SYSTEM MATCHES "Windows")
set(SYSLIBS ${SYSLIBS} ws2_32)
else (CMAKE_SYSTEM MATCHES "Windows")
set(SRCS ${BASE_SRCS} ${LIN_SRCS})
add_definitions(-DPOCO_HAVE_FD_POLL)
endif(CMAKE_SYSTEM MATCHES "Windows")
if (NOT POCO_STATIC)

View File

@ -609,6 +609,11 @@ inline SocketAddress Socket::peerAddress() const
}
inline bool Socket::secure() const
{
return _pImpl->secure();
}
inline bool Socket::supportsIPv4()
{
return true;

View File

@ -42,13 +42,13 @@
#if defined(POCO_HAVE_FD_EPOLL)
#include <sys/epoll.h>
#include <sys/epoll.h>
#elif defined(POCO_HAVE_FD_POLL)
#include "Poco/SharedPtr.h"
#include <poll.h>
typedef Poco::SharedPtr<pollfd,
Poco::ReferenceCounter,
Poco::ReleaseArrayPolicy<pollfd> > SharedPollArray;
#include "Poco/SharedPtr.h"
#include <poll.h>
typedef Poco::SharedPtr<pollfd,
Poco::ReferenceCounter,
Poco::ReleaseArrayPolicy<pollfd> > SharedPollArray;
#endif
@ -100,230 +100,221 @@ int Socket::select(SocketList& readList, SocketList& writeList, SocketList& exce
{
#if defined(POCO_HAVE_FD_EPOLL)
int epoll_size = readList.size() + writeList.size() + exceptList.size();
int epollSize = readList.size() + writeList.size() + exceptList.size();
if (epollSize == 0) return 0;
if (epoll_size == 0) return 0;
int epollfd = -1;
{
struct epoll_event eventsIn[epollSize];
memset(eventsIn, 0, sizeof(eventsIn));
struct epoll_event* eventLast = eventsIn;
for (SocketList::iterator it = readList.begin(); it != readList.end(); ++it)
{
poco_socket_t sockfd = it->sockfd();
if (sockfd != POCO_INVALID_SOCKET)
{
struct epoll_event* e = eventsIn;
for (; e != eventLast; ++e)
{
if (reinterpret_cast<Socket*>(e->data.ptr)->sockfd() == sockfd)
break;
}
if (e == eventLast)
{
e->data.ptr = &(*it);
++eventLast;
}
e->events |= EPOLLIN;
}
}
int epollfd = -1;
{
struct epoll_event events_in[epoll_size];
memset(events_in, 0, sizeof (events_in));
for (SocketList::iterator it = writeList.begin(); it != writeList.end(); ++it)
{
poco_socket_t sockfd = it->sockfd();
if (sockfd != POCO_INVALID_SOCKET)
{
struct epoll_event* e = eventsIn;
for (; e != eventLast; ++e)
{
if (reinterpret_cast<Socket*>(e->data.ptr)->sockfd() == sockfd)
break;
}
if (e == eventLast)
{
e->data.ptr = &(*it);
++eventLast;
}
e->events |= EPOLLOUT;
}
}
struct epoll_event* event_last = events_in;
for (SocketList::iterator it = exceptList.begin(); it != exceptList.end(); ++it)
{
poco_socket_t sockfd = it->sockfd();
if (sockfd != POCO_INVALID_SOCKET)
{
struct epoll_event* e = eventsIn;
for (; e != eventLast; ++e)
{
if (reinterpret_cast<Socket*>(e->data.ptr)->sockfd() == sockfd)
break;
}
if (e == eventLast)
{
e->data.ptr = &(*it);
++eventLast;
}
e->events |= EPOLLERR;
}
}
e->events |= EPOLLERR;
}
}
for (SocketList::iterator it = readList.begin(); it != readList.end(); ++it)
{
if (it->sockfd() != POCO_INVALID_SOCKET)
{
struct epoll_event* e = events_in;
for (; e != event_last; ++e)
{
if (reinterpret_cast<Socket*> (e->data.ptr)->sockfd() == it->sockfd())
break;
}
epollSize = eventLast - eventsIn;
epollfd = epoll_create(epollSize);
if (epollfd < 0)
{
char buf[4000];
strerror_r(errno, buf, sizeof(buf));
SocketImpl::error(std::string("Can't create epoll queue: ") + buf);
}
if (e == event_last)
{
e->data.ptr = &(*it);
for (struct epoll_event* e = eventsIn; e != eventLast; ++e)
{
poco_socket_t sockfd = reinterpret_cast<Socket*>(e->data.ptr)->sockfd();
if (sockfd != POCO_INVALID_SOCKET)
{
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, e) < 0)
{
char buf[4000];
strerror_r(errno, buf, sizeof(buf));
++event_last;
}
::close(epollfd);
SocketImpl::error(std::string("Can't insert socket to epoll queue: ") + buf);
}
}
}
}
e->events |= EPOLLIN;
}
}
struct epoll_event eventsOut[epollSize];
memset(eventsOut, 0, sizeof(eventsOut));
for (SocketList::iterator it = writeList.begin(); it != writeList.end(); ++it)
{
if (it->sockfd() != POCO_INVALID_SOCKET)
{
struct epoll_event* e = events_in;
for (; e != event_last; ++e)
{
if (reinterpret_cast<Socket*> (e->data.ptr)->sockfd() == it->sockfd())
break;
}
if (e == event_last)
{
e->data.ptr = &(*it);
++event_last;
}
e->events |= EPOLLOUT;
}
}
for (SocketList::iterator it = exceptList.begin(); it != exceptList.end(); ++it)
{
if (it->sockfd() != POCO_INVALID_SOCKET)
{
struct epoll_event* e = events_in;
for (; e != event_last; ++e)
{
if (reinterpret_cast<Socket*> (e->data.ptr)->sockfd() == it->sockfd())
break;
}
if (e == event_last)
{
e->data.ptr = &(*it);
++event_last;
}
e->events |= EPOLLERR;
}
}
epoll_size = event_last - events_in;
epollfd = epoll_create(epoll_size);
if (epollfd < 0)
{
char buf[4000];
strerror_r(errno, buf, sizeof(buf));
SocketImpl::error(std::string("Can't create epoll - ") + buf);
}
for (struct epoll_event* e = events_in; e != event_last; ++e)
{
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, reinterpret_cast<Socket*> (e->data.ptr)->sockfd(), e) < 0)
{
char buf[4000];
strerror_r(errno, buf, sizeof(buf));
::close(epollfd);
SocketImpl::error(std::string("Can't insert socket to epoll - ") + buf);
}
}
}
struct epoll_event events_out[epoll_size];
memset(events_out, 0, sizeof (events_out));
Poco::Timespan remainingTime(timeout);
int rc;
do
{
Poco::Timestamp start;
rc = epoll_wait(epollfd, events_out, epoll_size, remainingTime.totalMilliseconds());
if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
{
Poco::Timestamp end;
Poco::Timespan waited = end - start;
if (waited < remainingTime)
remainingTime -= waited;
else
remainingTime = 0;
}
}
while (rc < 0 && SocketImpl::lastError() == POCO_EINTR);
Poco::Timespan remainingTime(timeout);
int rc;
do
{
Poco::Timestamp start;
rc = epoll_wait(epollfd, eventsOut, epollSize, remainingTime.totalMilliseconds());
if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
{
Poco::Timestamp end;
Poco::Timespan waited = end - start;
if (waited < remainingTime)
remainingTime -= waited;
else
remainingTime = 0;
::close(epollfd);
if (rc < 0) SocketImpl::error();
SocketList readyReadList;
SocketList readyWriteList;
SocketList readyExceptList;
for (int n = 0; n < rc; ++n)
{
if (events_out[n].events & EPOLLERR)
readyExceptList.push_back(*reinterpret_cast<Socket*>(events_out[n].data.ptr));
if (events_out[n].events & EPOLLIN)
readyReadList.push_back(*reinterpret_cast<Socket*>(events_out[n].data.ptr));
if (events_out[n].events & EPOLLOUT)
readyWriteList.push_back(*reinterpret_cast<Socket*>(events_out[n].data.ptr));
}
std::swap(readList, readyReadList);
std::swap(writeList, readyWriteList);
std::swap(exceptList, readyExceptList);
return readList.size() + writeList.size() + exceptList.size();
SocketList readyExceptList;
for (int n = 0; n < rc; ++n)
{
if (eventsOut[n].events & EPOLLERR)
readyExceptList.push_back(*reinterpret_cast<Socket*>(eventsOut[n].data.ptr));
if (eventsOut[n].events & EPOLLIN)
readyReadList.push_back(*reinterpret_cast<Socket*>(eventsOut[n].data.ptr));
if (eventsOut[n].events & EPOLLOUT)
readyWriteList.push_back(*reinterpret_cast<Socket*>(eventsOut[n].data.ptr));
}
std::swap(readList, readyReadList);
std::swap(writeList, readyWriteList);
std::swap(exceptList, readyExceptList);
return readList.size() + writeList.size() + exceptList.size();
#elif defined(POCO_HAVE_FD_POLL)
nfds_t nfd = readList.size() + writeList.size() + exceptList.size();
if (0 == nfd) return 0;
nfds_t nfd = readList.size() + writeList.size() + exceptList.size();
if (0 == nfd) return 0;
SharedPollArray pPollArr = new pollfd[nfd];
SharedPollArray pPollArr = new pollfd[nfd];
int idx = 0;
for (SocketList::iterator it = readList.begin(); it != readList.end(); ++it)
{
pPollArr[idx].fd = int(it->sockfd());
pPollArr[idx++].events |= POLLIN;
}
int idx = 0;
for (SocketList::iterator it = readList.begin(); it != readList.end(); ++it)
{
pPollArr[idx].fd = int(it->sockfd());
pPollArr[idx++].events |= POLLIN;
}
SocketList::iterator begR = readList.begin();
SocketList::iterator endR = readList.end();
for (SocketList::iterator it = writeList.begin(); it != writeList.end(); ++it)
{
SocketList::iterator pos = std::find(begR, endR, *it);
if (pos != endR)
{
pPollArr[pos-begR].events |= POLLOUT;
--nfd;
}
else
{
pPollArr[idx].fd = int(it->sockfd());
pPollArr[idx++].events |= POLLOUT;
}
}
SocketList::iterator begR = readList.begin();
SocketList::iterator endR = readList.end();
for (SocketList::iterator it = writeList.begin(); it != writeList.end(); ++it)
{
SocketList::iterator pos = std::find(begR, endR, *it);
if (pos != endR)
{
pPollArr[pos-begR].events |= POLLOUT;
--nfd;
}
else
{
pPollArr[idx].fd = int(it->sockfd());
pPollArr[idx++].events |= POLLOUT;
}
}
SocketList::iterator begW = writeList.begin();
SocketList::iterator endW = writeList.end();
for (SocketList::iterator it = exceptList.begin(); it != exceptList.end(); ++it)
{
SocketList::iterator pos = std::find(begR, endR, *it);
if (pos != endR) --nfd;
else
{
SocketList::iterator pos = std::find(begW, endW, *it);
if (pos != endW) --nfd;
else pPollArr[idx++].fd = int(it->sockfd());
}
}
SocketList::iterator begW = writeList.begin();
SocketList::iterator endW = writeList.end();
for (SocketList::iterator it = exceptList.begin(); it != exceptList.end(); ++it)
{
SocketList::iterator pos = std::find(begR, endR, *it);
if (pos != endR) --nfd;
else
{
SocketList::iterator pos = std::find(begW, endW, *it);
if (pos != endW) --nfd;
else pPollArr[idx++].fd = int(it->sockfd());
}
}
Poco::Timespan remainingTime(timeout);
int rc;
do
{
Poco::Timestamp start;
rc = ::poll(pPollArr, nfd, timeout.totalMilliseconds());
if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
{
Poco::Timestamp end;
Poco::Timespan waited = end - start;
if (waited < remainingTime) remainingTime -= waited;
else remainingTime = 0;
}
}
while (rc < 0 && SocketImpl::lastError() == POCO_EINTR);
if (rc < 0) SocketImpl::error();
Poco::Timespan remainingTime(timeout);
int rc;
do
{
Poco::Timestamp start;
rc = ::poll(pPollArr, nfd, timeout.totalMilliseconds());
if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
{
Poco::Timestamp end;
Poco::Timespan waited = end - start;
if (waited < remainingTime) remainingTime -= waited;
else remainingTime = 0;
}
}
while (rc < 0 && SocketImpl::lastError() == POCO_EINTR);
if (rc < 0) SocketImpl::error();
SocketList readyReadList;
SocketList readyWriteList;
SocketList readyExceptList;
SocketList readyReadList;
SocketList readyWriteList;
SocketList readyExceptList;
SocketList::iterator begE = exceptList.begin();
SocketList::iterator endE = exceptList.end();
for (int idx = 0; idx < nfd; ++idx)
{
SocketList::iterator slIt = std::find_if(begR, endR, Socket::FDCompare(pPollArr[idx].fd));
if (POLLIN & pPollArr[idx].revents && slIt != endR) readyReadList.push_back(*slIt);
slIt = std::find_if(begW, endW, Socket::FDCompare(pPollArr[idx].fd));
if (POLLOUT & pPollArr[idx].revents && slIt != endW) readyWriteList.push_back(*slIt);
slIt = std::find_if(begE, endE, Socket::FDCompare(pPollArr[idx].fd));
if (POLLERR & pPollArr[idx].revents && slIt != endE) readyExceptList.push_back(*slIt);
}
std::swap(readList, readyReadList);
std::swap(writeList, readyWriteList);
SocketList::iterator begE = exceptList.begin();
SocketList::iterator endE = exceptList.end();
for (int idx = 0; idx < nfd; ++idx)
{
SocketList::iterator slIt = std::find_if(begR, endR, Socket::FDCompare(pPollArr[idx].fd));
if (POLLIN & pPollArr[idx].revents && slIt != endR) readyReadList.push_back(*slIt);
slIt = std::find_if(begW, endW, Socket::FDCompare(pPollArr[idx].fd));
if (POLLOUT & pPollArr[idx].revents && slIt != endW) readyWriteList.push_back(*slIt);
slIt = std::find_if(begE, endE, Socket::FDCompare(pPollArr[idx].fd));
if (POLLERR & pPollArr[idx].revents && slIt != endE) readyExceptList.push_back(*slIt);
}
std::swap(readList, readyReadList);
std::swap(writeList, readyWriteList);
std::swap(exceptList, readyExceptList);
return readList.size() + writeList.size() + exceptList.size();
#else
@ -335,23 +326,35 @@ int Socket::select(SocketList& readList, SocketList& writeList, SocketList& exce
FD_ZERO(&fdRead);
for (SocketList::const_iterator it = readList.begin(); it != readList.end(); ++it)
{
if (int(it->sockfd()) > nfd)
nfd = int(it->sockfd());
FD_SET(it->sockfd(), &fdRead);
poco_socket_t fd = it->sockfd();
if (fd != POCO_INVALID_SOCKET)
{
if (int(fd) > nfd)
nfd = int(fd);
FD_SET(fd, &fdRead);
}
}
FD_ZERO(&fdWrite);
for (SocketList::const_iterator it = writeList.begin(); it != writeList.end(); ++it)
{
if (int(it->sockfd()) > nfd)
nfd = int(it->sockfd());
FD_SET(it->sockfd(), &fdWrite);
poco_socket_t fd = it->sockfd();
if (fd != POCO_INVALID_SOCKET)
{
if (int(fd) > nfd)
nfd = int(fd);
FD_SET(fd, &fdWrite);
}
}
FD_ZERO(&fdExcept);
for (SocketList::const_iterator it = exceptList.begin(); it != exceptList.end(); ++it)
{
if (int(it->sockfd()) > nfd)
nfd = int(it->sockfd());
FD_SET(it->sockfd(), &fdExcept);
poco_socket_t fd = it->sockfd();
if (fd != POCO_INVALID_SOCKET)
{
if (int(fd) > nfd)
nfd = int(fd);
FD_SET(fd, &fdExcept);
}
}
if (nfd == 0) return 0;
Poco::Timespan remainingTime(timeout);
@ -379,26 +382,38 @@ int Socket::select(SocketList& readList, SocketList& writeList, SocketList& exce
SocketList readyReadList;
for (SocketList::const_iterator it = readList.begin(); it != readList.end(); ++it)
{
if (FD_ISSET(it->sockfd(), &fdRead))
readyReadList.push_back(*it);
poco_socket_t fd = it->sockfd();
if (fd != POCO_INVALID_SOCKET)
{
if (FD_ISSET(fd, &fdRead))
readyReadList.push_back(*it);
}
}
std::swap(readList, readyReadList);
SocketList readyWriteList;
for (SocketList::const_iterator it = writeList.begin(); it != writeList.end(); ++it)
{
if (FD_ISSET(it->sockfd(), &fdWrite))
readyWriteList.push_back(*it);
poco_socket_t fd = it->sockfd();
if (fd != POCO_INVALID_SOCKET)
{
if (FD_ISSET(fd, &fdWrite))
readyWriteList.push_back(*it);
}
}
std::swap(writeList, readyWriteList);
SocketList readyExceptList;
for (SocketList::const_iterator it = exceptList.begin(); it != exceptList.end(); ++it)
{
if (FD_ISSET(it->sockfd(), &fdExcept))
readyExceptList.push_back(*it);
}
std::swap(exceptList, readyExceptList);
return rc;
poco_socket_t fd = it->sockfd();
if (fd != POCO_INVALID_SOCKET)
{
if (FD_ISSET(fd, &fdExcept))
readyExceptList.push_back(*it);
}
}
std::swap(exceptList, readyExceptList);
return rc;
#endif
}