fix(SocketReactor): fix dataCollection test

This commit is contained in:
Alex Fabijanic 2021-06-17 17:34:11 +02:00
parent 861392d15a
commit 8c31d76e2b

View File

@ -324,6 +324,7 @@ namespace
_data.resize(1);
_reactor.addEventHandler(_socket, Observer<DataServiceHandler, ReadableNotification>(*this, &DataServiceHandler::onReadable));
_reactor.addEventHandler(_socket, Observer<DataServiceHandler, ShutdownNotification>(*this, &DataServiceHandler::onShutdown));
_socket.setBlocking(false);
}
~DataServiceHandler()
@ -336,28 +337,32 @@ namespace
{
pNf->release();
char buffer[64];
int n = _socket.receiveBytes(&buffer[0], sizeof(buffer));
if (n > 0)
int n = 0;
do
{
_data[_pos].append(buffer, n);
std::size_t pos;
pos = _data[_pos].find('\n');
if(pos != std::string::npos)
n = _socket.receiveBytes(&buffer[0], sizeof(buffer));
if (n > 0)
{
if (pos == _data[_pos].size() - 1)
_data[_pos].append(buffer, n);
std::size_t pos;
pos = _data[_pos].find('\n');
if (pos != std::string::npos)
{
_data[_pos].erase(pos, 1);
_data.push_back(std::string());
if (pos == _data[_pos].size() - 1)
{
_data[_pos].erase(pos, 1);
_data.push_back(std::string());
}
else
{
_data.push_back(_data[_pos].substr(pos + 1));
_data[_pos].erase(pos);
}
++_pos;
}
else
{
_data.push_back(_data[_pos].substr(pos + 1));
_data[_pos].erase(pos);
}
++_pos;
}
}
else return;
else break;
} while (true);
}
void onShutdown(ShutdownNotification* pNf)
@ -499,8 +504,7 @@ void SocketReactorTest::testDataCollection()
" \"ts\":\"1524864651000001\","
" \"data\":123"
"}\n");
sock.sendBytes(data0.data(), static_cast<int>(data0.size()));
int n = sock.sendBytes(data0.data(), static_cast<int>(data0.size()));
std::string data1("{"
" \"src\":\"127.0.0.1\","
" \"id\":\"test1\","
@ -516,8 +520,7 @@ void SocketReactorTest::testDataCollection()
" }"
" ]"
"}\n");
sock.sendBytes(data1.data(), static_cast<int>(data1.size()));
n = sock.sendBytes(data1.data(), static_cast<int>(data1.size()));
std::string data2 = "{"
" \"src\":\"127.0.0.1\","
" \"id\":\"test2\","
@ -554,7 +557,7 @@ void SocketReactorTest::testDataCollection()
" }"
" ]"
"}\n";
sock.sendBytes(data2.data(), static_cast<int>(data2.size()));
n = sock.sendBytes(data2.data(), static_cast<int>(data2.size()));
Thread::sleep(500);
reactor.stop();
thread.join();