[DEV] start dev of the future progress return

This commit is contained in:
Edouard DUPIN 2017-05-03 11:50:48 +00:00
parent 502f431d5f
commit 2073938bc5
11 changed files with 109 additions and 13 deletions

View File

@ -35,6 +35,11 @@ bool zeus::checkOrderFunctionParameter() {
return value;
}
const std::string zeus::g_threadKeyTransactionId("zeus-transaction-id");
const std::string zeus::g_threadKeyTransactionSource("zeus-transaction-source");
const std::string zeus::g_threadKeyTransactiondestination("zeus-transaction-destination");
enum zeus::AbstractFunction::type zeus::AbstractFunction::getType() const {
return m_type;
}

View File

@ -13,6 +13,9 @@
namespace zeus {
static const std::string g_threadKeyTransactionId;
static const std::string g_threadKeyTransactionSource;
static const std::string g_threadKeyTransactiondestination;
/**
* @bried check if the compilater order the function element call in order or backOrder
*/

View File

@ -151,6 +151,9 @@ namespace zeus {
return;
}
}
ethread::metadataSet(zeus::g_threadKeyTransactionId, _obj->getTransactionId());
ethread::metadataSet(zeus::g_threadKeyTransactionSource, _obj->getSource());
ethread::metadataSet(zeus::g_threadKeyTransactionDestination, _obj->getDestination());
try {
// execute cmd:
zeus::executeClassCall(_interfaceClient, _obj, tmpClass, m_function);
@ -205,6 +208,9 @@ namespace zeus {
return true;
});
}
ethread::metadataRemove(zeus::g_threadKeyTransactionId);
ethread::metadataRemove(zeus::g_threadKeyTransactionSource);
ethread::metadataRemove(zeus::g_threadKeyTransactionDestination);
}
};
// specialization

View File

@ -8,6 +8,7 @@
#include <zeus/WebServer.hpp>
#include <zeus/debug.hpp>
#include <zeus/AbstractFunction.hpp>
namespace zeus {
/**
* @brief Execute a call on the global function with a return value
@ -139,6 +140,9 @@ namespace zeus {
return;
}
}
ethread::metadataSet(zeus::g_threadKeyTransactionId, _obj->getTransactionId());
ethread::metadataSet(zeus::g_threadKeyTransactionSource, _obj->getSource());
ethread::metadataSet(zeus::g_threadKeyTransactionDestination, _obj->getDestination());
try {
// execute cmd:
zeus::executeCall(_interfaceClient, _obj, m_function);
@ -193,6 +197,9 @@ namespace zeus {
return true;
});
}
ethread::metadataRemove(zeus::g_threadKeyTransactionId);
ethread::metadataRemove(zeus::g_threadKeyTransactionSource);
ethread::metadataRemove(zeus::g_threadKeyTransactionDestination);
}
};
// specialization

View File

@ -96,6 +96,14 @@ namespace zeus {
});
return *this;
}
/**
* @brief Attach callback on activity of the action if user set some return information
* @param[in] _callback Handle on the function to call in progress information
*/
Future<ZEUS_RETURN>& onProgress(ObserverProgress _callback) {
zeus::FutureBase::onProgress(_callback);
return *this;
}
};
/**
* @brief future template to cast type in a void methode (fallback)
@ -178,5 +186,13 @@ namespace zeus {
});
return *this;
}
/**
* @brief Attach callback on activity of the action if user set some return information
* @param[in] _callback Handle on the function to call in progress information
*/
Future<void>& onProgress(ObserverProgress _callback) {
zeus::FutureBase::onProgress(_callback);
return *this;
}
};
}

View File

@ -64,6 +64,12 @@ void zeus::FutureBase::andElse(zeus::Promise::Observer _callback) {
m_promise->andElse(_callback);
}
void zeus::FutureBase::onProgress(zeus::Promise::ObserverProgress _callback) {
if (m_promise == nullptr) {
return;
}
m_promise->onProgress(_callback);
}
echrono::Duration zeus::FutureBase::getTransmitionTime() const {
if (m_promise == nullptr) {

View File

@ -58,6 +58,11 @@ namespace zeus {
* @param[in] _callback Handle on the function to call in case of error on the call
*/
void andElse(zeus::Promise::Observer _callback);
/**
* @brief Attach callback on activity of the action if user set some return information
* @param[in] _callback Handle on the function to call in progress information
*/
void onProgress(zeus::Promise::ObserverProgress _callback);
/*
/ **
* @brief Attach callback on a specific return action (ABORT)

View File

@ -44,17 +44,21 @@ void zeus::Promise::remoteObjectDestroyed() {
}
void zeus::Promise::andAll(zeus::Promise::Observer _callback) {
// TODO : Lock ...
m_callbackThen = _callback;
m_callbackElse = _callback;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_callbackThen = _callback;
m_callbackElse = _callback;
}
if (isFinished() == false) {
return;
}
if (hasError() == false) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_callbackThen != nullptr) {
m_callbackThen(zeus::FutureBase(sharedFromThis()));
}
} else {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_callbackElse != nullptr) {
m_callbackElse(zeus::FutureBase(sharedFromThis()));
}
@ -62,14 +66,17 @@ void zeus::Promise::andAll(zeus::Promise::Observer _callback) {
}
void zeus::Promise::andThen(zeus::Promise::Observer _callback) {
// TODO : Lock ...
m_callbackThen = _callback;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_callbackThen = _callback;
}
if (isFinished() == false) {
return;
}
if (hasError() == true) {
return;
}
std::unique_lock<std::mutex> lock(m_mutex);
if (m_callbackThen == nullptr) {
return;
}
@ -77,43 +84,67 @@ void zeus::Promise::andThen(zeus::Promise::Observer _callback) {
}
void zeus::Promise::andElse(zeus::Promise::Observer _callback) {
// TODO : Lock ...
m_callbackElse = _callback;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_callbackElse = _callback;
}
if (isFinished() == false) {
return;
}
if (hasError() == false) {
return;
}
std::unique_lock<std::mutex> lock(m_mutex);
if (m_callbackElse == nullptr) {
return;
}
m_callbackElse(zeus::FutureBase(sharedFromThis()));
}
void zeus::Promise::onProgress(zeus::Promise::ObserverProgress _callback) {
std::unique_lock<std::mutex> lock(m_mutex);
m_callbackProgress = _callback;
}
echrono::Duration zeus::Promise::getTransmitionTime() const {
if (isFinished() == false) {
return echrono::nanoseconds(0);
}
std::unique_lock<std::mutex> lock(m_mutex);
return m_receiveTime - m_sendTime;
}
bool zeus::Promise::setMessage(ememory::SharedPtr<zeus::Message> _value) {
m_receiveTime = echrono::Steady::now();
m_message = _value;
if (m_message == nullptr) {
return true;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_receiveTime = echrono::Steady::now();
}
if (m_message->getPartFinish() == false) {
ZEUS_ERROR("set buffer that is not finished ...");
if (_value->getType() != zeus::message::type::progress) {
std::unique_lock<std::mutex> lock(m_mutex);
// notification of a progresion ...
if (m_callbackProgress != nullptr) {
return m_callbackProgress(_value.);
}
return false;
}
{
std::unique_lock<std::mutex> lock(m_mutex);
m_message = _value;
if (m_message == nullptr) {
return true;
}
if (m_message->getPartFinish() == false) {
ZEUS_ERROR("set buffer that is not finished ...");
return false;
}
}
if (hasError() == false) {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_callbackThen != nullptr) {
return m_callbackThen(zeus::FutureBase(sharedFromThis()));
}
} else {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_callbackElse != nullptr) {
return m_callbackElse(zeus::FutureBase(sharedFromThis()));
}
@ -130,6 +161,7 @@ uint32_t zeus::Promise::getSource() const {
}
bool zeus::Promise::hasError() const {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_message == nullptr) {
return true;
}
@ -140,6 +172,7 @@ bool zeus::Promise::hasError() const {
}
std::string zeus::Promise::getErrorType() const {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_message == nullptr) {
return "NULL_PTR";
}
@ -150,6 +183,7 @@ std::string zeus::Promise::getErrorType() const {
}
std::string zeus::Promise::getErrorHelp() const {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_message == nullptr) {
return "This is a nullptr future";
}
@ -161,6 +195,7 @@ std::string zeus::Promise::getErrorHelp() const {
bool zeus::Promise::isFinished() const {
std::unique_lock<std::mutex> lock(m_mutex);
if (m_message == nullptr) {
// in this case, we are waiting for an answer that the first packet is not arrived
return false;

View File

@ -21,12 +21,15 @@ namespace zeus {
class Promise : public ememory::EnableSharedFromThis<zeus::Promise> {
public:
using Observer = std::function<bool(zeus::FutureBase)>; //!< Define an Observer: function pointer
using ObserverProgress = std::function<void(const std::string&)>; //!< Define the observer on activity of the action (note that is a string, but it can contain json or other ...)
private:
std::mutex m_mutex; //!< local prevention of multiple acess
uint32_t m_transactionId; //!< waiting answer data
uint32_t m_source; //!< Source of the message.
ememory::SharedPtr<zeus::Message> m_message; //!< all buffer concatenate or last buffer if synchronous
Observer m_callbackThen; //!< observer callback When data arrive and NO error appear
Observer m_callbackElse; //!< observer callback When data arrive and AN error appear
ObserverProgress m_callbackProgress; //!< observer callback When progress is sended from the remote object called
//Observer m_callbackAbort; //!< observer callback When Action is abort by user
echrono::Steady m_sendTime; //!< time when the future has been sended request
echrono::Steady m_receiveTime; //!< time when the future has receve answer
@ -60,6 +63,11 @@ namespace zeus {
* @param[in] _callback Handle on the function to call in case of error on the call
*/
void andElse(zeus::Promise::Observer _callback);
/**
* @brief Attach callback on activity of the action if user set some return information
* @param[in] _callback Handle on the function to call in progress information
*/
void onProgress(zeus::Promise::ObserverProgress _callback);
/*
/ **
* @brief Attach callback on a specific return action (ABORT)

View File

@ -20,6 +20,8 @@ namespace etk {
return "event";
case zeus::message::type::data:
return "data";
case zeus::message::type::progress:
return "progress";
}
return "???";
}
@ -41,6 +43,8 @@ enum zeus::message::type zeus::message::getTypeFromInt(uint16_t _value) {
return zeus::message::type::data;
case 4:
return zeus::message::type::event;
case 5:
return zeus::message::type::progress;
}
return zeus::message::type::unknow;
}

View File

@ -19,6 +19,7 @@ namespace zeus {
answer = 0x0002, //!< Answer from a previous call
data = 0x0003, //!< data message happend when partId > 0 it compleate the data of a parameter or an answer or an event
event = 0x0004, //!< Message in one way (no return is waiting and the message has no garenty...)
progress = 0x0005, //!< Message that prevent a future of a progression of an action
};
/**
* @brief generate a display of the typemessage