diff --git a/test/client/appl/main.cpp b/test/client/appl/main.cpp index cc44306..362672b 100644 --- a/test/client/appl/main.cpp +++ b/test/client/appl/main.cpp @@ -190,10 +190,8 @@ int main(int _argc, const char *_argv[]) { #endif #if 1 echrono::Steady start = echrono::Steady::now(); - //zeus::File tmp("./testzz.png"); - // TODO : Read file size before .. - int32_t size = 1024; ememory::SharedPtr tmp = zeus::File::create("./tmpResult.bmp"); + int32_t size = tmp->getSize(); auto retSendImage = remoteServicePicture.addFile(tmp).wait(); echrono::Steady stop = echrono::Steady::now(); APPL_WARNING(" IO*=" << (stop-start) << " " << retSendImage.get()); diff --git a/tools/service-picture/appl/main.cpp b/tools/service-picture/appl/main.cpp index bd1b6db..071edcc 100644 --- a/tools/service-picture/appl/main.cpp +++ b/tools/service-picture/appl/main.cpp @@ -214,7 +214,7 @@ namespace appl { int64_t retSize = futSize.get(); int64_t offset = 0; while (retSize > 0) { - int32_t nbElement = 4096; + int32_t nbElement = 1*1024*1024; if (retSize& _data, const std::string _type); - /** - * @brief Add a parameter object type in the buffer - * @param[in] _data Buffer to add type - * @param[in] _type string of the type to add - */ - void addTypeService(std::vector& _data, const std::string _type); - /** - * @brief Add a parameter RAW type in the buffer - * @param[in] _data Buffer to add type - */ - void addTypeRaw(std::vector& _data); } diff --git a/zeus/BufferParameter.cpp b/zeus/BufferParameter.cpp index dc43148..8ab790b 100644 --- a/zeus/BufferParameter.cpp +++ b/zeus/BufferParameter.cpp @@ -98,11 +98,7 @@ zeus::ParamType zeus::BufferParameter::getParameterType(int32_t _id) const { if (typeId == createType>().getId()) { m_parameter[_id].first = 2; return createType>(); } if (typeId == createType>().getId()) { m_parameter[_id].first = 2; return createType>(); } if (typeId == createType>().getId()) { m_parameter[_id].first = 2; return createType>(); } - //if (typeId == createType().getId()) { m_parameter[_id].first = 2; return createType(); } - if (typeId == paramTypeRaw) { - m_parameter[_id].first = sizeof(uint16_t); - return zeus::ParamType("raw", paramTypeRaw); - } + if (typeId == createType().getId()) { m_parameter[_id].first = 2; return createType(); } if (typeId == paramTypeObject) { const char* tmp = reinterpret_cast(&m_parameter[_id].second[2]); bool find = false; diff --git a/zeus/BufferParameter_addParameter.cpp b/zeus/BufferParameter_addParameter.cpp index 4dd0d6d..e95df9d 100644 --- a/zeus/BufferParameter_addParameter.cpp +++ b/zeus/BufferParameter_addParameter.cpp @@ -30,11 +30,6 @@ void zeus::addTypeObject(std::vector& _data, const std::string _type) { _data.push_back(0); } -void zeus::addTypeRaw(std::vector& _data) { - _data.push_back(uint8_t(zeus::paramTypeRaw>>8)); - _data.push_back(uint8_t(zeus::paramTypeRaw)); -} - void zeus::BufferParameter::addParameter() { std::vector data; diff --git a/zeus/FutureBase.cpp b/zeus/FutureBase.cpp index 91d27bd..c43dd5d 100644 --- a/zeus/FutureBase.cpp +++ b/zeus/FutureBase.cpp @@ -25,7 +25,6 @@ zeus::FutureBase::FutureBase(uint32_t _transactionId, uint32_t _source) { m_data->m_sendTime = std::chrono::steady_clock::now(); m_data->m_transactionId = _transactionId; m_data->m_source = _source; - m_data->m_isSynchronous = false; } ememory::SharedPtr zeus::FutureBase::getRaw() { @@ -43,7 +42,6 @@ zeus::FutureBase::FutureBase(uint32_t _transactionId, ememory::SharedPtrm_sendTime = std::chrono::steady_clock::now(); m_data->m_transactionId = _transactionId; m_data->m_source = _source; - m_data->m_isSynchronous = false; m_data->m_returnData = _returnData; if (isFinished() == true) { m_data->m_receiveTime = std::chrono::steady_clock::now(); @@ -123,54 +121,30 @@ zeus::FutureBase zeus::FutureBase::operator= (const zeus::FutureBase& _base) { return *this; } -bool zeus::FutureBase::appendData(ememory::SharedPtr _value) { +bool zeus::FutureBase::setBuffer(ememory::SharedPtr _value) { if (m_data == nullptr) { ZEUS_ERROR(" Not a valid future ..."); return true; } m_data->m_receiveTime = std::chrono::steady_clock::now(); - if (m_data->m_isSynchronous == true) { - m_data->m_returnData = _value; - if (hasError() == false) { - if (m_data->m_callbackThen != nullptr) { - return m_data->m_callbackThen(*this); - } - } else { - if (m_data->m_callbackElse != nullptr) { - return m_data->m_callbackElse(*this); - } - } - return true; - } - if (_value->getType() == zeus::Buffer::typeMessage::data) { - if (m_data->m_returnData != nullptr) { - m_data->m_returnData->appendBuffer(_value); - } - } else { - m_data->m_returnData = _value; - } + m_data->m_returnData = _value; if (m_data->m_returnData == nullptr) { return true; } - if (m_data->m_returnData->getPartFinish() == true) { - if (hasError() == false) { - if (m_data->m_callbackThen != nullptr) { - return m_data->m_callbackThen(*this); - } - } else { - if (m_data->m_callbackElse != nullptr) { - return m_data->m_callbackElse(*this); - } + if (m_data->m_returnData->getPartFinish() == false) { + ZEUS_ERROR("set buffer that is not finished ..."); + return false; + } + if (hasError() == false) { + if (m_data->m_callbackThen != nullptr) { + return m_data->m_callbackThen(*this); + } + } else { + if (m_data->m_callbackElse != nullptr) { + return m_data->m_callbackElse(*this); } - return true; } - return false; -} -void zeus::FutureBase::setSynchronous() { - if (m_data == nullptr) { - return; - } - m_data->m_isSynchronous = true; + return true; } uint32_t zeus::FutureBase::getTransactionId() const { diff --git a/zeus/FutureBase.hpp b/zeus/FutureBase.hpp index 7552e12..46a5ffa 100644 --- a/zeus/FutureBase.hpp +++ b/zeus/FutureBase.hpp @@ -71,12 +71,7 @@ namespace zeus { * @param[in] _returnValue Returned buffer * @return return true if an error occured */ - bool appendData(ememory::SharedPtr _returnValue); - /** - * @brief Set the future syncronous - * @note this mean that the system call the observer every time a packet arrive in the Future - */ - void setSynchronous(); + bool setBuffer(ememory::SharedPtr _returnValue); /** * @brief Get the transaction Id of the Future * @return Transaction Id requested or 0 diff --git a/zeus/FutureData.hpp b/zeus/FutureData.hpp index 6e74915..51be81f 100644 --- a/zeus/FutureData.hpp +++ b/zeus/FutureData.hpp @@ -23,7 +23,6 @@ namespace zeus { public: uint32_t m_transactionId; //!< waiting answer data uint32_t m_source; //!< Source of the message. - bool m_isSynchronous; //!< the future is synchronous. (call when receive data) ememory::SharedPtr m_returnData; //!< all buffer concatenate or last buffer if synchronous Observer m_callbackThen; //!< observer callback When data arrive and NO error appear Observer m_callbackElse; //!< observer callback When data arrive and AN error appear diff --git a/zeus/Object.cpp b/zeus/Object.cpp index 4c36918..b261f5d 100644 --- a/zeus/Object.cpp +++ b/zeus/Object.cpp @@ -30,25 +30,6 @@ void zeus::Object::receive(ememory::SharedPtr _value) { ZEUS_WARNING("BUFFER" << _value); uint32_t tmpID = _value->getTransactionId(); uint32_t source = _value->getSource(); - if (_value->getType() == zeus::Buffer::typeMessage::data) { - auto it = m_callMultiData.begin(); - while (it != m_callMultiData.end()) { - if ( it->getTransactionId() == tmpID - && it->getSource() == source) { - ZEUS_WARNING("Append data ... " << tmpID); - it->appendData(_value); - if (it->isFinished() == true) { - ZEUS_WARNING("CALL Function ..."); - callBinary(it->getRaw()); - it = m_callMultiData.erase(it); - } - return; - } - ++it; - } - ZEUS_ERROR("Un-associated data ..."); - return; - } ZEUS_WARNING("direct call"); zeus::FutureBase futData(tmpID, _value, source); if (futData.isFinished() == true) { diff --git a/zeus/ParamType.cpp b/zeus/ParamType.cpp index e05f71e..af80562 100644 --- a/zeus/ParamType.cpp +++ b/zeus/ParamType.cpp @@ -135,5 +135,4 @@ generate_basic_type(zeus::FileServer, "file", 0x000E, false, false); #endif const uint16_t zeus::paramTypeObject = 0xFFFF; -const uint16_t zeus::paramTypeRaw = 0xFFFE; diff --git a/zeus/ParamType.hpp b/zeus/ParamType.hpp index 2417b4c..ee8a435 100644 --- a/zeus/ParamType.hpp +++ b/zeus/ParamType.hpp @@ -89,7 +89,6 @@ namespace zeus { bool isVector() const; }; extern const uint16_t paramTypeObject; //!< van not automatic create a type with the string named object - extern const uint16_t paramTypeRaw; //!< Raw type (special case of data) /** * @brief Create human readable stream to debug * @param[in] _os the stream to inject data diff --git a/zeus/WebServer.cpp b/zeus/WebServer.cpp index 4b490df..24ba0a5 100644 --- a/zeus/WebServer.cpp +++ b/zeus/WebServer.cpp @@ -248,11 +248,59 @@ void zeus::WebServer::ping() { void zeus::WebServer::newBuffer(ememory::SharedPtr _buffer) { ZEUS_LOG_INPUT_OUTPUT("Receive :" << _buffer); + // if an adress id different ... just transmit it ... + if (m_localAddress != _buffer->getDestinationId()) { + // TODO : Change the callback ... + if (m_observerElement != nullptr) { + m_processingPool.async( + [=](){ + // not a pending call ==> simple event or call ... + m_observerElement(_buffer); //!< all input arrive at the same element + }, + 8); + } + return; + } + if ( _buffer->getPartFinish() == false + && _buffer->getType() != zeus::Buffer::typeMessage::data) { + m_listPartialBuffer.push_back(_buffer); + return; + } + if (_buffer->getType() == zeus::Buffer::typeMessage::data) { + // Add data in a previous buffer... + auto it = m_listPartialBuffer.begin(); + while (it != m_listPartialBuffer.end()) { + if (*it == nullptr) { + it = m_listPartialBuffer.erase(it); + continue; + } + if ((*it)->getDestination() != _buffer->getDestination()) { + ++it; + continue; + } + if ((*it)->getTransactionId() == _buffer->getTransactionId()) { + (*it)->appendBuffer(_buffer); + if (_buffer->getPartFinish() != true) { + return; + } + (*it)->setPartFinish(true); + _buffer = *it; + it = m_listPartialBuffer.erase(it); + break; + } + } + } + if (_buffer->getPartFinish() != true) { + ZEUS_ERROR("Get a buffer with no finished data ... (remove)" << _buffer); + return; + } + // Try to find in the current call that has been done to add data in an answer : zeus::FutureBase future; uint64_t tid = _buffer->getTransactionId(); // TODO : Check the UDI reaaly utility ... - if (_buffer->getType() == zeus::Buffer::typeMessage::answer) { + if ( _buffer->getType() == zeus::Buffer::typeMessage::answer + || _buffer->getType() == zeus::Buffer::typeMessage::data) { std::unique_lock lock(m_pendingCallMutex); auto it = m_pendingCall.begin(); while (it != m_pendingCall.end()) { @@ -303,7 +351,7 @@ void zeus::WebServer::newBuffer(ememory::SharedPtr _buffer) { zeus::FutureBase fut = future; ZEUS_INFO("PROCESS FUTURE : " << _buffer); // add data ... - bool ret = fut.appendData(_buffer); + bool ret = fut.setBuffer(_buffer); if (ret == true) { std::unique_lock lock(m_pendingCallMutex); auto it = m_pendingCall.begin(); diff --git a/zeus/WebServer.hpp b/zeus/WebServer.hpp index 9463043..2d9b6f8 100644 --- a/zeus/WebServer.hpp +++ b/zeus/WebServer.hpp @@ -97,7 +97,7 @@ namespace zeus { private: enet::WebSocket m_connection; ethread::Pool m_processingPool; - + std::vector> m_listPartialBuffer; uint16_t m_localAddress; uint16_t m_licalIdObjectIncrement; //!< attribute a unique ID for an object public: