Problem: Missing recv multipart to fixed buffers

Solution: Add recv_multipart_n function
This commit is contained in:
Gudmundur Adalsteinsson 2019-11-09 12:38:58 +00:00
parent 5ee8261743
commit 93e3090eb3
2 changed files with 143 additions and 17 deletions

View File

@ -56,4 +56,89 @@ TEST_CASE("recv_multipart test", "[recv_multipart]")
CHECK_THROWS_AS(zmq::recv_multipart(zmq::socket_ref(), std::back_inserter(msgs)), const zmq::error_t &); CHECK_THROWS_AS(zmq::recv_multipart(zmq::socket_ref(), std::back_inserter(msgs)), const zmq::error_t &);
} }
} }
TEST_CASE("recv_multipart_n test", "[recv_multipart]")
{
zmq::context_t context(1);
zmq::socket_t output(context, ZMQ_PAIR);
zmq::socket_t input(context, ZMQ_PAIR);
output.bind("inproc://multipart.test");
input.connect("inproc://multipart.test");
SECTION("send 1 message")
{
input.send(zmq::str_buffer("hello"));
std::array<zmq::message_t, 1> msgs;
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size());
REQUIRE(ret);
CHECK(*ret == 1);
CHECK(msgs[0].size() == 5);
}
SECTION("send 1 message 2")
{
input.send(zmq::str_buffer("hello"));
std::array<zmq::message_t, 2> msgs;
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size());
REQUIRE(ret);
CHECK(*ret == 1);
CHECK(msgs[0].size() == 5);
CHECK(msgs[1].size() == 0);
}
SECTION("send 2 messages, recv 1")
{
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
input.send(zmq::str_buffer("world!"));
std::array<zmq::message_t, 1> msgs;
CHECK_THROWS_AS(
zmq::recv_multipart_n(output, msgs.data(), msgs.size()),
const std::runtime_error&);
}
SECTION("recv 0")
{
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
input.send(zmq::str_buffer("world!"));
std::array<zmq::message_t, 1> msgs;
CHECK_THROWS_AS(
zmq::recv_multipart_n(output, msgs.data(), 0),
const std::runtime_error&);
}
SECTION("send 2 messages")
{
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
input.send(zmq::str_buffer("world!"));
std::array<zmq::message_t, 2> msgs;
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size());
REQUIRE(ret);
CHECK(*ret == 2);
CHECK(msgs[0].size() == 5);
CHECK(msgs[1].size() == 6);
}
SECTION("send no messages, dontwait")
{
std::array<zmq::message_t, 1> msgs;
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size(), zmq::recv_flags::dontwait);
CHECK_FALSE(ret);
REQUIRE(msgs[0].size() == 0);
}
SECTION("send 1 partial message, dontwait")
{
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
std::array<zmq::message_t, 1> msgs;
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size(), zmq::recv_flags::dontwait);
CHECK_FALSE(ret);
REQUIRE(msgs[0].size() == 0);
}
SECTION("recv with invalid socket")
{
std::array<zmq::message_t, 1> msgs;
CHECK_THROWS_AS(zmq::recv_multipart_n(zmq::socket_ref(), msgs.data(), msgs.size()), const zmq::error_t &);
}
}
#endif #endif

View File

@ -40,6 +40,41 @@ namespace zmq
#ifdef ZMQ_CPP11 #ifdef ZMQ_CPP11
namespace detail
{
template<bool CheckN, class OutputIt>
recv_result_t recv_multipart_n(socket_ref s, OutputIt out, size_t n,
recv_flags flags)
{
size_t msg_count = 0;
message_t msg;
while (true)
{
#ifdef ZMQ_CPP17
if constexpr (CheckN)
#else
if (CheckN)
#endif
{
if (msg_count >= n)
throw std::runtime_error("Too many message parts in recv_multipart_n");
}
if (!s.recv(msg, flags))
{
// zmq ensures atomic delivery of messages
assert(msg_count == 0);
return {};
}
++msg_count;
const bool more = msg.more();
*out++ = std::move(msg);
if (!more)
break;
}
return msg_count;
}
} // namespace detail
/* Receive a multipart message. /* Receive a multipart message.
Writes the zmq::message_t objects to OutputIterator out. Writes the zmq::message_t objects to OutputIterator out.
@ -57,23 +92,29 @@ ZMQ_NODISCARD
recv_result_t recv_multipart(socket_ref s, OutputIt out, recv_result_t recv_multipart(socket_ref s, OutputIt out,
recv_flags flags = recv_flags::none) recv_flags flags = recv_flags::none)
{ {
size_t msg_count = 0; return detail::recv_multipart_n<false>(s, std::move(out), 0, flags);
message_t msg; }
while (true)
{ /* Receive a multipart message.
if (!s.recv(msg, flags))
{ Writes at most n zmq::message_t objects to OutputIterator out.
// zmq ensures atomic delivery of messages If the number of message parts of the incoming message exceeds n
assert(msg_count == 0); then an exception will be thrown.
return {};
} Returns: the number of messages received or nullopt (on EAGAIN).
++msg_count; Throws: if recv throws. Throws std::runtime_error if the number
const bool more = msg.more(); of message parts exceeds n (exactly n messages will have been written
*out++ = std::move(msg); to out). Any exceptions thrown
if (!more) by the out iterator will be propagated and the message
break; may have been only partially received with pending
} message parts. It is adviced to close this socket in that event.
return msg_count; */
template<class OutputIt>
ZMQ_NODISCARD
recv_result_t recv_multipart_n(socket_ref s, OutputIt out, size_t n,
recv_flags flags = recv_flags::none)
{
return detail::recv_multipart_n<true>(s, std::move(out), n, flags);
} }
/* Send a multipart message. /* Send a multipart message.