mirror of
https://github.com/zeromq/cppzmq.git
synced 2025-01-19 00:46:05 +01:00
Problem: lack conversion message_t
and multipart_t
(#391)
Solution: add encode/decode methods to multipart_t giving a codec compatible with the CZMQ equivalent.
This commit is contained in:
parent
2f1ab4c2a7
commit
10431084bb
@ -27,6 +27,7 @@ add_executable(
|
||||
multipart.cpp
|
||||
recv_multipart.cpp
|
||||
send_multipart.cpp
|
||||
codec_multipart.cpp
|
||||
monitor.cpp
|
||||
utilities.cpp
|
||||
)
|
||||
|
210
tests/codec_multipart.cpp
Normal file
210
tests/codec_multipart.cpp
Normal file
@ -0,0 +1,210 @@
|
||||
#include <catch.hpp>
|
||||
#include <zmq_addon.hpp>
|
||||
|
||||
#ifdef ZMQ_CPP11
|
||||
|
||||
TEST_CASE("multipart codec empty", "[codec_multipart]")
|
||||
{
|
||||
using namespace zmq;
|
||||
|
||||
multipart_t mmsg;
|
||||
message_t msg = mmsg.encode();
|
||||
CHECK(msg.size() == 0);
|
||||
|
||||
multipart_t mmsg2;
|
||||
mmsg2.decode_append(msg);
|
||||
CHECK(mmsg2.size() == 0);
|
||||
|
||||
}
|
||||
|
||||
TEST_CASE("multipart codec small", "[codec_multipart]")
|
||||
{
|
||||
using namespace zmq;
|
||||
|
||||
multipart_t mmsg;
|
||||
mmsg.addstr("Hello World");
|
||||
message_t msg = mmsg.encode();
|
||||
CHECK(msg.size() == 1 + 11); // small size packing
|
||||
|
||||
mmsg.addstr("Second frame");
|
||||
msg = mmsg.encode();
|
||||
CHECK(msg.size() == 1 + 11 + 1 + 12);
|
||||
|
||||
multipart_t mmsg2;
|
||||
mmsg2.decode_append(msg);
|
||||
CHECK(mmsg2.size() == 2);
|
||||
std::string part0 = mmsg2[0].to_string();
|
||||
CHECK(part0 == "Hello World");
|
||||
CHECK(mmsg2[1].to_string() == "Second frame");
|
||||
}
|
||||
|
||||
TEST_CASE("multipart codec big", "[codec_multipart]")
|
||||
{
|
||||
using namespace zmq;
|
||||
|
||||
message_t big(495); // large size packing
|
||||
big.data<char>()[0] = 'X';
|
||||
|
||||
multipart_t mmsg;
|
||||
mmsg.pushmem(big.data(), big.size());
|
||||
message_t msg = mmsg.encode();
|
||||
CHECK(msg.size() == 5 + 495);
|
||||
CHECK(msg.data<unsigned char>()[0] == std::numeric_limits<uint8_t>::max());
|
||||
CHECK(msg.data<unsigned char>()[5] == 'X');
|
||||
|
||||
CHECK(mmsg.size() == 1);
|
||||
mmsg.decode_append(msg);
|
||||
CHECK(mmsg.size() == 2);
|
||||
CHECK(mmsg[0].data<char>()[0] == 'X');
|
||||
}
|
||||
|
||||
TEST_CASE("multipart codec decode bad data overflow", "[codec_multipart]")
|
||||
{
|
||||
using namespace zmq;
|
||||
|
||||
char bad_data[3] = {5, 'h', 'i'};
|
||||
message_t wrong_size(bad_data, 3);
|
||||
CHECK(wrong_size.size() == 3);
|
||||
CHECK(wrong_size.data<char>()[0] == 5);
|
||||
|
||||
CHECK_THROWS_AS(
|
||||
multipart_t::decode(wrong_size),
|
||||
std::out_of_range);
|
||||
}
|
||||
|
||||
TEST_CASE("multipart codec decode bad data extra data", "[codec_multipart]")
|
||||
{
|
||||
using namespace zmq;
|
||||
|
||||
char bad_data[3] = {1, 'h', 'i'};
|
||||
message_t wrong_size(bad_data, 3);
|
||||
CHECK(wrong_size.size() == 3);
|
||||
CHECK(wrong_size.data<char>()[0] == 1);
|
||||
|
||||
CHECK_THROWS_AS(
|
||||
multipart_t::decode(wrong_size),
|
||||
std::out_of_range);
|
||||
}
|
||||
|
||||
|
||||
// After exercising it, this test is disabled over concern of running
|
||||
// on hosts which lack enough free memory to allow the absurdly large
|
||||
// message part to be allocated.
|
||||
#if 0
|
||||
TEST_CASE("multipart codec encode too big", "[codec_multipart]")
|
||||
{
|
||||
using namespace zmq;
|
||||
|
||||
const size_t too_big_size = 1L + std::numeric_limits<uint32_t>::max();
|
||||
CHECK(too_big_size > std::numeric_limits<uint32_t>::max());
|
||||
char* too_big_data = new char[too_big_size];
|
||||
multipart_t mmsg(too_big_data, too_big_size);
|
||||
delete [] too_big_data;
|
||||
|
||||
CHECK(mmsg.size() == 1);
|
||||
CHECK(mmsg[0].size() > std::numeric_limits<uint32_t>::max());
|
||||
|
||||
CHECK_THROWS_AS(
|
||||
mmsg.encode(),
|
||||
std::range_error);
|
||||
}
|
||||
#endif
|
||||
|
||||
TEST_CASE("multipart codec free function with vector of message_t", "[codec_multipart]")
|
||||
{
|
||||
using namespace zmq;
|
||||
std::vector<message_t> parts;
|
||||
parts.emplace_back("Hello", 5);
|
||||
parts.emplace_back("World",5);
|
||||
auto msg = encode(parts);
|
||||
CHECK(msg.size() == 1 + 5 + 1 + 5 );
|
||||
CHECK(msg.data<unsigned char>()[0] == 5);
|
||||
CHECK(msg.data<unsigned char>()[1] == 'H');
|
||||
CHECK(msg.data<unsigned char>()[6] == 5);
|
||||
CHECK(msg.data<unsigned char>()[7] == 'W');
|
||||
|
||||
std::vector<message_t> parts2;
|
||||
decode(msg, std::back_inserter(parts2));
|
||||
CHECK(parts.size() == 2);
|
||||
CHECK(parts[0].size() == 5);
|
||||
CHECK(parts[1].size() == 5);
|
||||
}
|
||||
|
||||
TEST_CASE("multipart codec free function with vector of const_buffer", "[codec_multipart]")
|
||||
{
|
||||
using namespace zmq;
|
||||
std::vector<const_buffer> parts;
|
||||
parts.emplace_back("Hello", 5);
|
||||
parts.emplace_back("World",5);
|
||||
auto msg = encode(parts);
|
||||
CHECK(msg.size() == 1 + 5 + 1 + 5 );
|
||||
CHECK(msg.data<unsigned char>()[0] == 5);
|
||||
CHECK(msg.data<unsigned char>()[1] == 'H');
|
||||
CHECK(msg.data<unsigned char>()[6] == 5);
|
||||
CHECK(msg.data<unsigned char>()[7] == 'W');
|
||||
|
||||
std::vector<message_t> parts2;
|
||||
decode(msg, std::back_inserter(parts2));
|
||||
CHECK(parts.size() == 2);
|
||||
CHECK(parts[0].size() == 5);
|
||||
CHECK(parts[1].size() == 5);
|
||||
}
|
||||
|
||||
TEST_CASE("multipart codec free function with vector of mutable_buffer", "[codec_multipart]")
|
||||
{
|
||||
using namespace zmq;
|
||||
std::vector<mutable_buffer> parts;
|
||||
char hello[6] = "Hello";
|
||||
parts.emplace_back(hello, 5);
|
||||
char world[6] = "World";
|
||||
parts.emplace_back(world,5);
|
||||
auto msg = encode(parts);
|
||||
CHECK(msg.size() == 1 + 5 + 1 + 5 );
|
||||
CHECK(msg.data<unsigned char>()[0] == 5);
|
||||
CHECK(msg.data<unsigned char>()[1] == 'H');
|
||||
CHECK(msg.data<unsigned char>()[6] == 5);
|
||||
CHECK(msg.data<unsigned char>()[7] == 'W');
|
||||
|
||||
std::vector<message_t> parts2;
|
||||
decode(msg, std::back_inserter(parts2));
|
||||
CHECK(parts.size() == 2);
|
||||
CHECK(parts[0].size() == 5);
|
||||
CHECK(parts[1].size() == 5);
|
||||
}
|
||||
|
||||
TEST_CASE("multipart codec free function with multipart_t", "[codec_multipart]")
|
||||
{
|
||||
using namespace zmq;
|
||||
multipart_t mmsg;
|
||||
mmsg.addstr("Hello");
|
||||
mmsg.addstr("World");
|
||||
auto msg = encode(mmsg);
|
||||
CHECK(msg.size() == 1 + 5 + 1 + 5);
|
||||
CHECK(msg.data<unsigned char>()[0] == 5);
|
||||
CHECK(msg.data<unsigned char>()[1] == 'H');
|
||||
CHECK(msg.data<unsigned char>()[6] == 5);
|
||||
CHECK(msg.data<unsigned char>()[7] == 'W');
|
||||
|
||||
multipart_t mmsg2;
|
||||
decode(msg, std::back_inserter(mmsg2));
|
||||
CHECK(mmsg2.size() == 2);
|
||||
CHECK(mmsg2[0].size() == 5);
|
||||
CHECK(mmsg2[1].size() == 5);
|
||||
}
|
||||
|
||||
TEST_CASE("multipart codec static method decode to multipart_t", "[codec_multipart]")
|
||||
{
|
||||
using namespace zmq;
|
||||
multipart_t mmsg;
|
||||
mmsg.addstr("Hello");
|
||||
mmsg.addstr("World");
|
||||
auto msg = encode(mmsg);
|
||||
|
||||
auto mmsg2 = multipart_t::decode(msg);
|
||||
CHECK(mmsg2.size() == 2);
|
||||
CHECK(mmsg2[0].size() == 5);
|
||||
CHECK(mmsg2[1].size() == 5);
|
||||
}
|
||||
|
||||
|
||||
#endif
|
6
zmq.hpp
6
zmq.hpp
@ -26,6 +26,12 @@
|
||||
#ifndef __ZMQ_HPP_INCLUDED__
|
||||
#define __ZMQ_HPP_INCLUDED__
|
||||
|
||||
#ifdef _WIN32
|
||||
#ifndef NOMINMAX
|
||||
#define NOMINMAX
|
||||
#endif
|
||||
#endif
|
||||
|
||||
// macros defined if has a specific standard or greater
|
||||
#if (defined(__cplusplus) && __cplusplus >= 201103L) \
|
||||
|| (defined(_MSC_VER) && _MSC_VER >= 1900)
|
||||
|
147
zmq_addon.hpp
147
zmq_addon.hpp
@ -31,6 +31,7 @@
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#ifdef ZMQ_CPP11
|
||||
#include <limits>
|
||||
#include <functional>
|
||||
#include <unordered_map>
|
||||
#endif
|
||||
@ -125,12 +126,13 @@ ZMQ_NODISCARD recv_result_t recv_multipart_n(socket_ref s,
|
||||
*/
|
||||
template<class Range
|
||||
#ifndef ZMQ_CPP11_PARTIAL
|
||||
, typename = typename std::enable_if<
|
||||
,
|
||||
typename = typename std::enable_if<
|
||||
detail::is_range<Range>::value
|
||||
&& (std::is_same<detail::range_value_t<Range>, message_t>::value
|
||||
|| detail::is_buffer<detail::range_value_t<Range>>::value)>::type
|
||||
#endif
|
||||
>
|
||||
>
|
||||
send_result_t
|
||||
send_multipart(socket_ref s, Range &&msgs, send_flags flags = send_flags::none)
|
||||
{
|
||||
@ -154,7 +156,122 @@ send_multipart(socket_ref s, Range &&msgs, send_flags flags = send_flags::none)
|
||||
return msg_count;
|
||||
}
|
||||
|
||||
/* Encode a multipart message.
|
||||
|
||||
The range must be a ForwardRange of zmq::message_t. A
|
||||
zmq::multipart_t or STL container may be passed for encoding.
|
||||
|
||||
Returns: a zmq::message_t holding the encoded multipart data.
|
||||
|
||||
Throws: std::range_error is thrown if the size of any single part
|
||||
can not fit in an unsigned 32 bit integer.
|
||||
|
||||
The encoding is compatible with that used by the CZMQ function
|
||||
zmsg_encode(). Each part consists of a size followed by the data.
|
||||
These are placed contiguously into the output message. A part of
|
||||
size less than 255 bytes will have a single byte size value.
|
||||
Larger parts will have a five byte size value with the first byte
|
||||
set to 0xFF and the remaining four bytes holding the size of the
|
||||
part's data.
|
||||
*/
|
||||
template<class Range
|
||||
#ifndef ZMQ_CPP11_PARTIAL
|
||||
,
|
||||
typename = typename std::enable_if<
|
||||
detail::is_range<Range>::value
|
||||
&& (std::is_same<detail::range_value_t<Range>, message_t>::value
|
||||
|| detail::is_buffer<detail::range_value_t<Range>>::value)>::type
|
||||
#endif
|
||||
>
|
||||
message_t encode(const Range &parts)
|
||||
{
|
||||
size_t mmsg_size = 0;
|
||||
|
||||
// First pass check sizes
|
||||
for (const auto &part : parts) {
|
||||
size_t part_size = part.size();
|
||||
if (part_size > std::numeric_limits<std::uint32_t>::max()) {
|
||||
// Size value must fit into uint32_t.
|
||||
throw std::range_error("Invalid size, message part too large");
|
||||
}
|
||||
size_t count_size = 5;
|
||||
if (part_size < std::numeric_limits<std::uint8_t>::max()) {
|
||||
count_size = 1;
|
||||
}
|
||||
mmsg_size += part_size + count_size;
|
||||
}
|
||||
|
||||
message_t encoded(mmsg_size);
|
||||
unsigned char *buf = encoded.data<unsigned char>();
|
||||
for (const auto &part : parts) {
|
||||
uint32_t part_size = part.size();
|
||||
const unsigned char *part_data =
|
||||
static_cast<const unsigned char *>(part.data());
|
||||
|
||||
// small part
|
||||
if (part_size < std::numeric_limits<std::uint8_t>::max()) {
|
||||
*buf++ = (unsigned char) part_size;
|
||||
memcpy(buf, part_data, part_size);
|
||||
buf += part_size;
|
||||
continue;
|
||||
}
|
||||
|
||||
// big part
|
||||
*buf++ = std::numeric_limits<uint8_t>::max();
|
||||
*buf++ = (part_size >> 24) & std::numeric_limits<std::uint8_t>::max();
|
||||
*buf++ = (part_size >> 16) & std::numeric_limits<std::uint8_t>::max();
|
||||
*buf++ = (part_size >> 8) & std::numeric_limits<std::uint8_t>::max();
|
||||
*buf++ = part_size & std::numeric_limits<std::uint8_t>::max();
|
||||
memcpy(buf, part_data, part_size);
|
||||
buf += part_size;
|
||||
}
|
||||
return encoded;
|
||||
}
|
||||
|
||||
/* Decode an encoded message to multiple parts.
|
||||
|
||||
The given output iterator must be a ForwardIterator to a container
|
||||
holding zmq::message_t such as a zmq::multipart_t or various STL
|
||||
containers.
|
||||
|
||||
Returns the ForwardIterator advanced once past the last decoded
|
||||
part.
|
||||
|
||||
Throws: a std::out_of_range is thrown if the encoded part sizes
|
||||
lead to exceeding the message data bounds.
|
||||
|
||||
The decoding assumes the message is encoded in the manner
|
||||
performed by zmq::encode().
|
||||
*/
|
||||
template<class OutputIt> OutputIt decode(const message_t &encoded, OutputIt out)
|
||||
{
|
||||
const unsigned char *source = encoded.data<unsigned char>();
|
||||
const unsigned char *const limit = source + encoded.size();
|
||||
|
||||
while (source < limit) {
|
||||
size_t part_size = *source++;
|
||||
if (part_size == std::numeric_limits<std::uint8_t>::max()) {
|
||||
if (source > limit - 4) {
|
||||
throw std::out_of_range(
|
||||
"Malformed encoding, overflow in reading size");
|
||||
}
|
||||
part_size = ((uint32_t) source[0] << 24) + ((uint32_t) source[1] << 16)
|
||||
+ ((uint32_t) source[2] << 8) + (uint32_t) source[3];
|
||||
source += 4;
|
||||
}
|
||||
|
||||
if (source > limit - part_size) {
|
||||
throw std::out_of_range("Malformed encoding, overflow in reading part");
|
||||
}
|
||||
*out = message_t(source, part_size);
|
||||
++out;
|
||||
source += part_size;
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
#ifdef ZMQ_HAS_RVALUE_REFS
|
||||
|
||||
@ -171,6 +288,8 @@ class multipart_t
|
||||
std::deque<message_t> m_parts;
|
||||
|
||||
public:
|
||||
typedef std::deque<message_t>::value_type value_type;
|
||||
|
||||
typedef std::deque<message_t>::iterator iterator;
|
||||
typedef std::deque<message_t>::const_iterator const_iterator;
|
||||
|
||||
@ -343,6 +462,9 @@ class multipart_t
|
||||
// Push message part to back
|
||||
void add(message_t &&message) { m_parts.push_back(std::move(message)); }
|
||||
|
||||
// Alias to allow std::back_inserter()
|
||||
void push_back(message_t &&message) { m_parts.push_back(std::move(message)); }
|
||||
|
||||
// Pop string from front
|
||||
std::string popstr()
|
||||
{
|
||||
@ -469,6 +591,27 @@ class multipart_t
|
||||
return true;
|
||||
}
|
||||
|
||||
#ifdef ZMQ_CPP11
|
||||
|
||||
// Return single part message_t encoded from this multipart_t.
|
||||
message_t encode() const { return zmq::encode(*this); }
|
||||
|
||||
// Decode encoded message into multiple parts and append to self.
|
||||
void decode_append(const message_t &encoded)
|
||||
{
|
||||
zmq::decode(encoded, std::back_inserter(*this));
|
||||
}
|
||||
|
||||
// Return a new multipart_t containing the decoded message_t.
|
||||
static multipart_t decode(const message_t &encoded)
|
||||
{
|
||||
multipart_t tmp;
|
||||
zmq::decode(encoded, std::back_inserter(tmp));
|
||||
return tmp;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
private:
|
||||
// Disable implicit copying (moving is more efficient)
|
||||
multipart_t(const multipart_t &other) ZMQ_DELETED_FUNCTION;
|
||||
|
Loading…
x
Reference in New Issue
Block a user