- FIFOBuffer drain() problem #552

- StreamSocket::receiveBytes(FIFOBuffer&) and sendBytes(FIFOBuffer&) are
  not thread safe #402
This commit is contained in:
Alex Fabijanic 2014-10-03 16:12:42 -05:00
parent 24ba3c0df0
commit a25877bfc2
3 changed files with 83 additions and 7 deletions

View File

@ -38,6 +38,11 @@ class BasicFIFOBuffer
/// empty/non-empty/full (i.e. writable/readable) transition
/// notifications. Buffer can be flagged with end-of-file and
/// error flags, which renders it un-readable/writable.
///
/// Critical portions of code are protected by a recursive mutex.
/// However, to achieve thread-safety in cases where multiple
/// member function calls are involved and have to be atomic,
/// the mutex must be locked externally.
///
/// Buffer size, as well as amount of unread data and
/// available space introspections are supported as well.
@ -232,7 +237,7 @@ public:
if (_buffer.size() - (_begin + _used) < length)
{
std::memmove(_buffer.begin(), _buffer.begin() + _begin, _used);
std::memmove(_buffer.begin(), _buffer.begin() + _begin, _used * sizeof(T));
_begin = 0;
}
@ -301,7 +306,10 @@ public:
_used = 0;
}
else
{
_begin += length;
_used -= length;
}
if (_notify) notify(usedBefore);
}
@ -321,7 +329,7 @@ public:
if (!isWritable())
throw Poco::InvalidAccessException("Buffer not writable.");
std::memcpy(&_buffer[_used], ptr, length);
std::memcpy(&_buffer[_used], ptr, length * sizeof(T));
std::size_t usedBefore = _used;
_used += length;
if (_notify) notify(usedBefore);
@ -494,6 +502,12 @@ public:
return _notify;
}
Mutex& mutex()
/// Returns reference to mutex.
{
return _mutex;
}
private:
void notify(std::size_t usedBefore)
{

View File

@ -695,6 +695,7 @@ void CoreTest::testFIFOBufferChar()
try
{
f.copy(&arr[0], 8);
fail("must fail");
} catch (InvalidAccessException&) { }
f.copy(&arr[0], 3);
@ -707,7 +708,6 @@ void CoreTest::testFIFOBufferChar()
assert (6 == f.used());
assert (4 == f.available());
const char d[4] = {'7', '8', '9', '0' };
f.copy(&arr[0], 4);
assert(7 == _notToReadable);
assert(6 == _readableToNot);
@ -722,6 +722,7 @@ void CoreTest::testFIFOBufferChar()
try
{
f.copy(&arr[0], 1);
fail("must fail");
} catch (InvalidAccessException&) { }
f.drain(1);
@ -730,6 +731,33 @@ void CoreTest::testFIFOBufferChar()
assert(2 == _notToWritable);
assert(2 == _writableToNot);
f.drain(9);
assert (10 == f.size());
assert (0 == f.used());
assert (10 == f.available());
const char e[10] = { '1', '2', '3', '4', '5', '6', '7', '8', '9', '0' };
f.copy(&e[0], 10);
assert (10 == f.size());
assert (10 == f.used());
assert (0 == f.available());
f.drain(1);
f.write(e, 1);
assert (10 == f.size());
assert (10 == f.used());
assert (0 == f.available());
assert(f[0] == '2');
assert(f[1] == '3');
assert(f[2] == '4');
assert(f[3] == '5');
assert(f[4] == '6');
assert(f[5] == '7');
assert(f[6] == '8');
assert(f[7] == '9');
assert(f[8] == '0');
assert(f[9] == '1');
f.readable -= delegate(this, &CoreTest::onReadable);
f.writable -= delegate(this, &CoreTest::onReadable);
}
@ -737,7 +765,7 @@ void CoreTest::testFIFOBufferChar()
void CoreTest::testFIFOBufferInt()
{
typedef char T;
typedef int T;
BasicFIFOBuffer<T> f(20);
Buffer<T> b(10);
@ -842,7 +870,34 @@ void CoreTest::testFIFOBufferInt()
assert (16 == f[1]);
assert (17 == f[2]);
assert (18 == f[3]);
assert (19 == f[4]);
assert(19 == f[4]);
f.drain(9);
assert(10 == f.size());
assert(0 == f.used());
assert(10 == f.available());
const int e[10] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
f.copy(&e[0], 10);
assert(10 == f.size());
assert(10 == f.used());
assert(0 == f.available());
f.drain(1);
f.write(e, 1);
assert(10 == f.size());
assert(10 == f.used());
assert(0 == f.available());
assert(f[0] == 2);
assert(f[1] == 3);
assert(f[2] == 4);
assert(f[3] == 5);
assert(f[4] == 6);
assert(f[5] == 7);
assert(f[6] == 8);
assert(f[7] == 9);
assert(f[8] == 0);
assert(f[9] == 1);
f.resize(3, false);
assert (3 == f.size());

View File

@ -17,10 +17,13 @@
#include "Poco/Net/StreamSocket.h"
#include "Poco/Net/StreamSocketImpl.h"
#include "Poco/FIFOBuffer.h"
#include "Poco/Mutex.h"
#include "Poco/Exception.h"
using Poco::InvalidArgumentException;
using Poco::Mutex;
using Poco::ScopedLock;
namespace Poco {
@ -116,7 +119,9 @@ int StreamSocket::sendBytes(const void* buffer, int length, int flags)
int StreamSocket::sendBytes(FIFOBuffer& fifoBuf)
{
int ret = impl()->sendBytes(&fifoBuf.buffer()[0], (int) fifoBuf.used());
ScopedLock<Mutex> l(fifoBuf.mutex());
int ret = impl()->sendBytes(fifoBuf.begin(), (int) fifoBuf.used());
if (ret > 0) fifoBuf.drain(ret);
return ret;
}
@ -130,7 +135,9 @@ int StreamSocket::receiveBytes(void* buffer, int length, int flags)
int StreamSocket::receiveBytes(FIFOBuffer& fifoBuf)
{
int ret = impl()->receiveBytes(fifoBuf.next(), (int) fifoBuf.available());
ScopedLock<Mutex> l(fifoBuf.mutex());
int ret = impl()->receiveBytes(fifoBuf.next(), (int)fifoBuf.available());
if (ret > 0) fifoBuf.advance(ret);
return ret;
}