fix(logs): synchronise log file rotation and compression.

This commit is contained in:
Matej Kenda 2024-01-25 18:15:15 +01:00
parent 5bdbab6c5c
commit 9aec79719b
6 changed files with 204 additions and 36 deletions

View File

@ -23,6 +23,8 @@
#include "Poco/File.h"
#include "Poco/DateTimeFormatter.h"
#include "Poco/NumberFormatter.h"
#include "Poco/Mutex.h"
#include "Poco/Condition.h"
#include <atomic>
@ -51,6 +53,8 @@ public:
/// and creates and returns a new log file.
/// The given LogFile object is deleted.
void close();
void compress(bool flag = true);
/// Enables or disables compression of archived files.
@ -58,10 +62,21 @@ protected:
void moveFile(const std::string& oldName, const std::string& newName);
bool exists(const std::string& name);
Poco::FastMutex _rotateMutex;
// Log rotation must wait until all of the compression tasks complete
int _compressingCount;
Poco::Condition _compressingComplete;
private:
friend class ArchiveCompressor;
ArchiveStrategy(const ArchiveStrategy&);
ArchiveStrategy& operator = (const ArchiveStrategy&);
void compressFile(const std::string& path);
std::atomic<bool> _compress;
std::atomic<ArchiveCompressor*> _pCompressor;
};

View File

@ -24,6 +24,7 @@
#include "Poco/Void.h"
#include "Poco/FileStream.h"
#include <string_view>
namespace Poco {
@ -45,35 +46,18 @@ public:
{
}
ActiveMethod<void, std::string, ArchiveCompressor, ActiveStarter<ActiveDispatcher>> compress;
struct ArchiveToCompress
{
ArchiveStrategy* as;
std::string path;
};
ActiveMethod<void, ArchiveToCompress, ArchiveCompressor, ActiveStarter<ActiveDispatcher>> compress;
protected:
void compressImpl(const std::string& path)
void compressImpl(const ArchiveToCompress& ac)
{
std::string gzPath(path);
gzPath.append(".gz");
FileInputStream istr(path);
FileOutputStream ostr(gzPath);
try
{
DeflatingOutputStream deflater(ostr, DeflatingStreamBuf::STREAM_GZIP);
StreamCopier::copyStream(istr, deflater);
if (!deflater.good() || !ostr.good()) throw WriteFileException(gzPath);
deflater.close();
ostr.close();
istr.close();
}
catch (Poco::Exception&)
{
// deflating failed - remove gz file and leave uncompressed log file
ostr.close();
Poco::File gzf(gzPath);
gzf.remove();
return;
}
File f(path);
f.remove();
return;
ac.as->compressFile(ac.path);
}
};
@ -82,17 +66,41 @@ protected:
// ArchiveStrategy
//
// Prefix that is added to the file being compressed to be skipped by the
// purge strategy.
static const std::string compressFilePrefix ( ".~" );
ArchiveStrategy::ArchiveStrategy():
_compressingCount(0),
_compress(false),
_pCompressor(0)
_pCompressor(nullptr)
{
}
ArchiveStrategy::~ArchiveStrategy()
{
try
{
close();
}
catch(...)
{
poco_unexpected();
}
}
void ArchiveStrategy::close()
{
FastMutex::ScopedLock l(_rotateMutex);
while (_compressingCount > 0)
_compressingComplete.wait(_rotateMutex, 1000);
delete _pCompressor;
_pCompressor = nullptr;
}
@ -105,7 +113,7 @@ void ArchiveStrategy::compress(bool flag)
void ArchiveStrategy::moveFile(const std::string& oldPath, const std::string& newPath)
{
bool compressed = false;
Path p(oldPath);
const Path p(oldPath);
File f(oldPath);
if (!f.exists())
{
@ -115,15 +123,23 @@ void ArchiveStrategy::moveFile(const std::string& oldPath, const std::string& ne
std::string mvPath(newPath);
if (_compress || compressed)
mvPath.append(".gz");
if (!_compress || compressed)
{
f.renameTo(mvPath);
}
else
{
f.renameTo(newPath);
if (!_pCompressor) _pCompressor = new ArchiveCompressor;
_pCompressor.load()->compress(newPath);
_compressingCount++;
Path logdir { newPath };
logdir.makeParent();
const auto logfile { Path(newPath).getFileName() };
const auto compressPath = logdir.append(compressFilePrefix + logfile).toString();
f.renameTo(compressPath);
if (!_pCompressor)
_pCompressor = new ArchiveCompressor;
_pCompressor.load()->compress( {this, compressPath} );
}
}
@ -146,6 +162,62 @@ bool ArchiveStrategy::exists(const std::string& name)
}
void ArchiveStrategy::compressFile(const std::string& path)
{
FastMutex::ScopedLock l(_rotateMutex);
Path logdir { path };
logdir.makeParent();
auto removeFilePrefix = [&logdir](const std::string& path, const std::string& prefix) -> std::string
{
auto fname { Path(path).getFileName() };
const std::string_view fprefix(fname.data(), prefix.size());
if (fprefix == prefix)
return Path(logdir, fname.substr(prefix.size())).toString();
return path;
};
File f(path);
std::string gzPath(path);
gzPath.append(".gz");
FileInputStream istr(path);
FileOutputStream ostr(gzPath);
try
{
DeflatingOutputStream deflater(ostr, DeflatingStreamBuf::STREAM_GZIP);
StreamCopier::copyStream(istr, deflater);
if (!deflater.good() || !ostr.good())
throw WriteFileException(gzPath);
deflater.close();
ostr.close();
istr.close();
// Remove temporary prefix and set modification time to
// the time of the uncompressed file for purge strategy to work correctly
File zf(gzPath);
zf.renameTo(removeFilePrefix(gzPath, compressFilePrefix));
zf.setLastModified(f.getLastModified());
}
catch (const Poco::Exception&)
{
// deflating failed - remove gz file and leave uncompressed log file
ostr.close();
Poco::File gzf(gzPath);
gzf.remove();
f.renameTo(removeFilePrefix(path, compressFilePrefix));
}
f.remove();
_compressingCount--;
if (_compressingCount < 1)
_compressingComplete.broadcast();
}
//
// ArchiveByNumberStrategy
//
@ -169,6 +241,11 @@ LogFile* ArchiveByNumberStrategy::open(LogFile* pFile)
LogFile* ArchiveByNumberStrategy::archive(LogFile* pFile)
{
FastMutex::ScopedLock l(_rotateMutex);
while (_compressingCount > 0)
_compressingComplete.wait(_rotateMutex, 1000);
std::string basePath = pFile->path();
delete pFile;
int n = -1;

View File

@ -44,7 +44,7 @@ FileChannel::FileChannel():
_compress(false),
_flush(true),
_rotateOnOpen(false),
_pFile(0),
_pFile(nullptr),
_pRotateStrategy(new NullRotateStrategy()),
_pArchiveStrategy(new ArchiveByNumberStrategy),
_pPurgeStrategy(new NullPurgeStrategy())
@ -58,7 +58,7 @@ FileChannel::FileChannel(const std::string& path):
_compress(false),
_flush(true),
_rotateOnOpen(false),
_pFile(0),
_pFile(nullptr),
_pRotateStrategy(new NullRotateStrategy()),
_pArchiveStrategy(new ArchiveByNumberStrategy),
_pPurgeStrategy(new NullPurgeStrategy())
@ -111,8 +111,11 @@ void FileChannel::close()
{
FastMutex::ScopedLock lock(_mutex);
if (_pFile != nullptr)
_pArchiveStrategy->close();
delete _pFile;
_pFile = 0;
_pFile = nullptr;
}
@ -298,7 +301,7 @@ void FileChannel::setRotation(const std::string& rotation)
ArchiveStrategy* FileChannel::createArchiveStrategy(const std::string& archive, const std::string& times) const
{
ArchiveStrategy* pStrategy = 0;
ArchiveStrategy* pStrategy = nullptr;
if (archive == "number")
{
pStrategy = new ArchiveByNumberStrategy;
@ -328,7 +331,7 @@ void FileChannel::setArchiveStrategy(ArchiveStrategy* strategy)
void FileChannel::setArchive(const std::string& archive)
{
ArchiveStrategy* pStrategy = 0;
ArchiveStrategy* pStrategy = nullptr;
if (archive == "number")
{
pStrategy = new ArchiveByNumberStrategy;

View File

@ -16,6 +16,7 @@
#include "Poco/Path.h"
#include "Poco/DirectoryIterator.h"
#include "Poco/Timestamp.h"
#include <algorithm>
namespace Poco {
@ -126,6 +127,14 @@ void PurgeByCountStrategy::purge(const std::string& path)
{
std::vector<File> files;
list(path, files);
// Order files in ascending name order. Files with largest
// sequence number will be deleted in case that multiple files
// have the same modification time.
std::sort (files.begin(), files.end(),
[](const Poco::File& a, const Poco::File& b) { return a.path() < b.path(); }
);
while (files.size() > _count)
{
std::vector<File>::iterator it = files.begin();

View File

@ -30,6 +30,7 @@
#include "Poco/ArchiveStrategy.h"
#include "Poco/PurgeStrategy.h"
#include <vector>
#include <iostream>
using Poco::FileChannel;
@ -559,6 +560,67 @@ void FileChannelTest::testCompress()
}
void FileChannelTest::testCompressedRotation()
{
static const uint32_t MAX_ROLLOVER_TIMES = 8;
static const uint32_t LONG_MESSAGE_LENGTH = 1024;
static const uint32_t LONG_MAX_FILESIZE = 1024;
std::vector<uint8_t> longMessage(LONG_MESSAGE_LENGTH, '&');
longMessage.push_back(0);
Poco::Path logsPath(Poco::Path::current(), "logs");
Poco::File logsDir(logsPath.toString());
if (logsDir.exists())
logsDir.remove(true);
logsDir.createDirectory();
logsPath.append("test.log");
Poco::AutoPtr<Poco::FileChannel> fileChannel = new Poco::FileChannel("ABC");
fileChannel->setProperty(Poco::FileChannel::PROP_PATH, logsPath.toString());
fileChannel->setProperty(Poco::FileChannel::PROP_FLUSH, "false");
fileChannel->setProperty(Poco::FileChannel::PROP_ROTATION, "1 M");
fileChannel->setProperty(Poco::FileChannel::PROP_PURGECOUNT, "5");
fileChannel->setProperty(Poco::FileChannel::PROP_ARCHIVE, "number");
fileChannel->setProperty(Poco::FileChannel::PROP_TIMES, "local");
fileChannel->setProperty(Poco::FileChannel::PROP_COMPRESS, "true");
fileChannel->open();
std::string text(longMessage.begin(), longMessage.end());
for (uint32_t i = 1; i <= MAX_ROLLOVER_TIMES; ++i)
{
for (uint32_t j = 0; j < LONG_MAX_FILESIZE; ++j)
{
Poco::Message message("ABC", text, Poco::Message::PRIO_INFORMATION);
fileChannel->log(message);
}
}
fileChannel->close();
std::vector<std::string> files;
logsDir.list(files);
std::sort(files.begin(), files.end());
for (const auto& f: files)
std::cout << "log file: " << f << std::endl;
assertEqual(5+1+1, files.size()); // 5+1 rotated files, current file
assertEqual("test.log", files[0]);
assertEqual("test.log.0.gz", files[1]);
assertEqual("test.log.1.gz", files[2]);
assertEqual("test.log.2.gz", files[3]);
assertEqual("test.log.3.gz", files[4]);
assertEqual("test.log.4.gz", files[5]);
assertEqual("test.log.5.gz", files[6]);
logsDir.remove(true);
}
void FileChannelTest::purgeAge(const std::string& pa)
{
std::string name = filename();
@ -898,6 +960,7 @@ CppUnit::Test* FileChannelTest::suite()
CppUnit_addTest(pSuite, FileChannelTest, testArchive);
CppUnit_addTest(pSuite, FileChannelTest, testArchiveByStrategy);
CppUnit_addTest(pSuite, FileChannelTest, testCompress);
CppUnit_addTest(pSuite, FileChannelTest, testCompressedRotation);
CppUnit_addLongTest(pSuite, FileChannelTest, testPurgeAge);
CppUnit_addTest(pSuite, FileChannelTest, testPurgeCount);
CppUnit_addTest(pSuite, FileChannelTest, testWrongPurgeOption);

View File

@ -45,6 +45,7 @@ public:
void testArchive();
void testArchiveByStrategy();
void testCompress();
void testCompressedRotation();
void testPurgeAge();
void testPurgeCount();
void testWrongPurgeOption();