diff --git a/cpp/test.cpp b/cpp/test.cpp index 3756a05b..e236d0fe 100644 --- a/cpp/test.cpp +++ b/cpp/test.cpp @@ -169,7 +169,10 @@ int main(void) msgpack::unpacker upk; while(stream.good() && total_bytes > 0) { + // 1. reserve buffer upk.reserve_buffer(RESERVE_SIZE); + + // 2. read data to buffer() up to buffer_capacity() bytes size_t sz = stream.readsome( (char*)upk.buffer(), upk.buffer_capacity()); @@ -179,14 +182,24 @@ int main(void) << upk.buffer_capacity() << " bytes" << std::endl; + // 3. specify the number of bytes actually copied upk.buffer_consumed(sz); + + // 4. repeat execute() until it returns false while( upk.execute() ) { std::cout << "message parsed" << std::endl; - boost::scoped_ptr pz(upk.release_zone()); + + // 5.1. take out the parsed object msgpack::object o = upk.data(); - upk.reset(); + + // 5.2. the parsed object is valid until the zone is deleted + boost::scoped_ptr pz(upk.release_zone()); + std::cout << o << std::endl; ++num_msg; + + // 5.3 re-initialize unpacker + upk.reset(); } } diff --git a/cpp/unpack.cpp b/cpp/unpack.cpp index 50550084..d4845938 100644 --- a/cpp/unpack.cpp +++ b/cpp/unpack.cpp @@ -82,7 +82,6 @@ void unpacker::expand_buffer(size_t len) while(next_size < len + m_used) { next_size *= 2; } // FIXME realloc? - void* tmp = malloc(next_size); if(!tmp) { throw std::bad_alloc(); } memcpy(tmp, m_buffer, m_used); @@ -121,15 +120,16 @@ bool unpacker::execute() } else if(ret == 0) { return false; } else { + expand_buffer(0); return true; } } zone* unpacker::release_zone() { + zone* nz = new zone(); zone* z = m_zone; - m_zone = NULL; - m_zone = new zone(); + m_zone = nz; m_ctx->user(m_zone); return z; } @@ -141,12 +141,12 @@ object unpacker::data() void unpacker::reset() { + if(m_off != 0) { expand_buffer(0); } if(!m_zone->empty()) { delete m_zone; m_zone = NULL; m_zone = new zone(); } - expand_buffer(0); m_ctx->reset(); } diff --git a/cpp/unpack.hpp b/cpp/unpack.hpp index df4a636c..ae536c97 100644 --- a/cpp/unpack.hpp +++ b/cpp/unpack.hpp @@ -24,15 +24,46 @@ public: ~unpacker(); public: + /*! 1. reserve buffer. at least `len' bytes of capacity will be ready */ void reserve_buffer(size_t len); + + /*! 2. read data to the buffer() up to buffer_capacity() bytes */ void* buffer(); size_t buffer_capacity() const; + + /*! 3. specify the number of bytes actually copied */ void buffer_consumed(size_t len); + + /*! 4. repeat execute() until it retunrs false */ bool execute(); - zone* release_zone(); // never throw + + /*! 5.1. if execute() returns true, take out the parsed object */ object data(); + + /*! 5.2. the parsed object is valid until the zone is deleted */ + // Note that once release_zone() from unpacker, you must delete it + // otherwise the memrory will leak. + zone* release_zone(); + + /*! 5.3. after release_zone(), re-initialize unpacker */ void reset(); +public: + // These functions are usable when non-MessagePack message follows after + // MessagePack message. + // Note that there are no parsed buffer when execute() returned true. + + /*! get address of buffer that is not parsed */ + void* nonparsed_buffer(); + size_t nonparsed_size() const; + + /*! get the number of bytes that is already parsed */ + size_t parsed_size() const; + + /*! remove unparsed buffer from unpacker */ + // Note that reset() leaves non-parsed buffer. + void remove_nonparsed_buffer(); + private: zone* m_zone; @@ -72,6 +103,19 @@ inline void unpacker::buffer_consumed(size_t len) } +inline void* unpacker::nonparsed_buffer() + { return (void*)(((char*)m_buffer)+m_off); } + +inline size_t unpacker::nonparsed_size() const + { return m_used - m_off; } + +inline size_t unpacker::parsed_size() const + { return m_off; } + +inline void unpacker::remove_nonparsed_buffer() + { m_used = m_off; } + + inline object unpack(const void* data, size_t len, zone& z) { return unpacker::unpack(data, len, z);