mirror of
https://github.com/zeromq/cppzmq.git
synced 2024-12-13 10:52:57 +01:00
Add an example (#423)
I had a really tough time figuring out how to do simple pub-sub with cppzmq; I finally figured it out so I'd like to add it.
This commit is contained in:
parent
4ceb2c8722
commit
12c3003aa2
99
examples/pubsub_multithread_inproc.cpp
Normal file
99
examples/pubsub_multithread_inproc.cpp
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
#include <future>
|
||||||
|
#include <iostream>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
#include "zmq.hpp"
|
||||||
|
#include "zmq_addon.hpp"
|
||||||
|
|
||||||
|
void PublisherThread(zmq::context_t *ctx) {
|
||||||
|
// Prepare publisher
|
||||||
|
zmq::socket_t publisher(*ctx, zmq::socket_type::pub);
|
||||||
|
publisher.bind("inproc://#1");
|
||||||
|
|
||||||
|
// Give the subscribers a chance to connect, so they don't lose any messages
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(20));
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
// Write three messages, each with an envelope and content
|
||||||
|
publisher.send(zmq::str_buffer("A"), zmq::send_flags::sndmore);
|
||||||
|
publisher.send(zmq::str_buffer("Message in A envelope"));
|
||||||
|
publisher.send(zmq::str_buffer("B"), zmq::send_flags::sndmore);
|
||||||
|
publisher.send(zmq::str_buffer("Message in B envelope"));
|
||||||
|
publisher.send(zmq::str_buffer("C"), zmq::send_flags::sndmore);
|
||||||
|
publisher.send(zmq::str_buffer("Message in C envelope"));
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void SubscriberThread1(zmq::context_t *ctx) {
|
||||||
|
// Prepare subscriber
|
||||||
|
zmq::socket_t subscriber(*ctx, zmq::socket_type::sub);
|
||||||
|
subscriber.connect("inproc://#1");
|
||||||
|
|
||||||
|
// Thread2 opens "A" and "B" envelopes
|
||||||
|
subscriber.set(zmq::sockopt::subscribe, "A");
|
||||||
|
subscriber.set(zmq::sockopt::subscribe, "B");
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
// Receive all parts of the message
|
||||||
|
std::vector<zmq::message_t> recv_msgs;
|
||||||
|
zmq::recv_result_t result =
|
||||||
|
zmq::recv_multipart(subscriber, std::back_inserter(recv_msgs));
|
||||||
|
assert(result && "recv failed");
|
||||||
|
|
||||||
|
std::cout << "Thread2: [" << recv_msgs[0].to_string_view() << "] "
|
||||||
|
<< recv_msgs[1].to_string_view() << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void SubscriberThread2(zmq::context_t *ctx) {
|
||||||
|
// Prepare our context and subscriber
|
||||||
|
zmq::socket_t subscriber(*ctx, zmq::socket_type::sub);
|
||||||
|
subscriber.connect("inproc://#1");
|
||||||
|
|
||||||
|
// Thread3 opens ALL envelopes
|
||||||
|
subscriber.set(zmq::sockopt::subscribe, "");
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
// Receive all parts of the message
|
||||||
|
std::vector<zmq::message_t> recv_msgs;
|
||||||
|
zmq::recv_result_t result =
|
||||||
|
zmq::recv_multipart(subscriber, std::back_inserter(recv_msgs));
|
||||||
|
assert(result && "recv failed");
|
||||||
|
|
||||||
|
std::cout << "Thread3: [" << recv_msgs[0].to_string_view() << "] "
|
||||||
|
<< recv_msgs[1].to_string_view() << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
/*
|
||||||
|
* No I/O threads are involved in passing messages using the inproc transport.
|
||||||
|
* Therefore, if you are using a ØMQ context for in-process messaging only you
|
||||||
|
* can initialise the context with zero I/O threads.
|
||||||
|
*
|
||||||
|
* Source: http://api.zeromq.org/4-3:zmq-inproc
|
||||||
|
*/
|
||||||
|
zmq::context_t ctx(0);
|
||||||
|
|
||||||
|
auto thread1 = std::async(std::launch::async, PublisherThread, &ctx);
|
||||||
|
|
||||||
|
// Give the publisher a chance to bind, since inproc requires it
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||||
|
|
||||||
|
auto thread2 = std::async(std::launch::async, SubscriberThread1, &ctx);
|
||||||
|
auto thread3 = std::async(std::launch::async, SubscriberThread2, &ctx);
|
||||||
|
thread1.wait();
|
||||||
|
thread2.wait();
|
||||||
|
thread3.wait();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Output:
|
||||||
|
* An infinite loop of a mix of:
|
||||||
|
* Thread2: [A] Message in A envelope
|
||||||
|
* Thread2: [B] Message in B envelope
|
||||||
|
* Thread3: [A] Message in A envelope
|
||||||
|
* Thread3: [B] Message in B envelope
|
||||||
|
* Thread3: [C] Message in C envelope
|
||||||
|
*/
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user