Add mechanism to start a task from within a task (#1287)

* Add mechanism to start a task from within a task

Staying in the same thread.

* Provide seeds for a task queue creation

TaskManager::taskFinished removes the finished task from the task list
before dispatching the taskFinished notification

* fixup! Add mechanism to start a task from within a task

* fixup! Add mechanism to start a task from within a task

* Add Task::yield

on the same model as Task::sleep
This commit is contained in:
Philippe RG 2017-11-13 17:02:49 +01:00 committed by Aleksandar Fabijanic
parent e28504e7a9
commit 3ada2d9084
6 changed files with 169 additions and 2 deletions

View File

@ -106,6 +106,16 @@ protected:
///
/// A Task should use this method in favor of Thread::sleep().
bool yield();
/// Yields cpu to other threads
///
/// If the task is cancelled while it is suspended,
/// yield() will return true. If the tasks resumes
/// without being cancelled, the
/// return value is false.
///
/// A Task should use this method in favor of Thread::yield().
void setProgress(float progress);
/// Sets the task's progress.
/// The value should be between 0.0 (just started)

View File

@ -68,6 +68,11 @@ public:
/// The TaskManager takes ownership of the Task object
/// and deletes it when it it finished.
void startSync(Task* pTask);
/// Starts the given task in the current thread.
/// The TaskManager takes ownership of the Task object
/// and deletes it when it it finished.
void cancelAll();
/// Requests cancellation of all tasks.

View File

@ -89,6 +89,13 @@ bool Task::sleep(long milliseconds)
}
bool Task::yield()
{
Thread::yield();
return isCancelled();
}
void Task::setProgress(float taskProgress)
{
FastMutex::ScopedLock lock(_mutex);

View File

@ -62,6 +62,32 @@ void TaskManager::start(Task* pTask, int cpu)
}
void TaskManager::startSync(Task* pTask)
{
TaskPtr pAutoTask(pTask); // take ownership immediately
ScopedLockWithUnlock<FastMutex> lock(_mutex);
pAutoTask->setOwner(this);
pAutoTask->setState(Task::TASK_STARTING);
_taskList.push_back(pAutoTask);
lock.unlock();
try
{
pAutoTask->run();
}
catch (...)
{
FastMutex::ScopedLock miniLock(_mutex);
// Make sure that we don't act like we own the task since
// we never started it. If we leave the task on our task
// list, the size of the list is incorrect.
_taskList.pop_back();
throw;
}
}
void TaskManager::cancelAll()
{
FastMutex::ScopedLock lock(_mutex);
@ -132,17 +158,21 @@ void TaskManager::taskCancelled(Task* pTask)
void TaskManager::taskFinished(Task* pTask)
{
_nc.postNotification(new TaskFinishedNotification(pTask));
TaskPtr currentTask;
ScopedLockWithUnlock<FastMutex> lock(_mutex);
FastMutex::ScopedLock lock(_mutex);
for (TaskList::iterator it = _taskList.begin(); it != _taskList.end(); ++it)
{
if (*it == pTask)
{
currentTask = *it;
_taskList.erase(it);
break;
}
}
lock.unlock();
_nc.postNotification(new TaskFinishedNotification(pTask));
}

View File

@ -96,6 +96,21 @@ namespace
}
};
class IncludingTask: public Task
{
public:
IncludingTask(): Task("IncludingTask")
{
}
void runTask()
{
setProgress(0.5);
getOwner()->startSync(new SimpleTask);
setProgress(1.0);
}
};
class TaskObserver
{
public:
@ -229,6 +244,47 @@ namespace
private:
C _custom;
};
class SimpleTaskQueue
{
public:
SimpleTaskQueue(TaskManager& tm): _tm(tm)
{
_tm.addObserver(Observer<SimpleTaskQueue, TaskFinishedNotification>(*this, &SimpleTaskQueue::taskFinished));
}
void enqueue(Task* pTask)
{
_tasks.push_back(pTask);
}
void startQueue()
{
if (_tm.count() == 0 && _tasks.size())
{
Task* pTask = _tasks.back();
// not thread-safe
_tasks.pop_back();
_tm.start(pTask);
}
}
void taskFinished(TaskFinishedNotification* pNf)
{
if (_tasks.size())
{
Task* pTask = _tasks.back();
// not thread-safe
_tasks.pop_back();
_tm.startSync(pTask);
}
pNf->release();
}
private:
std::vector<Task*> _tasks;
TaskManager& _tm;
};
}
@ -443,6 +499,63 @@ void TaskManagerTest::testMultiTasks()
}
void TaskManagerTest::testTaskInclusion()
{
TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION);
IncludingTask* pTask = new IncludingTask;
pTask->duplicate();
tm.start(pTask);
// wait for the included task to be started
while (pTask->progress() < 0.5)
{
Thread::sleep(100);
}
Thread::sleep(100);
assert (tm.count() == 2);
tm.cancelAll();
while (tm.count() > 0) Thread::sleep(100);
assert (tm.count() == 0);
}
void TaskManagerTest::testTaskQueue()
{
TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION);
SimpleTaskQueue tq(tm);
Task* pT1 = new SimpleTask;
Task* pT2 = new SimpleTask;
Task* pT3 = new SimpleTask;
tq.enqueue(pT1);
tq.enqueue(pT2);
tq.startQueue();
assert (tm.count() == 1);
Thread::sleep(500);
assert (pT1->state() == Task::TASK_RUNNING);
assert (pT2->state() == Task::TASK_IDLE);
tq.enqueue(pT3);
pT1->cancel();
Thread::sleep(500);
assert (tm.count() == 1);
assert (pT2->state() == Task::TASK_RUNNING);
assert (pT3->state() == Task::TASK_IDLE);
pT2->cancel();
Thread::sleep(500);
assert (tm.count() == 1);
assert (pT3->state() == Task::TASK_RUNNING);
tm.cancelAll();
while (tm.count() > 0) Thread::sleep(100);
assert (tm.count() == 0);
}
void TaskManagerTest::testCustomThreadPool()
{
ThreadPool tp(2, 5, 120, POCO_THREAD_STACK_SIZE, ThreadPool::TAP_UNIFORM_DISTRIBUTION);

View File

@ -35,6 +35,8 @@ public:
void testError();
void testCustom();
void testMultiTasks();
void testTaskInclusion();
void testTaskQueue();
void testCustomThreadPool();
void setUp();