diff --git a/ethread/Future.hpp b/ethread/Future.hpp index f85f809..b270198 100644 --- a/ethread/Future.hpp +++ b/ethread/Future.hpp @@ -11,13 +11,33 @@ #include namespace ethread { + /** + * @brief Simple future to process + */ class Future { private: - ememory::SharedPtr m_promise; + ememory::SharedPtr m_promise; //!< the assiciated promise that we are waiting and of action public: + /** + * @brief Simple Future contructor + * @param[in] _promise Associated promise to wait + */ Future(ememory::SharedPtr _promise=nullptr); + /** + * @brief Check if the action is finished + * @return true, the action is done, false otherwise + */ bool isFinished(); + /** + * @brief Wait some time that the action finished + * @param[in] _delay Delay to wait the action is finished + * @return true, the action is finished, false, the time-out apear. + */ bool wait(echrono::Duration _delay=echrono::seconds(2)); + /** + * @brief Action to do when the action is finished + * @param[in] _action New action to do. + */ void andThen(std::function _action); }; } diff --git a/ethread/Pool.cpp b/ethread/Pool.cpp index bd5cb96..2ff4a53 100644 --- a/ethread/Pool.cpp +++ b/ethread/Pool.cpp @@ -39,6 +39,15 @@ ethread::Future ethread::Pool::async(std::function _call, uint64_t _exec ememory::SharedPtr promise = ememory::makeShared(); ememory::SharedPtr action = ememory::makeShared(_executionInGroupId, promise, _call); m_listActions.push_back(action); + for(auto &it : m_listThread) { + if (it == nullptr) { + continue; + } + if (it->touch() == true) { + // Find one to force action now ... + break; + } + } return ethread::Future(promise); } @@ -106,15 +115,16 @@ void ethread::Pool::stop() { void ethread::Pool::join() { std::unique_lock lock(m_mutex); - ETHREAD_INFO("start join all the threads in pool " << m_listThread.size()); + ETHREAD_DEBUG("start join all the threads in pool " << m_listThread.size()); for (size_t iii=0; iiijoin(); } - ETHREAD_INFO(" ==> all joined"); + ETHREAD_DEBUG(" ==> all joined"); m_listThread.clear(); - ETHREAD_INFO(" ==> all reset"); + ETHREAD_DEBUG(" ==> all reset"); } + diff --git a/ethread/Pool.hpp b/ethread/Pool.hpp index 37febdd..bcc109b 100644 --- a/ethread/Pool.hpp +++ b/ethread/Pool.hpp @@ -13,6 +13,9 @@ namespace ethread { class PoolExecutor; + /** + * @brief A pool is an association of a list of active thread that excecute some actions + */ class Pool { private: std::mutex m_mutex; //!< global add and release some thread @@ -21,14 +24,46 @@ namespace ethread { std::vector m_listIdPool; //!< Thread pool uint32_t m_lastTrandId; //!< to group the action in a single thread public: + /** + * @brief Contructor of the threadPool + * @param[in] _numberOfThread Basic number of thread + */ Pool(uint16_t _numberOfThread); - ~Pool(); + /** + * @brief DEstructor (virtualized) + */ + virtual ~Pool(); + /** + * @brief Create a new group ID to manage a single ID + * @return A simple incremented group ID (number 0 to 10 are reserved) + */ uint32_t createGroupId(); + /** + * @brief Ad a async action to process later + * @param[in] _func Function to execure to process action + * @param[in] _executionInGroupId Group in which the process must be run + * @return A future on the action done + */ // Execte in a group != of 0 request ordering the action in a single thread (same as a trand ...) - ethread::Future async(std::function, uint64_t _executionInGroupId=0); //!< execute an action in the thread pool... + ethread::Future async(std::function _func, uint64_t _executionInGroupId=0); //!< execute an action in the thread pool... + // internal: + /** + * @brief Gan an Action to process + * @return A single action to process in a thread + */ ememory::SharedPtr getAction(); + /** + * @brief Release an action id in the pool of action in progress + * @param[in] _id Id to release + */ void releaseId(uint64_t _id); + /** + * @brief Stop all the thread and current actions + */ void stop(); + /** + * @brief Join all thread to destroy the pool + */ void join(); }; } diff --git a/ethread/PoolAction.hpp b/ethread/PoolAction.hpp index 3055efc..b63542a 100644 --- a/ethread/PoolAction.hpp +++ b/ethread/PoolAction.hpp @@ -12,14 +12,30 @@ #include namespace ethread { + /** + * @brief A pool Action is a link between an action to do (the call) and a group of excecution (poolId) and the result store in a promise. + */ class PoolAction { private: - uint64_t m_currentPoolId; - ememory::SharedPtr m_promise; - std::function m_call; + uint64_t m_currentPoolId; //!< execution group Id requested + ememory::SharedPtr m_promise; //!< Return promise of the action + std::function m_call; //!< Action to do ... public: + /** + * @brief Contuctor of a simple action + * @param[in] _currentPoolId Id Of the action might be process inside + * @param[in] _promise Promise to call when action is done + * @param[in] _call Action to do (callable object) + */ PoolAction(uint64_t _currentPoolId, ememory::SharedPtr _promise, std::function _call); + /** + * @brief Get the Pool id of the Action + * @return The pool id of this action (0 for no request) + */ uint64_t getPoolId() const; + /** + * @brief Call the action to do (real action will be done) + */ void call(); }; } diff --git a/ethread/PoolExecutor.cpp b/ethread/PoolExecutor.cpp index 3271f23..f0e6016 100644 --- a/ethread/PoolExecutor.cpp +++ b/ethread/PoolExecutor.cpp @@ -13,23 +13,34 @@ #include "debug.hpp" ethread::PoolExecutor::PoolExecutor(ethread::Pool& _pool): + m_needProcess(false), + m_isWaiting(false), m_pool(_pool), - m_running(false), - m_uniqueId(0) { - static uint32_t uid = 10; - m_uniqueId = uid++; + m_running(false) { + } void ethread::PoolExecutor::threadCallback() { - ETHREAD_INFO("[" << m_uniqueId << "] RUN: thread in Pool [START]"); - ethread::setName("pool " + etk::to_string(m_uniqueId)); + ETHREAD_DEBUG("RUN: thread in Pool [START]"); + ethread::setName("pool " + etk::to_string(ethread::getId())); // get datas: while (m_running == true) { // get an action: m_action = m_pool.getAction(); if (m_action == nullptr) { - // TODO : This is really bad ==> fast to code and debug but not optimum at all ... use condition instead ... - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + std::unique_lock lock(m_mutex); + // If no action availlable and not requested to check, just sleep ... + if (m_needProcess == false) { + m_isWaiting = true; + ETHREAD_VERBOSE("RUN: Jump in sleep"); + if (m_condition.wait_for(lock, std::chrono::seconds(60)) == std::cv_status::timeout) { + ETHREAD_VERBOSE("RUN: time-out"); + continue; + } + ETHREAD_VERBOSE("RUN: WakeUp"); + m_isWaiting = false; + m_needProcess = false; + } continue; } m_action->call(); @@ -37,12 +48,16 @@ void ethread::PoolExecutor::threadCallback() { m_action.reset(); } m_running = false; - ETHREAD_INFO("[" << m_uniqueId << "] RUN: thread in Pool [STOP]"); + ETHREAD_DEBUG("RUN: thread in Pool [STOP]"); } void ethread::PoolExecutor::start() { - ETHREAD_INFO("START: thread in Pool [START]"); + ETHREAD_DEBUG("START: thread in Pool [START]"); m_running = true; + { + std::unique_lock lock(m_mutex); + m_condition.notify_all(); + } m_thread = ememory::makeShared([&](void *){ this->threadCallback();}, nullptr); if (m_thread == nullptr) { m_running = false; @@ -50,24 +65,47 @@ void ethread::PoolExecutor::start() { return; } //ethread::setPriority(*m_receiveThread, -6); - ETHREAD_INFO("START: thread in Pool [STOP]"); + ETHREAD_DEBUG("START: thread in Pool [STOP]"); } void ethread::PoolExecutor::stop() { - ETHREAD_INFO("[" << m_uniqueId << "] STOP: thread in Pool [START]"); + ETHREAD_DEBUG("STOP: thread in Pool [START]"); + { + std::unique_lock lock(m_mutex); + m_condition.notify_all(); + } m_running = false; - ETHREAD_INFO("[" << m_uniqueId << "] STOP: thread in Pool [STOP]"); + ETHREAD_DEBUG("STOP: thread in Pool [STOP]"); } void ethread::PoolExecutor::join() { - ETHREAD_INFO("[" << m_uniqueId << "] JOIN: thread in Pool [START]"); + ETHREAD_DEBUG("JOIN: thread in Pool [START]"); + { + std::unique_lock lock(m_mutex); + m_condition.notify_all(); + } if (m_thread != nullptr) { - ETHREAD_INFO("[" << m_uniqueId << "] JOIN: waiting ..."); + ETHREAD_DEBUG("JOIN: waiting ..."); m_thread->join(); m_thread.reset(); } - ETHREAD_INFO("[" << m_uniqueId << "] JOIN: thread in Pool [STOP]"); + ETHREAD_DEBUG("JOIN: thread in Pool [STOP]"); +} + +bool ethread::PoolExecutor::touch() { + std::unique_lock lock(m_mutex); + bool ret = false; + if ( m_needProcess == false + && m_isWaiting == true) { + ETHREAD_VERBOSE("Touch ..."); + m_needProcess = true; + ret = true; + m_condition.notify_all(); + } + return ret; } + + diff --git a/ethread/PoolExecutor.hpp b/ethread/PoolExecutor.hpp index c4d0779..17eca3e 100644 --- a/ethread/PoolExecutor.hpp +++ b/ethread/PoolExecutor.hpp @@ -11,20 +11,52 @@ #include #include #include +#include +#include namespace ethread { + /** + * @brief A pool Executor is a class that execute some PoolAction. it is contituated wit a simple thread that execute some code). + */ class PoolExecutor { + private: //section to permit to optimize CPU: + std::mutex m_mutex; //!< protection of the internal data. + std::condition_variable m_condition; //!< Message system to send event on an other thread. + bool m_needProcess; //!< Need to do action (no need to wait condition). + bool m_isWaiting; //!< The executor is waiting to some action to do. private: - ethread::Pool& m_pool; - ememory::SharedPtr m_thread; - bool m_running; - uint32_t m_uniqueId; - ememory::SharedPtr m_action; + ethread::Pool& m_pool; //!< Local reference on the Thread pool that store action to do. + ememory::SharedPtr m_thread; //!< Local thread to process action. + bool m_running; //!< Thread is running (not stop). + ememory::SharedPtr m_action; //!< Curent action that is processing. public: + /** + * @brief Create a thread executor for the specific pool. + * @param[in] _pool Reference on the thread pool. + */ PoolExecutor(ethread::Pool& _pool); + protected: + /** + * @brief Internal thread callback (for std::thread). + */ void threadCallback(); + public: + /** + * @brief Start the current thread. + */ void start(); + /** + * @brief Stop the current thread. + */ void stop(); + /** + * @brief Join the current thread. + */ void join(); + /** + * @brief Touche the execurot to process some other data. + * @return true if the executor is waiting to process something. false otherwise. + */ + bool touch(); }; } diff --git a/ethread/Promise.hpp b/ethread/Promise.hpp index 891d3b6..41273b5 100644 --- a/ethread/Promise.hpp +++ b/ethread/Promise.hpp @@ -10,16 +10,38 @@ #include namespace ethread { + /** + * @brief A promise of the specific ation to do in the thread pool + */ class Promise { private: - std::mutex m_mutex; - std::function m_callback; - bool m_isFinished; + std::mutex m_mutex; //!< Simple lock of the interface + std::function m_callback; //!< callback to call when processing is ended + bool m_isFinished; //!< The process of the action has been done public: + /** + * @brief Simple empty contructor + */ Promise(); + /** + * @brief Call this when the action has been done + */ void finish(); + /** + * @brief Check if the action is finished + * @return true, the action is done, false otherwise + */ bool isFinished(); + /** + * @brief Wait some time that the action finished + * @param[in] _delay Delay to wait the action is finished + * @return true, the action is finished, false, the time-out apear. + */ bool wait(echrono::Duration _delay=echrono::seconds(2)); + /** + * @brief Action to do when the action is finished + * @param[in] _action New action to do. + */ void andThen(std::function _action); }; }