support for FIFOBuffer in StreamSocket::read/writeBytes()

This commit is contained in:
Aleksandar Fabijanic 2012-05-19 05:16:42 +00:00
parent 9b952a29c7
commit 484510dec5
6 changed files with 246 additions and 12 deletions

View File

@ -65,8 +65,8 @@ class BasicFIFOBuffer
public: public:
typedef T Type; typedef T Type;
mutable Poco::BasicEvent<bool> Writable; mutable Poco::BasicEvent<bool> writable;
/// Event indicating "writeability" of the buffer, /// Event indicating "writability" of the buffer,
/// triggerred as follows: /// triggerred as follows:
/// ///
/// * when buffer transitions from non-full to full, /// * when buffer transitions from non-full to full,
@ -77,7 +77,7 @@ public:
/// Writable event observers are notified, with /// Writable event observers are notified, with
/// true value as the argument /// true value as the argument
mutable Poco::BasicEvent<bool> Readable; mutable Poco::BasicEvent<bool> readable;
/// Event indicating "readability" of the buffer, /// Event indicating "readability" of the buffer,
/// triggerred as follows: /// triggerred as follows:
/// ///
@ -235,6 +235,47 @@ public:
if (_notify) notify(usedBefore); if (_notify) notify(usedBefore);
} }
void copy(const T* ptr, std::size_t length)
/// Copies the supplied data to the buffer and adjusts
/// the used buffer size.
{
poco_check_ptr(ptr);
if (0 == length) return;
if (length > available())
throw Poco::InvalidAccessException("Cannot extend buffer.");
std::memcpy(&_buffer[_used], ptr, length);
advance(length);
}
void advance(std::size_t length)
/// Advances buffer by length elements.
/// Should be called AFTER the data
/// was copied into the buffer.
{
if (length > available())
throw Poco::InvalidAccessException("Cannot extend buffer.");
std::size_t usedBefore = _used;
_used += length;
if (_notify) notify(usedBefore);
}
T* begin()
/// Returns the pointer to the beginning of the buffer.
{
return _buffer.begin();
}
T* next()
/// Returns the pointer to the next available position in the buffer.
{
if (available() == 0)
throw InvalidAccessException("Buffer is full.");
return _buffer.begin() + _used;
}
T& operator [] (std::size_t index) T& operator [] (std::size_t index)
/// Returns value at index position. /// Returns value at index position.
/// Throws InvalidAccessException if index is larger than /// Throws InvalidAccessException if index is larger than
@ -274,14 +315,14 @@ private:
{ {
bool t = true, f = false; bool t = true, f = false;
if (usedBefore == 0 && _used > 0) if (usedBefore == 0 && _used > 0)
Readable.notify(this, t); readable.notify(this, t);
else if (usedBefore > 0 && 0 == _used) else if (usedBefore > 0 && 0 == _used)
Readable.notify(this, f); readable.notify(this, f);
if (usedBefore == _buffer.size() && _used < _buffer.size()) if (usedBefore == _buffer.size() && _used < _buffer.size())
Writable.notify(this, t); writable.notify(this, t);
else if (usedBefore < _buffer.size() && _used == _buffer.size()) else if (usedBefore < _buffer.size() && _used == _buffer.size())
Writable.notify(this, f); writable.notify(this, f);
} }
BasicFIFOBuffer(); BasicFIFOBuffer();

View File

@ -270,8 +270,8 @@ void CoreTest::testFIFOBufferChar()
Buffer<T> b(10); Buffer<T> b(10);
std::vector<T> v; std::vector<T> v;
f.Readable += delegate(this, &CoreTest::onReadable); f.readable += delegate(this, &CoreTest::onReadable);
f.Writable += delegate(this, &CoreTest::onWritable); f.writable += delegate(this, &CoreTest::onWritable);
for (T c = '0'; c < '0' + 10; ++c) for (T c = '0'; c < '0' + 10; ++c)
v.push_back(c); v.push_back(c);
@ -501,15 +501,68 @@ void CoreTest::testFIFOBufferChar()
assert (8 == f.available()); assert (8 == f.available());
assert (!f.isEmpty()); assert (!f.isEmpty());
assert(6 == _notToReadable);
assert(5 == _readableToNot);
assert(1 == _notToWritable);
assert(1 == _writableToNot);
f.drain(); f.drain();
assert(6 == _notToReadable);
assert(6 == _readableToNot);
assert(1 == _notToWritable);
assert(1 == _writableToNot);
assert (3 == f.write(b, 10)); assert (3 == f.write(b, 10));
assert (10 == f.size()); assert (10 == f.size());
assert (3 == f.used()); assert (3 == f.used());
assert (7 == f.available()); assert (7 == f.available());
assert (!f.isEmpty()); assert (!f.isEmpty());
f.Readable -= delegate(this, &CoreTest::onReadable); assert(7 == _notToReadable);
f.Writable -= delegate(this, &CoreTest::onReadable); assert(6 == _readableToNot);
assert(1 == _notToWritable);
assert(1 == _writableToNot);
const char c[3] = {'4', '5', '6' };
try
{
f.copy(&c[0], 8);
} catch (InvalidAccessException&) { }
f.copy(&c[0], 3);
assert(7 == _notToReadable);
assert(6 == _readableToNot);
assert(1 == _notToWritable);
assert(1 == _writableToNot);
assert (10 == f.size());
assert (6 == f.used());
assert (4 == f.available());
const char d[4] = {'7', '8', '9', '0' };
f.copy(&c[0], 4);
assert(7 == _notToReadable);
assert(6 == _readableToNot);
assert(1 == _notToWritable);
assert(2 == _writableToNot);
assert (10 == f.size());
assert (10 == f.used());
assert (0 == f.available());
try
{
f.copy(&c[0], 1);
} catch (InvalidAccessException&) { }
f.drain(1);
assert(7 == _notToReadable);
assert(6 == _readableToNot);
assert(2 == _notToWritable);
assert(2 == _writableToNot);
f.readable -= delegate(this, &CoreTest::onReadable);
f.writable -= delegate(this, &CoreTest::onReadable);
} }

View File

@ -42,6 +42,7 @@
#include "Poco/Net/Net.h" #include "Poco/Net/Net.h"
#include "Poco/Net/Socket.h" #include "Poco/Net/Socket.h"
#include "Poco/FIFOBuffer.h"
namespace Poco { namespace Poco {
@ -128,6 +129,19 @@ public:
/// Certain socket implementations may also return a negative /// Certain socket implementations may also return a negative
/// value denoting a certain condition. /// value denoting a certain condition.
int sendBytes(Poco::FIFOBuffer& buffer);
/// Sends the contents of the given buffer through
/// the socket. FIFOBuffer has writable/readable transiton
/// notifications which may be enabled to notify the caller when
/// the buffer transitions between empty, partially full and
/// full states.
///
/// Returns the number of bytes sent, which may be
/// less than the number of bytes specified.
///
/// Certain socket implementations may also return a negative
/// value denoting a certain condition.
int receiveBytes(void* buffer, int length, int flags = 0); int receiveBytes(void* buffer, int length, int flags = 0);
/// Receives data from the socket and stores it /// Receives data from the socket and stores it
/// in buffer. Up to length bytes are received. /// in buffer. Up to length bytes are received.
@ -140,6 +154,21 @@ public:
/// been set and nothing is received within that interval. /// been set and nothing is received within that interval.
/// Throws a NetException (or a subclass) in case of other errors. /// Throws a NetException (or a subclass) in case of other errors.
int receiveBytes(Poco::FIFOBuffer& buffer);
/// Receives data from the socket and stores it
/// in buffer. Up to length bytes are received. FIFOBuffer has
/// writable/readable transiton notifications which may be enabled
/// to notify the caller when the buffer transitions between empty,
/// partially full and full states.
///
/// Returns the number of bytes received.
/// A return value of 0 means a graceful shutdown
/// of the connection from the peer.
///
/// Throws a TimeoutException if a receive timeout has
/// been set and nothing is received within that interval.
/// Throws a NetException (or a subclass) in case of other errors.
void sendUrgent(unsigned char data); void sendUrgent(unsigned char data);
/// Sends one byte of urgent data through /// Sends one byte of urgent data through
/// the socket. /// the socket.
@ -149,7 +178,6 @@ public:
/// The preferred way for a socket to receive urgent data /// The preferred way for a socket to receive urgent data
/// is by enabling the SO_OOBINLINE option. /// is by enabling the SO_OOBINLINE option.
public:
StreamSocket(SocketImpl* pImpl); StreamSocket(SocketImpl* pImpl);
/// Creates the Socket and attaches the given SocketImpl. /// Creates the Socket and attaches the given SocketImpl.
/// The socket takes owership of the SocketImpl. /// The socket takes owership of the SocketImpl.
@ -157,6 +185,12 @@ public:
/// The SocketImpl must be a StreamSocketImpl, otherwise /// The SocketImpl must be a StreamSocketImpl, otherwise
/// an InvalidArgumentException will be thrown. /// an InvalidArgumentException will be thrown.
private:
enum
{
BUFFER_SIZE = 1024
};
friend class ServerSocket; friend class ServerSocket;
friend class SocketIOS; friend class SocketIOS;
}; };

View File

@ -36,6 +36,7 @@
#include "Poco/Net/StreamSocket.h" #include "Poco/Net/StreamSocket.h"
#include "Poco/Net/StreamSocketImpl.h" #include "Poco/Net/StreamSocketImpl.h"
#include "Poco/FIFOBuffer.h"
#include "Poco/Exception.h" #include "Poco/Exception.h"
@ -133,12 +134,28 @@ int StreamSocket::sendBytes(const void* buffer, int length, int flags)
} }
int StreamSocket::sendBytes(FIFOBuffer& fifoBuf)
{
int ret = impl()->sendBytes(&fifoBuf.buffer()[0], fifoBuf.used());
if (ret > 0) fifoBuf.drain(ret);
return ret;
}
int StreamSocket::receiveBytes(void* buffer, int length, int flags) int StreamSocket::receiveBytes(void* buffer, int length, int flags)
{ {
return impl()->receiveBytes(buffer, length, flags); return impl()->receiveBytes(buffer, length, flags);
} }
int StreamSocket::receiveBytes(FIFOBuffer& fifoBuf)
{
int ret = impl()->receiveBytes(fifoBuf.next(), fifoBuf.available());
if (ret > 0) fifoBuf.advance(ret);
return ret;
}
void StreamSocket::sendUrgent(unsigned char data) void StreamSocket::sendUrgent(unsigned char data)
{ {
impl()->sendUrgent(data); impl()->sendUrgent(data);

View File

@ -40,6 +40,9 @@
#include "Poco/Net/NetException.h" #include "Poco/Net/NetException.h"
#include "Poco/Timespan.h" #include "Poco/Timespan.h"
#include "Poco/Stopwatch.h" #include "Poco/Stopwatch.h"
#include "Poco/Buffer.h"
#include "Poco/FIFOBuffer.h"
#include "Poco/Delegate.h"
#include <iostream> #include <iostream>
@ -52,6 +55,9 @@ using Poco::Timespan;
using Poco::Stopwatch; using Poco::Stopwatch;
using Poco::TimeoutException; using Poco::TimeoutException;
using Poco::InvalidArgumentException; using Poco::InvalidArgumentException;
using Poco::Buffer;
using Poco::FIFOBuffer;
using Poco::delegate;
SocketTest::SocketTest(const std::string& name): CppUnit::TestCase(name) SocketTest::SocketTest(const std::string& name): CppUnit::TestCase(name)
@ -122,6 +128,62 @@ void SocketTest::testAvailable()
} }
void SocketTest::testFIFOBuffer()
{
Buffer<char> b(5);
b[0] = 'h';
b[1] = 'e';
b[2] = 'l';
b[3] = 'l';
b[4] = 'o';
FIFOBuffer f(5, true);
f.readable += delegate(this, &SocketTest::onReadable);
f.writable += delegate(this, &SocketTest::onWritable);
assert(0 == _notToReadable);
assert(0 == _readableToNot);
assert(0 == _notToWritable);
assert(0 == _writableToNot);
f.write(b);
assert(1 == _notToReadable);
assert(0 == _readableToNot);
assert(0 == _notToWritable);
assert(1 == _writableToNot);
EchoServer echoServer;
StreamSocket ss;
ss.connect(SocketAddress("localhost", echoServer.port()));
int n = ss.sendBytes(f);
assert (n == 5);
assert(1 == _notToReadable);
assert(1 == _readableToNot);
assert(1 == _notToWritable);
assert(1 == _writableToNot);
assert (f.isEmpty());
n = ss.receiveBytes(f);
assert (n == 5);
assert(2 == _notToReadable);
assert(1 == _readableToNot);
assert(1 == _notToWritable);
assert(2 == _writableToNot);
assert (f[0] == 'h');
assert (f[1] == 'e');
assert (f[2] == 'l');
assert (f[3] == 'l');
assert (f[4] == 'o');
f.readable -= delegate(this, &SocketTest::onReadable);
f.writable -= delegate(this, &SocketTest::onWritable);
ss.close();
}
void SocketTest::testConnect() void SocketTest::testConnect()
{ {
ServerSocket serv; ServerSocket serv;
@ -459,8 +521,26 @@ void SocketTest::testSelect3()
} }
void SocketTest::onReadable(bool& b)
{
if (b) ++_notToReadable;
else ++_readableToNot;
};
void SocketTest::onWritable(bool& b)
{
if (b) ++_notToWritable;
else ++_writableToNot;
}
void SocketTest::setUp() void SocketTest::setUp()
{ {
_readableToNot = 0;
_notToReadable = 0;
_writableToNot = 0;
_notToWritable = 0;
} }
@ -476,6 +556,7 @@ CppUnit::Test* SocketTest::suite()
CppUnit_addTest(pSuite, SocketTest, testEcho); CppUnit_addTest(pSuite, SocketTest, testEcho);
CppUnit_addTest(pSuite, SocketTest, testPoll); CppUnit_addTest(pSuite, SocketTest, testPoll);
CppUnit_addTest(pSuite, SocketTest, testAvailable); CppUnit_addTest(pSuite, SocketTest, testAvailable);
CppUnit_addTest(pSuite, SocketTest, testFIFOBuffer);
CppUnit_addTest(pSuite, SocketTest, testConnect); CppUnit_addTest(pSuite, SocketTest, testConnect);
CppUnit_addTest(pSuite, SocketTest, testConnectRefused); CppUnit_addTest(pSuite, SocketTest, testConnectRefused);
CppUnit_addTest(pSuite, SocketTest, testConnectRefusedNB); CppUnit_addTest(pSuite, SocketTest, testConnectRefusedNB);

View File

@ -49,6 +49,7 @@ public:
void testEcho(); void testEcho();
void testPoll(); void testPoll();
void testAvailable(); void testAvailable();
void testFIFOBuffer();
void testConnect(); void testConnect();
void testConnectRefused(); void testConnectRefused();
void testConnectRefusedNB(); void testConnectRefusedNB();
@ -68,6 +69,13 @@ public:
static CppUnit::Test* suite(); static CppUnit::Test* suite();
private: private:
void onReadable(bool& b);
void onWritable(bool& b);
int _readableToNot;
int _notToReadable;
int _writableToNot;
int _notToWritable;
}; };