[DEBUG] try to stabilyse the media stream and the call stability

This commit is contained in:
Edouard DUPIN 2017-06-09 00:55:40 +02:00
parent 62c1cf74c2
commit 56072e960e
20 changed files with 249 additions and 241 deletions

View File

@ -60,7 +60,6 @@ def configure(target, my_module):
'zeus/message/Data.cpp',
'zeus/message/Event.cpp',
'zeus/message/Flow.cpp',
'zeus/message/Progress.cpp',
'zeus/message/Parameter_addParameter.cpp',
'zeus/message/Parameter_getParameter.cpp',
'zeus/message/ParamType.cpp',
@ -74,7 +73,6 @@ def configure(target, my_module):
'zeus/message/Data.hpp',
'zeus/message/Event.hpp',
'zeus/message/Flow.hpp',
'zeus/message/Progress.hpp',
'zeus/message/ParamType.hpp',
'zeus/message/type.hpp',
])

View File

@ -305,7 +305,7 @@ int main(int _argc, const char *_argv[]) {
} else {
elog::flush();
std::this_thread::sleep_for(std::chrono::seconds(1));
APPL_INFO("service in waiting ... " << iii << "/inf");
APPL_INFO("gateway in waiting ... " << iii << "/inf");
}
iii++;
}

View File

@ -48,6 +48,10 @@ void appl::ClientProperty::fromJson(ejson::Object _obj) {
m_port = _obj["port"].toNumber().getU64();
}
void appl::ClientProperty::disconnect() {
m_connection.disconnect();
}
void appl::ClientProperty::connect() {
if (m_connection.isAlive() == true) {
m_connection.pingIsAlive();

View File

@ -80,6 +80,7 @@ void appl::widget::Connection::onCallbackButtonValidate() {
APPL_INFO("Connect with : '" << m_login << "' ... '" << m_password << "'");
m_baseProperty->setLogin(m_login);
m_baseProperty->setPassword(m_password);
m_baseProperty->disconnect();
m_baseProperty->connect();
if (m_baseProperty->getConnection().isAlive() == false) {
APPL_ERROR(" ==> NOT Authentify to '" << m_baseProperty->getLogin() << "'");

View File

@ -101,13 +101,13 @@ void appl::ElementProperty::loadData() {
m_widget->markToRedraw();
return true;
}
int32_t nbCallOfMetaData = 7;
_media.getMetadata("title")
.andElse([=](const std::string& _error, const std::string& _help) mutable {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -123,7 +123,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -134,7 +134,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -150,7 +150,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -161,7 +161,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -177,7 +177,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -188,7 +188,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -204,7 +204,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -215,7 +215,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -231,7 +231,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -242,7 +242,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -258,7 +258,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -269,7 +269,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -285,7 +285,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -296,7 +296,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -319,7 +319,7 @@ void appl::ElementProperty::loadData() {
{
std::unique_lock<std::mutex> lock(tmpProperty->m_mutex);
tmpProperty->m_nbElementLoaded++;
if (tmpProperty->m_nbElementLoaded >= 8) {
if (tmpProperty->m_nbElementLoaded >= nbCallOfMetaData) {
tmpProperty->m_metadataUpdated = appl::statusLoadingData::done;
}
}
@ -646,7 +646,7 @@ void appl::ElementDisplayed::generateDisplay(vec2 _startPos, vec2 _size) {
std::string textToDisplay;
if (m_property != nullptr) {
if (m_property->LoadDataEnded() == false) {
textToDisplay += "<br/><i>Loading in progress</i>";
textToDisplay += "<br/><i>Loading in progress</i> ... " + etk::to_string(m_property->m_nbElementLoaded) + "/8";
} else {
std::unique_lock<std::mutex> lock(m_property->m_mutex);
//m_text.setClipping(drawClippingPos, drawClippingSize);

View File

@ -35,7 +35,7 @@ namespace appl {
m_nbElementLoaded(0) {
}
protected:
public:
enum appl::statusLoadingData m_metadataUpdated; //!< Check value to know when metadata is getted (like thumb ...)
uint32_t m_nbElementLoaded; //!< this cont the number of lement loaded to set tle media full loaded
public:

View File

@ -343,17 +343,17 @@ namespace appl {
if (_sqlLikeRequest == "") {
throw std::invalid_argument("empty request");
}
APPL_INFO("check : " << _sqlLikeRequest);
APPL_DEBUG("check : " << _sqlLikeRequest);
std::vector<std::vector<std::string>> listAndParsed = interpreteSQLRequest(_sqlLikeRequest);
std::unique_lock<std::mutex> lock(g_mutex);
for (auto &it : m_listFile) {
if (it == nullptr) {
continue;
}
APPL_INFO(" [" << it->getUniqueId() << " list=" << mapToString(it->getMetadataDirect()));
APPL_DEBUG(" [" << it->getUniqueId() << " list=" << mapToString(it->getMetadataDirect()));
bool isCorrectElement = isValid(listAndParsed, it->getMetadataDirect());
if (isCorrectElement == true) {
APPL_INFO(" select");
APPL_DEBUG(" select");
out.push_back(it->getUniqueId());
}
}

View File

@ -192,19 +192,22 @@ zeus::Future<std::vector<std::string>> zeus::Client::getServiceList() {
}
// TODO : This is an active waiting ==> this is bad ... ==> use future, it will be better
bool zeus::Client::waitForService(const std::string& _serviceName) {
int32_t delayMax = 10;
while (delayMax > 0) {
bool zeus::Client::waitForService(const std::string& _serviceName, echrono::Duration _delta) {
echrono::Steady start = echrono::Steady::now();
while (echrono::Steady::now() - start < _delta) {
auto listValues = getServiceList();
listValues.wait();
listValues.waitFor(echrono::seconds(1));
if (listValues.hasError() == true) {
ZEUS_ERROR("Wait for service (get list service) timeout ... ==> " << listValues.getErrorType() << " help=" << listValues.getErrorHelp());
return false;
}
for (auto &it: listValues.get()) {
if (it == _serviceName) {
return true;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
delayMax--;
}
ZEUS_ERROR("Wait for service timeout ...");
return false;
}

View File

@ -165,7 +165,7 @@ namespace zeus {
zeus::Future<int32_t> getServiceCount();
zeus::Future<std::vector<std::string>> getServiceList();
// TODO : This is an active waiting ==> this is bad ... ==> use future, it will be better
bool waitForService(const std::string& _serviceName);
bool waitForService(const std::string& _serviceName, echrono::Duration _delta = echrono::seconds(1));
};
}

View File

@ -43,26 +43,50 @@ namespace zeus {
ZEUS_RETURN get() {
return zeus::futureGetValue<ZEUS_RETURN>(m_promise);
}
/**
* @brief Wait the end of the Future processing.
* @note A time out of 5 second is automaticly set...
*/
const Future<ZEUS_RETURN, ZEUS_EVENT>& wait() const {
zeus::FutureBase::wait();
return *this;
}
/**
* @brief Wait the end of the Future processing.
* @note A time out of 5 second is automaticly set...
*/
Future<ZEUS_RETURN, ZEUS_EVENT>& wait() {
zeus::FutureBase::wait();
return *this;
}
/**
* @brief Wait the end of the Future processing for a specific time
* @param[in] _delta Duration to wait the end of the processing (default 30 seconds)
*/
const Future<ZEUS_RETURN, ZEUS_EVENT>& waitFor(echrono::Duration _delta = echrono::seconds(30)) const {
zeus::FutureBase::waitFor(_delta);
return *this;
}
/**
* @brief Wait the end of the Future processing for a specific time
* @param[in] _delta Duration to wait the end of the processing (default 30 seconds)
*/
Future<ZEUS_RETURN, ZEUS_EVENT>& waitFor(echrono::Duration _delta = echrono::seconds(30)) {
zeus::FutureBase::waitFor(_delta);
return *this;
}
/**
* @brief Wait the end of the Future processing while a specific time is not reach.
* @param[in] _endTime The time where the wait will stop if the process is not ended.
*/
const Future<ZEUS_RETURN, ZEUS_EVENT>& waitUntil(echrono::Steady _endTime) const {
zeus::FutureBase::waitUntil(_endTime);
return *this;
}
/**
* @brief Wait the end of the Future processing while a specific time is not reach.
* @param[in] _endTime The time where the wait will stop if the process is not ended.
*/
Future<ZEUS_RETURN, ZEUS_EVENT>& waitUntil(echrono::Steady _endTime) {
zeus::FutureBase::waitUntil(_endTime);
return *this;
@ -75,16 +99,15 @@ namespace zeus {
Future<ZEUS_RETURN, ZEUS_EVENT>& andAll(ObserverFut _callback) {
zeus::FutureBase::andAll(
[=](zeus::FutureBase _fut) {
return _callback(zeus::Future<ZEUS_RETURN>(_fut));
return _callback(zeus::Future<ZEUS_RETURN, ZEUS_EVENT>(_fut));
});
return *this;
}
#if 0
/**
* @brief Attach callback on a specific return action (SUCESS)
* @param[in] _callback Handle on the function to call in case of sucess on the call
* @param[in] _callback Handle on the function to call in case of sucess on the call (return value)
*/
/*
note : this is anbigous ???
Future<ZEUS_RETURN, ZEUS_EVENT>& andThen(std::function<bool(const ZEUS_RETURN&)> _callback) {
zeus::FutureBase::andThen(
[=](zeus::FutureBase _fut) {
@ -93,35 +116,35 @@ namespace zeus {
});
return *this;
}
*/
#endif
/**
* @brief Attach callback on a specific return action (SUCESS)
* @param[in] _callback Handle on the function to call in case of sucess on the call (return value)
*/
Future<ZEUS_RETURN, ZEUS_EVENT>& andThen(std::function<bool(ZEUS_RETURN)> _callback) {
zeus::FutureBase::andThen(
[=](zeus::FutureBase _fut) {
return _callback(std::move(zeus::Future<ZEUS_RETURN>(_fut).get()));
return _callback(std::move(zeus::Future<ZEUS_RETURN, ZEUS_EVENT>(_fut).get()));
});
return *this;
}
/*
#if 0
/**
* @brief Attach callback on a specific return action (SUCESS)
* @param[in] _callback Handle on the function to call in case of sucess on the call (Future parameter)
*/
Future<ZEUS_RETURN, ZEUS_EVENT>& andThen(ObserverFut _callback) {
zeus::FutureBase::andThen(
[=](zeus::FutureBase _fut) {
return _callback(zeus::Future<ZEUS_RETURN>(_fut));
});
return *this;
}*/
/**
* @brief Attach callback on a specific return action (ERROR)
* @param[in] _callback Handle on the function to call in case of error on the call
*/
/*
Future<ZEUS_RETURN, ZEUS_EVENT>& andElse(ObserverFut _callback) {
zeus::FutureBase::andElse(
[=](zeus::FutureBase _fut) {
return _callback(zeus::Future<ZEUS_RETURN>(_fut));
return _callback(zeus::Future<ZEUS_RETURN, ZEUS_EVENT>(_fut));
});
return *this;
}
*/
#endif
/**
* @brief Attach callback on a specific return action (ERROR)
* @param[in] _callback Handle on the function to call in case of error on the call (with error parameter (ERROR type, Help string)
*/
Future<ZEUS_RETURN, ZEUS_EVENT>& andElse(std::function<bool(const std::string&, const std::string&)> _callback) {
zeus::FutureBase::andElse(
[=](zeus::FutureBase _fut) {
@ -129,17 +152,25 @@ namespace zeus {
});
return *this;
}
#if 0
/**
* @brief Attach callback on activity of the action if user set some return information
* @param[in] _callback Handle on the function to call in progress information
* @brief Attach callback on a specific return action (ERROR)
* @param[in] _callback Handle on the function to call in case of error on the call (Future parameter)
*/
// TODO: this is deprecated ...
Future<ZEUS_RETURN, ZEUS_EVENT>& onProgress(Promise::ObserverEvent _callback) {
zeus::FutureBase::onEvent(_callback);
Future<ZEUS_RETURN, ZEUS_EVENT>& andElse(ObserverFut _callback) {
zeus::FutureBase::andElse(
[=](zeus::FutureBase _fut) {
return _callback(zeus::Future<ZEUS_RETURN>(_fut));
});
return *this;
}
#endif
#if 0
/**
* @brief Attach callback on activity of the action / signal
* @param[in] _callback Handle on the function to call in progress information
*/
//template<typename = std::enable_if<std::is_void<ZEUS_EVENT>::value, false>>
/*
Future<ZEUS_RETURN, ZEUS_EVENT>& onSignal(std::function<void(const ZEUS_EVENT&)> _callback) {
zeus::FutureBase::onEvent(
[=](ememory::SharedPtr<zeus::message::Event> _msg) {
@ -150,7 +181,11 @@ namespace zeus {
});
return *this;
}
*/
#endif
/**
* @brief Attach callback on activity of the action / signal
* @param[in] _callback Handle on the function to call in progress information
*/
//template<typename = std::enable_if<std::is_void<ZEUS_EVENT>::value, false>>
Future<ZEUS_RETURN, ZEUS_EVENT>& onSignal(std::function<void(ZEUS_EVENT)> _callback) {
zeus::FutureBase::onEvent(
@ -186,26 +221,51 @@ namespace zeus {
m_promise = _base.m_promise;
return *this;
}
// TODO : Fusion Wait and Wait For ...
/**
* @brief Wait the end of the Future processing.
* @note A time out of 5 second is automaticly set...
*/
const Future<void, ZEUS_EVENT>& wait() const {
zeus::FutureBase::wait();
return *this;
}
/**
* @brief Wait the end of the Future processing.
* @note A time out of 5 second is automaticly set...
*/
Future<void, ZEUS_EVENT>& wait() {
zeus::FutureBase::wait();
return *this;
}
/**
* @brief Wait the end of the Future processing for a specific time
* @param[in] _delta Duration to wait the end of the processing (default 30 seconds)
*/
const Future<void, ZEUS_EVENT>& waitFor(echrono::Duration _delta = echrono::seconds(30)) const {
zeus::FutureBase::waitFor(_delta);
return *this;
}
/**
* @brief Wait the end of the Future processing for a specific time
* @param[in] _delta Duration to wait the end of the processing (default 30 seconds)
*/
Future<void, ZEUS_EVENT>& waitFor(echrono::Duration _delta = echrono::seconds(30)) {
zeus::FutureBase::waitFor(_delta);
return *this;
}
/**
* @brief Wait the end of the Future processing while a specific time is not reach.
* @param[in] _endTime The time where the wait will stop if the process is not ended.
*/
const Future<void, ZEUS_EVENT>& waitUntil(echrono::Steady _endTime) const {
zeus::FutureBase::waitUntil(_endTime);
return *this;
}
/**
* @brief Wait the end of the Future processing while a specific time is not reach.
* @param[in] _endTime The time where the wait will stop if the process is not ended.
*/
Future<void, ZEUS_EVENT>& waitUntil(echrono::Steady _endTime) {
zeus::FutureBase::waitUntil(_endTime);
return *this;
@ -218,23 +278,27 @@ namespace zeus {
Future<void, ZEUS_EVENT>& andAll(ObserverFut _callback) {
zeus::FutureBase::andAll(
[=](zeus::FutureBase _fut) {
return _callback(zeus::Future<void>(_fut));
return _callback(zeus::Future<void, ZEUS_EVENT>(_fut));
});
return *this;
}
#if 0
/**
* @brief Attach callback on a specific return action (SUCESS)
* @param[in] _callback Handle on the function to call in case of sucess on the call
* @param[in] _callback Handle on the function to call in case of sucess on the call (Future parameter)
*/
/*
Future<void, ZEUS_EVENT>& andThen(ObserverFut _callback) {
zeus::FutureBase::andThen(
[=](zeus::FutureBase _fut) {
return _callback(zeus::Future<void>(_fut));
return _callback(zeus::Future<void, ZEUS_EVENT>(_fut));
});
return *this;
}
*/
#endif
/**
* @brief Attach callback on a specific return action (SUCESS)
* @param[in] _callback Handle on the function to call in case of sucess on the call (void parameter)
*/
Future<void, ZEUS_EVENT>& andThen(std::function<bool()> _callback) {
zeus::FutureBase::andThen(
[=](zeus::FutureBase _fut) {
@ -242,19 +306,23 @@ namespace zeus {
});
return *this;
}
#if 0
/**
* @brief Attach callback on a specific return action (ERROR)
* @param[in] _callback Handle on the function to call in case of error on the call
* @param[in] _callback Handle on the function to call in case of error on the call (Future parameter)
*/
/*
Future<void, ZEUS_EVENT>& andElse(ObserverFut _callback) {
zeus::FutureBase::andElse(
[=](zeus::FutureBase _fut) {
return _callback(zeus::Future<void>(_fut));
return _callback(zeus::Future<void, ZEUS_EVENT>(_fut));
});
return *this;
}
*/
#endif
/**
* @brief Attach callback on a specific return action (ERROR)
* @param[in] _callback Handle on the function to call in case of error on the call (with error parameter (ERROR type, Help string)
*/
Future<void, ZEUS_EVENT>& andElse(std::function<bool(const std::string&, const std::string&)> _callback) {
zeus::FutureBase::andElse(
[=](zeus::FutureBase _fut) {
@ -262,16 +330,28 @@ namespace zeus {
});
return *this;
}
#if 0
/**
* @brief Attach callback on activity of the action if user set some return information
* @brief Attach callback on activity of the action / signal
* @param[in] _callback Handle on the function to call in progress information
*/
/*
Future<void, ZEUS_EVENT>& onEvent(Promise::ObserverEvent _callback) {
zeus::FutureBase::onEvent(_callback);
//template<typename = std::enable_if<std::is_void<ZEUS_EVENT>::value, false>>
Future<void, ZEUS_EVENT>& onSignal(std::function<void(const ZEUS_EVENT&)> _callback) {
zeus::FutureBase::onEvent(
[=](ememory::SharedPtr<zeus::message::Event> _msg) {
if (_msg == nullptr) {
return;
}
_callback(_msg->getEvent<ZEUS_EVENT>());
});
return *this;
}
*/
#endif
/**
* @brief Attach callback on activity of the action / signal
* @param[in] _callback Handle on the function to call in progress information
*/
//template<typename = std::enable_if<std::is_void<ZEUS_EVENT>::value, false>>
Future<void, ZEUS_EVENT>& onSignal(std::function<void(ZEUS_EVENT)> _callback) {
zeus::FutureBase::onEvent(
[=](ememory::SharedPtr<zeus::message::Event> _msg) {

View File

@ -15,8 +15,12 @@ namespace zeus {
// TODO : Add the posiboilities to have a andFinished()
class FutureGroup {
private:
std::vector<zeus::FutureBase> m_listFuture;
std::vector<zeus::FutureBase> m_listFuture; //!< List of all Future that we need to wait the end.
public:
/**
* @brief Add an other Future to wait the end.
* @param[in] _fut Future to add.
*/
void add(const zeus::FutureBase& _fut);
/**
* @brief Wait the Future receive data

View File

@ -29,7 +29,7 @@ zeus::ObjectRemoteBase::~ObjectRemoteBase() {
ZEUS_VERBOSE("[" << m_id << "/" << m_objectId << "] DESTROY => to remote [" << (m_remoteAddress>>16) << "/" << (m_remoteAddress&0xFFFF) << "]");
if (m_isLinked == true) {
zeus::Future<bool> ret = m_interfaceWeb->call(getFullId(), m_remoteAddress&0xFFFF0000, "unlink", m_remoteAddress);
ret.wait();
ret.waitFor(echrono::seconds(1));
if (ret.hasError() == true) {
ZEUS_WARNING("return call error: " << ret.getErrorType() << " help:" << ret.getErrorHelp());
ZEUS_WARNING("Can not unlink with the object id: " << (m_remoteAddress>>16) << "/" << (m_remoteAddress&0xFFFF) << " ==> link error");

View File

@ -230,7 +230,8 @@ void zeus::Promise::waitFor(echrono::Duration _delta) const {
std::this_thread::sleep_for(echrono::milliseconds(10));
}
if (isFinished() == false) {
ZEUS_WARNING("Wait timeout ...");
ZEUS_WARNING("Wait timeout ... " << _delta);
//elog::displayBacktrace();
}
}
@ -241,7 +242,8 @@ void zeus::Promise::waitUntil(echrono::Steady _endTime) const {
std::this_thread::sleep_for(echrono::milliseconds(10));
}
if (isFinished() == false) {
ZEUS_WARNING("Wait timeout ...");
ZEUS_WARNING("Wait timeout ..." << _endTime);
//elog::displayBacktrace();
}
}

View File

@ -8,7 +8,6 @@
#include <zeus/message/Answer.hpp>
#include <zeus/message/Event.hpp>
#include <zeus/message/Call.hpp>
#include <zeus/message/Progress.hpp>
#include <enet/WebSocket.hpp>
#include <thread>
#include <ememory/memory.hpp>
@ -30,9 +29,13 @@ namespace zeus {
class WebServer;
class ObjectRemoteBase;
/**
* @brief
* @param[in]
* @return
* @brief Create a basic call message with all the basic information needed
* @param[in] _iface WebServer interface handle
* @param[in] _transactionId UniqueId of the transaction (must be != 0)
* @param[in] _source Unique local zeus network adress of a service or an Object (the source of the message address)
* @param[in] _destination Unique local zeus network adress of a service or an Object (the destination of the message address)
* @param[in] _functionName The name of the function to call
* @return handle on the new message to call on the remote object
*/
ememory::SharedPtr<zeus::message::Call> createBaseCall(const ememory::SharedPtr<zeus::WebServer>& _iface,
uint64_t _transactionId,
@ -40,17 +43,19 @@ namespace zeus {
const uint32_t& _destination,
const std::string& _functionName);
/**
* @brief
* @param[in]
* @return
* @brief This is the last call of createParam recursive function (no more parameter to add)
* @param[in] _parmaId Id of the parameter to add.
* @param[in] _obj message where to add the parameter.
*/
void createParam(int32_t _paramId,
ememory::SharedPtr<zeus::message::Call> _obj);
/**
* @brief
* @param[in]
* @return
* @brief Template to add a parameter of a function in recursive form
* @param[in] _parmaId Id of the parameter to add.
* @param[in] _obj message where to add the parameter.
* @param[in] _param Parameter value to add.
* @param[in] _args... other argument to add (in recursive call)
*/
template<class ZEUS_TYPE, class... _ARGS>
void createParam(int32_t _paramId,
@ -62,11 +67,12 @@ namespace zeus {
createParam(_paramId, _obj, std::forward<_ARGS>(_args)...);
}
/**
* @brief
* @param[in]
* @return
* @brief Template specialization in 'const char*' to add a parameter of a function in recursive form
* @param[in] _parmaId Id of the parameter to add.
* @param[in] _obj message where to add the parameter.
* @param[in] _param Parameter value to add (char* that is converted in std::string).
* @param[in] _args... other argument to add (in recursive call)
*/
// convert const char in std::string ...
template<class... _ARGS>
void createParam(int32_t _paramId,
ememory::SharedPtr<zeus::message::Call> _obj,
@ -75,9 +81,14 @@ namespace zeus {
createParam(_paramId, _obj, std::string(_param), std::forward<_ARGS>(_args)...);
}
/**
* @brief
* @param[in]
* @return
* @brieftemplate to create a ZEUS CALL message with all the parameter in arguments
* @param[in] _iface WebServer interface handle
* @param[in] _transactionId UniqueId of the transaction (must be != 0)
* @param[in] _source Unique local zeus network adress of a service or an Object (the source of the message address)
* @param[in] _destination Unique local zeus network adress of a service or an Object (the destination of the message address)
* @param[in] _functionName The name of the function to call
* @param[in] _args... argument of the call to do
* @return handle on the new message to call on the remote object
*/
template<class... _ARGS>
ememory::SharedPtr<zeus::message::Call> createCall(const ememory::SharedPtr<zeus::WebServer>& _iface,
@ -94,19 +105,19 @@ namespace zeus {
return callElem;
}
/**
* @brief
* @brief Web interface of a service engine
*/
class WebServer : public ememory::EnableSharedFromThis<zeus::WebServer> {
protected:
std::mutex m_mutex;
std::mutex m_mutex; //!< main interface lock
public:
std::vector<ememory::SharedPtr<zeus::WebObj>> m_actifObject; //!< List of all active object created and that remove is in progress ...
private:
enet::WebSocket m_connection;
ethread::Pool m_processingPool;
std::vector<ememory::SharedPtr<zeus::Message>> m_listPartialMessage;
uint16_t m_localAddress;
uint16_t m_localIdObjectIncrement; //!< attribute an unique ID for an object
enet::WebSocket m_connection; //!< Zeus protocol is based on a webSocket to be compatible with Java-script
ethread::Pool m_processingPool; //!< Thread pool processing of the input data
std::vector<ememory::SharedPtr<zeus::Message>> m_listPartialMessage; //!< list of all message that data has not finished to arrive.
uint16_t m_localAddress; //!< Local client address.
uint16_t m_localIdObjectIncrement; //!< attribute an unique ID for an object.
public:
uint16_t getAddress() const {
return m_localAddress;
@ -118,8 +129,8 @@ namespace zeus {
return m_localIdObjectIncrement++;
}
private:
std::vector<ememory::SharedPtr<zeus::WebObj>> m_listObject;
std::vector<ememory::WeakPtr<zeus::ObjectRemoteBase>> m_listRemoteObject;
std::vector<ememory::SharedPtr<zeus::WebObj>> m_listObject; //!< List of all local object that is reference in the system.
std::vector<ememory::WeakPtr<zeus::ObjectRemoteBase>> m_listRemoteObject; //!< List of all object that we have a reference in the local interface.
public:
void addWebObj(ememory::SharedPtr<zeus::WebObj> _obj);
void addWebObjRemote(ememory::SharedPtr<zeus::ObjectRemoteBase> _obj);

View File

@ -14,7 +14,6 @@
#include <zeus/message/Call.hpp>
#include <zeus/message/Data.hpp>
#include <zeus/message/Flow.hpp>
#include <zeus/message/Progress.hpp>
#include <zeus/message/Event.hpp>
#include <zeus/WebServer.hpp>
@ -100,9 +99,6 @@ void zeus::Message::generateDisplay(std::ostream& _os) const {
case zeus::message::type::data:
_os << " -DATA-";
break;
case zeus::message::type::progress:
_os << " -PROGRESS-";
break;
}
}
@ -270,22 +266,6 @@ ememory::SharedPtr<zeus::Message> zeus::Message::create(ememory::SharedPtr<zeus:
return value;
}
break;
case zeus::message::type::progress: {
ememory::SharedPtr<zeus::message::Progress> value = zeus::message::Progress::create(_iface);
if (value == nullptr) {
return nullptr;
}
value->setTransactionId(header.transactionId);
value->setSourceId(header.sourceId);
value->setSourceObjectId(header.sourceObjectId);
value->setDestinationId(header.destinationId);
value->setDestinationObjectId(header.destinationObjectId);
value->setPartFinish((header.flags & ZEUS_BUFFER_FLAG_FINISH) != 0);
value->composeWith(&_buffer[sizeof(zeus::message::headerBin)],
_buffer.size() - sizeof(zeus::message::headerBin));
return value;
}
break;
}
return nullptr;
}

View File

@ -104,11 +104,46 @@ namespace zeus {
#define ZEUS_BUFFER_FLAG_TYPE_MESSAGE (0x07)
/**
* @brief Protocol buffer to transmit datas
* @note: A simple shematics to understand what is the order of messaging
* <pre>
* client routeur Gateway service/Object
* | | | |
* | . . |
* | |
* o------------CALL XXX (id=666)(part 0) ----->o |
* | | | |
* | o-------DATA (id=666)(part 1) --------->| |
* | | | |
* | o-------DATA (id=666)(part 2) --------->| |
* | | ... | |
* | o-------DATA (id=666)(part n) [end] --->o---->*-*
* | | |
* | | | P
* | | | R
* |<--------- EVENT (id=666) ------------------------| | O
* | | | C
* | | | E
* |<--------- EVENT (id=666) ------------------------| | S
* | | | S
* | | | I
* | | | N
* | | | G
* | | |
* | o<------ANSWER XXX (id=666)(part 0) ----------*-*
* | | | |
* | |<------DATA (id=666)(part 1) ----------o |
* | | | |
* | |<------DATA (id=666)(part 2) ----------o |
* | | | |
* |<---o<------DATA (id=666)(part m) [end] ----o |
* | |
* | |
* </pre>
*/
class Message {
friend std::ostream& operator<<(std::ostream&, zeus::Message*);
protected:
ememory::SharedPtr<zeus::WebServer> m_iface;
ememory::SharedPtr<zeus::WebServer> m_iface; //!< link to the interface
protected:
/**
* @brief basic constructor (hidden to force the use of ememory::SharedPtr) @ref zeus::Message::create

View File

@ -1,56 +0,0 @@
/** @file
* @author Edouard DUPIN
* @copyright 2016, Edouard DUPIN, all right reserved
* @license MPL v2.0 (see license file)
*/
#include <etk/types.hpp>
#include <zeus/message/Message.hpp>
#include <zeus/debug.hpp>
#include <etk/stdTools.hpp>
#include <zeus/message/Progress.hpp>
void zeus::message::Progress::generateDisplay(std::ostream& _os) const {
zeus::Message::generateDisplay(_os);
_os << m_data;
}
const std::string& zeus::message::Progress::getData() const {
return m_data;
}
void zeus::message::Progress::setData(const std::string& _data) {
m_data = _data;
}
bool zeus::message::Progress::writeOn(enet::WebSocket& _interface) {
std::unique_lock<std::mutex> lock = _interface.getScopeLock();
zeus::Message::writeOn(_interface);
_interface.writeData((uint8_t*)m_data.c_str(), m_data.size() + 1);
int32_t count = _interface.send();
return count > 0;
}
void zeus::message::Progress::composeWith(const uint8_t* _buffer, uint32_t _lenght) {
// First element iw the call name, after, this is the parameters...
// parse the string: (call name)
uint32_t pos = 0;
m_data.clear();
while( pos < _lenght
&& _buffer[pos] != '\0') {
m_data += _buffer[pos];
pos++;
}
}
void zeus::message::Progress::appendMessageData(ememory::SharedPtr<zeus::message::Data> _obj) {
ZEUS_ERROR("can not append data at a progress message ...");
}
// ------------------------------------------------------------------------------------
// -- Factory
// ------------------------------------------------------------------------------------
ememory::SharedPtr<zeus::message::Progress> zeus::message::Progress::create(ememory::SharedPtr<zeus::WebServer> _iface) {
return ememory::SharedPtr<zeus::message::Progress>(new zeus::message::Progress(_iface));
}

View File

@ -1,53 +0,0 @@
/** @file
* @author Edouard DUPIN
* @copyright 2016, Edouard DUPIN, all right reserved
* @license MPL v2.0 (see license file)
*/
#pragma once
#include <etk/types.hpp>
#include <enet/WebSocket.hpp>
#include <zeus/message/Message.hpp>
namespace zeus {
class WebServer;
namespace message {
class Progress :
public zeus::Message {
friend class zeus::Message;
protected:
std::string m_data;
protected:
/**
* @brief basic constructor (hidden to force the use of ememory::SharedPtr) @ref zeus::message::Answer::create
*/
Progress(ememory::SharedPtr<zeus::WebServer> _iface):
zeus::Message(_iface) {
m_header.flags = uint8_t(zeus::message::type::progress);
};
void composeWith(const uint8_t* _buffer, uint32_t _lenght) override;
void appendMessageData(ememory::SharedPtr<zeus::message::Data> _obj) override;
bool writeOn(enet::WebSocket& _interface) override;
void generateDisplay(std::ostream& _os) const override;
public:
/**
* @brief Create a shared pointer on the Answer
* @return Allocated Message.
*/
static ememory::SharedPtr<zeus::message::Progress> create(ememory::SharedPtr<zeus::WebServer> _iface);
public:
enum zeus::message::type getType() const override {
return zeus::message::type::progress;
}
/**
* @brief progress message answer
* @param[in] _data Data of the progress
*/
void setData(const std::string& _data);
/**
* @brief get the error value (if exist)
* @return string of the error
*/
const std::string& getData() const;
};
}
}

View File

@ -20,8 +20,6 @@ namespace etk {
return "event";
case zeus::message::type::data:
return "data";
case zeus::message::type::progress:
return "progress";
}
return "???";
}
@ -43,8 +41,6 @@ enum zeus::message::type zeus::message::getTypeFromInt(uint16_t _value) {
return zeus::message::type::data;
case 4:
return zeus::message::type::event;
case 5:
return zeus::message::type::progress;
}
return zeus::message::type::unknow;
}

View File

@ -14,12 +14,11 @@ namespace zeus {
* @brief Type of the massage send or receive
*/
enum class type {
unknow = 0x0000, //!< Init value
unknow = 0x0000, //!< Init / Unknow value
call = 0x0001, //!< Remote call on a service ID
answer = 0x0002, //!< Answer from a previous call
data = 0x0003, //!< data message happend when partId > 0 it compleate the data of a parameter or an answer or an event
event = 0x0004, //!< Message in one way (no return is waiting and the message has no garenty...)
progress = 0x0005, //!< Message that prevent a future of a progression of an action
};
/**
* @brief generate a display of the typemessage
@ -28,7 +27,11 @@ namespace zeus {
* @return a reference of the stream
*/
std::ostream& operator <<(std::ostream& _os, enum zeus::message::type _value);
/**
* @brief convert the type in a correct enumeration value
* @param[in] _value Value of type to convert
* @return the correspondent enumeration type of the message
*/
enum zeus::message::type getTypeFromInt(uint16_t _value);
}
}