[DEV] firl transfer is OK

This commit is contained in:
Edouard DUPIN 2016-12-07 11:36:07 +01:00
parent d13a239878
commit 07dcff3647
13 changed files with 69 additions and 96 deletions

View File

@ -190,10 +190,8 @@ int main(int _argc, const char *_argv[]) {
#endif #endif
#if 1 #if 1
echrono::Steady start = echrono::Steady::now(); echrono::Steady start = echrono::Steady::now();
//zeus::File tmp("./testzz.png");
// TODO : Read file size before ..
int32_t size = 1024;
ememory::SharedPtr<zeus::File> tmp = zeus::File::create("./tmpResult.bmp"); ememory::SharedPtr<zeus::File> tmp = zeus::File::create("./tmpResult.bmp");
int32_t size = tmp->getSize();
auto retSendImage = remoteServicePicture.addFile(tmp).wait(); auto retSendImage = remoteServicePicture.addFile(tmp).wait();
echrono::Steady stop = echrono::Steady::now(); echrono::Steady stop = echrono::Steady::now();
APPL_WARNING(" IO*=" << (stop-start) << " " << retSendImage.get()); APPL_WARNING(" IO*=" << (stop-start) << " " << retSendImage.get());

View File

@ -214,7 +214,7 @@ namespace appl {
int64_t retSize = futSize.get(); int64_t retSize = futSize.get();
int64_t offset = 0; int64_t offset = 0;
while (retSize > 0) { while (retSize > 0) {
int32_t nbElement = 4096; int32_t nbElement = 1*1024*1024;
if (retSize<nbElement) { if (retSize<nbElement) {
nbElement = retSize; nbElement = retSize;
} }

View File

@ -307,16 +307,5 @@ namespace zeus {
* @param[in] _type string of the type to add * @param[in] _type string of the type to add
*/ */
void addTypeObject(std::vector<uint8_t>& _data, const std::string _type); void addTypeObject(std::vector<uint8_t>& _data, const std::string _type);
/**
* @brief Add a parameter object type in the buffer
* @param[in] _data Buffer to add type
* @param[in] _type string of the type to add
*/
void addTypeService(std::vector<uint8_t>& _data, const std::string _type);
/**
* @brief Add a parameter RAW type in the buffer
* @param[in] _data Buffer to add type
*/
void addTypeRaw(std::vector<uint8_t>& _data);
} }

View File

@ -98,11 +98,7 @@ zeus::ParamType zeus::BufferParameter::getParameterType(int32_t _id) const {
if (typeId == createType<std::vector<uint16_t>>().getId()) { m_parameter[_id].first = 2; return createType<std::vector<uint16_t>>(); } if (typeId == createType<std::vector<uint16_t>>().getId()) { m_parameter[_id].first = 2; return createType<std::vector<uint16_t>>(); }
if (typeId == createType<std::vector<uint8_t>>().getId()) { m_parameter[_id].first = 2; return createType<std::vector<uint8_t>>(); } if (typeId == createType<std::vector<uint8_t>>().getId()) { m_parameter[_id].first = 2; return createType<std::vector<uint8_t>>(); }
if (typeId == createType<std::vector<std::string>>().getId()) { m_parameter[_id].first = 2; return createType<std::vector<std::string>>(); } if (typeId == createType<std::vector<std::string>>().getId()) { m_parameter[_id].first = 2; return createType<std::vector<std::string>>(); }
//if (typeId == createType<zeus::File>().getId()) { m_parameter[_id].first = 2; return createType<zeus::File>(); } if (typeId == createType<zeus::Raw>().getId()) { m_parameter[_id].first = 2; return createType<zeus::Raw>(); }
if (typeId == paramTypeRaw) {
m_parameter[_id].first = sizeof(uint16_t);
return zeus::ParamType("raw", paramTypeRaw);
}
if (typeId == paramTypeObject) { if (typeId == paramTypeObject) {
const char* tmp = reinterpret_cast<const char*>(&m_parameter[_id].second[2]); const char* tmp = reinterpret_cast<const char*>(&m_parameter[_id].second[2]);
bool find = false; bool find = false;

View File

@ -30,11 +30,6 @@ void zeus::addTypeObject(std::vector<uint8_t>& _data, const std::string _type) {
_data.push_back(0); _data.push_back(0);
} }
void zeus::addTypeRaw(std::vector<uint8_t>& _data) {
_data.push_back(uint8_t(zeus::paramTypeRaw>>8));
_data.push_back(uint8_t(zeus::paramTypeRaw));
}
void zeus::BufferParameter::addParameter() { void zeus::BufferParameter::addParameter() {
std::vector<uint8_t> data; std::vector<uint8_t> data;

View File

@ -25,7 +25,6 @@ zeus::FutureBase::FutureBase(uint32_t _transactionId, uint32_t _source) {
m_data->m_sendTime = std::chrono::steady_clock::now(); m_data->m_sendTime = std::chrono::steady_clock::now();
m_data->m_transactionId = _transactionId; m_data->m_transactionId = _transactionId;
m_data->m_source = _source; m_data->m_source = _source;
m_data->m_isSynchronous = false;
} }
ememory::SharedPtr<zeus::Buffer> zeus::FutureBase::getRaw() { ememory::SharedPtr<zeus::Buffer> zeus::FutureBase::getRaw() {
@ -43,7 +42,6 @@ zeus::FutureBase::FutureBase(uint32_t _transactionId, ememory::SharedPtr<zeus::B
m_data->m_sendTime = std::chrono::steady_clock::now(); m_data->m_sendTime = std::chrono::steady_clock::now();
m_data->m_transactionId = _transactionId; m_data->m_transactionId = _transactionId;
m_data->m_source = _source; m_data->m_source = _source;
m_data->m_isSynchronous = false;
m_data->m_returnData = _returnData; m_data->m_returnData = _returnData;
if (isFinished() == true) { if (isFinished() == true) {
m_data->m_receiveTime = std::chrono::steady_clock::now(); m_data->m_receiveTime = std::chrono::steady_clock::now();
@ -123,54 +121,30 @@ zeus::FutureBase zeus::FutureBase::operator= (const zeus::FutureBase& _base) {
return *this; return *this;
} }
bool zeus::FutureBase::appendData(ememory::SharedPtr<zeus::Buffer> _value) { bool zeus::FutureBase::setBuffer(ememory::SharedPtr<zeus::Buffer> _value) {
if (m_data == nullptr) { if (m_data == nullptr) {
ZEUS_ERROR(" Not a valid future ..."); ZEUS_ERROR(" Not a valid future ...");
return true; return true;
} }
m_data->m_receiveTime = std::chrono::steady_clock::now(); m_data->m_receiveTime = std::chrono::steady_clock::now();
if (m_data->m_isSynchronous == true) { m_data->m_returnData = _value;
m_data->m_returnData = _value;
if (hasError() == false) {
if (m_data->m_callbackThen != nullptr) {
return m_data->m_callbackThen(*this);
}
} else {
if (m_data->m_callbackElse != nullptr) {
return m_data->m_callbackElse(*this);
}
}
return true;
}
if (_value->getType() == zeus::Buffer::typeMessage::data) {
if (m_data->m_returnData != nullptr) {
m_data->m_returnData->appendBuffer(_value);
}
} else {
m_data->m_returnData = _value;
}
if (m_data->m_returnData == nullptr) { if (m_data->m_returnData == nullptr) {
return true; return true;
} }
if (m_data->m_returnData->getPartFinish() == true) { if (m_data->m_returnData->getPartFinish() == false) {
if (hasError() == false) { ZEUS_ERROR("set buffer that is not finished ...");
if (m_data->m_callbackThen != nullptr) { return false;
return m_data->m_callbackThen(*this); }
} if (hasError() == false) {
} else { if (m_data->m_callbackThen != nullptr) {
if (m_data->m_callbackElse != nullptr) { return m_data->m_callbackThen(*this);
return m_data->m_callbackElse(*this); }
} } else {
if (m_data->m_callbackElse != nullptr) {
return m_data->m_callbackElse(*this);
} }
return true;
} }
return false; return true;
}
void zeus::FutureBase::setSynchronous() {
if (m_data == nullptr) {
return;
}
m_data->m_isSynchronous = true;
} }
uint32_t zeus::FutureBase::getTransactionId() const { uint32_t zeus::FutureBase::getTransactionId() const {

View File

@ -71,12 +71,7 @@ namespace zeus {
* @param[in] _returnValue Returned buffer * @param[in] _returnValue Returned buffer
* @return return true if an error occured * @return return true if an error occured
*/ */
bool appendData(ememory::SharedPtr<zeus::Buffer> _returnValue); bool setBuffer(ememory::SharedPtr<zeus::Buffer> _returnValue);
/**
* @brief Set the future syncronous
* @note this mean that the system call the observer every time a packet arrive in the Future
*/
void setSynchronous();
/** /**
* @brief Get the transaction Id of the Future * @brief Get the transaction Id of the Future
* @return Transaction Id requested or 0 * @return Transaction Id requested or 0

View File

@ -23,7 +23,6 @@ namespace zeus {
public: public:
uint32_t m_transactionId; //!< waiting answer data uint32_t m_transactionId; //!< waiting answer data
uint32_t m_source; //!< Source of the message. uint32_t m_source; //!< Source of the message.
bool m_isSynchronous; //!< the future is synchronous. (call when receive data)
ememory::SharedPtr<zeus::Buffer> m_returnData; //!< all buffer concatenate or last buffer if synchronous ememory::SharedPtr<zeus::Buffer> m_returnData; //!< all buffer concatenate or last buffer if synchronous
Observer m_callbackThen; //!< observer callback When data arrive and NO error appear Observer m_callbackThen; //!< observer callback When data arrive and NO error appear
Observer m_callbackElse; //!< observer callback When data arrive and AN error appear Observer m_callbackElse; //!< observer callback When data arrive and AN error appear

View File

@ -30,25 +30,6 @@ void zeus::Object::receive(ememory::SharedPtr<zeus::Buffer> _value) {
ZEUS_WARNING("BUFFER" << _value); ZEUS_WARNING("BUFFER" << _value);
uint32_t tmpID = _value->getTransactionId(); uint32_t tmpID = _value->getTransactionId();
uint32_t source = _value->getSource(); uint32_t source = _value->getSource();
if (_value->getType() == zeus::Buffer::typeMessage::data) {
auto it = m_callMultiData.begin();
while (it != m_callMultiData.end()) {
if ( it->getTransactionId() == tmpID
&& it->getSource() == source) {
ZEUS_WARNING("Append data ... " << tmpID);
it->appendData(_value);
if (it->isFinished() == true) {
ZEUS_WARNING("CALL Function ...");
callBinary(it->getRaw());
it = m_callMultiData.erase(it);
}
return;
}
++it;
}
ZEUS_ERROR("Un-associated data ...");
return;
}
ZEUS_WARNING("direct call"); ZEUS_WARNING("direct call");
zeus::FutureBase futData(tmpID, _value, source); zeus::FutureBase futData(tmpID, _value, source);
if (futData.isFinished() == true) { if (futData.isFinished() == true) {

View File

@ -135,5 +135,4 @@ generate_basic_type(zeus::FileServer, "file", 0x000E, false, false);
#endif #endif
const uint16_t zeus::paramTypeObject = 0xFFFF; const uint16_t zeus::paramTypeObject = 0xFFFF;
const uint16_t zeus::paramTypeRaw = 0xFFFE;

View File

@ -89,7 +89,6 @@ namespace zeus {
bool isVector() const; bool isVector() const;
}; };
extern const uint16_t paramTypeObject; //!< van not automatic create a type with the string named object extern const uint16_t paramTypeObject; //!< van not automatic create a type with the string named object
extern const uint16_t paramTypeRaw; //!< Raw type (special case of data)
/** /**
* @brief Create human readable stream to debug * @brief Create human readable stream to debug
* @param[in] _os the stream to inject data * @param[in] _os the stream to inject data

View File

@ -248,11 +248,59 @@ void zeus::WebServer::ping() {
void zeus::WebServer::newBuffer(ememory::SharedPtr<zeus::Buffer> _buffer) { void zeus::WebServer::newBuffer(ememory::SharedPtr<zeus::Buffer> _buffer) {
ZEUS_LOG_INPUT_OUTPUT("Receive :" << _buffer); ZEUS_LOG_INPUT_OUTPUT("Receive :" << _buffer);
// if an adress id different ... just transmit it ...
if (m_localAddress != _buffer->getDestinationId()) {
// TODO : Change the callback ...
if (m_observerElement != nullptr) {
m_processingPool.async(
[=](){
// not a pending call ==> simple event or call ...
m_observerElement(_buffer); //!< all input arrive at the same element
},
8);
}
return;
}
if ( _buffer->getPartFinish() == false
&& _buffer->getType() != zeus::Buffer::typeMessage::data) {
m_listPartialBuffer.push_back(_buffer);
return;
}
if (_buffer->getType() == zeus::Buffer::typeMessage::data) {
// Add data in a previous buffer...
auto it = m_listPartialBuffer.begin();
while (it != m_listPartialBuffer.end()) {
if (*it == nullptr) {
it = m_listPartialBuffer.erase(it);
continue;
}
if ((*it)->getDestination() != _buffer->getDestination()) {
++it;
continue;
}
if ((*it)->getTransactionId() == _buffer->getTransactionId()) {
(*it)->appendBuffer(_buffer);
if (_buffer->getPartFinish() != true) {
return;
}
(*it)->setPartFinish(true);
_buffer = *it;
it = m_listPartialBuffer.erase(it);
break;
}
}
}
if (_buffer->getPartFinish() != true) {
ZEUS_ERROR("Get a buffer with no finished data ... (remove)" << _buffer);
return;
}
// Try to find in the current call that has been done to add data in an answer : // Try to find in the current call that has been done to add data in an answer :
zeus::FutureBase future; zeus::FutureBase future;
uint64_t tid = _buffer->getTransactionId(); uint64_t tid = _buffer->getTransactionId();
// TODO : Check the UDI reaaly utility ... // TODO : Check the UDI reaaly utility ...
if (_buffer->getType() == zeus::Buffer::typeMessage::answer) { if ( _buffer->getType() == zeus::Buffer::typeMessage::answer
|| _buffer->getType() == zeus::Buffer::typeMessage::data) {
std::unique_lock<std::mutex> lock(m_pendingCallMutex); std::unique_lock<std::mutex> lock(m_pendingCallMutex);
auto it = m_pendingCall.begin(); auto it = m_pendingCall.begin();
while (it != m_pendingCall.end()) { while (it != m_pendingCall.end()) {
@ -303,7 +351,7 @@ void zeus::WebServer::newBuffer(ememory::SharedPtr<zeus::Buffer> _buffer) {
zeus::FutureBase fut = future; zeus::FutureBase fut = future;
ZEUS_INFO("PROCESS FUTURE : " << _buffer); ZEUS_INFO("PROCESS FUTURE : " << _buffer);
// add data ... // add data ...
bool ret = fut.appendData(_buffer); bool ret = fut.setBuffer(_buffer);
if (ret == true) { if (ret == true) {
std::unique_lock<std::mutex> lock(m_pendingCallMutex); std::unique_lock<std::mutex> lock(m_pendingCallMutex);
auto it = m_pendingCall.begin(); auto it = m_pendingCall.begin();

View File

@ -97,7 +97,7 @@ namespace zeus {
private: private:
enet::WebSocket m_connection; enet::WebSocket m_connection;
ethread::Pool m_processingPool; ethread::Pool m_processingPool;
std::vector<ememory::SharedPtr<zeus::Buffer>> m_listPartialBuffer;
uint16_t m_localAddress; uint16_t m_localAddress;
uint16_t m_licalIdObjectIncrement; //!< attribute a unique ID for an object uint16_t m_licalIdObjectIncrement; //!< attribute a unique ID for an object
public: public: