mirror of
https://github.com/pocoproject/poco.git
synced 2025-05-03 15:58:23 +02:00
Add WebSocket::receiveFrame() that appends to a Poco::Buffer<char>
This commit is contained in:
parent
4336528290
commit
08a748dae1
@ -23,6 +23,7 @@
|
|||||||
#include "Poco/Net/Net.h"
|
#include "Poco/Net/Net.h"
|
||||||
#include "Poco/Net/StreamSocket.h"
|
#include "Poco/Net/StreamSocket.h"
|
||||||
#include "Poco/Net/HTTPCredentials.h"
|
#include "Poco/Net/HTTPCredentials.h"
|
||||||
|
#include "Poco/Buffer.h"
|
||||||
|
|
||||||
|
|
||||||
namespace Poco {
|
namespace Poco {
|
||||||
@ -221,6 +222,21 @@ public:
|
|||||||
/// The frame flags and opcode (FrameFlags and FrameOpcodes)
|
/// The frame flags and opcode (FrameFlags and FrameOpcodes)
|
||||||
/// is stored in flags.
|
/// is stored in flags.
|
||||||
|
|
||||||
|
int receiveFrame(Poco::Buffer<char>& buffer, int& flags);
|
||||||
|
/// Receives a frame from the socket and stores it
|
||||||
|
/// after any previous content in buffer.
|
||||||
|
///
|
||||||
|
/// Returns the number of bytes received.
|
||||||
|
/// A return value of 0 means that the peer has
|
||||||
|
/// shut down or closed the connection.
|
||||||
|
///
|
||||||
|
/// Throws a TimeoutException if a receive timeout has
|
||||||
|
/// been set and nothing is received within that interval.
|
||||||
|
/// Throws a NetException (or a subclass) in case of other errors.
|
||||||
|
///
|
||||||
|
/// The frame flags and opcode (FrameFlags and FrameOpcodes)
|
||||||
|
/// is stored in flags.
|
||||||
|
|
||||||
Mode mode() const;
|
Mode mode() const;
|
||||||
/// Returns WS_SERVER if the WebSocket is a server-side
|
/// Returns WS_SERVER if the WebSocket is a server-side
|
||||||
/// WebSocket, or WS_CLIENT otherwise.
|
/// WebSocket, or WS_CLIENT otherwise.
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
|
|
||||||
|
|
||||||
#include "Poco/Net/StreamSocketImpl.h"
|
#include "Poco/Net/StreamSocketImpl.h"
|
||||||
|
#include "Poco/Buffer.h"
|
||||||
#include "Poco/Random.h"
|
#include "Poco/Random.h"
|
||||||
|
|
||||||
|
|
||||||
@ -43,6 +44,9 @@ public:
|
|||||||
virtual int receiveBytes(void* buffer, int length, int flags);
|
virtual int receiveBytes(void* buffer, int length, int flags);
|
||||||
/// Receives a WebSocket protocol frame.
|
/// Receives a WebSocket protocol frame.
|
||||||
|
|
||||||
|
virtual int receiveBytes(Poco::Buffer<char>& buffer, int flags);
|
||||||
|
/// Receives a WebSocket protocol frame.
|
||||||
|
|
||||||
virtual SocketImpl* acceptConnection(SocketAddress& clientAddr);
|
virtual SocketImpl* acceptConnection(SocketAddress& clientAddr);
|
||||||
virtual void connect(const SocketAddress& address);
|
virtual void connect(const SocketAddress& address);
|
||||||
virtual void connect(const SocketAddress& address, const Poco::Timespan& timeout);
|
virtual void connect(const SocketAddress& address, const Poco::Timespan& timeout);
|
||||||
@ -77,6 +81,9 @@ protected:
|
|||||||
MAX_HEADER_LENGTH = 14
|
MAX_HEADER_LENGTH = 14
|
||||||
};
|
};
|
||||||
|
|
||||||
|
int receiveHeader(char mask[4], bool& useMask);
|
||||||
|
int receivePayload(char *buffer, int payloadLength, char mask[4], bool useMask);
|
||||||
|
|
||||||
int receiveNBytes(void* buffer, int bytes);
|
int receiveNBytes(void* buffer, int bytes);
|
||||||
virtual ~WebSocketImpl();
|
virtual ~WebSocketImpl();
|
||||||
|
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
#include "Poco/String.h"
|
#include "Poco/String.h"
|
||||||
#include "Poco/Random.h"
|
#include "Poco/Random.h"
|
||||||
#include "Poco/StreamCopier.h"
|
#include "Poco/StreamCopier.h"
|
||||||
|
#include "Poco/Buffer.h"
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
|
|
||||||
|
|
||||||
@ -114,6 +115,14 @@ int WebSocket::receiveFrame(void* buffer, int length, int& flags)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int WebSocket::receiveFrame(Poco::Buffer<char>& buffer, int& flags)
|
||||||
|
{
|
||||||
|
int n = static_cast<WebSocketImpl*>(impl())->receiveBytes(buffer, 0);
|
||||||
|
flags = static_cast<WebSocketImpl*>(impl())->frameFlags();
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
WebSocket::Mode WebSocket::mode() const
|
WebSocket::Mode WebSocket::mode() const
|
||||||
{
|
{
|
||||||
return static_cast<WebSocketImpl*>(impl())->mustMaskPayload() ? WS_CLIENT : WS_SERVER;
|
return static_cast<WebSocketImpl*>(impl())->mustMaskPayload() ? WS_CLIENT : WS_SERVER;
|
||||||
|
@ -104,7 +104,7 @@ int WebSocketImpl::sendBytes(const void* buffer, int length, int flags)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int WebSocketImpl::receiveBytes(void* buffer, int length, int)
|
int WebSocketImpl::receiveHeader(char mask[4], bool& useMask)
|
||||||
{
|
{
|
||||||
char header[MAX_HEADER_LENGTH];
|
char header[MAX_HEADER_LENGTH];
|
||||||
int n = receiveNBytes(header, 2);
|
int n = receiveNBytes(header, 2);
|
||||||
@ -114,82 +114,100 @@ int WebSocketImpl::receiveBytes(void* buffer, int length, int)
|
|||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
poco_assert (n == 2);
|
poco_assert (n == 2);
|
||||||
Poco::UInt8 lengthByte = static_cast<Poco::UInt8>(header[1]);
|
Poco::UInt8 flags = static_cast<Poco::UInt8>(header[0]);
|
||||||
int maskOffset = 0;
|
|
||||||
if (lengthByte & FRAME_FLAG_MASK) maskOffset += 4;
|
|
||||||
lengthByte &= 0x7f;
|
|
||||||
if (lengthByte > 0 || maskOffset > 0)
|
|
||||||
{
|
|
||||||
if (lengthByte + 2 + maskOffset < MAX_HEADER_LENGTH)
|
|
||||||
{
|
|
||||||
n = receiveNBytes(header + 2, lengthByte + maskOffset);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
n = receiveNBytes(header + 2, MAX_HEADER_LENGTH - 2);
|
|
||||||
}
|
|
||||||
if (n <= 0) throw WebSocketException("Incomplete header received", WebSocket::WS_ERR_INCOMPLETE_FRAME);
|
|
||||||
n += 2;
|
|
||||||
}
|
|
||||||
Poco::MemoryInputStream istr(header, n);
|
|
||||||
Poco::BinaryReader reader(istr, Poco::BinaryReader::NETWORK_BYTE_ORDER);
|
|
||||||
Poco::UInt8 flags;
|
|
||||||
char mask[4];
|
|
||||||
reader >> flags >> lengthByte;
|
|
||||||
_frameFlags = flags;
|
_frameFlags = flags;
|
||||||
int payloadLength = 0;
|
Poco::UInt8 lengthByte = static_cast<Poco::UInt8>(header[1]);
|
||||||
int payloadOffset = 2;
|
useMask = ((lengthByte & FRAME_FLAG_MASK) != 0);
|
||||||
if ((lengthByte & 0x7f) == 127)
|
int payloadLength;
|
||||||
|
lengthByte &= 0x7f;
|
||||||
|
if (lengthByte == 127)
|
||||||
{
|
{
|
||||||
|
n = receiveNBytes(header + 2, 8);
|
||||||
|
if (n <= 0)
|
||||||
|
{
|
||||||
|
_frameFlags = 0;
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
Poco::MemoryInputStream istr(header + 2, 8);
|
||||||
|
Poco::BinaryReader reader(istr, Poco::BinaryReader::NETWORK_BYTE_ORDER);
|
||||||
Poco::UInt64 l;
|
Poco::UInt64 l;
|
||||||
reader >> l;
|
reader >> l;
|
||||||
if (l > length) throw WebSocketException(Poco::format("Insufficient buffer for payload size %Lu", l), WebSocket::WS_ERR_PAYLOAD_TOO_BIG);
|
|
||||||
payloadLength = static_cast<int>(l);
|
payloadLength = static_cast<int>(l);
|
||||||
payloadOffset += 8;
|
} else if (lengthByte == 126)
|
||||||
}
|
|
||||||
else if ((lengthByte & 0x7f) == 126)
|
|
||||||
{
|
{
|
||||||
|
n = receiveNBytes(header + 2, 2);
|
||||||
|
if (n <= 0)
|
||||||
|
{
|
||||||
|
_frameFlags = 0;
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
Poco::MemoryInputStream istr(header + 2, 2);
|
||||||
|
Poco::BinaryReader reader(istr, Poco::BinaryReader::NETWORK_BYTE_ORDER);
|
||||||
Poco::UInt16 l;
|
Poco::UInt16 l;
|
||||||
reader >> l;
|
reader >> l;
|
||||||
if (l > length) throw WebSocketException(Poco::format("Insufficient buffer for payload size %hu", l), WebSocket::WS_ERR_PAYLOAD_TOO_BIG);
|
|
||||||
payloadLength = static_cast<int>(l);
|
payloadLength = static_cast<int>(l);
|
||||||
payloadOffset += 2;
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
Poco::UInt8 l = lengthByte & 0x7f;
|
payloadLength = lengthByte;
|
||||||
if (l > length) throw WebSocketException(Poco::format("Insufficient buffer for payload size %u", unsigned(l)), WebSocket::WS_ERR_PAYLOAD_TOO_BIG);
|
|
||||||
payloadLength = static_cast<int>(l);
|
|
||||||
}
|
}
|
||||||
if (lengthByte & FRAME_FLAG_MASK)
|
|
||||||
|
if (useMask)
|
||||||
{
|
{
|
||||||
reader.readRaw(mask, 4);
|
n = receiveNBytes(mask, 4);
|
||||||
payloadOffset += 4;
|
if (n <= 0)
|
||||||
|
{
|
||||||
|
_frameFlags = 0;
|
||||||
|
return n;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
int received = 0;
|
|
||||||
if (payloadOffset < n)
|
return payloadLength;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int WebSocketImpl::receivePayload(char *buffer, int payloadLength, char mask[4], bool useMask)
|
||||||
|
{
|
||||||
|
int received = receiveNBytes(reinterpret_cast<char*>(buffer), payloadLength);
|
||||||
|
if (received <= 0) throw WebSocketException("Incomplete frame received", WebSocket::WS_ERR_INCOMPLETE_FRAME);
|
||||||
|
|
||||||
|
if (useMask)
|
||||||
{
|
{
|
||||||
std::memcpy(buffer, header + payloadOffset, n - payloadOffset);
|
|
||||||
received = n - payloadOffset;
|
|
||||||
}
|
|
||||||
if (received < payloadLength)
|
|
||||||
{
|
|
||||||
n = receiveNBytes(reinterpret_cast<char*>(buffer) + received, payloadLength - received);
|
|
||||||
if (n <= 0) throw WebSocketException("Incomplete frame received", WebSocket::WS_ERR_INCOMPLETE_FRAME);
|
|
||||||
received += n;
|
|
||||||
}
|
|
||||||
if (lengthByte & FRAME_FLAG_MASK)
|
|
||||||
{
|
|
||||||
char* p = reinterpret_cast<char*>(buffer);
|
|
||||||
for (int i = 0; i < received; i++)
|
for (int i = 0; i < received; i++)
|
||||||
{
|
{
|
||||||
p[i] ^= mask[i % 4];
|
buffer[i] ^= mask[i % 4];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return received;
|
return received;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int WebSocketImpl::receiveBytes(void* buffer, int length, int)
|
||||||
|
{
|
||||||
|
char mask[4];
|
||||||
|
bool useMask;
|
||||||
|
int payloadLength = receiveHeader(mask, useMask);
|
||||||
|
if (payloadLength <= 0)
|
||||||
|
return payloadLength;
|
||||||
|
if (payloadLength > length)
|
||||||
|
throw WebSocketException(Poco::format("Insufficient buffer for payload size %hu", payloadLength), WebSocket::WS_ERR_PAYLOAD_TOO_BIG);
|
||||||
|
return receivePayload(reinterpret_cast<char*>(buffer), payloadLength, mask, useMask);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int WebSocketImpl::receiveBytes(Poco::Buffer<char>& buffer, int)
|
||||||
|
{
|
||||||
|
char mask[4];
|
||||||
|
bool useMask;
|
||||||
|
int payloadLength = receiveHeader(mask, useMask);
|
||||||
|
if (payloadLength <= 0)
|
||||||
|
return payloadLength;
|
||||||
|
int oldSize = buffer.size();
|
||||||
|
buffer.resize(oldSize + payloadLength);
|
||||||
|
return receivePayload(buffer.begin() + oldSize, payloadLength, mask, useMask);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int WebSocketImpl::receiveNBytes(void* buffer, int bytes)
|
int WebSocketImpl::receiveNBytes(void* buffer, int bytes)
|
||||||
{
|
{
|
||||||
int received = _pStreamSocketImpl->receiveBytes(reinterpret_cast<char*>(buffer), bytes);
|
int received = _pStreamSocketImpl->receiveBytes(reinterpret_cast<char*>(buffer), bytes);
|
||||||
|
@ -141,6 +141,13 @@ void WebSocketTest::testWebSocket()
|
|||||||
assert (n == payload.size());
|
assert (n == payload.size());
|
||||||
assert (payload.compare(0, payload.size(), buffer, 0, n) == 0);
|
assert (payload.compare(0, payload.size(), buffer, 0, n) == 0);
|
||||||
assert (flags == WebSocket::FRAME_TEXT);
|
assert (flags == WebSocket::FRAME_TEXT);
|
||||||
|
|
||||||
|
ws.sendFrame(payload.data(), (int) payload.size());
|
||||||
|
Poco::Buffer<char> pocobuffer(0);
|
||||||
|
n = ws.receiveFrame(pocobuffer, flags);
|
||||||
|
assert (n == payload.size());
|
||||||
|
assert (payload.compare(0, payload.size(), pocobuffer.begin(), 0, n) == 0);
|
||||||
|
assert (flags == WebSocket::FRAME_TEXT);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 125; i < 129; i++)
|
for (int i = 125; i < 129; i++)
|
||||||
@ -151,6 +158,13 @@ void WebSocketTest::testWebSocket()
|
|||||||
assert (n == payload.size());
|
assert (n == payload.size());
|
||||||
assert (payload.compare(0, payload.size(), buffer, 0, n) == 0);
|
assert (payload.compare(0, payload.size(), buffer, 0, n) == 0);
|
||||||
assert (flags == WebSocket::FRAME_TEXT);
|
assert (flags == WebSocket::FRAME_TEXT);
|
||||||
|
|
||||||
|
ws.sendFrame(payload.data(), (int) payload.size());
|
||||||
|
Poco::Buffer<char> pocobuffer(0);
|
||||||
|
n = ws.receiveFrame(pocobuffer, flags);
|
||||||
|
assert (n == payload.size());
|
||||||
|
assert (payload.compare(0, payload.size(), pocobuffer.begin(), 0, n) == 0);
|
||||||
|
assert (flags == WebSocket::FRAME_TEXT);
|
||||||
}
|
}
|
||||||
|
|
||||||
payload = "Hello, world!";
|
payload = "Hello, world!";
|
||||||
@ -210,6 +224,49 @@ void WebSocketTest::testWebSocketLarge()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void WebSocketTest::testOneLargeFrame(int msgSize)
|
||||||
|
{
|
||||||
|
Poco::Net::ServerSocket ss(0);
|
||||||
|
Poco::Net::HTTPServer server(new WebSocketRequestHandlerFactory(msgSize), ss, new Poco::Net::HTTPServerParams);
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
Poco::Thread::sleep(200);
|
||||||
|
|
||||||
|
HTTPClientSession cs("localhost", ss.address().port());
|
||||||
|
HTTPRequest request(HTTPRequest::HTTP_GET, "/ws");
|
||||||
|
HTTPResponse response;
|
||||||
|
WebSocket ws(cs, request, response);
|
||||||
|
ws.setSendBufferSize(msgSize);
|
||||||
|
ws.setReceiveBufferSize(msgSize);
|
||||||
|
std::string payload(msgSize, 'x');
|
||||||
|
|
||||||
|
ws.sendFrame(payload.data(), msgSize);
|
||||||
|
|
||||||
|
char buffer[msgSize];
|
||||||
|
int flags;
|
||||||
|
int n;
|
||||||
|
|
||||||
|
n = ws.receiveFrame(buffer, sizeof(buffer), flags);
|
||||||
|
assert (n == payload.size());
|
||||||
|
assert (payload.compare(0, payload.size(), buffer, 0, n) == 0);
|
||||||
|
|
||||||
|
ws.sendFrame(payload.data(), msgSize);
|
||||||
|
|
||||||
|
Poco::Buffer<char> pocobuffer(0);
|
||||||
|
|
||||||
|
n = ws.receiveFrame(pocobuffer, flags);
|
||||||
|
assert (n == payload.size());
|
||||||
|
assert (payload.compare(0, payload.size(), pocobuffer.begin(), 0, n) == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void WebSocketTest::testWebSocketLargeInOneFrame()
|
||||||
|
{
|
||||||
|
testOneLargeFrame(64000);
|
||||||
|
testOneLargeFrame(70000);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void WebSocketTest::setUp()
|
void WebSocketTest::setUp()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -226,6 +283,7 @@ CppUnit::Test* WebSocketTest::suite()
|
|||||||
|
|
||||||
CppUnit_addTest(pSuite, WebSocketTest, testWebSocket);
|
CppUnit_addTest(pSuite, WebSocketTest, testWebSocket);
|
||||||
CppUnit_addTest(pSuite, WebSocketTest, testWebSocketLarge);
|
CppUnit_addTest(pSuite, WebSocketTest, testWebSocketLarge);
|
||||||
|
CppUnit_addTest(pSuite, WebSocketTest, testWebSocketLargeInOneFrame);
|
||||||
|
|
||||||
return pSuite;
|
return pSuite;
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@ public:
|
|||||||
|
|
||||||
void testWebSocket();
|
void testWebSocket();
|
||||||
void testWebSocketLarge();
|
void testWebSocketLarge();
|
||||||
|
void testWebSocketLargeInOneFrame();
|
||||||
|
|
||||||
void setUp();
|
void setUp();
|
||||||
void tearDown();
|
void tearDown();
|
||||||
@ -35,6 +36,7 @@ public:
|
|||||||
static CppUnit::Test* suite();
|
static CppUnit::Test* suite();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
void testOneLargeFrame(int msgSize);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user