mirror of
https://github.com/pocoproject/poco.git
synced 2025-01-19 00:46:03 +01:00
FIFOBuffer EOF and error support
FIFOBuffer now supports EOF and error conditions. If EOF flag is set, buffer will not accept writing but will allow reading of the remainder of data. After it is emptied, buffer remains in EOF state until flag is cleared. Setting error flag, immediately empties the buffer and prevents any I/O operation until flag is cleared. Flag setting will trigger transiton notifications (if notifications are enabled). For details, see the code diffs.
This commit is contained in:
parent
7094df540f
commit
b153850188
@ -56,8 +56,11 @@ class BasicFIFOBuffer
|
||||
/// A simple buffer class with support for re-entrant,
|
||||
/// FIFO-style read/write operations, as well as (optional)
|
||||
/// empty/non-empty/full (i.e. writable/readable) transition
|
||||
/// notifications. Buffer size, as well as amount of unread data
|
||||
/// and available space introspections are supported as well.
|
||||
/// notifications. Buffer can be flagged with end-of-file and
|
||||
/// error flags, which renders it un-readable/writable.
|
||||
///
|
||||
/// Buffer size, as well as amount of unread data and
|
||||
/// available space introspections are supported as well.
|
||||
///
|
||||
/// This class is useful anywhere where a FIFO functionality
|
||||
/// is needed.
|
||||
@ -93,7 +96,9 @@ public:
|
||||
_buffer(size),
|
||||
_begin(0),
|
||||
_used(0),
|
||||
_notify(notify)
|
||||
_notify(notify),
|
||||
_eof(false),
|
||||
_error(false)
|
||||
/// Creates the FIFOBuffer.
|
||||
{
|
||||
}
|
||||
@ -102,7 +107,9 @@ public:
|
||||
_buffer(pBuffer, size),
|
||||
_begin(0),
|
||||
_used(0),
|
||||
_notify(notify)
|
||||
_notify(notify),
|
||||
_eof(false),
|
||||
_error(false)
|
||||
/// Creates the FIFOBuffer.
|
||||
{
|
||||
}
|
||||
@ -111,7 +118,9 @@ public:
|
||||
_buffer(pBuffer, size),
|
||||
_begin(0),
|
||||
_used(size),
|
||||
_notify(notify)
|
||||
_notify(notify),
|
||||
_eof(false),
|
||||
_error(false)
|
||||
/// Creates the FIFOBuffer.
|
||||
{
|
||||
}
|
||||
@ -154,6 +163,7 @@ public:
|
||||
{
|
||||
if (0 == length) return 0;
|
||||
Mutex::ScopedLock lock(_mutex);
|
||||
if (!isReadable()) return 0;
|
||||
if (length > _used) length = _used;
|
||||
std::memcpy(pBuffer, _buffer.begin() + _begin, length * sizeof(T));
|
||||
return length;
|
||||
@ -172,6 +182,7 @@ public:
|
||||
/// supplied buffer.
|
||||
{
|
||||
Mutex::ScopedLock lock(_mutex);
|
||||
if (!isReadable()) return 0;
|
||||
if (0 == length || length > _used) length = _used;
|
||||
buffer.resize(length);
|
||||
return peek(buffer.begin(), length);
|
||||
@ -185,10 +196,9 @@ public:
|
||||
///
|
||||
/// Returns the reference to the buffer.
|
||||
{
|
||||
if (0 == length) return 0;
|
||||
Mutex::ScopedLock lock(_mutex);
|
||||
|
||||
if (0 == _used) return 0;
|
||||
|
||||
if (!isReadable()) return 0;
|
||||
std::size_t usedBefore = _used;
|
||||
std::size_t readLen = peek(pBuffer, length);
|
||||
poco_assert (_used >= readLen);
|
||||
@ -210,9 +220,7 @@ public:
|
||||
/// Returns the reference to the buffer.
|
||||
{
|
||||
Mutex::ScopedLock lock(_mutex);
|
||||
|
||||
if (0 == _used) return 0;
|
||||
|
||||
if (!isReadable()) return 0;
|
||||
std::size_t usedBefore = _used;
|
||||
std::size_t readLen = peek(buffer, length);
|
||||
poco_assert (_used >= readLen);
|
||||
@ -236,10 +244,12 @@ public:
|
||||
///
|
||||
/// Returns the length of data written.
|
||||
{
|
||||
if (0 == length || isFull()) return 0;
|
||||
if (0 == length) return 0;
|
||||
|
||||
Mutex::ScopedLock lock(_mutex);
|
||||
|
||||
|
||||
if (!isWritable()) return 0;
|
||||
|
||||
if (_buffer.size() - (_begin + _used) < length)
|
||||
{
|
||||
std::memmove(_buffer.begin(), _buffer.begin() + _begin, _used);
|
||||
@ -268,11 +278,14 @@ public:
|
||||
///
|
||||
/// Returns the length of data written.
|
||||
{
|
||||
if (isFull()) return 0;
|
||||
std::size_t len = length;
|
||||
|
||||
if (0 == length || length > buffer.size()) length = buffer.size();
|
||||
if (len == 0)
|
||||
len = buffer.size();
|
||||
else if (len > buffer.size())
|
||||
len = buffer.size();
|
||||
|
||||
return write(buffer.begin(), length);
|
||||
return write(buffer.begin(), len);
|
||||
}
|
||||
|
||||
std::size_t size() const
|
||||
@ -298,6 +311,8 @@ public:
|
||||
/// If length is zero or greater than buffer current
|
||||
/// content length, buffer is emptied.
|
||||
{
|
||||
Mutex::ScopedLock lock(_mutex);
|
||||
|
||||
std::size_t usedBefore = _used;
|
||||
|
||||
if (0 == length || length >= _used)
|
||||
@ -317,11 +332,19 @@ public:
|
||||
{
|
||||
poco_check_ptr(ptr);
|
||||
if (0 == length) return;
|
||||
|
||||
Mutex::ScopedLock lock(_mutex);
|
||||
|
||||
if (length > available())
|
||||
throw Poco::InvalidAccessException("Cannot extend buffer.");
|
||||
|
||||
if (!isWritable())
|
||||
throw Poco::InvalidAccessException("Buffer not writable.");
|
||||
|
||||
std::memcpy(&_buffer[_used], ptr, length);
|
||||
advance(length);
|
||||
std::size_t usedBefore = _used;
|
||||
_used += length;
|
||||
if (_notify) notify(usedBefore);
|
||||
}
|
||||
|
||||
void advance(std::size_t length)
|
||||
@ -329,8 +352,13 @@ public:
|
||||
/// Should be called AFTER the data
|
||||
/// was copied into the buffer.
|
||||
{
|
||||
Mutex::ScopedLock lock(_mutex);
|
||||
|
||||
if (length > available())
|
||||
throw Poco::InvalidAccessException("Cannot extend buffer.");
|
||||
|
||||
if (!isWritable())
|
||||
throw Poco::InvalidAccessException("Buffer not writable.");
|
||||
|
||||
std::size_t usedBefore = _used;
|
||||
_used += length;
|
||||
@ -357,6 +385,7 @@ public:
|
||||
/// Throws InvalidAccessException if index is larger than
|
||||
/// the last valid (used) buffer position.
|
||||
{
|
||||
Mutex::ScopedLock lock(_mutex);
|
||||
if (index >= _used)
|
||||
throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)", index, _used - 1));
|
||||
|
||||
@ -368,6 +397,7 @@ public:
|
||||
/// Throws InvalidAccessException if index is larger than
|
||||
/// the last valid (used) buffer position.
|
||||
{
|
||||
Mutex::ScopedLock lock(_mutex);
|
||||
if (index >= _used)
|
||||
throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)", index, _used - 1));
|
||||
|
||||
@ -379,9 +409,75 @@ public:
|
||||
{
|
||||
return _buffer;
|
||||
}
|
||||
|
||||
void setError(bool error = true)
|
||||
/// Sets the error flag on the buffer and empties it.
|
||||
/// If notifications are enabled, they will be triggered
|
||||
/// if appropriate.
|
||||
///
|
||||
/// Setting error flag to true prevents reading and writing
|
||||
/// to the buffer; to re-enable FIFOBuffer for reading/writing,
|
||||
/// the error flag must be set to false.
|
||||
{
|
||||
if (error)
|
||||
{
|
||||
bool f = false;
|
||||
Mutex::ScopedLock lock(_mutex);
|
||||
if (error && isReadable() && _notify) readable.notify(this, f);
|
||||
if (error && isWritable() && _notify) writable.notify(this, f);
|
||||
_error = error;
|
||||
_used = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
bool t = true;
|
||||
Mutex::ScopedLock lock(_mutex);
|
||||
_error = false;
|
||||
if (_notify && !_eof) writable.notify(this, t);
|
||||
}
|
||||
}
|
||||
|
||||
bool isValid() const
|
||||
/// Returns true if error flag is not set on the buffer,
|
||||
/// otherwise returns false.
|
||||
{
|
||||
return !_error;
|
||||
}
|
||||
|
||||
void setEOF(bool eof = true)
|
||||
/// Sets end-of-file flag on the buffer.
|
||||
///
|
||||
/// Setting EOF flag to true prevents writing to the
|
||||
/// buffer; reading from the buffer will still be
|
||||
/// allowed until all data present in the buffer at the
|
||||
/// EOF set time is drained. After that, to re-enable
|
||||
/// FIFOBuffer for reading/writing, EOF must be
|
||||
/// set to false.
|
||||
///
|
||||
/// Setting EOF flag to false clears EOF state if it
|
||||
/// was previously set. If EOF was not set, it has no
|
||||
/// effect.
|
||||
{
|
||||
Mutex::ScopedLock lock(_mutex);
|
||||
bool flag = !eof;
|
||||
if (_notify) writable.notify(this, flag);
|
||||
_eof = eof;
|
||||
}
|
||||
|
||||
bool hasEOF() const
|
||||
/// Returns true if EOF flag has been set.
|
||||
{
|
||||
return _eof;
|
||||
}
|
||||
|
||||
bool isEOF() const
|
||||
/// Returns true if EOF flag has been set and buffer is empty.
|
||||
{
|
||||
return isEmpty() && _eof;
|
||||
}
|
||||
|
||||
bool isEmpty() const
|
||||
/// Returns true is buffer is empty, flase otherwise.
|
||||
/// Returns true is buffer is empty, false otherwise.
|
||||
{
|
||||
return 0 == _used;
|
||||
}
|
||||
@ -392,6 +488,20 @@ public:
|
||||
return size() == _used;
|
||||
}
|
||||
|
||||
bool isReadable() const
|
||||
/// Returns true if buffer contains data and is not
|
||||
/// in error state.
|
||||
{
|
||||
return !isEmpty() && isValid();
|
||||
}
|
||||
|
||||
bool isWritable() const
|
||||
/// Returns true if buffer is not full and is not
|
||||
/// in error state.
|
||||
{
|
||||
return !isFull() && isValid() && !_eof;
|
||||
}
|
||||
|
||||
void setNotify(bool notify = true)
|
||||
/// Enables/disables notifications.
|
||||
{
|
||||
@ -428,6 +538,8 @@ private:
|
||||
std::size_t _used;
|
||||
bool _notify;
|
||||
mutable Mutex _mutex;
|
||||
bool _eof;
|
||||
bool _error;
|
||||
};
|
||||
|
||||
|
||||
|
@ -284,6 +284,112 @@ void CoreTest::testBuffer()
|
||||
}
|
||||
|
||||
|
||||
void CoreTest::testFIFOBufferEOFAndError()
|
||||
{
|
||||
typedef FIFOBuffer::Type T;
|
||||
|
||||
FIFOBuffer f(20, true);
|
||||
|
||||
assert (f.isEmpty());
|
||||
assert (!f.isFull());
|
||||
|
||||
Buffer<T> b(10);
|
||||
std::vector<T> v;
|
||||
|
||||
f.readable += delegate(this, &CoreTest::onReadable);
|
||||
f.writable += delegate(this, &CoreTest::onWritable);
|
||||
|
||||
for (T c = '0'; c < '0' + 10; ++c)
|
||||
v.push_back(c);
|
||||
|
||||
std::memcpy(b.begin(), &v[0], sizeof(T) * v.size());
|
||||
assert(0 == _notToReadable);
|
||||
assert(0 == _readableToNot);
|
||||
assert (10 == f.write(b));
|
||||
assert(1 == _notToReadable);
|
||||
assert(0 == _readableToNot);
|
||||
assert (20 == f.size());
|
||||
assert (10 == f.used());
|
||||
assert (!f.isEmpty());
|
||||
f.setEOF();
|
||||
assert(0 == _notToWritable);
|
||||
assert(1 == _writableToNot);
|
||||
assert (f.hasEOF());
|
||||
assert (!f.isEOF());
|
||||
assert(1 == _notToReadable);
|
||||
assert(0 == _readableToNot);
|
||||
assert (20 == f.size());
|
||||
assert (10 == f.used());
|
||||
assert (0 == f.write(b));
|
||||
assert (!f.isEmpty());
|
||||
assert (5 == f.read(b, 5));
|
||||
assert(1 == _notToReadable);
|
||||
assert(0 == _readableToNot);
|
||||
assert (f.hasEOF());
|
||||
assert (!f.isEOF());
|
||||
assert (5 == f.read(b, 5));
|
||||
assert(1 == _notToReadable);
|
||||
assert(1 == _readableToNot);
|
||||
assert (f.hasEOF());
|
||||
assert (f.isEOF());
|
||||
assert(0 == _notToWritable);
|
||||
assert(1 == _writableToNot);
|
||||
|
||||
f.setEOF(false);
|
||||
assert (!f.hasEOF());
|
||||
assert (!f.isEOF());
|
||||
assert(1 == _notToWritable);
|
||||
assert(1 == _writableToNot);
|
||||
assert(1 == _notToReadable);
|
||||
assert(1 == _readableToNot);
|
||||
|
||||
assert (5 == f.write(b));
|
||||
assert(1 == _notToWritable);
|
||||
assert(1 == _writableToNot);
|
||||
assert(2 == _notToReadable);
|
||||
assert(1 == _readableToNot);
|
||||
assert (20 == f.size());
|
||||
assert (5 == f.used());
|
||||
f.setError();
|
||||
assert (0 == f.write(b));
|
||||
|
||||
try
|
||||
{
|
||||
f.copy(b.begin(), 5);
|
||||
fail ("must throw InvalidAccessException");
|
||||
}
|
||||
catch (InvalidAccessException&) { }
|
||||
|
||||
try
|
||||
{
|
||||
f.advance(5);
|
||||
fail ("must throw InvalidAccessException");
|
||||
}
|
||||
catch (InvalidAccessException&) { }
|
||||
|
||||
assert(1 == _notToWritable);
|
||||
assert(2 == _writableToNot);
|
||||
assert(2 == _notToReadable);
|
||||
assert(2 == _readableToNot);
|
||||
assert (20 == f.size());
|
||||
assert (0 == f.used());
|
||||
f.setError(false);
|
||||
assert(2 == _notToWritable);
|
||||
assert(2 == _writableToNot);
|
||||
assert(2 == _notToReadable);
|
||||
assert(2 == _readableToNot);
|
||||
assert (20 == f.size());
|
||||
assert (0 == f.used());
|
||||
assert (5 == f.write(b));
|
||||
assert(2 == _notToWritable);
|
||||
assert(2 == _writableToNot);
|
||||
assert(3 == _notToReadable);
|
||||
assert(2 == _readableToNot);
|
||||
assert (20 == f.size());
|
||||
assert (5 == f.used());
|
||||
}
|
||||
|
||||
|
||||
void CoreTest::testFIFOBufferChar()
|
||||
{
|
||||
typedef FIFOBuffer::Type T;
|
||||
@ -964,6 +1070,7 @@ CppUnit::Test* CoreTest::suite()
|
||||
CppUnit_addTest(pSuite, CoreTest, testBuffer);
|
||||
CppUnit_addTest(pSuite, CoreTest, testFIFOBufferChar);
|
||||
CppUnit_addTest(pSuite, CoreTest, testFIFOBufferInt);
|
||||
CppUnit_addTest(pSuite, CoreTest, testFIFOBufferEOFAndError);
|
||||
CppUnit_addTest(pSuite, CoreTest, testAtomicCounter);
|
||||
CppUnit_addTest(pSuite, CoreTest, testNullable);
|
||||
CppUnit_addTest(pSuite, CoreTest, testAscii);
|
||||
|
@ -54,6 +54,7 @@ public:
|
||||
void testBuffer();
|
||||
void testFIFOBufferChar();
|
||||
void testFIFOBufferInt();
|
||||
void testFIFOBufferEOFAndError();
|
||||
void testAtomicCounter();
|
||||
void testNullable();
|
||||
void testAscii();
|
||||
|
Loading…
x
Reference in New Issue
Block a user