some async-robustness additions and tests for Statement

This commit is contained in:
Aleksandar Fabijanic
2007-09-30 16:42:17 +00:00
parent 6e380b6b13
commit d9364e8f51
7 changed files with 128 additions and 29 deletions

View File

@@ -2252,6 +2252,21 @@ void SQLExecutor::asynchronous()
assert (!stmt.isAsync());
result.wait();
Statement stmt1 = (tmp << "SELECT * FROM Strings", into(data), async, now);
assert (stmt1.isAsync());
assert (stmt1.wait() == rowCount);
stmt1.execute();
try {
stmt1.execute();
fail ("must fail");
} catch (InvalidAccessException&)
{
stmt1.wait();
stmt1.execute();
stmt1.wait();
}
stmt = tmp << "SELECT * FROM Strings", into(data), async, now;
assert (stmt.isAsync());
stmt.wait();
@@ -2281,4 +2296,27 @@ void SQLExecutor::asynchronous()
assert (!stmt.isAsync());
result.wait();
assert (result.data() == rowCount);
assert (0 == rowCount % 10);
int step = (int) (rowCount/10);
data.clear();
Statement stmt2 = (tmp << "SELECT * FROM Strings", into(data), async, limit(step));
assert (data.size() == 0);
assert (!stmt2.done());
Statement::ResultType rows = 0;
for (int i = 0; !stmt2.done(); i += step)
{
stmt2.execute();
rows = stmt2.wait();
assert (step == rows);
assert (step + i == data.size());
}
assert (stmt2.done());
assert (rowCount == data.size());
stmt2 = tmp << "SELECT * FROM Strings", reset;
assert (!stmt2.isAsync());
assert ("deque" == stmt2.getStorage());
assert (stmt2.execute() == rowCount);
}

View File

@@ -1700,6 +1700,21 @@ void SQLiteTest::testAsync()
assert (!stmt.isAsync());
result.wait();
Statement stmt1 = (tmp << "SELECT * FROM Strings", into(data), async, now);
assert (stmt1.isAsync());
assert (stmt1.wait() == rowCount);
stmt1.execute();
try {
stmt1.execute();
fail ("must fail");
} catch (InvalidAccessException&)
{
stmt1.wait();
stmt1.execute();
stmt1.wait();
}
stmt = tmp << "SELECT * FROM Strings", into(data), async, now;
assert (stmt.isAsync());
stmt.wait();
@@ -1729,6 +1744,29 @@ void SQLiteTest::testAsync()
assert (!stmt.isAsync());
result.wait();
assert (result.data() == rowCount);
assert (0 == rowCount % 10);
int step = (int) (rowCount/10);
data.clear();
Statement stmt2 = (tmp << "SELECT * FROM Strings", into(data), async, limit(step));
assert (data.size() == 0);
assert (!stmt2.done());
Statement::ResultType rows = 0;
for (int i = 0; !stmt2.done(); i += step)
{
stmt2.execute();
rows = stmt2.wait();
assert (step == rows);
assert (step + i == data.size());
}
assert (stmt2.done());
assert (rowCount == data.size());
stmt2 = tmp << "SELECT * FROM Strings", reset;
assert (!stmt2.isAsync());
assert ("deque" == stmt2.getStorage());
assert (stmt2.execute() == rowCount);
}

View File

@@ -44,6 +44,7 @@
#include "Poco/Data/StatementImpl.h"
#include "Poco/Data/Range.h"
#include "Poco/SharedPtr.h"
#include "Poco/Mutex.h"
#include "Poco/ActiveMethod.h"
#include "Poco/ActiveResult.h"
@@ -205,7 +206,7 @@ public:
/// When executed on otherwise synchronous statement, this method does not alter the
/// statement's synchronous nature.
void setAsync(bool async);
void setAsync(bool async = true);
/// Sets the asynchronous flag. If this flag is true, executeAsync() is called
/// from the now() manipulator. This setting does not affect the statement's
/// capability to be executed synchronously by directly calling execute().
@@ -222,6 +223,10 @@ public:
bool initialized();
/// Returns true if the statement was initialized (i.e. not executed yet).
bool paused();
/// Returns true if the statement was paused (a range limit stopped it
/// and there is more work to do).
bool done();
/// Returns true if the statement was completely executed or false if a range limit stopped it
/// and there is more work to do. When no limit is set, it will always return true after calling execute().
@@ -263,12 +268,15 @@ private:
static const int WAIT_FOREVER = -1;
const Result& doAsyncExec();
/// Asynchronously executes the statement.
StatementImplPtr _ptr;
// asynchronous execution related members
bool _isAsync;
mutable ResultPtr _pResult;
FastMutex _mutex;
Mutex _mutex;
AsyncExecMethod _asyncExec;
};
@@ -404,6 +412,12 @@ inline bool Statement::initialized()
}
inline bool Statement::paused()
{
return _ptr->getState() == StatementImpl::ST_PAUSED;
}
inline bool Statement::done()
{
return _ptr->getState() == StatementImpl::ST_DONE;

View File

@@ -73,6 +73,7 @@ public:
ST_INITIALIZED,
ST_COMPILED,
ST_BOUND,
ST_PAUSED,
ST_DONE,
ST_RESET
};

View File

@@ -70,9 +70,6 @@ Statement::Statement(const Statement& stmt):
_pResult(stmt._pResult),
_asyncExec(_ptr, &StatementImpl::execute)
{
// if executing asynchronously, wait
if (stmt._pResult)
stmt._pResult->wait();
}
@@ -100,32 +97,41 @@ void Statement::swap(Statement& other)
Statement::ResultType Statement::execute()
{
Mutex::ScopedLock lock(_mutex);
bool isDone = done();
if (initialized() || paused() || isDone)
{
if (!isAsync())
{
if (done()) _ptr->reset();
if (isDone) _ptr->reset();
return _ptr->execute();
}
else
{
executeAsync();
doAsyncExec();
return 0;
}
} else
throw InvalidAccessException("Statement still executing.");
}
const Statement::Result& Statement::executeAsync()
{
FastMutex::ScopedLock lock(_mutex);
bool isDone = done();
if (initialized() || isDone)
{
if (isDone) _ptr->reset();
Mutex::ScopedLock lock(_mutex);
if (initialized() || paused() || done())
return doAsyncExec();
else
throw InvalidAccessException("Statement still executing.");
}
const Statement::Result& Statement::doAsyncExec()
{
if (done()) _ptr->reset();
_pResult = new Result(_asyncExec());
poco_check_ptr (_pResult);
return *_pResult;
}
else
throw InvalidAccessException("Statement still executing.");
}

View File

@@ -115,6 +115,8 @@ Poco::UInt32 StatementImpl::executeWithLimit()
_state = ST_DONE;
else if (hasNext() && _extrLimit.value() == count && _extrLimit.isHardLimit())
throw LimitException("HardLimit reached. We got more data than we asked for");
else
_state = ST_PAUSED;
return count;
}

View File

@@ -384,7 +384,7 @@ void DataTest::testColumnVector()
try
{
int i = c[100];
int i; i = c[100]; // to silence gcc
fail ("must fail");
}
catch (RangeException&) { }
@@ -446,7 +446,7 @@ void DataTest::testColumnVectorBool()
try
{
bool b = c[100];
bool b; b = c[100]; // to silence gcc
fail ("must fail");
}
catch (RangeException&) { }
@@ -522,7 +522,7 @@ void DataTest::testColumnDeque()
try
{
int i = c[100];
int i; i = c[100]; // to silence gcc
fail ("must fail");
}
catch (RangeException&) { }
@@ -598,7 +598,7 @@ void DataTest::testColumnList()
try
{
int i = c[100];
int i; i = c[100]; // to silence gcc
fail ("must fail");
}
catch (RangeException&) { }
@@ -658,13 +658,13 @@ void DataTest::testRow()
try
{
int i = row[5];
int i; i = row[5]; // to silence gcc
fail ("must fail");
}catch (RangeException&) {}
try
{
int i = row["a bad name"];
int i; i = row["a bad name"]; // to silence gcc
fail ("must fail");
}catch (NotFoundException&) {}