mirror of
https://github.com/msgpack/msgpack-c.git
synced 2025-10-15 15:16:51 +02:00
Implement streaming deserializer.
This commit is contained in:
@@ -6,13 +6,16 @@ cdef extern from "Python.h":
|
||||
ctypedef char* const_char_ptr "const char*"
|
||||
ctypedef struct PyObject
|
||||
cdef object PyString_FromStringAndSize(const_char_ptr b, Py_ssize_t len)
|
||||
char* PyString_AsString(object o)
|
||||
|
||||
cdef extern from "stdlib.h":
|
||||
void* malloc(int)
|
||||
void* malloc(size_t)
|
||||
void* realloc(void*, size_t)
|
||||
void free(void*)
|
||||
|
||||
cdef extern from "string.h":
|
||||
int memcpy(char*dst, char*src, unsigned int size)
|
||||
void* memcpy(char* dst, char* src, size_t size)
|
||||
void* memmove(char* dst, char* src, size_t size)
|
||||
|
||||
cdef extern from "pack.h":
|
||||
ctypedef int (*msgpack_packer_write)(void* data, const_char_ptr buf, unsigned int len)
|
||||
@@ -34,8 +37,6 @@ cdef extern from "pack.h":
|
||||
void msgpack_pack_raw_body(msgpack_packer* pk, char* body, size_t l)
|
||||
|
||||
|
||||
cdef int BUFF_SIZE=2*1024
|
||||
|
||||
cdef class Packer:
|
||||
"""Packer that pack data into strm.
|
||||
|
||||
@@ -48,10 +49,7 @@ cdef class Packer:
|
||||
cdef msgpack_packer pk
|
||||
cdef object strm
|
||||
|
||||
def __init__(self, strm, int size=0):
|
||||
if size <= 0:
|
||||
size = BUFF_SIZE
|
||||
|
||||
def __init__(self, strm, int size=4*1024):
|
||||
self.strm = strm
|
||||
self.buff = <char*> malloc(size)
|
||||
self.allocated = size
|
||||
@@ -147,6 +145,8 @@ cdef class Packer:
|
||||
if flush:
|
||||
self.flush()
|
||||
|
||||
close = flush
|
||||
|
||||
cdef int _packer_write(Packer packer, const_char_ptr b, unsigned int l):
|
||||
if packer.length + l > packer.allocated:
|
||||
if packer.length > 0:
|
||||
@@ -163,20 +163,28 @@ cdef int _packer_write(Packer packer, const_char_ptr b, unsigned int l):
|
||||
return 0
|
||||
|
||||
def pack(object o, object stream):
|
||||
u"""pack o and write to stream)."""
|
||||
packer = Packer(stream)
|
||||
packer.pack(o)
|
||||
packer.flush()
|
||||
|
||||
def packs(object o):
|
||||
def packb(object o):
|
||||
u"""pack o and return packed bytes."""
|
||||
buf = StringIO()
|
||||
packer = Packer(buf)
|
||||
packer.pack(o)
|
||||
packer.flush()
|
||||
return buf.getvalue()
|
||||
|
||||
packs = packb
|
||||
|
||||
cdef extern from "unpack.h":
|
||||
ctypedef struct template_context:
|
||||
pass
|
||||
PyObject* obj
|
||||
size_t count
|
||||
unsigned int ct
|
||||
PyObject* key
|
||||
|
||||
int template_execute(template_context* ctx, const_char_ptr data,
|
||||
size_t len, size_t* off)
|
||||
void template_init(template_context* ctx)
|
||||
@@ -188,15 +196,139 @@ def unpacks(object packed_bytes):
|
||||
cdef const_char_ptr p = packed_bytes
|
||||
cdef template_context ctx
|
||||
cdef size_t off = 0
|
||||
cdef int ret
|
||||
template_init(&ctx)
|
||||
template_execute(&ctx, p, len(packed_bytes), &off)
|
||||
return template_data(&ctx)
|
||||
ret = template_execute(&ctx, p, len(packed_bytes), &off)
|
||||
if ret == 1:
|
||||
return template_data(&ctx)
|
||||
else:
|
||||
return None
|
||||
|
||||
def unpack(object stream):
|
||||
"""unpack from stream."""
|
||||
packed = stream.read()
|
||||
return unpacks(packed)
|
||||
|
||||
cdef class Unpacker:
|
||||
"""Do nothing. This function is for symmetric to Packer"""
|
||||
unpack = staticmethod(unpacks)
|
||||
cdef class UnpackIterator(object):
|
||||
cdef object unpacker
|
||||
|
||||
def __init__(self, unpacker):
|
||||
self.unpacker = unpacker
|
||||
|
||||
def __next__(self):
|
||||
return self.unpacker.unpack()
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
cdef class Unpacker(object):
|
||||
"""Unpacker(file_like=None, read_size=4096)
|
||||
|
||||
Streaming unpacker.
|
||||
file_like must have read(n) method.
|
||||
read_size is used like file_like.read(read_size)
|
||||
|
||||
If file_like is None, you can feed() bytes. feed() is useful
|
||||
for unpack from non-blocking stream.
|
||||
|
||||
exsample 1:
|
||||
unpacker = Unpacker(afile)
|
||||
for o in unpacker:
|
||||
do_something(o)
|
||||
|
||||
example 2:
|
||||
unpacker = Unpacker()
|
||||
while 1:
|
||||
buf = astream.read()
|
||||
unpacker.feed(buf)
|
||||
for o in unpacker:
|
||||
do_something(o)
|
||||
"""
|
||||
|
||||
cdef template_context ctx
|
||||
cdef char* buf
|
||||
cdef size_t buf_size, buf_head, buf_tail
|
||||
cdef object file_like
|
||||
cdef int read_size
|
||||
cdef object waiting_bytes
|
||||
|
||||
def __init__(self, file_like=None, int read_size=4096):
|
||||
self.file_like = file_like
|
||||
self.read_size = read_size
|
||||
self.waiting_bytes = []
|
||||
self.buf = <char*>malloc(read_size)
|
||||
self.buf_size = read_size
|
||||
self.buf_head = 0
|
||||
self.buf_tail = 0
|
||||
template_init(&self.ctx)
|
||||
|
||||
def feed(self, next_bytes):
|
||||
if not isinstance(next_bytes, str):
|
||||
raise ValueError, "Argument must be bytes object"
|
||||
self.waiting_bytes.append(next_bytes)
|
||||
|
||||
cdef append_buffer(self):
|
||||
cdef char* buf = self.buf
|
||||
cdef Py_ssize_t tail = self.buf_tail
|
||||
cdef Py_ssize_t l
|
||||
|
||||
for b in self.waiting_bytes:
|
||||
l = len(b)
|
||||
memcpy(buf + tail, PyString_AsString(b), l)
|
||||
tail += l
|
||||
self.buf_tail = tail
|
||||
del self.waiting_bytes[:]
|
||||
|
||||
# prepare self.buf
|
||||
cdef fill_buffer(self):
|
||||
cdef Py_ssize_t add_size
|
||||
|
||||
if self.file_like is not None:
|
||||
self.waiting_bytes.append(self.file_like.read(self.read_size))
|
||||
|
||||
if not self.waiting_bytes:
|
||||
return
|
||||
|
||||
add_size = 0
|
||||
for b in self.waiting_bytes:
|
||||
add_size += len(b)
|
||||
|
||||
cdef char* buf = self.buf
|
||||
cdef size_t head = self.buf_head
|
||||
cdef size_t tail = self.buf_tail
|
||||
cdef size_t size = self.buf_size
|
||||
|
||||
if self.buf_tail + add_size <= self.buf_size:
|
||||
# do nothing.
|
||||
pass
|
||||
if self.buf_tail - self.buf_head + add_size < self.buf_size:
|
||||
# move to front.
|
||||
memmove(buf, buf + head, tail - head)
|
||||
tail -= head
|
||||
head = 0
|
||||
else:
|
||||
# expand buffer
|
||||
size = tail + add_size
|
||||
buf = <char*>realloc(<void*>buf, size)
|
||||
|
||||
self.buf = buf
|
||||
self.buf_head = head
|
||||
self.buf_tail = tail
|
||||
self.buf_size = size
|
||||
|
||||
self.append_buffer()
|
||||
|
||||
cpdef unpack(self):
|
||||
"""unpack one object"""
|
||||
cdef int ret
|
||||
self.fill_buffer()
|
||||
ret = template_execute(&self.ctx, self.buf, self.buf_tail, &self.buf_head)
|
||||
if ret == 1:
|
||||
return template_data(&self.ctx)
|
||||
elif ret == 0:
|
||||
raise StopIteration, "No more unpack data."
|
||||
else:
|
||||
raise ValueError, "Unpack failed."
|
||||
|
||||
def __iter__(self):
|
||||
return UnpackIterator(self)
|
||||
|
Reference in New Issue
Block a user