[DEBUG] coorect the dead lock of each thead in the thread pool

This commit is contained in:
Edouard DUPIN 2017-06-23 01:14:59 +02:00
parent 16fee04656
commit 48b6f823f8
3 changed files with 55 additions and 11 deletions

View File

@ -212,6 +212,7 @@ void appl::ElementProperty::loadData() {
});
_media.getMetadata("description")
.andElse([=](const std::string& _error, const std::string& _help) mutable {
APPL_ERROR("Get remot error : " << _error << " " << _help << " [BEGIN]");
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
@ -219,6 +220,7 @@ void appl::ElementProperty::loadData() {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
APPL_ERROR("Get remot error : " << _error << " " << _help << " [END]");
return true;
})
.andThen([=](std::string _value) mutable {

View File

@ -128,13 +128,22 @@ bool zeus::Promise::setMessage(ememory::SharedPtr<zeus::Message> _value) {
m_receiveTime = echrono::Steady::now();
}
if (_value->getType() == zeus::message::type::event) {
ObserverEvent callback;
{
std::unique_lock<std::mutex> lock(m_mutex);
callback = m_callbackEvent;
}
std::unique_lock<std::mutex> lock(m_mutex);
// notification of a progresion ...
if (m_callbackEvent != nullptr) {
if (callback != nullptr) {
if (_value == nullptr) {
return true;
}
m_callbackEvent(ememory::staticPointerCast<zeus::message::Event>(_value));
callback(ememory::staticPointerCast<zeus::message::Event>(_value));
{
std::unique_lock<std::mutex> lock(m_mutex);
m_callbackEvent = std::move(callback);
}
return false; // no error
}
return false;
@ -151,15 +160,36 @@ bool zeus::Promise::setMessage(ememory::SharedPtr<zeus::Message> _value) {
}
}
if (hasError() == false) {
Observer callback;
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_callbackThen != nullptr) {
return m_callbackThen(zeus::FutureBase(sharedFromThis()));
callback = std::move(m_callbackThen);
}
if (callback != nullptr) {
bool ret = callback(zeus::FutureBase(sharedFromThis()));
{
std::unique_lock<std::mutex> lock(m_mutex);
m_callbackThen = std::move(callback);
}
return ret;
}
} else {
ZEUS_ERROR("plop ...");
Observer callback;
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_callbackElse != nullptr) {
return m_callbackElse(zeus::FutureBase(sharedFromThis()));
callback = m_callbackElse;
}
ZEUS_ERROR("plop .2.");
if (callback != nullptr) {
bool ret = callback(zeus::FutureBase(sharedFromThis()));
{
std::unique_lock<std::mutex> lock(m_mutex);
m_callbackElse = std::move(callback);
}
return ret;
}
ZEUS_ERROR("plop .3.");
}
return true;
}

View File

@ -258,8 +258,8 @@ class SendAsyncBinary {
}
};
//#define ZEUS_LOG_INPUT_OUTPUT ZEUS_WARNING
#define ZEUS_LOG_INPUT_OUTPUT ZEUS_VERBOSE
#define ZEUS_LOG_INPUT_OUTPUT ZEUS_WARNING
//#define ZEUS_LOG_INPUT_OUTPUT ZEUS_VERBOSE
int32_t zeus::WebServer::writeBinary(ememory::SharedPtr<zeus::Message> _obj) {
@ -311,6 +311,7 @@ void zeus::WebServer::onReceiveData(std::vector<uint8_t>& _frame, bool _isBinary
disconnect(true);
return;
}
ZEUS_INFO("receive DATA ... ");
ememory::SharedPtr<zeus::Message> dataRaw = zeus::Message::create(sharedFromThis(), _frame);
if (dataRaw == nullptr) {
ZEUS_ERROR("Message Allocation ERROR ... ");
@ -329,6 +330,7 @@ void zeus::WebServer::onReceiveData(std::vector<uint8_t>& _frame, bool _isBinary
}
void zeus::WebServer::ping() {
ZEUS_WARNING("send PING");
m_connection.controlPing();
}
@ -395,17 +397,19 @@ void zeus::WebServer::newMessage(ememory::SharedPtr<zeus::Message> _buffer) {
while (it != m_pendingCall.end()) {
if (it->second.isValid() == false) {
it = m_pendingCall.erase(it);
ZEUS_WARNING("Remove element that have wrong data...");
continue;
}
if (it->second.getTransactionId() != tid) {
++it;
continue;
}
ZEUS_LOG_INPUT_OUTPUT("Find FUTURE ..." << _buffer);
future = it->second;
break;
}
}
// Not find a pen,ding call ==> execute it ...
// Not find a pending call ==> execute it ...
if (future.isValid() == false) {
uint32_t dest = _buffer->getDestination();
// Call local object
@ -456,6 +460,7 @@ void zeus::WebServer::newMessage(ememory::SharedPtr<zeus::Message> _buffer) {
}
return;
}
ZEUS_LOG_INPUT_OUTPUT("Add in puul to PROCESS FUTURE : " << _buffer);
m_processingPool.async(
[=](){
zeus::FutureBase fut = future;
@ -463,7 +468,9 @@ void zeus::WebServer::newMessage(ememory::SharedPtr<zeus::Message> _buffer) {
// add data ...
bool ret = fut.setMessage(_buffer);
if (ret == true) {
ZEUS_LOG_INPUT_OUTPUT(" ==> start LOCK");
std::unique_lock<std::mutex> lock(m_pendingCallMutex);
ZEUS_LOG_INPUT_OUTPUT(" ==> LOCK done");
auto it = m_pendingCall.begin();
while (it != m_pendingCall.end()) {
if (it->second.isValid() == false) {
@ -477,6 +484,9 @@ void zeus::WebServer::newMessage(ememory::SharedPtr<zeus::Message> _buffer) {
it = m_pendingCall.erase(it);
break;
}
ZEUS_LOG_INPUT_OUTPUT(" ==> end LOCK");
} else {
ZEUS_LOG_INPUT_OUTPUT(" ==> end");
}
},
tid); // force at the transaction Id to have a correct order in the processing of the data ...
@ -594,6 +604,7 @@ bool zeus::WebServer::removeObjectOwnership(uint16_t _objectAddress, uint32_t _s
void zeus::WebServer::addAsync(zeus::WebServer::ActionAsync _elem) {
std::unique_lock<std::mutex> lock(m_threadAsyncMutex);
m_threadAsyncList2.push_back(_elem);
ZEUS_DEBUG("ADD element to send ... " << m_threadAsyncList2.size());
}
void zeus::WebServer::threadAsyncCallback() {
@ -615,6 +626,7 @@ void zeus::WebServer::threadAsyncCallback() {
}
auto it = m_threadAsyncList.begin();
while (it != m_threadAsyncList.end()) {
ZEUS_INFO(" send DATA ... ");
bool ret = (*it)(this);
if (ret == true) {
// Remove it ...