Use RedisStream instead of RedisSocket

This commit is contained in:
fbraem 2015-11-03 22:36:46 +01:00
parent 8e586b1012
commit 004bcfe48a
9 changed files with 286 additions and 247 deletions

View File

@ -225,23 +225,21 @@ struct ElementTraits<Array>
};
template<> inline
void Type<Array>::read(RedisSocket& socket)
void Type<Array>::read(RedisInputStream& input)
{
std::string line;
socket.readLine(line);
Int64 length = NumberParser::parse64(line);
Int64 length = NumberParser::parse64(input.getline());
if ( length != -1 )
{
for(int i = 0; i < length; ++i)
{
char marker = socket.get();
char marker = input.get();
RedisType::Ptr element = Type::createRedisType(marker);
if ( element.isNull() )
throw RedisException("Wrong answer received from Redis server");
element->read(socket);
element->read(input);
_value.add(element);
}
}

View File

@ -25,7 +25,7 @@
#include "Poco/Redis/Redis.h"
#include "Poco/Redis/Array.h"
#include "Poco/Redis/Error.h"
#include "Poco/Redis/RedisSocket.h"
#include "Poco/Redis/RedisStream.h"
namespace Poco {
namespace Redis {
@ -52,8 +52,6 @@ class Redis_API Client
/// }
{
public:
typedef Poco::SharedPtr<Client> Ptr;
Client();
/// Default constructor. Use this when you want to
/// connect later on.
@ -98,7 +96,7 @@ public:
/// Disconnects from the Redis server.
template<typename T>
T execute(const Array& command)
T execute(const Array& command, bool flush = true)
/// Sends the Redis Command to the server. It gets the reply
/// and tries to convert it to the given template type.
/// A specialization exists for type void, which doesn't read
@ -108,13 +106,22 @@ public:
/// converted. Supported types are Int64, std::string, BulkString,
/// Array and void. When the reply is an Error, it will throw
/// a RedisException.
///
/// The flush argument only makes sense when using for type void.
/// When flush is false, the commands are build in a buffer and stays
/// there until flush is called or when a command is executed with
/// flush set to true.
{
T result;
writeCommand(command);
writeCommand(command, true);
readReply(result);
return result;
}
void flush();
/// Flush the output buffer to Redis. Use this when commands
/// are stored in the buffer to send them all at once to Redis.
RedisType::Ptr sendCommand(const Array& command);
/// Sends a Redis command to the server and returns the reply.
/// Use this when the type of the reply isn't known.
@ -151,19 +158,21 @@ private:
Client& operator = (const Client&);
Net::SocketAddress _address;
RedisSocket _socket;
Net::StreamSocket _socket;
void connect();
/// Connects to the Redis server
void connect(const Timespan& timeout);
/// Connects to the Redis server and sets a timeout.
void writeCommand(const Array& command);
void writeCommand(const Array& command, bool flush);
/// Sends a request to the Redis server. Use readReply to get the
/// answer. Can also be used for pipelining commands. Make sure you
/// call readReply as many times as you called writeCommand, even when
/// an error occurred on a command.
RedisInputStream* _input;
RedisOutputStream* _output;
};
@ -173,9 +182,15 @@ inline Net::SocketAddress Client::address() const
}
template<> inline
void Client::execute<void>(const Array& command)
void Client::execute<void>(const Array& command, bool flush)
{
writeCommand(command);
writeCommand(command, flush);
}
inline void Client::flush()
{
poco_assert(!_output);
_output->flush();
}
inline void Client::setReceiveTimeout(const Timespan& timeout)

View File

@ -65,14 +65,9 @@ struct ElementTraits<Error>
template<> inline
void Type<Error>::read(RedisSocket& socket)
void Type<Error>::read(RedisInputStream& input)
{
std::string s;
socket.readLine(s);
std::cout << s << std::endl;
_value = s;
_value = input.getline();
}
}} // Namespace Poco::Redis

View File

@ -1,77 +0,0 @@
//
// RedistSocket.h
//
// $Id$
//
// Library: Redis
// Package: Redis
// Module: RedistSocket
//
// Definition of the RedistSocket class.
//
// Copyright (c) 2012, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef Redis_RedisSocket_INCLUDED
#define Redis_RedisSocket_INCLUDED
#include "Poco/Timespan.h"
#include "Poco/Net/SocketAddress.h"
#include "Poco/Net/StreamSocket.h"
#include "Poco/Redis/Redis.h"
namespace Poco {
namespace Redis {
class Redis_API RedisSocket
{
public:
RedisSocket();
virtual ~RedisSocket();
void close();
void connect(const Net::SocketAddress& addrs);
void connect(const Net::SocketAddress& addrs, const Timespan& timeout);
int get();
int peek();
void read(UInt64 length, std::string& data);
int write(const char* buffer, std::streamsize length);
void refill();
void readLine(std::string& line);
void setReceiveTimeout(const Timespan& timeout);
private:
RedisSocket(RedisSocket& copy);
RedisSocket& operator = (const RedisSocket&);
Net::StreamSocket _socket;
char* _buffer;
char* _current;
char* _end;
};
inline void RedisSocket::setReceiveTimeout(const Timespan& timeout)
{
_socket.setReceiveTimeout(timeout);
}
} }
#endif // Redis_RedisSocket_INCLUDED

View File

@ -0,0 +1,106 @@
//
// RedisStream.h
//
// $Id$
//
// Library: Redis
// Package: Redis
// Module: RedisStream
//
// Definition of the RedisStream class.
//
// Copyright (c) 2012, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef Redis_RedisStream_INCLUDED
#define Redis_RedisStream_INCLUDED
#include "Poco/BufferedStreamBuf.h"
#include "Poco/Net/StreamSocket.h"
#include <istream>
#include <ostream>
namespace Poco {
namespace Redis {
class RedisStreamBuf : public BufferedStreamBuf
{
public:
RedisStreamBuf(Net::StreamSocket& redis);
~RedisStreamBuf();
std::string readLine();
protected:
int readFromDevice(char* buffer, std::streamsize length);
int writeToDevice(const char* buffer, std::streamsize length);
private:
enum
{
STREAM_BUFFER_SIZE = 1024
};
Net::StreamSocket& _redis;
};
class RedisIOS: public virtual std::ios
{
public:
RedisIOS(Net::StreamSocket& redis);
/// Creates the RedisIOS with the given socket.
~RedisIOS();
/// Destroys the RedisIOS.
///
/// Flushes the buffer, but does not close the socket.
RedisStreamBuf* rdbuf();
/// Returns a pointer to the internal RedisStreamBuf.
void close();
/// Flushes the stream.
protected:
RedisStreamBuf _buf;
};
class RedisOutputStream: public RedisIOS, public std::ostream
/// An output stream for writing to a Redis server.
{
public:
RedisOutputStream(Net::StreamSocket& redis);
/// Creates the RedisOutputStream with the given socket.
~RedisOutputStream();
/// Destroys the RedisOutputStream.
///
/// Flushes the buffer.
};
class RedisInputStream: public RedisIOS, public std::istream
/// An input stream for reading from a Redis server.
{
public:
RedisInputStream(Net::StreamSocket& redis);
/// Creates the RedisInputStream with the given socket.
~RedisInputStream();
/// Destroys the RedisInputStream.
std::string getline();
/// Redis uses /r/n as delimiter. This getline version removes
/// the /r from the result.
};
}} // Poco::Redis
#endif // Redis_RedisStream_INCLUDED

View File

@ -25,7 +25,7 @@
#include "Poco/Nullable.h"
#include "Poco/Redis/Redis.h"
#include "Poco/Redis/RedisSocket.h"
#include "Poco/Redis/RedisStream.h"
namespace Poco {
namespace Redis {
@ -52,7 +52,7 @@ public:
virtual int type() const = 0;
virtual void read(RedisSocket& socket) = 0;
virtual void read(RedisInputStream& input) = 0;
virtual std::string toString() const = 0;
@ -192,7 +192,7 @@ public:
return ElementTraits<T>::TypeId;
}
virtual void read(RedisSocket& socket);
virtual void read(RedisInputStream& socket);
virtual std::string toString() const
{
@ -215,37 +215,35 @@ private:
};
template<> inline
void Type<Int64>::read(RedisSocket& socket)
void Type<Int64>::read(RedisInputStream& input)
{
std::string number;
socket.readLine(number);
std::string number = input.getline();
_value = NumberParser::parse64(number);
}
template<> inline
void Type<std::string>::read(RedisSocket& socket)
void Type<std::string>::read(RedisInputStream& input)
{
_value.clear();
socket.readLine(_value);
_value = input.getline();
}
template<> inline
void Type<BulkString>::read(RedisSocket& socket)
void Type<BulkString>::read(RedisInputStream& input)
{
_value.clear();
std::string line;
socket.readLine(line);
std::string line = input.getline();
int length = NumberParser::parse64(line);
if ( length >= 0 )
{
std::string s;
socket.read(length, s);
s.resize(length, ' ');
input.read(&*s.begin(), length);
_value.assign(s);
socket.readLine(line);
line = input.getline();
}
}

View File

@ -22,24 +22,24 @@ namespace Poco {
namespace Redis {
Client::Client() : _address(), _socket()
Client::Client() : _address(), _socket(), _input(0), _output(0)
{
}
Client::Client(const std::string& hostAndPort) : _address(hostAndPort), _socket()
Client::Client(const std::string& hostAndPort) : _address(hostAndPort), _socket(), _input(0), _output(0)
{
connect();
}
Client::Client(const std::string& host, int port) : _address(host, port), _socket()
Client::Client(const std::string& host, int port) : _address(host, port), _socket(), _input(0), _output(0)
{
connect();
}
Client::Client(const Net::SocketAddress& addrs) : _address(addrs), _socket()
Client::Client(const Net::SocketAddress& addrs) : _address(addrs), _socket(), _input(0), _output(0)
{
connect();
}
@ -47,12 +47,19 @@ Client::Client(const Net::SocketAddress& addrs) : _address(addrs), _socket()
Client::~Client()
{
delete _input;
delete _output;
}
void Client::connect()
{
poco_assert(! _input);
poco_assert(! _output);
_socket.connect(_address);
_input = new RedisInputStream(_socket);
_output = new RedisOutputStream(_socket);
}
void Client::connect(const std::string& hostAndPort)
@ -77,7 +84,12 @@ void Client::connect(const Net::SocketAddress& addrs)
void Client::connect(const Timespan& timeout)
{
poco_assert(! _input);
poco_assert(! _output);
_socket.connect(_address, timeout);
_input = new RedisInputStream(_socket);
_output = new RedisOutputStream(_socket);
}
void Client::connect(const std::string& hostAndPort, const Timespan& timeout)
@ -102,31 +114,38 @@ void Client::connect(const Net::SocketAddress& addrs, const Timespan& timeout)
void Client::disconnect()
{
delete _input;
_input = 0;
delete _output;
_output = 0;
_socket.close();
}
void Client::writeCommand(const Array& command)
void Client::writeCommand(const Array& command, bool flush)
{
std::string commandStr = command.toString();
_socket.write(commandStr.c_str(), commandStr.length());
_output->write(commandStr.c_str(), commandStr.length());
if ( flush ) _output->flush();
}
RedisType::Ptr Client::readReply()
{
RedisType::Ptr result = RedisType::createRedisType(_socket.get());
RedisType::Ptr result = RedisType::createRedisType(_input->get());
if ( result.isNull() )
{
throw RedisException("Invalid Redis type returned");
}
result->read(_socket);
result->read(*_input);
return result;
}
RedisType::Ptr Client::sendCommand(const Array& command)
{
writeCommand(command);
writeCommand(command, true);
return readReply();
}
@ -136,8 +155,9 @@ Array Client::sendCommands(const std::vector<Array>& commands)
for(std::vector<Array>::const_iterator it = commands.begin(); it != commands.end(); ++it)
{
writeCommand(*it);
writeCommand(*it, false);
}
_output->flush();
for(int i = 0; i < commands.size(); ++i)
{

View File

@ -1,124 +0,0 @@
//
// RedistSocket.h
//
// $Id$
//
// Library: Redis
// Package: Redis
// Module: RedistSocket
//
// Implementation of the RedistSocket class.
//
// Copyright (c) 2012, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#include "Poco/Redis/RedisSocket.h"
namespace Poco {
namespace Redis {
RedisSocket::RedisSocket() : _socket(), _buffer(0), _current(0), _end(0)
{
}
RedisSocket::~RedisSocket()
{
if ( _buffer ) delete[] _buffer;
try
{
_socket.close();
}
catch (...)
{
}
}
void RedisSocket::close()
{
_socket.close();
}
void RedisSocket::connect(const Net::SocketAddress& addrs)
{
_socket.connect(addrs);
}
void RedisSocket::connect(const Net::SocketAddress& addrs, const Timespan& timeout)
{
_socket.connect(addrs, timeout);
}
int RedisSocket::get()
{
if ( _current == _end ) refill();
if ( _current < _end ) return *_current++;
return std::char_traits<char>::eof();
}
int RedisSocket::peek()
{
if ( _current == _end ) refill();
if ( _current < _end ) return *_current;
return std::char_traits<char>::eof();
}
void RedisSocket::read(UInt64 length, std::string& data)
{
UInt64 count = 0;
data.clear();
while(count < length)
{
int c = get();
if ( c == -1 ) throw IOException("Invalid EOF");
data += c;
++count;
}
}
int RedisSocket::write(const char* buffer, std::streamsize length)
{
return _socket.sendBytes(buffer, (int) length);
}
void RedisSocket::refill()
{
if ( ! _buffer ) _buffer = new char[1024];
_current = _end = _buffer;
int n = _socket.receiveBytes(_buffer, 1024);
_end += n;
}
void RedisSocket::readLine(std::string& line)
{
line.clear();
int c = get();
while(1)
{
if ( c == -1 ) throw IOException("Invalid EOF");
if ( c == '\r' && peek() == '\n' )
{
get();
break;
}
line += c;
c = get();
}
}
} }

108
Redis/src/RedisStream.cpp Normal file
View File

@ -0,0 +1,108 @@
//
// RedisStream.cpp
//
// $Id$
//
// Library: Redis
// Package: Redis
// Module: RedisStream
//
// Implementation of the RedisStream class.
//
// Copyright (c) 2012, Applied Informatics Software Engineering GmbH.
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#include <iostream>
#include "Poco/Redis/RedisStream.h"
namespace Poco {
namespace Redis {
RedisStreamBuf::RedisStreamBuf(Net::StreamSocket& redis):
BufferedStreamBuf(STREAM_BUFFER_SIZE, std::ios::in | std::ios::out),
_redis(redis)
{
}
RedisStreamBuf::~RedisStreamBuf()
{
}
int RedisStreamBuf::readFromDevice(char* buffer, std::streamsize len)
{
return _redis.receiveBytes(buffer, len);
}
int RedisStreamBuf::writeToDevice(const char* buffer, std::streamsize length)
{
return _redis.sendBytes(buffer, length);
}
RedisIOS::RedisIOS(Net::StreamSocket& redis):
_buf(redis)
{
poco_ios_init(&_buf);
}
RedisIOS::~RedisIOS()
{
try
{
_buf.sync();
}
catch (...)
{
}
}
RedisStreamBuf* RedisIOS::rdbuf()
{
return &_buf;
}
void RedisIOS::close()
{
_buf.sync();
}
RedisOutputStream::RedisOutputStream(Net::StreamSocket& redis):
RedisIOS(redis),
std::ostream(&_buf)
{
}
RedisOutputStream::~RedisOutputStream()
{
}
RedisInputStream::RedisInputStream(Net::StreamSocket& redis):
RedisIOS(redis),
std::istream(&_buf)
{
}
RedisInputStream::~RedisInputStream()
{
}
std::string RedisInputStream::getline()
{
std::string line;
std::getline(*this, line);
if ( line.size() > 0 ) line.erase(line.end() - 1);
return line;
}
}}