diff --git a/perl/t/10_splitted_bytes.t b/perl/t/10_splitted_bytes.t index 15598f4e..d94472d2 100644 --- a/perl/t/10_splitted_bytes.t +++ b/perl/t/10_splitted_bytes.t @@ -20,8 +20,6 @@ my $input = [ my $packed = Data::MessagePack->pack($input); foreach my $size(1 .. 16) { - local $TODO = "Splitted byte streaming is not yet supported (bufer size: $size)"; - my $up = Data::MessagePack::Unpacker->new(); open my $stream, '<:bytes :scalar', \$packed; @@ -29,7 +27,7 @@ foreach my $size(1 .. 16) { my $buff; my $done = 0; while( read($stream, $buff, $size) ) { - #note "buff: ", join " ", map { unpack 'H2', $_ } split //, $buff; + note "buff: ", join " ", map { unpack 'H2', $_ } split //, $buff; $done = $up->execute($buff); } diff --git a/perl/t/16_unpacker_for_larges.t b/perl/t/16_unpacker_for_larges.t new file mode 100644 index 00000000..81e7c6a8 --- /dev/null +++ b/perl/t/16_unpacker_for_larges.t @@ -0,0 +1,22 @@ +use strict; +use Test::More; +use Data::MessagePack; + +foreach my $data("abc") { + my $packed = Data::MessagePack->pack($data); + + my $unpacker = Data::MessagePack::Unpacker->new; + note "buff: ", join " ", map { unpack 'H2', $_ } split //, $packed; + + my $offset = 0; + foreach my $byte(split //, $packed) { + note "offset: $offset"; + $offset += $unpacker->execute($byte); + } + + ok $unpacker->is_finished, 'finished'; + is_deeply $unpacker->data, $data, 'data'; +} + +done_testing; + diff --git a/perl/xs-src/unpack.c b/perl/xs-src/unpack.c index db7e3949..e3f62c63 100644 --- a/perl/xs-src/unpack.c +++ b/perl/xs-src/unpack.c @@ -9,11 +9,13 @@ typedef struct { } my_cxt_t; START_MY_CXT +// context data for execute_template() typedef struct { bool finished; - bool incremented; bool utf8; + SV* buffer; } unpack_user; +#define UNPACK_USER_INIT { false, false, NULL } #include "msgpack/unpack_define.h" @@ -301,7 +303,7 @@ XS(xs_unpack) { msgpack_unpack_t mp; template_init(&mp); - unpack_user const u = {false, false, false}; + unpack_user const u = UNPACK_USER_INIT; mp.user = u; size_t from = 0; @@ -326,14 +328,6 @@ XS(xs_unpack) { /* ------------------------------ stream -- */ /* http://twitter.com/frsyuki/status/13249304748 */ -STATIC_INLINE void _reset(SV* const self) { - dTHX; - unpack_user const u = {false, false, false}; - - UNPACKER(self, mp); - template_init(mp); - mp->user = u; -} XS(xs_unpacker_new) { dXSARGS; @@ -345,9 +339,14 @@ XS(xs_unpacker_new) { msgpack_unpack_t *mp; Newxz(mp, 1, msgpack_unpack_t); + template_init(mp); + unpack_user const u = UNPACK_USER_INIT; + mp->user = u; + + mp->user.buffer = newSV(80); + sv_setpvs(mp->user.buffer, ""); sv_setref_pv(self, "Data::MessagePack::Unpacker", mp); - _reset(self); ST(0) = self; XSRETURN(1); @@ -378,21 +377,44 @@ _execute_impl(SV* const self, SV* const data, UV const offset, UV const limit) { dTHX; if(offset >= limit) { - Perl_croak(aTHX_ "offset (%"UVuf") is bigger than data buffer size (%"UVuf")", + Perl_croak(aTHX_ + "offset (%"UVuf") is bigger than data buffer size (%"UVuf")", offset, limit); } UNPACKER(self, mp); size_t from = offset; - const char* const dptr = SvPV_nolen_const(data); + const char* dptr = SvPV_nolen_const(data); + STRLEN dlen = limit; - int const ret = template_execute(mp, dptr, limit, &from); + if(SvCUR(mp->user.buffer) != 0) { + sv_catpvn(mp->user.buffer, dptr, dlen); + dptr = SvPV_const(mp->user.buffer, dlen); + from = 0; + } + + int const ret = template_execute(mp, dptr, dlen, &from); + // ret < 0 : error + // ret == 0 : insufficient + // ret > 0 : success if(ret < 0) { - Perl_croak(aTHX_ "Data::MessagePack::Unpacker: parse error while executing"); + Perl_croak(aTHX_ + "Data::MessagePack::Unpacker: parse error while executing"); } + mp->user.finished = (ret > 0) ? true : false; + if(!mp->user.finished) { + template_init(mp); // reset the state + sv_setpvn(mp->user.buffer, dptr, dlen); + from = 0; + } + else { + sv_setpvs(mp->user.buffer, ""); + } + //warn(">> (%d) dlen=%d, from=%d, rest=%d", + // (int)ret, (int)dlen, (int)from, dlen - from); return from; } @@ -464,12 +486,12 @@ XS(xs_unpacker_reset) { } UNPACKER(ST(0), mp); - bool const utf8 = mp->user.utf8; // save SV* const data = template_data(mp); SvREFCNT_dec(data); - _reset(ST(0)); - mp->user.utf8 = utf8; + + template_init(mp); + sv_setpvs(mp->user.buffer, ""); XSRETURN(0); } @@ -484,6 +506,7 @@ XS(xs_unpacker_destroy) { SV* const data = template_data(mp); SvREFCNT_dec(data); + SvREFCNT_dec(mp->user.buffer); Safefree(mp); XSRETURN(0);