From 484510dec5fb2af386fb31fec48f635033532a74 Mon Sep 17 00:00:00 2001 From: Aleksandar Fabijanic Date: Sat, 19 May 2012 05:16:42 +0000 Subject: [PATCH] support for FIFOBuffer in StreamSocket::read/writeBytes() --- Foundation/include/Poco/FIFOBuffer.h | 55 +++++++++++++++--- Foundation/testsuite/src/CoreTest.cpp | 61 ++++++++++++++++++-- Net/include/Poco/Net/StreamSocket.h | 36 +++++++++++- Net/src/StreamSocket.cpp | 17 ++++++ Net/testsuite/src/SocketTest.cpp | 81 +++++++++++++++++++++++++++ Net/testsuite/src/SocketTest.h | 8 +++ 6 files changed, 246 insertions(+), 12 deletions(-) diff --git a/Foundation/include/Poco/FIFOBuffer.h b/Foundation/include/Poco/FIFOBuffer.h index 70c7c534f..40c24d7fa 100644 --- a/Foundation/include/Poco/FIFOBuffer.h +++ b/Foundation/include/Poco/FIFOBuffer.h @@ -65,8 +65,8 @@ class BasicFIFOBuffer public: typedef T Type; - mutable Poco::BasicEvent Writable; - /// Event indicating "writeability" of the buffer, + mutable Poco::BasicEvent writable; + /// Event indicating "writability" of the buffer, /// triggerred as follows: /// /// * when buffer transitions from non-full to full, @@ -77,7 +77,7 @@ public: /// Writable event observers are notified, with /// true value as the argument - mutable Poco::BasicEvent Readable; + mutable Poco::BasicEvent readable; /// Event indicating "readability" of the buffer, /// triggerred as follows: /// @@ -235,6 +235,47 @@ public: if (_notify) notify(usedBefore); } + void copy(const T* ptr, std::size_t length) + /// Copies the supplied data to the buffer and adjusts + /// the used buffer size. + { + poco_check_ptr(ptr); + if (0 == length) return; + if (length > available()) + throw Poco::InvalidAccessException("Cannot extend buffer."); + + std::memcpy(&_buffer[_used], ptr, length); + advance(length); + } + + void advance(std::size_t length) + /// Advances buffer by length elements. + /// Should be called AFTER the data + /// was copied into the buffer. + { + if (length > available()) + throw Poco::InvalidAccessException("Cannot extend buffer."); + + std::size_t usedBefore = _used; + _used += length; + if (_notify) notify(usedBefore); + } + + T* begin() + /// Returns the pointer to the beginning of the buffer. + { + return _buffer.begin(); + } + + T* next() + /// Returns the pointer to the next available position in the buffer. + { + if (available() == 0) + throw InvalidAccessException("Buffer is full."); + + return _buffer.begin() + _used; + } + T& operator [] (std::size_t index) /// Returns value at index position. /// Throws InvalidAccessException if index is larger than @@ -274,14 +315,14 @@ private: { bool t = true, f = false; if (usedBefore == 0 && _used > 0) - Readable.notify(this, t); + readable.notify(this, t); else if (usedBefore > 0 && 0 == _used) - Readable.notify(this, f); + readable.notify(this, f); if (usedBefore == _buffer.size() && _used < _buffer.size()) - Writable.notify(this, t); + writable.notify(this, t); else if (usedBefore < _buffer.size() && _used == _buffer.size()) - Writable.notify(this, f); + writable.notify(this, f); } BasicFIFOBuffer(); diff --git a/Foundation/testsuite/src/CoreTest.cpp b/Foundation/testsuite/src/CoreTest.cpp index f9008722a..5472b2015 100644 --- a/Foundation/testsuite/src/CoreTest.cpp +++ b/Foundation/testsuite/src/CoreTest.cpp @@ -270,8 +270,8 @@ void CoreTest::testFIFOBufferChar() Buffer b(10); std::vector v; - f.Readable += delegate(this, &CoreTest::onReadable); - f.Writable += delegate(this, &CoreTest::onWritable); + f.readable += delegate(this, &CoreTest::onReadable); + f.writable += delegate(this, &CoreTest::onWritable); for (T c = '0'; c < '0' + 10; ++c) v.push_back(c); @@ -501,15 +501,68 @@ void CoreTest::testFIFOBufferChar() assert (8 == f.available()); assert (!f.isEmpty()); + assert(6 == _notToReadable); + assert(5 == _readableToNot); + assert(1 == _notToWritable); + assert(1 == _writableToNot); + f.drain(); + assert(6 == _notToReadable); + assert(6 == _readableToNot); + assert(1 == _notToWritable); + assert(1 == _writableToNot); + assert (3 == f.write(b, 10)); assert (10 == f.size()); assert (3 == f.used()); assert (7 == f.available()); assert (!f.isEmpty()); - f.Readable -= delegate(this, &CoreTest::onReadable); - f.Writable -= delegate(this, &CoreTest::onReadable); + assert(7 == _notToReadable); + assert(6 == _readableToNot); + assert(1 == _notToWritable); + assert(1 == _writableToNot); + + const char c[3] = {'4', '5', '6' }; + try + { + f.copy(&c[0], 8); + } catch (InvalidAccessException&) { } + + f.copy(&c[0], 3); + assert(7 == _notToReadable); + assert(6 == _readableToNot); + assert(1 == _notToWritable); + assert(1 == _writableToNot); + + assert (10 == f.size()); + assert (6 == f.used()); + assert (4 == f.available()); + + const char d[4] = {'7', '8', '9', '0' }; + f.copy(&c[0], 4); + assert(7 == _notToReadable); + assert(6 == _readableToNot); + assert(1 == _notToWritable); + assert(2 == _writableToNot); + + assert (10 == f.size()); + assert (10 == f.used()); + assert (0 == f.available()); + + try + { + f.copy(&c[0], 1); + } catch (InvalidAccessException&) { } + + f.drain(1); + assert(7 == _notToReadable); + assert(6 == _readableToNot); + assert(2 == _notToWritable); + assert(2 == _writableToNot); + + f.readable -= delegate(this, &CoreTest::onReadable); + f.writable -= delegate(this, &CoreTest::onReadable); } diff --git a/Net/include/Poco/Net/StreamSocket.h b/Net/include/Poco/Net/StreamSocket.h index 197eed148..1057132b4 100644 --- a/Net/include/Poco/Net/StreamSocket.h +++ b/Net/include/Poco/Net/StreamSocket.h @@ -42,6 +42,7 @@ #include "Poco/Net/Net.h" #include "Poco/Net/Socket.h" +#include "Poco/FIFOBuffer.h" namespace Poco { @@ -128,6 +129,19 @@ public: /// Certain socket implementations may also return a negative /// value denoting a certain condition. + int sendBytes(Poco::FIFOBuffer& buffer); + /// Sends the contents of the given buffer through + /// the socket. FIFOBuffer has writable/readable transiton + /// notifications which may be enabled to notify the caller when + /// the buffer transitions between empty, partially full and + /// full states. + /// + /// Returns the number of bytes sent, which may be + /// less than the number of bytes specified. + /// + /// Certain socket implementations may also return a negative + /// value denoting a certain condition. + int receiveBytes(void* buffer, int length, int flags = 0); /// Receives data from the socket and stores it /// in buffer. Up to length bytes are received. @@ -140,6 +154,21 @@ public: /// been set and nothing is received within that interval. /// Throws a NetException (or a subclass) in case of other errors. + int receiveBytes(Poco::FIFOBuffer& buffer); + /// Receives data from the socket and stores it + /// in buffer. Up to length bytes are received. FIFOBuffer has + /// writable/readable transiton notifications which may be enabled + /// to notify the caller when the buffer transitions between empty, + /// partially full and full states. + /// + /// Returns the number of bytes received. + /// A return value of 0 means a graceful shutdown + /// of the connection from the peer. + /// + /// Throws a TimeoutException if a receive timeout has + /// been set and nothing is received within that interval. + /// Throws a NetException (or a subclass) in case of other errors. + void sendUrgent(unsigned char data); /// Sends one byte of urgent data through /// the socket. @@ -149,7 +178,6 @@ public: /// The preferred way for a socket to receive urgent data /// is by enabling the SO_OOBINLINE option. -public: StreamSocket(SocketImpl* pImpl); /// Creates the Socket and attaches the given SocketImpl. /// The socket takes owership of the SocketImpl. @@ -157,6 +185,12 @@ public: /// The SocketImpl must be a StreamSocketImpl, otherwise /// an InvalidArgumentException will be thrown. +private: + enum + { + BUFFER_SIZE = 1024 + }; + friend class ServerSocket; friend class SocketIOS; }; diff --git a/Net/src/StreamSocket.cpp b/Net/src/StreamSocket.cpp index ac48acc0e..5f96cee4b 100644 --- a/Net/src/StreamSocket.cpp +++ b/Net/src/StreamSocket.cpp @@ -36,6 +36,7 @@ #include "Poco/Net/StreamSocket.h" #include "Poco/Net/StreamSocketImpl.h" +#include "Poco/FIFOBuffer.h" #include "Poco/Exception.h" @@ -133,12 +134,28 @@ int StreamSocket::sendBytes(const void* buffer, int length, int flags) } +int StreamSocket::sendBytes(FIFOBuffer& fifoBuf) +{ + int ret = impl()->sendBytes(&fifoBuf.buffer()[0], fifoBuf.used()); + if (ret > 0) fifoBuf.drain(ret); + return ret; +} + + int StreamSocket::receiveBytes(void* buffer, int length, int flags) { return impl()->receiveBytes(buffer, length, flags); } +int StreamSocket::receiveBytes(FIFOBuffer& fifoBuf) +{ + int ret = impl()->receiveBytes(fifoBuf.next(), fifoBuf.available()); + if (ret > 0) fifoBuf.advance(ret); + return ret; +} + + void StreamSocket::sendUrgent(unsigned char data) { impl()->sendUrgent(data); diff --git a/Net/testsuite/src/SocketTest.cpp b/Net/testsuite/src/SocketTest.cpp index 4d4fd2f3a..b2cc3d04c 100644 --- a/Net/testsuite/src/SocketTest.cpp +++ b/Net/testsuite/src/SocketTest.cpp @@ -40,6 +40,9 @@ #include "Poco/Net/NetException.h" #include "Poco/Timespan.h" #include "Poco/Stopwatch.h" +#include "Poco/Buffer.h" +#include "Poco/FIFOBuffer.h" +#include "Poco/Delegate.h" #include @@ -52,6 +55,9 @@ using Poco::Timespan; using Poco::Stopwatch; using Poco::TimeoutException; using Poco::InvalidArgumentException; +using Poco::Buffer; +using Poco::FIFOBuffer; +using Poco::delegate; SocketTest::SocketTest(const std::string& name): CppUnit::TestCase(name) @@ -122,6 +128,62 @@ void SocketTest::testAvailable() } +void SocketTest::testFIFOBuffer() +{ + Buffer b(5); + b[0] = 'h'; + b[1] = 'e'; + b[2] = 'l'; + b[3] = 'l'; + b[4] = 'o'; + + FIFOBuffer f(5, true); + + f.readable += delegate(this, &SocketTest::onReadable); + f.writable += delegate(this, &SocketTest::onWritable); + + assert(0 == _notToReadable); + assert(0 == _readableToNot); + assert(0 == _notToWritable); + assert(0 == _writableToNot); + f.write(b); + assert(1 == _notToReadable); + assert(0 == _readableToNot); + assert(0 == _notToWritable); + assert(1 == _writableToNot); + + EchoServer echoServer; + StreamSocket ss; + ss.connect(SocketAddress("localhost", echoServer.port())); + int n = ss.sendBytes(f); + assert (n == 5); + assert(1 == _notToReadable); + assert(1 == _readableToNot); + assert(1 == _notToWritable); + assert(1 == _writableToNot); + assert (f.isEmpty()); + + n = ss.receiveBytes(f); + assert (n == 5); + + assert(2 == _notToReadable); + assert(1 == _readableToNot); + assert(1 == _notToWritable); + assert(2 == _writableToNot); + + assert (f[0] == 'h'); + assert (f[1] == 'e'); + assert (f[2] == 'l'); + assert (f[3] == 'l'); + assert (f[4] == 'o'); + + f.readable -= delegate(this, &SocketTest::onReadable); + f.writable -= delegate(this, &SocketTest::onWritable); + + ss.close(); +} + + void SocketTest::testConnect() { ServerSocket serv; @@ -459,8 +521,26 @@ void SocketTest::testSelect3() } +void SocketTest::onReadable(bool& b) +{ + if (b) ++_notToReadable; + else ++_readableToNot; +}; + + +void SocketTest::onWritable(bool& b) +{ + if (b) ++_notToWritable; + else ++_writableToNot; +} + + void SocketTest::setUp() { + _readableToNot = 0; + _notToReadable = 0; + _writableToNot = 0; + _notToWritable = 0; } @@ -476,6 +556,7 @@ CppUnit::Test* SocketTest::suite() CppUnit_addTest(pSuite, SocketTest, testEcho); CppUnit_addTest(pSuite, SocketTest, testPoll); CppUnit_addTest(pSuite, SocketTest, testAvailable); + CppUnit_addTest(pSuite, SocketTest, testFIFOBuffer); CppUnit_addTest(pSuite, SocketTest, testConnect); CppUnit_addTest(pSuite, SocketTest, testConnectRefused); CppUnit_addTest(pSuite, SocketTest, testConnectRefusedNB); diff --git a/Net/testsuite/src/SocketTest.h b/Net/testsuite/src/SocketTest.h index c715bfdb5..3057dbf51 100644 --- a/Net/testsuite/src/SocketTest.h +++ b/Net/testsuite/src/SocketTest.h @@ -49,6 +49,7 @@ public: void testEcho(); void testPoll(); void testAvailable(); + void testFIFOBuffer(); void testConnect(); void testConnectRefused(); void testConnectRefusedNB(); @@ -68,6 +69,13 @@ public: static CppUnit::Test* suite(); private: + void onReadable(bool& b); + void onWritable(bool& b); + + int _readableToNot; + int _notToReadable; + int _writableToNot; + int _notToWritable; };