diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 41879aa..0ae37d7 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -27,6 +27,7 @@ add_executable( multipart.cpp recv_multipart.cpp send_multipart.cpp + codec_multipart.cpp monitor.cpp utilities.cpp ) diff --git a/tests/codec_multipart.cpp b/tests/codec_multipart.cpp new file mode 100644 index 0000000..7be90d4 --- /dev/null +++ b/tests/codec_multipart.cpp @@ -0,0 +1,210 @@ +#include +#include + +#ifdef ZMQ_CPP11 + +TEST_CASE("multipart codec empty", "[codec_multipart]") +{ + using namespace zmq; + + multipart_t mmsg; + message_t msg = mmsg.encode(); + CHECK(msg.size() == 0); + + multipart_t mmsg2; + mmsg2.decode_append(msg); + CHECK(mmsg2.size() == 0); + +} + +TEST_CASE("multipart codec small", "[codec_multipart]") +{ + using namespace zmq; + + multipart_t mmsg; + mmsg.addstr("Hello World"); + message_t msg = mmsg.encode(); + CHECK(msg.size() == 1 + 11); // small size packing + + mmsg.addstr("Second frame"); + msg = mmsg.encode(); + CHECK(msg.size() == 1 + 11 + 1 + 12); + + multipart_t mmsg2; + mmsg2.decode_append(msg); + CHECK(mmsg2.size() == 2); + std::string part0 = mmsg2[0].to_string(); + CHECK(part0 == "Hello World"); + CHECK(mmsg2[1].to_string() == "Second frame"); +} + +TEST_CASE("multipart codec big", "[codec_multipart]") +{ + using namespace zmq; + + message_t big(495); // large size packing + big.data()[0] = 'X'; + + multipart_t mmsg; + mmsg.pushmem(big.data(), big.size()); + message_t msg = mmsg.encode(); + CHECK(msg.size() == 5 + 495); + CHECK(msg.data()[0] == std::numeric_limits::max()); + CHECK(msg.data()[5] == 'X'); + + CHECK(mmsg.size() == 1); + mmsg.decode_append(msg); + CHECK(mmsg.size() == 2); + CHECK(mmsg[0].data()[0] == 'X'); +} + +TEST_CASE("multipart codec decode bad data overflow", "[codec_multipart]") +{ + using namespace zmq; + + char bad_data[3] = {5, 'h', 'i'}; + message_t wrong_size(bad_data, 3); + CHECK(wrong_size.size() == 3); + CHECK(wrong_size.data()[0] == 5); + + CHECK_THROWS_AS( + multipart_t::decode(wrong_size), + std::out_of_range); +} + +TEST_CASE("multipart codec decode bad data extra data", "[codec_multipart]") +{ + using namespace zmq; + + char bad_data[3] = {1, 'h', 'i'}; + message_t wrong_size(bad_data, 3); + CHECK(wrong_size.size() == 3); + CHECK(wrong_size.data()[0] == 1); + + CHECK_THROWS_AS( + multipart_t::decode(wrong_size), + std::out_of_range); +} + + +// After exercising it, this test is disabled over concern of running +// on hosts which lack enough free memory to allow the absurdly large +// message part to be allocated. +#if 0 +TEST_CASE("multipart codec encode too big", "[codec_multipart]") +{ + using namespace zmq; + + const size_t too_big_size = 1L + std::numeric_limits::max(); + CHECK(too_big_size > std::numeric_limits::max()); + char* too_big_data = new char[too_big_size]; + multipart_t mmsg(too_big_data, too_big_size); + delete [] too_big_data; + + CHECK(mmsg.size() == 1); + CHECK(mmsg[0].size() > std::numeric_limits::max()); + + CHECK_THROWS_AS( + mmsg.encode(), + std::range_error); +} +#endif + +TEST_CASE("multipart codec free function with vector of message_t", "[codec_multipart]") +{ + using namespace zmq; + std::vector parts; + parts.emplace_back("Hello", 5); + parts.emplace_back("World",5); + auto msg = encode(parts); + CHECK(msg.size() == 1 + 5 + 1 + 5 ); + CHECK(msg.data()[0] == 5); + CHECK(msg.data()[1] == 'H'); + CHECK(msg.data()[6] == 5); + CHECK(msg.data()[7] == 'W'); + + std::vector parts2; + decode(msg, std::back_inserter(parts2)); + CHECK(parts.size() == 2); + CHECK(parts[0].size() == 5); + CHECK(parts[1].size() == 5); +} + +TEST_CASE("multipart codec free function with vector of const_buffer", "[codec_multipart]") +{ + using namespace zmq; + std::vector parts; + parts.emplace_back("Hello", 5); + parts.emplace_back("World",5); + auto msg = encode(parts); + CHECK(msg.size() == 1 + 5 + 1 + 5 ); + CHECK(msg.data()[0] == 5); + CHECK(msg.data()[1] == 'H'); + CHECK(msg.data()[6] == 5); + CHECK(msg.data()[7] == 'W'); + + std::vector parts2; + decode(msg, std::back_inserter(parts2)); + CHECK(parts.size() == 2); + CHECK(parts[0].size() == 5); + CHECK(parts[1].size() == 5); +} + +TEST_CASE("multipart codec free function with vector of mutable_buffer", "[codec_multipart]") +{ + using namespace zmq; + std::vector parts; + char hello[6] = "Hello"; + parts.emplace_back(hello, 5); + char world[6] = "World"; + parts.emplace_back(world,5); + auto msg = encode(parts); + CHECK(msg.size() == 1 + 5 + 1 + 5 ); + CHECK(msg.data()[0] == 5); + CHECK(msg.data()[1] == 'H'); + CHECK(msg.data()[6] == 5); + CHECK(msg.data()[7] == 'W'); + + std::vector parts2; + decode(msg, std::back_inserter(parts2)); + CHECK(parts.size() == 2); + CHECK(parts[0].size() == 5); + CHECK(parts[1].size() == 5); +} + +TEST_CASE("multipart codec free function with multipart_t", "[codec_multipart]") +{ + using namespace zmq; + multipart_t mmsg; + mmsg.addstr("Hello"); + mmsg.addstr("World"); + auto msg = encode(mmsg); + CHECK(msg.size() == 1 + 5 + 1 + 5); + CHECK(msg.data()[0] == 5); + CHECK(msg.data()[1] == 'H'); + CHECK(msg.data()[6] == 5); + CHECK(msg.data()[7] == 'W'); + + multipart_t mmsg2; + decode(msg, std::back_inserter(mmsg2)); + CHECK(mmsg2.size() == 2); + CHECK(mmsg2[0].size() == 5); + CHECK(mmsg2[1].size() == 5); +} + +TEST_CASE("multipart codec static method decode to multipart_t", "[codec_multipart]") +{ + using namespace zmq; + multipart_t mmsg; + mmsg.addstr("Hello"); + mmsg.addstr("World"); + auto msg = encode(mmsg); + + auto mmsg2 = multipart_t::decode(msg); + CHECK(mmsg2.size() == 2); + CHECK(mmsg2[0].size() == 5); + CHECK(mmsg2[1].size() == 5); +} + + +#endif diff --git a/zmq.hpp b/zmq.hpp index af3f440..98fe258 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -26,6 +26,12 @@ #ifndef __ZMQ_HPP_INCLUDED__ #define __ZMQ_HPP_INCLUDED__ +#ifdef _WIN32 +#ifndef NOMINMAX +#define NOMINMAX +#endif +#endif + // macros defined if has a specific standard or greater #if (defined(__cplusplus) && __cplusplus >= 201103L) \ || (defined(_MSC_VER) && _MSC_VER >= 1900) diff --git a/zmq_addon.hpp b/zmq_addon.hpp index 504115a..5c783f2 100644 --- a/zmq_addon.hpp +++ b/zmq_addon.hpp @@ -31,6 +31,7 @@ #include #include #ifdef ZMQ_CPP11 +#include #include #include #endif @@ -125,12 +126,13 @@ ZMQ_NODISCARD recv_result_t recv_multipart_n(socket_ref s, */ template::value && (std::is_same, message_t>::value || detail::is_buffer>::value)>::type #endif - > + > send_result_t send_multipart(socket_ref s, Range &&msgs, send_flags flags = send_flags::none) { @@ -154,7 +156,122 @@ send_multipart(socket_ref s, Range &&msgs, send_flags flags = send_flags::none) return msg_count; } +/* Encode a multipart message. + + The range must be a ForwardRange of zmq::message_t. A + zmq::multipart_t or STL container may be passed for encoding. + + Returns: a zmq::message_t holding the encoded multipart data. + + Throws: std::range_error is thrown if the size of any single part + can not fit in an unsigned 32 bit integer. + + The encoding is compatible with that used by the CZMQ function + zmsg_encode(). Each part consists of a size followed by the data. + These are placed contiguously into the output message. A part of + size less than 255 bytes will have a single byte size value. + Larger parts will have a five byte size value with the first byte + set to 0xFF and the remaining four bytes holding the size of the + part's data. +*/ +template::value + && (std::is_same, message_t>::value + || detail::is_buffer>::value)>::type #endif + > +message_t encode(const Range &parts) +{ + size_t mmsg_size = 0; + + // First pass check sizes + for (const auto &part : parts) { + size_t part_size = part.size(); + if (part_size > std::numeric_limits::max()) { + // Size value must fit into uint32_t. + throw std::range_error("Invalid size, message part too large"); + } + size_t count_size = 5; + if (part_size < std::numeric_limits::max()) { + count_size = 1; + } + mmsg_size += part_size + count_size; + } + + message_t encoded(mmsg_size); + unsigned char *buf = encoded.data(); + for (const auto &part : parts) { + uint32_t part_size = part.size(); + const unsigned char *part_data = + static_cast(part.data()); + + // small part + if (part_size < std::numeric_limits::max()) { + *buf++ = (unsigned char) part_size; + memcpy(buf, part_data, part_size); + buf += part_size; + continue; + } + + // big part + *buf++ = std::numeric_limits::max(); + *buf++ = (part_size >> 24) & std::numeric_limits::max(); + *buf++ = (part_size >> 16) & std::numeric_limits::max(); + *buf++ = (part_size >> 8) & std::numeric_limits::max(); + *buf++ = part_size & std::numeric_limits::max(); + memcpy(buf, part_data, part_size); + buf += part_size; + } + return encoded; +} + +/* Decode an encoded message to multiple parts. + + The given output iterator must be a ForwardIterator to a container + holding zmq::message_t such as a zmq::multipart_t or various STL + containers. + + Returns the ForwardIterator advanced once past the last decoded + part. + + Throws: a std::out_of_range is thrown if the encoded part sizes + lead to exceeding the message data bounds. + + The decoding assumes the message is encoded in the manner + performed by zmq::encode(). + */ +template OutputIt decode(const message_t &encoded, OutputIt out) +{ + const unsigned char *source = encoded.data(); + const unsigned char *const limit = source + encoded.size(); + + while (source < limit) { + size_t part_size = *source++; + if (part_size == std::numeric_limits::max()) { + if (source > limit - 4) { + throw std::out_of_range( + "Malformed encoding, overflow in reading size"); + } + part_size = ((uint32_t) source[0] << 24) + ((uint32_t) source[1] << 16) + + ((uint32_t) source[2] << 8) + (uint32_t) source[3]; + source += 4; + } + + if (source > limit - part_size) { + throw std::out_of_range("Malformed encoding, overflow in reading part"); + } + *out = message_t(source, part_size); + ++out; + source += part_size; + } + return out; +} + +#endif + #ifdef ZMQ_HAS_RVALUE_REFS @@ -171,6 +288,8 @@ class multipart_t std::deque m_parts; public: + typedef std::deque::value_type value_type; + typedef std::deque::iterator iterator; typedef std::deque::const_iterator const_iterator; @@ -343,6 +462,9 @@ class multipart_t // Push message part to back void add(message_t &&message) { m_parts.push_back(std::move(message)); } + // Alias to allow std::back_inserter() + void push_back(message_t &&message) { m_parts.push_back(std::move(message)); } + // Pop string from front std::string popstr() { @@ -469,6 +591,27 @@ class multipart_t return true; } +#ifdef ZMQ_CPP11 + + // Return single part message_t encoded from this multipart_t. + message_t encode() const { return zmq::encode(*this); } + + // Decode encoded message into multiple parts and append to self. + void decode_append(const message_t &encoded) + { + zmq::decode(encoded, std::back_inserter(*this)); + } + + // Return a new multipart_t containing the decoded message_t. + static multipart_t decode(const message_t &encoded) + { + multipart_t tmp; + zmq::decode(encoded, std::back_inserter(tmp)); + return tmp; + } + +#endif + private: // Disable implicit copying (moving is more efficient) multipart_t(const multipart_t &other) ZMQ_DELETED_FUNCTION;