diff --git a/Data/Data_VS90.vcproj b/Data/Data_VS90.vcproj
index 233d67d0a..53e09ff50 100644
--- a/Data/Data_VS90.vcproj
+++ b/Data/Data_VS90.vcproj
@@ -651,10 +651,6 @@
RelativePath=".\include\Poco\Data\SimpleRowFormatter.h"
>
-
-
@@ -791,10 +787,6 @@
RelativePath=".\src\SimpleRowFormatter.cpp"
>
-
-
diff --git a/Data/Data_vs170.vcxproj.filters b/Data/Data_vs170.vcxproj.filters
index 519029a35..e3de1552a 100644
--- a/Data/Data_vs170.vcxproj.filters
+++ b/Data/Data_vs170.vcxproj.filters
@@ -180,9 +180,6 @@
DataCore\Header Files
-
- DataCore\Header Files
-
DataCore\Header Files
@@ -297,6 +294,9 @@
Logging\Header Files
+
+ Logging\Header Files
+
@@ -374,9 +374,6 @@
DataCore\Source Files
-
- DataCore\Source Files
-
DataCore\Source Files
@@ -440,6 +437,9 @@
Logging\Source Files
+
+ Logging\Source Files
+
diff --git a/Data/ODBC/include/Poco/Data/ODBC/Binder.h b/Data/ODBC/include/Poco/Data/ODBC/Binder.h
index fb03be086..8ef164941 100644
--- a/Data/ODBC/include/Poco/Data/ODBC/Binder.h
+++ b/Data/ODBC/include/Poco/Data/ODBC/Binder.h
@@ -999,7 +999,7 @@ private:
{
std::size_t sz = it->size() * typeSize;
if (sz > _maxFieldSize)
- throw LengthExceededException();
+ throw LengthExceededException("ODBC::Binder::getMinValueSize(%s, %d)", std::string(typeid(T).name()), static_cast(size));
if (sz == _maxFieldSize)
{
diff --git a/Data/ODBC/src/Binder.cpp b/Data/ODBC/src/Binder.cpp
index 36ed66961..9c5541319 100644
--- a/Data/ODBC/src/Binder.cpp
+++ b/Data/ODBC/src/Binder.cpp
@@ -528,8 +528,8 @@ void Binder::getColSizeAndPrecision(std::size_t pos,
if (actualSize > colSize)
{
- throw LengthExceededException(Poco::format("Error binding column %z size=%z, max size=%ld)",
- pos, actualSize, static_cast(colSize)));
+ throw LengthExceededException(Poco::format("ODBC::Binder::getColSizeAndPrecision();%d: Error binding column %z size=%z, max size=%ld)",
+ __LINE__, pos, actualSize, static_cast(colSize)));
}
foundPrec = _pTypeInfo->tryGetInfo(cDataType, "MAXIMUM_SCALE", tmp);
if (foundPrec) decDigits = tmp;
@@ -565,8 +565,8 @@ void Binder::getColSizeAndPrecision(std::size_t pos,
// last check, just in case
if ((0 != colSize) && (actualSize > colSize))
{
- throw LengthExceededException(Poco::format("Error binding column %z size=%z, max size=%ld)",
- pos, actualSize, static_cast(colSize)));
+ throw LengthExceededException(Poco::format("ODBC::Binder::getColSizeAndPrecision();%d: Error binding column %z size=%z, max size=%ld)",
+ __LINE__, pos, actualSize, static_cast(colSize)));
}
return;
diff --git a/Data/ODBC/testsuite/src/ODBCSQLServerTest.cpp b/Data/ODBC/testsuite/src/ODBCSQLServerTest.cpp
index 44e5fa7ed..8b98e0f80 100644
--- a/Data/ODBC/testsuite/src/ODBCSQLServerTest.cpp
+++ b/Data/ODBC/testsuite/src/ODBCSQLServerTest.cpp
@@ -24,6 +24,7 @@
#include
+using namespace std::string_literals;
using namespace Poco::Data::Keywords;
using Poco::Data::DataException;
using Poco::Data::Statement;
@@ -216,6 +217,76 @@ void ODBCSQLServerTest::testBLOB()
}
+void ODBCSQLServerTest::testBigString()
+{
+ std::string lastName(8000, 'l');
+ std::string firstName(8000, 'f');
+ std::string address("Address");
+ int age = 42;
+
+ for (int i = 0; i < 8;)
+ {
+ recreatePersonBigStringTable();
+ session().setFeature("autoBind", bindValue(i));
+ session().setFeature("autoExtract", bindValue(i + 1));
+ try
+ {
+ session() << "INSERT INTO Person VALUES (?,?,?,?)"s,
+ use(lastName), use(firstName), use(address), use(age), now;
+ }
+ catch (DataException& ce)
+ {
+ std::cout << ce.displayText() << std::endl;
+ failmsg(__func__);
+ }
+ i += 2;
+ }
+}
+
+
+void ODBCSQLServerTest::testBigBatch()
+{
+ const std::string query("INSERT INTO Person VALUES('L', 'N', 'A', %d);");
+ std::string bigQuery;
+ // TODO: see what exactly the limits are here
+ int rows = 316, cnt = 0;
+ for (int i = 0; i < rows; ++i)
+ {
+ bigQuery += Poco::format(query, i);
+ }
+
+ for (int i = 0; i < 8;)
+ {
+ recreatePersonBigStringTable();
+ session().setFeature("autoBind", bindValue(i));
+ session().setFeature("autoExtract", bindValue(i + 1));
+
+ try
+ {
+ session() << bigQuery, now;
+ }
+ catch (DataException& ce)
+ {
+ std::cout << ce.displayText() << std::endl;
+ failmsg(__func__);
+ }
+
+ try
+ {
+ session() << "SELECT COUNT(*) FROM Person", into(cnt), now;
+ assertEqual(rows, cnt);
+ }
+ catch (DataException& ce)
+ {
+ std::cout << ce.displayText() << std::endl;
+ failmsg(__func__);
+ }
+
+ i += 2;
+ }
+}
+
+
void ODBCSQLServerTest::testNull()
{
// test for NOT NULL violation exception
@@ -706,6 +777,15 @@ void ODBCSQLServerTest::recreatePersonBLOBTable()
}
+void ODBCSQLServerTest::recreatePersonBigStringTable()
+{
+ dropObject("TABLE", "Person");
+ try { session() << "CREATE TABLE Person (LastName VARCHAR(MAX), FirstName VARCHAR(8000), Address VARCHAR(30), Age INTEGER)", now; }
+ catch (ConnectionException& ce) { std::cout << ce.toString() << std::endl; fail("recreatePersonBLOBTable()"); }
+ catch (StatementException& se) { std::cout << se.toString() << std::endl; fail("recreatePersonBLOBTable()"); }
+}
+
+
void ODBCSQLServerTest::recreatePersonDateTimeTable()
{
dropObject("TABLE", "Person");
@@ -939,6 +1019,8 @@ CppUnit::Test* ODBCSQLServerTest::suite()
CppUnit_addTest(pSuite, ODBCSQLServerTest, testSingleSelect);
CppUnit_addTest(pSuite, ODBCSQLServerTest, testEmptyDB);
CppUnit_addTest(pSuite, ODBCSQLServerTest, testBLOB);
+ CppUnit_addTest(pSuite, ODBCSQLServerTest, testBigString);
+ CppUnit_addTest(pSuite, ODBCSQLServerTest, testBigBatch);
CppUnit_addTest(pSuite, ODBCSQLServerTest, testBLOBContainer);
CppUnit_addTest(pSuite, ODBCSQLServerTest, testBLOBStmt);
CppUnit_addTest(pSuite, ODBCSQLServerTest, testRecordSet);
diff --git a/Data/ODBC/testsuite/src/ODBCSQLServerTest.h b/Data/ODBC/testsuite/src/ODBCSQLServerTest.h
index c52319296..a9c3d98fa 100644
--- a/Data/ODBC/testsuite/src/ODBCSQLServerTest.h
+++ b/Data/ODBC/testsuite/src/ODBCSQLServerTest.h
@@ -47,6 +47,8 @@ public:
void testTempTable();
void testBLOB();
+ void testBigString();
+ void testBigBatch();
void testNull();
void testBulk();
@@ -65,6 +67,7 @@ private:
void recreateNullableTable();
void recreatePersonTable();
void recreatePersonBLOBTable();
+ void recreatePersonBigStringTable();
void recreatePersonDateTimeTable();
void recreatePersonDateTable() { /* no-op */ };
void recreatePersonTimeTable() { /* no-op */ };
diff --git a/Data/SQLite/testsuite/TestSuite_vs170.vcxproj b/Data/SQLite/testsuite/TestSuite_vs170.vcxproj
index 0572a4d56..f1b8c93c9 100644
--- a/Data/SQLite/testsuite/TestSuite_vs170.vcxproj
+++ b/Data/SQLite/testsuite/TestSuite_vs170.vcxproj
@@ -1,4 +1,4 @@
-
+
@@ -81,7 +81,7 @@
TestSuite
Win32Proj
-
+
Application
MultiByte
@@ -172,63 +172,63 @@
MultiByte
v143
-
-
+
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
<_ProjectFileVersion>17.0.34714.143
TestSuited
@@ -352,7 +352,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -388,9 +388,9 @@
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
/Zc:__cplusplus %(AdditionalOptions)
@@ -421,7 +421,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -457,9 +457,9 @@
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
/Zc:__cplusplus %(AdditionalOptions)
@@ -490,7 +490,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -526,9 +526,9 @@
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
/Zc:__cplusplus %(AdditionalOptions)
@@ -559,7 +559,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -595,9 +595,9 @@
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
/Zc:__cplusplus %(AdditionalOptions)
@@ -628,7 +628,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -664,9 +664,9 @@
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
/Zc:__cplusplus %(AdditionalOptions)
@@ -697,7 +697,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -733,9 +733,9 @@
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
/Zc:__cplusplus %(AdditionalOptions)
@@ -757,8 +757,8 @@
Disabled
- ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;%(AdditionalIncludeDirectories)
- WIN32;_DEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions)
+ ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)
+ WIN32;_CRT_SECURE_NO_WARNINGS;_DEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions)
true
EnableFastChecks
MultiThreadedDebugDLL
@@ -766,7 +766,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -794,17 +794,17 @@
true
Speed
true
- ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;%(AdditionalIncludeDirectories)
- WIN32;NDEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions)
+ ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)
+ WIN32;_CRT_SECURE_NO_WARNINGS;NDEBUG;_WINDOWS;WINVER=0x0600;%(PreprocessorDefinitions)
true
MultiThreadedDLL
false
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
/Zc:__cplusplus %(AdditionalOptions)
@@ -826,8 +826,8 @@
Disabled
- ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;%(AdditionalIncludeDirectories)
- WIN32;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
+ ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)
+ WIN32;_CRT_SECURE_NO_WARNINGS;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
true
EnableFastChecks
MultiThreadedDebug
@@ -835,7 +835,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -863,17 +863,17 @@
true
Speed
true
- ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;%(AdditionalIncludeDirectories)
- WIN32;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
+ ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)
+ WIN32;_CRT_SECURE_NO_WARNINGS;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
true
MultiThreaded
false
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
/Zc:__cplusplus %(AdditionalOptions)
@@ -895,8 +895,8 @@
Disabled
- ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;%(AdditionalIncludeDirectories)
- WIN32;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
+ ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)
+ WIN32;_CRT_SECURE_NO_WARNINGS;_DEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
true
EnableFastChecks
MultiThreadedDebugDLL
@@ -904,7 +904,7 @@
true
true
true
-
+
Level3
ProgramDatabase
Default
@@ -932,17 +932,17 @@
true
Speed
true
- ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;%(AdditionalIncludeDirectories)
- WIN32;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
+ ..\include;..\..\..\CppUnit\include;..\..\..\Foundation\include;..\..\..\Data\include;..\..\..\Data\SQLParser;..\..\..\Data\SQLParser\src;%(AdditionalIncludeDirectories)
+ WIN32;_CRT_SECURE_NO_WARNINGS;NDEBUG;_WINDOWS;WINVER=0x0600;POCO_STATIC;%(PreprocessorDefinitions)
true
MultiThreadedDLL
false
true
true
true
-
+
Level3
-
+
Default
$(OutDir)$(TargetName).pdb
/Zc:__cplusplus %(AdditionalOptions)
@@ -962,8 +962,8 @@
-
-
+
+
@@ -982,6 +982,6 @@
stdc11
-
-
-
+
+
+
\ No newline at end of file
diff --git a/Data/SQLite/testsuite/src/SQLiteTest.cpp b/Data/SQLite/testsuite/src/SQLiteTest.cpp
index c197db981..d86e0838c 100755
--- a/Data/SQLite/testsuite/src/SQLiteTest.cpp
+++ b/Data/SQLite/testsuite/src/SQLiteTest.cpp
@@ -2480,7 +2480,7 @@ void SQLiteTest::testSQLChannel()
{
Thread::sleep(10);
if (sw.elapsedSeconds() > 3)
- fail ("SQLExecutor::sqlLogger(): SQLChannel timed out");
+ fail ("SQLChannel timed out");
}
// bulk binding mode is not suported by SQLite, but SQLChannel should handle it internally
pChannel->setProperty("bulk", "true");
@@ -2537,6 +2537,17 @@ void SQLiteTest::testSQLChannel()
rs2.moveNext();
assertTrue("WarningSource" == rs2["Source"]);
assertTrue("f Warning sync message" == rs2["Text"]);
+
+ pChannel->setProperty("minBatch", "1024");
+ constexpr int mcount { 2000 };
+ for (int i = 0; i < mcount; i++)
+ {
+ Message msgInfG("InformationSource", "g Informational sync message", Message::PRIO_INFORMATION);
+ pChannel->log(msgInfG);
+ }
+ pChannel.reset();
+ RecordSet rsl(tmp, "SELECT * FROM T_POCO_LOG");
+ assertEquals(2+mcount, rsl.rowCount());
}
@@ -3513,7 +3524,7 @@ void SQLiteTest::testIllegalFilePath()
}
}
-void SQLiteTest::testTransactionTypeProperty()
+void SQLiteTest::testTransactionTypeProperty()
{
try {
using namespace Poco::Data::SQLite;
diff --git a/Data/include/Poco/Data/SQLChannel.h b/Data/include/Poco/Data/SQLChannel.h
index cb1c8e454..9491224da 100644
--- a/Data/include/Poco/Data/SQLChannel.h
+++ b/Data/include/Poco/Data/SQLChannel.h
@@ -31,7 +31,9 @@
#include "Poco/String.h"
#include "Poco/NotificationQueue.h"
#include "Poco/Thread.h"
-#include "Poco/Mutex.h"
+#include "Poco/Stopwatch.h"
+#include "Poco/Event.h"
+#include
#include
@@ -56,15 +58,21 @@ class Data_API SQLChannel: public Poco::Channel, Poco::Runnable
///
/// The table name is configurable through "table" property.
/// Other than DateTime filed name used for optional time-based archiving purposes, currently the
- /// field names are not mandated. However, it is recomended to use names as specified above.
+ /// field names are not mandated. However, it is recommended to use names as specified above.
///
- /// To provide as non-intrusive operation as possbile, the log entries are cached and
+ /// To provide as non-intrusive operation as possible, the log entries are cached and
/// inserted into the target database asynchronously by default. The blocking, however, will occur
/// before the next entry insertion with default timeout of 1 second. The default settings can be
/// overridden (see async, timeout and throw properties for details).
/// If throw property is false, insertion timeouts are ignored, otherwise a TimeoutException is thrown.
/// To force insertion of every entry, set timeout to 0. This setting, however, introduces
/// a risk of long blocking periods in case of remote server communication delays.
+ ///
+ /// A default-constructed SQLChannel operates without an active DB connection, in a store-and-forward
+ /// mode. For this mode of operation, a separate service is required to consume and execute the SQL
+ /// statements from the stored files and insert the log entries into the database. Since this setup
+ /// stores SQL inserts in the OS file system, it is strongly recommended to take the necessary
+ /// precautions to limit and secure the access to those files.
{
public:
class LogNotification : public Poco::Notification
@@ -88,9 +96,15 @@ public:
static const int DEFAULT_MIN_BATCH_SIZE = 1;
static const int DEFAULT_MAX_BATCH_SIZE = 1000;
+ static const int DEFAULT_MAX_SQL_SIZE = 65536;
+ static const int DEFAULT_FLUSH_SECONDS = 10;
SQLChannel();
/// Creates SQLChannel.
+ /// An SQLChannel without DB connector and local file name
+ /// only logs SQL inserts into local files. A separate file
+ /// is created for each insert batch; files are named
+ /// ..log.sql.
SQLChannel(const std::string& connector,
const std::string& connect,
@@ -98,27 +112,28 @@ public:
const std::string& table = "T_POCO_LOG",
int timeout = 1000,
int minBatch = DEFAULT_MIN_BATCH_SIZE,
- int maxBatch = DEFAULT_MAX_BATCH_SIZE);
+ int maxBatch = DEFAULT_MAX_BATCH_SIZE,
+ int maxSQL = DEFAULT_MAX_SQL_SIZE);
/// Creates an SQLChannel with the given connector, connect string, timeout, table and name.
/// The connector must be already registered.
- void open();
+ void open() override;
/// Opens the SQLChannel.
/// Returns true if succesful.
- void close();
+ void close() override;
/// Closes the SQLChannel.
- void run();
+ void run() override;
/// Dequeues and sends the logs to the DB.
bool isRunning() const;
/// Returns true if the logging thread is running.
- void log(const Message& msg);
+ void log(const Message& msg) override;
/// Writes the log message to the database.
- void setProperty(const std::string& name, const std::string& value);
+ void setProperty(const std::string& name, const std::string& value) override;
/// Sets the property with the given value.
///
/// The following properties are supported:
@@ -149,8 +164,7 @@ public:
/// is asynchronous since the 1.13.0. release.
///
/// * timeout: Timeout (ms) to wait for previous log operation completion.
- /// Values "0" and "" mean no timeout. Only valid when logging
- /// is asynchronous, otherwise ignored.
+ /// Values "0" and "" mean no timeout.
///
/// * throw: Boolean value indicating whether to throw in case of timeout.
/// Setting this property to false may result in log entries being lost.
@@ -166,13 +180,23 @@ public:
/// reaches this size, log entries are silently discarded.
/// Defaults to 100, can't be zero or larger than 1000.
///
+ /// * maxSQL: Maximum total length of the SQL statement. Defaults to 65536.
+ ///
/// * bulk: Do bulk execute (on most DBMS systems, this can speed up things
/// drastically).
///
/// * file Destination file name for the backup FileChannel, used when DB
/// connection is not present to log not executed SQL statements.
+ ///
+ /// * flush Time in seconds to flush outstanding log entries; since logging
+ /// is performed in batches of entries, the entries that do not make
+ /// it into the last logged batch may remain unlogged for a long time
+ /// during an extened period of inactivity. This setting ensures that
+ /// unlogged entries are flushed in such circumstances, even when the
+ /// minimum batch size was not reached.
+ /// Zero value means no flushing.
- std::string getProperty(const std::string& name) const;
+ std::string getProperty(const std::string& name) const override;
/// Returns the value of the property with the given name.
void stop();
@@ -198,42 +222,55 @@ public:
static const std::string PROP_TIMEOUT;
static const std::string PROP_MIN_BATCH;
static const std::string PROP_MAX_BATCH;
+ static const std::string PROP_MAX_SQL;
static const std::string PROP_BULK;
static const std::string PROP_THROW;
+ static const std::string PROP_DIRECTORY;
static const std::string PROP_FILE;
+ static const std::string PROP_FLUSH;
protected:
- ~SQLChannel();
+ ~SQLChannel() override;
private:
static const std::string SQL_INSERT_STMT;
- typedef Poco::SharedPtr SessionPtr;
- typedef Poco::SharedPtr StatementPtr;
- typedef Poco::Message::Priority Priority;
- typedef Poco::SharedPtr StrategyPtr;
+ using SessionPtr = Poco::SharedPtr;
+ using StatementPtr = Poco::SharedPtr;
+ using Priority = Poco::Message::Priority;
+ using StrategyPtr = Poco::SharedPtr;
void reconnect();
/// Closes and opens the DB connection.
- bool processOne(int minBatch = 0);
- /// Processes one message.
+ bool processBatch(int minBatch = 0);
+ /// Processes a batch of messages.
/// If the number of acummulated messages is greater
/// than minBatch, sends logs to the destination.
- /// Returns true if log entry was processed.
+ /// Returns true if at least one log entry was sent
+ /// to the destination.
- size_t execSQL();
+ size_t execSQL(bool flush = false);
/// Executes the log statement.
- size_t logSync();
+ size_t logSync(bool flush = false);
/// Inserts entries into the target database.
bool isTrue(const std::string& value) const;
/// Returns true is value is "true", "t", "yes" or "y".
/// Case insensitive.
- size_t logTofile(AutoPtr& pFileChannel, const std::string& fileName, bool clear = false);
- /// Logs cached entries to a file. Called in case DB insertions fail.
+ size_t logToDB(bool flush = false);
+ /// Logs cached entries to the DB.
+
+ size_t logToFile(bool flush = false);
+ /// Logs cached entries to a file.
+ /// Called in case DB insertions fail or
+ /// in the store-and-forward mode of operation.
+
+ void logLocal(const std::string&, Message::Priority prio = Message::PRIO_ERROR);
+ /// Adds the message to the local SQLChannel log queue, and logs it to the file.
+ /// Typically used to log DB connection/execution erors.
std::string maskPwd();
/// Masks the password in the connection
@@ -241,38 +278,46 @@ private:
/// bullet-proof method; if not succesful,
/// empty string is returned.
+ bool shouldFlush() const;
+ /// Returns true if there are unflushed log entries
+ /// and the flush timer has expired.
+
mutable Poco::FastMutex _mutex;
- std::string _connector;
- std::string _connect;
- SessionPtr _pSession;
- std::string _sql;
- std::string _name;
- std::string _table;
- bool _tableChanged;
- int _timeout;
- std::atomic _minBatch;
- int _maxBatch;
- bool _bulk;
+ std::string _connector;
+ std::string _connect;
+ SessionPtr _pSession;
+ std::string _sql;
+ std::string _name;
+ std::string _table;
+ std::atomic _tableChanged;
+ std::atomic _timeout;
+ std::atomic _minBatch;
+ std::atomic _maxBatch;
+ std::atomic _maxSQL;
+ std::atomic _bulk;
std::atomic _throw;
+ std::atomic _flush;
// members for log entry cache
- std::vector _source;
- std::vector _pid;
- std::vector _thread;
- std::vector _tid;
- std::vector _priority;
- std::vector _text;
- std::vector _dateTime;
+ std::deque _source;
+ std::deque _pid;
+ std::deque _thread;
+ std::deque _tid;
+ std::deque _priority;
+ std::deque _text;
+ std::deque _dateTime;
Poco::NotificationQueue _logQueue;
- std::unique_ptr _pDBThread;
+ std::unique_ptr _pLogThread;
std::atomic _reconnect;
- std::atomic _running;
std::atomic _stop;
std::atomic _logged;
StrategyPtr _pArchiveStrategy;
std::string _file;
+ std::string _directory;
AutoPtr _pFileChannel;
+ Poco::Stopwatch _flushTimer;
+ Poco::Event _event;
Poco::Logger& _logger = Poco::Logger::get("SQLChannel");
};
@@ -293,7 +338,7 @@ inline bool SQLChannel::isTrue(const std::string& value) const
inline bool SQLChannel::isRunning() const
{
- return _running;
+ return _pLogThread && _pLogThread->isRunning();
}
@@ -303,6 +348,13 @@ inline size_t SQLChannel::logged() const
}
+inline bool SQLChannel::shouldFlush() const
+{
+ return (_flush > 0 && _source.size() &&
+ (_flushTimer.elapsedSeconds() >= _flush));
+}
+
+
} } // namespace Poco::Data
diff --git a/Data/src/SQLChannel.cpp b/Data/src/SQLChannel.cpp
index e7b7ffbc6..4507c490d 100644
--- a/Data/src/SQLChannel.cpp
+++ b/Data/src/SQLChannel.cpp
@@ -22,10 +22,13 @@
#include "Poco/Instantiator.h"
#include "Poco/NumberParser.h"
#include "Poco/NumberFormatter.h"
-#include "Poco/Stopwatch.h"
#include "Poco/Format.h"
+#include "Poco/Path.h"
#include "Poco/File.h"
+#include "Poco/UUID.h"
+#include "Poco/UUIDGenerator.h"
#include
+#include
namespace Poco {
@@ -45,9 +48,12 @@ const std::string SQLChannel::PROP_ASYNC("async");
const std::string SQLChannel::PROP_TIMEOUT("timeout");
const std::string SQLChannel::PROP_MIN_BATCH("minBatch");
const std::string SQLChannel::PROP_MAX_BATCH("maxBatch");
+const std::string SQLChannel::PROP_MAX_SQL("maxSQL");
const std::string SQLChannel::PROP_BULK("bulk");
const std::string SQLChannel::PROP_THROW("throw");
+const std::string SQLChannel::PROP_DIRECTORY("directory");
const std::string SQLChannel::PROP_FILE("file");
+const std::string SQLChannel::PROP_FLUSH("flush");
const std::string SQLChannel::SQL_INSERT_STMT = "INSERT INTO %s " \
@@ -56,22 +62,23 @@ const std::string SQLChannel::SQL_INSERT_STMT = "INSERT INTO %s " \
SQLChannel::SQLChannel():
- _name("-"),
- _table("T_POCO_LOG"),
+ _name("SQLChannel"),
_tableChanged(true),
_timeout(1000),
_minBatch(DEFAULT_MIN_BATCH_SIZE),
_maxBatch(DEFAULT_MAX_BATCH_SIZE),
+ _maxSQL(DEFAULT_MAX_SQL_SIZE),
_bulk(true),
_throw(false),
_pid(),
_tid(),
_priority(),
+ _pLogThread(new Thread),
_reconnect(false),
- _running(false),
_stop(false),
_logged(0)
{
+ _pLogThread->start(*this);
}
@@ -81,7 +88,8 @@ SQLChannel::SQLChannel(const std::string& connector,
const std::string& table,
int timeout,
int minBatch,
- int maxBatch) :
+ int maxBatch,
+ int maxSQL) :
_connector(connector),
_connect(connect),
_name(name),
@@ -90,18 +98,18 @@ SQLChannel::SQLChannel(const std::string& connector,
_timeout(timeout),
_minBatch(minBatch),
_maxBatch(maxBatch),
+ _maxSQL(maxSQL),
_bulk(false),
_throw(false),
_pid(),
_tid(),
_priority(),
- _pDBThread(new Thread),
+ _pLogThread(new Thread),
_reconnect(true),
- _running(false),
_stop(false),
_logged(0)
{
- _pDBThread->start(*this);
+ _pLogThread->start(*this);
}
@@ -157,12 +165,13 @@ void SQLChannel::open()
_pSession = new Session(_connector, _connect, _timeout / 1000);
if (_pSession->hasProperty("maxFieldSize")) _pSession->setProperty("maxFieldSize", 8192);
if (_pSession->hasProperty("autoBind")) _pSession->setFeature("autoBind", true);
- _logger.information("Connected to %s: %s", _connector, maskPwd());
+ if (!_stop) _logger.information("Connected to %s: %s", _connector, maskPwd());
+ else logLocal(Poco::format("Connected to %s: %s", _connector, maskPwd()));
return;
}
- catch (DataException& ex)
+ catch (const DataException& ex)
{
- _logger.error(ex.displayText());
+ logLocal(ex.displayText());
}
}
_pSession = nullptr;
@@ -173,23 +182,35 @@ void SQLChannel::open()
void SQLChannel::close()
{
wait(_timeout);
- _pSession = nullptr;
+ if (_pSession)
+ {
+ _pSession->close();
+ _pSession = nullptr;
+ }
+}
+
+
+void SQLChannel::logLocal(const std::string& message, Message::Priority prio)
+{
+ Message msg("SQLChannel"s, message, prio);
+ log(msg);
}
void SQLChannel::log(const Message& msg)
{
_logQueue.enqueueNotification(new LogNotification(msg));
+ _event.set();
}
-size_t SQLChannel::logSync()
+size_t SQLChannel::logSync(bool flush)
{
try
{
- return execSQL();
+ return execSQL(flush);
}
- catch (Exception&)
+ catch (...)
{
if (_throw) throw;
}
@@ -198,10 +219,10 @@ size_t SQLChannel::logSync()
}
-bool SQLChannel::processOne(int minBatch)
+bool SQLChannel::processBatch(int minBatch)
{
- bool ret = false;
- if (_logQueue.size())
+ bool ret = false, flush = (minBatch == 0);
+ while (_logQueue.size())
{
Notification::Ptr pN = _logQueue.dequeueNotification();
LogNotification::Ptr pLN = pN.cast();
@@ -218,11 +239,13 @@ bool SQLChannel::processOne(int minBatch)
_priority.push_back(msg.getPriority());
_text.push_back(msg.getText());
Poco::replaceInPlace(_text.back(), "'", "''");
- _dateTime.push_back(msg.getTime());
+ _dateTime.emplace_back(msg.getTime());
+ if ((_source.size() >= minBatch) || flush)
+ logSync(flush);
+ ret = true;
}
- ret = true;
}
- if (_source.size() >= _minBatch) logSync();
+ if (flush) logSync(flush);
return ret;
}
@@ -230,6 +253,7 @@ bool SQLChannel::processOne(int minBatch)
void SQLChannel::run()
{
+ _flushTimer.start();
long sleepTime = 100; // milliseconds
while (!_stop)
{
@@ -239,53 +263,286 @@ void SQLChannel::run()
{
close();
open();
- _reconnect = _pSession.isNull();
+ _reconnect = !_connector.empty() && _pSession.isNull();
if (_reconnect && sleepTime < 12800)
sleepTime *= 2;
}
- processOne(_minBatch);
- sleepTime = 100;
+
+ if (!_reconnect)
+ {
+ if (_logQueue.size()) processBatch(_minBatch);
+ if (shouldFlush()) processBatch();
+ sleepTime = 100;
+ }
}
- catch (Poco::Exception& ex)
+ catch (const Poco::Exception& ex)
{
- _logger.error(ex.displayText());
+ if (!_stop)
+ _logger.error(ex.displayText());
+ else
+ logLocal(ex.displayText());
}
- catch (std::exception& ex)
+ catch (const std::exception& ex)
{
- _logger.error(ex.what());
+ if (!_stop)
+ _logger.error(ex.what());
+ else
+ logLocal(ex.what());
}
catch (...)
{
- _logger.error("SQLChannel::run(): unknown exception");
+ if (!_stop)
+ _logger.error("SQLChannel::run(): unknown exception"s);
+ else
+ logLocal("SQLChannel::run(): unknown exception"s);
}
- _running = true;
- Thread::sleep(100);
+ if (_stop)
+ {
+ break;
+ }
+ _event.tryWait(sleepTime);
}
- _running = false;
+ while (_logQueue.size() || _source.size())
+ processBatch();
}
void SQLChannel::stop()
{
- if (_pDBThread)
+ if (_pLogThread)
{
_reconnect = false;
_stop = true;
- _pDBThread->join();
- while (_logQueue.size())
- processOne();
+ while (_pLogThread->isRunning())
+ Thread::sleep(10);
+ _pLogThread->join();
+ _pLogThread.reset();
+ _event.set();
}
}
void SQLChannel::reconnect()
{
- if (!_pDBThread)
- {
- _pDBThread.reset(new Thread);
- _pDBThread->start(*this);
- }
_reconnect = true;
+ if (!_pLogThread)
+ {
+ _pLogThread = std::make_unique();
+ _pLogThread->start(*this);
+ }
+}
+
+
+size_t SQLChannel::logToFile(bool flush)
+{
+ if (_source.empty()) return 0u;
+
+ static std::vector names;
+ if (names.size() != _source.size())
+ names.resize(_source.size(), Poco::replace(_name, "'", "''"));
+
+ std::size_t n = 0, batch = 0;
+
+ AutoPtr pFileChannel;
+ std::string file = _file;
+ if (!_pFileChannel && !_file.empty())
+ {
+ if (!Path(File(file).path()).isAbsolute())
+ file = _directory + file;
+
+ _pFileChannel = new FileChannel(file);
+ pFileChannel = _pFileChannel;
+ }
+ else
+ {
+ UUID uuid = UUIDGenerator::defaultGenerator().createRandom();
+ std::string filename(_directory);
+ filename.append(DateTimeFormatter::format(LocalDateTime(), "%Y%m%d%H%M%S%i.").append(uuid.toString()).append(".log.sql"s));
+ pFileChannel = new FileChannel(filename);
+ }
+
+ std::stringstream os;
+ auto doLog = [&]
+ {
+ Message msg(_source[0], os.str(), Message::PRIO_FATAL);
+ pFileChannel->log(msg);
+ n += batch;
+ _logged += batch;
+ _source.erase(_source.begin(), _source.begin()+batch);
+ _pid.erase(_pid.begin(), _pid.begin() + batch);
+ _thread.erase(_thread.begin(), _thread.begin() + batch);
+ _tid.erase(_tid.begin(), _tid.begin() + batch);
+ _priority.erase(_priority.begin(), _priority.begin() + batch);
+ _text.erase(_text.begin(), _text.begin() + batch);
+ _dateTime.erase(_dateTime.begin(), _dateTime.begin() + batch);
+ _flushTimer.restart();
+ };
+
+ if (pFileChannel)
+ {
+ std::string sql;
+ Poco::format(sql, SQL_INSERT_STMT, _table, std::string());
+ std::stringstream tmp;
+ os << sql << '\n';
+ auto it = _source.begin();
+ auto end = _source.end();
+ int idx = 0;
+ for (; it != end; ++idx)
+ {
+ std::string dt = DateTimeFormatter::format(_dateTime[idx], "%Y-%m-%d %H:%M:%S.%i");
+ tmp.str("");
+ tmp << "('" << *it << "','" <<
+ names[idx] << "'," <<
+ _pid[idx] << ",'" <<
+ _thread[idx] << "'," <<
+ _tid[idx] << ',' <<
+ _priority[idx] << ",'" <<
+ _text[idx] << "','" <<
+ dt << "')";
+
+ if (++batch == _maxBatch || (os.str().length() + tmp.str().length()) >= _maxSQL)
+ {
+ os << ";\n";
+ doLog();
+ os.str(""); sql.clear();
+ Poco::format(sql, SQL_INSERT_STMT, _table, std::string());
+ os << sql << '\n' << tmp.str();
+ batch = 0;
+ }
+
+ os << tmp.str();
+
+ if (++it == end)
+ {
+ os << ";\n";
+ break;
+ }
+ os << ",\n";
+ }
+
+ if ((batch >= _minBatch) || flush) doLog();
+ }
+
+ return n;
+}
+
+
+size_t SQLChannel::logToDB(bool flush)
+{
+ if (_source.empty()) return 0u;
+
+ static std::vector names;
+ if (names.size() != _source.size())
+ names.resize(_source.size(), Poco::replace(_name, "'", "''"));
+ static std::string placeholders = "(?,?,?,?,?,?,?,?)";
+
+ Poco::FastMutex::ScopedLock l(_mutex);
+
+ if (_tableChanged)
+ {
+ Poco::format(_sql, SQL_INSERT_STMT, _table, placeholders);
+ _tableChanged = false;
+ }
+
+ std::size_t n = 0;
+
+ try
+ {
+ if (_bulk)
+ {
+ try
+ {
+ (*_pSession) << _sql,
+ use(_source, bulk),
+ use(names, bulk),
+ use(_pid, bulk),
+ use(_thread, bulk),
+ use(_tid, bulk),
+ use(_priority, bulk),
+ use(_text, bulk),
+ use(_dateTime, bulk), now;
+ }
+ // most likely bulk mode not supported, try again
+ catch (const Poco::InvalidAccessException&)
+ {
+ (*_pSession) << _sql,
+ use(_source),
+ use(names),
+ use(_pid),
+ use(_thread),
+ use(_tid),
+ use(_priority),
+ use(_text),
+ use(_dateTime), now;
+ _bulk = false;
+ }
+ }
+ else
+ {
+ (*_pSession) << _sql,
+ use(_source),
+ use(names),
+ use(_pid),
+ use(_thread),
+ use(_tid),
+ use(_priority),
+ use(_text),
+ use(_dateTime), now;
+ }
+ n = _source.size();
+ }
+ catch (const Poco::Exception& ex)
+ {
+ logLocal(ex.displayText());
+ close();
+ _reconnect = true;
+ }
+ catch (const std::exception& ex)
+ {
+ logLocal(ex.what());
+ close();
+ _reconnect = true;
+ }
+
+ if (n)
+ {
+ _logged += n;
+ _source.erase(_source.begin(), _source.begin() + n);
+ _pid.erase(_pid.begin(), _pid.begin() + n);
+ _thread.erase(_thread.begin(), _thread.begin() + n);
+ _tid.erase(_tid.begin(), _tid.begin() + n);
+ _priority.erase(_priority.begin(), _priority.begin() + n);
+ _text.erase(_text.begin(), _text.begin() + n);
+ _dateTime.erase(_dateTime.begin(), _dateTime.begin() + n);
+ _flushTimer.restart();
+ }
+
+ return n;
+}
+
+
+size_t SQLChannel::execSQL(bool flush)
+{
+ if (!_connector.empty() && (!_pSession || !_pSession->isConnected())) open();
+ if (_pArchiveStrategy) _pArchiveStrategy->archive();
+
+ size_t n = _pSession ? logToDB(flush) : logToFile(flush);
+
+ return n;
+}
+
+
+std::size_t SQLChannel::wait(int ms)
+{
+ Stopwatch sw;
+ sw.start();
+ while (_logQueue.size())
+ {
+ Thread::sleep(10);
+ if (ms && sw.elapsed() * 1000 > ms)
+ break;
+ }
+ return _logQueue.size();
}
@@ -319,7 +576,7 @@ void SQLChannel::setProperty(const std::string& name, const std::string& value)
{
if (value.empty())
{
- _pArchiveStrategy = 0;
+ _pArchiveStrategy = nullptr;
}
else if (_pArchiveStrategy)
{
@@ -336,7 +593,7 @@ void SQLChannel::setProperty(const std::string& name, const std::string& value)
{
if (value.empty() || "forever" == value)
{
- _pArchiveStrategy = 0;
+ _pArchiveStrategy = nullptr;
}
else if (_pArchiveStrategy)
{
@@ -374,6 +631,11 @@ void SQLChannel::setProperty(const std::string& name, const std::string& value)
throw Poco::InvalidArgumentException(Poco::format("SQLChannel::setProperty(%s,%s)", name, value));
_maxBatch = maxBatch;
}
+ else if (name == PROP_MAX_SQL)
+ {
+ int maxSQL = NumberParser::parse(value);
+ _maxSQL = maxSQL;
+ }
else if (name == PROP_BULK)
{
_bulk = isTrue(value);
@@ -382,10 +644,24 @@ void SQLChannel::setProperty(const std::string& name, const std::string& value)
{
_throw = isTrue(value);
}
+ else if (name == PROP_DIRECTORY)
+ {
+ std::string dir = value;
+ if (!Path(File(dir).path()).isAbsolute())
+ {
+ Path d(dir);
+ dir = d.makeDirectory().makeAbsolute().toString();
+ }
+ _directory = dir;
+ }
else if (name == PROP_FILE)
{
_file = value;
}
+ else if (name == PROP_FLUSH)
+ {
+ _flush.store(NumberParser::parse(value));
+ }
else
{
Channel::setProperty(name, value);
@@ -434,6 +710,10 @@ std::string SQLChannel::getProperty(const std::string& name) const
{
return std::to_string(_maxBatch);
}
+ else if (name == PROP_MAX_SQL)
+ {
+ return std::to_string(_maxSQL);
+ }
else if (name == PROP_BULK)
{
if (_bulk) return "true";
@@ -444,10 +724,18 @@ std::string SQLChannel::getProperty(const std::string& name) const
if (_throw) return "true";
else return "false";
}
+ else if (name == PROP_DIRECTORY)
+ {
+ return _directory;
+ }
else if (name == PROP_FILE)
{
return _file;
}
+ else if (name == PROP_FLUSH)
+ {
+ return std::to_string(_flush);
+ }
else
{
return Channel::getProperty(name);
@@ -455,189 +743,6 @@ std::string SQLChannel::getProperty(const std::string& name) const
}
-size_t SQLChannel::logTofile(AutoPtr& pFileChannel, const std::string& fileName, bool clear)
-{
- static std::vector names;
- if (names.size() != _source.size())
- names.resize(_source.size(), Poco::replace(_name, "'", "''"));
-
- std::size_t n = 0;
-
- if (!pFileChannel) pFileChannel = new FileChannel(fileName);
- if (pFileChannel)
- {
- std::string sql;
- Poco::format(sql, SQL_INSERT_STMT, _table, std::string());
- std::stringstream os;
- os << sql << '\n';
- auto it = _source.begin();
- auto end = _source.end();
- int idx = 0, batch = 0;
- for (; it != end; ++idx)
- {
- std::string dt = Poco::DateTimeFormatter::format(_dateTime[idx], "%Y-%m-%d %H:%M:%S.%i");
- os << "('" << *it << "','" <<
- names[idx] << "'," <<
- _pid[idx] << ",'" <<
- _thread[idx] << "'," <<
- _tid[idx] << ',' <<
- _priority[idx] << ",'" <<
- _text[idx] << "','" <<
- dt << "')";
- if (++batch == _maxBatch)
- {
- os << ";\n";
- Message msg(_source[0], os.str(), Message::PRIO_ERROR);
- pFileChannel->log(msg);
- os.str(""); sql.clear();
- Poco::format(sql, SQL_INSERT_STMT, _table, std::string());
- batch = 0;
- }
- if (++it == end)
- {
- os << ";\n";
- break;
- }
- os << ",\n";
- }
- Message msg(_source[0], os.str(), Message::PRIO_ERROR);
- pFileChannel->log(msg);
- n = _source.size();
- if (clear && n)
- {
- _source.clear();
- _pid.clear();
- _thread.clear();
- _tid.clear();
- _priority.clear();
- _text.clear();
- _dateTime.clear();
- }
- }
- return n;
-}
-
-
-size_t SQLChannel::execSQL()
-{
- static std::vector names;
- if (names.size() != _source.size())
- names.resize(_source.size(), Poco::replace(_name, "'", "''"));
- static std::string placeholders = "(?,?,?,?,?,?,?,?)";
-
- Poco::FastMutex::ScopedLock l(_mutex);
-
- if (_tableChanged)
- {
- Poco::format(_sql, SQL_INSERT_STMT, _table, placeholders);
- _tableChanged = false;
- }
-
- if (!_pSession || !_pSession->isConnected()) open();
- if (_pArchiveStrategy) _pArchiveStrategy->archive();
-
- size_t n = 0;
- if (_pSession)
- {
- try
- {
- if (_bulk)
- {
- try
- {
- (*_pSession) << _sql,
- use(_source, bulk),
- use(names, bulk),
- use(_pid, bulk),
- use(_thread, bulk),
- use(_tid, bulk),
- use(_priority, bulk),
- use(_text, bulk),
- use(_dateTime, bulk), now;
- }
- // most likely bulk mode not supported,
- // log and try again
- catch (Poco::InvalidAccessException& ex)
- {
- _logger.log(ex);
- (*_pSession) << _sql,
- use(_source),
- use(names),
- use(_pid),
- use(_thread),
- use(_tid),
- use(_priority),
- use(_text),
- use(_dateTime), now;
- _bulk = false;
- }
- }
- else
- {
- (*_pSession) << _sql,
- use(_source),
- use(names),
- use(_pid),
- use(_thread),
- use(_tid),
- use(_priority),
- use(_text),
- use(_dateTime), now;
- }
- n = _source.size();
- }
- catch (Poco::Exception& ex)
- {
- _logger.error(ex.displayText());
- if (!_file.empty())
- n = logTofile(_pFileChannel, _file);
- close();
- _reconnect = true;
- }
- catch (std::exception& ex)
- {
- _logger.error(ex.what());
- if (!_file.empty())
- n = logTofile(_pFileChannel, _file);
- close();
- _reconnect = true;
- }
- }
- else
- {
- if (!_file.empty())
- n = logTofile(_pFileChannel, _file);
- }
- if (n)
- {
- _logged += n;
- _source.clear();
- _pid.clear();
- _thread.clear();
- _tid.clear();
- _priority.clear();
- _text.clear();
- _dateTime.clear();
- }
- return n;
-}
-
-
-std::size_t SQLChannel::wait(int ms)
-{
- Stopwatch sw;
- sw.start();
- int processed = _logQueue.size();
- while (_logQueue.size())
- {
- Thread::sleep(10);
- if (ms && sw.elapsed() * 1000 > ms)
- break;
- }
- return processed - _logQueue.size();
-}
-
-
void SQLChannel::registerChannel()
{
Poco::LoggingFactory::defaultFactory().registerChannelClass("SQLChannel",
diff --git a/Data/testsuite/src/DataTest.cpp b/Data/testsuite/src/DataTest.cpp
index fef722b54..5297e91ad 100644
--- a/Data/testsuite/src/DataTest.cpp
+++ b/Data/testsuite/src/DataTest.cpp
@@ -20,6 +20,7 @@
#include "Poco/Data/Column.h"
#include "Poco/Data/Date.h"
#include "Poco/Data/Time.h"
+#include "Poco/Data/SQLChannel.h"
#include "Poco/Data/SimpleRowFormatter.h"
#include "Poco/Data/JSONRowFormatter.h"
#include "Poco/Data/DataException.h"
@@ -27,12 +28,18 @@
#include "Poco/BinaryReader.h"
#include "Poco/BinaryWriter.h"
#include "Poco/DateTime.h"
+#include "Poco/Stopwatch.h"
#include "Poco/Types.h"
#include "Poco/Dynamic/Var.h"
#include "Poco/Data/DynamicLOB.h"
#include "Poco/Data/DynamicDateTime.h"
#include "Poco/Latin1Encoding.h"
#include "Poco/Exception.h"
+#include "Poco/DirectoryIterator.h"
+#include "Poco/Glob.h"
+#include "Poco/File.h"
+#include "Poco/Path.h"
+#include
#include
#include
#include
@@ -40,46 +47,10 @@
using namespace Poco;
+using Poco::Dynamic::Var;
using namespace Poco::Data;
using namespace Poco::Data::Keywords;
-
-
-using Poco::BinaryReader;
-using Poco::BinaryWriter;
-using Poco::UInt32;
-using Poco::Int64;
-using Poco::UInt64;
-using Poco::DateTime;
-using Poco::Latin1Encoding;
-using Poco::Dynamic::Var;
-using Poco::InvalidAccessException;
-using Poco::IllegalStateException;
-using Poco::RangeException;
-using Poco::NotFoundException;
-using Poco::InvalidArgumentException;
-using Poco::NotImplementedException;
-using Poco::Data::Session;
-using Poco::Data::SessionFactory;
-using Poco::Data::Statement;
-using Poco::Data::NotSupportedException;
-using Poco::Data::CLOB;
-using Poco::Data::CLOBInputStream;
-using Poco::Data::CLOBOutputStream;
-using Poco::Data::MetaColumn;
-using Poco::Data::Column;
-using Poco::Data::Row;
-using Poco::Data::RowFormatter;
-using Poco::Data::SimpleRowFormatter;
-using Poco::Data::JSONRowFormatter;
-using Poco::Data::Date;
-using Poco::Data::Time;
-using Poco::Data::AbstractExtractor;
-using Poco::Data::AbstractExtraction;
-using Poco::Data::AbstractExtractionVec;
-using Poco::Data::AbstractExtractionVecVec;
-using Poco::Data::AbstractBinding;
-using Poco::Data::AbstractBindingVec;
-using Poco::Data::NotConnectedException;
+using namespace std::string_literals;
DataTest::DataTest(const std::string& name): CppUnit::TestCase(name)
@@ -1570,6 +1541,65 @@ void DataTest::testSQLParse()
}
+void DataTest::testSQLChannel()
+{
+ std::string dir = Path::tempHome();
+ AutoPtr pChannel = new SQLChannel();
+ pChannel->setProperty("directory", dir);
+ Stopwatch sw; sw.start();
+ while (!pChannel->isRunning())
+ {
+ Thread::sleep(10);
+ if (sw.elapsedSeconds() > 3)
+ fail("SQLChannel timed out");
+ }
+
+ Glob g("*.log.sql");
+ {
+ DirectoryIterator it(dir);
+ DirectoryIterator end;
+ while (it != end)
+ {
+ if (g.match(it->path()))
+ {
+ File(it->path()).remove();
+ }
+ ++it;
+ }
+ }
+
+ constexpr int mcount{10};
+ constexpr int batch{3};
+ pChannel->setProperty("minBatch", std::to_string(batch));
+ constexpr int flush{1};
+ pChannel->setProperty("flush", std::to_string(flush));
+ assertEqual(flush, NumberParser::parse(pChannel->getProperty("flush")));
+ for (int i = 0; i < mcount; i++)
+ {
+ Message msgInfA("InformationSource", Poco::format("%d Informational sync message", i), Message::PRIO_INFORMATION);
+ pChannel->log(msgInfA);
+ }
+ Thread::sleep(2000*flush); // give it time to flush
+ auto logged = pChannel->logged();
+ assertEqual(mcount, logged);
+ pChannel.reset();
+
+ int count = 0;
+ DirectoryIterator it(dir);
+ DirectoryIterator end;
+ while (it != end)
+ {
+ if (g.match(it->path()))
+ {
+ ++count;
+ File(it->path()).remove();
+ }
+ ++it;
+ }
+ assertEqual(count, (mcount / batch) + (mcount % batch));
+}
+
+
void DataTest::setUp()
{
}
@@ -1603,6 +1633,7 @@ CppUnit::Test* DataTest::suite()
CppUnit_addTest(pSuite, DataTest, testExternalBindingAndExtraction);
CppUnit_addTest(pSuite, DataTest, testTranscode);
CppUnit_addTest(pSuite, DataTest, testSQLParse);
+ CppUnit_addTest(pSuite, DataTest, testSQLChannel);
return pSuite;
}
diff --git a/Data/testsuite/src/DataTest.h b/Data/testsuite/src/DataTest.h
index 8d9825030..5e34dbdc4 100644
--- a/Data/testsuite/src/DataTest.h
+++ b/Data/testsuite/src/DataTest.h
@@ -46,6 +46,7 @@ public:
void testExternalBindingAndExtraction();
void testTranscode();
void testSQLParse();
+ void testSQLChannel();
void setUp();
void tearDown();
diff --git a/Foundation/testsuite/src/ThreadTest.cpp b/Foundation/testsuite/src/ThreadTest.cpp
index 4c94964f3..fe3006ae2 100644
--- a/Foundation/testsuite/src/ThreadTest.cpp
+++ b/Foundation/testsuite/src/ThreadTest.cpp
@@ -122,6 +122,37 @@ private:
};
+class JoinRunnable : public Runnable
+{
+public:
+ JoinRunnable() : _stop(false), _running(false)
+ {
+ }
+
+ void run()
+ {
+ _running = true;
+ while (!_stop)
+ Thread::sleep(100);
+ _running = false;
+ }
+
+ void stop()
+ {
+ _stop = true;
+ }
+
+ bool running() const
+ {
+ return _running;
+ }
+
+private:
+ std::atomic _stop;
+ std::atomic _running;
+};
+
+
class TrySleepRunnable : public Runnable
{
public:
@@ -268,7 +299,7 @@ void ThreadTest::testThreads()
}
-void ThreadTest::testJoin()
+void ThreadTest::testTryJoin()
{
Thread thread;
MyRunnable r;
@@ -283,6 +314,22 @@ void ThreadTest::testJoin()
}
+void ThreadTest::testJoin()
+{
+ Thread thread;
+ JoinRunnable r;
+ assertTrue(!thread.isRunning());
+ thread.start(r);
+ Thread::sleep(200);
+ assertTrue(thread.isRunning());
+ assertTrue(!thread.tryJoin(100));
+ r.stop();
+ thread.join();
+ assertTrue(!thread.isRunning());
+ assertTrue(!r.running());
+}
+
+
void ThreadTest::testNotJoin()
{
Thread thread;
@@ -512,6 +559,7 @@ CppUnit::Test* ThreadTest::suite()
CppUnit_addTest(pSuite, ThreadTest, testNamedThread);
CppUnit_addTest(pSuite, ThreadTest, testCurrent);
CppUnit_addTest(pSuite, ThreadTest, testThreads);
+ CppUnit_addTest(pSuite, ThreadTest, testTryJoin);
CppUnit_addTest(pSuite, ThreadTest, testJoin);
CppUnit_addTest(pSuite, ThreadTest, testNotJoin);
CppUnit_addTest(pSuite, ThreadTest, testNotRun);
diff --git a/Foundation/testsuite/src/ThreadTest.h b/Foundation/testsuite/src/ThreadTest.h
index 1bda503da..11084e5ff 100644
--- a/Foundation/testsuite/src/ThreadTest.h
+++ b/Foundation/testsuite/src/ThreadTest.h
@@ -28,6 +28,7 @@ public:
void testNamedThread();
void testCurrent();
void testThreads();
+ void testTryJoin();
void testJoin();
void testNotJoin();
void testNotRun();