[DEBUG] correct some double sent in a socket

This commit is contained in:
Edouard DUPIN 2017-02-07 21:23:41 +01:00
parent 0f4bbb87f7
commit 63c9757a50
4 changed files with 60 additions and 8 deletions

View File

@ -23,6 +23,10 @@
#include <netdb.h> #include <netdb.h>
#endif #endif
#ifdef ENET_STORE_INPUT
static uint32_t baseID = 0;
#endif
bool enet::Tcp::setTCPNoDelay(bool _enabled) { bool enet::Tcp::setTCPNoDelay(bool _enabled) {
if (m_socketId >= 0) { if (m_socketId >= 0) {
int flag = _enabled==true?1:0; int flag = _enabled==true?1:0;
@ -54,13 +58,20 @@ enet::Tcp::Tcp() :
m_socketId(_idSocket), m_socketId(_idSocket),
m_name(_name), m_name(_name),
m_status(status::link) { m_status(status::link) {
#ifdef ENET_STORE_INPUT
m_nodeStoreInput = etk::FSNode("CACHE:StoreTCPdata_" + etk::to_string(baseID++) + ".tcp");
m_nodeStoreInput.fileOpenWrite();
#endif
} }
enet::Tcp::Tcp(Tcp&& _obj) : enet::Tcp::Tcp(Tcp&& _obj) :
m_socketId(_obj.m_socketId), m_socketId(_obj.m_socketId),
m_name(_obj.m_name), m_name(_obj.m_name),
m_status(_obj.m_status) { m_status(_obj.m_status) {
#ifdef ENET_STORE_INPUT
m_nodeStoreInput = etk::FSNode("CACHE:StoreTCPdata_" + etk::to_string(baseID++) + ".tcp");
m_nodeStoreInput.fileOpenWrite();
#endif
#ifdef __TARGET_OS__Windows #ifdef __TARGET_OS__Windows
_obj.m_socketId = INVALID_SOCKET; _obj.m_socketId = INVALID_SOCKET;
#else #else
@ -76,6 +87,10 @@ enet::Tcp::~Tcp() {
enet::Tcp& enet::Tcp::operator = (enet::Tcp&& _obj) { enet::Tcp& enet::Tcp::operator = (enet::Tcp&& _obj) {
unlink(); unlink();
#ifdef ENET_STORE_INPUT
m_nodeStoreInput = etk::FSNode("CACHE:StoreTCPdata_" + etk::to_string(baseID++) + ".tcp");
m_nodeStoreInput.fileOpenWrite();
#endif
m_socketId = _obj.m_socketId; m_socketId = _obj.m_socketId;
#ifdef __TARGET_OS__Windows #ifdef __TARGET_OS__Windows
_obj.m_socketId = INVALID_SOCKET; _obj.m_socketId = INVALID_SOCKET;
@ -172,7 +187,9 @@ int32_t enet::Tcp::read(void* _data, int32_t _maxLen) {
ENET_DEBUG(" Set status at remote close ..."); ENET_DEBUG(" Set status at remote close ...");
m_status = status::linkRemoteClose; m_status = status::linkRemoteClose;
} }
//#endif #ifdef ENET_STORE_INPUT
m_nodeStoreInput.fileWrite(_data, 1, size);
#endif
return size; return size;
} }

View File

@ -12,6 +12,12 @@
#include <ws2tcpip.h> #include <ws2tcpip.h>
#endif #endif
//#define ENET_STORE_INPUT
#ifdef ENET_STORE_INPUT
#include <etk/os/FSNode.hpp>
#endif
namespace enet { namespace enet {
class Tcp { class Tcp {
private: private:
@ -20,6 +26,9 @@ namespace enet {
#else #else
int32_t m_socketId; //!< socket linux interface generic int32_t m_socketId; //!< socket linux interface generic
#endif #endif
#ifdef ENET_STORE_INPUT
etk::FSNode m_nodeStoreInput;
#endif
std::mutex m_mutex; std::mutex m_mutex;
public: public:
Tcp(); Tcp();

View File

@ -175,7 +175,10 @@ void enet::WebSocket::onReceiveData(enet::Tcp& _connection) {
} }
int8_t size1 = 0; int8_t size1 = 0;
len = _connection.read(&size1, sizeof(uint8_t)); len = _connection.read(&size1, sizeof(uint8_t));
if (len <= 0) { int32_t maxIteration = 50;
// We must get the payload size in all case ... ==> otherwise it create problems
while ( len <= 0
&& maxIteration > 0) {
if (len < 0) { if (len < 0) {
if (_connection.getConnectionStatus() == enet::Tcp::status::link) { if (_connection.getConnectionStatus() == enet::Tcp::status::link) {
ENET_ERROR("Protocol error occured ..."); ENET_ERROR("Protocol error occured ...");
@ -186,6 +189,12 @@ void enet::WebSocket::onReceiveData(enet::Tcp& _connection) {
} }
ENET_ERROR("Time out ... ==> not managed ..."); ENET_ERROR("Time out ... ==> not managed ...");
ENET_VERBOSE("ReadRaw 2 [STOP]"); ENET_VERBOSE("ReadRaw 2 [STOP]");
len = _connection.read(&size1, sizeof(uint8_t));
maxIteration--;
}
if (maxIteration <= 0) {
ENET_ERROR("Can not read the Socket >> auto kill");
m_interface->stop(true);
return; return;
} }
uint64_t totalSize = size1 & 0x7F; uint64_t totalSize = size1 & 0x7F;
@ -265,24 +274,24 @@ void enet::WebSocket::onReceiveData(enet::Tcp& _connection) {
// check opcode: // check opcode:
if ((opcode & 0x0F) == enet::websocket::OPCODE_FRAME_CLOSE) { if ((opcode & 0x0F) == enet::websocket::OPCODE_FRAME_CLOSE) {
// Close the conection by remote: // Close the conection by remote:
ENET_INFO("Close connection by remote :"); ENET_WARNING("Close connection by remote :");
m_interface->stop(true); m_interface->stop(true);
return; return;
} }
if ((opcode & 0x0F) == enet::websocket::OPCODE_FRAME_PING) { if ((opcode & 0x0F) == enet::websocket::OPCODE_FRAME_PING) {
// Close the conection by remote: // Close the conection by remote:
ENET_DEBUG("Receive a ping (send a pong)"); ENET_WARNING("Receive a ping (send a pong)");
controlPong(); controlPong();
return; return;
} }
if ((opcode & 0x0F) == enet::websocket::OPCODE_FRAME_PONG) { if ((opcode & 0x0F) == enet::websocket::OPCODE_FRAME_PONG) {
// Close the conection by remote: // Close the conection by remote:
ENET_DEBUG("Receive a pong"); ENET_WARNING("Receive a pong");
return; return;
} }
if ((opcode & 0x0F) == enet::websocket::OPCODE_FRAME_TEXT) { if ((opcode & 0x0F) == enet::websocket::OPCODE_FRAME_TEXT) {
// Close the conection by remote: // Close the conection by remote:
ENET_DEBUG("Receive a Text(UTF-8) data " << m_buffer.size() << " Bytes"); ENET_WARNING("Receive a Text(UTF-8) data " << m_buffer.size() << " Bytes");
if (m_observer != nullptr) { if (m_observer != nullptr) {
m_observer(m_buffer, true); m_observer(m_buffer, true);
} }
@ -496,6 +505,7 @@ int32_t enet::WebSocket::send() {
} }
int32_t enet::WebSocket::write(const void* _data, int32_t _len, bool _isString, bool _mask) { int32_t enet::WebSocket::write(const void* _data, int32_t _len, bool _isString, bool _mask) {
std::unique_lock<std::mutex> lock(m_mutex);
if (configHeader(_isString, _mask) == false) { if (configHeader(_isString, _mask) == false) {
return -1; return -1;
} }
@ -508,6 +518,7 @@ void enet::WebSocket::controlPing() {
ENET_ERROR("Nullptr interface ..."); ENET_ERROR("Nullptr interface ...");
return; return;
} }
std::unique_lock<std::mutex> lock(m_mutex);
uint8_t header = enet::websocket::FLAG_FIN uint8_t header = enet::websocket::FLAG_FIN
| enet::websocket::OPCODE_FRAME_PING; | enet::websocket::OPCODE_FRAME_PING;
m_lastSend = echrono::Steady::now(); m_lastSend = echrono::Steady::now();
@ -521,6 +532,7 @@ void enet::WebSocket::controlPong() {
ENET_ERROR("Nullptr interface ..."); ENET_ERROR("Nullptr interface ...");
return; return;
} }
std::unique_lock<std::mutex> lock(m_mutex);
uint8_t header = enet::websocket::FLAG_FIN uint8_t header = enet::websocket::FLAG_FIN
| enet::websocket::OPCODE_FRAME_PONG; | enet::websocket::OPCODE_FRAME_PONG;
m_lastSend = echrono::Steady::now(); m_lastSend = echrono::Steady::now();
@ -534,6 +546,7 @@ void enet::WebSocket::controlClose() {
ENET_ERROR("Nullptr interface ..."); ENET_ERROR("Nullptr interface ...");
return; return;
} }
std::unique_lock<std::mutex> lock(m_mutex);
uint8_t header = enet::websocket::FLAG_FIN uint8_t header = enet::websocket::FLAG_FIN
| enet::websocket::OPCODE_FRAME_CLOSE; | enet::websocket::OPCODE_FRAME_CLOSE;
m_lastSend = echrono::Steady::now(); m_lastSend = echrono::Steady::now();

View File

@ -21,6 +21,7 @@ namespace enet {
std::string m_checkKey; std::string m_checkKey;
echrono::Steady m_lastReceive; echrono::Steady m_lastReceive;
echrono::Steady m_lastSend; echrono::Steady m_lastSend;
std::mutex m_mutex;
public: public:
const echrono::Steady& getLastTimeReceive() { const echrono::Steady& getLastTimeReceive() {
return m_lastReceive; return m_lastReceive;
@ -96,8 +97,20 @@ namespace enet {
bool m_haveMask; bool m_haveMask;
uint8_t m_dataMask[4]; uint8_t m_dataMask[4];
public: public:
std::unique_lock<std::mutex> getScopeLock() {
return std::move(std::unique_lock<std::mutex>(m_mutex));
}
/**
* Compose the local header inside a temporary buffer ==> must lock external to prevent multiple simultaneous access
*/
bool configHeader(bool _isString=false, bool _mask= false); bool configHeader(bool _isString=false, bool _mask= false);
/**
* CWrite data in a temporary buffer ==> must lock external to prevent multiple simultaneous access
*/
int32_t writeData(uint8_t* _data, int32_t _len); int32_t writeData(uint8_t* _data, int32_t _len);
/**
* Use temporary buffer to send it in the socket ==> must lock external to prevent multiple simultaneous access
*/
int32_t send(); int32_t send();
/** /**
* @brief Write a chunk of data on the socket * @brief Write a chunk of data on the socket
@ -105,8 +118,8 @@ namespace enet {
* @param[in] _len Size that must be written socket * @param[in] _len Size that must be written socket
* @return >0 byte size on the socket write * @return >0 byte size on the socket write
* @return -1 an error occured. * @return -1 an error occured.
* @note: This function is locked ...
*/ */
//TODO : ...
int32_t write(const void* _data, int32_t _len, bool _isString=false, bool _mask= false); int32_t write(const void* _data, int32_t _len, bool _isString=false, bool _mask= false);
/** /**
* @brief Write a chunk of data on the socket * @brief Write a chunk of data on the socket