From f0fed91a0ebb91d8266f86233c7124f244fc1d9e Mon Sep 17 00:00:00 2001 From: Alex Fabijanic Date: Fri, 27 Apr 2018 19:42:20 -0500 Subject: [PATCH] fix reactor tests; add a test --- Net/testsuite/src/SocketReactorTest.cpp | 169 +++++++++++++++++++++++- Net/testsuite/src/SocketReactorTest.h | 1 + 2 files changed, 164 insertions(+), 6 deletions(-) diff --git a/Net/testsuite/src/SocketReactorTest.cpp b/Net/testsuite/src/SocketReactorTest.cpp index 3bba177ee..b5d030fb7 100644 --- a/Net/testsuite/src/SocketReactorTest.cpp +++ b/Net/testsuite/src/SocketReactorTest.cpp @@ -21,6 +21,7 @@ #include "Poco/Net/SocketAddress.h" #include "Poco/Observer.h" #include "Poco/Exception.h" +#include "Poco/Thread.h" #include @@ -38,6 +39,7 @@ using Poco::Net::TimeoutNotification; using Poco::Net::ShutdownNotification; using Poco::Observer; using Poco::IllegalStateException; +using Poco::Thread; namespace @@ -104,9 +106,8 @@ namespace checkTimeoutObserverCount(0); _reactor.addEventHandler(_socket, _ot); checkTimeoutObserverCount(1); - } - + ~ClientServiceHandler() { } @@ -120,6 +121,7 @@ namespace { _str.write(buffer, n); _data += _str.str(); + _str.str(""); if ((_once && _data.size() >= 1024) || (!_once && _data.size() >= 4096)) { @@ -294,6 +296,71 @@ namespace bool _failed; bool _shutdown; }; + + class DataServiceHandler + { + public: + typedef std::vector Data; + + DataServiceHandler(StreamSocket& socket, SocketReactor& reactor): + _socket(socket), + _reactor(reactor), + _pos(0) + { + _data.resize(1); + _reactor.addEventHandler(_socket, Observer(*this, &DataServiceHandler::onReadable)); + _reactor.addEventHandler(_socket, Observer(*this, &DataServiceHandler::onShutdown)); + } + + ~DataServiceHandler() + { + _reactor.removeEventHandler(_socket, Observer(*this, &DataServiceHandler::onReadable)); + _reactor.removeEventHandler(_socket, Observer(*this, &DataServiceHandler::onShutdown)); + } + + void onReadable(ReadableNotification* pNf) + { + pNf->release(); + char buffer[8] = {0}; + int n = _socket.receiveBytes(&buffer[0], sizeof(buffer) - 1); + if (n > 0) + { + _data[_pos].append(buffer, n); + std::size_t pos; + pos = _data[_pos].find('\n'); + if(pos != std::string::npos) + { + if (pos == _data[_pos].size() - 1) + { + _data[_pos].erase(pos, 1); + _data.push_back(std::string()); + } + else + { + _data.push_back(_data[_pos].substr(pos + 1)); + _data[_pos].erase(pos); + } + ++_pos; + } + } + else return; + } + + void onShutdown(ShutdownNotification* pNf) + { + pNf->release(); + delete this; + } + + static Data _data; + + private: + StreamSocket _socket; + SocketReactor& _reactor; + int _pos; + }; + + DataServiceHandler::Data DataServiceHandler::_data; } @@ -319,7 +386,7 @@ void SocketReactorTest::testSocketReactor() ClientServiceHandler::resetData(); reactor.run(); std::string data(ClientServiceHandler::data()); - assertTrue (data.size() >= 1024); + assertTrue (data.size() == 1024); assertTrue (!ClientServiceHandler::readableError()); assertTrue (!ClientServiceHandler::writableError()); assertTrue (!ClientServiceHandler::timeoutError()); @@ -339,7 +406,7 @@ void SocketReactorTest::testSetSocketReactor() ClientServiceHandler::resetData(); reactor.run(); std::string data(ClientServiceHandler::data()); - assertTrue (data.size() >= 1024); + assertTrue (data.size() == 1024); assertTrue (!ClientServiceHandler::readableError()); assertTrue (!ClientServiceHandler::writableError()); assertTrue (!ClientServiceHandler::timeoutError()); @@ -361,7 +428,7 @@ void SocketReactorTest::testParallelSocketReactor() ClientServiceHandler::resetData(); reactor.run(); std::string data(ClientServiceHandler::data()); - assertTrue (data.size() >= 4096); + assertTrue (data.size() == 4096); assertTrue (!ClientServiceHandler::readableError()); assertTrue (!ClientServiceHandler::writableError()); assertTrue (!ClientServiceHandler::timeoutError()); @@ -385,7 +452,7 @@ void SocketReactorTest::testSocketConnectorFail() void SocketReactorTest::testSocketConnectorTimeout() { ClientServiceHandler::setCloseOnTimeout(true); - + SocketAddress ssa; ServerSocket ss(ssa); SocketReactor reactor; @@ -396,6 +463,95 @@ void SocketReactorTest::testSocketConnectorTimeout() } +void SocketReactorTest::testDataCollection() +{ + SocketAddress ssa; + ServerSocket ss(ssa); + SocketReactor reactor; + SocketAcceptor acceptor(ss, reactor); + Thread thread; + thread.start(reactor); + + SocketAddress sa("127.0.0.1", ss.address().port()); + StreamSocket sock(sa); + + std::string data0("{" + " \"src\":\"127.0.0.1\"," + " \"id\":\"test0\"," + " \"ts\":\"1524864651000001\"," + " \"data\":123" + "}\n"); + sock.sendBytes(data0.data(), data0.size()); + + std::string data1("{" + " \"src\":\"127.0.0.1\"," + " \"id\":\"test1\"," + " \"ts\":\"1524864651123456\"," + " \"data\":" + " [" + " {" + " \"tag1\":" + " [" + " {\"val1\":123}," + " {\"val2\":\"abc\"}" + " ]" + " }" + " ]" + "}\n"); + sock.sendBytes(data1.data(), data1.size()); + + std::string data2 = "{" + " \"src\":\"127.0.0.1\"," + " \"id\":\"test2\"," + " \"ts\":\"1524864652654321\"," + " \"data\":" + " [" + " {" + " \"tag1\":" + " [" + " {" + " \"val1\":123," + " \"val2\":\"abc\"," + " \"val3\":42.123" + " }," + " {" + " \"val1\":987," + " \"val2\":\"xyz\"," + " \"val3\":24.321" + " }" + " ]," + " \"tag2\":" + " [" + " {" + " \"val1\":42.123," + " \"val2\":123," + " \"val3\":\"abc\"" + " }," + " {" + " \"val1\":24.321," + " \"val2\":987," + " \"val3\":\"xyz\"" + " }" + " ]" + " }" + " ]" + "}\n"; + sock.sendBytes(data2.data(), data2.size()); + Thread::sleep(500); + reactor.stop(); + thread.join(); + + assertTrue (DataServiceHandler::_data.size() == 4); + data0.erase(data0.size() - 1); + assertTrue (DataServiceHandler::_data[0] == data0); + data1.erase(data1.size() - 1); + assertTrue (DataServiceHandler::_data[1] == data1); + data2.erase(data2.size() - 1); + assertTrue (DataServiceHandler::_data[2] == data2); + assertTrue (DataServiceHandler::_data[3].empty()); +} + + void SocketReactorTest::setUp() { ClientServiceHandler::setCloseOnTimeout(false); @@ -416,6 +572,7 @@ CppUnit::Test* SocketReactorTest::suite() CppUnit_addTest(pSuite, SocketReactorTest, testParallelSocketReactor); CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorFail); CppUnit_addTest(pSuite, SocketReactorTest, testSocketConnectorTimeout); + CppUnit_addTest(pSuite, SocketReactorTest, testDataCollection); return pSuite; } diff --git a/Net/testsuite/src/SocketReactorTest.h b/Net/testsuite/src/SocketReactorTest.h index ab202d7dd..9fb5cf9c7 100644 --- a/Net/testsuite/src/SocketReactorTest.h +++ b/Net/testsuite/src/SocketReactorTest.h @@ -29,6 +29,7 @@ public: void testParallelSocketReactor(); void testSocketConnectorFail(); void testSocketConnectorTimeout(); + void testDataCollection(); void setUp(); void tearDown();