diff --git a/cpp/Makefile b/cpp/Makefile index 5ef7ecd1..7cc66b2c 100644 --- a/cpp/Makefile +++ b/cpp/Makefile @@ -1,5 +1,6 @@ -CXXFLAGS = -I.. -I. -Wall -g -O4 +CXXFLAGS = -I.. -I. -Wall -g +#CXXFLAGS = -I.. -I. -Wall -g -O4 LDFLAGS = -L. NEED_PREPROCESS = zone.hpp diff --git a/cpp/object.cpp b/cpp/object.cpp index 02e04389..947e8dda 100644 --- a/cpp/object.cpp +++ b/cpp/object.cpp @@ -320,12 +320,12 @@ const object_class* object_##NAME::inspect(std::ostream& s) const \ { (s << '"').write((const char*)ptr, len) << '"'; return this; } // FIXME escape -RAW_OBJECT(raw, - raw object_raw::xraw() { return raw(ptr, len); } - const_raw object_raw::xraw() const { return const_raw(ptr, len); } ) +RAW_OBJECT(raw_ref, + raw object_raw_ref::xraw() { return raw(ptr, len); } + const_raw object_raw_ref::xraw() const { return const_raw(ptr, len); } ) -RAW_OBJECT(const_raw, - const_raw object_const_raw::xraw() const { return const_raw(ptr, len); } ) +RAW_OBJECT(const_raw_ref, + const_raw object_const_raw_ref::xraw() const { return const_raw(ptr, len); } ) #undef RAW_OBJECT(NAME, EXTRA) diff --git a/cpp/object.hpp b/cpp/object.hpp index 3f22dfc9..f029e328 100644 --- a/cpp/object.hpp +++ b/cpp/object.hpp @@ -31,8 +31,9 @@ public: }; struct const_raw { - const_raw() : ptr(NULL), len(0) {} - const_raw(const void* p, size_t l) : ptr(p), len(l) {} + explicit const_raw() : ptr(NULL), len(0) {} + explicit const_raw(const void* p, size_t l) : ptr(p), len(l) {} + const_raw(const raw& m) : ptr(m.ptr), len(m.len) {} public: const void* ptr; size_t len; @@ -257,8 +258,8 @@ private: \ uint32_t len; \ }; -RAW_CLASS(raw, void*, raw xraw(); const_raw xraw() const; ) -RAW_CLASS(const_raw, const void*, const_raw xraw() const; ) +RAW_CLASS(raw_ref, void*, raw xraw(); const_raw xraw() const; ) +RAW_CLASS(const_raw_ref, const void*, const_raw xraw() const; ) #undef RAW_CLASS(NAME, TYPE, EXTRA) diff --git a/cpp/test.cpp b/cpp/test.cpp index 12e91509..3756a05b 100644 --- a/cpp/test.cpp +++ b/cpp/test.cpp @@ -2,6 +2,8 @@ #include #include #include +#include +#include class checker { public: @@ -114,14 +116,85 @@ int main(void) }; c.check(d, sizeof(d), z.narray( - z.nraw("", 0), - z.nraw("a", 1), - z.nraw("bc", 2), - z.nraw("def", 3) + z.nraw_ref("", 0), + z.nraw_ref("a", 1), + z.nraw_ref("bc", 2), + z.nraw_ref("def", 3) ) ); } + static const uint16_t TASK_ARRAY = 100; + static char tarray[3]; + static char traw[64]; + + { + memset(traw, 'a', sizeof(traw)); + traw[0] = 0xda; + uint16_t n = htons(sizeof(traw)-3); + traw[1] = ((char*)&n)[0]; + traw[2] = ((char*)&n)[1]; + + msgpack::zone z; + std::cout << msgpack::unpack(traw, sizeof(traw), z) << std::endl;; + } + + { + tarray[0] = 0xdc; + uint16_t n = htons(TASK_ARRAY); + tarray[1] = ((char*)&n)[0]; + tarray[2] = ((char*)&n)[1]; + } + + { + // write message + ssize_t total_bytes = 0; + std::stringstream stream; + for(unsigned q=0; q < 10; ++q) { + stream.write(tarray, sizeof(tarray)); + total_bytes += sizeof(tarray); + for(uint16_t i=0; i < TASK_ARRAY; ++i) { + stream.write(traw, sizeof(traw)); + total_bytes += sizeof(traw); + } + } + + stream.seekg(0); + + // reserive message + unsigned num_msg = 0; + + static const size_t RESERVE_SIZE = 32;//*1024; + + msgpack::unpacker upk; + while(stream.good() && total_bytes > 0) { + + upk.reserve_buffer(RESERVE_SIZE); + size_t sz = stream.readsome( + (char*)upk.buffer(), + upk.buffer_capacity()); + + total_bytes -= sz; + std::cout << "read " << sz << " bytes to capacity " + << upk.buffer_capacity() << " bytes" + << std::endl; + + upk.buffer_consumed(sz); + while( upk.execute() ) { + std::cout << "message parsed" << std::endl; + boost::scoped_ptr pz(upk.release_zone()); + msgpack::object o = upk.data(); + upk.reset(); + std::cout << o << std::endl; + ++num_msg; + } + + } + + std::cout << "stream finished" << std::endl; + std::cout << num_msg << " messages reached" << std::endl; + } + return 0; } diff --git a/cpp/unpack.cpp b/cpp/unpack.cpp index 0f02d3c1..50550084 100644 --- a/cpp/unpack.cpp +++ b/cpp/unpack.cpp @@ -5,10 +5,10 @@ namespace msgpack { struct unpacker::context { - context(zone& z) + context(zone* z) { msgpack_unpacker_init(&m_ctx); - m_ctx.user = &z; + m_ctx.user = z; } ~context() { } @@ -30,6 +30,22 @@ struct unpacker::context { m_ctx.user = z; } + void reset(zone* z) + { + msgpack_unpacker_init(&m_ctx); + m_ctx.user = z; + } + + zone* user() + { + return m_ctx.user; + } + + void user(zone* z) + { + m_ctx.user = z; + } + private: msgpack_unpacker m_ctx; @@ -39,46 +55,105 @@ private: }; -unpacker::unpacker(zone& z) : - m_ctx(new context(z)), - m_zone(z), - m_finished(false) +unpacker::unpacker() : + m_zone(new zone()), + m_ctx(new context(m_zone)), + m_buffer(NULL), + m_used(0), + m_free(0), + m_off(0) { } -unpacker::~unpacker() { delete m_ctx; } - - -size_t unpacker::execute(const void* data, size_t len, size_t off) +unpacker::~unpacker() { - int ret = m_ctx->execute(data, len, &off); - if(ret < 0) { - throw unpack_error("parse error"); - } else if(ret > 0) { - m_finished = true; - return off; + free(m_buffer); + delete m_ctx; + delete m_zone; +} + + +void unpacker::expand_buffer(size_t len) +{ + if(m_off == 0) { + size_t next_size; + if(m_free != 0) { next_size = m_free * 2; } + else { next_size = MSGPACK_UNPACKER_INITIAL_BUFFER_SIZE; } + while(next_size < len + m_used) { next_size *= 2; } + + // FIXME realloc? + + void* tmp = malloc(next_size); + if(!tmp) { throw std::bad_alloc(); } + memcpy(tmp, m_buffer, m_used); + + free(m_buffer); + m_buffer = tmp; + m_free = next_size - m_used; + } else { - m_finished = false; - return off; + size_t next_size = MSGPACK_UNPACKER_INITIAL_BUFFER_SIZE; + while(next_size < len + m_used - m_off) { next_size *= 2; } + + void* tmp = malloc(next_size); + if(!tmp) { throw std::bad_alloc(); } + memcpy(tmp, ((char*)m_buffer)+m_off, m_used-m_off); + + try { + m_zone->push_finalizer(&zone::finalize_free, NULL, m_buffer); + } catch (...) { + free(tmp); + throw; + } + + m_buffer = tmp; + m_used = m_used - m_off; + m_free = next_size - m_used; + m_off = 0; } } +bool unpacker::execute() +{ + int ret = m_ctx->execute(m_buffer, m_used, &m_off); + if(ret < 0) { + throw unpack_error("parse error"); + } else if(ret == 0) { + return false; + } else { + return true; + } +} + +zone* unpacker::release_zone() +{ + zone* z = m_zone; + m_zone = NULL; + m_zone = new zone(); + m_ctx->user(m_zone); + return z; +} object unpacker::data() { return object(m_ctx->data()); } - void unpacker::reset() { + if(!m_zone->empty()) { + delete m_zone; + m_zone = NULL; + m_zone = new zone(); + } + expand_buffer(0); m_ctx->reset(); } object unpacker::unpack(const void* data, size_t len, zone& z) { - context ctx(z); + context ctx(&z); size_t off = 0; int ret = ctx.execute(data, len, &off); if(ret < 0) { diff --git a/cpp/unpack.hpp b/cpp/unpack.hpp index 61ba781a..df4a636c 100644 --- a/cpp/unpack.hpp +++ b/cpp/unpack.hpp @@ -5,6 +5,10 @@ #include "msgpack/zone.hpp" #include +#ifndef MSGPACK_UNPACKER_INITIAL_BUFFER_SIZE +#define MSGPACK_UNPACKER_INITIAL_BUFFER_SIZE 8*1024 +#endif + namespace msgpack { @@ -16,26 +20,58 @@ struct unpack_error : public std::runtime_error { class unpacker { public: - unpacker(zone& z); + unpacker(); ~unpacker(); + public: - size_t execute(const void* data, size_t len, size_t off); - bool is_finished() { return m_finished; } + void reserve_buffer(size_t len); + void* buffer(); + size_t buffer_capacity() const; + void buffer_consumed(size_t len); + bool execute(); + zone* release_zone(); // never throw object data(); void reset(); + private: + zone* m_zone; + struct context; context* m_ctx; - zone& m_zone; - bool m_finished; + + void* m_buffer; + size_t m_used; + size_t m_free; + size_t m_off; + void expand_buffer(size_t len); + private: - unpacker(); unpacker(const unpacker&); + public: static object unpack(const void* data, size_t len, zone& z); }; +inline void unpacker::reserve_buffer(size_t len) +{ + if(m_free >= len) { return; } + expand_buffer(len); +} + +inline void* unpacker::buffer() + { return (void*)(((char*)m_buffer)+m_used); } + +inline size_t unpacker::buffer_capacity() const + { return m_free; } + +inline void unpacker::buffer_consumed(size_t len) +{ + m_used += len; + m_free -= len; +} + + inline object unpack(const void* data, size_t len, zone& z) { return unpacker::unpack(data, len, z); diff --git a/cpp/unpack_inline.cpp b/cpp/unpack_inline.cpp index 82f6e7a6..90023277 100644 --- a/cpp/unpack_inline.cpp +++ b/cpp/unpack_inline.cpp @@ -59,11 +59,8 @@ static inline object_class* msgpack_unpack_map_start(zone** z, unsigned int n) static inline void msgpack_unpack_map_item(zone** z, object_class* c, object_class* k, object_class* v) { reinterpret_cast(c)->store(k, v); } -static inline object_class* msgpack_unpack_string(zone** z, const void* b, size_t l) -{ return (*z)->nraw(b, l); } - -static inline object_class* msgpack_unpack_raw(zone** z, const void* b, size_t l) -{ return (*z)->nraw(b, l); } +static inline object_class* msgpack_unpack_raw(zone** z, const void* b, const void* p, size_t l) +{ return (*z)->nraw_ref(p, l); } } // extern "C" diff --git a/cpp/zone.cpp b/cpp/zone.cpp index 5031467e..de2de22b 100644 --- a/cpp/zone.cpp +++ b/cpp/zone.cpp @@ -6,27 +6,36 @@ namespace msgpack { void* zone::alloc() { if(m_used >= m_pool.size()*MSGPACK_ZONE_CHUNK_SIZE) { - m_pool.push_back(chunk_t()); + m_pool.push_back(new chunk_t()); } - void* data = m_pool[m_used/MSGPACK_ZONE_CHUNK_SIZE].cells[m_used%MSGPACK_ZONE_CHUNK_SIZE].data; + void* data = m_pool[m_used/MSGPACK_ZONE_CHUNK_SIZE]->cells[m_used%MSGPACK_ZONE_CHUNK_SIZE].data; ++m_used; return data; } void zone::clear() { - for(size_t b=0; b < m_used/MSGPACK_ZONE_CHUNK_SIZE; ++b) { - cell_t* c(m_pool[b].cells); - for(size_t e=0; e < MSGPACK_ZONE_CHUNK_SIZE; ++e) { + if(!m_pool.empty()) { + for(size_t b=0; b < m_used/MSGPACK_ZONE_CHUNK_SIZE; ++b) { + cell_t* c(m_pool[b]->cells); + for(size_t e=0; e < MSGPACK_ZONE_CHUNK_SIZE; ++e) { + reinterpret_cast(c[e].data)->~object_class(); + } + } + cell_t* c(m_pool.back()->cells); + for(size_t e=0; e < m_used%MSGPACK_ZONE_CHUNK_SIZE; ++e) { reinterpret_cast(c[e].data)->~object_class(); } - } - cell_t* c(m_pool.back().cells); - for(size_t e=0; e < m_used%MSGPACK_ZONE_CHUNK_SIZE; ++e) { - reinterpret_cast(c[e].data)->~object_class(); + + for(pool_t::iterator it(m_pool.begin()), it_end(m_pool.end()); + it != it_end; + ++it) { + delete *it; + } + m_pool.clear(); } m_used = 0; - m_pool.resize(1); + for(user_finalizer_t::reverse_iterator it(m_user_finalizer.rbegin()), it_end(m_user_finalizer.rend()); it != it_end; ++it) { diff --git a/cpp/zone.hpp.erb b/cpp/zone.hpp.erb index 1a941afc..40ce694e 100644 --- a/cpp/zone.hpp.erb +++ b/cpp/zone.hpp.erb @@ -1,8 +1,11 @@ #ifndef MSGPACK_ZONE_HPP__ #define MSGPACK_ZONE_HPP__ +#include #include "msgpack/object.hpp" -#include +#include +#include +#include #ifndef MSGPACK_ZONE_CHUNK_SIZE #define MSGPACK_ZONE_CHUNK_SIZE 64 @@ -13,8 +16,8 @@ namespace msgpack { class zone { public: -zone() : m_used(0), m_pool(1) { } -~zone() { clear(); } + zone() : m_used(0) { } + ~zone() { clear(); } public: template @@ -35,11 +38,27 @@ public: object_float* nfloat( float v) { return new (alloc()) object_float(v); } object_double* ndouble( double v) { return new (alloc()) object_double(v); } - object_raw* nraw(void* ptr, uint32_t len) - { return new (alloc()) object_raw(ptr, len); } + object_raw_ref* nraw_ref(void* ptr, uint32_t len) + { return new (alloc()) object_raw_ref(ptr, len); } - object_const_raw* nraw(const void* ptr, uint32_t len) - { return new (alloc()) object_const_raw(ptr, len); } + object_const_raw_ref* nraw_ref(const void* ptr, uint32_t len) + { return new (alloc()) object_const_raw_ref(ptr, len); } + + object_raw_ref* nraw_copy(const void* ptr, uint32_t len) + { + void* copy = malloc(len); + if(!copy) { throw std::bad_alloc(); } + object_raw_ref* o; + try { + o = new (alloc()) object_raw_ref(copy, len); + push_finalizer(&zone::finalize_free, NULL, copy); + } catch (...) { + free(copy); + throw; + } + memcpy(copy, ptr, len); + return o; + } object_array* narray() { return new (alloc()) object_array(); } @@ -67,6 +86,7 @@ public: public: void clear(); + bool empty() const; private: void* alloc(); @@ -75,9 +95,9 @@ private: size_t m_used; static const size_t MAX_OBJECT_SIZE = - sizeof(object_raw) > sizeof(object_array) - ? ( sizeof(object_raw) > sizeof(object_map) - ? sizeof(object_raw) + sizeof(object_raw_ref) > sizeof(object_array) + ? ( sizeof(object_raw_ref) > sizeof(object_map) + ? sizeof(object_raw_ref) : sizeof(object_map) ) : ( sizeof(object_array) > sizeof(object_map) @@ -94,7 +114,7 @@ private: cell_t cells[MSGPACK_ZONE_CHUNK_SIZE]; }; - typedef std::vector pool_t; + typedef std::vector pool_t; pool_t m_pool; @@ -112,11 +132,32 @@ private: typedef std::vector user_finalizer_t; user_finalizer_t m_user_finalizer; +private: + void resize_pool(size_t n); + +public: + static void finalize_free(void* obj, void* user) + { free(user); } + private: zone(const zone&); }; +template +inline void zone::push_finalizer(void (*func)(void* obj, void* user), T* obj, void* user) +{ + m_user_finalizer.push_back( finalizer( + func, reinterpret_cast(obj), + user) ); +} + +inline bool zone::empty() const +{ + return m_used == 0 && m_user_finalizer.empty(); +} + + } // namespace msgpack #endif /* msgpack/zone.hpp */