zero-copy message receive

Construct messages from a reference-counted buffer allocated once
per receive instead of copying the data.
This commit is contained in:
Jens Auer
2015-05-29 23:54:43 +02:00
parent 611e96c701
commit e9b403a7b1
6 changed files with 273 additions and 67 deletions

View File

@@ -41,8 +41,96 @@
#include "wire.hpp"
#include "err.hpp"
zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_):
buf(NULL),
bufsize( bufsize_ )
{
}
zmq::shared_message_memory_allocator::~shared_message_memory_allocator()
{
deallocate();
}
unsigned char* zmq::shared_message_memory_allocator::allocate()
{
if (buf)
{
// release reference count to couple lifetime to messages
call_dec_ref(NULL, buf);
// release pointer because we are going to create a new buffer
release();
}
// @todo aligmnet padding may be needed
if (!buf)
{
buf = (unsigned char *) malloc(bufsize + sizeof(zmq::atomic_counter_t));
alloc_assert (buf);
new(buf) atomic_counter_t(1);
}
return buf + sizeof( zmq::atomic_counter_t);
}
void zmq::shared_message_memory_allocator::deallocate()
{
free(buf);
buf = NULL;
}
unsigned char* zmq::shared_message_memory_allocator::release()
{
unsigned char* b = buf;
buf = NULL;
return b;
}
void zmq::shared_message_memory_allocator::reset(unsigned char* b)
{
deallocate();
buf = b;
}
void zmq::shared_message_memory_allocator::inc_ref()
{
((zmq::atomic_counter_t*)buf)->add(1);
}
void zmq::shared_message_memory_allocator::call_dec_ref(void*, void* hint) {
zmq_assert( hint );
zmq::atomic_counter_t *c = reinterpret_cast<zmq::atomic_counter_t *>(hint);
if (!c->sub(1)) {
c->~atomic_counter_t();
free(hint);
}
}
size_t zmq::shared_message_memory_allocator::size() const
{
if (buf)
{
return bufsize;
}
else
{
return 0;
}
}
unsigned char* zmq::shared_message_memory_allocator::data()
{
zmq_assert(buf);
return buf + sizeof(zmq::atomic_counter_t);
}
zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
decoder_base_t <v2_decoder_t> (bufsize_),
shared_message_memory_allocator( bufsize_),
decoder_base_t <v2_decoder_t, shared_message_memory_allocator> (this),
msg_flags (0),
maxmsgsize (maxmsgsize_)
{
@@ -59,7 +147,7 @@ zmq::v2_decoder_t::~v2_decoder_t ()
errno_assert (rc == 0);
}
int zmq::v2_decoder_t::flags_ready ()
int zmq::v2_decoder_t::flags_ready (unsigned char const*)
{
msg_flags = 0;
if (tmpbuf [0] & v2_protocol_t::more_flag)
@@ -77,40 +165,20 @@ int zmq::v2_decoder_t::flags_ready ()
return 0;
}
int zmq::v2_decoder_t::one_byte_size_ready ()
int zmq::v2_decoder_t::one_byte_size_ready (unsigned char const* read_from)
{
// Message size must not exceed the maximum allowed size.
if (maxmsgsize >= 0)
if (unlikely (tmpbuf [0] > static_cast <uint64_t> (maxmsgsize))) {
errno = EMSGSIZE;
return -1;
}
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int rc = in_progress.init_size (tmpbuf [0]);
if (unlikely (rc)) {
errno_assert (errno == ENOMEM);
rc = in_progress.init ();
errno_assert (rc == 0);
errno = ENOMEM;
return -1;
}
in_progress.set_flags (msg_flags);
next_step (in_progress.data (), in_progress.size (),
&v2_decoder_t::message_ready);
return 0;
return size_ready(tmpbuf[0], read_from);
}
int zmq::v2_decoder_t::eight_byte_size_ready ()
{
int zmq::v2_decoder_t::eight_byte_size_ready (unsigned char const* read_from) {
// The payload size is encoded as 64-bit unsigned integer.
// The most significant byte comes first.
const uint64_t msg_size = get_uint64 (tmpbuf);
const uint64_t msg_size = get_uint64(tmpbuf);
return size_ready(msg_size, read_from);
}
int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_pos) {
// Message size must not exceed the maximum allowed size.
if (maxmsgsize >= 0)
if (unlikely (msg_size > static_cast <uint64_t> (maxmsgsize))) {
@@ -127,7 +195,31 @@ int zmq::v2_decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised.
int rc = in_progress.init_size (static_cast <size_t> (msg_size));
int rc = -1;
// the current message can exceed the current buffer. We have to copy the buffer
// data into a new message and complete it in the next receive.
if (unlikely ((unsigned char*)read_pos + msg_size > (data() + size())))
{
// a new message has started, but the size would exceed the pre-allocated arena
// this happens everytime when a message does not fit completely into the buffer
rc = in_progress.init_size (static_cast <size_t> (msg_size));
}
else
{
// construct message using n bytes from the buffer as storage
// increase buffer ref count
rc = in_progress.init( (unsigned char*)read_pos,
msg_size, shared_message_memory_allocator::call_dec_ref,
buffer() );
// For small messages, data has been copied and refcount does not have to be increased
if (in_progress.is_lmsg())
{
inc_ref();
}
}
if (unlikely (rc)) {
errno_assert (errno == ENOMEM);
rc = in_progress.init ();
@@ -137,13 +229,19 @@ int zmq::v2_decoder_t::eight_byte_size_ready ()
}
in_progress.set_flags (msg_flags);
// this sets read_pos to
// the message data address if the data needs to be copied
// for small message / messages exceeding the current buffer
// or
// to the current start address in the buffer because the message
// was constructed to use n bytes from the address passed as argument
next_step (in_progress.data (), in_progress.size (),
&v2_decoder_t::message_ready);
return 0;
}
int zmq::v2_decoder_t::message_ready ()
int zmq::v2_decoder_t::message_ready (unsigned char const*)
{
// Message is completely read. Signal this to the caller
// and prepare to decode next message.