[DEV] change some cont function and add send and receive timeing in websocket

This commit is contained in:
Edouard DUPIN 2016-06-20 23:22:45 +02:00
parent 19d49bb462
commit 07323386b7
4 changed files with 100 additions and 29 deletions

View File

@ -225,7 +225,7 @@ namespace enet {
public:
void start();
void stop(bool _inThread=false);
bool isAlive() {
bool isAlive() const {
return m_connection.getConnectionStatus() == enet::Tcp::status::link;
}
public:

View File

@ -47,7 +47,7 @@ namespace enet {
* @brief Get the current Status of the connection
* @return The status.
*/
enum status getConnectionStatus() {
enum status getConnectionStatus() const {
return m_status;
}
public:

View File

@ -26,6 +26,13 @@ namespace enet {
}
}
enet::WebSocket::WebSocket() :
m_interface(),
m_observer(nullptr),
m_observerUriCheck(nullptr) {
}
enet::WebSocket::WebSocket(enet::Tcp _connection, bool _isServer) :
m_interface(),
m_observer(nullptr),
@ -42,6 +49,19 @@ enet::WebSocket::WebSocket(enet::Tcp _connection, bool _isServer) :
m_interface->connectRaw(this, &enet::WebSocket::onReceiveData);
}
void enet::WebSocket::setInterface(enet::Tcp _connection, bool _isServer) {
if (_isServer == true) {
ememory::SharedPtr<enet::HttpServer> interface = std::make_shared<enet::HttpServer>(std::move(_connection));
interface->connectHeader(this, &enet::WebSocket::onReceiveRequest);
m_interface = interface;
} else {
ememory::SharedPtr<enet::HttpClient> interface = std::make_shared<enet::HttpClient>(std::move(_connection));
interface->connectHeader(this, &enet::WebSocket::onReceiveAnswer);
m_interface = interface;
}
m_interface->connectRaw(this, &enet::WebSocket::onReceiveData);
}
enet::WebSocket::~WebSocket() {
if (m_interface == nullptr) {
return;
@ -75,6 +95,7 @@ void enet::WebSocket::start(const std::string& _uri) {
m_interface->start();
if (m_interface->isServer() == false) {
enet::HttpRequest req(enet::HTTPReqType::GET);
req.setProtocol(enet::HTTPProtocol::http_1_1);
req.setUri(_uri);
req.setKey("Upgrade", "websocket");
req.setKey("Connection", "Upgrade");
@ -117,6 +138,7 @@ void enet::WebSocket::onReceiveData(enet::Tcp& _connection) {
ENET_VERBOSE("ReadRaw 2 [STOP]");
return;
}
m_lastReceive = std::chrono::steady_clock::now();
ENET_VERBOSE("Read opcode : " << uint32_t(opcode));
if ((opcode & 0x80) == 0) {
ENET_ERROR("Multiple frames ... NOT managed ...");
@ -260,6 +282,7 @@ void enet::WebSocket::onReceiveRequest(const enet::HttpRequest& _data) {
_data.display();
if (_data.getType() != enet::HTTPReqType::GET) {
enet::HttpAnswer answer(enet::HTTPAnswerCode::c400_badRequest, "support only GET");
answer.setProtocol(enet::HTTPProtocol::http_1_1);
answer.setKey("Connection", "close");
interface->setHeader(answer);
interface->stop(true);
@ -267,6 +290,7 @@ void enet::WebSocket::onReceiveRequest(const enet::HttpRequest& _data) {
}
if (_data.getKey("Connection") == "close") {
enet::HttpAnswer answer(enet::HTTPAnswerCode::c200_ok);
answer.setProtocol(enet::HTTPProtocol::http_1_1);
answer.setKey("Connection", "close");
interface->setHeader(answer);
interface->stop(true);
@ -274,6 +298,7 @@ void enet::WebSocket::onReceiveRequest(const enet::HttpRequest& _data) {
}
if (_data.getKey("Upgrade") != "websocket") {
enet::HttpAnswer answer(enet::HTTPAnswerCode::c400_badRequest, "websocket support only with Upgrade: websocket");
answer.setProtocol(enet::HTTPProtocol::http_1_1);
answer.setKey("Connection", "close");
interface->setHeader(answer);
interface->stop(true);
@ -281,6 +306,7 @@ void enet::WebSocket::onReceiveRequest(const enet::HttpRequest& _data) {
}
if (_data.getKey("Sec-WebSocket-Key") == "") {
enet::HttpAnswer answer(enet::HTTPAnswerCode::c400_badRequest, "websocket missing 'Sec-WebSocket-Key'");
answer.setProtocol(enet::HTTPProtocol::http_1_1);
answer.setKey("Connection", "close");
interface->setHeader(answer);
interface->stop(true);
@ -289,6 +315,7 @@ void enet::WebSocket::onReceiveRequest(const enet::HttpRequest& _data) {
if (m_observerUriCheck != nullptr) {
if (m_observerUriCheck(_data.getUri()) == false) {
enet::HttpAnswer answer(enet::HTTPAnswerCode::c404_notFound);
answer.setProtocol(enet::HTTPProtocol::http_1_1);
answer.setKey("Connection", "close");
interface->setHeader(answer);
interface->stop(true);
@ -296,6 +323,7 @@ void enet::WebSocket::onReceiveRequest(const enet::HttpRequest& _data) {
}
}
enet::HttpAnswer answer(enet::HTTPAnswerCode::c101_switchingProtocols);
answer.setProtocol(enet::HTTPProtocol::http_1_1);
answer.setKey("Upgrade", "websocket");
answer.setKey("Connection", "Upgrade");
std::string answerKey = generateCheckKey(_data.getKey("Sec-WebSocket-Key"));
@ -333,10 +361,10 @@ void enet::WebSocket::onReceiveAnswer(const enet::HttpAnswer& _data) {
}
int32_t enet::WebSocket::write(const void* _data, int32_t _len, bool _isString, bool _mask) {
bool enet::WebSocket::writeHeader(int32_t _len, bool _isString, bool _mask) {
if (m_interface == nullptr) {
ENET_ERROR("Nullptr interface ...");
return -1;
return false;
}
uint8_t mask = 0;
if (_mask == true) {
@ -348,6 +376,7 @@ int32_t enet::WebSocket::write(const void* _data, int32_t _len, bool _isString,
} else {
header |= enet::websocket::OPCODE_FRAME_TEXT;
}
m_lastSend = std::chrono::steady_clock::now();
m_interface->write(&header, sizeof(uint8_t));
ENET_VERBOSE("write opcode : " << int32_t(header));
if (_len < 126) {
@ -375,31 +404,48 @@ int32_t enet::WebSocket::write(const void* _data, int32_t _len, bool _isString,
std::mt19937 e2(rd());
// Distribtuions
std::uniform_real_distribution<> dist(0, 0xFF);
uint8_t dataMask[4];
dataMask[0] = uint8_t(dist(e2));
dataMask[1] = uint8_t(dist(e2));
dataMask[2] = uint8_t(dist(e2));
dataMask[3] = uint8_t(dist(e2));
m_interface->write(&dataMask, sizeof(uint32_t));
std::vector<uint8_t> data;
data.resize(_len);
const uint8_t* pdata = static_cast<const uint8_t*>(_data);
m_haveMask = true;
m_dataMask[0] = uint8_t(dist(e2));
m_dataMask[1] = uint8_t(dist(e2));
m_dataMask[2] = uint8_t(dist(e2));
m_dataMask[3] = uint8_t(dist(e2));
} else {
m_haveMask = false;
}
return true;
}
int32_t enet::WebSocket::writeData(uint8_t* _data, int32_t _len) {
if (m_interface == nullptr) {
ENET_ERROR("Nullptr interface ...");
return -1;
}
if (m_haveMask == true) {
for (size_t iii= 0; iii<_len; ++iii) {
data[iii] = pdata[iii] ^ dataMask[iii%4];
_data[iii] ^= m_dataMask[iii%4];
}
return m_interface->write(&data[0], data.size());
}
return m_interface->write(_data, _len);
}
int32_t enet::WebSocket::write(const void* _data, int32_t _len, bool _isString, bool _mask) {
if (writeHeader( _len, _isString, _mask) == false) {
return -1;
}
return writeData((uint8_t*)_data, _len);
}
void enet::WebSocket::controlPing() {
if (m_interface == nullptr) {
ENET_ERROR("Nullptr interface ...");
return;
}
uint16_t header = ( enet::websocket::FLAG_FIN
| enet::websocket::OPCODE_FRAME_PING) << 8;
m_interface->write(&header, sizeof(uint16_t));
uint8_t header = enet::websocket::FLAG_FIN
| enet::websocket::OPCODE_FRAME_PING;
m_lastSend = std::chrono::steady_clock::now();
m_interface->write(&header, sizeof(uint8_t));
header = 0;
m_interface->write(&header, sizeof(uint8_t));
}
void enet::WebSocket::controlPong() {
@ -407,18 +453,24 @@ void enet::WebSocket::controlPong() {
ENET_ERROR("Nullptr interface ...");
return;
}
uint16_t header = ( enet::websocket::FLAG_FIN
| enet::websocket::OPCODE_FRAME_PONG) << 8;
m_interface->write(&header, sizeof(uint16_t));
uint8_t header = enet::websocket::FLAG_FIN
| enet::websocket::OPCODE_FRAME_PONG;
m_lastSend = std::chrono::steady_clock::now();
m_interface->write(&header, sizeof(uint8_t));
header = 0;
m_interface->write(&header, sizeof(uint8_t));
}
void enet::WebSocket::contolClose() {
void enet::WebSocket::controlClose() {
if (m_interface == nullptr) {
ENET_ERROR("Nullptr interface ...");
return;
}
uint16_t header = ( enet::websocket::FLAG_FIN
| enet::websocket::OPCODE_FRAME_CLOSE) << 8;
m_interface->write(&header, sizeof(uint16_t));
uint8_t header = enet::websocket::FLAG_FIN
| enet::websocket::OPCODE_FRAME_CLOSE;
m_lastSend = std::chrono::steady_clock::now();
m_interface->write(&header, sizeof(uint8_t));
header = 0;
m_interface->write(&header, sizeof(uint8_t));
}

View File

@ -16,12 +16,26 @@ namespace enet {
ememory::SharedPtr<enet::Http> m_interface;
std::vector<uint8_t> m_buffer;
std::string m_checkKey;
std::chrono::steady_clock::time_point m_lastReceive;
std::chrono::steady_clock::time_point m_lastSend;
public:
const std::chrono::steady_clock::time_point& getLastTimeReceive() {
return m_lastReceive;
}
const std::chrono::steady_clock::time_point& getLastTimeSend() {
return m_lastSend;
}
public:
WebSocket();
WebSocket(enet::Tcp _connection, bool _isServer=false);
void setInterface(enet::Tcp _connection, bool _isServer=false);
virtual ~WebSocket();
void start(const std::string& _uri="");
void stop(bool _inThread=false);
bool isAlive() {
bool isAlive() const {
if (m_interface == nullptr) {
return false;
}
return m_interface->isAlive();
}
void onReceiveData(enet::Tcp& _data);
@ -62,13 +76,18 @@ namespace enet {
template<class CLASS_TYPE>
void connectUri(CLASS_TYPE* _class, bool (CLASS_TYPE::*_func)(const std::string&)) {
m_observerUriCheck = [=](const std::string& _value){
(*_class.*_func)(_value);
return (*_class.*_func)(_value);
};
}
void connectUri(ObserverUriCheck _func) {
m_observerUriCheck = _func;
}
private:
bool m_haveMask;
uint8_t m_dataMask[4];
public:
bool writeHeader(int32_t _len, bool _isString=false, bool _mask= false);
int32_t writeData(uint8_t* _data, int32_t _len);
/**
* @brief Write a chunk of data on the socket
* @param[in] _data pointer on the data might be write
@ -112,9 +131,9 @@ namespace enet {
}
return ret/sizeof(T);
}
protected:
public:
void controlPing();
void controlPong();
void contolClose();
void controlClose();
};
}