/** @file * @author Edouard DUPIN * @copyright 2016, Edouard DUPIN, all right reserved * @license APACHE v2.0 (see license file) */ #include #include #include #include jus::TcpString::TcpString(enet::Tcp _connection) : m_connection(std::move(_connection)), m_thread(nullptr), m_observerElement(nullptr), m_observerRawElement(nullptr), m_threadAsync(nullptr) { m_threadRunning = false; m_threadAsyncRunning = false; } jus::TcpString::TcpString() : m_connection(), m_thread(nullptr), m_observerElement(nullptr), m_observerRawElement(nullptr), m_threadAsync(nullptr) { m_threadRunning = false; m_threadAsyncRunning = false; } void jus::TcpString::setInterface(enet::Tcp _connection) { m_connection = std::move(_connection); } jus::TcpString::~TcpString() { disconnect(); } void jus::TcpString::setInterfaceName(const std::string& _name) { ethread::setName(*m_thread, "Tcp-" + _name); } // TODO : Do it better : void jus::TcpString::threadCallback() { ethread::setName("TcpString-input"); // get datas: while ( m_threadRunning == true && m_connection.getConnectionStatus() == enet::Tcp::status::link) { // READ section data: if (m_observerElement != nullptr) { JUS_PRINT("Call String ..."); std::string data = std::move(read()); JUS_PRINT("Receive data: '" << data << "'"); if (data.size() != 0) { m_lastReceive = std::chrono::steady_clock::now(); JUS_PRINT(" Call function ... 1"); if (m_observerElement != nullptr) { m_observerElement(std::move(data)); } else if (m_observerRawElement != nullptr) { jus::Buffer dataRaw; dataRaw.composeWith(data); m_observerRawElement(dataRaw); } else { JUS_ERROR("Lose DATA ..."); } JUS_PRINT(" Call function ... 1 (done)"); } } else if (m_observerRawElement != nullptr) { JUS_PRINT("Call Raw ..."); jus::Buffer data = readRaw(); JUS_PRINT(" Call function ... 2"); m_observerRawElement(data); JUS_PRINT(" Call function ... 2 (done)"); } } m_threadRunning = false; JUS_DEBUG("End of thread"); } bool jus::TcpString::isActive() const { return m_threadRunning; } void jus::TcpString::connect(bool _async){ JUS_DEBUG("connect [START]"); m_threadRunning = true; m_thread = new std::thread([&](void *){ this->threadCallback();}, nullptr); if (m_thread == nullptr) { m_threadRunning = false; JUS_ERROR("creating callback thread!"); return; } m_threadAsyncRunning = true; m_threadAsync = new std::thread([&](void *){ this->threadAsyncCallback();}, nullptr); if (m_threadAsync == nullptr) { m_threadAsyncRunning = false; JUS_ERROR("creating async sender thread!"); return; } while ( _async == false && m_threadRunning == true && m_connection.getConnectionStatus() != enet::Tcp::status::link) { usleep(50000); } //ethread::setPriority(*m_receiveThread, -6); if (_async == true) { JUS_DEBUG("connect [STOP] async mode"); } else { JUS_DEBUG("connect [STOP]"); } } void jus::TcpString::disconnect(bool _inThreadStop){ JUS_DEBUG("disconnect [START]"); m_threadRunning = false; m_threadAsyncRunning = false; if (m_connection.getConnectionStatus() == enet::Tcp::status::link) { uint32_t size = 0xFFFFFFFF; m_connection.write(&size, 4); } if (m_connection.getConnectionStatus() != enet::Tcp::status::unlink) { m_connection.unlink(); } if (m_threadAsync != nullptr) { m_threadAsync->join(); delete m_threadAsync; m_threadAsync = nullptr; } if (_inThreadStop == false) { if (m_thread != nullptr) { m_thread->join(); delete m_thread; m_thread = nullptr; } } JUS_DEBUG("disconnect [STOP]"); } int32_t jus::TcpString::write(const std::string& _data) { if (m_threadRunning == false) { return -2; } if (_data.size() == 0) { return 0; } uint32_t size = _data.size(); m_lastSend = std::chrono::steady_clock::now(); m_connection.write(&size, 4); return m_connection.write(_data.c_str(), _data.size()); } int32_t jus::TcpString::writeBinary(jus::Buffer& _data) { _data.prepare(); JUS_DEBUG("Send BINARY '" << _data.generateHumanString() << "'"); if (m_threadRunning == false) { return -2; } /* if (_data.size() == 0) { return 0; } */ //uint32_t size = _data.size(); const uint8_t* data = nullptr; uint32_t dataSize = 0; m_lastSend = std::chrono::steady_clock::now(); data = _data.getHeader(); dataSize = _data.getHeaderSize(); m_connection.write(data, dataSize); data = _data.getParam(); dataSize = _data.getParamSize(); m_connection.write(data, dataSize); data = _data.getData(); dataSize = _data.getDataSize(); m_connection.write(data, dataSize); return 1; } std::string jus::TcpString::read() { JUS_VERBOSE("Read [START]"); if (m_threadRunning == false) { JUS_DEBUG("Read [END] Disconected"); return ""; } // TODO : Do it better with a correct way to check data size ... JUS_VERBOSE("Read [START]"); std::string out; uint32_t size = 0; int32_t len = m_connection.read(&size, 4); if (len != 4) { JUS_ERROR("Protocol error occured ..."); } else { if (size == -1) { JUS_WARNING("Remote close connection"); m_threadRunning = false; //m_connection.unlink(); } else { int64_t offset = 0; out.resize(size); while (offset != size) { len = m_connection.read(&out[offset], size-offset); offset += len; if (len == 0) { JUS_WARNING("Read No data"); //break; } /* else if (size != offset) { JUS_ERROR("Protocol error occured .2. ==> concat (offset=" << offset << " size=" << size); } */ } } } JUS_VERBOSE("Read [STOP]"); return out; } jus::Buffer jus::TcpString::readRaw() { jus::Buffer out; JUS_VERBOSE("ReadRaw [START]"); if (m_threadRunning == false) { JUS_DEBUG("Read [END] Disconected"); return out; } JUS_VERBOSE("ReadRaw [START]"); uint32_t size = 0; int32_t len = m_connection.read(&size, 4); if (len != 4) { JUS_ERROR("Protocol error occured ..."); } else { if (size == -1) { JUS_WARNING("Remote close connection"); m_threadRunning = false; //m_connection.unlink(); } else { int64_t offset = 0; m_buffer.resize(size); while (offset != size) { len = m_connection.read(&m_buffer[offset], size-offset); offset += len; if (len == 0) { JUS_WARNING("Read No data"); //break; } /* else if (size != offset) { JUS_ERROR("Protocol error occured .2. ==> concat (offset=" << offset << " size=" << size); } */ } out.composeWith(m_buffer); } } JUS_VERBOSE("ReadRaw [STOP]"); return out; } void jus::TcpString::threadAsyncCallback() { ethread::setName("Async-sender"); // get datas: while ( m_threadAsyncRunning == true && m_connection.getConnectionStatus() == enet::Tcp::status::link) { if (m_threadAsyncList.size() == 0) { usleep(10000); continue; } std::unique_lock lock(m_threadAsyncMutex); auto it = m_threadAsyncList.begin(); while (it != m_threadAsyncList.end()) { bool ret = (*it)(this); if (ret == true) { // Remove it ... it = m_threadAsyncList.erase(it); } else { ++it; } } } m_threadRunning = false; JUS_DEBUG("End of thread"); }