From 331c5f941b9107836b22fe791545c8e192f35ed3 Mon Sep 17 00:00:00 2001 From: Edouard DUPIN Date: Wed, 15 Jun 2016 21:31:30 +0200 Subject: [PATCH] [DEV] access on the service with binary mode (if possible) --- jus/Client.cpp | 117 ++++----------------- jus/Client.h | 64 ++++++------ jus/GateWayClient.cpp | 226 ++++++++--------------------------------- jus/GateWayClient.h | 54 ---------- jus/GateWayService.cpp | 4 +- jus/GateWayService.h | 2 + jus/Service.cpp | 85 ++++++++++------ jus/ServiceRemote.cpp | 66 ++++++------ jus/ServiceRemote.h | 42 +++----- jus/TcpString.cpp | 92 ++++++++++++++--- jus/TcpString.h | 39 +++++-- 11 files changed, 300 insertions(+), 491 deletions(-) diff --git a/jus/Client.cpp b/jus/Client.cpp index 8f6d312..a1b8c41 100644 --- a/jus/Client.cpp +++ b/jus/Client.cpp @@ -12,9 +12,8 @@ jus::Client::Client() : propertyIp(this, "ip", "127.0.0.1", "Ip to connect server", &jus::Client::onPropertyChangeIp), - propertyPort(this, "port", 1983, "Port to connect server", &jus::Client::onPropertyChangePort), - m_id(1) { - m_interfaceClient.connect(this, &jus::Client::onClientData); + propertyPort(this, "port", 1983, "Port to connect server", &jus::Client::onPropertyChangePort) { + } jus::Client::~Client() { @@ -22,91 +21,11 @@ jus::Client::~Client() { } void jus::Client::onClientData(jus::Buffer& _value) { - JUS_DEBUG("Get answer RAW : "/* << _value*/); - jus::FutureBase future; - uint64_t tid = _value.getTransactionId(); - if (tid == 0) { - JUS_ERROR("Get a Protocol error ... No ID ..."); - /* - if (obj["error"].toString().get() == "PROTOCOL-ERROR") { - JUS_ERROR("Get a Protocol error ..."); - std::unique_lock lock(m_mutex); - for (auto &it : m_pendingCall) { - if (it.isValid() == false) { - continue; - } - it.setAnswer(obj); - } - m_pendingCall.clear(); - } else { - JUS_ERROR("call with no ID ==> error ..."); - } - */ - return; - } - { - std::unique_lock lock(m_mutex); - auto it = m_pendingCall.begin(); - while (it != m_pendingCall.end()) { - if (it->isValid() == false) { - it = m_pendingCall.erase(it); - continue; - } - if (it->getTransactionId() != tid) { - ++it; - continue; - } - future = *it; - break; - } - } - if (future.isValid() == false) { - JUS_TODO("manage this event better ..."); - //m_newData.push_back(std::move(_value)); - return; - } - bool ret = future.setAnswer(_value); - if (ret == true) { - std::unique_lock lock(m_mutex); - auto it = m_pendingCall.begin(); - while (it != m_pendingCall.end()) { - if (it->isValid() == false) { - it = m_pendingCall.erase(it); - continue; - } - if (it->getTransactionId() != tid) { - ++it; - continue; - } - it = m_pendingCall.erase(it); - break; - } - } + JUS_ERROR("Get Data On the Communication interface that is not understand ... : " << _value.toJson().generateHumanString()); } jus::ServiceRemote jus::Client::getService(const std::string& _name) { - return jus::ServiceRemote(this, _name); -} - -int32_t jus::Client::link(const std::string& _serviceName) { - // TODO : Check the number of connection of this service ... - jus::Future ret = call("link", _serviceName); - ret.wait(); - if (ret.hasError() == true) { - JUS_WARNING("Can not link with the service named: '" << _serviceName << "' ==> link error"); - return false; - } - return ret.get(); -} - -bool jus::Client::unlink(const uint32_t& _serviceId) { - jus::Future ret = call("unlink", _serviceId); - ret.wait(); - if (ret.hasError() == true) { - JUS_WARNING("Can not unlink with the service id: '" << _serviceId << "' ==> link error"); - return false; - } - return ret.get(); + return jus::ServiceRemote(m_interfaceClient, _name); } void jus::Client::onPropertyChangeIp() { @@ -119,17 +38,23 @@ void jus::Client::onPropertyChangePort(){ bool jus::Client::connect(const std::string& _remoteUserToConnect){ - disconnect(); JUS_DEBUG("connect [START]"); + disconnect(); enet::Tcp connection = std::move(enet::connectTcpClient(*propertyIp, *propertyPort)); - m_interfaceClient.setInterface(std::move(connection)); - m_interfaceClient.connect(); + m_interfaceClient = std::make_shared(); + if (m_interfaceClient == nullptr) { + JUS_ERROR("Allocate connection error"); + return false; + } + m_interfaceClient->connect(this, &jus::Client::onClientData); + m_interfaceClient->setInterface(std::move(connection)); + m_interfaceClient->connect(); // Force mode binary: JUS_WARNING("Request change in mode Binary"); jus::Future retBin = call("setMode", "BIN").wait(); if (retBin.get() == true) { JUS_WARNING(" ==> accepted binary"); - m_interfaceClient.setMode(jus::connectionMode::modeBinary); + m_interfaceClient->setMode(jus::connectionMode::modeBinary); JUS_INFO("Connection jump in BINARY ..."); } else { // stay in JSON @@ -152,13 +77,11 @@ bool jus::Client::connect(const std::string& _remoteUserToConnect){ void jus::Client::disconnect() { JUS_DEBUG("disconnect [START]"); - m_interfaceClient.disconnect(); + if (m_interfaceClient != nullptr) { + m_interfaceClient->disconnect(); + m_interfaceClient.reset(); + } else { + JUS_VERBOSE("Nothing to disconnect ..."); + } JUS_DEBUG("disconnect [STOP]"); } - -uint64_t jus::Client::getId() { - return m_id++; -} - - - diff --git a/jus/Client.h b/jus/Client.h index b0f8419..0e364d7 100644 --- a/jus/Client.h +++ b/jus/Client.h @@ -9,11 +9,11 @@ #include #include #include -#include #include #include #include #include +#include namespace jus { class Client : public eproperty::Interface { @@ -21,23 +21,34 @@ namespace jus { public: eproperty::Value propertyIp; eproperty::Value propertyPort; - std::mutex m_mutex; - std::vector m_pendingCall; - public: - enum jus::connectionMode getMode() { return m_interfaceClient.getMode(); } private: - jus::TcpString m_interfaceClient; - uint32_t m_id; - std::vector m_newData; + ememory::SharedPtr m_interfaceClient; public: + /** + * @brief Create a client on a specific user in a client mode with the tocken associated + * @param[in] _address Address of the user: "ABCD.efgh#atria-soft.com:1993" + * @param[in] + * @param[in] + */ + //Client(const std::string& _address, const std::string& _clientName, const std::string& _clientTocken); + /** + * @brief Create a client on a specific user in a user mode (connect to your personnal account) + * @param[in] _address Address of the user: "ABCD.efgh#atria-soft.com:1993" + * @param[in] _userPassword Password of the user + */ + //Client(const std::string& _address, const std::string& _userPassword); + /** + * @brief Create a client on a specific user in an ANONIMOUS way + * @param[in] _address Address of the user: "ABCD.efgh#atria-soft.com:1993" + */ + //Client(const std::string& _address); + Client(); virtual ~Client(); bool connect(const std::string& _remoteUserToConnect); void disconnect(); public: jus::ServiceRemote getService(const std::string& _serviceName); - int32_t link(const std::string& _serviceName); - bool unlink(const uint32_t& _serviceId); // Connect that is not us //bool identify("clientTest1#atria-soft.com", "QSDQSDGQSF54HSXWVCSQDJ654URTDJ654NBXCDFDGAEZ51968"); @@ -46,36 +57,23 @@ namespace jus { private: void onClientData(jus::Buffer& _value); public: - uint64_t getId(); template jus::FutureBase call(const std::string& _functionName, _ARGS&&... _args) { - return m_interfaceClient.call(_functionName, _args...); - /* - uint64_t id = getId(); - std::vector asyncAction; - if (getMode() == jus::connectionMode::modeJson) { - ejson::Object callElem = jus::createCall(asyncAction, id, _functionName, std::forward<_ARGS>(_args)...); - return callJson(id, callElem, asyncAction); - } else { - jus::Buffer callElem = jus::createBinaryCall(asyncAction, id, _functionName, std::forward<_ARGS>(_args)...); - return callBinary(id, callElem, asyncAction); + if (m_interfaceClient == nullptr) { + jus::Buffer ret; + ret.addError("NULLPTR", "call " + _functionName + " with no interface open"); + return jus::FutureBase(0, true, ret); } - */ + return m_interfaceClient->call(_functionName, _args...); } template jus::FutureBase callAction(const std::string& _functionName, _ARGS&&... _args, jus::FutureData::ObserverFinish _callback) { - return m_interfaceClient.callAction(_functionName, _args..., _callback); - /* - uint64_t id = getId(); - std::vector asyncAction; - if (getMode() == jus::connectionMode::modeJson) { - ejson::Object callElem = jus::createCall(asyncAction, id, _functionName, std::forward<_ARGS>(_args)...); - return callJson(id, callElem, asyncAction, _callback); - } else { - jus::Buffer callElem = jus::createBinaryCall(asyncAction, id, _functionName, std::forward<_ARGS>(_args)...); - return callBinary(id, callElem, asyncAction, _callback); + if (m_interfaceClient == nullptr) { + jus::Buffer ret; + ret.addError("NULLPTR", "call " + _functionName + " with no interface open"); + return jus::FutureBase(0, true, ret, _callback); } - */ + return m_interfaceClient->callAction(_functionName, _args..., _callback); } private: void onPropertyChangeIp(); diff --git a/jus/GateWayClient.cpp b/jus/GateWayClient.cpp index f1a09de..2106750 100644 --- a/jus/GateWayClient.cpp +++ b/jus/GateWayClient.cpp @@ -18,8 +18,7 @@ static const std::string protocolError = "PROTOCOL-ERROR"; jus::GateWayClient::GateWayClient(enet::Tcp _connection, jus::GateWay* _gatewayInterface) : m_state(jus::GateWayClient::state::unconnect), m_gatewayInterface(_gatewayInterface), - m_interfaceClient(std::move(_connection)), - m_transactionLocalId(1) { + m_interfaceClient(std::move(_connection)) { JUS_INFO("----------------"); JUS_INFO("-- NEW Client --"); JUS_INFO("----------------"); @@ -47,18 +46,24 @@ void jus::GateWayClient::stop() { if (it == nullptr) { continue; } + it->m_interfaceClient.callClient(m_uid, "_delete"); + /* ejson::Object linkService; linkService.add("call", ejson::String("_delete")); linkService.add("id", ejson::Number(m_transactionLocalId++)); linkService.add("param", ejson::Array()); it->SendData(m_uid, linkService); + */ } if (m_userService != nullptr) { + m_userService->m_interfaceClient.callClient(m_uid2, "_delete"); + /* ejson::Object linkService; linkService.add("call", ejson::String("_delete")); linkService.add("id", ejson::Number(m_transactionLocalId++)); linkService.add("param", ejson::Array()); m_userService->SendData(m_uid2, linkService); + */ m_userService = nullptr; } m_listConnectedService.clear(); @@ -131,7 +136,7 @@ void jus::GateWayClient::onClientData(jus::Buffer& _value) { if (m_userService == nullptr) { answerProtocolError(transactionId, "Gateway internal error 'No user interface'"); } else { - jus::Future futLocalService = call(m_uid2, m_userService, "_new", m_userConnectionName, "**Gateway**", std::vector()); + jus::Future futLocalService = m_userService->m_interfaceClient.callClient(m_uid2, "_new", m_userConnectionName, "**Gateway**", std::vector()); futLocalService.wait(); // TODO: Set timeout ... m_state = jus::GateWayClient::state::userIdentify; m_interfaceClient.answerValue(transactionId, true); @@ -161,7 +166,8 @@ void jus::GateWayClient::onClientData(jus::Buffer& _value) { answerProtocolError(transactionId, "gateWay internal error 3"); return; } - jus::Future fut = call(m_uid2, m_userService, "checkTocken", clientName, clientTocken); + + jus::Future fut = m_userService->m_interfaceClient.callClient(m_uid2, "checkTocken", clientName, clientTocken); fut.wait(); // TODO: Set timeout ... if (fut.hasError() == true) { JUS_ERROR("Get error from the service ..."); @@ -177,7 +183,7 @@ void jus::GateWayClient::onClientData(jus::Buffer& _value) { } if (callFunction == "auth") { std::string password = _value.getParameter(0); - jus::Future fut = call(m_uid2, m_userService, "checkAuth", password); + jus::Future fut = m_userService->m_interfaceClient.callClient(m_uid2, "checkAuth", password); fut.wait(); // TODO: Set timeout ... if (fut.hasError() == true) { JUS_ERROR("Get error from the service ..."); @@ -197,7 +203,7 @@ void jus::GateWayClient::onClientData(jus::Buffer& _value) { // -------------------------------- // -- Get groups: // -------------------------------- - jus::Future> futGroup = call(m_uid2, m_userService, "getGroups", m_clientName); + jus::Future> futGroup = m_userService->m_interfaceClient.callClient(m_uid2, "getGroups", m_clientName); futGroup.wait(); // TODO: Set timeout ... if (futGroup.hasError() == true) { JUS_ERROR("Get error from the service ..."); @@ -210,7 +216,7 @@ void jus::GateWayClient::onClientData(jus::Buffer& _value) { // -- Get services: // -------------------------------- std::vector currentServices = m_gatewayInterface->getAllServiceName(); - jus::Future> futServices = call(m_uid2, m_userService, "filterServices", m_clientName, currentServices); + jus::Future> futServices = m_userService->m_interfaceClient.callClient(m_uid2, "filterServices", m_clientName, currentServices); futServices.wait(); // TODO: Set timeout ... if (futServices.hasError() == true) { JUS_ERROR("Get error from the service ..."); @@ -267,7 +273,7 @@ void jus::GateWayClient::onClientData(jus::Buffer& _value) { } ememory::SharedPtr srv = m_gatewayInterface->get(serviceName); if (srv != nullptr) { - jus::Future futLink = call(m_uid, srv, "_new", m_userConnectionName, m_clientName, m_clientgroups); + jus::Future futLink = srv->m_interfaceClient.callClient(m_uid, "_new", m_userConnectionName, m_clientName, m_clientgroups); futLink.wait(); // TODO: Set timeout ... if (futLink.hasError() == true) { JUS_ERROR("Get error from the service ... LINK"); @@ -292,7 +298,7 @@ void jus::GateWayClient::onClientData(jus::Buffer& _value) { m_interfaceClient.answerError(transactionId, "NOT-CONNECTED-SERVICE"); return; } - jus::Future futUnLink = call(m_uid, m_listConnectedService[localServiceID], "_delete"); + jus::Future futUnLink = m_listConnectedService[localServiceID]->m_interfaceClient.callClient(m_uid, "_delete"); futUnLink.wait(); // TODO: Set timeout ... if (futUnLink.hasError() == true) { JUS_ERROR("Get error from the service ... UNLINK"); @@ -318,185 +324,43 @@ void jus::GateWayClient::onClientData(jus::Buffer& _value) { JUS_ERROR("TODO : Manage this case ..."); return; } - bool finish = _value.getPartFinish(); uint16_t partId = _value.getPartId(); if (partId != 0) { - // subMessage ... ==> try to forward message: - std::unique_lock lock(m_mutex); - for (auto &itCall : m_pendingCall) { - JUS_INFO(" compare : " << itCall.first << " =?= " << transactionId); - if (itCall.first == transactionId) { - // Find element ==> transit it ... - _value.setTransactionId(itCall.second.getTransactionId()); - m_listConnectedService[serviceId]->SendData(m_uid, _value); - return; - } - } - JUS_ERROR("Can not transfer part of a message ..."); + m_listConnectedService[serviceId]->m_interfaceClient.callForwardMultiple( + m_uid, + _value, + (uint64_t(m_uid) << 32) + uint64_t(transactionId)); return; } - callActionForward(m_listConnectedService[serviceId], - _value, - [=](jus::FutureBase _ret) { - - // TODO : Check if it is a JSON or binary ... - jus::Buffer tmpp = _ret.getRaw(); - JUS_DEBUG(" ==> transmit : " << tmpp.getTransactionId() << " -> " << transactionId); - JUS_DEBUG(" msg=" << tmpp.generateHumanString()); - tmpp.setTransactionId(transactionId); - JUS_DEBUG("transmit=" << tmpp.generateHumanString()); - m_interfaceClient.writeBinary(tmpp); - // multiple send element ... - return tmpp.getPartFinish(); - /* - // TODO : Check if it is a JSON or binary ... - ejson::Object tmpp = _ret.getRaw(); - JUS_VERBOSE(" ==> transmit : " << tmpp["id"].toNumber().getU64() << " -> " << transactionId); - JUS_VERBOSE(" msg=" << tmpp.generateMachineString()); - tmpp["id"].toNumber().set(uint64_t(transactionId)); - m_interfaceClient.writeJson(tmpp); - if (tmpp.valueExist("part") == true) { - // multiple send element ... - if (tmpp.valueExist("finish") == true) { - return tmpp["finish"].toBoolean().get(); - } - return false; - } - return true; - */ - }); + m_listConnectedService[serviceId]->m_interfaceClient.callForward( + m_uid, + _value, + (uint64_t(m_uid) << 32) + uint64_t(transactionId), + [=](jus::FutureBase _ret) { + jus::Buffer tmpp = _ret.getRaw(); + JUS_DEBUG(" ==> transmit : " << tmpp.getTransactionId() << " -> " << transactionId); + JUS_DEBUG(" msg=" << tmpp.generateHumanString()); + tmpp.setTransactionId(transactionId); + tmpp.setServiceId(serviceId+1); + JUS_DEBUG("transmit=" << tmpp.generateHumanString()); + if (m_interfaceClient.getMode() == jus::connectionMode::modeJson) { + ejson::Object obj = tmpp.toJson(); + m_interfaceClient.writeJson(obj); + } else if (m_interfaceClient.getMode() == jus::connectionMode::modeBinary) { + m_interfaceClient.writeBinary(tmpp); + } else if (m_interfaceClient.getMode() == jus::connectionMode::modeXml) { + JUS_ERROR("TODO ... "); + } else { + JUS_ERROR("wrong type of communication"); + } + // multiple send element ... + return tmpp.getPartFinish(); + }); } } } } -jus::FutureBase jus::GateWayClient::callBinary(uint64_t _callerId, - ememory::SharedPtr _srv, - uint64_t _clientTransactionId, - uint64_t _transactionId, - jus::Buffer& _obj, - jus::FutureData::ObserverFinish _callback) { - JUS_VERBOSE("Send BINARY [START] "); - if (_srv == nullptr) { - // TODO : Change this ... - jus::Buffer obj; - obj.setTransactionId(_transactionId); - obj.setClientId(_callerId); - obj.setType(jus::Buffer::typeMessage::answer); - obj.addError("NOT-CONNECTED", "Client interface not connected (no TCP)"); - return jus::FutureBase(_transactionId, true, obj, _callback); - } - jus::FutureBase tmpFuture(_transactionId, _callback); - { - std::unique_lock lock(m_mutex); - m_pendingCall.push_back(std::make_pair(_clientTransactionId, tmpFuture)); - } - _obj.setTransactionId(_transactionId); - _srv->SendData(_callerId, _obj); - JUS_VERBOSE("Send BINARY [STOP]"); - return tmpFuture; -} - -jus::FutureBase jus::GateWayClient::callActionForward(ememory::SharedPtr _srv, - jus::Buffer& _Buffer, - jus::FutureData::ObserverFinish _callback) { - uint64_t id = getId(); - uint64_t clientTransactionId = _Buffer.getTransactionId(); - jus::FutureBase ret = callBinary(m_uid, _srv, clientTransactionId, id, _Buffer, _callback); - ret.setSynchronous(); - return ret; -} - - -uint64_t jus::GateWayClient::getId() { - return m_transactionLocalId++; -} - -jus::FutureBase jus::GateWayClient::callJson(uint64_t _callerId, - ememory::SharedPtr _srv, - uint64_t _clientTransactionId, - uint64_t _transactionId, - const ejson::Object& _obj, - jus::FutureData::ObserverFinish _callback) { - JUS_VERBOSE("Send JSON [START] "); - if (_srv == nullptr) { - ejson::Object obj; - obj.add("error", ejson::String("NOT-CONNECTED")); - obj.add("error-help", ejson::String("Client interface not connected (no TCP)")); - return jus::FutureBase(_transactionId, true, obj, _callback); - } - jus::FutureBase tmpFuture(_transactionId, _callback); - { - std::unique_lock lock(m_mutex); - m_pendingCall.push_back(std::make_pair(_clientTransactionId, tmpFuture)); - } - _srv->SendData(_callerId, _obj); - JUS_VERBOSE("Send JSON [STOP]"); - return tmpFuture; -} - - - - - void jus::GateWayClient::returnMessage(jus::Buffer& _data) { - jus::FutureBase future; - uint32_t tid = _data.getTransactionId(); - if (tid == 0) { - /* TODO ... - if (_data["error"].toString().get() == "PROTOCOL-ERROR") { - JUS_ERROR("Get a Protocol error ..."); - std::unique_lock lock(m_mutex); - for (auto &it : m_pendingCall) { - if (it.second.isValid() == false) { - continue; - } - it.second.setAnswer(_data); - } - m_pendingCall.clear(); - } else { - JUS_ERROR("call with no ID ==> error ..."); - } - */ - return; - } - { - std::unique_lock lock(m_mutex); - auto it = m_pendingCall.begin(); - while (it != m_pendingCall.end()) { - if (it->second.isValid() == false) { - it = m_pendingCall.erase(it); - continue; - } - if (it->second.getTransactionId() != tid) { - ++it; - continue; - } - // TODO : Do it better ... - future = it->second; - break; - } - } - if (future.isValid() == false) { - JUS_WARNING("Action to do ..."); - return; - } - bool ret = future.setAnswer(_data); - if (ret == true) { - std::unique_lock lock(m_mutex); - auto it = m_pendingCall.begin(); - while (it != m_pendingCall.end()) { - if (it->second.isValid() == false) { - it = m_pendingCall.erase(it); - continue; - } - if (it->second.getTransactionId() != tid) { - ++it; - continue; - } - it = m_pendingCall.erase(it); - break; - } - } -} - + JUS_ERROR("Get call from the Service to the user ..."); +} \ No newline at end of file diff --git a/jus/GateWayClient.h b/jus/GateWayClient.h index f850230..e8a539c 100644 --- a/jus/GateWayClient.h +++ b/jus/GateWayClient.h @@ -57,60 +57,6 @@ namespace jus { } bool isAlive(); - - private: - std::mutex m_mutex; - std::vector> m_pendingCall; - int32_t m_transactionLocalId; - jus::FutureBase callJson(uint64_t _callerId, - ememory::SharedPtr _srv, - uint64_t _clientTransactionId, - uint64_t _transactionId, - const ejson::Object& _obj, - jus::FutureData::ObserverFinish _callback=nullptr); - jus::FutureBase callBinary(uint64_t _callerId, - ememory::SharedPtr _srv, - uint64_t _clientTransactionId, - uint64_t _transactionId, - jus::Buffer& _obj, - jus::FutureData::ObserverFinish _callback=nullptr); - uint64_t getId(); - public: - template - jus::FutureBase call(uint64_t _callerId, ememory::SharedPtr _srv, const std::string& _functionName, _ARGS&&... _args) { - uint64_t id = getId(); - std::vector asyncAction; - ejson::Object callElem = jus::createCall(asyncAction, id, _functionName, std::forward<_ARGS>(_args)...); - if (asyncAction.size() != 0) { - JUS_ERROR("Missing send async messages"); - } - return callJson(_callerId, _srv, 0, id, callElem); - } - template - jus::FutureBase callAction(uint64_t _callerId, ememory::SharedPtr _srv, const std::string& _functionName, _ARGS&&... _args, jus::FutureData::ObserverFinish _callback) { - uint64_t id = getId(); - std::vector asyncAction; - ejson::Object callElem = jus::createCall(asyncAction, id, _functionName, std::forward<_ARGS>(_args)...); - if (asyncAction.size() != 0) { - JUS_ERROR("Missing send async messages"); - } - return callJson(_callerId, _srv, 0, id, callElem, _callback); - } - jus::FutureBase callActionForward(uint64_t _callerId, - uint64_t _clientTransactionId, - ememory::SharedPtr _srv, - const std::string& _functionName, - ejson::Array _params, - jus::FutureData::ObserverFinish _callback, - int64_t _part, - bool _finish); - - jus::FutureBase callActionForward(ememory::SharedPtr _srv, - jus::Buffer& _Buffer, - jus::FutureData::ObserverFinish _callback); - - - void answerProtocolError(uint32_t _transactionId, const std::string& _errorHelp); diff --git a/jus/GateWayService.cpp b/jus/GateWayService.cpp index 0ad56c1..b9d522d 100644 --- a/jus/GateWayService.cpp +++ b/jus/GateWayService.cpp @@ -64,7 +64,7 @@ void jus::GateWayService::SendData(uint64_t _userSessionId, jus::Buffer& _data) } void jus::GateWayService::onServiceData(jus::Buffer& _value) { - //JUS_DEBUG("On service data: " << _value); + JUS_DEBUG("On service data: " << _value.toJson().generateHumanString()); uint32_t transactionId = _value.getTransactionId(); //data.add("from-service", ejson::String(m_name)); if (_value.getType() == jus::Buffer::typeMessage::event) { @@ -114,7 +114,7 @@ void jus::GateWayService::onServiceData(jus::Buffer& _value) { } m_name = _value.getParameter(0); m_interfaceClient.setInterfaceName("srv-" + m_name); - m_interfaceClient.answerValue(transactionId, false); + m_interfaceClient.answerValue(transactionId, true); return; } answerProtocolError(transactionId, "unknow function"); diff --git a/jus/GateWayService.h b/jus/GateWayService.h index 7da488a..b4902ab 100644 --- a/jus/GateWayService.h +++ b/jus/GateWayService.h @@ -13,7 +13,9 @@ namespace jus { class GateWay; + class GateWayClient; class GateWayService { + friend class jus::GateWayClient; private: jus::GateWay* m_gatewayInterface; jus::TcpString m_interfaceClient; diff --git a/jus/Service.cpp b/jus/Service.cpp index 1047dd3..dc0a739 100644 --- a/jus/Service.cpp +++ b/jus/Service.cpp @@ -15,8 +15,7 @@ jus::Service::Service() : propertyIp(this, "ip", "127.0.0.1", "Ip to connect server", &jus::Service::onPropertyChangeIp), propertyPort(this, "port", 1982, "Port to connect server", &jus::Service::onPropertyChangePort) { - m_interfaceClient = std::make_shared(); - m_interfaceClient->connect(this, &jus::Service::onClientData); + advertise("getExtention", &jus::Service::getExtention); setLastFuncDesc("Get List of availlable extention of this service"); @@ -54,6 +53,7 @@ void jus::Service::onClientData(jus::Buffer& _value) { } jus::FutureCall futCall(clientId, tmpID, _value); if (futCall.isFinished() == true) { + JUS_INFO("Call Binary .."); callBinary(tmpID, futCall.getRaw()); } else { m_callMultiData.push_back(futCall); @@ -77,25 +77,48 @@ void jus::Service::connect(const std::string& _serviceName, uint32_t _numberRetr JUS_DEBUG("connect [STOP] ==> can not connect"); return; } + m_interfaceClient = std::make_shared(); + if (m_interfaceClient == nullptr) { + JUS_ERROR("Can not allocate interface ..."); + return; + } + m_interfaceClient->connect(this, &jus::Service::onClientData); m_interfaceClient->setInterface(std::move(connection)); m_interfaceClient->connect(); - ejson::Object tmpp; - tmpp.add("id", ejson::Number(1)); - tmpp.add("call", ejson::String("connect-service")); - ejson::Array params; - params.add(ejson::String(_serviceName)); - tmpp.add("param", params); - m_interfaceClient->writeJson(tmpp); + jus::Future ret = m_interfaceClient->call("setMode", "BIN"); + ret.wait(); + if (ret.get() == false) { + JUS_ERROR("Can not communicate with the gateway in Binary mode ... STAY in JSON"); + } else { + JUS_INFO("Change mode in Binary"); + m_interfaceClient->setMode(jus::connectionMode::modeBinary); + } + ret = m_interfaceClient->call("connect-service", _serviceName); + ret.wait(); + if (ret.get() == false) { + JUS_ERROR("Can not configure the interface for the service with the current name ..."); + m_interfaceClient->disconnect(); + return; + } + JUS_DEBUG("connect [STOP]"); } void jus::Service::disconnect(){ JUS_DEBUG("disconnect [START]"); - m_interfaceClient->disconnect(); + if (m_interfaceClient != nullptr) { + m_interfaceClient->disconnect(); + m_interfaceClient.reset(); + } else { + JUS_VERBOSE("Nothing to disconnect ..."); + } JUS_DEBUG("disconnect [STOP]"); } bool jus::Service::GateWayAlive() { + if (m_interfaceClient == nullptr) { + return false; + } return m_interfaceClient->isActive(); } @@ -128,30 +151,26 @@ void jus::Service::callBinary(uint32_t _transactionId, jus::Buffer& _obj) { return; } //if (_obj.getType() == jus::Buffer::typeMessage::event) { - if (m_interfaceClient->getMode() == jus::connectionMode::modeBinary) { - - } else if (m_interfaceClient->getMode() == jus::connectionMode::modeJson) { - uint32_t clientId = _obj.getClientId(); - std::string callFunction = _obj.getCall(); - if (callFunction[0] == '_') { - if (callFunction == "_new") { - std::string userName = _obj.getParameter(0); - std::string clientName = _obj.getParameter(1); - std::vector clientGroup = _obj.getParameter>(2); - clientConnect(clientId, userName, clientName, clientGroup); - } else if (callFunction == "_delete") { - clientDisconnect(clientId); - } - m_interfaceClient->answerValue(_transactionId, true, clientId); - return; - } else if (isFunctionAuthorized(clientId, callFunction) == true) { - callBinary2(_transactionId, clientId, callFunction, _obj); - return; - } else { - m_interfaceClient->answerError(_transactionId, "NOT-AUTHORIZED-FUNCTION", "", clientId); - return; + uint32_t clientId = _obj.getClientId(); + std::string callFunction = _obj.getCall(); + if (callFunction[0] == '_') { + if (callFunction == "_new") { + std::string userName = _obj.getParameter(0); + std::string clientName = _obj.getParameter(1); + std::vector clientGroup = _obj.getParameter>(2); + clientConnect(clientId, userName, clientName, clientGroup); + } else if (callFunction == "_delete") { + clientDisconnect(clientId); } + m_interfaceClient->answerValue(_transactionId, true, clientId); + return; + } else if (isFunctionAuthorized(clientId, callFunction) == true) { + callBinary2(_transactionId, clientId, callFunction, _obj); + return; } else { - JUS_ERROR("Not manage transfer mode "); + m_interfaceClient->answerError(_transactionId, "NOT-AUTHORIZED-FUNCTION", "", clientId); + return; } } + + diff --git a/jus/ServiceRemote.cpp b/jus/ServiceRemote.cpp index 585e8ac..dc7b35c 100644 --- a/jus/ServiceRemote.cpp +++ b/jus/ServiceRemote.cpp @@ -7,24 +7,43 @@ #include #include -jus::ServiceRemote::ServiceRemote(jus::Client* _clientInterface, const std::string& _name): - m_clientInterface(_clientInterface), +jus::ServiceRemote::ServiceRemote(ememory::SharedPtr _clientLink, const std::string& _name): + m_interfaceClient(_clientLink), m_name(_name), - m_serviceId(0) { - int32_t val = m_clientInterface->link(_name); - if (val >= 0) { - m_isLinked = true; - m_serviceId = val; - } else { - m_isLinked = false; - m_serviceId = 0; + m_serviceId(0), + m_isLinked(false) { + if (m_interfaceClient == nullptr) { + return; } + // little hack : Call the service manager with the service ID=0 ... + jus::Future ret = call("link", _name); + ret.wait(); + if (ret.hasError() == true) { + JUS_WARNING("Can not link with the service named: '" << _name << "' ==> link error"); + return; + } + m_isLinked = true; + m_serviceId = ret.get(); } jus::ServiceRemote::~ServiceRemote() { if (m_isLinked == true) { - m_clientInterface->unlink(m_serviceId); - m_isLinked = false; + uint32_t tmpLocalService = m_serviceId; + // little hack : Call the service manager with the service ID=0 ... + m_serviceId = 0; + jus::Future ret = call("unlink", tmpLocalService); + ret.wait(); + if (ret.hasError() == true) { + JUS_WARNING("Can not unlink with the service id: '" << tmpLocalService << "' ==> link error"); + m_serviceId = tmpLocalService; + return; + } + if (ret.get() == true) { + m_isLinked = false; + } else { + JUS_ERROR("Can not unlink with this service ...."); + m_serviceId = tmpLocalService; + } } } @@ -32,26 +51,3 @@ bool jus::ServiceRemote::exist() { return m_isLinked; } -uint64_t jus::ServiceRemote::getId() { - return m_clientInterface->getId(); -} - -enum jus::connectionMode jus::ServiceRemote::getMode() { - return m_clientInterface->getMode(); -} - -jus::FutureBase jus::ServiceRemote::callJson(uint64_t _transactionId, - const ejson::Object& _obj, - const std::vector& _async, - jus::FutureData::ObserverFinish _callback) { - return m_clientInterface->callJson(_transactionId, _obj, _async, _callback, m_serviceId); -} - -jus::FutureBase jus::ServiceRemote::callBinary(uint64_t _transactionId, - jus::Buffer& _obj, - const std::vector& _async, - jus::FutureData::ObserverFinish _callback) { - return m_clientInterface->callBinary(_transactionId, _obj, _async, _callback, m_serviceId); -} - - diff --git a/jus/ServiceRemote.h b/jus/ServiceRemote.h index ff26ea2..9af3dd5 100644 --- a/jus/ServiceRemote.h +++ b/jus/ServiceRemote.h @@ -13,54 +13,38 @@ #include #include #include +#include namespace jus { class Client; class ServiceRemote { private: - jus::Client* m_clientInterface; + ememory::SharedPtr m_interfaceClient; std::string m_name; uint32_t m_serviceId; bool m_isLinked; public: - ServiceRemote(jus::Client* _clientInterface, const std::string& _name); + ServiceRemote(ememory::SharedPtr _clientLink, const std::string& _name); ~ServiceRemote(); bool exist(); - private: - jus::FutureBase callJson(uint64_t _transactionId, const ejson::Object& _obj, const std::vector& _async, jus::FutureData::ObserverFinish _callback=nullptr); - jus::FutureBase callBinary(uint64_t _transactionId, jus::Buffer& _obj, const std::vector& _async, jus::FutureData::ObserverFinish _callback=nullptr); - uint64_t getId(); - enum jus::connectionMode getMode(); public: template jus::FutureBase call(const std::string& _functionName, _ARGS&&... _args) { - return m_clientInterface->m_interfaceClient.callService(m_serviceId, _functionName, _args...); - /* - uint64_t id = getId(); - std::vector asyncActionToDo; - if (getMode() == jus::connectionMode::modeJson) { - ejson::Object callElem = jus::createCallService(asyncActionToDo, id, m_serviceId, _functionName, std::forward<_ARGS>(_args)...); - return callJson(id, callElem, asyncActionToDo); - } else { - jus::Buffer callElem = jus::createBinaryCallService(asyncActionToDo, id, m_serviceId, _functionName, std::forward<_ARGS>(_args)...); - return callBinary(id, callElem, asyncActionToDo); + if (m_interfaceClient == nullptr) { + jus::Buffer ret; + ret.addError("NULLPTR", "call " + _functionName + " with no interface open"); + return jus::FutureBase(0, true, ret); } - */ + return m_interfaceClient->callService(m_serviceId, _functionName, _args...); } template jus::FutureBase callAction(const std::string& _functionName, _ARGS&&... _args, jus::FutureData::ObserverFinish _callback) { - return m_clientInterface->m_interfaceClient.callServiceAction(m_serviceId, _functionName, _args..., _callback); - /* - uint64_t id = getId(); - std::vector asyncActionToDo; - if (getMode() == jus::connectionMode::modeJson) { - ejson::Object callElem = jus::createCallService(asyncActionToDo, id, m_serviceId, _functionName, std::forward<_ARGS>(_args)...); - return callJson(id, callElem, asyncActionToDo, _callback); - } else { - jus::Buffer callElem = jus::createBinaryCallService(asyncActionToDo, id, m_serviceId, _functionName, std::forward<_ARGS>(_args)...); - return callBinary(id, callElem, asyncActionToDo, _callback); + if (m_interfaceClient == nullptr) { + jus::Buffer ret; + ret.addError("NULLPTR", "call " + _functionName + " with no interface open"); + return jus::FutureBase(0, true, ret, _callback); } - */ + return m_interfaceClient->callServiceAction(m_serviceId, _functionName, _args..., _callback); } }; } diff --git a/jus/TcpString.cpp b/jus/TcpString.cpp index bffb285..abaece4 100644 --- a/jus/TcpString.cpp +++ b/jus/TcpString.cpp @@ -16,7 +16,7 @@ jus::TcpString::TcpString(enet::Tcp _connection) : m_threadRunning = false; m_threadAsyncRunning = false; m_interfaceMode = jus::connectionMode::modeJson; - m_transmissionId = 0; + m_transmissionId = 1; } jus::TcpString::TcpString() : @@ -27,7 +27,7 @@ jus::TcpString::TcpString() : m_threadRunning = false; m_threadAsyncRunning = false; m_interfaceMode = jus::connectionMode::modeJson; - m_transmissionId = 0; + m_transmissionId = 1; } void jus::TcpString::setInterface(enet::Tcp _connection) { @@ -259,19 +259,20 @@ void jus::TcpString::newBuffer(jus::Buffer& _buffer) { std::unique_lock lock(m_pendingCallMutex); auto it = m_pendingCall.begin(); while (it != m_pendingCall.end()) { - if (it->isValid() == false) { + if (it->second.isValid() == false) { it = m_pendingCall.erase(it); continue; } - if (it->getTransactionId() != tid) { + if (it->second.getTransactionId() != tid) { ++it; continue; } - future = *it; + future = it->second; break; } } if (future.isValid() == false) { + // not a pending call ==> simple event or call ... if (m_observerElement != nullptr) { m_observerElement(_buffer); } @@ -282,11 +283,11 @@ void jus::TcpString::newBuffer(jus::Buffer& _buffer) { std::unique_lock lock(m_pendingCallMutex); auto it = m_pendingCall.begin(); while (it != m_pendingCall.end()) { - if (it->isValid() == false) { + if (it->second.isValid() == false) { it = m_pendingCall.erase(it); continue; } - if (it->getTransactionId() != tid) { + if (it->second.getTransactionId() != tid) { ++it; continue; } @@ -371,16 +372,16 @@ jus::FutureBase jus::TcpString::callJson(uint64_t _transactionId, jus::FutureData::ObserverFinish _callback, const uint32_t& _serviceId) { JUS_VERBOSE("Send JSON [START] "); - if (m_interfaceClient.isActive() == false) { + if (isActive() == false) { jus::Buffer obj; - obj.setType(jus::Buffer::typeMessage::answer) { + obj.setType(jus::Buffer::typeMessage::answer); obj.addError("NOT-CONNECTED", "Client interface not connected (no TCP)"); return jus::FutureBase(_transactionId, true, obj, _callback); } jus::FutureBase tmpFuture(_transactionId, _callback); { std::unique_lock lock(m_pendingCallMutex); - m_pendingCall.push_back(tmpFuture); + m_pendingCall.push_back(std::make_pair(uint64_t(0), tmpFuture)); } if (_async.size() != 0) { _obj.add("part", ejson::Number(0)); @@ -440,16 +441,16 @@ jus::FutureBase jus::TcpString::callBinary(uint64_t _transactionId, jus::FutureData::ObserverFinish _callback, const uint32_t& _serviceId) { JUS_VERBOSE("Send Binary [START] "); - if (m_interfaceClient.isActive() == false) { + if (isActive() == false) { jus::Buffer obj; - obj.setType(jus::Buffer::typeMessage::answer) { + obj.setType(jus::Buffer::typeMessage::answer); obj.addError("NOT-CONNECTED", "Client interface not connected (no TCP)"); return jus::FutureBase(_transactionId, true, obj, _callback); } jus::FutureBase tmpFuture(_transactionId, _callback); { std::unique_lock lock(m_pendingCallMutex); - m_pendingCall.push_back(tmpFuture); + m_pendingCall.push_back(std::make_pair(uint64_t(0), tmpFuture)); } if (_async.size() != 0) { _obj.setPartFinish(false); @@ -465,10 +466,69 @@ jus::FutureBase jus::TcpString::callBinary(uint64_t _transactionId, return tmpFuture; } +jus::FutureBase jus::TcpString::callForward(uint32_t _clientId, + jus::Buffer& _buffer, + uint64_t _singleReferenceId, + jus::FutureData::ObserverFinish _callback) { + JUS_VERBOSE("Call Forward [START]"); + //jus::FutureBase ret = callBinary(id, _Buffer, async, _callback); + //ret.setSynchronous(); + + if (isActive() == false) { + jus::Buffer obj; + obj.setType(jus::Buffer::typeMessage::answer); + obj.addError("NOT-CONNECTED", "Client interface not connected (no TCP)"); + return jus::FutureBase(0, true, obj, _callback); + } + uint64_t id = getId(); + _buffer.setTransactionId(id); + _buffer.setClientId(_clientId); + jus::FutureBase tmpFuture(id, _callback); + tmpFuture.setSynchronous(); + { + std::unique_lock lock(m_pendingCallMutex); + m_pendingCall.push_back(std::make_pair(_singleReferenceId, tmpFuture)); + } + if (m_interfaceMode == jus::connectionMode::modeJson) { + ejson::Object obj = _buffer.toJson(); + writeJson(obj); + } else if (m_interfaceMode == jus::connectionMode::modeBinary) { + writeBinary(_buffer); + } else if (m_interfaceMode == jus::connectionMode::modeXml) { + JUS_ERROR("TODO ... "); + } else { + JUS_ERROR("wrong type of communication"); + } + JUS_VERBOSE("Send Forward [STOP]"); + return tmpFuture; +} - - - +void jus::TcpString::callForwardMultiple(uint32_t _clientId, + jus::Buffer& _buffer, + uint64_t _singleReferenceId){ + // subMessage ... ==> try to forward message: + std::unique_lock lock(m_pendingCallMutex); + for (auto &itCall : m_pendingCall) { + JUS_INFO(" compare : " << itCall.first << " =?= " << _singleReferenceId); + if (itCall.first == _singleReferenceId) { + // Find element ==> transit it ... + _buffer.setTransactionId(itCall.second.getTransactionId()); + _buffer.setClientId(_clientId); + if (m_interfaceMode == jus::connectionMode::modeJson) { + ejson::Object obj = _buffer.toJson(); + writeJson(obj); + } else if (m_interfaceMode == jus::connectionMode::modeBinary) { + writeBinary(_buffer); + } else if (m_interfaceMode == jus::connectionMode::modeXml) { + JUS_ERROR("TODO ... "); + } else { + JUS_ERROR("wrong type of communication"); + } + return; + } + } + JUS_ERROR("Can not transfer part of a message ..."); +} void jus::TcpString::answerError(uint64_t _clientTransactionId, const std::string& _errorValue, const std::string& _errorHelp, uint32_t _clientId) { if (m_interfaceMode == jus::connectionMode::modeJson) { diff --git a/jus/TcpString.h b/jus/TcpString.h index c045851..492b0e6 100644 --- a/jus/TcpString.h +++ b/jus/TcpString.h @@ -12,6 +12,7 @@ #include #include #include +#include namespace jus { class TcpString : public eproperty::Interface { @@ -24,7 +25,7 @@ namespace jus { return m_transmissionId++; } std::mutex m_pendingCallMutex; - std::vector m_pendingCall; + std::vector> m_pendingCall; protected: enum jus::connectionMode m_interfaceMode; public: @@ -105,7 +106,6 @@ namespace jus { public: // section call direct template jus::FutureBase call(const std::string& _functionName, _ARGS&&... _args) { - return m_interfaceClient.call(_functionName, _args...); uint16_t id = getId(); std::vector asyncAction; if (m_interfaceMode == jus::connectionMode::modeJson) { @@ -118,7 +118,6 @@ namespace jus { } template jus::FutureBase callAction(const std::string& _functionName, _ARGS&&... _args, jus::FutureData::ObserverFinish _callback) { - return m_interfaceClient.callAction(_functionName, _args..., _callback); uint16_t id = getId(); std::vector asyncAction; if (m_interfaceMode == jus::connectionMode::modeJson) { @@ -132,31 +131,49 @@ namespace jus { public: // section call with service ID / Client ID template - jus::FutureBase call(const std::string& _functionName, _ARGS&&... _args) { - return m_clientInterface->m_interfaceClient.callService(m_serviceId, _functionName, _args...); + jus::FutureBase callService(uint32_t _serviceId, const std::string& _functionName, _ARGS&&... _args) { uint16_t id = getId(); std::vector asyncActionToDo; if (m_interfaceMode == jus::connectionMode::modeJson) { - ejson::Object callElem = jus::createCallService(asyncActionToDo, id, m_serviceId, _functionName, std::forward<_ARGS>(_args)...); + ejson::Object callElem = jus::createCallService(asyncActionToDo, id, _serviceId, _functionName, std::forward<_ARGS>(_args)...); return callJson(id, callElem, asyncActionToDo); } else { - jus::Buffer callElem = jus::createBinaryCallService(asyncActionToDo, id, m_serviceId, _functionName, std::forward<_ARGS>(_args)...); + jus::Buffer callElem = jus::createBinaryCallService(asyncActionToDo, id, _serviceId, _functionName, std::forward<_ARGS>(_args)...); return callBinary(id, callElem, asyncActionToDo); } } template - jus::FutureBase callAction(const std::string& _functionName, _ARGS&&... _args, jus::FutureData::ObserverFinish _callback) { - return m_clientInterface->m_interfaceClient.callServiceAction(m_serviceId, _functionName, _args..., _callback); + jus::FutureBase callServiceAction(uint32_t _serviceId, const std::string& _functionName, _ARGS&&... _args, jus::FutureData::ObserverFinish _callback) { uint16_t id = getId(); std::vector asyncActionToDo; if (m_interfaceMode == jus::connectionMode::modeJson) { - ejson::Object callElem = jus::createCallService(asyncActionToDo, id, m_serviceId, _functionName, std::forward<_ARGS>(_args)...); + ejson::Object callElem = jus::createCallService(asyncActionToDo, id, _serviceId, _functionName, std::forward<_ARGS>(_args)...); return callJson(id, callElem, asyncActionToDo, _callback); } else { - jus::Buffer callElem = jus::createBinaryCallService(asyncActionToDo, id, m_serviceId, _functionName, std::forward<_ARGS>(_args)...); + jus::Buffer callElem = jus::createBinaryCallService(asyncActionToDo, id, _serviceId, _functionName, std::forward<_ARGS>(_args)...); return callBinary(id, callElem, asyncActionToDo, _callback); } } + template + jus::FutureBase callClient(uint32_t _clientId, + const std::string& _functionName, + _ARGS&&... _args) { + return callService(_clientId, _functionName, _args...); + } + template + jus::FutureBase callClientAction(uint32_t _clientId, + const std::string& _functionName, + _ARGS&&... _args, + jus::FutureData::ObserverFinish _callback) { + return callServiceAction(_clientId, _functionName, _args..., _callback); + } + jus::FutureBase callForward(uint32_t _clientId, + jus::Buffer& _Buffer, + uint64_t _singleReferenceId, + jus::FutureData::ObserverFinish _callback); + void callForwardMultiple(uint32_t _clientId, + jus::Buffer& _Buffer, + uint64_t _singleReferenceId); public: // answers ... void answerProtocolError(uint32_t _transactionId, const std::string& _errorHelp);