diff --git a/examples/pubsub_multithread_inproc.cpp b/examples/pubsub_multithread_inproc.cpp new file mode 100644 index 0000000..be2b304 --- /dev/null +++ b/examples/pubsub_multithread_inproc.cpp @@ -0,0 +1,99 @@ +#include +#include +#include + +#include "zmq.hpp" +#include "zmq_addon.hpp" + +void PublisherThread(zmq::context_t *ctx) { + // Prepare publisher + zmq::socket_t publisher(*ctx, zmq::socket_type::pub); + publisher.bind("inproc://#1"); + + // Give the subscribers a chance to connect, so they don't lose any messages + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + + while (true) { + // Write three messages, each with an envelope and content + publisher.send(zmq::str_buffer("A"), zmq::send_flags::sndmore); + publisher.send(zmq::str_buffer("Message in A envelope")); + publisher.send(zmq::str_buffer("B"), zmq::send_flags::sndmore); + publisher.send(zmq::str_buffer("Message in B envelope")); + publisher.send(zmq::str_buffer("C"), zmq::send_flags::sndmore); + publisher.send(zmq::str_buffer("Message in C envelope")); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } +} + +void SubscriberThread1(zmq::context_t *ctx) { + // Prepare subscriber + zmq::socket_t subscriber(*ctx, zmq::socket_type::sub); + subscriber.connect("inproc://#1"); + + // Thread2 opens "A" and "B" envelopes + subscriber.set(zmq::sockopt::subscribe, "A"); + subscriber.set(zmq::sockopt::subscribe, "B"); + + while (1) { + // Receive all parts of the message + std::vector recv_msgs; + zmq::recv_result_t result = + zmq::recv_multipart(subscriber, std::back_inserter(recv_msgs)); + assert(result && "recv failed"); + + std::cout << "Thread2: [" << recv_msgs[0].to_string_view() << "] " + << recv_msgs[1].to_string_view() << std::endl; + } +} + +void SubscriberThread2(zmq::context_t *ctx) { + // Prepare our context and subscriber + zmq::socket_t subscriber(*ctx, zmq::socket_type::sub); + subscriber.connect("inproc://#1"); + + // Thread3 opens ALL envelopes + subscriber.set(zmq::sockopt::subscribe, ""); + + while (1) { + // Receive all parts of the message + std::vector recv_msgs; + zmq::recv_result_t result = + zmq::recv_multipart(subscriber, std::back_inserter(recv_msgs)); + assert(result && "recv failed"); + + std::cout << "Thread3: [" << recv_msgs[0].to_string_view() << "] " + << recv_msgs[1].to_string_view() << std::endl; + } +} + +int main() { + /* + * No I/O threads are involved in passing messages using the inproc transport. + * Therefore, if you are using a ØMQ context for in-process messaging only you + * can initialise the context with zero I/O threads. + * + * Source: http://api.zeromq.org/4-3:zmq-inproc + */ + zmq::context_t ctx(0); + + auto thread1 = std::async(std::launch::async, PublisherThread, &ctx); + + // Give the publisher a chance to bind, since inproc requires it + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + auto thread2 = std::async(std::launch::async, SubscriberThread1, &ctx); + auto thread3 = std::async(std::launch::async, SubscriberThread2, &ctx); + thread1.wait(); + thread2.wait(); + thread3.wait(); + + /* + * Output: + * An infinite loop of a mix of: + * Thread2: [A] Message in A envelope + * Thread2: [B] Message in B envelope + * Thread3: [A] Message in A envelope + * Thread3: [B] Message in B envelope + * Thread3: [C] Message in C envelope + */ +}