From 3ada2d90843b7361cd1859c30e0f5bc992325c3d Mon Sep 17 00:00:00 2001 From: Philippe RG Date: Mon, 13 Nov 2017 17:02:49 +0100 Subject: [PATCH] Add mechanism to start a task from within a task (#1287) * Add mechanism to start a task from within a task Staying in the same thread. * Provide seeds for a task queue creation TaskManager::taskFinished removes the finished task from the task list before dispatching the taskFinished notification * fixup! Add mechanism to start a task from within a task * fixup! Add mechanism to start a task from within a task * Add Task::yield on the same model as Task::sleep --- Foundation/include/Poco/Task.h | 10 ++ Foundation/include/Poco/TaskManager.h | 5 + Foundation/src/Task.cpp | 7 ++ Foundation/src/TaskManager.cpp | 34 +++++- Foundation/testsuite/src/TaskManagerTest.cpp | 113 +++++++++++++++++++ Foundation/testsuite/src/TaskManagerTest.h | 2 + 6 files changed, 169 insertions(+), 2 deletions(-) diff --git a/Foundation/include/Poco/Task.h b/Foundation/include/Poco/Task.h index 47c1871ca..ed8652056 100644 --- a/Foundation/include/Poco/Task.h +++ b/Foundation/include/Poco/Task.h @@ -106,6 +106,16 @@ protected: /// /// A Task should use this method in favor of Thread::sleep(). + bool yield(); + /// Yields cpu to other threads + /// + /// If the task is cancelled while it is suspended, + /// yield() will return true. If the tasks resumes + /// without being cancelled, the + /// return value is false. + /// + /// A Task should use this method in favor of Thread::yield(). + void setProgress(float progress); /// Sets the task's progress. /// The value should be between 0.0 (just started) diff --git a/Foundation/include/Poco/TaskManager.h b/Foundation/include/Poco/TaskManager.h index cd4cdd4eb..ae252a6c4 100644 --- a/Foundation/include/Poco/TaskManager.h +++ b/Foundation/include/Poco/TaskManager.h @@ -68,6 +68,11 @@ public: /// The TaskManager takes ownership of the Task object /// and deletes it when it it finished. + void startSync(Task* pTask); + /// Starts the given task in the current thread. + /// The TaskManager takes ownership of the Task object + /// and deletes it when it it finished. + void cancelAll(); /// Requests cancellation of all tasks. diff --git a/Foundation/src/Task.cpp b/Foundation/src/Task.cpp index bd57b8b0b..0f969d750 100644 --- a/Foundation/src/Task.cpp +++ b/Foundation/src/Task.cpp @@ -89,6 +89,13 @@ bool Task::sleep(long milliseconds) } +bool Task::yield() +{ + Thread::yield(); + return isCancelled(); +} + + void Task::setProgress(float taskProgress) { FastMutex::ScopedLock lock(_mutex); diff --git a/Foundation/src/TaskManager.cpp b/Foundation/src/TaskManager.cpp index a88dc2508..0593b601f 100644 --- a/Foundation/src/TaskManager.cpp +++ b/Foundation/src/TaskManager.cpp @@ -62,6 +62,32 @@ void TaskManager::start(Task* pTask, int cpu) } +void TaskManager::startSync(Task* pTask) +{ + TaskPtr pAutoTask(pTask); // take ownership immediately + ScopedLockWithUnlock lock(_mutex); + + pAutoTask->setOwner(this); + pAutoTask->setState(Task::TASK_STARTING); + _taskList.push_back(pAutoTask); + lock.unlock(); + try + { + pAutoTask->run(); + } + catch (...) + { + FastMutex::ScopedLock miniLock(_mutex); + + // Make sure that we don't act like we own the task since + // we never started it. If we leave the task on our task + // list, the size of the list is incorrect. + _taskList.pop_back(); + throw; + } +} + + void TaskManager::cancelAll() { FastMutex::ScopedLock lock(_mutex); @@ -132,17 +158,21 @@ void TaskManager::taskCancelled(Task* pTask) void TaskManager::taskFinished(Task* pTask) { - _nc.postNotification(new TaskFinishedNotification(pTask)); + TaskPtr currentTask; + ScopedLockWithUnlock lock(_mutex); - FastMutex::ScopedLock lock(_mutex); for (TaskList::iterator it = _taskList.begin(); it != _taskList.end(); ++it) { if (*it == pTask) { + currentTask = *it; _taskList.erase(it); break; } } + lock.unlock(); + + _nc.postNotification(new TaskFinishedNotification(pTask)); } diff --git a/Foundation/testsuite/src/TaskManagerTest.cpp b/Foundation/testsuite/src/TaskManagerTest.cpp index 4afe582c2..bfc4e997a 100644 --- a/Foundation/testsuite/src/TaskManagerTest.cpp +++ b/Foundation/testsuite/src/TaskManagerTest.cpp @@ -96,6 +96,21 @@ namespace } }; + class IncludingTask: public Task + { + public: + IncludingTask(): Task("IncludingTask") + { + } + + void runTask() + { + setProgress(0.5); + getOwner()->startSync(new SimpleTask); + setProgress(1.0); + } + }; + class TaskObserver { public: @@ -229,6 +244,47 @@ namespace private: C _custom; }; + + class SimpleTaskQueue + { + public: + SimpleTaskQueue(TaskManager& tm): _tm(tm) + { + _tm.addObserver(Observer(*this, &SimpleTaskQueue::taskFinished)); + } + + void enqueue(Task* pTask) + { + _tasks.push_back(pTask); + } + + void startQueue() + { + if (_tm.count() == 0 && _tasks.size()) + { + Task* pTask = _tasks.back(); + // not thread-safe + _tasks.pop_back(); + _tm.start(pTask); + } + } + + void taskFinished(TaskFinishedNotification* pNf) + { + if (_tasks.size()) + { + Task* pTask = _tasks.back(); + // not thread-safe + _tasks.pop_back(); + _tm.startSync(pTask); + } + pNf->release(); + } + + private: + std::vector _tasks; + TaskManager& _tm; + }; } @@ -443,6 +499,63 @@ void TaskManagerTest::testMultiTasks() } +void TaskManagerTest::testTaskInclusion() +{ + TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION); + IncludingTask* pTask = new IncludingTask; + + pTask->duplicate(); + + tm.start(pTask); + // wait for the included task to be started + while (pTask->progress() < 0.5) + { + Thread::sleep(100); + } + Thread::sleep(100); + assert (tm.count() == 2); + + tm.cancelAll(); + while (tm.count() > 0) Thread::sleep(100); + assert (tm.count() == 0); +} + + +void TaskManagerTest::testTaskQueue() +{ + TaskManager tm(ThreadPool::TAP_UNIFORM_DISTRIBUTION); + SimpleTaskQueue tq(tm); + + Task* pT1 = new SimpleTask; + Task* pT2 = new SimpleTask; + Task* pT3 = new SimpleTask; + tq.enqueue(pT1); + tq.enqueue(pT2); + tq.startQueue(); + + assert (tm.count() == 1); + Thread::sleep(500); + assert (pT1->state() == Task::TASK_RUNNING); + assert (pT2->state() == Task::TASK_IDLE); + + tq.enqueue(pT3); + pT1->cancel(); + Thread::sleep(500); + assert (tm.count() == 1); + assert (pT2->state() == Task::TASK_RUNNING); + assert (pT3->state() == Task::TASK_IDLE); + + pT2->cancel(); + Thread::sleep(500); + assert (tm.count() == 1); + assert (pT3->state() == Task::TASK_RUNNING); + + tm.cancelAll(); + while (tm.count() > 0) Thread::sleep(100); + assert (tm.count() == 0); +} + + void TaskManagerTest::testCustomThreadPool() { ThreadPool tp(2, 5, 120, POCO_THREAD_STACK_SIZE, ThreadPool::TAP_UNIFORM_DISTRIBUTION); diff --git a/Foundation/testsuite/src/TaskManagerTest.h b/Foundation/testsuite/src/TaskManagerTest.h index c8a86d673..7d6481458 100644 --- a/Foundation/testsuite/src/TaskManagerTest.h +++ b/Foundation/testsuite/src/TaskManagerTest.h @@ -35,6 +35,8 @@ public: void testError(); void testCustom(); void testMultiTasks(); + void testTaskInclusion(); + void testTaskQueue(); void testCustomThreadPool(); void setUp();