poco/Net/src/Socket.cpp
Aleksandar Fabijanic c4e676d36d
Feature net udp (#2347)
* add PMTU discovery #2329

* add socket gather/scatter capabilities #2330 (win, udp)

* enable WSAPoll

* add FastMemoryPool

* add receiveFrom() with native args

* allow copying of StringTokenizer

* add AtomicFlag and SpinlockMutex

* update .gitignore

* UDPServer and client #2343 (windows)

* fix warnings

* fix warnings

* regenerate Net VS solutions

* regenerate CppUnit projects/solutions

* clang fixes

* gcc fixes

* try to fix travis

* more travis fixes

* more travis fixes

* handle UDPClient exception

* fix makefiles and init order warnings

* add UNIX gather/scatter sendto/recvfrom implementations and tests

* run travis tests as sudo

* try to run tests as sudo, 2nd attempt

* fix warning

* use mutex in reactor

* lock-order-inversion in SocketReactor #2346

* add PMTU discovery #2329 (linux)

* ICMPSocket does not check reply address #1921

* remove some ignored tests

* add PMTU discovery #2329 (reconcile logic with #1921)

* fix native receiveFrom()

* reinstate ignoring of proxy errors

* add testMTU to ignore list

* add include atomic

* NTPClient not checking reply address #2348

* some ICMP/MTU fixes

* UDPSocketReader cleanup

* resolve some socket inheritance warnings

* add NTP time sync to ignored tests

* SocketNotifier not thread-safe #2345

* prevent x64 samples build attempt for win32

* build TestApp and Library

* fix ICMP tests

* regen VS projects

* regen VS projects and add missing 2012 files

* remove debug prints
2018-06-02 14:02:33 -05:00

468 lines
11 KiB
C++

//
// Socket.cpp
//
// Library: Net
// Package: Sockets
// Module: Socket
//
// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#include "Poco/Net/Socket.h"
#include "Poco/Net/StreamSocketImpl.h"
#include "Poco/Timestamp.h"
#include <algorithm>
#include <new>
#include <vector>
#include <stdlib.h> // malloc(), free()
#include <string.h> // FD_SET needs memset on some platforms, so we can't use <cstring>
#if defined(POCO_HAVE_FD_EPOLL)
#include <sys/epoll.h>
#elif defined(POCO_HAVE_FD_POLL)
#include "Poco/SharedPtr.h"
#include <poll.h>
#endif
namespace Poco {
namespace Net {
// Default constructor: the socket is backed by a newly allocated
// StreamSocketImpl.
Socket::Socket()
	: _pImpl(new StreamSocketImpl)
{
}
// Wraps an existing implementation object. The pointer is stored as given
// (duplicate() is not called here) and is verified with poco_check_ptr.
Socket::Socket(SocketImpl* pImpl)
	: _pImpl(pImpl)
{
	poco_check_ptr (_pImpl);
}
// Copy constructor: both Socket objects refer to the same implementation
// object afterwards; duplicate() is called on it to account for the new
// reference.
Socket::Socket(const Socket& socket)
	: _pImpl(socket._pImpl)
{
	poco_check_ptr (_pImpl);
	_pImpl->duplicate();
}
// Assignment: releases the implementation currently held (if any), then
// shares the implementation of the other socket and calls duplicate() on it.
// Self-assignment is a no-op.
Socket& Socket::operator = (const Socket& socket)
{
	if (this == &socket) return *this;

	if (_pImpl) _pImpl->release();
	_pImpl = socket._pImpl;
	if (_pImpl) _pImpl->duplicate();

	return *this;
}
// Destructor: gives up this object's reference to the implementation.
// The null check mirrors the one in operator =, which also treats _pImpl
// as possibly null.
Socket::~Socket()
{
	if (_pImpl) _pImpl->release();
}
//
// Waits until at least one socket in the given lists is ready, or until the
// timeout has elapsed.
//
// On entry, readList, writeList and exceptList contain the sockets to be
// checked for readability, writability and error conditions, respectively.
// On return each list has been replaced by the subset of its sockets that
// was reported ready. The early "nothing to wait for" returns (no sockets,
// or no socket with a valid descriptor in the epoll/select variants) leave
// the lists untouched and return 0.
//
// Returns the number of ready sockets: the summed sizes of the three result
// lists in the epoll and poll variants, and the value returned by ::select()
// in the select variant.
//
// A wait interrupted by a signal (POCO_EINTR) is restarted with the timeout
// reduced by the time already spent waiting. Other failures are reported
// through SocketImpl::error(), which presumably throws - the code following
// those calls does not handle the failure itself.
//
int Socket::select(SocketList& readList, SocketList& writeList, SocketList& exceptList, const Poco::Timespan& timeout)
{
#if defined(POCO_HAVE_FD_EPOLL)

	int epollSize = readList.size() + writeList.size() + exceptList.size();
	if (epollSize == 0) return 0;

	// One slot per requested socket is the upper bound; a socket that appears
	// in more than one list shares a single slot. The arrays are heap-allocated
	// because variable-length arrays are not standard C++ and a long socket
	// list could exhaust the stack. Both are allocated before the epoll
	// descriptor is created, so a failed allocation cannot leak it.
	std::vector<struct epoll_event> eventsInBuf(epollSize);
	std::vector<struct epoll_event> eventsOutBuf(epollSize);
	struct epoll_event* eventsIn = &eventsInBuf[0];
	struct epoll_event* eventsOut = &eventsOutBuf[0];
	memset(eventsIn, 0, eventsInBuf.size()*sizeof(struct epoll_event));
	memset(eventsOut, 0, eventsOutBuf.size()*sizeof(struct epoll_event));

	// eventLast points one past the last slot in use.
	struct epoll_event* eventLast = eventsIn;

	for (SocketList::iterator it = readList.begin(); it != readList.end(); ++it)
	{
		poco_socket_t sockfd = it->sockfd();
		if (sockfd != POCO_INVALID_SOCKET)
		{
			// Reuse the slot of a socket with the same descriptor, if any.
			struct epoll_event* e = eventsIn;
			for (; e != eventLast; ++e)
			{
				if (static_cast<Socket*>(e->data.ptr)->sockfd() == sockfd)
					break;
			}
			if (e == eventLast)
			{
				e->data.ptr = &(*it);
				++eventLast;
			}
			e->events |= EPOLLIN;
		}
	}

	for (SocketList::iterator it = writeList.begin(); it != writeList.end(); ++it)
	{
		poco_socket_t sockfd = it->sockfd();
		if (sockfd != POCO_INVALID_SOCKET)
		{
			struct epoll_event* e = eventsIn;
			for (; e != eventLast; ++e)
			{
				if (static_cast<Socket*>(e->data.ptr)->sockfd() == sockfd)
					break;
			}
			if (e == eventLast)
			{
				e->data.ptr = &(*it);
				++eventLast;
			}
			e->events |= EPOLLOUT;
		}
	}

	for (SocketList::iterator it = exceptList.begin(); it != exceptList.end(); ++it)
	{
		poco_socket_t sockfd = it->sockfd();
		if (sockfd != POCO_INVALID_SOCKET)
		{
			struct epoll_event* e = eventsIn;
			for (; e != eventLast; ++e)
			{
				if (static_cast<Socket*>(e->data.ptr)->sockfd() == sockfd)
					break;
			}
			if (e == eventLast)
			{
				e->data.ptr = &(*it);
				++eventLast;
			}
			e->events |= EPOLLERR;
		}
	}

	// From here on epollSize is the number of distinct valid descriptors.
	epollSize = eventLast - eventsIn;
	if (epollSize == 0) return 0;

	int epollfd = epoll_create(1);
	if (epollfd < 0)
	{
		SocketImpl::error("Can't create epoll queue");
	}

	for (struct epoll_event* e = eventsIn; e != eventLast; ++e)
	{
		poco_socket_t sockfd = static_cast<Socket*>(e->data.ptr)->sockfd();
		if (sockfd != POCO_INVALID_SOCKET)
		{
			if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, e) < 0)
			{
				::close(epollfd);
				SocketImpl::error("Can't insert socket to epoll queue");
			}
		}
	}

	Poco::Timespan remainingTime(timeout);
	int rc;
	do
	{
		Poco::Timestamp start;
		rc = epoll_wait(epollfd, eventsOut, epollSize, remainingTime.totalMilliseconds());
		if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
		{
			// Interrupted: retry with whatever is left of the timeout.
			Poco::Timestamp end;
			Poco::Timespan waited = end - start;
			if (waited < remainingTime)
				remainingTime -= waited;
			else
				remainingTime = 0;
		}
	}
	while (rc < 0 && SocketImpl::lastError() == POCO_EINTR);

	::close(epollfd);
	if (rc < 0) SocketImpl::error();

	SocketList readyReadList;
	SocketList readyWriteList;
	SocketList readyExceptList;
	for (int n = 0; n < rc; ++n)
	{
		if (eventsOut[n].events & EPOLLERR)
			readyExceptList.push_back(*static_cast<Socket*>(eventsOut[n].data.ptr));
		if (eventsOut[n].events & EPOLLIN)
			readyReadList.push_back(*static_cast<Socket*>(eventsOut[n].data.ptr));
		if (eventsOut[n].events & EPOLLOUT)
			readyWriteList.push_back(*static_cast<Socket*>(eventsOut[n].data.ptr));
	}
	std::swap(readList, readyReadList);
	std::swap(writeList, readyWriteList);
	std::swap(exceptList, readyExceptList);
	return readList.size() + writeList.size() + exceptList.size();

#elif defined(POCO_HAVE_FD_POLL)

	typedef Poco::SharedPtr<pollfd, Poco::ReferenceCounter, Poco::ReleaseArrayPolicy<pollfd> > SharedPollArray;

	// nfd starts as the upper bound and is decremented for every socket that
	// turns out to be already covered by an earlier list.
	nfds_t nfd = readList.size() + writeList.size() + exceptList.size();
	if (0 == nfd) return 0;

	SharedPollArray pPollArr = new pollfd[nfd]();

	// Read sockets occupy the first readList.size() entries, in list order.
	int idx = 0;
	for (SocketList::iterator it = readList.begin(); it != readList.end(); ++it)
	{
		pPollArr[idx].fd = int(it->sockfd());
		pPollArr[idx++].events |= POLLIN;
	}

	SocketList::iterator begR = readList.begin();
	SocketList::iterator endR = readList.end();
	for (SocketList::iterator it = writeList.begin(); it != writeList.end(); ++it)
	{
		SocketList::iterator pos = std::find(begR, endR, *it);
		if (pos != endR)
		{
			// Already polled for reading: extend that entry.
			pPollArr[pos-begR].events |= POLLOUT;
			--nfd;
		}
		else
		{
			pPollArr[idx].fd = int(it->sockfd());
			pPollArr[idx++].events |= POLLOUT;
		}
	}

	SocketList::iterator begW = writeList.begin();
	SocketList::iterator endW = writeList.end();
	for (SocketList::iterator it = exceptList.begin(); it != exceptList.end(); ++it)
	{
		SocketList::iterator posR = std::find(begR, endR, *it);
		if (posR != endR) --nfd;
		else
		{
			SocketList::iterator posW = std::find(begW, endW, *it);
			if (posW != endW) --nfd;
			// No event bits are requested for except-only sockets; the
			// result loop below only looks at POLLERR in revents.
			else pPollArr[idx++].fd = int(it->sockfd());
		}
	}

	Poco::Timespan remainingTime(timeout);
	int rc;
	do
	{
		Poco::Timestamp start;
		rc = ::poll(pPollArr, nfd, remainingTime.totalMilliseconds());
		if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
		{
			// Interrupted: retry with whatever is left of the timeout.
			Poco::Timestamp end;
			Poco::Timespan waited = end - start;
			if (waited < remainingTime) remainingTime -= waited;
			else remainingTime = 0;
		}
	}
	while (rc < 0 && SocketImpl::lastError() == POCO_EINTR);
	if (rc < 0) SocketImpl::error();

	SocketList readyReadList;
	SocketList readyWriteList;
	SocketList readyExceptList;

	SocketList::iterator begE = exceptList.begin();
	SocketList::iterator endE = exceptList.end();
	const int pollCount = static_cast<int>(nfd);
	for (int n = 0; n < pollCount; ++n)
	{
		// Map each polled descriptor back to the socket(s) that requested it.
		SocketList::iterator slIt = std::find_if(begR, endR, Socket::FDCompare(pPollArr[n].fd));
		if (POLLIN & pPollArr[n].revents && slIt != endR) readyReadList.push_back(*slIt);
		slIt = std::find_if(begW, endW, Socket::FDCompare(pPollArr[n].fd));
		if (POLLOUT & pPollArr[n].revents && slIt != endW) readyWriteList.push_back(*slIt);
		slIt = std::find_if(begE, endE, Socket::FDCompare(pPollArr[n].fd));
		if (POLLERR & pPollArr[n].revents && slIt != endE) readyExceptList.push_back(*slIt);
	}
	std::swap(readList, readyReadList);
	std::swap(writeList, readyWriteList);
	std::swap(exceptList, readyExceptList);
	return readList.size() + writeList.size() + exceptList.size();

#else

	fd_set fdRead;
	fd_set fdWrite;
	fd_set fdExcept;
	int nfd = 0;          // highest descriptor seen so far
	bool haveFd = false;  // true once any valid descriptor has been added

	FD_ZERO(&fdRead);
	for (SocketList::const_iterator it = readList.begin(); it != readList.end(); ++it)
	{
		poco_socket_t fd = it->sockfd();
		if (fd != POCO_INVALID_SOCKET)
		{
			if (int(fd) > nfd)
				nfd = int(fd);
			FD_SET(fd, &fdRead);
			haveFd = true;
		}
	}

	FD_ZERO(&fdWrite);
	for (SocketList::const_iterator it = writeList.begin(); it != writeList.end(); ++it)
	{
		poco_socket_t fd = it->sockfd();
		if (fd != POCO_INVALID_SOCKET)
		{
			if (int(fd) > nfd)
				nfd = int(fd);
			FD_SET(fd, &fdWrite);
			haveFd = true;
		}
	}

	FD_ZERO(&fdExcept);
	for (SocketList::const_iterator it = exceptList.begin(); it != exceptList.end(); ++it)
	{
		poco_socket_t fd = it->sockfd();
		if (fd != POCO_INVALID_SOCKET)
		{
			if (int(fd) > nfd)
				nfd = int(fd);
			FD_SET(fd, &fdExcept);
			haveFd = true;
		}
	}

	// Descriptor 0 is a valid descriptor, so "highest descriptor == 0" must
	// not be mistaken for "nothing to wait for"; test the flag instead.
	if (!haveFd) return 0;

	Poco::Timespan remainingTime(timeout);
	int rc;
	do
	{
		struct timeval tv;
		tv.tv_sec = (long) remainingTime.totalSeconds();
		tv.tv_usec = (long) remainingTime.useconds();
		Poco::Timestamp start;
		rc = ::select(nfd + 1, &fdRead, &fdWrite, &fdExcept, &tv);
		if (rc < 0 && SocketImpl::lastError() == POCO_EINTR)
		{
			// Interrupted: retry with whatever is left of the timeout.
			Poco::Timestamp end;
			Poco::Timespan waited = end - start;
			if (waited < remainingTime)
				remainingTime -= waited;
			else
				remainingTime = 0;
		}
	}
	while (rc < 0 && SocketImpl::lastError() == POCO_EINTR);
	if (rc < 0) SocketImpl::error();

	// Keep only the sockets whose descriptor is still set in the
	// corresponding fd_set after ::select() returned.
	SocketList readyReadList;
	for (SocketList::const_iterator it = readList.begin(); it != readList.end(); ++it)
	{
		poco_socket_t fd = it->sockfd();
		if (fd != POCO_INVALID_SOCKET)
		{
			if (FD_ISSET(fd, &fdRead))
				readyReadList.push_back(*it);
		}
	}
	std::swap(readList, readyReadList);

	SocketList readyWriteList;
	for (SocketList::const_iterator it = writeList.begin(); it != writeList.end(); ++it)
	{
		poco_socket_t fd = it->sockfd();
		if (fd != POCO_INVALID_SOCKET)
		{
			if (FD_ISSET(fd, &fdWrite))
				readyWriteList.push_back(*it);
		}
	}
	std::swap(writeList, readyWriteList);

	SocketList readyExceptList;
	for (SocketList::const_iterator it = exceptList.begin(); it != exceptList.end(); ++it)
	{
		poco_socket_t fd = it->sockfd();
		if (fd != POCO_INVALID_SOCKET)
		{
			if (FD_ISSET(fd, &fdExcept))
				readyExceptList.push_back(*it);
		}
	}
	std::swap(exceptList, readyExceptList);
	return rc;

#endif // POCO_HAVE_FD_EPOLL
}
//
// Creates a vector of size socket buffers, each referring to its own
// malloc()'ed memory block of bufLen bytes. The blocks are released by
// destroyBufVec().
//
// Throws std::bad_alloc if a block cannot be allocated; blocks allocated
// up to that point are freed before the exception is thrown.
//
SocketBufVec Socket::makeBufVec(std::size_t size, std::size_t bufLen)
{
	SocketBufVec buf(size);
	SocketBufVec::iterator it = buf.begin();
	SocketBufVec::iterator end = buf.end();
	for (; it != end; ++it)
	{
		// TODO: use memory pool
		void* pMem = malloc(bufLen);
		// malloc(0) may legitimately return a null pointer, so a null result
		// only signals failure for a non-zero request.
		if (!pMem && bufLen != 0)
		{
			// Entries not assigned yet should still hold null pointers
			// (assuming SocketBufVec value-initializes its elements, as
			// std::vector does), so freeing every entry is safe.
			destroyBufVec(buf);
			throw std::bad_alloc();
		}
		*it = makeBuffer(pMem, bufLen);
	}
	return buf;
}
//
// Calls free() on the memory block referenced by every buffer in buf, then
// leaves buf empty by swapping it with an empty vector.
//
void Socket::destroyBufVec(SocketBufVec& buf)
{
	for (SocketBufVec::iterator pBuf = buf.begin(); pBuf != buf.end(); ++pBuf)
	{
#if defined(POCO_OS_FAMILY_WINDOWS)
		free(pBuf->buf);
#elif defined(POCO_OS_FAMILY_UNIX)
		free(pBuf->iov_base);
#endif
	}

	// Swap rather than clear(), so the old contents end up in the
	// temporary and are destroyed with it.
	SocketBufVec emptied;
	emptied.swap(buf);
}
//
// Fills a platform-specific SocketBuf with the given memory block and
// length. The memory is referenced, not copied. On platforms that are
// neither Windows nor Unix, a NotImplementedException is thrown.
//
SocketBuf Socket::makeBuffer(void* buffer, std::size_t length)
{
#if defined(POCO_OS_FAMILY_WINDOWS)
	SocketBuf result;
	result.buf = static_cast<char*>(buffer);
	result.len = static_cast<ULONG>(length);
	return result;
#elif defined(POCO_OS_FAMILY_UNIX)
	SocketBuf result;
	result.iov_base = buffer;
	result.iov_len = length;
	return result;
#else
	throw NotImplementedException("Socket::makeBuffer(void*, size_t)");
#endif
}
//
// Builds one socket buffer per C string in vec. Each buffer refers to the
// string's own memory (nothing is copied) and its length is the strlen()
// of the string, i.e. the terminating null character is not included.
//
SocketBufVec Socket::makeBufVec(const std::vector<char*>& vec)
{
	SocketBufVec buf(vec.size());
	SocketBufVec::iterator out = buf.begin();
	for (std::vector<char*>::const_iterator in = vec.begin(); in != vec.end(); ++in, ++out)
	{
		char* pStr = *in;
		*out = makeBuffer(pStr, strlen(pStr));
	}
	return buf;
}
//
// Builds one socket buffer per string in vec. Each buffer points directly
// at the string's character data (nothing is copied) and has the string's
// size() as its length.
//
SocketBufVec Socket::makeBufVec(const std::vector<std::string>& vec)
{
	SocketBufVec buf(vec.size());
	SocketBufVec::iterator out = buf.begin();
	for (std::vector<std::string>::const_iterator in = vec.begin(); in != vec.end(); ++in, ++out)
	{
		// The buffer type takes a non-const pointer, hence the const_cast.
		void* pData = const_cast<char*>(in->data());
		*out = makeBuffer(pData, in->size());
	}
	return buf;
}
} } // namespace Poco::Net