Merge pull request #358 from gummif/gfa/send-recv-multipart

Problem: Handling multipart messages is complex
This commit is contained in:
Simon Giesecke 2019-11-08 09:41:25 +01:00 committed by GitHub
commit a34d2a3da9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 308 additions and 2 deletions

View File

@ -41,9 +41,8 @@ Supported platforms
Examples
========
This example requires at least C++11.
These examples require at least C++11.
```c++
#include <string>
#include <zmq.hpp>
int main()
@ -54,6 +53,36 @@ int main()
sock.send(zmq::str_buffer("Hello, world"), zmq::send_flags::dontwait);
}
```
This a more complex example where we send and receive multi-part messages.
```c++
#include <iostream>
#include <zmq_addon.hpp>
int main()
{
zmq::context_t ctx;
zmq::socket_t sock1(ctx, zmq::socket_type::pair);
zmq::socket_t sock2(ctx, zmq::socket_type::pair);
sock1.bind("inproc://test");
sock2.connect("inproc://test");
std::array<zmq::const_buffer, 2> send_msgs = {
zmq::str_buffer("foo"),
zmq::str_buffer("bar!")
};
if (!zmq::send_multipart(sock1, send_msgs))
return 1;
std::vector<zmq::message_t> recv_msgs;
const auto ret = zmq::recv_multipart(
sock2, std::back_inserter(recv_msgs));
if (!ret)
return 1;
std::cout << "Got " << *ret
<< " messages" << std::endl;
return 0;
}
```
Contribution policy
===================

View File

@ -25,6 +25,8 @@ add_executable(
poller.cpp
active_poller.cpp
multipart.cpp
recv_multipart.cpp
send_multipart.cpp
monitor.cpp
utilities.cpp
)

60
tests/recv_multipart.cpp Normal file
View File

@ -0,0 +1,60 @@
#ifdef ZMQ_CPP11
#include <catch.hpp>
#include <zmq_addon.hpp>
TEST_CASE("recv_multipart test", "[recv_multipart]")
{
zmq::context_t context(1);
zmq::socket_t output(context, ZMQ_PAIR);
zmq::socket_t input(context, ZMQ_PAIR);
output.bind("inproc://multipart.test");
input.connect("inproc://multipart.test");
SECTION("send 1 message")
{
input.send(zmq::str_buffer("hello"));
std::vector<zmq::message_t> msgs;
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs));
REQUIRE(ret);
CHECK(*ret == 1);
REQUIRE(msgs.size() == 1);
CHECK(msgs[0].size() == 5);
}
SECTION("send 2 messages")
{
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
input.send(zmq::str_buffer("world!"));
std::vector<zmq::message_t> msgs;
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs));
REQUIRE(ret);
CHECK(*ret == 2);
REQUIRE(msgs.size() == 2);
CHECK(msgs[0].size() == 5);
CHECK(msgs[1].size() == 6);
}
SECTION("send no messages, dontwait")
{
std::vector<zmq::message_t> msgs;
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs), zmq::recv_flags::dontwait);
CHECK_FALSE(ret);
REQUIRE(msgs.size() == 0);
}
SECTION("send 1 partial message, dontwait")
{
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
std::vector<zmq::message_t> msgs;
auto ret = zmq::recv_multipart(output, std::back_inserter(msgs), zmq::recv_flags::dontwait);
CHECK_FALSE(ret);
REQUIRE(msgs.size() == 0);
}
SECTION("recv with invalid socket")
{
std::vector<zmq::message_t> msgs;
CHECK_THROWS_AS(zmq::recv_multipart(zmq::socket_ref(), std::back_inserter(msgs)), const zmq::error_t &);
}
}
#endif

124
tests/send_multipart.cpp Normal file
View File

@ -0,0 +1,124 @@
#ifdef ZMQ_CPP11
#include <forward_list>
#include <catch.hpp>
#include <zmq_addon.hpp>
TEST_CASE("send_multipart test", "[send_multipart]")
{
zmq::context_t context(1);
zmq::socket_t output(context, ZMQ_PAIR);
zmq::socket_t input(context, ZMQ_PAIR);
output.bind("inproc://multipart.test");
input.connect("inproc://multipart.test");
SECTION("send 0 messages")
{
std::vector<zmq::message_t> imsgs;
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 0);
}
SECTION("send 1 message")
{
std::array<zmq::message_t, 1> imsgs = {zmq::message_t(3)};
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 1);
std::vector<zmq::message_t> omsgs;
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
REQUIRE(oret);
CHECK(*oret == 1);
REQUIRE(omsgs.size() == 1);
CHECK(omsgs[0].size() == 3);
}
SECTION("send 2 messages")
{
std::array<zmq::message_t, 2> imsgs = {zmq::message_t(3), zmq::message_t(4)};
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 2);
std::vector<zmq::message_t> omsgs;
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
REQUIRE(oret);
CHECK(*oret == 2);
REQUIRE(omsgs.size() == 2);
CHECK(omsgs[0].size() == 3);
CHECK(omsgs[1].size() == 4);
}
SECTION("send 2 messages, const_buffer")
{
std::array<zmq::const_buffer, 2> imsgs = {zmq::str_buffer("foo"), zmq::str_buffer("bar!")};
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 2);
std::vector<zmq::message_t> omsgs;
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
REQUIRE(oret);
CHECK(*oret == 2);
REQUIRE(omsgs.size() == 2);
CHECK(omsgs[0].size() == 3);
CHECK(omsgs[1].size() == 4);
}
SECTION("send 2 messages, mutable_buffer")
{
char buf[4] = {};
std::array<zmq::mutable_buffer, 2> imsgs = {zmq::buffer(buf, 3), zmq::buffer(buf)};
auto iret = zmq::send_multipart(input, imsgs);
REQUIRE(iret);
CHECK(*iret == 2);
std::vector<zmq::message_t> omsgs;
auto oret = zmq::recv_multipart(output, std::back_inserter(omsgs));
REQUIRE(oret);
CHECK(*oret == 2);
REQUIRE(omsgs.size() == 2);
CHECK(omsgs[0].size() == 3);
CHECK(omsgs[1].size() == 4);
}
SECTION("send 2 messages, dontwait")
{
zmq::socket_t push(context, ZMQ_PUSH);
push.bind("inproc://multipart.test.push");
std::array<zmq::message_t, 2> imsgs = {zmq::message_t(3), zmq::message_t(4)};
auto iret = zmq::send_multipart(push, imsgs, zmq::send_flags::dontwait);
REQUIRE_FALSE(iret);
}
SECTION("send, misc. containers")
{
std::vector<zmq::message_t> msgs_vec;
msgs_vec.emplace_back(3);
msgs_vec.emplace_back(4);
auto iret = zmq::send_multipart(input, msgs_vec);
REQUIRE(iret);
CHECK(*iret == 2);
std::forward_list<zmq::message_t> msgs_list;
msgs_list.emplace_front(4);
msgs_list.emplace_front(3);
iret = zmq::send_multipart(input, msgs_list);
REQUIRE(iret);
CHECK(*iret == 2);
// init. list
const auto msgs_il = {zmq::str_buffer("foo"), zmq::str_buffer("bar!")};
iret = zmq::send_multipart(input, msgs_il);
REQUIRE(iret);
CHECK(*iret == 2);
// rvalue
iret = zmq::send_multipart(input,
std::initializer_list<zmq::const_buffer>{zmq::str_buffer("foo"), zmq::str_buffer("bar!")});
REQUIRE(iret);
CHECK(*iret == 2);
}
SECTION("send with invalid socket")
{
std::vector<zmq::message_t> msgs(1);
CHECK_THROWS_AS(zmq::send_multipart(zmq::socket_ref(), msgs), const zmq::error_t &);
}
}
#endif

View File

@ -971,6 +971,15 @@ inline const_buffer buffer(const const_buffer& cb, size_t n) noexcept
namespace detail
{
template<class T>
struct is_buffer
{
static constexpr bool value =
std::is_same<T, const_buffer>::value ||
std::is_same<T, mutable_buffer>::value;
};
template<class T> struct is_pod_like
{
// NOTE: The networking draft N4771 section 16.11 requires

View File

@ -37,6 +37,88 @@
namespace zmq
{
#ifdef ZMQ_CPP11
/* Receive a multipart message.
Writes the zmq::message_t objects to OutputIterator out.
The out iterator must handle an unspecified number of writes,
e.g. by using std::back_inserter.
Returns: the number of messages received or nullopt (on EAGAIN).
Throws: if recv throws. Any exceptions thrown
by the out iterator will be propagated and the message
may have been only partially received with pending
message parts. It is adviced to close this socket in that event.
*/
template<class OutputIt>
ZMQ_NODISCARD detail::recv_result_t recv_multipart(socket_ref s, OutputIt out,
recv_flags flags = recv_flags::none)
{
size_t msg_count = 0;
message_t msg;
while (true)
{
if (!s.recv(msg, flags))
{
// zmq ensures atomic delivery of messages
assert(msg_count == 0);
return {};
}
++msg_count;
const bool more = msg.more();
*out++ = std::move(msg);
if (!more)
break;
}
return msg_count;
}
/* Send a multipart message.
The range must be a ForwardRange of zmq::message_t,
zmq::const_buffer or zmq::mutable_buffer.
The flags may be zmq::send_flags::sndmore if there are
more message parts to be sent after the call to this function.
Returns: the number of messages sent (exactly msgs.size()) or nullopt (on EAGAIN).
Throws: if send throws. Any exceptions thrown
by the msgs range will be propagated and the message
may have been only partially sent. It is adviced to close this socket in that event.
*/
template<class Range,
typename = typename std::enable_if<
detail::is_range<Range>::value
&& (std::is_same<detail::range_value_t<Range>, message_t>::value
|| detail::is_buffer<detail::range_value_t<Range>>::value)
>::type>
detail::send_result_t send_multipart(socket_ref s, Range&& msgs,
send_flags flags = send_flags::none)
{
using std::begin;
using std::end;
auto it = begin(msgs);
const auto end_it = end(msgs);
size_t msg_count = 0;
while (it != end_it)
{
const auto next = std::next(it);
const auto msg_flags = flags | (next == end_it ? send_flags::none : send_flags::sndmore);
if (!s.send(*it, msg_flags))
{
// zmq ensures atomic delivery of messages
assert(it == begin(msgs));
return {};
}
++msg_count;
it = next;
}
return msg_count;
}
#endif
#ifdef ZMQ_HAS_RVALUE_REFS
/*