mirror of
https://github.com/zeromq/cppzmq.git
synced 2025-01-22 10:25:53 +01:00
245 lines
6.3 KiB
C++
245 lines
6.3 KiB
C++
#include "testutil.hpp"
|
|
|
|
#ifdef ZMQ_CPP11
|
|
#include <thread>
|
|
#include <mutex>
|
|
#include <condition_variable>
|
|
#include <functional>
|
|
|
|
class mock_monitor_t : public zmq::monitor_t
|
|
{
|
|
public:
|
|
|
|
void on_event_connected(const zmq_event_t &, const char *) ZMQ_OVERRIDE
|
|
{
|
|
++connected;
|
|
++total;
|
|
}
|
|
|
|
int total{0};
|
|
int connected{0};
|
|
};
|
|
|
|
#endif
|
|
|
|
TEST_CASE("monitor create destroy", "[monitor]")
|
|
{
|
|
zmq::monitor_t monitor;
|
|
}
|
|
|
|
#if defined(ZMQ_CPP11)
|
|
TEST_CASE("monitor move construct", "[monitor]")
|
|
{
|
|
zmq::context_t ctx;
|
|
zmq::socket_t sock(ctx, ZMQ_DEALER);
|
|
SECTION("move ctor empty") {
|
|
zmq::monitor_t monitor1;
|
|
zmq::monitor_t monitor2 = std::move(monitor1);
|
|
}
|
|
SECTION("move ctor init") {
|
|
zmq::monitor_t monitor1;
|
|
monitor1.init(sock, "inproc://monitor-client");
|
|
zmq::monitor_t monitor2 = std::move(monitor1);
|
|
}
|
|
}
|
|
|
|
TEST_CASE("monitor move assign", "[monitor]")
|
|
{
|
|
zmq::context_t ctx;
|
|
zmq::socket_t sock(ctx, ZMQ_DEALER);
|
|
SECTION("move assign empty") {
|
|
zmq::monitor_t monitor1;
|
|
zmq::monitor_t monitor2;
|
|
monitor1 = std::move(monitor2);
|
|
}
|
|
SECTION("move assign init") {
|
|
zmq::monitor_t monitor1;
|
|
monitor1.init(sock, "inproc://monitor-client");
|
|
zmq::monitor_t monitor2;
|
|
monitor2 = std::move(monitor1);
|
|
}
|
|
SECTION("move assign init both") {
|
|
zmq::monitor_t monitor1;
|
|
monitor1.init(sock, "inproc://monitor-client");
|
|
zmq::monitor_t monitor2;
|
|
zmq::socket_t sock2(ctx, ZMQ_DEALER);
|
|
monitor2.init(sock2, "inproc://monitor-client2");
|
|
monitor2 = std::move(monitor1);
|
|
}
|
|
}
|
|
|
|
TEST_CASE("monitor init event count", "[monitor]")
|
|
{
|
|
common_server_client_setup s{false};
|
|
mock_monitor_t monitor;
|
|
|
|
const int expected_event_count = 1;
|
|
monitor.init(s.client, "inproc://foo");
|
|
|
|
CHECK_FALSE(monitor.check_event(0));
|
|
s.init();
|
|
|
|
while (monitor.check_event(1000) && monitor.total < expected_event_count) {
|
|
}
|
|
CHECK(monitor.connected == 1);
|
|
CHECK(monitor.total == expected_event_count);
|
|
}
|
|
|
|
TEST_CASE("monitor init abort", "[monitor]")
|
|
{
|
|
class mock_monitor : public mock_monitor_t
|
|
{
|
|
public:
|
|
mock_monitor(std::function<void(void)> handle_connected) :
|
|
handle_connected{std::move(handle_connected)}
|
|
{
|
|
}
|
|
|
|
void on_event_connected(const zmq_event_t &e, const char *m) ZMQ_OVERRIDE
|
|
{
|
|
mock_monitor_t::on_event_connected(e, m);
|
|
handle_connected();
|
|
}
|
|
|
|
std::function<void(void)> handle_connected;
|
|
};
|
|
|
|
common_server_client_setup s(false);
|
|
|
|
std::mutex mutex;
|
|
std::condition_variable cond_var;
|
|
bool done{false};
|
|
|
|
mock_monitor monitor([&]()
|
|
{
|
|
std::lock_guard<std::mutex> lock(mutex);
|
|
done = true;
|
|
cond_var.notify_one();
|
|
});
|
|
monitor.init(s.client, "inproc://foo");
|
|
|
|
auto thread = std::thread([&monitor]
|
|
{
|
|
while (monitor.check_event(-1)) {
|
|
}
|
|
});
|
|
|
|
s.init();
|
|
{
|
|
std::unique_lock<std::mutex> lock(mutex);
|
|
CHECK(cond_var.wait_for(lock, std::chrono::seconds(1),
|
|
[&done] { return done; }));
|
|
}
|
|
CHECK(monitor.connected == 1);
|
|
monitor.abort();
|
|
thread.join();
|
|
}
|
|
|
|
|
|
TEST_CASE("monitor from move assigned socket", "[monitor]")
|
|
{
|
|
zmq::context_t ctx;
|
|
zmq::socket_t sock;
|
|
sock = std::move([&ctx] {
|
|
zmq::socket_t sock(ctx, ZMQ_DEALER);
|
|
return sock;
|
|
}());
|
|
zmq::monitor_t monitor1;
|
|
monitor1.init(sock, "inproc://monitor-client");
|
|
// On failure, this test might hang indefinitely instead of immediately
|
|
// failing
|
|
}
|
|
#endif
|
|
|
|
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) \
|
|
&& !defined(ZMQ_CPP11_PARTIAL) && defined(ZMQ_HAVE_POLLER)
|
|
#include "zmq_addon.hpp"
|
|
|
|
TEST_CASE("poll monitor events using active poller", "[monitor]")
|
|
{
|
|
// define necessary class for test
|
|
class test_monitor : public zmq::monitor_t
|
|
{
|
|
public:
|
|
void init(zmq::socket_t &socket,
|
|
const char *const addr_,
|
|
int events = ZMQ_EVENT_ALL)
|
|
{
|
|
zmq::monitor_t::init(socket, addr_, events);
|
|
}
|
|
|
|
void addToPoller(zmq::active_poller_t &inActivePoller)
|
|
{
|
|
inActivePoller.add(
|
|
monitor_socket(), zmq::event_flags::pollin,
|
|
[&](zmq::event_flags ef) { process_event(static_cast<short>(ef)); });
|
|
}
|
|
|
|
void on_event_accepted(const zmq_event_t &event_, const char *addr_) override
|
|
{
|
|
clientAccepted++;
|
|
}
|
|
void on_event_disconnected(const zmq_event_t &event,
|
|
const char *const addr) override
|
|
{
|
|
clientDisconnected++;
|
|
}
|
|
|
|
int clientAccepted = 0;
|
|
int clientDisconnected = 0;
|
|
};
|
|
|
|
//Arrange
|
|
int messageCounter = 0;
|
|
const char monitorAddress[] = "inproc://monitor-server";
|
|
|
|
auto addToPoller = [&](zmq::socket_t &socket, zmq::active_poller_t &poller) {
|
|
poller.add(socket, zmq::event_flags::pollin, [&](zmq::event_flags ef) {
|
|
zmq::message_t msg;
|
|
auto result = socket.recv(msg, zmq::recv_flags::dontwait);
|
|
messageCounter++;
|
|
});
|
|
};
|
|
|
|
common_server_client_setup sockets(false);
|
|
|
|
test_monitor monitor;
|
|
monitor.init(sockets.server, monitorAddress,
|
|
ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED);
|
|
|
|
zmq::active_poller_t poller;
|
|
monitor.addToPoller(poller);
|
|
addToPoller(sockets.server, poller);
|
|
|
|
sockets.init();
|
|
sockets.client.send(zmq::message_t(0), zmq::send_flags::dontwait);
|
|
CHECK(monitor.clientAccepted == 0);
|
|
CHECK(monitor.clientDisconnected == 0);
|
|
|
|
//Act
|
|
for (int i = 0; i < 100; i++) {
|
|
poller.wait(std::chrono::milliseconds(50));
|
|
if (monitor.clientAccepted > 0) {
|
|
break;
|
|
}
|
|
}
|
|
CHECK(monitor.clientAccepted == 1);
|
|
CHECK(monitor.clientDisconnected == 0);
|
|
|
|
sockets.client.close();
|
|
|
|
for (int i = 0; i < 100; i++) {
|
|
poller.wait(std::chrono::milliseconds(50));
|
|
if (monitor.clientDisconnected > 0) {
|
|
break;
|
|
}
|
|
}
|
|
sockets.server.close();
|
|
|
|
// Assert
|
|
CHECK(messageCounter == 1);
|
|
CHECK(monitor.clientAccepted == 1);
|
|
CHECK(monitor.clientDisconnected == 1);
|
|
}
|
|
#endif
|