Problem: Handling multipart messages is complex

Solution: Add generic algorithms for sending and receiving multipart
messages.
This commit is contained in:
Gudmundur Adalsteinsson 2019-10-26 18:29:22 +00:00
parent fdb2f13971
commit 505edeb336
5 changed files with 268 additions and 0 deletions

View File

@ -25,6 +25,8 @@ add_executable(
poller.cpp
active_poller.cpp
multipart.cpp
recv_multipart.cpp
send_multipart.cpp
monitor.cpp
utilities.cpp
)

60
tests/recv_multipart.cpp Normal file
View File

@ -0,0 +1,60 @@
#include <catch.hpp>
#include <zmq_addon.hpp>
#ifdef ZMQ_CPP11
TEST_CASE("recv_multipart test", "[recv_multipart]")
{
zmq::context_t context(1);
zmq::socket_t output(context, ZMQ_PAIR);
zmq::socket_t input(context, ZMQ_PAIR);
output.bind("inproc://multipart.test");
input.connect("inproc://multipart.test");
SECTION("send 1 message")
{
input.send(zmq::str_buffer("hello"));
std::vector<zmq::message_t> msgs;
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs));
REQUIRE(ret);
CHECK(*ret == 1);
REQUIRE(msgs.size() == 1);
CHECK(msgs[0].size() == 5);
}
SECTION("send 2 messages")
{
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
input.send(zmq::str_buffer("world!"));
std::vector<zmq::message_t> msgs;
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs));
REQUIRE(ret);
CHECK(*ret == 2);
REQUIRE(msgs.size() == 2);
CHECK(msgs[0].size() == 5);
CHECK(msgs[1].size() == 6);
}
SECTION("send no messages, dontwait")
{
std::vector<zmq::message_t> msgs;
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs), zmq::recv_flags::dontwait);
CHECK_FALSE(ret);
REQUIRE(msgs.size() == 0);
}
SECTION("send 1 partial message, dontwait")
{
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
std::vector<zmq::message_t> msgs;
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs), zmq::recv_flags::dontwait);
CHECK_FALSE(ret);
REQUIRE(msgs.size() == 0);
}
SECTION("recv with invalid socket")
{
std::vector<zmq::message_t> msgs;
CHECK_THROWS_AS(zmq::recv_multipart(zmq::socket_ref(), std::back_inserter(msgs)), const zmq::error_t &);
}
}
#endif

125
tests/send_multipart.cpp Normal file
View File

@ -0,0 +1,125 @@
#include <forward_list>
#include <catch.hpp>
#include <zmq_addon.hpp>
#ifdef ZMQ_CPP11
TEST_CASE("send_multipart test", "[send_multipart]")
{
zmq::context_t context(1);
zmq::socket_t output(context, ZMQ_PAIR);
zmq::socket_t input(context, ZMQ_PAIR);
output.bind("inproc://multipart.test");
input.connect("inproc://multipart.test");
SECTION("send 0 messages")
{
std::vector<zmq::message_t> imsgs;
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 0);
}
SECTION("send 1 message")
{
std::array<zmq::message_t, 1> imsgs = {zmq::message_t(3)};
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 1);
std::vector<zmq::message_t> omsgs;
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
REQUIRE(oret);
CHECK(*oret == 1);
REQUIRE(omsgs.size() == 1);
CHECK(omsgs[0].size() == 3);
}
SECTION("send 2 messages")
{
std::array<zmq::message_t, 2> imsgs = {zmq::message_t(3), zmq::message_t(4)};
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 2);
std::vector<zmq::message_t> omsgs;
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
REQUIRE(oret);
CHECK(*oret == 2);
REQUIRE(omsgs.size() == 2);
CHECK(omsgs[0].size() == 3);
CHECK(omsgs[1].size() == 4);
}
SECTION("send 2 messages, const_buffer")
{
std::array<zmq::const_buffer, 2> imsgs = {zmq::str_buffer("foo"), zmq::str_buffer("bar!")};
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 2);
std::vector<zmq::message_t> omsgs;
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
REQUIRE(oret);
CHECK(*oret == 2);
REQUIRE(omsgs.size() == 2);
CHECK(omsgs[0].size() == 3);
CHECK(omsgs[1].size() == 4);
}
SECTION("send 2 messages, mutable_buffer")
{
char buf[4] = {};
std::array<zmq::mutable_buffer, 2> imsgs = {zmq::buffer(buf, 3), zmq::buffer(buf)};
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 2);
std::vector<zmq::message_t> omsgs;
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
REQUIRE(oret);
CHECK(*oret == 2);
REQUIRE(omsgs.size() == 2);
CHECK(omsgs[0].size() == 3);
CHECK(omsgs[1].size() == 4);
}
SECTION("send 2 messages, dontwait")
{
zmq::socket_t push(context, ZMQ_PUSH);
push.bind("inproc://multipart.test.push");
std::array<zmq::message_t, 2> imsgs = {zmq::message_t(3), zmq::message_t(4)};
auto iret = zmq::send_multipart(push, imsgs, zmq::send_flags::dontwait);
REQUIRE_FALSE(iret);
}
// TODO send with EAGAIN
SECTION("send, misc. containers")
{
std::vector<zmq::message_t> msgs_vec;
msgs_vec.emplace_back(3);
msgs_vec.emplace_back(4);
auto iret = zmq::send_multipart(input, msgs_vec);
REQUIRE(iret);
CHECK(*iret == 2);
std::forward_list<zmq::message_t> msgs_list;
msgs_list.emplace_front(4);
msgs_list.emplace_front(3);
iret = zmq::send_multipart(input, msgs_list);
REQUIRE(iret);
CHECK(*iret == 2);
// init. list
const auto msgs_il = {zmq::str_buffer("foo"), zmq::str_buffer("bar!")};
iret = zmq::send_multipart(input, msgs_il);
REQUIRE(iret);
CHECK(*iret == 2);
// rvalue
iret = zmq::send_multipart(input,
std::initializer_list<zmq::const_buffer>{zmq::str_buffer("foo"), zmq::str_buffer("bar!")});
REQUIRE(iret);
CHECK(*iret == 2);
}
SECTION("send with invalid socket")
{
std::vector<zmq::message_t> msgs(1);
CHECK_THROWS_AS(zmq::send_multipart(zmq::socket_ref(), msgs), const zmq::error_t &);
}
}
#endif

View File

@ -971,6 +971,15 @@ inline const_buffer buffer(const const_buffer& cb, size_t n) noexcept
namespace detail
{
template<class T>
struct is_buffer
{
static constexpr bool value =
std::is_same<T, const_buffer>::value ||
std::is_same<T, mutable_buffer>::value;
};
template<class T> struct is_pod_like
{
// NOTE: The networking draft N4771 section 16.11 requires

View File

@ -37,6 +37,78 @@
namespace zmq
{
#ifdef ZMQ_CPP11
/* Receive a multipart message.
Writes the zmq::message_t objects to OutputIterator out.
The out iterator must handle an unspecified amount of write,
e.g. using std::back_inserter.
Returns: the number of messages received or nullopt (on EAGAIN).
Throws: if recv throws.
*/
template<class OutputIt>
ZMQ_NODISCARD detail::recv_result_t recv_multipart(socket_ref s, OutputIt out,
recv_flags flags = recv_flags::none)
{
size_t msg_count = 0;
message_t msg;
while (true)
{
if (!s.recv(msg, flags))
{
// zmq ensures atomic delivery of messages
assert(msg_count == 0);
return {};
}
++msg_count;
const bool more = msg.more();
*out++ = std::move(msg);
if (!more)
break;
}
return msg_count;
}
/* Send a multipart message.
The range must be a ForwardRange of zmq::message_t,
zmq::const_buffer or zmq::mutable_buffer.
The flags may be zmq::send_flags::sndmore if there are
more message parts to be sent after the call to this function.
Returns: the number of messages sent or nullopt (on EAGAIN).
Throws: if send throws.
*/
template<class Range,
typename = typename std::enable_if<
detail::is_range<Range>::value
&& (std::is_same<detail::range_value_t<Range>, message_t>::value
|| detail::is_buffer<detail::range_value_t<Range>>::value)
>::type>
detail::send_result_t send_multipart(socket_ref s, Range&& msgs,
send_flags flags = send_flags::none)
{
auto it = msgs.begin();
auto last = msgs.end();
const size_t msg_count = static_cast<size_t>(std::distance(it, last));
for (; it != last; ++it)
{
const auto mf = flags | (std::next(it) == last ? send_flags::none : send_flags::sndmore);
if (!s.send(*it, mf))
{
// zmq ensures atomic delivery of messages
assert(it == msgs.begin());
return {};
}
}
return msg_count;
}
#endif
#ifdef ZMQ_HAS_RVALUE_REFS
/*