[DEV] add thead pool

This commit is contained in:
Edouard DUPIN 2016-12-05 22:42:12 +01:00
parent bbd5b70b6f
commit 2c17a43c99
18 changed files with 562 additions and 103 deletions

32
doxy_ethread-tools.py Normal file
View File

@ -0,0 +1,32 @@
#!/usr/bin/python
import os
import doxy.module as module
import doxy.debug as debug
import doxy.tools as tools
def create(target, module_name):
my_module = module.Module(__file__, module_name)
my_module.set_version("version.txt")
my_module.set_title("ethread: Ewol thread platform tools")
my_module.set_website("http://atria-soft.github.io/" + module_name)
my_module.set_website_sources("http://github.com/atria-soft/" + module_name)
my_module.add_path([
module_name,
"doc"
])
my_module.add_file_patterns([
'tools.hpp',
'*.md',
])
my_module.add_exclude_symbols([
'*operator<<*',
])
my_module.add_exclude_file([
'debug.hpp',
])
my_module.add_file_patterns([
'*.hpp',
'*.md',
])
return my_module

View File

@ -23,6 +23,7 @@ def create(target, module_name):
])
my_module.add_exclude_file([
'debug.hpp',
'tools.hpp',
])
my_module.add_file_patterns([
'*.hpp',

View File

@ -4,61 +4,9 @@
* @license APACHE v2.0 (see license file)
*/
#include "debug.hpp"
#include <ethread/Future.hpp>
ethread::Promise::Promise():
m_isFinished(false) {
}
bool ethread::Future::isFinished() {
return m_isFinished;
}
void ethread::Promise::finish() {
std::function<void()> callback;
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_isFinished == true) {
ETHREAD_ERROR("Request 2 time finishing a Promise ...");
return;
}
m_isFinished = true;
if (m_callback != nullptr);
// call callbacks ...
callback = std::move(m_callback);
}
}
if (callback != nullptr) {
callback();
}
}
bool ethread::Promise::wait(echrono::Duration _delay) {
echrono::Steady time = echrono::Steady::now();
while (_delay >= 0) {
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_isFinished == true) {
return true;
}
}
echrono::Steady time2 = echrono::Steady::now();
_delay -= (time2-time);
time = time2;
if (_delay >= 0) {
// TODO : This is really bad ==> fast to code and debug but not optimum at all ... use condition instead ...
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
return false;
}
void ethread::Promise::andThen(std::function<void()> _action) {
m_callback = std::move(_action);
}
ethread::Future::Future(ememory::SharedPtr<ethread::Promise> _promise):
m_promise(_promise) {

View File

@ -7,21 +7,10 @@
#include <mutex>
#include <thread>
#include <echrono/Duration.hpp>
#include <ethread/Promise.hpp>
#include <ememory/memory.hpp>
namespace ethread {
class Promise {
private:
std::mutex m_mutex;
std::function<void()> m_callback;
bool m_isFinished;
public:
Promise();
void finish();
bool isFinished();
bool wait(echrono::Duration _delay=echrono::seconds(2));
void andThen(std::function<void()> _action);
};
class Future {
private:
ememory::SharedPtr<ethread::Promise> m_promise;

118
ethread/Pool.cpp Normal file
View File

@ -0,0 +1,118 @@
/**
* @author Edouard DUPIN
* @copyright 2011, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#include <ethread/Pool.hpp>
#include <ethread/PoolExecutor.hpp>
#include "debug.hpp"
ethread::Pool::Pool(uint16_t _numberOfThread):
m_lastTrandId(1) {
std::unique_lock<std::mutex> lock(m_mutex);
for (uint32_t iii=0; iii<_numberOfThread; ++iii) {
ememory::SharedPtr<ethread::PoolExecutor> tmp = ememory::makeShared<ethread::PoolExecutor>(*this);
if (tmp != nullptr) {
tmp->start();
m_listThread.push_back(tmp);
}
}
}
ethread::Pool::~Pool() {
stop();
join();
}
uint32_t ethread::Pool::createGroupId() {
std::unique_lock<std::mutex> lock(m_mutex);
return m_lastTrandId++;
}
ethread::Future ethread::Pool::async(std::function<void()> _call, uint64_t _executionInGroupId) {
std::unique_lock<std::mutex> lock(m_mutex);
if (_call == nullptr) {
ETHREAD_ERROR("Can not add an action with no function to call...");
return ethread::Future();
}
ememory::SharedPtr<ethread::Promise> promise = ememory::makeShared<ethread::Promise>();
ememory::SharedPtr<ethread::PoolAction> action = ememory::makeShared<ethread::PoolAction>(_executionInGroupId, promise, _call);
m_listActions.push_back(action);
return ethread::Future(promise);
}
void ethread::Pool::releaseId(uint64_t _id) {
if (_id == 0) {
return;
}
auto it = m_listIdPool.begin();
while (it != m_listIdPool.end()) {
if (*it == _id) {
it = m_listIdPool.erase(it);
continue;
}
++it;
}
}
// get an action to execute ...
ememory::SharedPtr<ethread::PoolAction> ethread::Pool::getAction() {
std::unique_lock<std::mutex> lock(m_mutex);
auto it = m_listActions.begin();
while (it != m_listActions.end()) {
if (*it == nullptr) {
it = m_listActions.erase(it);
continue;
}
// Check if this element is associated at a specific pool...
uint64_t uniquId = (*it)->getPoolId();
bool alreadyUsed = false;
if (uniquId != 0) {
for (auto &itId : m_listIdPool) {
if (itId == uniquId) {
alreadyUsed = true;
break;
}
}
}
if (alreadyUsed == false) {
ememory::SharedPtr<ethread::PoolAction> out = (*it);
if (uniquId != 0) {
m_listIdPool.push_back(uniquId);
}
it = m_listActions.erase(it);
return out;
}
++it;
}
return nullptr;
}
void ethread::Pool::stop() {
std::unique_lock<std::mutex> lock(m_mutex);
auto it = m_listThread.begin();
while (it != m_listThread.end()) {
if (*it == nullptr) {
it = m_listThread.erase(it);
continue;
}
(*it)->stop();
}
}
void ethread::Pool::join() {
std::unique_lock<std::mutex> lock(m_mutex);
ETHREAD_INFO("start join all the threads in pool " << m_listThread.size());
for (size_t iii=0; iii<m_listThread.size(); ++iii) {
ETHREAD_INFO(" join " << iii);
if (m_listThread[iii] == nullptr) {
continue;
}
m_listThread[iii]->join();
}
ETHREAD_INFO(" ==> all joined");
m_listThread.clear();
ETHREAD_INFO(" ==> all reset");
}

34
ethread/Pool.hpp Normal file
View File

@ -0,0 +1,34 @@
/**
* @author Edouard DUPIN
* @copyright 2011, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#pragma once
#include <mutex>
#include <vector>
#include <thread>
#include <ethread/Future.hpp>
#include <ethread/PoolAction.hpp>
namespace ethread {
class PoolExecutor;
class Pool {
private:
std::mutex m_mutex; //!< global add and release some thread
std::vector<ememory::SharedPtr<ethread::PoolExecutor>> m_listThread; //!< Thread pool
std::vector<ememory::SharedPtr<ethread::PoolAction>> m_listActions; //!< Thread pool
std::vector<uint64_t> m_listIdPool; //!< Thread pool
uint32_t m_lastTrandId; //!< to group the action in a single thread
public:
Pool(uint16_t _numberOfThread);
~Pool();
uint32_t createGroupId();
// Execte in a group != of 0 request ordering the action in a single thread (same as a trand ...)
ethread::Future async(std::function<void()>, uint64_t _executionInGroupId=0); //!< execute an action in the thread pool...
ememory::SharedPtr<ethread::PoolAction> getAction();
void releaseId(uint64_t _id);
void stop();
void join();
};
}

32
ethread/PoolAction.cpp Normal file
View File

@ -0,0 +1,32 @@
/**
* @author Edouard DUPIN
* @copyright 2011, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#include <ethread/PoolAction.hpp>
#include "debug.hpp"
ethread::PoolAction::PoolAction(uint64_t _currentPoolId, ememory::SharedPtr<ethread::Promise> _promise, std::function<void()> _call) :
m_currentPoolId(_currentPoolId),
m_promise(_promise),
m_call(std::move(_call)) {
}
uint64_t ethread::PoolAction::getPoolId() const {
return m_currentPoolId;
}
void ethread::PoolAction::call() {
if (m_call == nullptr) {
return;
}
if (m_call != nullptr) {
m_call();
}
if (m_promise != nullptr) {
m_promise->finish();
}
}

25
ethread/PoolAction.hpp Normal file
View File

@ -0,0 +1,25 @@
/**
* @author Edouard DUPIN
* @copyright 2011, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#pragma once
#include <mutex>
#include <vector>
#include <thread>
#include <ethread/Future.hpp>
#include <ememory/memory.hpp>
namespace ethread {
class PoolAction {
private:
uint64_t m_currentPoolId;
ememory::SharedPtr<ethread::Promise> m_promise;
std::function<void()> m_call;
public:
PoolAction(uint64_t _currentPoolId, ememory::SharedPtr<ethread::Promise> _promise, std::function<void()> _call);
uint64_t getPoolId() const;
void call();
};
}

73
ethread/PoolExecutor.cpp Normal file
View File

@ -0,0 +1,73 @@
/**
* @author Edouard DUPIN
* @copyright 2011, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#include <mutex>
#include <vector>
#include <thread>
#include <ethread/Future.hpp>
#include <ethread/PoolExecutor.hpp>
#include <ethread/tools.hpp>
#include "debug.hpp"
ethread::PoolExecutor::PoolExecutor(ethread::Pool& _pool):
m_pool(_pool),
m_running(false),
m_uniqueId(0) {
static uint32_t uid = 10;
m_uniqueId = uid++;
}
void ethread::PoolExecutor::threadCallback() {
ETHREAD_DEBUG("RUN: thread in Pool [START]");
ethread::setName("pool " + etk::to_string(m_uniqueId));
// get datas:
while (m_running == true) {
// get an action:
m_action = m_pool.getAction();
if (m_action == nullptr) {
// TODO : This is really bad ==> fast to code and debug but not optimum at all ... use condition instead ...
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
m_action->call();
m_pool.releaseId(m_action->getPoolId());
m_action.reset();
}
m_running = false;
ETHREAD_DEBUG("RUN: thread in Pool [STOP]");
}
void ethread::PoolExecutor::start() {
ETHREAD_DEBUG("START: thread in Pool [START]");
m_running = true;
m_thread = ememory::makeShared<std::thread>([&](void *){ this->threadCallback();}, nullptr);
if (m_thread == nullptr) {
m_running = false;
ETHREAD_ERROR("START: thread in Pool [STOP] can not intanciate THREAD!");
return;
}
//ethread::setPriority(*m_receiveThread, -6);
ETHREAD_DEBUG("START: thread in Pool [STOP]");
}
void ethread::PoolExecutor::stop() {
ETHREAD_DEBUG("STOP: thread in Pool [START]");
m_running = false;
ETHREAD_DEBUG("STOP: thread in Pool [STOP]");
}
void ethread::PoolExecutor::join() {
ETHREAD_DEBUG("JOIN: thread in Pool [START]");
if (m_thread != nullptr) {
ETHREAD_DEBUG("JOIN: waiting ...");
m_thread->join();
m_thread.reset();
}
ETHREAD_DEBUG("JOIN: thread in Pool [STOP]");
}

30
ethread/PoolExecutor.hpp Normal file
View File

@ -0,0 +1,30 @@
/**
* @author Edouard DUPIN
* @copyright 2011, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#pragma once
#include <mutex>
#include <vector>
#include <thread>
#include <ethread/Future.hpp>
#include <ethread/PoolAction.hpp>
#include <ethread/Pool.hpp>
namespace ethread {
class PoolExecutor {
private:
ethread::Pool& m_pool;
ememory::SharedPtr<std::thread> m_thread;
bool m_running;
uint32_t m_uniqueId;
ememory::SharedPtr<ethread::PoolAction> m_action;
public:
PoolExecutor(ethread::Pool& _pool);
void threadCallback();
void start();
void stop();
void join();
};
}

67
ethread/Promise.cpp Normal file
View File

@ -0,0 +1,67 @@
/**
* @author Edouard DUPIN
* @copyright 2011, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#include "debug.hpp"
#include <ethread/Future.hpp>
#include <ethread/Promise.hpp>
#include <echrono/Steady.hpp>
ethread::Promise::Promise():
m_isFinished(false) {
}
bool ethread::Promise::isFinished() {
return m_isFinished;
}
void ethread::Promise::finish() {
std::function<void()> callback;
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_isFinished == true) {
ETHREAD_ERROR("Request 2 time finishing a Promise ...");
return;
}
m_isFinished = true;
if (m_callback != nullptr) {
// call callbacks ...
callback = std::move(m_callback);
}
}
if (callback != nullptr) {
callback();
}
}
bool ethread::Promise::wait(echrono::Duration _delay) {
echrono::Steady time = echrono::Steady::now();
while (_delay >= 0) {
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_isFinished == true) {
return true;
}
}
echrono::Steady time2 = echrono::Steady::now();
_delay -= (time2-time);
time = time2;
if (_delay >= 0) {
// TODO : This is really bad ==> fast to code and debug but not optimum at all ... use condition instead ...
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
return false;
}
void ethread::Promise::andThen(std::function<void()> _action) {
std::unique_lock<std::mutex> lock(m_mutex);
m_callback = std::move(_action);
if (m_isFinished == true) {
m_callback();
}
}

25
ethread/Promise.hpp Normal file
View File

@ -0,0 +1,25 @@
/**
* @author Edouard DUPIN
* @copyright 2011, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#pragma once
#include <mutex>
#include <thread>
#include <echrono/Duration.hpp>
namespace ethread {
class Promise {
private:
std::mutex m_mutex;
std::function<void()> m_callback;
bool m_isFinished;
public:
Promise();
void finish();
bool isFinished();
bool wait(echrono::Duration _delay=echrono::seconds(2));
void andThen(std::function<void()> _action);
};
}

View File

View File

@ -1,26 +0,0 @@
/**
* @author Edouard DUPIN
* @copyright 2011, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#pragma once
#include <mutex>
#include <vector>
#include <thread>
#include <echrono/Future.hpp>
namespace ethread {
class ThreadPool {
private:
std::mutex m_lock; //!< global add and release some thread
std::vector<std::thread> m_listThread; //!< Thread pool
public:
ThreadPool(uint32_t _numberOfThread);
~ThreadPool();
uint32_t createGroupId();
// Execte in a group != of 0 request ordering the action in a single thread (same as a trand ...)
ethread::Future async(std::function<void()>, uint32_t _executionInGroupId=0); //!< execute an Action in the thread pool...
};
}

12
ethread/debug.cpp Normal file
View File

@ -0,0 +1,12 @@
/** @file
* @author Edouard DUPIN
* @copyright 2014, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#include "debug.hpp"
int32_t ethread::getLogId() {
static int32_t g_val = elog::registerInstance("ethread");
return g_val;
}

40
ethread/debug.hpp Normal file
View File

@ -0,0 +1,40 @@
/** @file
* @author Edouard DUPIN
* @copyright 2014, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#pragma once
#include <elog/log.hpp>
namespace ethread {
int32_t getLogId();
};
#define ETHREAD_BASE(info,data) ELOG_BASE(ethread::getLogId(),info,data)
#define ETHREAD_PRINT(data) ETHREAD_BASE(-1, data)
#define ETHREAD_CRITICAL(data) ETHREAD_BASE(1, data)
#define ETHREAD_ERROR(data) ETHREAD_BASE(2, data)
#define ETHREAD_WARNING(data) ETHREAD_BASE(3, data)
#ifdef DEBUG
#define ETHREAD_INFO(data) ETHREAD_BASE(4, data)
#define ETHREAD_DEBUG(data) ETHREAD_BASE(5, data)
#define ETHREAD_VERBOSE(data) ETHREAD_BASE(6, data)
#define ETHREAD_TODO(data) ETHREAD_BASE(4, "TODO : " << data)
#else
#define ETHREAD_INFO(data) do { } while(false)
#define ETHREAD_DEBUG(data) do { } while(false)
#define ETHREAD_VERBOSE(data) do { } while(false)
#define ETHREAD_TODO(data) do { } while(false)
#endif
#define ETHREAD_ASSERT(cond,data) \
do { \
if (!(cond)) { \
ETHREAD_CRITICAL(data); \
assert(!#cond); \
} \
} while (0)

53
lutin_ethread-tools.py Normal file
View File

@ -0,0 +1,53 @@
#!/usr/bin/python
import lutin.debug as debug
import lutin.tools as tools
def get_type():
return "LIBRARY"
def get_desc():
return "Ewol thread tools"
def get_licence():
return "APACHE-2"
def get_compagny_type():
return "com"
def get_compagny_name():
return "atria-soft"
def get_maintainer():
return "authors.txt"
def get_version():
return "version.txt"
def configure(target, my_module):
my_module.add_extra_flags()
# add the file to compile:
my_module.add_src_file([
'ethread/tools.cpp',
])
my_module.add_header_file([
'ethread/tools.hpp',
])
# build in C++ mode
my_module.compile_version("c++", 2011)
# add dependency of the generic C++ library:
my_module.add_depend([
'cxx',
])
#pthread is not availlable on Windows
if "Linux" in target.get_type() \
or "Android" in target.get_type():
my_module.add_depend([
'pthread'
])
return True

View File

@ -7,7 +7,7 @@ def get_type():
return "LIBRARY"
def get_desc():
return "Ewol thread tools"
return "Ewol thread"
def get_licence():
return "APACHE-2"
@ -28,26 +28,32 @@ def configure(target, my_module):
my_module.add_extra_flags()
# add the file to compile:
my_module.add_src_file([
'ethread/tools.cpp',
'ethread/debug.cpp',
'ethread/Future.cpp',
'ethread/Promise.cpp',
'ethread/Pool.cpp',
'ethread/PoolAction.cpp',
'ethread/PoolExecutor.cpp',
])
my_module.add_header_file([
'ethread/tools.hpp',
'ethread/Future.hpp',
'ethread/Promise.hpp',
'ethread/Pool.hpp',
'ethread/PoolAction.hpp',
'ethread/PoolExecutor.hpp',
])
# build in C++ mode
my_module.compile_version("c++", 2011)
# add dependency of the generic C++ library:
my_module.add_depend([
'cxx'
'cxx',
'elog',
'ethread-tools',
'echrono',
'ememory'
])
#pthread is not availlable on Windows
if "Linux" in target.get_type() \
or "Android" in target.get_type():
my_module.add_depend([
'pthread'
])
return True