"zero-copy" raw_decoder

A memcpy is eliminated when receiving data on a ZMQ_STREAM socket. Instead
of receiving into a static buffer and then copying the data into the
buffer malloced in msg_t::init_size, the raw_decoder allocates the memory
for together with the reference-counter and creates a msg_t object
on top of that memory. This saves the memcpy operation.

For small messages, data is still copied and the receive buffer is reused.
This commit is contained in:
Jens Auer
2015-06-14 19:00:52 +02:00
parent d83220e92e
commit 3679793601
9 changed files with 320 additions and 223 deletions

View File

@@ -42,107 +42,7 @@
#include "wire.hpp"
#include "err.hpp"
zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_):
buf(NULL),
bufsize( 0 ),
max_size( bufsize_ ),
msg_refcnt( NULL )
{
}
zmq::shared_message_memory_allocator::~shared_message_memory_allocator()
{
if (buf) {
deallocate();
}
}
unsigned char* zmq::shared_message_memory_allocator::allocate()
{
if (buf)
{
// release reference count to couple lifetime to messages
zmq::atomic_counter_t *c = reinterpret_cast<zmq::atomic_counter_t *>(buf);
// if refcnt drops to 0, there are no message using the buffer
// because either all messages have been closed or only vsm-messages
// were created
if (c->sub(1)) {
// buffer is still in use as message data. "Release" it and create a new one
// release pointer because we are going to create a new buffer
release();
}
}
// if buf != NULL it is not used by any message so we can re-use it for the next run
if (!buf) {
// allocate memory for reference counters together with reception buffer
size_t const maxCounters = std::ceil( (double)max_size / (double)msg_t::max_vsm_size);
size_t const allocationsize = max_size + sizeof(zmq::atomic_counter_t) + maxCounters * sizeof(zmq::atomic_counter_t);
buf = (unsigned char *) malloc(allocationsize);
alloc_assert (buf);
new(buf) atomic_counter_t(1);
}
else
{
// release reference count to couple lifetime to messages
zmq::atomic_counter_t *c = reinterpret_cast<zmq::atomic_counter_t *>(buf);
c->set(1);
}
bufsize = max_size;
msg_refcnt = reinterpret_cast<zmq::atomic_counter_t*>( buf + sizeof(atomic_counter_t) + max_size );
return buf + sizeof( zmq::atomic_counter_t);
}
void zmq::shared_message_memory_allocator::deallocate()
{
free(buf);
buf = NULL;
bufsize = 0;
msg_refcnt = NULL;
}
unsigned char* zmq::shared_message_memory_allocator::release()
{
unsigned char* b = buf;
buf = NULL;
bufsize = 0;
msg_refcnt = NULL;
return b;
}
void zmq::shared_message_memory_allocator::inc_ref()
{
((zmq::atomic_counter_t*)buf)->add(1);
}
void zmq::shared_message_memory_allocator::call_dec_ref(void*, void* hint) {
zmq_assert( hint );
unsigned char* buf = static_cast<unsigned char*>(hint);
zmq::atomic_counter_t *c = reinterpret_cast<zmq::atomic_counter_t *>(buf);
if (!c->sub(1)) {
c->~atomic_counter_t();
free(buf);
buf = NULL;
}
}
size_t zmq::shared_message_memory_allocator::size() const
{
return bufsize;
}
unsigned char* zmq::shared_message_memory_allocator::data()
{
return buf + sizeof(zmq::atomic_counter_t);
}
zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
shared_message_memory_allocator( bufsize_),