[DEV] update thead pool to use wait instead of simple pooling in sleep (remove use of 7% CPU)

This commit is contained in:
Edouard DUPIN 2017-01-02 22:52:41 +01:00
parent 314683ff3a
commit d43bd7c9ef
7 changed files with 207 additions and 34 deletions

View File

@ -11,13 +11,33 @@
#include <ememory/memory.hpp>
namespace ethread {
/**
* @brief Simple future to process
*/
class Future {
private:
ememory::SharedPtr<ethread::Promise> m_promise;
ememory::SharedPtr<ethread::Promise> m_promise; //!< the assiciated promise that we are waiting and of action
public:
/**
* @brief Simple Future contructor
* @param[in] _promise Associated promise to wait
*/
Future(ememory::SharedPtr<ethread::Promise> _promise=nullptr);
/**
* @brief Check if the action is finished
* @return true, the action is done, false otherwise
*/
bool isFinished();
/**
* @brief Wait some time that the action finished
* @param[in] _delay Delay to wait the action is finished
* @return true, the action is finished, false, the time-out apear.
*/
bool wait(echrono::Duration _delay=echrono::seconds(2));
/**
* @brief Action to do when the action is finished
* @param[in] _action New action to do.
*/
void andThen(std::function<void()> _action);
};
}

View File

@ -39,6 +39,15 @@ ethread::Future ethread::Pool::async(std::function<void()> _call, uint64_t _exec
ememory::SharedPtr<ethread::Promise> promise = ememory::makeShared<ethread::Promise>();
ememory::SharedPtr<ethread::PoolAction> action = ememory::makeShared<ethread::PoolAction>(_executionInGroupId, promise, _call);
m_listActions.push_back(action);
for(auto &it : m_listThread) {
if (it == nullptr) {
continue;
}
if (it->touch() == true) {
// Find one to force action now ...
break;
}
}
return ethread::Future(promise);
}
@ -106,15 +115,16 @@ void ethread::Pool::stop() {
void ethread::Pool::join() {
std::unique_lock<std::mutex> lock(m_mutex);
ETHREAD_INFO("start join all the threads in pool " << m_listThread.size());
ETHREAD_DEBUG("start join all the threads in pool " << m_listThread.size());
for (size_t iii=0; iii<m_listThread.size(); ++iii) {
ETHREAD_INFO(" join " << iii);
ETHREAD_DEBUG(" join " << iii);
if (m_listThread[iii] == nullptr) {
continue;
}
m_listThread[iii]->join();
}
ETHREAD_INFO(" ==> all joined");
ETHREAD_DEBUG(" ==> all joined");
m_listThread.clear();
ETHREAD_INFO(" ==> all reset");
ETHREAD_DEBUG(" ==> all reset");
}

View File

@ -13,6 +13,9 @@
namespace ethread {
class PoolExecutor;
/**
* @brief A pool is an association of a list of active thread that excecute some actions
*/
class Pool {
private:
std::mutex m_mutex; //!< global add and release some thread
@ -21,14 +24,46 @@ namespace ethread {
std::vector<uint64_t> m_listIdPool; //!< Thread pool
uint32_t m_lastTrandId; //!< to group the action in a single thread
public:
/**
* @brief Contructor of the threadPool
* @param[in] _numberOfThread Basic number of thread
*/
Pool(uint16_t _numberOfThread);
~Pool();
/**
* @brief DEstructor (virtualized)
*/
virtual ~Pool();
/**
* @brief Create a new group ID to manage a single ID
* @return A simple incremented group ID (number 0 to 10 are reserved)
*/
uint32_t createGroupId();
/**
* @brief Ad a async action to process later
* @param[in] _func Function to execure to process action
* @param[in] _executionInGroupId Group in which the process must be run
* @return A future on the action done
*/
// Execte in a group != of 0 request ordering the action in a single thread (same as a trand ...)
ethread::Future async(std::function<void()>, uint64_t _executionInGroupId=0); //!< execute an action in the thread pool...
ethread::Future async(std::function<void()> _func, uint64_t _executionInGroupId=0); //!< execute an action in the thread pool...
// internal:
/**
* @brief Gan an Action to process
* @return A single action to process in a thread
*/
ememory::SharedPtr<ethread::PoolAction> getAction();
/**
* @brief Release an action id in the pool of action in progress
* @param[in] _id Id to release
*/
void releaseId(uint64_t _id);
/**
* @brief Stop all the thread and current actions
*/
void stop();
/**
* @brief Join all thread to destroy the pool
*/
void join();
};
}

View File

@ -12,14 +12,30 @@
#include <ememory/memory.hpp>
namespace ethread {
/**
* @brief A pool Action is a link between an action to do (the call) and a group of excecution (poolId) and the result store in a promise.
*/
class PoolAction {
private:
uint64_t m_currentPoolId;
ememory::SharedPtr<ethread::Promise> m_promise;
std::function<void()> m_call;
uint64_t m_currentPoolId; //!< execution group Id requested
ememory::SharedPtr<ethread::Promise> m_promise; //!< Return promise of the action
std::function<void()> m_call; //!< Action to do ...
public:
/**
* @brief Contuctor of a simple action
* @param[in] _currentPoolId Id Of the action might be process inside
* @param[in] _promise Promise to call when action is done
* @param[in] _call Action to do (callable object)
*/
PoolAction(uint64_t _currentPoolId, ememory::SharedPtr<ethread::Promise> _promise, std::function<void()> _call);
/**
* @brief Get the Pool id of the Action
* @return The pool id of this action (0 for no request)
*/
uint64_t getPoolId() const;
/**
* @brief Call the action to do (real action will be done)
*/
void call();
};
}

View File

@ -13,23 +13,34 @@
#include "debug.hpp"
ethread::PoolExecutor::PoolExecutor(ethread::Pool& _pool):
m_needProcess(false),
m_isWaiting(false),
m_pool(_pool),
m_running(false),
m_uniqueId(0) {
static uint32_t uid = 10;
m_uniqueId = uid++;
m_running(false) {
}
void ethread::PoolExecutor::threadCallback() {
ETHREAD_INFO("[" << m_uniqueId << "] RUN: thread in Pool [START]");
ethread::setName("pool " + etk::to_string(m_uniqueId));
ETHREAD_DEBUG("RUN: thread in Pool [START]");
ethread::setName("pool " + etk::to_string(ethread::getId()));
// get datas:
while (m_running == true) {
// get an action:
m_action = m_pool.getAction();
if (m_action == nullptr) {
// TODO : This is really bad ==> fast to code and debug but not optimum at all ... use condition instead ...
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::unique_lock<std::mutex> lock(m_mutex);
// If no action availlable and not requested to check, just sleep ...
if (m_needProcess == false) {
m_isWaiting = true;
ETHREAD_VERBOSE("RUN: Jump in sleep");
if (m_condition.wait_for(lock, std::chrono::seconds(60)) == std::cv_status::timeout) {
ETHREAD_VERBOSE("RUN: time-out");
continue;
}
ETHREAD_VERBOSE("RUN: WakeUp");
m_isWaiting = false;
m_needProcess = false;
}
continue;
}
m_action->call();
@ -37,12 +48,16 @@ void ethread::PoolExecutor::threadCallback() {
m_action.reset();
}
m_running = false;
ETHREAD_INFO("[" << m_uniqueId << "] RUN: thread in Pool [STOP]");
ETHREAD_DEBUG("RUN: thread in Pool [STOP]");
}
void ethread::PoolExecutor::start() {
ETHREAD_INFO("START: thread in Pool [START]");
ETHREAD_DEBUG("START: thread in Pool [START]");
m_running = true;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_condition.notify_all();
}
m_thread = ememory::makeShared<std::thread>([&](void *){ this->threadCallback();}, nullptr);
if (m_thread == nullptr) {
m_running = false;
@ -50,24 +65,47 @@ void ethread::PoolExecutor::start() {
return;
}
//ethread::setPriority(*m_receiveThread, -6);
ETHREAD_INFO("START: thread in Pool [STOP]");
ETHREAD_DEBUG("START: thread in Pool [STOP]");
}
void ethread::PoolExecutor::stop() {
ETHREAD_INFO("[" << m_uniqueId << "] STOP: thread in Pool [START]");
ETHREAD_DEBUG("STOP: thread in Pool [START]");
{
std::unique_lock<std::mutex> lock(m_mutex);
m_condition.notify_all();
}
m_running = false;
ETHREAD_INFO("[" << m_uniqueId << "] STOP: thread in Pool [STOP]");
ETHREAD_DEBUG("STOP: thread in Pool [STOP]");
}
void ethread::PoolExecutor::join() {
ETHREAD_INFO("[" << m_uniqueId << "] JOIN: thread in Pool [START]");
ETHREAD_DEBUG("JOIN: thread in Pool [START]");
{
std::unique_lock<std::mutex> lock(m_mutex);
m_condition.notify_all();
}
if (m_thread != nullptr) {
ETHREAD_INFO("[" << m_uniqueId << "] JOIN: waiting ...");
ETHREAD_DEBUG("JOIN: waiting ...");
m_thread->join();
m_thread.reset();
}
ETHREAD_INFO("[" << m_uniqueId << "] JOIN: thread in Pool [STOP]");
ETHREAD_DEBUG("JOIN: thread in Pool [STOP]");
}
bool ethread::PoolExecutor::touch() {
std::unique_lock<std::mutex> lock(m_mutex);
bool ret = false;
if ( m_needProcess == false
&& m_isWaiting == true) {
ETHREAD_VERBOSE("Touch ...");
m_needProcess = true;
ret = true;
m_condition.notify_all();
}
return ret;
}

View File

@ -11,20 +11,52 @@
#include <ethread/Future.hpp>
#include <ethread/PoolAction.hpp>
#include <ethread/Pool.hpp>
#include <mutex>
#include <condition_variable>
namespace ethread {
/**
* @brief A pool Executor is a class that execute some PoolAction. it is contituated wit a simple thread that execute some code).
*/
class PoolExecutor {
private: //section to permit to optimize CPU:
std::mutex m_mutex; //!< protection of the internal data.
std::condition_variable m_condition; //!< Message system to send event on an other thread.
bool m_needProcess; //!< Need to do action (no need to wait condition).
bool m_isWaiting; //!< The executor is waiting to some action to do.
private:
ethread::Pool& m_pool;
ememory::SharedPtr<std::thread> m_thread;
bool m_running;
uint32_t m_uniqueId;
ememory::SharedPtr<ethread::PoolAction> m_action;
ethread::Pool& m_pool; //!< Local reference on the Thread pool that store action to do.
ememory::SharedPtr<std::thread> m_thread; //!< Local thread to process action.
bool m_running; //!< Thread is running (not stop).
ememory::SharedPtr<ethread::PoolAction> m_action; //!< Curent action that is processing.
public:
/**
* @brief Create a thread executor for the specific pool.
* @param[in] _pool Reference on the thread pool.
*/
PoolExecutor(ethread::Pool& _pool);
protected:
/**
* @brief Internal thread callback (for std::thread).
*/
void threadCallback();
public:
/**
* @brief Start the current thread.
*/
void start();
/**
* @brief Stop the current thread.
*/
void stop();
/**
* @brief Join the current thread.
*/
void join();
/**
* @brief Touche the execurot to process some other data.
* @return true if the executor is waiting to process something. false otherwise.
*/
bool touch();
};
}

View File

@ -10,16 +10,38 @@
#include <echrono/Duration.hpp>
namespace ethread {
/**
* @brief A promise of the specific ation to do in the thread pool
*/
class Promise {
private:
std::mutex m_mutex;
std::function<void()> m_callback;
bool m_isFinished;
std::mutex m_mutex; //!< Simple lock of the interface
std::function<void()> m_callback; //!< callback to call when processing is ended
bool m_isFinished; //!< The process of the action has been done
public:
/**
* @brief Simple empty contructor
*/
Promise();
/**
* @brief Call this when the action has been done
*/
void finish();
/**
* @brief Check if the action is finished
* @return true, the action is done, false otherwise
*/
bool isFinished();
/**
* @brief Wait some time that the action finished
* @param[in] _delay Delay to wait the action is finished
* @return true, the action is finished, false, the time-out apear.
*/
bool wait(echrono::Duration _delay=echrono::seconds(2));
/**
* @brief Action to do when the action is finished
* @param[in] _action New action to do.
*/
void andThen(std::function<void()> _action);
};
}