ruby: adds Unpacker#feed_each

This commit is contained in:
FURUHASHI Sadayuki
2010-11-24 17:24:55 +09:00
parent 39ad071c4f
commit cc534fd21f
4 changed files with 121 additions and 7 deletions

View File

@@ -420,9 +420,9 @@ static inline void feed_buffer(msgpack_unpack_t* mp, const char* ptr, size_t len
{
struct unpack_buffer* buffer = &mp->user.buffer;
if(buffer->free < len) {
//if(buffer->free < len) {
reserve_buffer(mp, len);
}
//}
memcpy(buffer->ptr + buffer->size, ptr, len);
buffer->size += len;
buffer->free -= len;
@@ -528,6 +528,117 @@ static VALUE MessagePack_Unpacker_each(VALUE self)
return Qnil;
}
static VALUE feed_each_impl(VALUE args)
{
VALUE self = ((VALUE*)args)[0];
VALUE data = ((VALUE*)args)[1];
size_t* pconsumed = (size_t*)((VALUE*)args)[2];
UNPACKER(self, mp);
int ret;
const char* ptr = RSTRING_PTR(data);
size_t len = RSTRING_LEN(data);
if(mp->user.buffer.size > 0) {
while(1) {
ret = template_execute_wrap_each(mp,
mp->user.buffer.ptr, mp->user.buffer.size,
&mp->user.offset);
if(ret < 0) {
rb_raise(eUnpackError, "parse error.");
} else if(ret > 0) {
VALUE data = template_data(mp);
template_init(mp);
rb_yield(data);
} else {
break;
}
}
}
if(len <= 0) {
return Qnil;
}
if(mp->user.buffer.size <= mp->user.offset) {
// wrap & execute & feed
while(1) {
ret = template_execute_wrap_each(mp,
ptr, len, pconsumed);
if(ret < 0) {
rb_raise(eUnpackError, "parse error.");
} else if(ret > 0) {
VALUE data = template_data(mp);
template_init(mp);
rb_yield(data);
} else {
break;
}
}
} else {
// feed & execute
feed_buffer(mp, ptr, len);
*pconsumed = len;
while(1) {
ret = template_execute_wrap_each(mp,
mp->user.buffer.ptr, mp->user.buffer.size,
&mp->user.offset);
if(ret < 0) {
rb_raise(eUnpackError, "parse error.");
} else if(ret > 0) {
VALUE data = template_data(mp);
template_init(mp);
rb_yield(data);
} else {
break;
}
}
}
return Qnil;
}
static VALUE feed_each_ensure(VALUE args) {
VALUE self = ((VALUE*)args)[0];
VALUE data = ((VALUE*)args)[1];
size_t* pconsumed = (size_t*)((VALUE*)args)[2];
UNPACKER(self, mp);
const char* ptr = RSTRING_PTR(data);
size_t len = RSTRING_LEN(data);
if(len > *pconsumed) {
feed_buffer(mp, ptr+*pconsumed, len-*pconsumed);
}
return Qnil;
}
static VALUE MessagePack_Unpacker_feed_each(VALUE self, VALUE data)
{
size_t consumed = 0;
StringValue(data);
VALUE args[3];
args[0] = self;
args[1] = data;
args[2] = (VALUE)&consumed;
return rb_ensure(feed_each_impl, (VALUE)args,
feed_each_ensure, (VALUE)args);
}
static inline VALUE MessagePack_unpack_impl(VALUE self, VALUE data, unsigned long dlen)
{
@@ -726,6 +837,7 @@ void Init_msgpack_unpack(VALUE mMessagePack)
rb_define_method(cUnpacker, "each", MessagePack_Unpacker_each, 0);
rb_define_method(cUnpacker, "stream", MessagePack_Unpacker_stream_get, 0);
rb_define_method(cUnpacker, "stream=", MessagePack_Unpacker_stream_set, 1);
rb_define_method(cUnpacker, "feed_each", MessagePack_Unpacker_feed_each, 1);
/* Unbuffered API */
rb_define_method(cUnpacker, "execute", MessagePack_Unpacker_execute, 2);