Work on PubSub test

This commit is contained in:
fbraem 2015-11-07 21:47:18 +01:00
parent fcd7b68695
commit c864435f00

View File

@ -16,6 +16,7 @@
#include "RedisTest.h"
#include "Poco/Redis/AsyncReader.h"
#include "Poco/Redis/Command.h"
#include "CppUnit/TestCaller.h"
#include "CppUnit/TestSuite.h"
@ -38,7 +39,6 @@ RedisTest::RedisTest(const std::string& name):
{
Poco::Timespan t(10, 0); // Connect within 10 seconds
_redis.connect(_host, _port, t);
_redis.setReceiveTimeout(t); // Receive answers within 10 seconds
_connected = true;
std::cout << "Connected to [" << _host << ':' << _port << ']' << std::endl;
}
@ -96,10 +96,7 @@ void RedisTest::testAppend()
fail(e.message());
}
Array setCommand;
setCommand.add("SET")
.add("mykey")
.add("Hello");
Command setCommand = Command::set("mykey", "Hello");
try
{
std::string result = _redis.execute<std::string>(setCommand);
@ -722,9 +719,43 @@ class RedisSubscriber
{
public:
void onMessage(const void* pSender, RedisType::Ptr& message)
void onMessage(const void* pSender, RedisEventArgs& args)
{
std::cout << message->toString() << std::endl;
if ( ! args.message().isNull() )
{
Type<Array>* arrayType = dynamic_cast<Type<Array>*>(args.message().get());
if ( arrayType != NULL )
{
Array& array = arrayType->value();
if ( array.size() == 3 )
{
BulkString type = array.get<BulkString>(0);
if ( type.value().compare("unsubscribe") == 0 )
{
Poco::Int64 n = array.get<Poco::Int64>(2);
// When 0, no subscribers anymore, so stop reading ...
if ( n == 0 ) args.stop();
}
}
else
{
// Wrong array received. Stop the reader
args.stop();
}
}
else
{
// Invalid type of message received. Stop the reader ...
args.stop();
}
}
}
void onError(const void* pSender, RedisEventArgs& args)
{
std::cout << args.exception()->className() << std::endl;
// No need to call stop, AsyncReader stops automatically when an
// exception is received.
}
};
@ -748,9 +779,11 @@ void RedisTest::testPubSub()
AsyncReader reader(_redis);
reader.redisResponse += Poco::delegate(&subscriber, &RedisSubscriber::onMessage);
reader.redisException += Poco::delegate(&subscriber, &RedisSubscriber::onError);
reader.start();
Poco::Thread::sleep(30000);
std::cout << "Sleeping ..." << std::endl;
Poco::Thread::sleep(10000);
Array unsubscribe;
unsubscribe.add("UNSUBSCRIBE");