cpp: new streaming deserialier API.

This commit is contained in:
frsyuki 2010-04-29 22:15:03 +09:00
parent 6352472c5f
commit 91a1f8d9e1
3 changed files with 247 additions and 42 deletions

View File

@ -37,6 +37,31 @@ struct unpack_error : public std::runtime_error {
};
class unpacked {
public:
unpacked() { }
unpacked(object obj, std::auto_ptr<msgpack::zone> z) :
m_obj(obj), m_zone(z) { }
object& get()
{ return m_obj; }
const object& get() const
{ return m_obj; }
std::auto_ptr<msgpack::zone>& zone()
{ return m_zone; }
const std::auto_ptr<msgpack::zone>& zone() const
{ return m_zone; }
private:
object m_obj;
std::auto_ptr<msgpack::zone> m_zone;
};
class unpacker : public msgpack_unpacker {
public:
unpacker(size_t init_buffer_size = MSGPACK_UNPACKER_DEFAULT_INITIAL_BUFFER_SIZE);
@ -53,39 +78,22 @@ public:
/*! 3. specify the number of bytes actually copied */
void buffer_consumed(size_t size);
/*! 4. repeat execute() until it retunrs false */
bool execute();
/*! 4. repeat next() until it retunrs false */
bool next(unpacked* result);
/*! 5.1. if execute() returns true, take out the parsed object */
object data();
/*! 5.2. the object is valid until the zone is deleted */
// Note that once release_zone() from unpacker, you must delete it
// otherwise the memrory will leak.
zone* release_zone();
/*! 5.2. this method is equivalence to `delete release_zone()` */
void reset_zone();
/*! 5.3. after release_zone(), re-initialize unpacker */
void reset();
/*! 6. check if the size of message doesn't exceed assumption. */
/*! 5. check if the size of message doesn't exceed assumption. */
size_t message_size() const;
// Basic usage of the unpacker is as following:
//
// msgpack::unpacker pac;
//
// while( /* readable */ ) {
// while( /* input is readable */ ) {
//
// // 1.
// pac.reserve_buffer(1024);
// pac.reserve_buffer(32*1024);
//
// // 2.
// ssize_t bytes =
// read(the_source, pac.buffer(), pac.buffer_capacity());
// size_t bytes = input.readsome(pac.buffer(), pac.buffer_capacity());
//
// // error handling ...
//
@ -93,25 +101,40 @@ public:
// pac.buffer_consumed(bytes);
//
// // 4.
// while(pac.execute()) {
// // 5.1
// object o = pac.data();
// msgpack::unpacked result;
// while(pac.next(&result)) {
// // do some with the object with the zone.
// msgpack::object obj = result.get();
// std::auto_ptr<msgpack:zone> z = result.zone();
// on_message(obj, z);
//
// // 5.2
// std::auto_ptr<msgpack::zone> olife( pac.release_zone() );
// //// boost::shared_ptr is also usable:
// // boost::shared_ptr<msgpack::zone> life(z.release());
// // on_message(result.get(), life);
// }
//
// // boost::shared_ptr is also usable:
// // boost::shared_ptr<msgpack::zone> olife( pac.release_zone() );
//
// // 5.3
// pac.reset();
//
// // do some with the object with the old zone.
// do_something(o, olife);
// // 5.
// if(pac.message_size() > 10*1024*1024) {
// throw std::runtime_error("message is too large");
// }
// }
//
/*! for backward compatibility */
bool execute();
/*! for backward compatibility */
object data();
/*! for backward compatibility */
zone* release_zone();
/*! for backward compatibility */
void reset_zone();
/*! for backward compatibility */
void reset();
public:
// These functions are usable when non-MessagePack message follows after
// MessagePack message.
@ -137,6 +160,11 @@ private:
};
static bool unpack(unpacked* result,
const char* data, size_t len, size_t* offset = NULL);
// obsolete
typedef enum {
UNPACK_SUCCESS = 2,
UNPACK_EXTRA_BYTES = 1,
@ -144,6 +172,7 @@ typedef enum {
UNPACK_PARSE_ERROR = -1,
} unpack_return;
// obsolete
static unpack_return unpack(const char* data, size_t len, size_t* off,
zone* z, object* result);
@ -187,6 +216,20 @@ inline void unpacker::buffer_consumed(size_t size)
return msgpack_unpacker_buffer_consumed(this, size);
}
inline bool unpacker::next(unpacked* result)
{
int ret = msgpack_unpacker_execute(this);
if(ret < 0) {
throw unpack_error("parse error");
}
result->zone().reset( release_zone() );
result->get() = data();
reset();
return ret > 0;
}
inline bool unpacker::execute()
{
@ -230,12 +273,12 @@ inline void unpacker::reset()
msgpack_unpacker_reset(this);
}
inline size_t unpacker::message_size() const
{
return msgpack_unpacker_message_size(this);
}
inline size_t unpacker::parsed_size() const
{
return msgpack_unpacker_parsed_size(this);
@ -262,6 +305,38 @@ inline void unpacker::remove_nonparsed_buffer()
}
inline bool unpack(unpacked* result,
const char* data, size_t len, size_t* offset)
{
msgpack::object obj;
std::auto_ptr<msgpack::zone> z(new zone());
unpack_return ret = (unpack_return)msgpack_unpack(
data, len, offset, z.get(),
reinterpret_cast<msgpack_object*>(&obj));
switch(ret) {
case UNPACK_SUCCESS:
result->get() = obj;
result->zone() = z;
return false;
case UNPACK_EXTRA_BYTES:
result->get() = obj;
result->zone() = z;
return true;
case UNPACK_CONTINUE:
throw unpack_error("insufficient bytes");
case UNPACK_PARSE_ERROR:
default:
throw unpack_error("parse error");
}
}
// obsolete
inline unpack_return unpack(const char* data, size_t len, size_t* off,
zone* z, object* result)
{

View File

@ -77,6 +77,32 @@ TEST(unpack, sequence)
msgpack::pack(sbuf, 2);
msgpack::pack(sbuf, 3);
bool cont;
size_t offset = 0;
msgpack::unpacked msg;
cont = msgpack::unpack(&msg, sbuf.data(), sbuf.size(), &offset);
EXPECT_TRUE(cont);
EXPECT_EQ(1, msg.get().as<int>());
cont = msgpack::unpack(&msg, sbuf.data(), sbuf.size(), &offset);
EXPECT_TRUE(cont);
EXPECT_EQ(2, msg.get().as<int>());
cont = msgpack::unpack(&msg, sbuf.data(), sbuf.size(), &offset);
EXPECT_FALSE(cont);
EXPECT_EQ(3, msg.get().as<int>());
}
TEST(unpack, sequence_compat)
{
msgpack::sbuffer sbuf;
msgpack::pack(sbuf, 1);
msgpack::pack(sbuf, 2);
msgpack::pack(sbuf, 3);
size_t offset = 0;
msgpack::zone z;

View File

@ -2,6 +2,7 @@
#include <gtest/gtest.h>
#include <sstream>
TEST(streaming, basic)
{
std::ostringstream stream;
@ -15,6 +16,108 @@ TEST(streaming, basic)
msgpack::unpacker pac;
int count = 0;
while(count < 3) {
pac.reserve_buffer(32*1024);
size_t len = input.readsome(pac.buffer(), pac.buffer_capacity());
pac.buffer_consumed(len);
msgpack::unpacked result;
while(pac.next(&result)) {
msgpack::object obj = result.get();
switch(count++) {
case 0:
EXPECT_EQ(1, obj.as<int>());
break;
case 1:
EXPECT_EQ(2, obj.as<int>());
break;
case 2:
EXPECT_EQ(3, obj.as<int>());
return;
}
}
}
}
class event_handler {
public:
event_handler(std::istream& input) : input(input) { }
~event_handler() { }
void on_read()
{
while(true) {
pac.reserve_buffer(32*1024);
size_t len = input.readsome(pac.buffer(), pac.buffer_capacity());
if(len == 0) {
return;
}
pac.buffer_consumed(len);
msgpack::unpacked result;
while(pac.next(&result)) {
on_message(result.get(), result.zone());
}
if(pac.message_size() > 10*1024*1024) {
throw std::runtime_error("message is too large");
}
}
}
void on_message(msgpack::object obj, std::auto_ptr<msgpack::zone> z)
{
EXPECT_EQ(expect, obj.as<int>());
}
int expect;
private:
std::istream& input;
msgpack::unpacker pac;
};
TEST(streaming, event)
{
std::stringstream stream;
msgpack::packer<std::ostream> pk(&stream);
event_handler handler(stream);
pk.pack(1);
handler.expect = 1;
handler.on_read();
pk.pack(2);
handler.expect = 2;
handler.on_read();
pk.pack(3);
handler.expect = 3;
handler.on_read();
}
// backward compatibility
TEST(streaming, basic_compat)
{
std::ostringstream stream;
msgpack::packer<std::ostream> pk(&stream);
pk.pack(1);
pk.pack(2);
pk.pack(3);
std::istringstream input(stream.str());
msgpack::unpacker pac;
int count = 0;
while(count < 3) {
pac.reserve_buffer(32*1024);
@ -44,10 +147,11 @@ TEST(streaming, basic)
}
class event_handler {
// backward compatibility
class event_handler_compat {
public:
event_handler(std::istream& input) : input(input) { }
~event_handler() { }
event_handler_compat(std::istream& input) : input(input) { }
~event_handler_compat() { }
void on_read()
{
@ -87,12 +191,12 @@ private:
msgpack::unpacker pac;
};
TEST(streaming, event)
TEST(streaming, event_compat)
{
std::stringstream stream;
msgpack::packer<std::ostream> pk(&stream);
event_handler handler(stream);
event_handler_compat handler(stream);
pk.pack(1);
handler.expect = 1;