diff --git a/jus/Client.cpp b/jus/Client.cpp index 1653a56..8f6d312 100644 --- a/jus/Client.cpp +++ b/jus/Client.cpp @@ -160,148 +160,5 @@ uint64_t jus::Client::getId() { return m_id++; } -class SendAsyncJson { - private: - std::vector m_async; - uint64_t m_transactionId; - uint32_t m_serviceId; - uint32_t m_partId; - public: - SendAsyncJson(uint64_t _transactionId, const uint32_t& _serviceId, const std::vector& _async) : - m_async(_async), - m_transactionId(_transactionId), - m_serviceId(_serviceId), - m_partId(1) { - - } - bool operator() (jus::TcpString* _interface){ - auto it = m_async.begin(); - while (it != m_async.end()) { - bool ret = (*it)(_interface, m_serviceId, m_transactionId, m_partId); - if (ret == true) { - // Remove it ... - it = m_async.erase(it); - } else { - ++it; - } - m_partId++; - } - if (m_async.size() == 0) { - ejson::Object obj; - if (m_serviceId != 0) { - obj.add("service", ejson::Number(m_serviceId)); - } - obj.add("id", ejson::Number(m_transactionId)); - obj.add("part", ejson::Number(m_partId)); - obj.add("finish", ejson::Boolean(true)); - _interface->writeJson(obj); - return true; - } - return false; - } -}; - -jus::FutureBase jus::Client::callJson(uint64_t _transactionId, - ejson::Object _obj, - const std::vector& _async, - jus::FutureData::ObserverFinish _callback, - const uint32_t& _serviceId) { - JUS_VERBOSE("Send JSON [START] "); - if (m_interfaceClient.isActive() == false) { - ejson::Object obj; - obj.add("error", ejson::String("NOT-CONNECTED")); - obj.add("error-help", ejson::String("Client interface not connected (no TCP)")); - return jus::FutureBase(_transactionId, true, obj, _callback); - } - jus::FutureBase tmpFuture(_transactionId, _callback); - { - std::unique_lock lock(m_mutex); - m_pendingCall.push_back(tmpFuture); - } - if (_async.size() != 0) { - _obj.add("part", ejson::Number(0)); - } - m_interfaceClient.writeJson(_obj); - - if (_async.size() != 0) { - m_interfaceClient.addAsync(SendAsyncJson(_transactionId, _serviceId, _async)); - } - JUS_VERBOSE("Send JSON [STOP]"); - return tmpFuture; -} - - -class SendAsyncBinary { - private: - std::vector m_async; - uint64_t m_transactionId; - uint32_t m_serviceId; - uint32_t m_partId; - public: - SendAsyncBinary(uint64_t _transactionId, const uint32_t& _serviceId, const std::vector& _async) : - m_async(_async), - m_transactionId(_transactionId), - m_serviceId(_serviceId), - m_partId(1) { - - } - bool operator() (jus::TcpString* _interface){ - auto it = m_async.begin(); - while (it != m_async.end()) { - bool ret = (*it)(_interface, m_serviceId, m_transactionId, m_partId); - if (ret == true) { - // Remove it ... - it = m_async.erase(it); - } else { - ++it; - } - m_partId++; - } - if (m_async.size() == 0) { - jus::Buffer obj; - obj.setServiceId(m_serviceId); - obj.setTransactionId(m_transactionId); - obj.setPartId(m_partId); - obj.setPartFinish(true); - _interface->writeBinary(obj); - return true; - } - return false; - } -}; - -jus::FutureBase jus::Client::callBinary(uint64_t _transactionId, - jus::Buffer& _obj, - const std::vector& _async, - jus::FutureData::ObserverFinish _callback, - const uint32_t& _serviceId) { - JUS_VERBOSE("Send Binary [START] "); - if (m_interfaceClient.isActive() == false) { - jus::Buffer obj; - JUS_TODO("SEt error answer ..."); - //obj.add("error", ejson::String("NOT-CONNECTED")); - //obj.add("error-help", ejson::String("Client interface not connected (no TCP)")); - return jus::FutureBase(_transactionId, true, obj, _callback); - } - jus::FutureBase tmpFuture(_transactionId, _callback); - { - std::unique_lock lock(m_mutex); - m_pendingCall.push_back(tmpFuture); - } - if (_async.size() != 0) { - _obj.setPartFinish(false); - } else { - _obj.setPartFinish(true); - } - m_interfaceClient.writeBinary(_obj); - - if (_async.size() != 0) { - m_interfaceClient.addAsync(SendAsyncBinary(_transactionId, _serviceId, _async)); - } - JUS_VERBOSE("Send Binary [STOP]"); - return tmpFuture; -} - - diff --git a/jus/Client.h b/jus/Client.h index ee26f84..b0f8419 100644 --- a/jus/Client.h +++ b/jus/Client.h @@ -45,20 +45,12 @@ namespace jus { //client1.authentificate("coucou"); private: void onClientData(jus::Buffer& _value); - jus::FutureBase callJson(uint64_t _transactionId, - ejson::Object _obj, - const std::vector& _async, - jus::FutureData::ObserverFinish _callback=nullptr, - const uint32_t& _service=0); - jus::FutureBase callBinary(uint64_t _transactionId, - jus::Buffer& _obj, - const std::vector& _async, - jus::FutureData::ObserverFinish _callback=nullptr, - const uint32_t& _service=0); public: uint64_t getId(); template jus::FutureBase call(const std::string& _functionName, _ARGS&&... _args) { + return m_interfaceClient.call(_functionName, _args...); + /* uint64_t id = getId(); std::vector asyncAction; if (getMode() == jus::connectionMode::modeJson) { @@ -68,9 +60,12 @@ namespace jus { jus::Buffer callElem = jus::createBinaryCall(asyncAction, id, _functionName, std::forward<_ARGS>(_args)...); return callBinary(id, callElem, asyncAction); } + */ } template jus::FutureBase callAction(const std::string& _functionName, _ARGS&&... _args, jus::FutureData::ObserverFinish _callback) { + return m_interfaceClient.callAction(_functionName, _args..., _callback); + /* uint64_t id = getId(); std::vector asyncAction; if (getMode() == jus::connectionMode::modeJson) { @@ -80,6 +75,7 @@ namespace jus { jus::Buffer callElem = jus::createBinaryCall(asyncAction, id, _functionName, std::forward<_ARGS>(_args)...); return callBinary(id, callElem, asyncAction, _callback); } + */ } private: void onPropertyChangeIp(); diff --git a/jus/ServiceRemote.h b/jus/ServiceRemote.h index 5f5fd80..ff26ea2 100644 --- a/jus/ServiceRemote.h +++ b/jus/ServiceRemote.h @@ -34,6 +34,8 @@ namespace jus { public: template jus::FutureBase call(const std::string& _functionName, _ARGS&&... _args) { + return m_clientInterface->m_interfaceClient.callService(m_serviceId, _functionName, _args...); + /* uint64_t id = getId(); std::vector asyncActionToDo; if (getMode() == jus::connectionMode::modeJson) { @@ -43,9 +45,12 @@ namespace jus { jus::Buffer callElem = jus::createBinaryCallService(asyncActionToDo, id, m_serviceId, _functionName, std::forward<_ARGS>(_args)...); return callBinary(id, callElem, asyncActionToDo); } + */ } template jus::FutureBase callAction(const std::string& _functionName, _ARGS&&... _args, jus::FutureData::ObserverFinish _callback) { + return m_clientInterface->m_interfaceClient.callServiceAction(m_serviceId, _functionName, _args..., _callback); + /* uint64_t id = getId(); std::vector asyncActionToDo; if (getMode() == jus::connectionMode::modeJson) { @@ -55,6 +60,7 @@ namespace jus { jus::Buffer callElem = jus::createBinaryCallService(asyncActionToDo, id, m_serviceId, _functionName, std::forward<_ARGS>(_args)...); return callBinary(id, callElem, asyncActionToDo, _callback); } + */ } }; } diff --git a/jus/TcpString.cpp b/jus/TcpString.cpp index 68dd07a..bffb285 100644 --- a/jus/TcpString.cpp +++ b/jus/TcpString.cpp @@ -16,6 +16,7 @@ jus::TcpString::TcpString(enet::Tcp _connection) : m_threadRunning = false; m_threadAsyncRunning = false; m_interfaceMode = jus::connectionMode::modeJson; + m_transmissionId = 0; } jus::TcpString::TcpString() : @@ -26,6 +27,7 @@ jus::TcpString::TcpString() : m_threadRunning = false; m_threadAsyncRunning = false; m_interfaceMode = jus::connectionMode::modeJson; + m_transmissionId = 0; } void jus::TcpString::setInterface(enet::Tcp _connection) { @@ -183,12 +185,9 @@ void jus::TcpString::read() { JUS_WARNING("Read No data"); } } - if (m_observerElement != nullptr) { - jus::Buffer dataRaw; - dataRaw.composeWith(m_buffer); - JUS_VERBOSE("Receive Binary :" << dataRaw.toJson().generateHumanString()); - m_observerElement(dataRaw); - } + jus::Buffer dataRaw; + dataRaw.composeWith(m_buffer); + newBuffer(dataRaw); } } JUS_VERBOSE("ReadRaw [STOP]"); @@ -217,12 +216,9 @@ void jus::TcpString::read() { JUS_WARNING("Read No data"); } } - if (m_observerElement != nullptr) { - JUS_VERBOSE("Receive String :" << out); - jus::Buffer dataRaw; - dataRaw.composeWith(out); - m_observerElement(dataRaw); - } + jus::Buffer dataRaw; + dataRaw.composeWith(out); + newBuffer(dataRaw); } } JUS_VERBOSE("Read sized String [STOP]"); @@ -236,6 +232,70 @@ void jus::TcpString::read() { } } +void jus::TcpString::newBuffer(jus::Buffer& _buffer) { + JUS_VERBOSE("Receive Binary :" << _buffer.toJson().generateHumanString()); + jus::FutureBase future; + uint64_t tid = _buffer.getTransactionId(); + if (tid == 0) { + JUS_ERROR("Get a Protocol error ... No ID ..."); + /* + if (obj["error"].toString().get() == "PROTOCOL-ERROR") { + JUS_ERROR("Get a Protocol error ..."); + std::unique_lock lock(m_mutex); + for (auto &it : m_pendingCall) { + if (it.isValid() == false) { + continue; + } + it.setAnswer(obj); + } + m_pendingCall.clear(); + } else { + JUS_ERROR("call with no ID ==> error ..."); + } + */ + return; + } + { + std::unique_lock lock(m_pendingCallMutex); + auto it = m_pendingCall.begin(); + while (it != m_pendingCall.end()) { + if (it->isValid() == false) { + it = m_pendingCall.erase(it); + continue; + } + if (it->getTransactionId() != tid) { + ++it; + continue; + } + future = *it; + break; + } + } + if (future.isValid() == false) { + if (m_observerElement != nullptr) { + m_observerElement(_buffer); + } + return; + } + bool ret = future.setAnswer(_buffer); + if (ret == true) { + std::unique_lock lock(m_pendingCallMutex); + auto it = m_pendingCall.begin(); + while (it != m_pendingCall.end()) { + if (it->isValid() == false) { + it = m_pendingCall.erase(it); + continue; + } + if (it->getTransactionId() != tid) { + ++it; + continue; + } + it = m_pendingCall.erase(it); + break; + } + } +} + void jus::TcpString::threadAsyncCallback() { ethread::setName("Async-sender"); // get datas: @@ -262,6 +322,154 @@ void jus::TcpString::threadAsyncCallback() { } + + +class SendAsyncJson { + private: + std::vector m_async; + uint64_t m_transactionId; + uint32_t m_serviceId; + uint32_t m_partId; + public: + SendAsyncJson(uint64_t _transactionId, const uint32_t& _serviceId, const std::vector& _async) : + m_async(_async), + m_transactionId(_transactionId), + m_serviceId(_serviceId), + m_partId(1) { + + } + bool operator() (jus::TcpString* _interface){ + auto it = m_async.begin(); + while (it != m_async.end()) { + bool ret = (*it)(_interface, m_serviceId, m_transactionId, m_partId); + if (ret == true) { + // Remove it ... + it = m_async.erase(it); + } else { + ++it; + } + m_partId++; + } + if (m_async.size() == 0) { + ejson::Object obj; + if (m_serviceId != 0) { + obj.add("service", ejson::Number(m_serviceId)); + } + obj.add("id", ejson::Number(m_transactionId)); + obj.add("part", ejson::Number(m_partId)); + obj.add("finish", ejson::Boolean(true)); + _interface->writeJson(obj); + return true; + } + return false; + } +}; + +jus::FutureBase jus::TcpString::callJson(uint64_t _transactionId, + ejson::Object _obj, + const std::vector& _async, + jus::FutureData::ObserverFinish _callback, + const uint32_t& _serviceId) { + JUS_VERBOSE("Send JSON [START] "); + if (m_interfaceClient.isActive() == false) { + jus::Buffer obj; + obj.setType(jus::Buffer::typeMessage::answer) { + obj.addError("NOT-CONNECTED", "Client interface not connected (no TCP)"); + return jus::FutureBase(_transactionId, true, obj, _callback); + } + jus::FutureBase tmpFuture(_transactionId, _callback); + { + std::unique_lock lock(m_pendingCallMutex); + m_pendingCall.push_back(tmpFuture); + } + if (_async.size() != 0) { + _obj.add("part", ejson::Number(0)); + } + writeJson(_obj); + + if (_async.size() != 0) { + addAsync(SendAsyncJson(_transactionId, _serviceId, _async)); + } + JUS_VERBOSE("Send JSON [STOP]"); + return tmpFuture; +} + + +class SendAsyncBinary { + private: + std::vector m_async; + uint64_t m_transactionId; + uint32_t m_serviceId; + uint32_t m_partId; + public: + SendAsyncBinary(uint64_t _transactionId, const uint32_t& _serviceId, const std::vector& _async) : + m_async(_async), + m_transactionId(_transactionId), + m_serviceId(_serviceId), + m_partId(1) { + + } + bool operator() (jus::TcpString* _interface){ + auto it = m_async.begin(); + while (it != m_async.end()) { + bool ret = (*it)(_interface, m_serviceId, m_transactionId, m_partId); + if (ret == true) { + // Remove it ... + it = m_async.erase(it); + } else { + ++it; + } + m_partId++; + } + if (m_async.size() == 0) { + jus::Buffer obj; + obj.setServiceId(m_serviceId); + obj.setTransactionId(m_transactionId); + obj.setPartId(m_partId); + obj.setPartFinish(true); + _interface->writeBinary(obj); + return true; + } + return false; + } +}; + +jus::FutureBase jus::TcpString::callBinary(uint64_t _transactionId, + jus::Buffer& _obj, + const std::vector& _async, + jus::FutureData::ObserverFinish _callback, + const uint32_t& _serviceId) { + JUS_VERBOSE("Send Binary [START] "); + if (m_interfaceClient.isActive() == false) { + jus::Buffer obj; + obj.setType(jus::Buffer::typeMessage::answer) { + obj.addError("NOT-CONNECTED", "Client interface not connected (no TCP)"); + return jus::FutureBase(_transactionId, true, obj, _callback); + } + jus::FutureBase tmpFuture(_transactionId, _callback); + { + std::unique_lock lock(m_pendingCallMutex); + m_pendingCall.push_back(tmpFuture); + } + if (_async.size() != 0) { + _obj.setPartFinish(false); + } else { + _obj.setPartFinish(true); + } + writeBinary(_obj); + + if (_async.size() != 0) { + addAsync(SendAsyncBinary(_transactionId, _serviceId, _async)); + } + JUS_VERBOSE("Send Binary [STOP]"); + return tmpFuture; +} + + + + + + void jus::TcpString::answerError(uint64_t _clientTransactionId, const std::string& _errorValue, const std::string& _errorHelp, uint32_t _clientId) { if (m_interfaceMode == jus::connectionMode::modeJson) { ejson::Object answer; diff --git a/jus/TcpString.h b/jus/TcpString.h index 2e4228c..c045851 100644 --- a/jus/TcpString.h +++ b/jus/TcpString.h @@ -19,6 +19,12 @@ namespace jus { enet::Tcp m_connection; std::thread* m_thread; bool m_threadRunning; + uint16_t m_transmissionId; + uint16_t getId() { + return m_transmissionId++; + } + std::mutex m_pendingCallMutex; + std::vector m_pendingCall; protected: enum jus::connectionMode m_interfaceMode; public: @@ -61,7 +67,8 @@ namespace jus { std::string asyncRead(); private: void read(); - jus::Buffer readRaw(); + + void newBuffer(jus::Buffer& _buffer); private: void threadCallback(); public: @@ -84,8 +91,73 @@ namespace jus { std::unique_lock lock(m_threadAsyncMutex); m_threadAsyncList.push_back(_elem); } + private: + jus::FutureBase callJson(uint64_t _transactionId, + ejson::Object _obj, + const std::vector& _async, + jus::FutureData::ObserverFinish _callback=nullptr, + const uint32_t& _service=0); + jus::FutureBase callBinary(uint64_t _transactionId, + jus::Buffer& _obj, + const std::vector& _async, + jus::FutureData::ObserverFinish _callback=nullptr, + const uint32_t& _service=0); + public: // section call direct + template + jus::FutureBase call(const std::string& _functionName, _ARGS&&... _args) { + return m_interfaceClient.call(_functionName, _args...); + uint16_t id = getId(); + std::vector asyncAction; + if (m_interfaceMode == jus::connectionMode::modeJson) { + ejson::Object callElem = jus::createCall(asyncAction, id, _functionName, std::forward<_ARGS>(_args)...); + return callJson(id, callElem, asyncAction); + } else { + jus::Buffer callElem = jus::createBinaryCall(asyncAction, id, _functionName, std::forward<_ARGS>(_args)...); + return callBinary(id, callElem, asyncAction); + } + } + template + jus::FutureBase callAction(const std::string& _functionName, _ARGS&&... _args, jus::FutureData::ObserverFinish _callback) { + return m_interfaceClient.callAction(_functionName, _args..., _callback); + uint16_t id = getId(); + std::vector asyncAction; + if (m_interfaceMode == jus::connectionMode::modeJson) { + ejson::Object callElem = jus::createCall(asyncAction, id, _functionName, std::forward<_ARGS>(_args)...); + return callJson(id, callElem, asyncAction, _callback); + } else { + jus::Buffer callElem = jus::createBinaryCall(asyncAction, id, _functionName, std::forward<_ARGS>(_args)...); + return callBinary(id, callElem, asyncAction, _callback); + } + } + public: // section call with service ID / Client ID - + template + jus::FutureBase call(const std::string& _functionName, _ARGS&&... _args) { + return m_clientInterface->m_interfaceClient.callService(m_serviceId, _functionName, _args...); + uint16_t id = getId(); + std::vector asyncActionToDo; + if (m_interfaceMode == jus::connectionMode::modeJson) { + ejson::Object callElem = jus::createCallService(asyncActionToDo, id, m_serviceId, _functionName, std::forward<_ARGS>(_args)...); + return callJson(id, callElem, asyncActionToDo); + } else { + jus::Buffer callElem = jus::createBinaryCallService(asyncActionToDo, id, m_serviceId, _functionName, std::forward<_ARGS>(_args)...); + return callBinary(id, callElem, asyncActionToDo); + } + } + template + jus::FutureBase callAction(const std::string& _functionName, _ARGS&&... _args, jus::FutureData::ObserverFinish _callback) { + return m_clientInterface->m_interfaceClient.callServiceAction(m_serviceId, _functionName, _args..., _callback); + uint16_t id = getId(); + std::vector asyncActionToDo; + if (m_interfaceMode == jus::connectionMode::modeJson) { + ejson::Object callElem = jus::createCallService(asyncActionToDo, id, m_serviceId, _functionName, std::forward<_ARGS>(_args)...); + return callJson(id, callElem, asyncActionToDo, _callback); + } else { + jus::Buffer callElem = jus::createBinaryCallService(asyncActionToDo, id, m_serviceId, _functionName, std::forward<_ARGS>(_args)...); + return callBinary(id, callElem, asyncActionToDo, _callback); + } + } + public: // answers ... void answerProtocolError(uint32_t _transactionId, const std::string& _errorHelp); template