enh(FileStream): Add FileStreamBuf::resizeBuffer to set larger internal buffers. (#4621)

Larger buffers improve performance significantly when streaming large quantity of data on very fast devices.
This commit is contained in:
Matej Kenda 2024-09-09 17:33:50 +02:00 committed by GitHub
parent 710c2a41f3
commit 91c256095f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 224 additions and 53 deletions

View File

@ -43,15 +43,15 @@ class BasicBufferedBidirectionalStreamBuf: public std::basic_streambuf<ch, tr>
/// for implementing an iostream. /// for implementing an iostream.
{ {
protected: protected:
typedef std::basic_streambuf<ch, tr> Base; using Base = std::basic_streambuf<ch, tr>;
typedef std::basic_ios<ch, tr> IOS; using IOS = std::basic_ios<ch, tr>;
typedef ch char_type; using char_type = ch;
typedef tr char_traits; using char_traits = tr;
typedef ba Allocator; using Allocator = ba;
typedef typename Base::int_type int_type; using int_type = typename Base::int_type;
typedef typename Base::pos_type pos_type; using pos_type = typename Base::pos_type;
typedef typename Base::off_type off_type; using off_type = typename Base::off_type;
typedef typename IOS::openmode openmode; using openmode = typename IOS::openmode;
public: public:
BasicBufferedBidirectionalStreamBuf(std::streamsize bufferSize, openmode mode): BasicBufferedBidirectionalStreamBuf(std::streamsize bufferSize, openmode mode):
@ -69,6 +69,9 @@ public:
Allocator::deallocate(_pWriteBuffer, _bufsize); Allocator::deallocate(_pWriteBuffer, _bufsize);
} }
BasicBufferedBidirectionalStreamBuf(const BasicBufferedBidirectionalStreamBuf&) = delete;
BasicBufferedBidirectionalStreamBuf& operator = (const BasicBufferedBidirectionalStreamBuf&) = delete;
virtual int_type overflow(int_type c) virtual int_type overflow(int_type c)
{ {
if (!(_mode & IOS::out)) return char_traits::eof(); if (!(_mode & IOS::out)) return char_traits::eof();
@ -130,6 +133,22 @@ protected:
this->setp(_pWriteBuffer, _pWriteBuffer + _bufsize); this->setp(_pWriteBuffer, _pWriteBuffer + _bufsize);
} }
virtual bool resizeBuffer(std::streamsize bufferSize)
{
if (_bufsize != bufferSize)
{
Allocator::deallocate(_pReadBuffer, _bufsize);
Allocator::deallocate(_pWriteBuffer, _bufsize);
_bufsize = bufferSize;
_pReadBuffer = Allocator::allocate(_bufsize);
_pWriteBuffer = Allocator::allocate(_bufsize);
}
resetBuffers();
return true;
}
private: private:
virtual int readFromDevice(char_type* /*buffer*/, std::streamsize /*length*/) virtual int readFromDevice(char_type* /*buffer*/, std::streamsize /*length*/)
{ {
@ -152,13 +171,10 @@ private:
return -1; return -1;
} }
std::streamsize _bufsize; std::streamsize _bufsize {0};
char_type* _pReadBuffer; char_type* _pReadBuffer {nullptr};
char_type* _pWriteBuffer; char_type* _pWriteBuffer {nullptr};
openmode _mode; openmode _mode {0};
BasicBufferedBidirectionalStreamBuf(const BasicBufferedBidirectionalStreamBuf&);
BasicBufferedBidirectionalStreamBuf& operator = (const BasicBufferedBidirectionalStreamBuf&);
}; };
@ -172,8 +188,8 @@ private:
#if defined(_MSC_VER) && defined(POCO_DLL) && !defined(Foundation_EXPORTS) #if defined(_MSC_VER) && defined(POCO_DLL) && !defined(Foundation_EXPORTS)
template class Foundation_API BasicBufferedBidirectionalStreamBuf<char, std::char_traits<char>>; template class Foundation_API BasicBufferedBidirectionalStreamBuf<char, std::char_traits<char>>;
#endif #endif
typedef BasicBufferedBidirectionalStreamBuf<char, std::char_traits<char>> BufferedBidirectionalStreamBuf; using BufferedBidirectionalStreamBuf
= BasicBufferedBidirectionalStreamBuf<char, std::char_traits<char>>;
} // namespace Poco } // namespace Poco

View File

@ -53,7 +53,7 @@ public:
FileIOS(); FileIOS();
/// Creates the basic stream. /// Creates the basic stream.
~FileIOS(); ~FileIOS() override;
/// Destroys the stream. /// Destroys the stream.
virtual void open(const std::string& path, std::ios::openmode mode); virtual void open(const std::string& path, std::ios::openmode mode);
@ -115,7 +115,7 @@ public:
/// Throws a FileNotFoundException (or a similar exception) if the file /// Throws a FileNotFoundException (or a similar exception) if the file
/// does not exist or is not accessible for other reasons. /// does not exist or is not accessible for other reasons.
~FileInputStream(); ~FileInputStream() override;
/// Destroys the stream. /// Destroys the stream.
void open(const std::string& path, std::ios::openmode mode = std::ios::in) override; void open(const std::string& path, std::ios::openmode mode = std::ios::in) override;
@ -158,7 +158,7 @@ public:
/// for std::ofstream, which is std::ios::out only. This is for backwards compatibility /// for std::ofstream, which is std::ios::out only. This is for backwards compatibility
/// with earlier POCO versions. /// with earlier POCO versions.
~FileOutputStream(); ~FileOutputStream() override;
/// Destroys the FileOutputStream. /// Destroys the FileOutputStream.
void open(const std::string& path, std::ios::openmode mode = std::ios::out | std::ios::trunc) override; void open(const std::string& path, std::ios::openmode mode = std::ios::out | std::ios::trunc) override;
@ -203,7 +203,7 @@ public:
/// for std::fstream, which is std::ios::out only. This is for backwards compatibility /// for std::fstream, which is std::ios::out only. This is for backwards compatibility
/// with earlier POCO versions. /// with earlier POCO versions.
~FileStream(); ~FileStream() override;
/// Destroys the FileOutputStream. /// Destroys the FileOutputStream.
void open(const std::string& path, std::ios::openmode mode = std::ios::out | std::ios::in) override; void open(const std::string& path, std::ios::openmode mode = std::ios::out | std::ios::in) override;

View File

@ -36,7 +36,7 @@ public:
FileStreamBuf(); FileStreamBuf();
/// Creates a FileStreamBuf. /// Creates a FileStreamBuf.
~FileStreamBuf(); ~FileStreamBuf() override;
/// Destroys the FileStream. /// Destroys the FileStream.
void open(const std::string& path, std::ios::openmode mode); void open(const std::string& path, std::ios::openmode mode);
@ -49,10 +49,16 @@ public:
/// Closes the File stream buffer. Returns true if successful, /// Closes the File stream buffer. Returns true if successful,
/// false otherwise. /// false otherwise.
std::streampos seekoff(std::streamoff off, std::ios::seekdir dir, std::ios::openmode mode = std::ios::in | std::ios::out); bool resizeBuffer(std::streamsize bufferSize) override;
/// Resizes internal buffer. Minimum size is BUFFER_SIZE.
/// Minimum is used when requested size is smaller.
/// Buffer can be resized only when the file is not open.
/// Returns true if resize succeeded.
std::streampos seekoff(std::streamoff off, std::ios::seekdir dir, std::ios::openmode mode = std::ios::in | std::ios::out) override;
/// Change position by offset, according to way and mode. /// Change position by offset, according to way and mode.
std::streampos seekpos(std::streampos pos, std::ios::openmode mode = std::ios::in | std::ios::out); std::streampos seekpos(std::streampos pos, std::ios::openmode mode = std::ios::in | std::ios::out) override;
/// Change to specified position, according to mode. /// Change to specified position, according to mode.
void flushToDisk(); void flushToDisk();
@ -70,13 +76,13 @@ protected:
BUFFER_SIZE = 4096 BUFFER_SIZE = 4096
}; };
int readFromDevice(char* buffer, std::streamsize length); int readFromDevice(char* buffer, std::streamsize length) override;
int writeToDevice(const char* buffer, std::streamsize length); int writeToDevice(const char* buffer, std::streamsize length) override;
private: private:
std::string _path; std::string _path;
NativeHandle _fd; NativeHandle _fd;
std::streamoff _pos; std::streamoff _pos {0};
}; };

View File

@ -25,7 +25,6 @@
namespace Poco { namespace Poco {
class Foundation_API FileStreamBuf: public BufferedBidirectionalStreamBuf class Foundation_API FileStreamBuf: public BufferedBidirectionalStreamBuf
/// This stream buffer handles Fileio /// This stream buffer handles Fileio
{ {
@ -48,10 +47,16 @@ public:
/// Closes the File stream buffer. Returns true if successful, /// Closes the File stream buffer. Returns true if successful,
/// false otherwise. /// false otherwise.
std::streampos seekoff(std::streamoff off, std::ios::seekdir dir, std::ios::openmode mode = std::ios::in | std::ios::out); bool resizeBuffer(std::streamsize bufferSize) override;
/// Resizes internal buffer. Minimum size is BUFFER_SIZE.
/// Minimum is used when requested size is smaller.
/// Buffer can be resized only when the file is not open.
/// Returns true if resize succeeded.
std::streampos seekoff(std::streamoff off, std::ios::seekdir dir, std::ios::openmode mode = std::ios::in | std::ios::out) override;
/// change position by offset, according to way and mode /// change position by offset, according to way and mode
std::streampos seekpos(std::streampos pos, std::ios::openmode mode = std::ios::in | std::ios::out); std::streampos seekpos(std::streampos pos, std::ios::openmode mode = std::ios::in | std::ios::out) override;
/// change to specified position, according to mode /// change to specified position, according to mode
void flushToDisk(); void flushToDisk();
@ -69,13 +74,13 @@ protected:
BUFFER_SIZE = 4096 BUFFER_SIZE = 4096
}; };
int readFromDevice(char* buffer, std::streamsize length); int readFromDevice(char* buffer, std::streamsize length) override;
int writeToDevice(const char* buffer, std::streamsize length); int writeToDevice(const char* buffer, std::streamsize length) override;
private: private:
std::string _path; std::string _path;
NativeHandle _handle; NativeHandle _handle;
UInt64 _pos; UInt64 _pos {0};
}; };

View File

@ -95,7 +95,7 @@ int FileStreamBuf::readFromDevice(char* buffer, std::streamsize length)
if (getMode() & std::ios::out) if (getMode() & std::ios::out)
sync(); sync();
int n = read(_fd, buffer, length); int n = ::read(_fd, buffer, length);
if (n == -1) if (n == -1)
File::handleLastError(_path); File::handleLastError(_path);
_pos += n; _pos += n;
@ -108,9 +108,9 @@ int FileStreamBuf::writeToDevice(const char* buffer, std::streamsize length)
if (_fd == -1) return -1; if (_fd == -1) return -1;
#if defined(POCO_VXWORKS) #if defined(POCO_VXWORKS)
int n = write(_fd, const_cast<char*>(buffer), length); int n = ::write(_fd, const_cast<char*>(buffer), length);
#else #else
int n = write(_fd, buffer, length); int n = ::write(_fd, buffer, length);
#endif #endif
if (n == -1) if (n == -1)
File::handleLastError(_path); File::handleLastError(_path);
@ -139,6 +139,18 @@ bool FileStreamBuf::close()
} }
bool FileStreamBuf::resizeBuffer(std::streamsize bufferSize)
{
if (_fd != -1)
return false;
if (bufferSize < BUFFER_SIZE)
bufferSize = BUFFER_SIZE;
return BufferedBidirectionalStreamBuf::resizeBuffer(bufferSize);
}
std::streampos FileStreamBuf::seekoff(std::streamoff off, std::ios::seekdir dir, std::ios::openmode mode) std::streampos FileStreamBuf::seekoff(std::streamoff off, std::ios::seekdir dir, std::ios::openmode mode)
{ {
if (_fd == -1 || !(getMode() & mode)) if (_fd == -1 || !(getMode() & mode))
@ -165,7 +177,7 @@ std::streampos FileStreamBuf::seekoff(std::streamoff off, std::ios::seekdir dir,
{ {
whence = SEEK_END; whence = SEEK_END;
} }
_pos = lseek(_fd, off, whence); _pos = ::lseek(_fd, off, whence);
return _pos; return _pos;
} }
@ -180,7 +192,7 @@ std::streampos FileStreamBuf::seekpos(std::streampos pos, std::ios::openmode mod
resetBuffers(); resetBuffers();
_pos = lseek(_fd, pos, SEEK_SET); _pos = ::lseek(_fd, pos, SEEK_SET);
return _pos; return _pos;
} }
@ -190,7 +202,7 @@ void FileStreamBuf::flushToDisk()
if (getMode() & std::ios::out) if (getMode() & std::ios::out)
{ {
sync(); sync();
if (fsync(_fd) != 0) if (::fsync(_fd) != 0)
File::handleLastError(_path); File::handleLastError(_path);
} }
} }
@ -204,7 +216,7 @@ FileStreamBuf::NativeHandle FileStreamBuf::nativeHandle() const
Poco::UInt64 FileStreamBuf::size() const Poco::UInt64 FileStreamBuf::size() const
{ {
struct stat stat_buf; struct stat stat_buf;
int rc = fstat(_fd, &stat_buf); int rc = ::fstat(_fd, &stat_buf);
if (rc < 0) if (rc < 0)
{ {
Poco::SystemException(strerror(errno), errno); Poco::SystemException(strerror(errno), errno);

View File

@ -63,7 +63,7 @@ void FileStreamBuf::open(const std::string& path, std::ios::openmode mode)
std::wstring utf16Path; std::wstring utf16Path;
FileImpl::convertPath(path, utf16Path); FileImpl::convertPath(path, utf16Path);
_handle = CreateFileW(utf16Path.c_str(), access, shareMode, NULL, creationDisp, flags, NULL); _handle = ::CreateFileW(utf16Path.c_str(), access, shareMode, NULL, creationDisp, flags, NULL);
if (_handle == INVALID_HANDLE_VALUE) if (_handle == INVALID_HANDLE_VALUE)
File::handleLastError(_path); File::handleLastError(_path);
@ -98,10 +98,10 @@ int FileStreamBuf::readFromDevice(char* buffer, std::streamsize length)
sync(); sync();
DWORD bytesRead(0); DWORD bytesRead(0);
BOOL rc = ReadFile(_handle, buffer, static_cast<DWORD>(length), &bytesRead, NULL); BOOL rc = ::ReadFile(_handle, buffer, static_cast<DWORD>(length), &bytesRead, NULL);
if (rc == 0) if (rc == 0)
{ {
if (GetLastError() == ERROR_BROKEN_PIPE) if (::GetLastError() == ERROR_BROKEN_PIPE)
{ {
// Read from closed pipe -> treat as EOF // Read from closed pipe -> treat as EOF
return 0; return 0;
@ -124,14 +124,14 @@ int FileStreamBuf::writeToDevice(const char* buffer, std::streamsize length)
{ {
LARGE_INTEGER li; LARGE_INTEGER li;
li.QuadPart = 0; li.QuadPart = 0;
li.LowPart = SetFilePointer(_handle, li.LowPart, &li.HighPart, FILE_END); li.LowPart = ::SetFilePointer(_handle, li.LowPart, &li.HighPart, FILE_END);
if (li.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) if (li.LowPart == INVALID_SET_FILE_POINTER && ::GetLastError() != NO_ERROR)
File::handleLastError(_path); File::handleLastError(_path);
_pos = li.QuadPart; _pos = li.QuadPart;
} }
DWORD bytesWritten(0); DWORD bytesWritten(0);
BOOL rc = WriteFile(_handle, buffer, static_cast<DWORD>(length), &bytesWritten, NULL); BOOL rc = ::WriteFile(_handle, buffer, static_cast<DWORD>(length), &bytesWritten, NULL);
if (rc == 0) if (rc == 0)
File::handleLastError(_path); File::handleLastError(_path);
@ -155,13 +155,25 @@ bool FileStreamBuf::close()
{ {
success = false; success = false;
} }
CloseHandle(_handle); ::CloseHandle(_handle);
_handle = INVALID_HANDLE_VALUE; _handle = INVALID_HANDLE_VALUE;
} }
return success; return success;
} }
bool FileStreamBuf::resizeBuffer(std::streamsize bufferSize)
{
if (_handle != INVALID_HANDLE_VALUE)
return false;
if (bufferSize < BUFFER_SIZE)
bufferSize = BUFFER_SIZE;
return BufferedBidirectionalStreamBuf::resizeBuffer(bufferSize);
}
std::streampos FileStreamBuf::seekoff(std::streamoff off, std::ios::seekdir dir, std::ios::openmode mode) std::streampos FileStreamBuf::seekoff(std::streamoff off, std::ios::seekdir dir, std::ios::openmode mode)
{ {
if (INVALID_HANDLE_VALUE == _handle || !(getMode() & mode)) if (INVALID_HANDLE_VALUE == _handle || !(getMode() & mode))
@ -191,9 +203,9 @@ std::streampos FileStreamBuf::seekoff(std::streamoff off, std::ios::seekdir dir,
LARGE_INTEGER li; LARGE_INTEGER li;
li.QuadPart = off; li.QuadPart = off;
li.LowPart = SetFilePointer(_handle, li.LowPart, &li.HighPart, offset); li.LowPart = ::SetFilePointer(_handle, li.LowPart, &li.HighPart, offset);
if (li.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) if (li.LowPart == INVALID_SET_FILE_POINTER && ::GetLastError() != NO_ERROR)
File::handleLastError(_path); File::handleLastError(_path);
_pos = li.QuadPart; _pos = li.QuadPart;
return std::streampos(static_cast<std::streamoff>(_pos)); return std::streampos(static_cast<std::streamoff>(_pos));
@ -212,9 +224,9 @@ std::streampos FileStreamBuf::seekpos(std::streampos pos, std::ios::openmode mod
LARGE_INTEGER li; LARGE_INTEGER li;
li.QuadPart = pos; li.QuadPart = pos;
li.LowPart = SetFilePointer(_handle, li.LowPart, &li.HighPart, FILE_BEGIN); li.LowPart = ::SetFilePointer(_handle, li.LowPart, &li.HighPart, FILE_BEGIN);
if (li.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) if (li.LowPart == INVALID_SET_FILE_POINTER && ::GetLastError() != NO_ERROR)
File::handleLastError(_path); File::handleLastError(_path);
_pos = li.QuadPart; _pos = li.QuadPart;
return std::streampos(static_cast<std::streamoff>(_pos)); return std::streampos(static_cast<std::streamoff>(_pos));
@ -226,7 +238,7 @@ void FileStreamBuf::flushToDisk()
if (getMode() & std::ios::out) if (getMode() & std::ios::out)
{ {
sync(); sync();
if (FlushFileBuffers(_handle) == 0) if (::FlushFileBuffers(_handle) == 0)
File::handleLastError(_path); File::handleLastError(_path);
} }
} }

View File

@ -15,6 +15,8 @@
#include "Poco/File.h" #include "Poco/File.h"
#include "Poco/TemporaryFile.h" #include "Poco/TemporaryFile.h"
#include "Poco/Exception.h" #include "Poco/Exception.h"
#include "Poco/Stopwatch.h"
#include <iostream>
FileStreamTest::FileStreamTest(const std::string& name): CppUnit::TestCase(name) FileStreamTest::FileStreamTest(const std::string& name): CppUnit::TestCase(name)
@ -336,6 +338,120 @@ void FileStreamTest::testMultiOpen()
str.close(); str.close();
} }
void FileStreamTest::testBufferSize()
{
// Tests correctness of stream operation when buffer is resized.
Poco::FileStream str("test.txt", std::ios::out | std::ios::trunc);
str << "0123456789\n";
str << "abcdefghij\n";
str << "klmnopqrst\n";
// Can't resize buffer when file is open.
assertFalse(str.rdbuf()->resizeBuffer(8000));
str.close();
std::string s;
assertTrue(str.rdbuf()->resizeBuffer(8000));
str.open("test.txt", std::ios::in);
std::getline(str, s);
assertTrue (s == "0123456789");
std::getline(str, s);
assertTrue (s == "abcdefghij");
str.close();
assertTrue(str.rdbuf()->resizeBuffer(8000));
str.open("test.txt", std::ios::out | std::ios::trunc);
str << "0123456789\n";
str << "abcdefghij\n";
str << "klmnopqrst\n";
str.close();
assertTrue(str.rdbuf()->resizeBuffer(4000));
str.open("test.txt", std::ios::in);
std::getline(str, s);
assertTrue (s == "0123456789");
std::getline(str, s);
assertTrue (s == "abcdefghij");
str.close();
}
void FileStreamTest::testBufferSizePerformance()
{
Poco::Stopwatch watch;
// Write with default buffer size
Poco::FileStream str("test.txt", std::ios::out | std::ios::trunc);
const std::string outStrBuf(150LL*1024LL, '#');
watch.start();
for (int r{0}; r < 10; r++)
{
for (int i{0}; i < 4000; i++)
{
str.write(outStrBuf.data(), outStrBuf.size());
}
str.seekp(0);
}
const auto writeDefUs { watch.elapsed() };
str.close();
// Read with default buffer size
std::string strBuf(150LL*1024LL, '0');
str.open("test.txt", std::ios::in);
watch.restart();
for (int r{0}; r < 10; r++)
{
for (int i{0}; i < 4000; i++)
{
str.read(strBuf.data(), strBuf.size());
}
str.seekg(0);
}
const auto readDefUs { watch.elapsed() };
str.close();
// Write with increased buffer
str.rdbuf()->resizeBuffer(128LL*1024LL);
str.open("test.txt", std::ios::out | std::ios::trunc);
watch.restart();
for (int r{0}; r < 10; r++)
{
for (int i{0}; i < 4000; i++)
{
str.write(outStrBuf.data(), outStrBuf.size());
}
str.seekp(0);
}
const auto writeLargeUs { watch.elapsed() };
str.close();
// Read with increased buffer size
str.open("test.txt", std::ios::in);
watch.restart();
for (int r{0}; r < 10; r++)
{
for (int i{0}; i < 4000; i++)
{
str.read(strBuf.data(), strBuf.size());
}
str.seekg(0);
}
const auto readLargeUs { watch.elapsed() };
str.close();
watch.stop();
Poco::File("test.txt").remove();
std::cout << std::endl;
std::cout << "Write (us) default/128k: " << writeDefUs << "/" << writeLargeUs << std::endl;
std::cout << "Read (us) default/128k: " << readDefUs << "/" << readLargeUs << std::endl;
// Measurements in test environment are not reliable enough.
//assertTrue(writeDefUs > writeLargeUs);
//assertTrue(readDefUs > readLargeUs);
}
void FileStreamTest::setUp() void FileStreamTest::setUp()
{ {
@ -363,6 +479,8 @@ CppUnit::Test* FileStreamTest::suite()
CppUnit_addTest(pSuite, FileStreamTest, testOpenModeApp); CppUnit_addTest(pSuite, FileStreamTest, testOpenModeApp);
CppUnit_addTest(pSuite, FileStreamTest, testSeek); CppUnit_addTest(pSuite, FileStreamTest, testSeek);
CppUnit_addTest(pSuite, FileStreamTest, testMultiOpen); CppUnit_addTest(pSuite, FileStreamTest, testMultiOpen);
CppUnit_addTest(pSuite, FileStreamTest, testBufferSize);
CppUnit_addTest(pSuite, FileStreamTest, testBufferSizePerformance);
return pSuite; return pSuite;
} }

View File

@ -36,6 +36,8 @@ public:
void testOpenModeApp(); void testOpenModeApp();
void testSeek(); void testSeek();
void testMultiOpen(); void testMultiOpen();
void testBufferSize();
void testBufferSizePerformance();
void setUp(); void setUp();
void tearDown(); void tearDown();