diff --git a/ruby/unpack.c b/ruby/unpack.c index 2a6614f7..f61fe6db 100644 --- a/ruby/unpack.c +++ b/ruby/unpack.c @@ -24,7 +24,7 @@ static ID s_sysread; static ID s_readpartial; struct unpack_buffer { - size_t size; + size_t used; size_t free; char* ptr; }; @@ -37,6 +37,7 @@ typedef struct { VALUE stream; VALUE streambuf; ID stream_append_method; + size_t buffer_free_size; } unpack_user; @@ -241,6 +242,13 @@ static VALUE eUnpackError; #define MSGPACK_UNPACKER_BUFFER_RESERVE_SIZE (8*1024) #endif +/* +#ifndef MSGPACK_BUFFER_FREE_SIZE +#define MSGPACK_BUFFER_FREE_SIZE (1024*1024) +#endif +*/ +#define MSGPACK_BUFFER_FREE_SIZE 0 + static void MessagePack_Unpacker_free(void* data) { if(data) { @@ -273,7 +281,7 @@ static VALUE MessagePack_Unpacker_alloc(VALUE klass) mp->user.finished = 0; mp->user.offset = 0; - mp->user.buffer.size = 0; + mp->user.buffer.used = 0; mp->user.buffer.free = 0; mp->user.buffer.ptr = NULL; @@ -326,6 +334,7 @@ static VALUE MessagePack_Unpacker_initialize(int argc, VALUE *argv, VALUE self) mp->user.stream = stream; mp->user.streambuf = rb_str_buf_new(MSGPACK_UNPACKER_BUFFER_RESERVE_SIZE); mp->user.stream_append_method = append_method_of(stream); + mp->user.buffer_free_size = MSGPACK_BUFFER_FREE_SIZE; return self; } @@ -366,22 +375,27 @@ static void reserve_buffer(msgpack_unpack_t* mp, size_t require) { struct unpack_buffer* buffer = &mp->user.buffer; - if(buffer->size == 0) { - size_t nsize = MSGPACK_UNPACKER_BUFFER_INIT_SIZE; + if(buffer->used == 0) { + if(require <= buffer->free) { + /* enough free space */ + return; + } + /* no used buffer: realloc only */ + size_t nsize = buffer->free == 0 ? + MSGPACK_UNPACKER_BUFFER_INIT_SIZE : buffer->free*2; while(nsize < require) { nsize *= 2; } - char* tmp = ALLOC_N(char, nsize); - buffer->ptr = tmp; + char* tmp = REALLOC_N(buffer->ptr, char, nsize); buffer->free = nsize; - buffer->size = 0; + buffer->ptr = tmp; return; } - if(buffer->size <= mp->user.offset) { + if(buffer->used <= mp->user.offset) { /* clear buffer and rewind offset */ - buffer->free += buffer->size; - buffer->size = 0; + buffer->free += buffer->used; + buffer->used = 0; mp->user.offset = 0; } @@ -390,41 +404,91 @@ static void reserve_buffer(msgpack_unpack_t* mp, size_t require) return; } - size_t nsize = (buffer->size + buffer->free) * 2; + size_t nsize = (buffer->used + buffer->free) * 2; - if(mp->user.offset <= buffer->size / 2) { + if(mp->user.offset <= buffer->used / 2) { /* parsed less than half: realloc only */ - while(nsize < buffer->size + require) { + while(nsize < buffer->used + require) { nsize *= 2; } char* tmp = REALLOC_N(buffer->ptr, char, nsize); - buffer->free = nsize - buffer->size; + buffer->free = nsize - buffer->used; buffer->ptr = tmp; } else { /* parsed more than half: realloc and move */ - size_t not_parsed = buffer->size - mp->user.offset; + size_t not_parsed = buffer->used - mp->user.offset; while(nsize < not_parsed + require) { nsize *= 2; } char* tmp = REALLOC_N(buffer->ptr, char, nsize); memcpy(tmp, tmp + mp->user.offset, not_parsed); - buffer->free = nsize - buffer->size; - buffer->size = not_parsed; + buffer->free = nsize - not_parsed; + buffer->used = not_parsed; buffer->ptr = tmp; mp->user.offset = 0; } } -static inline void feed_buffer(msgpack_unpack_t* mp, const char* ptr, size_t len) +static inline void try_free_buffer(msgpack_unpack_t* mp, size_t require) +{ + if(mp->user.buffer_free_size == 0) { + return; + } + + struct unpack_buffer* buffer = &mp->user.buffer; + size_t csize = buffer->used + buffer->free; + + if(csize <= mp->user.buffer_free_size) { + return; + } + + if(mp->user.offset <= buffer->used / 2) { + /* parsed less than half: do nothing */ + + } else if(mp->user.offset < buffer->used) { + /* parsed more than half but not all: realloc and move */ + size_t nsize = MSGPACK_UNPACKER_BUFFER_INIT_SIZE; + size_t not_parsed = buffer->used - mp->user.offset; + while(nsize < not_parsed + require) { + nsize *= 2; + } + + if(nsize >= csize) { + return; + } + + char* tmp; + if(mp->user.offset == 0) { + tmp = ALLOC_N(char, nsize); + memcpy(tmp, buffer->ptr + mp->user.offset, not_parsed); + free(buffer->ptr); + } else { + tmp = REALLOC_N(buffer->ptr, char, nsize); + } + buffer->free = nsize - not_parsed; + buffer->used = not_parsed; + buffer->ptr = tmp; + mp->user.offset = 0; + + } else { + /* all parsed: free all */ + free(buffer->ptr); + buffer->free = 0; + buffer->used = 0; + buffer->ptr = NULL; + mp->user.offset = 0; + } +} + +static void feed_buffer(msgpack_unpack_t* mp, const char* ptr, size_t len) { struct unpack_buffer* buffer = &mp->user.buffer; - //if(buffer->free < len) { - reserve_buffer(mp, len); - //} - memcpy(buffer->ptr + buffer->size, ptr, len); - buffer->size += len; + reserve_buffer(mp, len); + + memcpy(buffer->ptr + buffer->used, ptr, len); + buffer->used += len; buffer->free -= len; } @@ -498,7 +562,7 @@ static VALUE MessagePack_Unpacker_each(VALUE self) #endif while(1) { - if(mp->user.buffer.size <= mp->user.offset) { + if(mp->user.buffer.used <= mp->user.offset) { do_fill: { VALUE len = MessagePack_Unpacker_fill(self); @@ -509,7 +573,7 @@ static VALUE MessagePack_Unpacker_each(VALUE self) } ret = template_execute_wrap_each(mp, - mp->user.buffer.ptr, mp->user.buffer.size, + mp->user.buffer.ptr, mp->user.buffer.used, &mp->user.offset); if(ret < 0) { @@ -525,6 +589,8 @@ static VALUE MessagePack_Unpacker_each(VALUE self) } } + try_free_buffer(mp, 0); + return Qnil; } @@ -539,10 +605,10 @@ static VALUE feed_each_impl(VALUE args) const char* ptr = RSTRING_PTR(data); size_t len = RSTRING_LEN(data); - if(mp->user.buffer.size > 0) { + if(mp->user.buffer.used > 0) { while(1) { ret = template_execute_wrap_each(mp, - mp->user.buffer.ptr, mp->user.buffer.size, + mp->user.buffer.ptr, mp->user.buffer.used, &mp->user.offset); if(ret < 0) { @@ -563,7 +629,7 @@ static VALUE feed_each_impl(VALUE args) return Qnil; } - if(mp->user.buffer.size <= mp->user.offset) { + if(mp->user.buffer.used <= mp->user.offset) { // wrap & execute & feed while(1) { ret = template_execute_wrap_each(mp, @@ -589,7 +655,7 @@ static VALUE feed_each_impl(VALUE args) while(1) { ret = template_execute_wrap_each(mp, - mp->user.buffer.ptr, mp->user.buffer.size, + mp->user.buffer.ptr, mp->user.buffer.used, &mp->user.offset); if(ret < 0) { @@ -614,17 +680,26 @@ static VALUE feed_each_ensure(VALUE args) { VALUE data = ((VALUE*)args)[1]; size_t* pconsumed = (size_t*)((VALUE*)args)[2]; - UNPACKER(self, mp); - const char* ptr = RSTRING_PTR(data); - size_t len = RSTRING_LEN(data); + const char* dptr = RSTRING_PTR(data) + *pconsumed; + size_t dlen = RSTRING_LEN(data) - *pconsumed; - if(len > *pconsumed) { - feed_buffer(mp, ptr+*pconsumed, len-*pconsumed); + if(dlen > 0) { + UNPACKER(self, mp); + try_free_buffer(mp, dlen); + feed_buffer(mp, dptr, dlen); } return Qnil; } +/** + * Document-method: MessagePack::Unpacker#feed_each + * + * call-seq: + * unpacker.feed_each(data) {|object| } + * + * Same as feed(data) + each {|object| }, but tries to avoid copying of the buffer. + */ static VALUE MessagePack_Unpacker_feed_each(VALUE self, VALUE data) { size_t consumed = 0; @@ -731,7 +806,7 @@ static VALUE MessagePack_Unpacker_execute_impl(VALUE self, VALUE data, * * This method doesn't use the internal buffer. * - * Call *reset()* method before calling this method again. + * Call *reset* method before calling this method again. * * UnpackError is throw when parse error is occured. */ @@ -753,7 +828,7 @@ static VALUE MessagePack_Unpacker_execute_limit(VALUE self, VALUE data, * * This method doesn't use the internal buffer. * - * Call *reset()* method before calling this method again. + * Call *reset* method before calling this method again. * * This returns offset that was parsed to. * Use *finished?* method to check an object is deserialized and call *data* @@ -816,6 +891,7 @@ static VALUE MessagePack_Unpacker_reset(VALUE self) UNPACKER(self, mp); template_init(mp); mp->user.finished = 0; + try_free_buffer(mp, 0); return self; }