Feature net udp (#2347)

* add PMTU discovery #2329

* add socket gather/scatter capabilities #2330 (win, udp)

* enable WSAPoll

* add FastMemoryPool

* add receiveFrom() with native args

* allow copying of StringTokenizer

* add AtomicFlag and SpinlockMutex

* update .gitignore

* UDPServer and client #2343 (windows)

* fix warnings

* fix warnings

* regenerate Net VS solutions

* regenerate CppUnit projects/solutions

* clang fixes

* gcc fixes

* try to fix travis

* more travis fixes

* more travis fixes

* handle UDPClient exception

* fix makefiles and init order warnings

* add UNIX gather/scatter sendto/recvfrom implementations and tests

* run travis tests as sudo

* try to run tests as sudo, 2nd attempt

* fix warning

* use mutex in reactor

* lock-order-inversion in SocketReactor #2346

* add PMTU discovery #2329 (linux)

* ICMPSocket does not check reply address #1921

* remove some ignored tests

* add PMTU discovery #2329 (reconcile logic with #1921)

* fix native receiveFrome()

* reinstate ignoring of proxy errors

* add testMTU to ignore list

* add include atomic

* NTPClient not checking reply address #2348

* some ICMP/MTU fixes

* UDPSocketReader cleanup

* resolve some socket inheritance warnings

* add NTP time sync to ignored tests

* SocketNotifier not thread-safe #2345

* prevent x64 samples build attempt for win32

* build TestApp and Library

* fix ICMP tests

* regen VS projects

* regen VS projects and add missing 2012 files

* remove debug prints
This commit is contained in:
Aleksandar Fabijanic
2018-06-02 14:02:33 -05:00
committed by GitHub
parent da15142f69
commit c4e676d36d
127 changed files with 5540 additions and 1408 deletions

View File

@@ -20,6 +20,7 @@
#include "Poco/Net/Net.h"
#include "Poco/Net/Socket.h"
#include "Poco/Buffer.h"
namespace Poco {
@@ -108,12 +109,29 @@ public:
/// Returns the number of bytes sent, which may be
/// less than the number of bytes specified.
int sendBytes(const SocketBufVec& buffer, int flags = 0);
/// Sends the contents of the given buffers through
/// the socket.
///
/// Returns the number of bytes sent, which may be
/// less than the number of bytes specified.
int receiveBytes(void* buffer, int length, int flags = 0);
/// Receives data from the socket and stores it
/// in buffer. Up to length bytes are received.
///
/// Returns the number of bytes received.
int receiveBytes(SocketBufVec& buffer, int flags = 0);
/// Receives data from the socket and stores it in buffers.
///
/// Returns the number of bytes received.
int receiveBytes(Poco::Buffer<char>& buffer, int flags = 0, const Poco::Timespan& timeout = 100000);
/// Receives data from the socket and stores it in buffers.
///
/// Returns the number of bytes received.
int sendTo(const void* buffer, int length, const SocketAddress& address, int flags = 0);
/// Sends the contents of the given buffer through
/// the socket to the given address.
@@ -121,6 +139,13 @@ public:
/// Returns the number of bytes sent, which may be
/// less than the number of bytes specified.
int sendTo(const SocketBufVec& buffers, const SocketAddress& address, int flags = 0);
/// Sends the contents of the given buffers through
/// the socket to the given address.
///
/// Returns the number of bytes sent, which may be
/// less than the number of bytes specified.
int receiveFrom(void* buffer, int length, SocketAddress& address, int flags = 0);
/// Receives data from the socket and stores it
/// in buffer. Up to length bytes are received.
@@ -128,6 +153,30 @@ public:
///
/// Returns the number of bytes received.
int receiveFrom(void* buffer, int length, struct sockaddr** ppSA, poco_socklen_t** ppSALen, int flags = 0);
/// Receives data from the socket and stores it
/// in buffer. Up to length bytes are received.
/// Stores the native address of the sender in
/// ppSA, and the length of native address in ppSALen.
///
/// Returns the number of bytes received.
int receiveFrom(SocketBufVec& buffers, SocketAddress& address, int flags = 0);
/// Receives data from the socket and stores it
/// in buffers. Up to total length of all buffers
/// are received.
/// Stores the address of the sender in address.
///
/// Returns the number of bytes received.
int receiveFrom(SocketBufVec& buffers, struct sockaddr** ppSA, poco_socklen_t** ppSALen, int flags = 0);
/// Receives data from the socket and stores it
/// in buffers.
/// Stores the native address of the sender in
/// ppSA, and the length of native address in ppSALen.
///
/// Returns the number of bytes received.
void setBroadcast(bool flag);
/// Sets the value of the SO_BROADCAST socket option.
///

View File

@@ -67,7 +67,7 @@ public:
int repeat = 1,
int dataSize = 48,
int ttl = 128,
int timeout = 50000);
int timeout = 100000);
/// Pings the specified address [repeat] times.
/// Notifications are not posted for events.
///
@@ -77,7 +77,7 @@ public:
int repeat = 1,
int dataSize = 48,
int ttl = 128,
int timeout = 50000);
int timeout = 100000);
/// Calls ICMPClient::ping(SocketAddress&, int) and
/// returns the result.
///

View File

@@ -59,20 +59,22 @@ public:
/// Returns current epoch time if either buffer or length are equal to zero.
/// Otherwise, it extracts the time value from the supplied buffer and
/// returns the extracted value.
///
///
/// Supplied buffer includes IP header, ICMP header and data.
bool validReplyID(Poco::UInt8* buffer, int length) const;
/// Returns true if the extracted id is recognized
/// (equals the process id).
///
///
/// Supplied buffer includes IP header, ICMP header and data.
std::string errorDescription(Poco::UInt8* buffer, int length);
std::string errorDescription(Poco::UInt8* buffer, int length, int& type, int& code);
/// Returns error description string.
/// If supplied buffer contains an ICMP echo reply packet, an
/// empty string is returned indicating the absence of error.
///
/// If type and code of the error can be determined, they are
/// assigned to the type and code respectively.
///
/// Supplied buffer includes IP header, ICMP header and data.
std::string typeDescription(int typeId);

View File

@@ -71,11 +71,13 @@ public:
/// Supplied buffer includes IP header, ICMP header and data.
/// Must be overriden.
virtual std::string errorDescription(Poco::UInt8* buffer, int length) = 0;
virtual std::string errorDescription(Poco::UInt8* buffer, int length, int& type, int& code) = 0;
/// Returns error description string.
/// If supplied buffer contains an ICMP echo reply packet, an
/// empty string is returned indicating the absence of error.
///
/// If type and code of the error can be determined, they are
/// assigned to the type and code respectively.
///
/// Supplied buffer includes IP header, ICMP header and data.
/// Must be overriden.
@@ -84,6 +86,7 @@ public:
/// Must be overriden.
static const Poco::UInt16 MAX_PACKET_SIZE;
static const Poco::UInt16 MAX_PAYLOAD_SIZE;
static const Poco::UInt16 MAX_SEQ_VALUE;
protected:

View File

@@ -69,12 +69,19 @@ public:
int dataSize() const;
/// Returns the data size in bytes.
int packetSize() const;
/// Returns the packet size in bytes.
int ttl() const;
/// Returns the Time-To-Live value.
int timeout() const;
/// Returns the socket timeout value.
static Poco::UInt16 mtu(const SocketAddress& address, Poco::UInt16 sz);
/// Returns minimum payload path MTU size for the destination,
/// or 0 if MTU can not be determined.
protected:
ICMPSocket(SocketImpl* pImpl);
/// Creates the Socket and attaches the given SocketImpl.

View File

@@ -51,6 +51,9 @@ public:
int dataSize() const;
/// Returns the data size in bytes.
int packetSize() const;
/// Returns the packet size in bytes.
int ttl() const;
/// Returns the Time-To-Live value.
@@ -61,6 +64,8 @@ protected:
~ICMPSocketImpl();
private:
void checkFragmentation(const std::string& err, int type, int code);
ICMPPacket _icmpPacket;
int _ttl;
int _timeout;
@@ -70,6 +75,12 @@ private:
//
// inlines
//
inline int ICMPSocketImpl::packetSize() const
{
return _icmpPacket.packetSize();
}
inline int ICMPSocketImpl::dataSize() const
{
return _icmpPacket.getDataSize();

View File

@@ -30,7 +30,7 @@ namespace Net {
class Net_API ICMPv4PacketImpl : public ICMPPacketImpl
/// This class implements the ICMPv4 packet.
/// Parts are based on original ICMP code by
/// Parts are based on the original ICMP code by
/// Mike Muuss
/// U. S. Army Ballistic Research Laboratory
/// December, 1983
@@ -135,10 +135,12 @@ public:
///
/// Buffer includes IP header, ICMP header and data.
virtual std::string errorDescription(Poco::UInt8* buffer, int length);
virtual std::string errorDescription(Poco::UInt8* buffer, int length, int& type, int& code);
/// Returns error description string.
/// If supplied buffer contains ICMPv4 echo reply packet, an
/// empty string is returned indicating the absence of error.
/// If type and code of the error can be determined, they are
/// assigned to the type and code respectively.
///
/// Buffer includes IP header, ICMP header and data.

View File

@@ -0,0 +1,128 @@
//
// MultiSocketPoller.h
//
// Library: Net
// Package: UDP
// Module: MultiSocketPoller
//
// Definition of the MultiSocketPoller class.
//
// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef Net_MultiSocketPoller_INCLUDED
#define Net_MultiSocketPoller_INCLUDED
#include "Poco/Net/Net.h"
#include "Poco/Net/Socket.h"
#include "Poco/Net/UDPHandler.h"
namespace Poco {
namespace Net {
template <std::size_t S = POCO_UDP_BUF_SIZE>
class MultiSocketPoller
/// MultiSocketPoller, as its name indicates, repeatedly polls a set of
/// sockets for readability and/or eror. If socket is readable or in error
/// state, the reading/error handling actions are delegated to the reader.
{
public:
MultiSocketPoller(typename UDPHandlerImpl<S>::List& handlers, const Poco::Net::SocketAddress& sa, int nSockets = 10, Poco::Timespan timeout = 250000):
_address(sa),
_timeout(timeout),
_reader(handlers)
/// Creates the MutiSocketPoller.
{
poco_assert (_address.port() > 0 && _address.host().toString() != "0.0.0.0");
addSockets(nSockets);
}
MultiSocketPoller(typename UDPHandlerImpl<S>::List& handlers, const UDPServerParams& serverParams):
_address(serverParams.address()),
_timeout(serverParams.timeout()),
_reader(handlers)
/// Creates the MutiSocketPoller.
{
poco_assert (_address.port() > 0 && _address.host().toString() != "0.0.0.0");
addSockets(serverParams.numberOfSockets());
}
~MultiSocketPoller()
/// Destroys MutiSocketPoller
{
}
Poco::UInt16 port() const
/// Returns the port the socket is
/// listening on.
{
return _address.port();
}
Poco::Net::SocketAddress address() const
/// Returns the address of the server.
{
return _address;
}
void poll()
{
if (_reader.handlerStopped()) return;
PollSet::SocketModeMap sm;
PollSet::SocketModeMap::iterator it;
PollSet::SocketModeMap::iterator end;
sm = _pollSet.poll(_timeout);
it = sm.begin();
end = sm.end();
for (; it != end; ++it)
{
if (it->second & PollSet::POLL_READ)
{
DatagramSocket ds(it->first);
_reader.read(ds);
}
else if (it->second & PollSet::POLL_ERROR)
{
_reader.setError(it->first.impl()->sockfd());
}
}
}
void stop()
{
_reader.stopHandler();
}
bool done() const
{
return _reader.handlerDone();
}
private:
void addSockets(int nSockets)
{
for (int i = 0; i < nSockets; ++i)
{
DatagramSocket ds; ds.bind(_address, true, true);
_pollSet.add(ds, PollSet::POLL_READ | PollSet::POLL_ERROR);
}
}
PollSet _pollSet;
SocketAddress _address;
Poco::Timespan _timeout;
UDPSocketReader<S> _reader;
};
} } // namespace Poco::Net
#endif // Net_MultiSocketPoller_INCLUDED

View File

@@ -47,6 +47,7 @@ POCO_DECLARE_EXCEPTION(Net_API, FTPException, NetException)
POCO_DECLARE_EXCEPTION(Net_API, SMTPException, NetException)
POCO_DECLARE_EXCEPTION(Net_API, POP3Exception, NetException)
POCO_DECLARE_EXCEPTION(Net_API, ICMPException, NetException)
POCO_DECLARE_EXCEPTION(Net_API, ICMPFragmentationException, NetException)
POCO_DECLARE_EXCEPTION(Net_API, NTPException, NetException)
POCO_DECLARE_EXCEPTION(Net_API, HTMLFormException, NetException)
POCO_DECLARE_EXCEPTION(Net_API, WebSocketException, NetException)

View File

@@ -53,7 +53,8 @@ class ParallelSocketAcceptor
/// details.
{
public:
typedef Poco::Net::ParallelSocketReactor<SR> ParallelReactor;
typedef Poco::Net::ParallelSocketReactor<SR> ParallelReactor;
typedef Poco::Observer<ParallelSocketAcceptor, ReadableNotification> Observer;
explicit ParallelSocketAcceptor(ServerSocket& socket,
unsigned threads = Poco::Environment::processorCount()):
@@ -79,9 +80,7 @@ public:
/// with the given SocketReactor.
{
init();
_pReactor->addEventHandler(_socket,
Poco::Observer<ParallelSocketAcceptor,
ReadableNotification>(*this, &ParallelSocketAcceptor::onAccept));
_pReactor->addEventHandler(_socket, Observer(*this, &ParallelSocketAcceptor::onAccept));
}
virtual ~ParallelSocketAcceptor()
@@ -91,9 +90,7 @@ public:
{
if (_pReactor)
{
_pReactor->removeEventHandler(_socket,
Poco::Observer<ParallelSocketAcceptor,
ReadableNotification>(*this, &ParallelSocketAcceptor::onAccept));
_pReactor->removeEventHandler(_socket, Observer(*this, &ParallelSocketAcceptor::onAccept));
}
}
catch (...)
@@ -105,15 +102,9 @@ public:
void setReactor(SocketReactor& reactor)
/// Sets the reactor for this acceptor.
{
_pReactor = &reactor;
if (!_pReactor->hasEventHandler(_socket,
Poco::Observer<ParallelSocketAcceptor,
ReadableNotification>(*this, &ParallelSocketAcceptor::onAccept)))
{
registerAcceptor(reactor);
}
registerAcceptor(reactor);
}
virtual void registerAcceptor(SocketReactor& reactor)
/// Registers the ParallelSocketAcceptor with a SocketReactor.
///
@@ -123,13 +114,11 @@ public:
/// The overriding method must either call the base class
/// implementation or register the accept handler on its own.
{
if (_pReactor)
throw Poco::InvalidAccessException("Acceptor already registered.");
_pReactor = &reactor;
_pReactor->addEventHandler(_socket,
Poco::Observer<ParallelSocketAcceptor,
ReadableNotification>(*this, &ParallelSocketAcceptor::onAccept));
if (!_pReactor->hasEventHandler(_socket, Observer(*this, &ParallelSocketAcceptor::onAccept)))
{
_pReactor->addEventHandler(_socket, Observer(*this, &ParallelSocketAcceptor::onAccept));
}
}
virtual void unregisterAcceptor()
@@ -143,9 +132,7 @@ public:
{
if (_pReactor)
{
_pReactor->removeEventHandler(_socket,
Poco::Observer<ParallelSocketAcceptor,
ReadableNotification>(*this, &ParallelSocketAcceptor::onAccept));
_pReactor->removeEventHandler(_socket, Observer(*this, &ParallelSocketAcceptor::onAccept));
}
}

View File

@@ -0,0 +1,104 @@
//
// SingleSocketPoller.h
//
// Library: Net
// Package: UDP
// Module: SingleSocketPoller
//
// Definition of the SingleSocketPoller class.
//
// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef Net_SingleSocketPoller_INCLUDED
#define Net_SingleSocketPoller_INCLUDED
#include "Poco/Net/Net.h"
#include "Poco/Net/DatagramSocket.h"
#include "Poco/Net/UDPHandler.h"
#include "Poco/Net/PollSet.h"
namespace Poco {
namespace Net {
template <std::size_t S = POCO_UDP_BUF_SIZE>
class SingleSocketPoller
/// SinlgeSocketPoller, as its name indicates, repeatedly polls a single
/// socket for readability; if the socket is readable, the reading action
/// is delegated to the reader.
{
public:
SingleSocketPoller(typename UDPHandlerImpl<S>::List& handlers, const Poco::Net::SocketAddress& sa, Poco::Timespan timeout = 250000): _reader(handlers), _timeout(timeout)
/// Creates the SingleSocketPoller and binds it to
/// the given address.
{
_socket.bind(sa, false, false);
_socket.setBlocking(false);
}
SingleSocketPoller(typename UDPHandlerImpl<S>::List& handlers, const UDPServerParams& serverParams): _reader(handlers, serverParams), _timeout(serverParams.timeout())
/// Creates the SingleSocketPoller and binds it to
/// the given address.
{
_socket.bind(serverParams.address(), false, false);
_socket.setBlocking(false);
}
~SingleSocketPoller()
/// Destroys SingleSocketPoller
{
}
Poco::UInt16 port() const
/// Returns the port the socket is
/// listening on.
{
return _socket.address().port();
}
Poco::Net::SocketAddress address() const
/// Returns the address of the server.
{
return _socket.address();
}
void poll()
/// Poll the socket and read if readable.
{
if (_reader.handlerStopped()) return;
if (_socket.poll(_timeout, Socket::SELECT_READ))
{
_reader.read(_socket);
}
}
void stop()
/// Stops the handler.
{
_reader.stopHandler();
}
bool done() const
/// Returns tru if handler is done.
{
return _reader.handlerDone();
}
private:
Poco::Net::DatagramSocket _socket;
UDPSocketReader<S> _reader;
Poco::Timespan _timeout;
};
} } // namespace Poco::Net
#endif // Net_SingleSocketPoller_INCLUDED

View File

@@ -35,6 +35,8 @@ class Net_API Socket
/// It provides operations common to all socket types.
{
public:
typedef SocketBufVec BufVec;
enum SelectMode
/// The mode argument to poll() and select().
{
@@ -294,6 +296,44 @@ public:
/// of the Socket object makes sense. One example is setting
/// a socket option before calling bind() on a ServerSocket.
static SocketBuf makeBuffer(void* buffer, std::size_t length);
/// Creates and returns buffer. Suitable for creating
/// the appropriate buffer for the platform.
static SocketBufVec makeBufVec(std::size_t size, std::size_t bufLen);
/// Creates and returns a vector of requested size, with
/// allocated buffers and lengths set accordingly.
/// This utility function works well when all buffers are
/// of same size.
static void destroyBufVec(SocketBufVec& buf);
/// Releases the memory pointed to by vector members
/// and shrinks the vector to size 0.
/// The vector must be created by makeBufVec(size_t, size_t).
static SocketBufVec makeBufVec(const std::vector<char*>& vec);
/// Creates and returns a vector of requested size, with
/// buffers pointing to the supplied data (so, `vec` must
/// remain available at the time of use) and lengths set
/// accordingly.
/// Notes:
/// - data length is determined using `strlen`, so this
/// function is not meant to be used with binary data.
///
/// - if the returned buffer is used for read operations
/// (ie. operations that write to the bufer), pointing
/// to string literals will result in undefined behavior,
/// in best case an I/O error and subsequent exception
static SocketBufVec makeBufVec(const std::vector<std::string>& vec);
/// Creates and returns a vector of requested size, with
/// buffers pointing to the supplied data (so, `vec` must
/// remain available at the time of use) and lengths set
/// accordingly.
/// Note:: this function is not suitable for creation
/// of buffers used for writing (ie. reading from socket
/// into buffers).
protected:
Socket(SocketImpl* pImpl);
/// Creates the Socket and attaches the given SocketImpl.

View File

@@ -68,6 +68,8 @@ class SocketAcceptor
/// if special steps are necessary to create a ServiceHandler object.
{
public:
typedef Poco::Observer<SocketAcceptor, ReadableNotification> Observer;
explicit SocketAcceptor(ServerSocket& socket):
_socket(socket),
_pReactor(0)
@@ -81,8 +83,7 @@ public:
/// Creates a SocketAcceptor, using the given ServerSocket.
/// The SocketAcceptor registers itself with the given SocketReactor.
{
_pReactor->addEventHandler(_socket, Poco::Observer<SocketAcceptor,
ReadableNotification>(*this, &SocketAcceptor::onAccept));
_pReactor->addEventHandler(_socket, Observer(*this, &SocketAcceptor::onAccept));
}
virtual ~SocketAcceptor()
@@ -92,8 +93,7 @@ public:
{
if (_pReactor)
{
_pReactor->removeEventHandler(_socket, Poco::Observer<SocketAcceptor,
ReadableNotification>(*this, &SocketAcceptor::onAccept));
_pReactor->removeEventHandler(_socket, Observer(*this, &SocketAcceptor::onAccept));
}
}
catch (...)
@@ -120,10 +120,9 @@ public:
/// the reactor.
{
_pReactor = &reactor;
if (!_pReactor->hasEventHandler(_socket, Poco::Observer<SocketAcceptor,
ReadableNotification>(*this, &SocketAcceptor::onAccept)))
if (!_pReactor->hasEventHandler(_socket, Observer(*this, &SocketAcceptor::onAccept)))
{
_pReactor->addEventHandler(_socket, Poco::Observer<SocketAcceptor, ReadableNotification>(*this, &SocketAcceptor::onAccept));
_pReactor->addEventHandler(_socket, Observer(*this, &SocketAcceptor::onAccept));
}
}
@@ -139,7 +138,7 @@ public:
{
if (_pReactor)
{
_pReactor->removeEventHandler(_socket, Poco::Observer<SocketAcceptor, ReadableNotification>(*this, &SocketAcceptor::onAccept));
_pReactor->removeEventHandler(_socket, Observer(*this, &SocketAcceptor::onAccept));
}
}

View File

@@ -18,6 +18,9 @@
#define Net_SocketDefs_INCLUDED
#include <vector>
#define POCO_ENOERR 0
@@ -25,6 +28,7 @@
#include "Poco/UnWindows.h"
#include <winsock2.h>
#include <ws2tcpip.h>
#include <ws2def.h>
#define POCO_INVALID_SOCKET INVALID_SOCKET
#define poco_socket_t SOCKET
#define poco_socklen_t int
@@ -133,6 +137,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/uio.h>
#include <fcntl.h>
#if POCO_OS != POCO_OS_HPUX
#include <sys/select.h>
@@ -351,6 +356,14 @@ namespace Poco {
namespace Net {
#if defined(POCO_OS_FAMILY_WINDOWS)
typedef WSABUF SocketBuf;
#elif defined(POCO_OS_FAMILY_UNIX) // TODO: may need more refinement
typedef iovec SocketBuf;
#endif
typedef std::vector<SocketBuf> SocketBufVec;
struct AddressFamily
/// AddressFamily::Family replaces the previously used IPAddress::Family
/// enumeration and is now used for IPAddress::Family and SocketAddress::Family.

View File

@@ -23,6 +23,7 @@
#include "Poco/Net/SocketAddress.h"
#include "Poco/RefCountedObject.h"
#include "Poco/Timespan.h"
#include "Poco/Buffer.h"
namespace Poco {
@@ -167,6 +168,13 @@ public:
/// Certain socket implementations may also return a negative
/// value denoting a certain condition.
virtual int sendBytes(const SocketBufVec& buffers, int flags = 0);
/// Receives data from the socket and stores it in buffers.
///
/// Returns the number of bytes received.
///
/// Always returns zero for platforms where not implemented.
virtual int receiveBytes(void* buffer, int length, int flags = 0);
/// Receives data from the socket and stores it
/// in buffer. Up to length bytes are received.
@@ -176,6 +184,22 @@ public:
/// Certain socket implementations may also return a negative
/// value denoting a certain condition.
virtual int receiveBytes(SocketBufVec& buffers, int flags = 0);
/// Receives data from the socket and stores it in buffers.
///
/// Returns the number of bytes received.
///
/// Always returns zero for platforms where not implemented.
virtual int receiveBytes(Poco::Buffer<char>& buffer, int flags = 0, const Poco::Timespan& timeout = 100000);
/// Receives data from the socket and stores it in the buffer.
/// If needed, the buffer will be resized to accomodate the
/// data. Note that this function may impose additional
/// performance penalties due to the check for the available
/// amount of data.
///
/// Returns the number of bytes received.
virtual int sendTo(const void* buffer, int length, const SocketAddress& address, int flags = 0);
/// Sends the contents of the given buffer through
/// the socket to the given address.
@@ -183,6 +207,23 @@ public:
/// Returns the number of bytes sent, which may be
/// less than the number of bytes specified.
virtual int sendTo(const SocketBufVec& buffers, const SocketAddress& address, int flags = 0);
/// Sends the contents of the buffers through
/// the socket to the given address.
///
/// Returns the number of bytes sent, which may be
/// less than the number of bytes specified.
///
/// Always returns zero for platforms where not implemented.
int receiveFrom(void* buffer, int length, struct sockaddr** ppSA, poco_socklen_t** ppSALen, int flags = 0);
/// Receives data from the socket and stores it
/// in buffer. Up to length bytes are received.
/// Stores the native address of the sender in
/// ppSA, and the length of native address in ppSALen.
///
/// Returns the number of bytes received.
virtual int receiveFrom(void* buffer, int length, SocketAddress& address, int flags = 0);
/// Receives data from the socket and stores it
/// in buffer. Up to length bytes are received.
@@ -190,6 +231,23 @@ public:
///
/// Returns the number of bytes received.
virtual int receiveFrom(SocketBufVec& buffers, SocketAddress& address, int flags = 0);
/// Receives data from the socket and stores it
/// in buffers.
/// Stores the address of the sender in address.
///
/// Returns the number of bytes received.
///
/// Always returns zero for platforms where not implemented.
int receiveFrom(SocketBufVec& buffers, struct sockaddr** ppSA, poco_socklen_t** ppSALen, int flags);
/// Receives data from the socket and stores it
/// in buffers.
/// Stores the native address of the sender in
/// ppSA, and the length of native address in ppSALen.
///
/// Returns the number of bytes received.
virtual void sendUrgent(unsigned char data);
/// Sends one byte of urgent data through
/// the socket.
@@ -432,6 +490,8 @@ protected:
void reset(poco_socket_t fd = POCO_INVALID_SOCKET);
/// Allows subclasses to set the socket manually, iff no valid socket is set yet.
void checkBrokenTimeout();
static int lastError();
/// Returns the last error code.
@@ -451,11 +511,11 @@ private:
SocketImpl(const SocketImpl&);
SocketImpl& operator = (const SocketImpl&);
poco_socket_t _sockfd;
poco_socket_t _sockfd;
Poco::Timespan _recvTimeout;
Poco::Timespan _sndTimeout;
bool _blocking;
bool _isBrokenTimeout;
bool _blocking;
bool _isBrokenTimeout;
friend class Socket;
friend class SecureSocketImpl;

View File

@@ -70,10 +70,13 @@ protected:
private:
typedef std::multiset<SocketNotification*> EventSet;
typedef Poco::FastMutex MutexType;
typedef MutexType::ScopedLock ScopedLock;
EventSet _events;
Poco::NotificationCenter _nc;
Socket _socket;
MutexType _mutex;
};
@@ -82,6 +85,7 @@ private:
//
inline bool SocketNotifier::accepts(SocketNotification* pNotification)
{
ScopedLock l(_mutex);
return _events.find(pNotification) != _events.end();
}

View File

@@ -26,6 +26,9 @@
#include "Poco/Observer.h"
#include "Poco/AutoPtr.h"
#include <map>
#ifdef POCO_ENABLE_CPP11
#include <atomic>
#endif
namespace Poco {
@@ -209,30 +212,35 @@ private:
typedef Poco::AutoPtr<SocketNotifier> NotifierPtr;
typedef Poco::AutoPtr<SocketNotification> NotificationPtr;
typedef std::map<Socket, NotifierPtr> EventHandlerMap;
typedef Poco::FastMutex MutexType;
typedef MutexType::ScopedLock ScopedLock;
typedef Poco::FastMutex MutexType;
typedef MutexType::ScopedLock ScopedLock;
bool hasSocketHandlers();
void dispatch(NotifierPtr& pNotifier, SocketNotification* pNotification);
NotifierPtr getNotifier(const Socket& socket, bool makeNew = false);
enum
{
DEFAULT_TIMEOUT = 250000
};
bool _stop;
Poco::Timespan _timeout;
EventHandlerMap _handlers;
PollSet _pollSet;
NotificationPtr _pReadableNotification;
NotificationPtr _pWritableNotification;
NotificationPtr _pErrorNotification;
NotificationPtr _pTimeoutNotification;
NotificationPtr _pIdleNotification;
NotificationPtr _pShutdownNotification;
MutexType _mutex;
Poco::Thread* _pThread;
#ifdef POCO_ENABLE_CPP11
std::atomic<bool> _stop;
#else
bool _stop;
#endif
Poco::Timespan _timeout;
EventHandlerMap _handlers;
PollSet _pollSet;
NotificationPtr _pReadableNotification;
NotificationPtr _pWritableNotification;
NotificationPtr _pErrorNotification;
NotificationPtr _pTimeoutNotification;
NotificationPtr _pIdleNotification;
NotificationPtr _pShutdownNotification;
MutexType _mutex;
Poco::Thread* _pThread;
friend class SocketNotifier;
};

View File

@@ -107,6 +107,13 @@ public:
/// Certain socket implementations may also return a negative
/// value denoting a certain condition.
int sendBytes(const SocketBufVec& buffer, int flags = 0);
/// Sends the contents of the given buffers through
/// the socket.
///
/// Returns the number of bytes sent, which may be
/// less than the number of bytes specified.
int sendBytes(Poco::FIFOBuffer& buffer);
/// Sends the contents of the given buffer through
/// the socket. FIFOBuffer has writable/readable transition
@@ -132,6 +139,16 @@ public:
/// been set and nothing is received within that interval.
/// Throws a NetException (or a subclass) in case of other errors.
int receiveBytes(SocketBufVec& buffer, int flags = 0);
/// Receives data from the socket and stores it in buffers.
///
/// Returns the number of bytes received.
int receiveBytes(Poco::Buffer<char>& buffer, int flags = 0, const Poco::Timespan& timeout = 100000);
/// Receives data from the socket and stores it in buffers.
///
/// Returns the number of bytes received.
int receiveBytes(Poco::FIFOBuffer& buffer);
/// Receives data from the socket and stores it
/// in buffer. Up to length bytes are received. FIFOBuffer has

View File

@@ -0,0 +1,158 @@
//
// UDPClient.h
//
// Library: Net
// Package: UDP
// Module: UDPClient
//
// Definition of the UDPClient class.
//
// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef Net_UDPClient_INCLUDED
#define Net_UDPClient_INCLUDED
#include "Poco/Net/Net.h"
#include "Poco/Net/SocketAddress.h"
#include "Poco/Net/DatagramSocket.h"
#include "Poco/Timespan.h"
#include "Poco/Runnable.h"
#include "Poco/Thread.h"
namespace Poco {
namespace Net {
class Net_API UDPClient : public Poco::Runnable
/// UDP client can either send, or send/receive UDP packets.
/// The mode of operation is specified at construction time.
/// If receiving functionality is enabled, it will run in a
/// separate thread.
/// This class is written as a "companion" to Poco::Net::UDPServer.
/// For other servers, inherit from this class and override the
/// handleResponse(char*, int) virtual member.
{
public:
UDPClient(const std::string& address, Poco::UInt16 port, bool listen = false);
/// Creates UDP client and connects it to specified address/port.
/// If listen is true, a thread is launched where client can receive
/// responses rom the server.
virtual ~UDPClient();
/// Destroys UDPClient.
void run();
/// Runs listener (typically invoked internally, in separate thread).
SocketAddress address() const;
/// Returns client address.
SocketAddress peerAddress() const;
/// Returns server address.
int send(void* data, int length);
/// Sends data.
int send(const SocketBufVec& vec);
/// Sends data.
virtual int handleResponse(char* buffer, int length);
/// Handles responses from UDP server. For non-POCO UDP servers,
/// this function should be overriden in inheriting class.
void setOption(int opt, int val);
/// Sets socket option.
int getOption(int opt);
/// Returns socket option.
void stop();
/// Stops the server reply receiving thread, if running.
int dataBacklog() const;
/// Returns current server data backlog.
int errorBacklog();
/// Returns current server error backlog.
private:
DatagramSocket _socket;
SocketAddress _address;
Thread* _pThread;
bool _stop;
Poco::AtomicCounter _dataBacklog;
Poco::AtomicCounter _errorBacklog;
};
//
// inlines
//
inline SocketAddress UDPClient::address() const
{
return _socket.address();
}
inline SocketAddress UDPClient::peerAddress() const
{
return _address;
}
inline int UDPClient::send(void* data, int length)
{
return _socket.sendBytes(data, length);
}
inline int UDPClient::send(const SocketBufVec& vec)
{
return _socket.sendBytes(vec);
}
inline void UDPClient::setOption(int opt, int val)
{
_socket.setOption(SOL_SOCKET, opt, val);
}
inline int UDPClient::getOption(int opt)
{
int val = 0;
_socket.getOption(SOL_SOCKET, opt, val);
return val;
}
inline void UDPClient::stop()
{
_stop = true;
}
inline int UDPClient::dataBacklog() const
{
return _dataBacklog;
}
inline int UDPClient::errorBacklog()
{
return _errorBacklog;
}
} } // namespace Poco::Net
#endif // Net_UDPClient_INCLUDED

View File

@@ -0,0 +1,381 @@
//
// UDPHandler.h
//
// Library: Net
// Package: UDP
// Module: UDPHandler
//
// Definition of the UDPHandler class.
//
// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef Net_UDPHandler_INCLUDED
#define Net_UDPHandler_INCLUDED
#include "Poco/Net/Net.h"
#include "Poco/RefCountedObject.h"
#include "Poco/AutoPtr.h"
#include "Poco/Runnable.h"
#include "Poco/Thread.h"
#include "Poco/MemoryPool.h"
#include "Poco/Event.h"
#include "Poco/Error.h"
#include "Poco/Mutex.h"
#include "Poco/StringTokenizer.h"
#include <deque>
#include <cstring>
namespace Poco {
namespace Net {
typedef int UDPMsgSizeT;
#define POCO_UDP_BUF_SIZE 1472 + sizeof(UDPMsgSizeT) + SocketAddress::MAX_ADDRESS_LENGTH
template <std::size_t S = POCO_UDP_BUF_SIZE>
class UDPHandlerImpl: public Runnable, public RefCountedObject
/// UDP handler handles the data that arives to the UDP server.
/// The class is thread-safe and runs in its own thread, so many handlers
/// can be used in parallel.Handler manages and provides the storage
/// (fixed-size memory blocks of S size) to the reader, which signals back
/// to the handler when there is data or error ready for processing.
/// Typically, user will inherit from this class and override processData()
/// and processError() members to do the actual work.
{
public:
typedef UDPMsgSizeT MsgSizeT;
typedef AutoPtr<UDPHandlerImpl> Ptr;
typedef std::vector<Ptr> List;
typedef typename List::iterator Iterator;
#ifdef POCO_HAVE_STD_ATOMICS
typedef Poco::SpinlockMutex DFMutex;
#else
typedef Poco::FastMutex DFMutex;
#endif
static const MsgSizeT BUF_STATUS_IDLE = 0;
static const MsgSizeT BUF_STATUS_BUSY = -1;
static const MsgSizeT BUF_STATUS_ERROR = -2;
UDPHandlerImpl(std::size_t bufListSize = 1000, std::ostream* pErr = 0):
_thread("UDPHandlerImpl"),
_stop(false),
_done(false),
_bufListSize(bufListSize),
_blockSize(S),
_dataBacklog(0),
_errorBacklog(0),
_pErr(pErr)
/// Creates the UDPHandlerImpl.
{
_thread.start(*this);
}
~UDPHandlerImpl()
/// Destroys the UDPHandlerImpl.
{
stop();
_thread.join();
}
std::size_t blockSize() const
/// Returns the memory block size.
{
return _blockSize;
}
char* next(poco_socket_t sock)
/// Creates the next BufList entry, and returns
/// the pointers to the newly created guard/buffer.
/// If mutex lock times out, returns null pointer.
{
char* ret = 0;
if (_mutex.tryLock(10))
{
if (_buffers[sock].size() < _bufListSize) // building buffer list
{
makeNext(sock, &ret);
}
else if (*reinterpret_cast<MsgSizeT*>(*_bufIt[sock]) != 0) // busy
{
makeNext(sock, &ret);
}
else if (*reinterpret_cast<MsgSizeT*>(*_bufIt[sock]) == 0) // available
{
setBusy(*_bufIt[sock]);
ret = *_bufIt[sock];
if (++_bufIt[sock] == _buffers[sock].end())
{
_bufIt[sock] = _buffers[sock].begin();
}
}
else // last resort, full scan
{
BufList::iterator it = _buffers[sock].begin();
BufList::iterator end = _buffers[sock].end();
for (; it != end; ++it)
{
if (*reinterpret_cast<MsgSizeT*>(*_bufIt[sock]) == 0) // available
{
setBusy(*it);
ret = *it;
_bufIt[sock] = it;
if (++_bufIt[sock] == _buffers[sock].end())
{
_bufIt[sock] = _buffers[sock].begin();
}
break;
}
}
if (it == end) makeNext(sock, &ret);
}
_mutex.unlock();
}
return ret;
}
void notify()
/// Sets the ready event.
{
_ready.set();
}
void run()
/// Does the work.
{
while (!_stop)
{
_ready.wait();
if (_stop) break;
if (_mutex.tryLock(10))
{
BufMap::iterator it = _buffers.begin();
BufMap::iterator end = _buffers.end();
for (; it != end; ++it)
{
BufList::iterator lIt = it->second.begin();
BufList::iterator lEnd = it->second.end();
for (; lIt != lEnd; ++lIt)
{
if (hasData(*lIt))
{
processData(*lIt);
--_dataBacklog;
setIdle(*lIt);
}
else if (isError(*lIt))
{
processError(*lIt);
++_errorBacklog;
}
}
}
_mutex.unlock();
}
}
_done = true;
}
void stop()
/// Signals the handler to stop.
{
_stop = true;
_ready.set();
}
bool stopped() const
/// Returns true if the handler was signalled to stop.
{
return _stop == true;
}
bool done() const
/// Returns true if handler is done (ie. run() thread
/// entrypoint end was reached).
{
return _done;
}
void setBusy(char*& pBuf)
/// Flags the buffer as busy (usually done before buffer
/// is passed to the reader.
{
setStatus(pBuf, BUF_STATUS_BUSY);
}
void setIdle(char*& pBuf)
/// Flags the buffer as idle, ie. not used by reader or
/// waiting to be processed, so ready to be reused for
/// reading.
{
setStatus(pBuf, BUF_STATUS_IDLE);
}
AtomicCounter::ValueType setData(char*& pBuf, MsgSizeT sz)
/// Flags the buffer as containing data.
{
setStatus(pBuf, sz);
return ++_dataBacklog;
}
AtomicCounter::ValueType setError(char*& pBuf, const std::string& err)
/// Sets the error into the buffer.
{
std::size_t availLen = S - sizeof(MsgSizeT);
std::memset(pBuf + sizeof(MsgSizeT), 0, availLen);
std::size_t msgLen = err.length();
if (msgLen)
{
if (msgLen >= availLen) msgLen = availLen;
std::memcpy(pBuf + sizeof(MsgSizeT), err.data(), msgLen);
}
setStatus(pBuf, BUF_STATUS_ERROR);
return --_errorBacklog;
}
bool hasData(char*& pBuf)
/// Returns true if buffer contains data.
{
DFMutex::ScopedLock l(_dfMutex);
return *reinterpret_cast<MsgSizeT*>(pBuf) > 0;
}
bool isError(char*& pBuf)
/// Returns true if buffer contains error.
{
DFMutex::ScopedLock l(_dfMutex);
return *reinterpret_cast<MsgSizeT*>(pBuf) == BUF_STATUS_ERROR;
}
static Poco::UInt16 offset()
/// Returns buffer data offset.
{
return sizeof(MsgSizeT) + sizeof(poco_socklen_t) + SocketAddress::MAX_ADDRESS_LENGTH;
}
static MsgSizeT payloadSize(char* buf)
{
return *((MsgSizeT*) buf);
}
static SocketAddress address(char* buf)
{
poco_socklen_t* len = reinterpret_cast<poco_socklen_t*>(buf + sizeof(MsgSizeT));
struct sockaddr* pSA = reinterpret_cast<struct sockaddr*>(buf + sizeof(MsgSizeT) + sizeof(poco_socklen_t));
return SocketAddress(pSA, *len);
}
static char* payload(char* buf)
/// Returns pointer to payload.
///
/// Total message size is S.
///
/// Data message layout is as follows:
///
/// +------------------------+------------------------+-----------------------------------+--------- ~ ---+
/// | sizeof(MsgSizeT) bytes | sizeof(poco_socklen_t) | SocketAddress::MAX_ADDRESS_LENGTH | payload |
/// +------------------------+------------------------+-----------------------------------+--------- ~ ---+
{
return buf + offset();
}
static Poco::StringTokenizer payload(char* buf, char delimiter)
/// Returns tokenized payload.
/// Used when multiple logical messages are contained in a
/// single physical message. Messages must be ASCII, as well as
/// unambiguously delimited in order for this function to succeed.
{
return Poco::StringTokenizer(payload(buf), std::string(1, delimiter), StringTokenizer::TOK_IGNORE_EMPTY);
}
static char* error(char* buf)
/// Returns pointer to the erro message payload.
///
/// Total message size is S.
///
/// Error message layout is as follows:
///
/// +------------------------+--------- ~ ---+
/// | sizeof(MsgSizeT) bytes | payload |
/// +------------------------+--------- ~ ---+
{
return buf + sizeof(MsgSizeT);
}
virtual void processData(char*)
/// Caled when data is received by reader.
///
/// No-op here, must be overriden by inheriting
/// class in order to do useful work.
{
};
virtual void processError(char* buf)
/// Caled when error is detected by reader.
///
/// Only functional if stream pointer is provided
/// to the handler, otherwise it must be overriden
/// by inheriting class in order to do useful work.
{
if (_pErr) *_pErr << error(buf) << std::endl;
setIdle(buf);
}
private:
typedef std::deque<char*> BufList;
typedef std::map<poco_socket_t, BufList> BufMap;
typedef typename BufList::iterator BLIt;
typedef std::map<poco_socket_t, BLIt> BufIt;
typedef Poco::FastMemoryPool<char[S]> MemPool;
void setStatusImpl(char*& pBuf, MsgSizeT status)
{
*reinterpret_cast<MsgSizeT*>(pBuf) = status;
}
void setStatus(char*& pBuf, MsgSizeT status)
{
DFMutex::ScopedLock l(_dfMutex);
setStatusImpl(pBuf, status);
}
void makeNext(poco_socket_t sock, char** ret)
{
_buffers[sock].push_back(reinterpret_cast<char*>(_memPool.get()));
setStatusImpl(_buffers[sock].back(), BUF_STATUS_BUSY);
_bufIt[sock] = _buffers[sock].begin();
*ret = _buffers[sock].back();
}
Poco::Event _ready;
Poco::Thread _thread;
bool _stop;
bool _done;
BufMap _buffers;
BufIt _bufIt;
std::size_t _bufListSize;
const std::size_t _blockSize;
MemPool _memPool;
AtomicCounter _dataBacklog;
AtomicCounter _errorBacklog;
Poco::FastMutex _mutex;
DFMutex _dfMutex;
std::ostream* _pErr;
};
typedef UDPHandlerImpl<POCO_UDP_BUF_SIZE> UDPHandler;
} } // namespace Poco::Net
#endif // Net_UDPHandler_INCLUDED

View File

@@ -0,0 +1,106 @@
//
// UDPServer.h
//
// Library: Net
// Package: UDP
// Module: UDPServer
//
// Definition of the UDPServer class.
//
// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef Net_UDPServer_INCLUDED
#define Net_UDPServer_INCLUDED
#include "Poco/Net/Net.h"
#include "Poco/Net/DatagramSocket.h"
#include "Poco/Net/PollSet.h"
#include "Poco/Net/UDPHandler.h"
#include "Poco/Net/UDPServerParams.h"
#include "Poco/Net/UDPSocketReader.h"
#include "Poco/Net/SingleSocketPoller.h"
#include "Poco/Net/MultiSocketPoller.h"
#include <map>
namespace Poco {
namespace Net {
template <std::size_t S = POCO_UDP_BUF_SIZE,
typename P = SingleSocketPoller<S> >
class UDPServerImpl: public Poco::Runnable
/// UDP server, runs in its own thread and owns a poller, to which
/// data arrival and discovery is delegated. See SingleSocketPoller and
/// MultipleSocketPoller for more information.
{
public:
UDPServerImpl(typename UDPHandlerImpl<S>::List& handlers, const Poco::Net::SocketAddress& sa):
_poller(handlers, sa),
_thread("UDPServer"),
_stop(false)
/// Creates the UDPServer and binds it to
/// the given address.
{
_thread.start(*this);
}
UDPServerImpl(typename UDPHandlerImpl<S>::List& handlers, const UDPServerParams& params):
_poller(handlers, params),
_thread("UDPServer"),
_stop(false)
/// Creates the UDPServer and binds it to
/// the given address.
{
_thread.start(*this);
}
~UDPServerImpl()
/// Destroys the UDPServer.
{
_stop = true;
_poller.stop();
while (!_poller.done()) Thread::sleep(10);
_thread.join();
}
Poco::UInt16 port() const
/// Returns the port the server is
/// listening on.
{
return _poller.port();
}
Poco::Net::SocketAddress address() const
/// Returns the address of the server.
{
return _poller.address();
}
void run()
/// Does the work.
{
while (!_stop) _poller.poll();
}
private:
P _poller;
Poco::Thread _thread;
bool _stop;
};
typedef UDPServerImpl<POCO_UDP_BUF_SIZE, SingleSocketPoller<POCO_UDP_BUF_SIZE> > UDPServer;
typedef UDPServerImpl<POCO_UDP_BUF_SIZE, MultiSocketPoller<POCO_UDP_BUF_SIZE> > UDPMultiServer;
} } // namespace Poco::Net
#endif // Net_UDPServer_INCLUDED

View File

@@ -0,0 +1,122 @@
//
// UDPServerParams.h
//
// Library: Net
// Package: UDP
// Module: UDPServerParams
//
// Definition of the UDPServerParams class.
//
// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef Net_UDPServerParams_INCLUDED
#define Net_UDPServerParams_INCLUDED
#include "Poco/Net/Net.h"
#include "Poco/Net/SocketAddress.h"
#include "Poco/Timespan.h"
namespace Poco {
namespace Net {
class Net_API UDPServerParams
/// A class encapsulating UDP server parameters.
{
public:
UDPServerParams(const Poco::Net::SocketAddress& sa,
int nSockets = 10,
Poco::Timespan timeout = 250000,
std::size_t handlerBufListSize = 1000,
bool notifySender = false,
int backlogThreshold = 10);
/// Creates UDPServerParams.
~UDPServerParams();
/// Destroys UDPServerParams.
const Poco::Net::SocketAddress& address() const;
/// Returns the server address.
Poco::Timespan timeout() const;
/// Returns polling timeout.
int numberOfSockets() const;
/// Returns nuber of sockets for the server.
std::size_t handlerBufListSize() const;
/// Returns the number of handler buffers allocated
/// before buffers start being reused.
bool notifySender() const;
/// Returns the flag inidcating whether server
/// should send notifications back to the client.
int backlogThreshold() const;
/// Size of mesage or error backlogs at which server
/// reports backlogs back to the client. Only meaningful
/// if notifySender() is true.
private:
UDPServerParams();
Poco::Net::SocketAddress _sa;
int _nSockets;
Poco::Timespan _timeout;
std::size_t _handlerBufListSize;
bool _notifySender;
int _backlogThreshold;
};
//
// inlines
//
inline const Poco::Net::SocketAddress& UDPServerParams::address() const
{
return _sa;
}
inline int UDPServerParams::numberOfSockets() const
{
return _nSockets;
}
inline Poco::Timespan UDPServerParams::timeout() const
{
return _timeout;
}
inline std::size_t UDPServerParams::handlerBufListSize() const
{
return _handlerBufListSize;
}
inline bool UDPServerParams::notifySender() const
{
return _notifySender;
}
inline int UDPServerParams::backlogThreshold() const
{
return _backlogThreshold;
}
} } // namespace Poco::Net
#endif // Net_UDPServerParams_INCLUDED

View File

@@ -0,0 +1,222 @@
//
// UDPSocketReader.h
//
// Library: Net
// Package: UDP
// Module: UDPSocketReader
//
// Definition of the UDPSocketReader class.
//
// Copyright (c) 2005-2006, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef Net_UDPSocketReader_INCLUDED
#define Net_UDPSocketReader_INCLUDED
#include "Poco/Net/Net.h"
#include "Poco/Net/DatagramSocket.h"
namespace Poco {
namespace Net {
template <std::size_t S = POCO_UDP_BUF_SIZE>
class UDPSocketReader
/// UDPSocketReader is responsible for UDP socket I/O operations.
/// When data or error is detected on a socket, the reader is invoked
/// to do the I/O. After the data is read, it is passed on, to one
/// of the handlers for processing. Errors are also passed to a
/// handler for handling (if any configured).
/// Depending on settings, data senders may be notified of the handler's
/// data and error backlogs.
{
private:
class Counter
{
public:
Counter(): val(0)
{
}
operator AtomicCounter::ValueType ()
{
return static_cast<AtomicCounter::ValueType>(val);
}
Counter& operator=(AtomicCounter::ValueType value)
{
val = static_cast<Poco::Int32>(value);
return *this;
}
Poco::Int32 operator++()
{
return ++val;
}
Counter& operator++(int)
{
return val++;
}
private:
Poco::Int32 val;
};
public:
UDPSocketReader(typename UDPHandlerImpl<S>::List& handlers, int backlogThreshold = 0):
_handlers(handlers),
_handler(_handlers.begin()),
_backlogThreshold(backlogThreshold)
/// Creates the UDPSocketReader.
{
poco_assert(_handler != _handlers.end());
}
UDPSocketReader(typename UDPHandlerImpl<S>::List& handlers, const UDPServerParams& serverParams):
_handlers(handlers),
_handler(_handlers.begin()),
_backlogThreshold(serverParams.backlogThreshold())
/// Creates the UDPSocketReader.
{
poco_assert(_handler != _handlers.end());
}
~UDPSocketReader()
/// Destroys the UDPSocketReader.
{
}
void read(DatagramSocket& sock)
/// Reads data from the socket and passes it to the next handler.
/// Errors are also passed to the handler. If object is configured
/// for replying to sender and data or error backlog threshold is
/// exceeded, sender is notified of the current backlog size.
{
typedef typename UDPHandlerImpl<S>::MsgSizeT RT;
char* p = 0;
struct sockaddr* pSA = 0;
poco_socklen_t* pAL = 0;
poco_socket_t sockfd = sock.impl()->sockfd();
nextHandler();
try
{
p = handler().next(sockfd);
if (p)
{
Poco::UInt16 off = handler().offset();
poco_socklen_t* pAL = reinterpret_cast<poco_socklen_t*>(p + sizeof(RT));
*pAL = SocketAddress::MAX_ADDRESS_LENGTH;
struct sockaddr* pSA = reinterpret_cast<struct sockaddr*>(p + sizeof(RT) + sizeof(poco_socklen_t));
RT ret = sock.receiveFrom(p + off, S - off - 1, &pSA, &pAL);
if (ret < 0)
{
AtomicCounter::ValueType errors = setError(sock.impl()->sockfd(), p, Error::getMessage(Error::last()));
if (_backlogThreshold > 0 && errors > _backlogThreshold && errors != _errorBacklog[sockfd])
{
Poco::Int32 err = static_cast<Poco::Int32>(errors);
sock.sendTo(&err, sizeof(Poco::Int32), SocketAddress(pSA, *pAL));
_errorBacklog[sockfd] = errors;
}
return;
}
AtomicCounter::ValueType data = handler().setData(p, ret);
p[off + ret] = 0; // for ascii convenience, zero-terminate
if (_backlogThreshold > 0 && data > _backlogThreshold && data != _dataBacklog[sockfd])
{
Poco::Int32 d = static_cast<Poco::Int32>(data);
sock.sendTo(&d, sizeof(Poco::Int32), SocketAddress(pSA, *pAL));
_dataBacklog[sockfd] = data;
}
}
else return;
}
catch (Poco::Exception& exc)
{
AtomicCounter::ValueType errors = setError(sock.impl()->sockfd(), p, exc.displayText());
if (_backlogThreshold > 0 && errors > _backlogThreshold && errors != _errorBacklog[sockfd] && pSA && pAL)
{
Poco::Int32 err = static_cast<Poco::Int32>(errors);
sock.sendTo(&err, sizeof(Poco::Int32), SocketAddress(pSA, *pAL));
_errorBacklog[sockfd] = errors;
}
}
handler().notify();
}
bool handlerStopped() const
/// Returns true if all handlers are stopped.
{
bool stopped = true;
typename UDPHandlerImpl<S>::List::iterator it = _handlers.begin();
typename UDPHandlerImpl<S>::List::iterator end = _handlers.end();
for (; it != end; ++it) stopped = stopped && (*it)->stopped();
return stopped;
}
void stopHandler()
/// Stops all handlers.
{
typename UDPHandlerImpl<S>::List::iterator it = _handlers.begin();
typename UDPHandlerImpl<S>::List::iterator end = _handlers.end();
for (; it != end; ++it) (*it)->stop();
}
bool handlerDone() const
/// Returns true if all handlers are done processing data.
{
bool done = true;
typename UDPHandlerImpl<S>::List::iterator it = _handlers.begin();
typename UDPHandlerImpl<S>::List::iterator end = _handlers.end();
for (; it != end; ++it) done = done && (*it)->done();
return done;
}
AtomicCounter::ValueType setError(poco_socket_t sock, char* buf = 0, const std::string& err = "")
/// Sets error to the provided buffer buf. If the buffer is null, a new buffer is obtained
/// from handler.
/// If successful, returns the handler's eror backlog size, otherwise returns zero.
{
if (!buf) buf = handler().next(sock);
if (buf) return handler().setError(buf, err.empty() ? Error::getMessage(Error::last()) : err);
return 0;
}
private:
void nextHandler()
/// Re-points the handler iterator to the next handler in
/// round-robin fashion.
{
poco_assert_dbg (_handler != _handlers.end());
if (++_handler == _handlers.end()) _handler = _handlers.begin();
}
UDPHandlerImpl<S>& handler()
/// Returns the reference to the current handler.
{
poco_assert_dbg (_handler != _handlers.end());
return **_handler;
}
typedef typename UDPHandlerImpl<S>::List HandlerList;
typedef typename UDPHandlerImpl<S>::List::iterator HandlerIterator;
typedef std::map<poco_socket_t, Counter> CounterMap;
HandlerList& _handlers;
HandlerIterator _handler;
CounterMap _dataBacklog;
CounterMap _errorBacklog;
int _backlogThreshold;
};
} } // namespace Poco::Net
#endif // Net_UDPSocketReader_INCLUDED

View File

@@ -21,7 +21,6 @@
#include "Poco/Net/StreamSocketImpl.h"
#include "Poco/Buffer.h"
#include "Poco/Random.h"
#include "Poco/Buffer.h"
namespace Poco {
@@ -46,7 +45,7 @@ public:
virtual int receiveBytes(void* buffer, int length, int flags);
/// Receives a WebSocket protocol frame.
virtual int receiveBytes(Poco::Buffer<char>& buffer, int flags);
virtual int receiveBytes(Poco::Buffer<char>& buffer, int flags = 0, const Poco::Timespan& span = 0);
/// Receives a WebSocket protocol frame.
virtual SocketImpl* acceptConnection(SocketAddress& clientAddr);