diff --git a/ruby/unpack.c b/ruby/unpack.c index e9d6494e..0f1af38f 100644 --- a/ruby/unpack.c +++ b/ruby/unpack.c @@ -18,10 +18,16 @@ #include "ruby.h" #include "msgpack/unpack_define.h" +static ID s_sysread; typedef struct { int finished; VALUE source; + size_t offset; + size_t parsed; + VALUE buffer; + VALUE stream; + VALUE streambuf; } unpack_user; @@ -144,6 +150,9 @@ static void MessagePack_Unpacker_free(void* data) static void MessagePack_Unpacker_mark(msgpack_unpack_t *mp) { unsigned int i; + rb_gc_mark(mp->user.buffer); + rb_gc_mark(mp->user.stream); + rb_gc_mark(mp->user.streambuf); for(i=0; i < mp->top; ++i) { rb_gc_mark(mp->stack[i].obj); rb_gc_mark(mp->stack[i].map_key); /* maybe map_key is not initialized */ @@ -164,14 +173,32 @@ static VALUE MessagePack_Unpacker_reset(VALUE self) UNPACKER(self, mp); template_init(mp); init_stack(mp); - unpack_user u = {0, Qnil}; - mp->user = u; + mp->user.finished = 0; return self; } -static VALUE MessagePack_Unpacker_initialize(VALUE self) +static VALUE MessagePack_Unpacker_initialize(int argc, VALUE *argv, VALUE self) { - return MessagePack_Unpacker_reset(self); + VALUE stream; + switch(argc) { + case 0: + stream = Qnil; + break; + case 1: + stream = argv[0]; + break; + default: + rb_raise(rb_eArgError, "wrong number of arguments (%d for 0)", argc); + } + + MessagePack_Unpacker_reset(self); + UNPACKER(self, mp); + mp->user.offset = 0; + mp->user.parsed = 0; + mp->user.buffer = rb_str_new("",0); + mp->user.stream = stream; + mp->user.streambuf = rb_str_new("",0); + return self; } @@ -249,6 +276,87 @@ static VALUE MessagePack_Unpacker_data(VALUE self) } +static VALUE MessagePack_Unpacker_feed(VALUE self, VALUE data) +{ + UNPACKER(self, mp); + StringValue(data); + rb_str_cat(mp->user.buffer, RSTRING_PTR(data), RSTRING_LEN(data)); + return Qnil; +} + +static VALUE MessagePack_Unpacker_stream_get(VALUE self) +{ + UNPACKER(self, mp); + return mp->user.stream; +} + +static VALUE MessagePack_Unpacker_stream_set(VALUE self, VALUE val) +{ + UNPACKER(self, mp); + return mp->user.stream = val; +} + +static VALUE MessagePack_Unpacker_fill(VALUE self) +{ + UNPACKER(self, mp); + + if(mp->user.stream == Qnil) { + return Qnil; + } + + size_t len; + if(RSTRING_LEN(mp->user.buffer) == 0) { + rb_funcall(mp->user.stream, s_sysread, 2, LONG2FIX(64*1024), mp->user.buffer); + len = RSTRING_LEN(mp->user.buffer); + } else { + rb_funcall(mp->user.stream, s_sysread, 2, LONG2FIX(64*1024), mp->user.streambuf); + len = RSTRING_LEN(mp->user.streambuf); + rb_str_cat(mp->user.buffer, RSTRING_PTR(mp->user.streambuf), RSTRING_LEN(mp->user.streambuf)); + } + + return LONG2FIX(len); +} + +static VALUE MessagePack_Unpacker_each(VALUE self) +{ + UNPACKER(self, mp); + int ret; + +#ifdef RETURN_ENUMERATOR + RETURN_ENUMERATOR(self, 0, 0); +#endif + + while(1) { + if(RSTRING_LEN(mp->user.buffer) <= mp->user.offset) { + do_fill: + { + VALUE len = MessagePack_Unpacker_fill(self); + if(len == Qnil || FIX2LONG(len) == 0) { + break; + } + } + } + + mp->user.source = mp->user.buffer; + ret = template_execute(mp, RSTRING_PTR(mp->user.buffer), RSTRING_LEN(mp->user.buffer), &mp->user.offset); + mp->user.source = Qnil; + + if(ret < 0) { + rb_raise(eUnpackError, "parse error."); + } else if(ret > 0) { + VALUE data = template_data(mp); + template_init(mp); + init_stack(mp); + rb_yield(data); + } else { + goto do_fill; + } + } + + return Qnil; +} + + static VALUE MessagePack_unpack_impl(VALUE args) { msgpack_unpack_t* mp = (msgpack_unpack_t*)((VALUE*)args)[0]; @@ -292,7 +400,7 @@ static VALUE MessagePack_unpack_limit(VALUE self, VALUE data, VALUE limit) msgpack_unpack_t mp; template_init(&mp); init_stack(&mp); - unpack_user u = {0, Qnil}; + unpack_user u = {0, Qnil, 0, 0, Qnil, Qnil, Qnil}; mp.user = u; rb_gc_disable(); @@ -313,17 +421,22 @@ static VALUE MessagePack_unpack(VALUE self, VALUE data) void Init_msgpack_unpack(VALUE mMessagePack) { + s_sysread = rb_intern("sysread"); eUnpackError = rb_define_class_under(mMessagePack, "UnpackError", rb_eStandardError); cUnpacker = rb_define_class_under(mMessagePack, "Unpacker", rb_cObject); rb_define_alloc_func(cUnpacker, MessagePack_Unpacker_alloc); - rb_define_method(cUnpacker, "initialize", MessagePack_Unpacker_initialize, 0); + rb_define_method(cUnpacker, "initialize", MessagePack_Unpacker_initialize, -1); rb_define_method(cUnpacker, "execute", MessagePack_Unpacker_execute, 2); rb_define_method(cUnpacker, "execute_limit", MessagePack_Unpacker_execute_limit, 3); rb_define_method(cUnpacker, "finished?", MessagePack_Unpacker_finished_p, 0); rb_define_method(cUnpacker, "data", MessagePack_Unpacker_data, 0); rb_define_method(cUnpacker, "reset", MessagePack_Unpacker_reset, 0); + rb_define_method(cUnpacker, "feed", MessagePack_Unpacker_feed, 1); + rb_define_method(cUnpacker, "fill", MessagePack_Unpacker_fill, 0); + rb_define_method(cUnpacker, "each", MessagePack_Unpacker_each, 0); + rb_define_method(cUnpacker, "stream", MessagePack_Unpacker_stream_get, 0); + rb_define_method(cUnpacker, "stream=", MessagePack_Unpacker_stream_set, 1); rb_define_module_function(mMessagePack, "unpack", MessagePack_unpack, 1); rb_define_module_function(mMessagePack, "unpack_limit", MessagePack_unpack_limit, 2); } -