Fix streaming unpacking for splitted packed data

This commit is contained in:
Fuji, Goro
2010-10-28 15:48:28 +09:00
parent e1711ffcf2
commit 8a629ad6fb
3 changed files with 64 additions and 21 deletions

View File

@@ -20,8 +20,6 @@ my $input = [
my $packed = Data::MessagePack->pack($input);
foreach my $size(1 .. 16) {
local $TODO = "Splitted byte streaming is not yet supported (bufer size: $size)";
my $up = Data::MessagePack::Unpacker->new();
open my $stream, '<:bytes :scalar', \$packed;
@@ -29,7 +27,7 @@ foreach my $size(1 .. 16) {
my $buff;
my $done = 0;
while( read($stream, $buff, $size) ) {
#note "buff: ", join " ", map { unpack 'H2', $_ } split //, $buff;
note "buff: ", join " ", map { unpack 'H2', $_ } split //, $buff;
$done = $up->execute($buff);
}

View File

@@ -0,0 +1,22 @@
use strict;
use Test::More;
use Data::MessagePack;
foreach my $data("abc") {
my $packed = Data::MessagePack->pack($data);
my $unpacker = Data::MessagePack::Unpacker->new;
note "buff: ", join " ", map { unpack 'H2', $_ } split //, $packed;
my $offset = 0;
foreach my $byte(split //, $packed) {
note "offset: $offset";
$offset += $unpacker->execute($byte);
}
ok $unpacker->is_finished, 'finished';
is_deeply $unpacker->data, $data, 'data';
}
done_testing;

View File

@@ -9,11 +9,13 @@ typedef struct {
} my_cxt_t;
START_MY_CXT
// context data for execute_template()
typedef struct {
bool finished;
bool incremented;
bool utf8;
SV* buffer;
} unpack_user;
#define UNPACK_USER_INIT { false, false, NULL }
#include "msgpack/unpack_define.h"
@@ -301,7 +303,7 @@ XS(xs_unpack) {
msgpack_unpack_t mp;
template_init(&mp);
unpack_user const u = {false, false, false};
unpack_user const u = UNPACK_USER_INIT;
mp.user = u;
size_t from = 0;
@@ -326,14 +328,6 @@ XS(xs_unpack) {
/* ------------------------------ stream -- */
/* http://twitter.com/frsyuki/status/13249304748 */
STATIC_INLINE void _reset(SV* const self) {
dTHX;
unpack_user const u = {false, false, false};
UNPACKER(self, mp);
template_init(mp);
mp->user = u;
}
XS(xs_unpacker_new) {
dXSARGS;
@@ -345,9 +339,14 @@ XS(xs_unpacker_new) {
msgpack_unpack_t *mp;
Newxz(mp, 1, msgpack_unpack_t);
template_init(mp);
unpack_user const u = UNPACK_USER_INIT;
mp->user = u;
mp->user.buffer = newSV(80);
sv_setpvs(mp->user.buffer, "");
sv_setref_pv(self, "Data::MessagePack::Unpacker", mp);
_reset(self);
ST(0) = self;
XSRETURN(1);
@@ -378,21 +377,44 @@ _execute_impl(SV* const self, SV* const data, UV const offset, UV const limit) {
dTHX;
if(offset >= limit) {
Perl_croak(aTHX_ "offset (%"UVuf") is bigger than data buffer size (%"UVuf")",
Perl_croak(aTHX_
"offset (%"UVuf") is bigger than data buffer size (%"UVuf")",
offset, limit);
}
UNPACKER(self, mp);
size_t from = offset;
const char* const dptr = SvPV_nolen_const(data);
const char* dptr = SvPV_nolen_const(data);
STRLEN dlen = limit;
int const ret = template_execute(mp, dptr, limit, &from);
if(SvCUR(mp->user.buffer) != 0) {
sv_catpvn(mp->user.buffer, dptr, dlen);
dptr = SvPV_const(mp->user.buffer, dlen);
from = 0;
}
int const ret = template_execute(mp, dptr, dlen, &from);
// ret < 0 : error
// ret == 0 : insufficient
// ret > 0 : success
if(ret < 0) {
Perl_croak(aTHX_ "Data::MessagePack::Unpacker: parse error while executing");
Perl_croak(aTHX_
"Data::MessagePack::Unpacker: parse error while executing");
}
mp->user.finished = (ret > 0) ? true : false;
if(!mp->user.finished) {
template_init(mp); // reset the state
sv_setpvn(mp->user.buffer, dptr, dlen);
from = 0;
}
else {
sv_setpvs(mp->user.buffer, "");
}
//warn(">> (%d) dlen=%d, from=%d, rest=%d",
// (int)ret, (int)dlen, (int)from, dlen - from);
return from;
}
@@ -464,12 +486,12 @@ XS(xs_unpacker_reset) {
}
UNPACKER(ST(0), mp);
bool const utf8 = mp->user.utf8; // save
SV* const data = template_data(mp);
SvREFCNT_dec(data);
_reset(ST(0));
mp->user.utf8 = utf8;
template_init(mp);
sv_setpvs(mp->user.buffer, "");
XSRETURN(0);
}
@@ -484,6 +506,7 @@ XS(xs_unpacker_destroy) {
SV* const data = template_data(mp);
SvREFCNT_dec(data);
SvREFCNT_dec(mp->user.buffer);
Safefree(mp);
XSRETURN(0);