Merge remote-tracking branch 'origin/FIFOBuffer-eof-error' into develop

This commit is contained in:
aleks-f
2013-01-05 14:03:42 -06:00
3 changed files with 238 additions and 18 deletions

View File

@@ -56,8 +56,11 @@ class BasicFIFOBuffer
/// A simple buffer class with support for re-entrant, /// A simple buffer class with support for re-entrant,
/// FIFO-style read/write operations, as well as (optional) /// FIFO-style read/write operations, as well as (optional)
/// empty/non-empty/full (i.e. writable/readable) transition /// empty/non-empty/full (i.e. writable/readable) transition
/// notifications. Buffer size, as well as amount of unread data /// notifications. Buffer can be flagged with end-of-file and
/// and available space introspections are supported as well. /// error flags, which renders it un-readable/writable.
///
/// Buffer size, as well as amount of unread data and
/// available space introspections are supported as well.
/// ///
/// This class is useful anywhere where a FIFO functionality /// This class is useful anywhere where a FIFO functionality
/// is needed. /// is needed.
@@ -93,7 +96,9 @@ public:
_buffer(size), _buffer(size),
_begin(0), _begin(0),
_used(0), _used(0),
_notify(notify) _notify(notify),
_eof(false),
_error(false)
/// Creates the FIFOBuffer. /// Creates the FIFOBuffer.
{ {
} }
@@ -102,7 +107,9 @@ public:
_buffer(pBuffer, size), _buffer(pBuffer, size),
_begin(0), _begin(0),
_used(0), _used(0),
_notify(notify) _notify(notify),
_eof(false),
_error(false)
/// Creates the FIFOBuffer. /// Creates the FIFOBuffer.
{ {
} }
@@ -111,7 +118,9 @@ public:
_buffer(pBuffer, size), _buffer(pBuffer, size),
_begin(0), _begin(0),
_used(size), _used(size),
_notify(notify) _notify(notify),
_eof(false),
_error(false)
/// Creates the FIFOBuffer. /// Creates the FIFOBuffer.
{ {
} }
@@ -154,6 +163,7 @@ public:
{ {
if (0 == length) return 0; if (0 == length) return 0;
Mutex::ScopedLock lock(_mutex); Mutex::ScopedLock lock(_mutex);
if (!isReadable()) return 0;
if (length > _used) length = _used; if (length > _used) length = _used;
std::memcpy(pBuffer, _buffer.begin() + _begin, length * sizeof(T)); std::memcpy(pBuffer, _buffer.begin() + _begin, length * sizeof(T));
return length; return length;
@@ -172,6 +182,7 @@ public:
/// supplied buffer. /// supplied buffer.
{ {
Mutex::ScopedLock lock(_mutex); Mutex::ScopedLock lock(_mutex);
if (!isReadable()) return 0;
if (0 == length || length > _used) length = _used; if (0 == length || length > _used) length = _used;
buffer.resize(length); buffer.resize(length);
return peek(buffer.begin(), length); return peek(buffer.begin(), length);
@@ -185,10 +196,9 @@ public:
/// ///
/// Returns the reference to the buffer. /// Returns the reference to the buffer.
{ {
if (0 == length) return 0;
Mutex::ScopedLock lock(_mutex); Mutex::ScopedLock lock(_mutex);
if (!isReadable()) return 0;
if (0 == _used) return 0;
std::size_t usedBefore = _used; std::size_t usedBefore = _used;
std::size_t readLen = peek(pBuffer, length); std::size_t readLen = peek(pBuffer, length);
poco_assert (_used >= readLen); poco_assert (_used >= readLen);
@@ -210,9 +220,7 @@ public:
/// Returns the reference to the buffer. /// Returns the reference to the buffer.
{ {
Mutex::ScopedLock lock(_mutex); Mutex::ScopedLock lock(_mutex);
if (!isReadable()) return 0;
if (0 == _used) return 0;
std::size_t usedBefore = _used; std::size_t usedBefore = _used;
std::size_t readLen = peek(buffer, length); std::size_t readLen = peek(buffer, length);
poco_assert (_used >= readLen); poco_assert (_used >= readLen);
@@ -236,10 +244,12 @@ public:
/// ///
/// Returns the length of data written. /// Returns the length of data written.
{ {
if (0 == length || isFull()) return 0; if (0 == length) return 0;
Mutex::ScopedLock lock(_mutex); Mutex::ScopedLock lock(_mutex);
if (!isWritable()) return 0;
if (_buffer.size() - (_begin + _used) < length) if (_buffer.size() - (_begin + _used) < length)
{ {
std::memmove(_buffer.begin(), _buffer.begin() + _begin, _used); std::memmove(_buffer.begin(), _buffer.begin() + _begin, _used);
@@ -268,11 +278,14 @@ public:
/// ///
/// Returns the length of data written. /// Returns the length of data written.
{ {
if (isFull()) return 0; std::size_t len = length;
if (0 == length || length > buffer.size()) length = buffer.size(); if (len == 0)
len = buffer.size();
else if (len > buffer.size())
len = buffer.size();
return write(buffer.begin(), length); return write(buffer.begin(), len);
} }
std::size_t size() const std::size_t size() const
@@ -298,6 +311,8 @@ public:
/// If length is zero or greater than buffer current /// If length is zero or greater than buffer current
/// content length, buffer is emptied. /// content length, buffer is emptied.
{ {
Mutex::ScopedLock lock(_mutex);
std::size_t usedBefore = _used; std::size_t usedBefore = _used;
if (0 == length || length >= _used) if (0 == length || length >= _used)
@@ -317,11 +332,19 @@ public:
{ {
poco_check_ptr(ptr); poco_check_ptr(ptr);
if (0 == length) return; if (0 == length) return;
Mutex::ScopedLock lock(_mutex);
if (length > available()) if (length > available())
throw Poco::InvalidAccessException("Cannot extend buffer."); throw Poco::InvalidAccessException("Cannot extend buffer.");
if (!isWritable())
throw Poco::InvalidAccessException("Buffer not writable.");
std::memcpy(&_buffer[_used], ptr, length); std::memcpy(&_buffer[_used], ptr, length);
advance(length); std::size_t usedBefore = _used;
_used += length;
if (_notify) notify(usedBefore);
} }
void advance(std::size_t length) void advance(std::size_t length)
@@ -329,8 +352,13 @@ public:
/// Should be called AFTER the data /// Should be called AFTER the data
/// was copied into the buffer. /// was copied into the buffer.
{ {
Mutex::ScopedLock lock(_mutex);
if (length > available()) if (length > available())
throw Poco::InvalidAccessException("Cannot extend buffer."); throw Poco::InvalidAccessException("Cannot extend buffer.");
if (!isWritable())
throw Poco::InvalidAccessException("Buffer not writable.");
std::size_t usedBefore = _used; std::size_t usedBefore = _used;
_used += length; _used += length;
@@ -357,6 +385,7 @@ public:
/// Throws InvalidAccessException if index is larger than /// Throws InvalidAccessException if index is larger than
/// the last valid (used) buffer position. /// the last valid (used) buffer position.
{ {
Mutex::ScopedLock lock(_mutex);
if (index >= _used) if (index >= _used)
throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)", index, _used - 1)); throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)", index, _used - 1));
@@ -368,6 +397,7 @@ public:
/// Throws InvalidAccessException if index is larger than /// Throws InvalidAccessException if index is larger than
/// the last valid (used) buffer position. /// the last valid (used) buffer position.
{ {
Mutex::ScopedLock lock(_mutex);
if (index >= _used) if (index >= _used)
throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)", index, _used - 1)); throw InvalidAccessException(format("Index out of bounds: %z (max index allowed: %z)", index, _used - 1));
@@ -379,9 +409,75 @@ public:
{ {
return _buffer; return _buffer;
} }
void setError(bool error = true)
/// Sets the error flag on the buffer and empties it.
/// If notifications are enabled, they will be triggered
/// if appropriate.
///
/// Setting error flag to true prevents reading and writing
/// to the buffer; to re-enable FIFOBuffer for reading/writing,
/// the error flag must be set to false.
{
if (error)
{
bool f = false;
Mutex::ScopedLock lock(_mutex);
if (error && isReadable() && _notify) readable.notify(this, f);
if (error && isWritable() && _notify) writable.notify(this, f);
_error = error;
_used = 0;
}
else
{
bool t = true;
Mutex::ScopedLock lock(_mutex);
_error = false;
if (_notify && !_eof) writable.notify(this, t);
}
}
bool isValid() const
/// Returns true if error flag is not set on the buffer,
/// otherwise returns false.
{
return !_error;
}
void setEOF(bool eof = true)
/// Sets end-of-file flag on the buffer.
///
/// Setting EOF flag to true prevents writing to the
/// buffer; reading from the buffer will still be
/// allowed until all data present in the buffer at the
/// EOF set time is drained. After that, to re-enable
/// FIFOBuffer for reading/writing, EOF must be
/// set to false.
///
/// Setting EOF flag to false clears EOF state if it
/// was previously set. If EOF was not set, it has no
/// effect.
{
Mutex::ScopedLock lock(_mutex);
bool flag = !eof;
if (_notify) writable.notify(this, flag);
_eof = eof;
}
bool hasEOF() const
/// Returns true if EOF flag has been set.
{
return _eof;
}
bool isEOF() const
/// Returns true if EOF flag has been set and buffer is empty.
{
return isEmpty() && _eof;
}
bool isEmpty() const bool isEmpty() const
/// Returns true is buffer is empty, flase otherwise. /// Returns true is buffer is empty, false otherwise.
{ {
return 0 == _used; return 0 == _used;
} }
@@ -392,6 +488,20 @@ public:
return size() == _used; return size() == _used;
} }
bool isReadable() const
/// Returns true if buffer contains data and is not
/// in error state.
{
return !isEmpty() && isValid();
}
bool isWritable() const
/// Returns true if buffer is not full and is not
/// in error state.
{
return !isFull() && isValid() && !_eof;
}
void setNotify(bool notify = true) void setNotify(bool notify = true)
/// Enables/disables notifications. /// Enables/disables notifications.
{ {
@@ -428,6 +538,8 @@ private:
std::size_t _used; std::size_t _used;
bool _notify; bool _notify;
mutable Mutex _mutex; mutable Mutex _mutex;
bool _eof;
bool _error;
}; };

View File

@@ -284,6 +284,112 @@ void CoreTest::testBuffer()
} }
void CoreTest::testFIFOBufferEOFAndError()
{
typedef FIFOBuffer::Type T;
FIFOBuffer f(20, true);
assert (f.isEmpty());
assert (!f.isFull());
Buffer<T> b(10);
std::vector<T> v;
f.readable += delegate(this, &CoreTest::onReadable);
f.writable += delegate(this, &CoreTest::onWritable);
for (T c = '0'; c < '0' + 10; ++c)
v.push_back(c);
std::memcpy(b.begin(), &v[0], sizeof(T) * v.size());
assert(0 == _notToReadable);
assert(0 == _readableToNot);
assert (10 == f.write(b));
assert(1 == _notToReadable);
assert(0 == _readableToNot);
assert (20 == f.size());
assert (10 == f.used());
assert (!f.isEmpty());
f.setEOF();
assert(0 == _notToWritable);
assert(1 == _writableToNot);
assert (f.hasEOF());
assert (!f.isEOF());
assert(1 == _notToReadable);
assert(0 == _readableToNot);
assert (20 == f.size());
assert (10 == f.used());
assert (0 == f.write(b));
assert (!f.isEmpty());
assert (5 == f.read(b, 5));
assert(1 == _notToReadable);
assert(0 == _readableToNot);
assert (f.hasEOF());
assert (!f.isEOF());
assert (5 == f.read(b, 5));
assert(1 == _notToReadable);
assert(1 == _readableToNot);
assert (f.hasEOF());
assert (f.isEOF());
assert(0 == _notToWritable);
assert(1 == _writableToNot);
f.setEOF(false);
assert (!f.hasEOF());
assert (!f.isEOF());
assert(1 == _notToWritable);
assert(1 == _writableToNot);
assert(1 == _notToReadable);
assert(1 == _readableToNot);
assert (5 == f.write(b));
assert(1 == _notToWritable);
assert(1 == _writableToNot);
assert(2 == _notToReadable);
assert(1 == _readableToNot);
assert (20 == f.size());
assert (5 == f.used());
f.setError();
assert (0 == f.write(b));
try
{
f.copy(b.begin(), 5);
fail ("must throw InvalidAccessException");
}
catch (InvalidAccessException&) { }
try
{
f.advance(5);
fail ("must throw InvalidAccessException");
}
catch (InvalidAccessException&) { }
assert(1 == _notToWritable);
assert(2 == _writableToNot);
assert(2 == _notToReadable);
assert(2 == _readableToNot);
assert (20 == f.size());
assert (0 == f.used());
f.setError(false);
assert(2 == _notToWritable);
assert(2 == _writableToNot);
assert(2 == _notToReadable);
assert(2 == _readableToNot);
assert (20 == f.size());
assert (0 == f.used());
assert (5 == f.write(b));
assert(2 == _notToWritable);
assert(2 == _writableToNot);
assert(3 == _notToReadable);
assert(2 == _readableToNot);
assert (20 == f.size());
assert (5 == f.used());
}
void CoreTest::testFIFOBufferChar() void CoreTest::testFIFOBufferChar()
{ {
typedef FIFOBuffer::Type T; typedef FIFOBuffer::Type T;
@@ -964,6 +1070,7 @@ CppUnit::Test* CoreTest::suite()
CppUnit_addTest(pSuite, CoreTest, testBuffer); CppUnit_addTest(pSuite, CoreTest, testBuffer);
CppUnit_addTest(pSuite, CoreTest, testFIFOBufferChar); CppUnit_addTest(pSuite, CoreTest, testFIFOBufferChar);
CppUnit_addTest(pSuite, CoreTest, testFIFOBufferInt); CppUnit_addTest(pSuite, CoreTest, testFIFOBufferInt);
CppUnit_addTest(pSuite, CoreTest, testFIFOBufferEOFAndError);
CppUnit_addTest(pSuite, CoreTest, testAtomicCounter); CppUnit_addTest(pSuite, CoreTest, testAtomicCounter);
CppUnit_addTest(pSuite, CoreTest, testNullable); CppUnit_addTest(pSuite, CoreTest, testNullable);
CppUnit_addTest(pSuite, CoreTest, testAscii); CppUnit_addTest(pSuite, CoreTest, testAscii);

View File

@@ -54,6 +54,7 @@ public:
void testBuffer(); void testBuffer();
void testFIFOBufferChar(); void testFIFOBufferChar();
void testFIFOBufferInt(); void testFIFOBufferInt();
void testFIFOBufferEOFAndError();
void testAtomicCounter(); void testAtomicCounter();
void testNullable(); void testNullable();
void testAscii(); void testAscii();