fix reactor tests; add a test

This commit is contained in:
Alex Fabijanic 2018-04-27 19:42:20 -05:00
parent ea38cdb740
commit f0fed91a0e
2 changed files with 164 additions and 6 deletions

View File

@ -21,6 +21,7 @@
#include "Poco/Net/SocketAddress.h"
#include "Poco/Observer.h"
#include "Poco/Exception.h"
#include "Poco/Thread.h"
#include <sstream>
@ -38,6 +39,7 @@ using Poco::Net::TimeoutNotification;
using Poco::Net::ShutdownNotification;
using Poco::Observer;
using Poco::IllegalStateException;
using Poco::Thread;
namespace
@ -104,9 +106,8 @@ namespace
checkTimeoutObserverCount(0);
_reactor.addEventHandler(_socket, _ot);
checkTimeoutObserverCount(1);
}
~ClientServiceHandler()
{
}
@ -120,6 +121,7 @@ namespace
{
_str.write(buffer, n);
_data += _str.str();
_str.str("");
if ((_once && _data.size() >= 1024) ||
(!_once && _data.size() >= 4096))
{
@ -294,6 +296,71 @@ namespace
bool _failed;
bool _shutdown;
};
class DataServiceHandler
{
public:
typedef std::vector<std::string> Data;
DataServiceHandler(StreamSocket& socket, SocketReactor& reactor):
_socket(socket),
_reactor(reactor),
_pos(0)
{
_data.resize(1);
_reactor.addEventHandler(_socket, Observer<DataServiceHandler, ReadableNotification>(*this, &DataServiceHandler::onReadable));
_reactor.addEventHandler(_socket, Observer<DataServiceHandler, ShutdownNotification>(*this, &DataServiceHandler::onShutdown));
}
~DataServiceHandler()
{
_reactor.removeEventHandler(_socket, Observer<DataServiceHandler, ReadableNotification>(*this, &DataServiceHandler::onReadable));
_reactor.removeEventHandler(_socket, Observer<DataServiceHandler, ShutdownNotification>(*this, &DataServiceHandler::onShutdown));
}
void onReadable(ReadableNotification* pNf)
{
pNf->release();
char buffer[8] = {0};
int n = _socket.receiveBytes(&buffer[0], sizeof(buffer) - 1);
if (n > 0)
{
_data[_pos].append(buffer, n);
std::size_t pos;
pos = _data[_pos].find('\n');
if(pos != std::string::npos)
{
if (pos == _data[_pos].size() - 1)
{
_data[_pos].erase(pos, 1);
_data.push_back(std::string());
}
else
{
_data.push_back(_data[_pos].substr(pos + 1));
_data[_pos].erase(pos);
}
++_pos;
}
}
else return;
}
void onShutdown(ShutdownNotification* pNf)
{
pNf->release();
delete this;
}
static Data _data;
private:
StreamSocket _socket;
SocketReactor& _reactor;
int _pos;
};
DataServiceHandler::Data DataServiceHandler::_data;
}
@ -319,7 +386,7 @@ void SocketReactorTest::testSocketReactor()
ClientServiceHandler::resetData();
reactor.run();
std::string data(ClientServiceHandler::data());
assertTrue (data.size() >= 1024);
assertTrue (data.size() == 1024);
assertTrue (!ClientServiceHandler::readableError());
assertTrue (!ClientServiceHandler::writableError());
assertTrue (!ClientServiceHandler::timeoutError());
@ -339,7 +406,7 @@ void SocketReactorTest::testSetSocketReactor()
ClientServiceHandler::resetData();
reactor.run();
std::string data(ClientServiceHandler::data());
assertTrue (data.size() >= 1024);
assertTrue (data.size() == 1024);
assertTrue (!ClientServiceHandler::readableError());
assertTrue (!ClientServiceHandler::writableError());
assertTrue (!ClientServiceHandler::timeoutError());
@ -361,7 +428,7 @@ void SocketReactorTest::testParallelSocketReactor()
ClientServiceHandler::resetData();
reactor.run();
std::string data(ClientServiceHandler::data());
assertTrue (data.size() >= 4096);
assertTrue (data.size() == 4096);
assertTrue (!ClientServiceHandler::readableError());
assertTrue (!ClientServiceHandler::writableError());
assertTrue (!ClientServiceHandler::timeoutError());
@ -385,7 +452,7 @@ void SocketReactorTest::testSocketConnectorFail()
void SocketReactorTest::testSocketConnectorTimeout()
{
ClientServiceHandler::setCloseOnTimeout(true);
SocketAddress ssa;
ServerSocket ss(ssa);
SocketReactor reactor;
@ -396,6 +463,95 @@ void SocketReactorTest::testSocketConnectorTimeout()
}
void SocketReactorTest::testDataCollection()
{
SocketAddress ssa;
ServerSocket ss(ssa);
SocketReactor reactor;
SocketAcceptor<DataServiceHandler> acceptor(ss, reactor);
Thread thread;
thread.start(reactor);
SocketAddress sa("127.0.0.1", ss.address().port());
StreamSocket sock(sa);
std::string data0("{"
" \"src\":\"127.0.0.1\","
" \"id\":\"test0\","
" \"ts\":\"1524864651000001\","
" \"data\":123"
"}\n");
sock.sendBytes(data0.data(), data0.size());
std::string data1("{"
" \"src\":\"127.0.0.1\","
" \"id\":\"test1\","
" \"ts\":\"1524864651123456\","
" \"data\":"
" ["
" {"
" \"tag1\":"
" ["
" {\"val1\":123},"
" {\"val2\":\"abc\"}"
" ]"
" }"
" ]"
"}\n");
sock.sendBytes(data1.data(), data1.size());
std::string data2 = "{"
" \"src\":\"127.0.0.1\","
" \"id\":\"test2\","
" \"ts\":\"1524864652654321\","
" \"data\":"
" ["
" {"
" \"tag1\":"
" ["
" {"
" \"val1\":123,"
" \"val2\":\"abc\","
" \"val3\":42.123"
" },"
" {"
" \"val1\":987,"
" \"val2\":\"xyz\","
" \"val3\":24.321"
" }"
" ],"
" \"tag2\":"
" ["
" {"
" \"val1\":42.123,"
" \"val2\":123,"
" \"val3\":\"abc\""
" },"
" {"
" \"val1\":24.321,"
" \"val2\":987,"
" \"val3\":\"xyz\""
" }"
" ]"
" }"
" ]"
"}\n";
sock.sendBytes(data2.data(), data2.size());
Thread::sleep(500);
reactor.stop();
thread.join();
assertTrue (DataServiceHandler::_data.size() == 4);
data0.erase(data0.size() - 1);
assertTrue (DataServiceHandler::_data[0] == data0);
data1.erase(data1.size() - 1);
assertTrue (DataServiceHandler::_data[1] == data1);
data2.erase(data2.size() - 1);
assertTrue (DataServiceHandler::_data[2] == data2);
assertTrue (DataServiceHandler::_data[3].empty());
}
void SocketReactorTest::setUp()
{
ClientServiceHandler::setCloseOnTimeout(false);
@ -416,6 +572,7 @@ CppUnit::Test* SocketReactorTest::suite()
CppUnit_addTest(pSuite, SocketReactorTest, testParallelSocketReactor);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorFail);
CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout);
CppUnit_addTest(pSuite, SocketReactorTest, testDataCollection);
return pSuite;
}

View File

@ -29,6 +29,7 @@ public:
void testParallelSocketReactor();
void testSocketConnectorFail();
void testSocketConnectorTimeout();
void testDataCollection();
void setUp();
void tearDown();