diff --git a/Data/ODBC/testsuite/src/SQLExecutor.cpp b/Data/ODBC/testsuite/src/SQLExecutor.cpp index c594b384f..5af064cee 100644 --- a/Data/ODBC/testsuite/src/SQLExecutor.cpp +++ b/Data/ODBC/testsuite/src/SQLExecutor.cpp @@ -2252,6 +2252,21 @@ void SQLExecutor::asynchronous() assert (!stmt.isAsync()); result.wait(); + Statement stmt1 = (tmp << "SELECT * FROM Strings", into(data), async, now); + assert (stmt1.isAsync()); + assert (stmt1.wait() == rowCount); + + stmt1.execute(); + try { + stmt1.execute(); + fail ("must fail"); + } catch (InvalidAccessException&) + { + stmt1.wait(); + stmt1.execute(); + stmt1.wait(); + } + stmt = tmp << "SELECT * FROM Strings", into(data), async, now; assert (stmt.isAsync()); stmt.wait(); @@ -2281,4 +2296,27 @@ void SQLExecutor::asynchronous() assert (!stmt.isAsync()); result.wait(); assert (result.data() == rowCount); + + assert (0 == rowCount % 10); + int step = (int) (rowCount/10); + data.clear(); + Statement stmt2 = (tmp << "SELECT * FROM Strings", into(data), async, limit(step)); + assert (data.size() == 0); + assert (!stmt2.done()); + Statement::ResultType rows = 0; + + for (int i = 0; !stmt2.done(); i += step) + { + stmt2.execute(); + rows = stmt2.wait(); + assert (step == rows); + assert (step + i == data.size()); + } + assert (stmt2.done()); + assert (rowCount == data.size()); + + stmt2 = tmp << "SELECT * FROM Strings", reset; + assert (!stmt2.isAsync()); + assert ("deque" == stmt2.getStorage()); + assert (stmt2.execute() == rowCount); } diff --git a/Data/SQLite/testsuite/src/SQLiteTest.cpp b/Data/SQLite/testsuite/src/SQLiteTest.cpp index 99dd085b1..6a5403e04 100644 --- a/Data/SQLite/testsuite/src/SQLiteTest.cpp +++ b/Data/SQLite/testsuite/src/SQLiteTest.cpp @@ -1700,6 +1700,21 @@ void SQLiteTest::testAsync() assert (!stmt.isAsync()); result.wait(); + Statement stmt1 = (tmp << "SELECT * FROM Strings", into(data), async, now); + assert (stmt1.isAsync()); + assert (stmt1.wait() == rowCount); + + stmt1.execute(); + try { + stmt1.execute(); + fail ("must fail"); + } catch (InvalidAccessException&) + { + stmt1.wait(); + stmt1.execute(); + stmt1.wait(); + } + stmt = tmp << "SELECT * FROM Strings", into(data), async, now; assert (stmt.isAsync()); stmt.wait(); @@ -1729,6 +1744,29 @@ void SQLiteTest::testAsync() assert (!stmt.isAsync()); result.wait(); assert (result.data() == rowCount); + + assert (0 == rowCount % 10); + int step = (int) (rowCount/10); + data.clear(); + Statement stmt2 = (tmp << "SELECT * FROM Strings", into(data), async, limit(step)); + assert (data.size() == 0); + assert (!stmt2.done()); + Statement::ResultType rows = 0; + + for (int i = 0; !stmt2.done(); i += step) + { + stmt2.execute(); + rows = stmt2.wait(); + assert (step == rows); + assert (step + i == data.size()); + } + assert (stmt2.done()); + assert (rowCount == data.size()); + + stmt2 = tmp << "SELECT * FROM Strings", reset; + assert (!stmt2.isAsync()); + assert ("deque" == stmt2.getStorage()); + assert (stmt2.execute() == rowCount); } diff --git a/Data/include/Poco/Data/Statement.h b/Data/include/Poco/Data/Statement.h index 231e9e781..647bb033f 100644 --- a/Data/include/Poco/Data/Statement.h +++ b/Data/include/Poco/Data/Statement.h @@ -44,6 +44,7 @@ #include "Poco/Data/StatementImpl.h" #include "Poco/Data/Range.h" #include "Poco/SharedPtr.h" +#include "Poco/Mutex.h" #include "Poco/ActiveMethod.h" #include "Poco/ActiveResult.h" @@ -205,7 +206,7 @@ public: /// When executed on otherwise synchronous statement, this method does not alter the /// statement's synchronous nature. - void setAsync(bool async); + void setAsync(bool async = true); /// Sets the asynchronous flag. If this flag is true, executeAsync() is called /// from the now() manipulator. This setting does not affect the statement's /// capability to be executed synchronously by directly calling execute(). @@ -222,6 +223,10 @@ public: bool initialized(); /// Returns true if the statement was initialized (i.e. not executed yet). + bool paused(); + /// Returns true if the statement was paused (a range limit stopped it + /// and there is more work to do). + bool done(); /// Returns true if the statement was completely executed or false if a range limit stopped it /// and there is more work to do. When no limit is set, it will always return true after calling execute(). @@ -263,12 +268,15 @@ private: static const int WAIT_FOREVER = -1; + const Result& doAsyncExec(); + /// Asynchronously executes the statement. + StatementImplPtr _ptr; // asynchronous execution related members bool _isAsync; mutable ResultPtr _pResult; - FastMutex _mutex; + Mutex _mutex; AsyncExecMethod _asyncExec; }; @@ -404,6 +412,12 @@ inline bool Statement::initialized() } +inline bool Statement::paused() +{ + return _ptr->getState() == StatementImpl::ST_PAUSED; +} + + inline bool Statement::done() { return _ptr->getState() == StatementImpl::ST_DONE; diff --git a/Data/include/Poco/Data/StatementImpl.h b/Data/include/Poco/Data/StatementImpl.h index 1c0f88fc1..ca3720419 100644 --- a/Data/include/Poco/Data/StatementImpl.h +++ b/Data/include/Poco/Data/StatementImpl.h @@ -73,6 +73,7 @@ public: ST_INITIALIZED, ST_COMPILED, ST_BOUND, + ST_PAUSED, ST_DONE, ST_RESET }; diff --git a/Data/src/Statement.cpp b/Data/src/Statement.cpp index b72f1f7b7..03a4fed04 100644 --- a/Data/src/Statement.cpp +++ b/Data/src/Statement.cpp @@ -70,9 +70,6 @@ Statement::Statement(const Statement& stmt): _pResult(stmt._pResult), _asyncExec(_ptr, &StatementImpl::execute) { - // if executing asynchronously, wait - if (stmt._pResult) - stmt._pResult->wait(); } @@ -100,35 +97,44 @@ void Statement::swap(Statement& other) Statement::ResultType Statement::execute() { - if (!isAsync()) + Mutex::ScopedLock lock(_mutex); + bool isDone = done(); + if (initialized() || paused() || isDone) { - if (done()) _ptr->reset(); - return _ptr->execute(); - } - else - { - executeAsync(); - return 0; - } + if (!isAsync()) + { + if (isDone) _ptr->reset(); + return _ptr->execute(); + } + else + { + doAsyncExec(); + return 0; + } + } else + throw InvalidAccessException("Statement still executing."); } const Statement::Result& Statement::executeAsync() { - FastMutex::ScopedLock lock(_mutex); - bool isDone = done(); - if (initialized() || isDone) - { - if (isDone) _ptr->reset(); - _pResult = new Result(_asyncExec()); - poco_check_ptr (_pResult); - return *_pResult; - } + Mutex::ScopedLock lock(_mutex); + if (initialized() || paused() || done()) + return doAsyncExec(); else throw InvalidAccessException("Statement still executing."); } +const Statement::Result& Statement::doAsyncExec() +{ + if (done()) _ptr->reset(); + _pResult = new Result(_asyncExec()); + poco_check_ptr (_pResult); + return *_pResult; +} + + Statement::ResultType Statement::wait(long milliseconds) { if (!_pResult) return 0; diff --git a/Data/src/StatementImpl.cpp b/Data/src/StatementImpl.cpp index 11d1e66d8..26e051045 100644 --- a/Data/src/StatementImpl.cpp +++ b/Data/src/StatementImpl.cpp @@ -115,6 +115,8 @@ Poco::UInt32 StatementImpl::executeWithLimit() _state = ST_DONE; else if (hasNext() && _extrLimit.value() == count && _extrLimit.isHardLimit()) throw LimitException("HardLimit reached. We got more data than we asked for"); + else + _state = ST_PAUSED; return count; } diff --git a/Data/testsuite/src/DataTest.cpp b/Data/testsuite/src/DataTest.cpp index 3bab609c8..ec0daaaf7 100644 --- a/Data/testsuite/src/DataTest.cpp +++ b/Data/testsuite/src/DataTest.cpp @@ -384,7 +384,7 @@ void DataTest::testColumnVector() try { - int i = c[100]; + int i; i = c[100]; // to silence gcc fail ("must fail"); } catch (RangeException&) { } @@ -446,7 +446,7 @@ void DataTest::testColumnVectorBool() try { - bool b = c[100]; + bool b; b = c[100]; // to silence gcc fail ("must fail"); } catch (RangeException&) { } @@ -522,7 +522,7 @@ void DataTest::testColumnDeque() try { - int i = c[100]; + int i; i = c[100]; // to silence gcc fail ("must fail"); } catch (RangeException&) { } @@ -598,7 +598,7 @@ void DataTest::testColumnList() try { - int i = c[100]; + int i; i = c[100]; // to silence gcc fail ("must fail"); } catch (RangeException&) { } @@ -658,13 +658,13 @@ void DataTest::testRow() try { - int i = row[5]; + int i; i = row[5]; // to silence gcc fail ("must fail"); }catch (RangeException&) {} try { - int i = row["a bad name"]; + int i; i = row["a bad name"]; // to silence gcc fail ("must fail"); }catch (NotFoundException&) {}