[DEBUG] add lock on read and write on the socket and check return of http header before continue sending datas...

This commit is contained in:
Edouard DUPIN 2016-07-01 23:39:24 +02:00
parent 905721d2b9
commit db50d40c8c
5 changed files with 61 additions and 10 deletions

View File

@ -387,6 +387,10 @@ void enet::Http::getHeader() {
break; break;
} }
} }
if (m_connection.getConnectionStatus() != enet::Tcp::status::link) {
ENET_ERROR("Read HTTP Header [STOP] : '" << header << "' ==> status move in unlink ...");
return;
}
ENET_VERBOSE("Read HTTP Header [STOP] : '" << header << "'"); ENET_VERBOSE("Read HTTP Header [STOP] : '" << header << "'");
m_headerIsSend = true; m_headerIsSend = true;
// parse header : // parse header :

View File

@ -117,7 +117,11 @@ int32_t enet::Tcp::read(void* _data, int32_t _maxLen) {
// Receive all incoming data on this socket before we loop back and call poll again. // Receive all incoming data on this socket before we loop back and call poll again.
// Receive data on this connection until the recv fails with EWOULDBLOCK. // Receive data on this connection until the recv fails with EWOULDBLOCK.
// If any other failure occurs, we will close the connection. // If any other failure occurs, we will close the connection.
{
std::unique_lock<std::mutex> lock(m_mutex);
//ENET_DEBUG("Read on socketid = " << m_fds[0].fd );
rc = recv(m_fds[0].fd, _data, _maxLen, 0); rc = recv(m_fds[0].fd, _data, _maxLen, 0);
}
if (rc < 0) { if (rc < 0) {
if (errno != EWOULDBLOCK) { if (errno != EWOULDBLOCK) {
ENET_ERROR(" recv() failed"); ENET_ERROR(" recv() failed");
@ -147,7 +151,12 @@ int32_t enet::Tcp::write(const void* _data, int32_t _len) {
ENET_ERROR("Can not write on unlink connection"); ENET_ERROR("Can not write on unlink connection");
return -1; return -1;
} }
int32_t size = ::write(m_socketId, _data, _len); //ENET_DEBUG("write on socketid = " << m_socketId << " data@=" << int64_t(_data) << " size=" << _len );
int32_t size;
{
std::unique_lock<std::mutex> lock(m_mutex);
size = ::write(m_socketId, _data, _len);
}
if ( size != _len if ( size != _len
&& errno != 0) { && errno != 0) {
ENET_ERROR("PB when writing data on the FD : request=" << _len << " have=" << size << ", erno=" << errno << "," << strerror(errno)); ENET_ERROR("PB when writing data on the FD : request=" << _len << " have=" << size << ", erno=" << errno << "," << strerror(errno));

View File

@ -7,12 +7,14 @@
#include <etk/types.h> #include <etk/types.h>
#include <poll.h> #include <poll.h>
#include <mutex>
namespace enet { namespace enet {
class Tcp { class Tcp {
private: private:
int32_t m_socketId; //!< socket linux interface generic int32_t m_socketId; //!< socket linux interface generic
struct pollfd m_fds[1]; struct pollfd m_fds[1];
std::mutex m_mutex;
public: public:
Tcp(); Tcp();
Tcp(int32_t _idSocket, const std::string& _name); Tcp(int32_t _idSocket, const std::string& _name);

View File

@ -12,6 +12,7 @@
#include <random> #include <random>
#include <algue/base64.h> #include <algue/base64.h>
#include <algue/sha1.h> #include <algue/sha1.h>
#include <unistd.h>
namespace enet { namespace enet {
@ -27,25 +28,35 @@ namespace enet {
} }
enet::WebSocket::WebSocket() : enet::WebSocket::WebSocket() :
m_interface(), m_connectionValidate(false),
m_interface(nullptr),
m_observer(nullptr), m_observer(nullptr),
m_observerUriCheck(nullptr) { m_observerUriCheck(nullptr) {
} }
enet::WebSocket::WebSocket(enet::Tcp _connection, bool _isServer) : enet::WebSocket::WebSocket(enet::Tcp _connection, bool _isServer) :
m_interface(), m_connectionValidate(false),
m_interface(nullptr),
m_observer(nullptr), m_observer(nullptr),
m_observerUriCheck(nullptr) { m_observerUriCheck(nullptr) {
_connection.setTCPNoDelay(true); _connection.setTCPNoDelay(true);
if (_isServer == true) { if (_isServer == true) {
ememory::SharedPtr<enet::HttpServer> interface = std::make_shared<enet::HttpServer>(std::move(_connection)); ememory::SharedPtr<enet::HttpServer> interface = std::make_shared<enet::HttpServer>(std::move(_connection));
interface->connectHeader(this, &enet::WebSocket::onReceiveRequest);
m_interface = interface; m_interface = interface;
if (interface != nullptr) {
interface->connectHeader(this, &enet::WebSocket::onReceiveRequest);
}
} else { } else {
ememory::SharedPtr<enet::HttpClient> interface = std::make_shared<enet::HttpClient>(std::move(_connection)); ememory::SharedPtr<enet::HttpClient> interface = std::make_shared<enet::HttpClient>(std::move(_connection));
interface->connectHeader(this, &enet::WebSocket::onReceiveAnswer);
m_interface = interface; m_interface = interface;
if (interface != nullptr) {
interface->connectHeader(this, &enet::WebSocket::onReceiveAnswer);
}
}
if (m_interface == nullptr) {
ENET_ERROR("can not create interface for the websocket");
return;
} }
m_interface->connectRaw(this, &enet::WebSocket::onReceiveData); m_interface->connectRaw(this, &enet::WebSocket::onReceiveData);
} }
@ -54,12 +65,20 @@ void enet::WebSocket::setInterface(enet::Tcp _connection, bool _isServer) {
_connection.setTCPNoDelay(true); _connection.setTCPNoDelay(true);
if (_isServer == true) { if (_isServer == true) {
ememory::SharedPtr<enet::HttpServer> interface = std::make_shared<enet::HttpServer>(std::move(_connection)); ememory::SharedPtr<enet::HttpServer> interface = std::make_shared<enet::HttpServer>(std::move(_connection));
interface->connectHeader(this, &enet::WebSocket::onReceiveRequest);
m_interface = interface; m_interface = interface;
if (interface != nullptr) {
interface->connectHeader(this, &enet::WebSocket::onReceiveRequest);
}
} else { } else {
ememory::SharedPtr<enet::HttpClient> interface = std::make_shared<enet::HttpClient>(std::move(_connection)); ememory::SharedPtr<enet::HttpClient> interface = std::make_shared<enet::HttpClient>(std::move(_connection));
interface->connectHeader(this, &enet::WebSocket::onReceiveAnswer);
m_interface = interface; m_interface = interface;
if (interface != nullptr) {
interface->connectHeader(this, &enet::WebSocket::onReceiveAnswer);
}
}
if (m_interface == nullptr) {
ENET_ERROR("can not create interface for the websocket");
return;
} }
m_interface->connectRaw(this, &enet::WebSocket::onReceiveData); m_interface->connectRaw(this, &enet::WebSocket::onReceiveData);
} }
@ -123,11 +142,25 @@ void enet::WebSocket::start(const std::string& _uri, const std::vector<std::stri
ememory::SharedPtr<enet::HttpClient> interface = std::dynamic_pointer_cast<enet::HttpClient>(m_interface); ememory::SharedPtr<enet::HttpClient> interface = std::dynamic_pointer_cast<enet::HttpClient>(m_interface);
if (interface != nullptr) { if (interface != nullptr) {
interface->setHeader(req); interface->setHeader(req);
int32_t timeout = 500000; // 5 second
while (timeout>=0) {
if ( m_connectionValidate == true
|| m_interface->isAlive() == false) {
break;
}
usleep(10000);
timeout--;
}
if ( m_connectionValidate == false
|| m_interface->isAlive() == false) {
ENET_ERROR("Connection refused by SERVER ...");
}
} }
} }
} }
void enet::WebSocket::stop(bool _inThread) { void enet::WebSocket::stop(bool _inThread) {
ENET_DEBUG("Stop interface ...");
if (m_interface == nullptr) { if (m_interface == nullptr) {
ENET_ERROR("Nullptr interface ..."); ENET_ERROR("Nullptr interface ...");
return; return;
@ -401,6 +434,8 @@ void enet::WebSocket::onReceiveAnswer(const enet::HttpAnswer& _data) {
} }
setProtocol(_data.getKey("Sec-WebSocket-Protocol")); setProtocol(_data.getKey("Sec-WebSocket-Protocol"));
// TODO : Create a methode to check the current protocol ... // TODO : Create a methode to check the current protocol ...
// now we can release the client call connection ...
m_connectionValidate = true;
} }
bool enet::WebSocket::writeHeader(int32_t _len, bool _isString, bool _mask) { bool enet::WebSocket::writeHeader(int32_t _len, bool _isString, bool _mask) {

View File

@ -13,6 +13,7 @@
namespace enet { namespace enet {
class WebSocket { class WebSocket {
protected: protected:
bool m_connectionValidate;
ememory::SharedPtr<enet::Http> m_interface; ememory::SharedPtr<enet::Http> m_interface;
std::vector<uint8_t> m_buffer; std::vector<uint8_t> m_buffer;
std::string m_checkKey; std::string m_checkKey;