4529 sql logger shutdown (#4530)

* enh(ThreadTest): add join test

* enh(SQLLogger): #4529 (wip)

* enh(SQLChannel): just few C++17 modernisations.

* enh(SQLiteTest): extend unit test to destroy SQL channel while it has pending messages. (#4529)

* enh(ODBC): improve exception descriptions and add string/batch size tests

* feat(SQLChannel): add store-and-forward mode

* fix(DataTest): SQLChannel *nix build and test run #4529

* fix(DataTest): CodeQL warning #4529

* chore(ODBCTest): lower the number of max statements in big batch

* feat(SQLChannel): add flush property #4529

* enh(SQLChannel): use event for interruptible sleep; reduce code duplication with lambda; update comments #4529

* fix(SQLChannel): flush time determination #4529

---------

Co-authored-by: Matej Kenda <matejken@gmail.com>
This commit is contained in:
Aleksandar Fabijanic 2024-07-16 13:03:32 -05:00 committed by GitHub
parent 2442c66f84
commit ed181d99dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 724 additions and 398 deletions

View File

@ -651,10 +651,6 @@
RelativePath=".\include\Poco\Data\SimpleRowFormatter.h"
>
</File>
<File
RelativePath=".\include\Poco\Data\SQLChannel.h"
>
</File>
<File
RelativePath=".\include\Poco\Data\SQLParser.h"
>
@ -791,10 +787,6 @@
RelativePath=".\src\SimpleRowFormatter.cpp"
>
</File>
<File
RelativePath=".\src\SQLChannel.cpp"
>
</File>
<File
RelativePath=".\src\Statement.cpp"
>

View File

@ -180,9 +180,6 @@
<ClInclude Include="include\Poco\Data\SimpleRowFormatter.h">
<Filter>DataCore\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Data\SQLChannel.h">
<Filter>DataCore\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Data\SQLParser.h">
<Filter>DataCore\Header Files</Filter>
</ClInclude>
@ -297,6 +294,9 @@
<ClInclude Include="include\Poco\Data\ArchiveStrategy.h">
<Filter>Logging\Header Files</Filter>
</ClInclude>
<ClInclude Include="include\Poco\Data\SQLChannel.h">
<Filter>Logging\Header Files</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="src\AbstractBinder.cpp">
@ -374,9 +374,6 @@
<ClCompile Include="src\SimpleRowFormatter.cpp">
<Filter>DataCore\Source Files</Filter>
</ClCompile>
<ClCompile Include="src\SQLChannel.cpp">
<Filter>DataCore\Source Files</Filter>
</ClCompile>
<ClCompile Include="src\Statement.cpp">
<Filter>DataCore\Source Files</Filter>
</ClCompile>
@ -440,6 +437,9 @@
<ClCompile Include="src\ArchiveStrategy.cpp">
<Filter>Logging\Source Files</Filter>
</ClCompile>
<ClCompile Include="src\SQLChannel.cpp">
<Filter>Logging\Source Files</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="..\DLLVersion.rc" />

View File

@ -999,7 +999,7 @@ private:
{
std::size_t sz = it->size() * typeSize;
if (sz > _maxFieldSize)
throw LengthExceededException();
throw LengthExceededException("ODBC::Binder::getMinValueSize(%s, %d)", std::string(typeid(T).name()), static_cast<int>(size));
if (sz == _maxFieldSize)
{

View File

@ -528,8 +528,8 @@ void Binder::getColSizeAndPrecision(std::size_t pos,
if (actualSize > colSize)
{
throw LengthExceededException(Poco::format("Error binding column %z size=%z, max size=%ld)",
pos, actualSize, static_cast<long>(colSize)));
throw LengthExceededException(Poco::format("ODBC::Binder::getColSizeAndPrecision();%d: Error binding column %z size=%z, max size=%ld)",
__LINE__, pos, actualSize, static_cast<long>(colSize)));
}
foundPrec = _pTypeInfo->tryGetInfo(cDataType, "MAXIMUM_SCALE", tmp);
if (foundPrec) decDigits = tmp;
@ -565,8 +565,8 @@ void Binder::getColSizeAndPrecision(std::size_t pos,
// last check, just in case
if ((0 != colSize) && (actualSize > colSize))
{
throw LengthExceededException(Poco::format("Error binding column %z size=%z, max size=%ld)",
pos, actualSize, static_cast<long>(colSize)));
throw LengthExceededException(Poco::format("ODBC::Binder::getColSizeAndPrecision();%d: Error binding column %z size=%z, max size=%ld)",
__LINE__, pos, actualSize, static_cast<long>(colSize)));
}
return;

View File

@ -24,6 +24,7 @@
#include <iostream>
using namespace std::string_literals;
using namespace Poco::Data::Keywords;
using Poco::Data::DataException;
using Poco::Data::Statement;
@ -216,6 +217,76 @@ void ODBCSQLServerTest::testBLOB()
}
void ODBCSQLServerTest::testBigString()
{
std::string lastName(8000, 'l');
std::string firstName(8000, 'f');
std::string address("Address");
int age = 42;
for (int i = 0; i < 8;)
{
recreatePersonBigStringTable();
session().setFeature("autoBind", bindValue(i));
session().setFeature("autoExtract", bindValue(i + 1));
try
{
session() << "INSERT INTO Person VALUES (?,?,?,?)"s,
use(lastName), use(firstName), use(address), use(age), now;
}
catch (DataException& ce)
{
std::cout << ce.displayText() << std::endl;
failmsg(__func__);
}
i += 2;
}
}
void ODBCSQLServerTest::testBigBatch()
{
const std::string query("INSERT INTO Person VALUES('L', 'N', 'A', %d);");
std::string bigQuery;
// TODO: see what exactly the limits are here
int rows = 316, cnt = 0;
for (int i = 0; i < rows; ++i)
{
bigQuery += Poco::format(query, i);
}
for (int i = 0; i < 8;)
{
recreatePersonBigStringTable();
session().setFeature("autoBind", bindValue(i));
session().setFeature("autoExtract", bindValue(i + 1));
try
{
session() << bigQuery, now;
}
catch (DataException& ce)
{
std::cout << ce.displayText() << std::endl;
failmsg(__func__);
}
try
{
session() << "SELECT COUNT(*) FROM Person", into(cnt), now;
assertEqual(rows, cnt);
}
catch (DataException& ce)
{
std::cout << ce.displayText() << std::endl;
failmsg(__func__);
}
i += 2;
}
}
void ODBCSQLServerTest::testNull()
{
// test for NOT NULL violation exception
@ -706,6 +777,15 @@ void ODBCSQLServerTest::recreatePersonBLOBTable()
}
void ODBCSQLServerTest::recreatePersonBigStringTable()
{
dropObject("TABLE", "Person");
try { session() << "CREATE TABLE Person (LastName VARCHAR(MAX), FirstName VARCHAR(8000), Address VARCHAR(30), Age INTEGER)", now; }
catch (ConnectionException& ce) { std::cout << ce.toString() << std::endl; fail("recreatePersonBLOBTable()"); }
catch (StatementException& se) { std::cout << se.toString() << std::endl; fail("recreatePersonBLOBTable()"); }
}
void ODBCSQLServerTest::recreatePersonDateTimeTable()
{
dropObject("TABLE", "Person");
@ -939,6 +1019,8 @@ CppUnit::Test* ODBCSQLServerTest::suite()
CppUnit_addTest(pSuite, ODBCSQLServerTest, testSingleSelect);
CppUnit_addTest(pSuite, ODBCSQLServerTest, testEmptyDB);
CppUnit_addTest(pSuite, ODBCSQLServerTest, testBLOB);
CppUnit_addTest(pSuite, ODBCSQLServerTest, testBigString);
CppUnit_addTest(pSuite, ODBCSQLServerTest, testBigBatch);
CppUnit_addTest(pSuite, ODBCSQLServerTest, testBLOBContainer);
CppUnit_addTest(pSuite, ODBCSQLServerTest, testBLOBStmt);
CppUnit_addTest(pSuite, ODBCSQLServerTest, testRecordSet);

View File

@ -47,6 +47,8 @@ public:
void testTempTable();
void testBLOB();
void testBigString();
void testBigBatch();
void testNull();
void testBulk();
@ -65,6 +67,7 @@ private:
void recreateNullableTable();
void recreatePersonTable();
void recreatePersonBLOBTable();
void recreatePersonBigStringTable();
void recreatePersonDateTimeTable();
void recreatePersonDateTable() { /* no-op */ };
void recreatePersonTimeTable() { /* no-op */ };

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="Current" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="debug_shared|ARM64">
@ -757,8 +757,8 @@
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='debug_shared|x64'">
<ClCompile>
<Optimization>Disabled</Optimization>
<AdditionalIncludeDirectories>..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<PreprocessorDefinitions>WIN32;_CRT_SECURE_NO_WARNINGS;_DEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<StringPooling>true</StringPooling>
<BasicRuntimeChecks>EnableFastChecks</BasicRuntimeChecks>
<RuntimeLibrary>MultiThreadedDebugDLL</RuntimeLibrary>
@ -794,8 +794,8 @@
<IntrinsicFunctions>true</IntrinsicFunctions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<OmitFramePointers>true</OmitFramePointers>
<AdditionalIncludeDirectories>..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<PreprocessorDefinitions>WIN32;_CRT_SECURE_NO_WARNINGS;NDEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<StringPooling>true</StringPooling>
<RuntimeLibrary>MultiThreadedDLL</RuntimeLibrary>
<BufferSecurityCheck>false</BufferSecurityCheck>
@ -826,8 +826,8 @@
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='debug_static_mt|x64'">
<ClCompile>
<Optimization>Disabled</Optimization>
<AdditionalIncludeDirectories>..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<PreprocessorDefinitions>WIN32;_CRT_SECURE_NO_WARNINGS;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<StringPooling>true</StringPooling>
<BasicRuntimeChecks>EnableFastChecks</BasicRuntimeChecks>
<RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary>
@ -863,8 +863,8 @@
<IntrinsicFunctions>true</IntrinsicFunctions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<OmitFramePointers>true</OmitFramePointers>
<AdditionalIncludeDirectories>..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<PreprocessorDefinitions>WIN32;_CRT_SECURE_NO_WARNINGS;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<StringPooling>true</StringPooling>
<RuntimeLibrary>MultiThreaded</RuntimeLibrary>
<BufferSecurityCheck>false</BufferSecurityCheck>
@ -895,8 +895,8 @@
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='debug_static_md|x64'">
<ClCompile>
<Optimization>Disabled</Optimization>
<AdditionalIncludeDirectories>..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<PreprocessorDefinitions>WIN32;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<PreprocessorDefinitions>WIN32;_CRT_SECURE_NO_WARNINGS;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<StringPooling>true</StringPooling>
<BasicRuntimeChecks>EnableFastChecks</BasicRuntimeChecks>
<RuntimeLibrary>MultiThreadedDebugDLL</RuntimeLibrary>
@ -932,8 +932,8 @@
<IntrinsicFunctions>true</IntrinsicFunctions>
<FavorSizeOrSpeed>Speed</FavorSizeOrSpeed>
<OmitFramePointers>true</OmitFramePointers>
<AdditionalIncludeDirectories>..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<PreprocessorDefinitions>WIN32;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalIncludeDirectories>..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<PreprocessorDefinitions>WIN32;_CRT_SECURE_NO_WARNINGS;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<StringPooling>true</StringPooling>
<RuntimeLibrary>MultiThreadedDLL</RuntimeLibrary>
<BufferSecurityCheck>false</BufferSecurityCheck>

View File

@ -2480,7 +2480,7 @@ void SQLiteTest::testSQLChannel()
{
Thread::sleep(10);
if (sw.elapsedSeconds() > 3)
fail ("SQLExecutor::sqlLogger(): SQLChannel timed out");
fail ("SQLChannel timed out");
}
// bulk binding mode is not suported by SQLite, but SQLChannel should handle it internally
pChannel->setProperty("bulk", "true");
@ -2537,6 +2537,17 @@ void SQLiteTest::testSQLChannel()
rs2.moveNext();
assertTrue("WarningSource" == rs2["Source"]);
assertTrue("f Warning sync message" == rs2["Text"]);
pChannel->setProperty("minBatch", "1024");
constexpr int mcount { 2000 };
for (int i = 0; i < mcount; i++)
{
Message msgInfG("InformationSource", "g Informational sync message", Message::PRIO_INFORMATION);
pChannel->log(msgInfG);
}
pChannel.reset();
RecordSet rsl(tmp, "SELECT * FROM T_POCO_LOG");
assertEquals(2+mcount, rsl.rowCount());
}

View File

@ -31,7 +31,9 @@
#include "Poco/String.h"
#include "Poco/NotificationQueue.h"
#include "Poco/Thread.h"
#include "Poco/Mutex.h"
#include "Poco/Stopwatch.h"
#include "Poco/Event.h"
#include <atomic>
#include <atomic>
@ -56,15 +58,21 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
///
/// The table name is configurable through "table" property.
/// Other than DateTime filed name used for optional time-based archiving purposes, currently the
/// field names are not mandated. However, it is recomended to use names as specified above.
/// field names are not mandated. However, it is recommended to use names as specified above.
///
/// To provide as non-intrusive operation as possbile, the log entries are cached and
/// To provide as non-intrusive operation as possible, the log entries are cached and
/// inserted into the target database asynchronously by default. The blocking, however, will occur
/// before the next entry insertion with default timeout of 1 second. The default settings can be
/// overridden (see async, timeout and throw properties for details).
/// If throw property is false, insertion timeouts are ignored, otherwise a TimeoutException is thrown.
/// To force insertion of every entry, set timeout to 0. This setting, however, introduces
/// a risk of long blocking periods in case of remote server communication delays.
///
/// A default-constructed SQLChannel operates without an active DB connection, in a store-and-forward
/// mode. For this mode of operation, a separate service is required to consume and execute the SQL
/// statements from the stored files and insert the log entries into the database. Since this setup
/// stores SQL inserts in the OS file system, it is strongly recommended to take the necessary
/// precautions to limit and secure the access to those files.
{
public:
class LogNotification : public Poco::Notification
@ -88,9 +96,15 @@ public:
static const int DEFAULT_MIN_BATCH_SIZE = 1;
static const int DEFAULT_MAX_BATCH_SIZE = 1000;
static const int DEFAULT_MAX_SQL_SIZE = 65536;
static const int DEFAULT_FLUSH_SECONDS = 10;
SQLChannel();
/// Creates SQLChannel.
/// An SQLChannel without DB connector and local file name
/// only logs SQL inserts into local files. A separate file
/// is created for each insert batch; files are named
/// <YYYYMMDDHH24MISSmmm>.<UUID>.log.sql.
SQLChannel(const std::string& connector,
const std::string& connect,
@ -98,27 +112,28 @@ public:
const std::string& table = "T_POCO_LOG",
int timeout = 1000,
int minBatch = DEFAULT_MIN_BATCH_SIZE,
int maxBatch = DEFAULT_MAX_BATCH_SIZE);
int maxBatch = DEFAULT_MAX_BATCH_SIZE,
int maxSQL = DEFAULT_MAX_SQL_SIZE);
/// Creates an SQLChannel with the given connector, connect string, timeout, table and name.
/// The connector must be already registered.
void open();
void open() override;
/// Opens the SQLChannel.
/// Returns true if succesful.
void close();
void close() override;
/// Closes the SQLChannel.
void run();
void run() override;
/// Dequeues and sends the logs to the DB.
bool isRunning() const;
/// Returns true if the logging thread is running.
void log(const Message& msg);
void log(const Message& msg) override;
/// Writes the log message to the database.
void setProperty(const std::string& name, const std::string& value);
void setProperty(const std::string& name, const std::string& value) override;
/// Sets the property with the given value.
///
/// The following properties are supported:
@ -149,8 +164,7 @@ public:
/// is asynchronous since the 1.13.0. release.
///
/// * timeout: Timeout (ms) to wait for previous log operation completion.
/// Values "0" and "" mean no timeout. Only valid when logging
/// is asynchronous, otherwise ignored.
/// Values "0" and "" mean no timeout.
///
/// * throw: Boolean value indicating whether to throw in case of timeout.
/// Setting this property to false may result in log entries being lost.
@ -166,13 +180,23 @@ public:
/// reaches this size, log entries are silently discarded.
/// Defaults to 100, can't be zero or larger than 1000.
///
/// * maxSQL: Maximum total length of the SQL statement. Defaults to 65536.
///
/// * bulk: Do bulk execute (on most DBMS systems, this can speed up things
/// drastically).
///
/// * file Destination file name for the backup FileChannel, used when DB
/// connection is not present to log not executed SQL statements.
///
/// * flush Time in seconds to flush outstanding log entries; since logging
/// is performed in batches of entries, the entries that do not make
/// it into the last logged batch may remain unlogged for a long time
/// during an extened period of inactivity. This setting ensures that
/// unlogged entries are flushed in such circumstances, even when the
/// minimum batch size was not reached.
/// Zero value means no flushing.
std::string getProperty(const std::string& name) const;
std::string getProperty(const std::string& name) const override;
/// Returns the value of the property with the given name.
void stop();
@ -198,42 +222,55 @@ public:
static const std::string PROP_TIMEOUT;
static const std::string PROP_MIN_BATCH;
static const std::string PROP_MAX_BATCH;
static const std::string PROP_MAX_SQL;
static const std::string PROP_BULK;
static const std::string PROP_THROW;
static const std::string PROP_DIRECTORY;
static const std::string PROP_FILE;
static const std::string PROP_FLUSH;
protected:
~SQLChannel();
~SQLChannel() override;
private:
static const std::string SQL_INSERT_STMT;
typedef Poco::SharedPtr<Session> SessionPtr;
typedef Poco::SharedPtr<Statement> StatementPtr;
typedef Poco::Message::Priority Priority;
typedef Poco::SharedPtr<ArchiveStrategy> StrategyPtr;
using SessionPtr = Poco::SharedPtr<Session>;
using StatementPtr = Poco::SharedPtr<Statement>;
using Priority = Poco::Message::Priority;
using StrategyPtr = Poco::SharedPtr<ArchiveStrategy>;
void reconnect();
/// Closes and opens the DB connection.
bool processOne(int minBatch = 0);
/// Processes one message.
bool processBatch(int minBatch = 0);
/// Processes a batch of messages.
/// If the number of acummulated messages is greater
/// than minBatch, sends logs to the destination.
/// Returns true if log entry was processed.
/// Returns true if at least one log entry was sent
/// to the destination.
size_t execSQL();
size_t execSQL(bool flush = false);
/// Executes the log statement.
size_t logSync();
size_t logSync(bool flush = false);
/// Inserts entries into the target database.
bool isTrue(const std::string& value) const;
/// Returns true is value is "true", "t", "yes" or "y".
/// Case insensitive.
size_t logTofile(AutoPtr<FileChannel>& pFileChannel, const std::string& fileName, bool clear = false);
/// Logs cached entries to a file. Called in case DB insertions fail.
size_t logToDB(bool flush = false);
/// Logs cached entries to the DB.
size_t logToFile(bool flush = false);
/// Logs cached entries to a file.
/// Called in case DB insertions fail or
/// in the store-and-forward mode of operation.
void logLocal(const std::string&, Message::Priority prio = Message::PRIO_ERROR);
/// Adds the message to the local SQLChannel log queue, and logs it to the file.
/// Typically used to log DB connection/execution erors.
std::string maskPwd();
/// Masks the password in the connection
@ -241,6 +278,10 @@ private:
/// bullet-proof method; if not succesful,
/// empty string is returned.
bool shouldFlush() const;
/// Returns true if there are unflushed log entries
/// and the flush timer has expired.
mutable Poco::FastMutex _mutex;
std::string _connector;
@ -249,30 +290,34 @@ private:
std::string _sql;
std::string _name;
std::string _table;
bool _tableChanged;
int _timeout;
std::atomic<bool> _tableChanged;
std::atomic<int> _timeout;
std::atomic<int> _minBatch;
int _maxBatch;
bool _bulk;
std::atomic<int> _maxBatch;
std::atomic<int> _maxSQL;
std::atomic<bool> _bulk;
std::atomic<bool> _throw;
std::atomic<int> _flush;
// members for log entry cache
std::vector<std::string> _source;
std::vector<long> _pid;
std::vector<std::string> _thread;
std::vector<long> _tid;
std::vector<int> _priority;
std::vector<std::string> _text;
std::vector<DateTime> _dateTime;
std::deque<std::string> _source;
std::deque<long> _pid;
std::deque<std::string> _thread;
std::deque<long> _tid;
std::deque<int> _priority;
std::deque<std::string> _text;
std::deque<DateTime> _dateTime;
Poco::NotificationQueue _logQueue;
std::unique_ptr<Poco::Thread> _pDBThread;
std::unique_ptr<Poco::Thread> _pLogThread;
std::atomic<bool> _reconnect;
std::atomic<bool> _running;
std::atomic<bool> _stop;
std::atomic<size_t> _logged;
StrategyPtr _pArchiveStrategy;
std::string _file;
std::string _directory;
AutoPtr<FileChannel> _pFileChannel;
Poco::Stopwatch _flushTimer;
Poco::Event _event;
Poco::Logger& _logger = Poco::Logger::get("SQLChannel");
};
@ -293,7 +338,7 @@ inline bool SQLChannel::isTrue(const std::string& value) const
inline bool SQLChannel::isRunning() const
{
return _running;
return _pLogThread && _pLogThread->isRunning();
}
@ -303,6 +348,13 @@ inline size_t SQLChannel::logged() const
}
inline bool SQLChannel::shouldFlush() const
{
return (_flush > 0 && _source.size() &&
(_flushTimer.elapsedSeconds() >= _flush));
}
} } // namespace Poco::Data

View File

@ -22,10 +22,13 @@
#include "Poco/Instantiator.h"
#include "Poco/NumberParser.h"
#include "Poco/NumberFormatter.h"
#include "Poco/Stopwatch.h"
#include "Poco/Format.h"
#include "Poco/Path.h"
#include "Poco/File.h"
#include "Poco/UUID.h"
#include "Poco/UUIDGenerator.h"
#include <fstream>
#include <memory>
namespace Poco {
@ -45,9 +48,12 @@ const std::string SQLChannel::PROP_ASYNC("async");
const std::string SQLChannel::PROP_TIMEOUT("timeout");
const std::string SQLChannel::PROP_MIN_BATCH("minBatch");
const std::string SQLChannel::PROP_MAX_BATCH("maxBatch");
const std::string SQLChannel::PROP_MAX_SQL("maxSQL");
const std::string SQLChannel::PROP_BULK("bulk");
const std::string SQLChannel::PROP_THROW("throw");
const std::string SQLChannel::PROP_DIRECTORY("directory");
const std::string SQLChannel::PROP_FILE("file");
const std::string SQLChannel::PROP_FLUSH("flush");
const std::string SQLChannel::SQL_INSERT_STMT = "INSERT INTO %s " \
@ -56,22 +62,23 @@ const std::string SQLChannel::SQL_INSERT_STMT = "INSERT INTO %s " \
SQLChannel::SQLChannel():
_name("-"),
_table("T_POCO_LOG"),
_name("SQLChannel"),
_tableChanged(true),
_timeout(1000),
_minBatch(DEFAULT_MIN_BATCH_SIZE),
_maxBatch(DEFAULT_MAX_BATCH_SIZE),
_maxSQL(DEFAULT_MAX_SQL_SIZE),
_bulk(true),
_throw(false),
_pid(),
_tid(),
_priority(),
_pLogThread(new Thread),
_reconnect(false),
_running(false),
_stop(false),
_logged(0)
{
_pLogThread->start(*this);
}
@ -81,7 +88,8 @@ SQLChannel::SQLChannel(const std::string& connector,
const std::string& table,
int timeout,
int minBatch,
int maxBatch) :
int maxBatch,
int maxSQL) :
_connector(connector),
_connect(connect),
_name(name),
@ -90,18 +98,18 @@ SQLChannel::SQLChannel(const std::string& connector,
_timeout(timeout),
_minBatch(minBatch),
_maxBatch(maxBatch),
_maxSQL(maxSQL),
_bulk(false),
_throw(false),
_pid(),
_tid(),
_priority(),
_pDBThread(new Thread),
_pLogThread(new Thread),
_reconnect(true),
_running(false),
_stop(false),
_logged(0)
{
_pDBThread->start(*this);
_pLogThread->start(*this);
}
@ -157,12 +165,13 @@ void SQLChannel::open()
_pSession = new Session(_connector, _connect, _timeout / 1000);
if (_pSession->hasProperty("maxFieldSize")) _pSession->setProperty("maxFieldSize", 8192);
if (_pSession->hasProperty("autoBind")) _pSession->setFeature("autoBind", true);
_logger.information("Connected to %s: %s", _connector, maskPwd());
if (!_stop) _logger.information("Connected to %s: %s", _connector, maskPwd());
else logLocal(Poco::format("Connected to %s: %s", _connector, maskPwd()));
return;
}
catch (DataException& ex)
catch (const DataException& ex)
{
_logger.error(ex.displayText());
logLocal(ex.displayText());
}
}
_pSession = nullptr;
@ -173,23 +182,35 @@ void SQLChannel::open()
void SQLChannel::close()
{
wait(_timeout);
if (_pSession)
{
_pSession->close();
_pSession = nullptr;
}
}
void SQLChannel::logLocal(const std::string& message, Message::Priority prio)
{
Message msg("SQLChannel"s, message, prio);
log(msg);
}
void SQLChannel::log(const Message& msg)
{
_logQueue.enqueueNotification(new LogNotification(msg));
_event.set();
}
size_t SQLChannel::logSync()
size_t SQLChannel::logSync(bool flush)
{
try
{
return execSQL();
return execSQL(flush);
}
catch (Exception&)
catch (...)
{
if (_throw) throw;
}
@ -198,10 +219,10 @@ size_t SQLChannel::logSync()
}
bool SQLChannel::processOne(int minBatch)
bool SQLChannel::processBatch(int minBatch)
{
bool ret = false;
if (_logQueue.size())
bool ret = false, flush = (minBatch == 0);
while (_logQueue.size())
{
Notification::Ptr pN = _logQueue.dequeueNotification();
LogNotification::Ptr pLN = pN.cast<LogNotification>();
@ -218,11 +239,13 @@ bool SQLChannel::processOne(int minBatch)
_priority.push_back(msg.getPriority());
_text.push_back(msg.getText());
Poco::replaceInPlace(_text.back(), "'", "''");
_dateTime.push_back(msg.getTime());
}
_dateTime.emplace_back(msg.getTime());
if ((_source.size() >= minBatch) || flush)
logSync(flush);
ret = true;
}
if (_source.size() >= _minBatch) logSync();
}
if (flush) logSync(flush);
return ret;
}
@ -230,6 +253,7 @@ bool SQLChannel::processOne(int minBatch)
void SQLChannel::run()
{
_flushTimer.start();
long sleepTime = 100; // milliseconds
while (!_stop)
{
@ -239,53 +263,286 @@ void SQLChannel::run()
{
close();
open();
_reconnect = _pSession.isNull();
_reconnect = !_connector.empty() && _pSession.isNull();
if (_reconnect && sleepTime < 12800)
sleepTime *= 2;
}
processOne(_minBatch);
if (!_reconnect)
{
if (_logQueue.size()) processBatch(_minBatch);
if (shouldFlush()) processBatch();
sleepTime = 100;
}
catch (Poco::Exception& ex)
{
_logger.error(ex.displayText());
}
catch (std::exception& ex)
catch (const Poco::Exception& ex)
{
if (!_stop)
_logger.error(ex.displayText());
else
logLocal(ex.displayText());
}
catch (const std::exception& ex)
{
if (!_stop)
_logger.error(ex.what());
else
logLocal(ex.what());
}
catch (...)
{
_logger.error("SQLChannel::run(): unknown exception");
if (!_stop)
_logger.error("SQLChannel::run(): unknown exception"s);
else
logLocal("SQLChannel::run(): unknown exception"s);
}
_running = true;
Thread::sleep(100);
if (_stop)
{
break;
}
_running = false;
_event.tryWait(sleepTime);
}
while (_logQueue.size() || _source.size())
processBatch();
}
void SQLChannel::stop()
{
if (_pDBThread)
if (_pLogThread)
{
_reconnect = false;
_stop = true;
_pDBThread->join();
while (_logQueue.size())
processOne();
while (_pLogThread->isRunning())
Thread::sleep(10);
_pLogThread->join();
_pLogThread.reset();
_event.set();
}
}
void SQLChannel::reconnect()
{
if (!_pDBThread)
{
_pDBThread.reset(new Thread);
_pDBThread->start(*this);
}
_reconnect = true;
if (!_pLogThread)
{
_pLogThread = std::make_unique<Thread>();
_pLogThread->start(*this);
}
}
size_t SQLChannel::logToFile(bool flush)
{
if (_source.empty()) return 0u;
static std::vector<std::string> names;
if (names.size() != _source.size())
names.resize(_source.size(), Poco::replace(_name, "'", "''"));
std::size_t n = 0, batch = 0;
AutoPtr<FileChannel> pFileChannel;
std::string file = _file;
if (!_pFileChannel && !_file.empty())
{
if (!Path(File(file).path()).isAbsolute())
file = _directory + file;
_pFileChannel = new FileChannel(file);
pFileChannel = _pFileChannel;
}
else
{
UUID uuid = UUIDGenerator::defaultGenerator().createRandom();
std::string filename(_directory);
filename.append(DateTimeFormatter::format(LocalDateTime(), "%Y%m%d%H%M%S%i.").append(uuid.toString()).append(".log.sql"s));
pFileChannel = new FileChannel(filename);
}
std::stringstream os;
auto doLog = [&]
{
Message msg(_source[0], os.str(), Message::PRIO_FATAL);
pFileChannel->log(msg);
n += batch;
_logged += batch;
_source.erase(_source.begin(), _source.begin()+batch);
_pid.erase(_pid.begin(), _pid.begin() + batch);
_thread.erase(_thread.begin(), _thread.begin() + batch);
_tid.erase(_tid.begin(), _tid.begin() + batch);
_priority.erase(_priority.begin(), _priority.begin() + batch);
_text.erase(_text.begin(), _text.begin() + batch);
_dateTime.erase(_dateTime.begin(), _dateTime.begin() + batch);
_flushTimer.restart();
};
if (pFileChannel)
{
std::string sql;
Poco::format(sql, SQL_INSERT_STMT, _table, std::string());
std::stringstream tmp;
os << sql << '\n';
auto it = _source.begin();
auto end = _source.end();
int idx = 0;
for (; it != end; ++idx)
{
std::string dt = DateTimeFormatter::format(_dateTime[idx], "%Y-%m-%d %H:%M:%S.%i");
tmp.str("");
tmp << "('" << *it << "','" <<
names[idx] << "'," <<
_pid[idx] << ",'" <<
_thread[idx] << "'," <<
_tid[idx] << ',' <<
_priority[idx] << ",'" <<
_text[idx] << "','" <<
dt << "')";
if (++batch == _maxBatch || (os.str().length() + tmp.str().length()) >= _maxSQL)
{
os << ";\n";
doLog();
os.str(""); sql.clear();
Poco::format(sql, SQL_INSERT_STMT, _table, std::string());
os << sql << '\n' << tmp.str();
batch = 0;
}
os << tmp.str();
if (++it == end)
{
os << ";\n";
break;
}
os << ",\n";
}
if ((batch >= _minBatch) || flush) doLog();
}
return n;
}
size_t SQLChannel::logToDB(bool flush)
{
if (_source.empty()) return 0u;
static std::vector<std::string> names;
if (names.size() != _source.size())
names.resize(_source.size(), Poco::replace(_name, "'", "''"));
static std::string placeholders = "(?,?,?,?,?,?,?,?)";
Poco::FastMutex::ScopedLock l(_mutex);
if (_tableChanged)
{
Poco::format(_sql, SQL_INSERT_STMT, _table, placeholders);
_tableChanged = false;
}
std::size_t n = 0;
try
{
if (_bulk)
{
try
{
(*_pSession) << _sql,
use(_source, bulk),
use(names, bulk),
use(_pid, bulk),
use(_thread, bulk),
use(_tid, bulk),
use(_priority, bulk),
use(_text, bulk),
use(_dateTime, bulk), now;
}
// most likely bulk mode not supported, try again
catch (const Poco::InvalidAccessException&)
{
(*_pSession) << _sql,
use(_source),
use(names),
use(_pid),
use(_thread),
use(_tid),
use(_priority),
use(_text),
use(_dateTime), now;
_bulk = false;
}
}
else
{
(*_pSession) << _sql,
use(_source),
use(names),
use(_pid),
use(_thread),
use(_tid),
use(_priority),
use(_text),
use(_dateTime), now;
}
n = _source.size();
}
catch (const Poco::Exception& ex)
{
logLocal(ex.displayText());
close();
_reconnect = true;
}
catch (const std::exception& ex)
{
logLocal(ex.what());
close();
_reconnect = true;
}
if (n)
{
_logged += n;
_source.erase(_source.begin(), _source.begin() + n);
_pid.erase(_pid.begin(), _pid.begin() + n);
_thread.erase(_thread.begin(), _thread.begin() + n);
_tid.erase(_tid.begin(), _tid.begin() + n);
_priority.erase(_priority.begin(), _priority.begin() + n);
_text.erase(_text.begin(), _text.begin() + n);
_dateTime.erase(_dateTime.begin(), _dateTime.begin() + n);
_flushTimer.restart();
}
return n;
}
size_t SQLChannel::execSQL(bool flush)
{
if (!_connector.empty() && (!_pSession || !_pSession->isConnected())) open();
if (_pArchiveStrategy) _pArchiveStrategy->archive();
size_t n = _pSession ? logToDB(flush) : logToFile(flush);
return n;
}
std::size_t SQLChannel::wait(int ms)
{
Stopwatch sw;
sw.start();
while (_logQueue.size())
{
Thread::sleep(10);
if (ms && sw.elapsed() * 1000 > ms)
break;
}
return _logQueue.size();
}
@ -319,7 +576,7 @@ void SQLChannel::setProperty(const std::string& name, const std::string& value)
{
if (value.empty())
{
_pArchiveStrategy = 0;
_pArchiveStrategy = nullptr;
}
else if (_pArchiveStrategy)
{
@ -336,7 +593,7 @@ void SQLChannel::setProperty(const std::string& name, const std::string& value)
{
if (value.empty() || "forever" == value)
{
_pArchiveStrategy = 0;
_pArchiveStrategy = nullptr;
}
else if (_pArchiveStrategy)
{
@ -374,6 +631,11 @@ void SQLChannel::setProperty(const std::string& name, const std::string& value)
throw Poco::InvalidArgumentException(Poco::format("SQLChannel::setProperty(%s,%s)", name, value));
_maxBatch = maxBatch;
}
else if (name == PROP_MAX_SQL)
{
int maxSQL = NumberParser::parse(value);
_maxSQL = maxSQL;
}
else if (name == PROP_BULK)
{
_bulk = isTrue(value);
@ -382,10 +644,24 @@ void SQLChannel::setProperty(const std::string& name, const std::string& value)
{
_throw = isTrue(value);
}
else if (name == PROP_DIRECTORY)
{
std::string dir = value;
if (!Path(File(dir).path()).isAbsolute())
{
Path d(dir);
dir = d.makeDirectory().makeAbsolute().toString();
}
_directory = dir;
}
else if (name == PROP_FILE)
{
_file = value;
}
else if (name == PROP_FLUSH)
{
_flush.store(NumberParser::parse(value));
}
else
{
Channel::setProperty(name, value);
@ -434,6 +710,10 @@ std::string SQLChannel::getProperty(const std::string& name) const
{
return std::to_string(_maxBatch);
}
else if (name == PROP_MAX_SQL)
{
return std::to_string(_maxSQL);
}
else if (name == PROP_BULK)
{
if (_bulk) return "true";
@ -444,10 +724,18 @@ std::string SQLChannel::getProperty(const std::string& name) const
if (_throw) return "true";
else return "false";
}
else if (name == PROP_DIRECTORY)
{
return _directory;
}
else if (name == PROP_FILE)
{
return _file;
}
else if (name == PROP_FLUSH)
{
return std::to_string(_flush);
}
else
{
return Channel::getProperty(name);
@ -455,189 +743,6 @@ std::string SQLChannel::getProperty(const std::string& name) const
}
size_t SQLChannel::logTofile(AutoPtr<FileChannel>& pFileChannel, const std::string& fileName, bool clear)
{
static std::vector<std::string> names;
if (names.size() != _source.size())
names.resize(_source.size(), Poco::replace(_name, "'", "''"));
std::size_t n = 0;
if (!pFileChannel) pFileChannel = new FileChannel(fileName);
if (pFileChannel)
{
std::string sql;
Poco::format(sql, SQL_INSERT_STMT, _table, std::string());
std::stringstream os;
os << sql << '\n';
auto it = _source.begin();
auto end = _source.end();
int idx = 0, batch = 0;
for (; it != end; ++idx)
{
std::string dt = Poco::DateTimeFormatter::format(_dateTime[idx], "%Y-%m-%d %H:%M:%S.%i");
os << "('" << *it << "','" <<
names[idx] << "'," <<
_pid[idx] << ",'" <<
_thread[idx] << "'," <<
_tid[idx] << ',' <<
_priority[idx] << ",'" <<
_text[idx] << "','" <<
dt << "')";
if (++batch == _maxBatch)
{
os << ";\n";
Message msg(_source[0], os.str(), Message::PRIO_ERROR);
pFileChannel->log(msg);
os.str(""); sql.clear();
Poco::format(sql, SQL_INSERT_STMT, _table, std::string());
batch = 0;
}
if (++it == end)
{
os << ";\n";
break;
}
os << ",\n";
}
Message msg(_source[0], os.str(), Message::PRIO_ERROR);
pFileChannel->log(msg);
n = _source.size();
if (clear && n)
{
_source.clear();
_pid.clear();
_thread.clear();
_tid.clear();
_priority.clear();
_text.clear();
_dateTime.clear();
}
}
return n;
}
size_t SQLChannel::execSQL()
{
static std::vector<std::string> names;
if (names.size() != _source.size())
names.resize(_source.size(), Poco::replace(_name, "'", "''"));
static std::string placeholders = "(?,?,?,?,?,?,?,?)";
Poco::FastMutex::ScopedLock l(_mutex);
if (_tableChanged)
{
Poco::format(_sql, SQL_INSERT_STMT, _table, placeholders);
_tableChanged = false;
}
if (!_pSession || !_pSession->isConnected()) open();
if (_pArchiveStrategy) _pArchiveStrategy->archive();
size_t n = 0;
if (_pSession)
{
try
{
if (_bulk)
{
try
{
(*_pSession) << _sql,
use(_source, bulk),
use(names, bulk),
use(_pid, bulk),
use(_thread, bulk),
use(_tid, bulk),
use(_priority, bulk),
use(_text, bulk),
use(_dateTime, bulk), now;
}
// most likely bulk mode not supported,
// log and try again
catch (Poco::InvalidAccessException& ex)
{
_logger.log(ex);
(*_pSession) << _sql,
use(_source),
use(names),
use(_pid),
use(_thread),
use(_tid),
use(_priority),
use(_text),
use(_dateTime), now;
_bulk = false;
}
}
else
{
(*_pSession) << _sql,
use(_source),
use(names),
use(_pid),
use(_thread),
use(_tid),
use(_priority),
use(_text),
use(_dateTime), now;
}
n = _source.size();
}
catch (Poco::Exception& ex)
{
_logger.error(ex.displayText());
if (!_file.empty())
n = logTofile(_pFileChannel, _file);
close();
_reconnect = true;
}
catch (std::exception& ex)
{
_logger.error(ex.what());
if (!_file.empty())
n = logTofile(_pFileChannel, _file);
close();
_reconnect = true;
}
}
else
{
if (!_file.empty())
n = logTofile(_pFileChannel, _file);
}
if (n)
{
_logged += n;
_source.clear();
_pid.clear();
_thread.clear();
_tid.clear();
_priority.clear();
_text.clear();
_dateTime.clear();
}
return n;
}
std::size_t SQLChannel::wait(int ms)
{
Stopwatch sw;
sw.start();
int processed = _logQueue.size();
while (_logQueue.size())
{
Thread::sleep(10);
if (ms && sw.elapsed() * 1000 > ms)
break;
}
return processed - _logQueue.size();
}
void SQLChannel::registerChannel()
{
Poco::LoggingFactory::defaultFactory().registerChannelClass("SQLChannel",

View File

@ -20,6 +20,7 @@
#include "Poco/Data/Column.h"
#include "Poco/Data/Date.h"
#include "Poco/Data/Time.h"
#include "Poco/Data/SQLChannel.h"
#include "Poco/Data/SimpleRowFormatter.h"
#include "Poco/Data/JSONRowFormatter.h"
#include "Poco/Data/DataException.h"
@ -27,12 +28,18 @@
#include "Poco/BinaryReader.h"
#include "Poco/BinaryWriter.h"
#include "Poco/DateTime.h"
#include "Poco/Stopwatch.h"
#include "Poco/Types.h"
#include "Poco/Dynamic/Var.h"
#include "Poco/Data/DynamicLOB.h"
#include "Poco/Data/DynamicDateTime.h"
#include "Poco/Latin1Encoding.h"
#include "Poco/Exception.h"
#include "Poco/DirectoryIterator.h"
#include "Poco/Glob.h"
#include "Poco/File.h"
#include "Poco/Path.h"
#include <string>
#include <cstring>
#include <sstream>
#include <iomanip>
@ -40,46 +47,10 @@
using namespace Poco;
using Poco::Dynamic::Var;
using namespace Poco::Data;
using namespace Poco::Data::Keywords;
using Poco::BinaryReader;
using Poco::BinaryWriter;
using Poco::UInt32;
using Poco::Int64;
using Poco::UInt64;
using Poco::DateTime;
using Poco::Latin1Encoding;
using Poco::Dynamic::Var;
using Poco::InvalidAccessException;
using Poco::IllegalStateException;
using Poco::RangeException;
using Poco::NotFoundException;
using Poco::InvalidArgumentException;
using Poco::NotImplementedException;
using Poco::Data::Session;
using Poco::Data::SessionFactory;
using Poco::Data::Statement;
using Poco::Data::NotSupportedException;
using Poco::Data::CLOB;
using Poco::Data::CLOBInputStream;
using Poco::Data::CLOBOutputStream;
using Poco::Data::MetaColumn;
using Poco::Data::Column;
using Poco::Data::Row;
using Poco::Data::RowFormatter;
using Poco::Data::SimpleRowFormatter;
using Poco::Data::JSONRowFormatter;
using Poco::Data::Date;
using Poco::Data::Time;
using Poco::Data::AbstractExtractor;
using Poco::Data::AbstractExtraction;
using Poco::Data::AbstractExtractionVec;
using Poco::Data::AbstractExtractionVecVec;
using Poco::Data::AbstractBinding;
using Poco::Data::AbstractBindingVec;
using Poco::Data::NotConnectedException;
using namespace std::string_literals;
DataTest::DataTest(const std::string& name): CppUnit::TestCase(name)
@ -1570,6 +1541,65 @@ void DataTest::testSQLParse()
}
void DataTest::testSQLChannel()
{
std::string dir = Path::tempHome();
AutoPtr<SQLChannel> pChannel = new SQLChannel();
pChannel->setProperty("directory", dir);
Stopwatch sw; sw.start();
while (!pChannel->isRunning())
{
Thread::sleep(10);
if (sw.elapsedSeconds() > 3)
fail("SQLChannel timed out");
}
Glob g("*.log.sql");
{
DirectoryIterator it(dir);
DirectoryIterator end;
while (it != end)
{
if (g.match(it->path()))
{
File(it->path()).remove();
}
++it;
}
}
constexpr int mcount{10};
constexpr int batch{3};
pChannel->setProperty("minBatch", std::to_string(batch));
constexpr int flush{1};
pChannel->setProperty("flush", std::to_string(flush));
assertEqual(flush, NumberParser::parse(pChannel->getProperty("flush")));
for (int i = 0; i < mcount; i++)
{
Message msgInfA("InformationSource", Poco::format("%d Informational sync message", i), Message::PRIO_INFORMATION);
pChannel->log(msgInfA);
}
Thread::sleep(2000*flush); // give it time to flush
auto logged = pChannel->logged();
assertEqual(mcount, logged);
pChannel.reset();
int count = 0;
DirectoryIterator it(dir);
DirectoryIterator end;
while (it != end)
{
if (g.match(it->path()))
{
++count;
File(it->path()).remove();
}
++it;
}
assertEqual(count, (mcount / batch) + (mcount % batch));
}
void DataTest::setUp()
{
}
@ -1603,6 +1633,7 @@ CppUnit::Test* DataTest::suite()
CppUnit_addTest(pSuite, DataTest, testExternalBindingAndExtraction);
CppUnit_addTest(pSuite, DataTest, testTranscode);
CppUnit_addTest(pSuite, DataTest, testSQLParse);
CppUnit_addTest(pSuite, DataTest, testSQLChannel);
return pSuite;
}

View File

@ -46,6 +46,7 @@ public:
void testExternalBindingAndExtraction();
void testTranscode();
void testSQLParse();
void testSQLChannel();
void setUp();
void tearDown();

View File

@ -122,6 +122,37 @@ private:
};
class JoinRunnable : public Runnable
{
public:
JoinRunnable() : _stop(false), _running(false)
{
}
void run()
{
_running = true;
while (!_stop)
Thread::sleep(100);
_running = false;
}
void stop()
{
_stop = true;
}
bool running() const
{
return _running;
}
private:
std::atomic<bool> _stop;
std::atomic<bool> _running;
};
class TrySleepRunnable : public Runnable
{
public:
@ -268,7 +299,7 @@ void ThreadTest::testThreads()
}
void ThreadTest::testJoin()
void ThreadTest::testTryJoin()
{
Thread thread;
MyRunnable r;
@ -283,6 +314,22 @@ void ThreadTest::testJoin()
}
void ThreadTest::testJoin()
{
Thread thread;
JoinRunnable r;
assertTrue(!thread.isRunning());
thread.start(r);
Thread::sleep(200);
assertTrue(thread.isRunning());
assertTrue(!thread.tryJoin(100));
r.stop();
thread.join();
assertTrue(!thread.isRunning());
assertTrue(!r.running());
}
void ThreadTest::testNotJoin()
{
Thread thread;
@ -512,6 +559,7 @@ CppUnit::Test* ThreadTest::suite()
CppUnit_addTest(pSuite, ThreadTest, testNamedThread);
CppUnit_addTest(pSuite, ThreadTest, testCurrent);
CppUnit_addTest(pSuite, ThreadTest, testThreads);
CppUnit_addTest(pSuite, ThreadTest, testTryJoin);
CppUnit_addTest(pSuite, ThreadTest, testJoin);
CppUnit_addTest(pSuite, ThreadTest, testNotJoin);
CppUnit_addTest(pSuite, ThreadTest, testNotRun);

View File

@ -28,6 +28,7 @@ public:
void testNamedThread();
void testCurrent();
void testThreads();
void testTryJoin();
void testJoin();
void testNotJoin();
void testNotRun();