Skip to content

Commit 12c3003

Browse files
authored
Add an example (#423)
I had a really tough time figuring out how to do simple pub-sub with cppzmq; I finally figured it out so I'd like to add it.
1 parent 4ceb2c8 commit 12c3003

File tree

1 file changed

+99
-0
lines changed

1 file changed

+99
-0
lines changed
+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#include <future>
2+
#include <iostream>
3+
#include <string>
4+
5+
#include "zmq.hpp"
6+
#include "zmq_addon.hpp"
7+
8+
void PublisherThread(zmq::context_t *ctx) {
9+
// Prepare publisher
10+
zmq::socket_t publisher(*ctx, zmq::socket_type::pub);
11+
publisher.bind("inproc://#1");
12+
13+
// Give the subscribers a chance to connect, so they don't lose any messages
14+
std::this_thread::sleep_for(std::chrono::milliseconds(20));
15+
16+
while (true) {
17+
// Write three messages, each with an envelope and content
18+
publisher.send(zmq::str_buffer("A"), zmq::send_flags::sndmore);
19+
publisher.send(zmq::str_buffer("Message in A envelope"));
20+
publisher.send(zmq::str_buffer("B"), zmq::send_flags::sndmore);
21+
publisher.send(zmq::str_buffer("Message in B envelope"));
22+
publisher.send(zmq::str_buffer("C"), zmq::send_flags::sndmore);
23+
publisher.send(zmq::str_buffer("Message in C envelope"));
24+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
25+
}
26+
}
27+
28+
void SubscriberThread1(zmq::context_t *ctx) {
29+
// Prepare subscriber
30+
zmq::socket_t subscriber(*ctx, zmq::socket_type::sub);
31+
subscriber.connect("inproc://#1");
32+
33+
// Thread2 opens "A" and "B" envelopes
34+
subscriber.set(zmq::sockopt::subscribe, "A");
35+
subscriber.set(zmq::sockopt::subscribe, "B");
36+
37+
while (1) {
38+
// Receive all parts of the message
39+
std::vector<zmq::message_t> recv_msgs;
40+
zmq::recv_result_t result =
41+
zmq::recv_multipart(subscriber, std::back_inserter(recv_msgs));
42+
assert(result && "recv failed");
43+
44+
std::cout << "Thread2: [" << recv_msgs[0].to_string_view() << "] "
45+
<< recv_msgs[1].to_string_view() << std::endl;
46+
}
47+
}
48+
49+
void SubscriberThread2(zmq::context_t *ctx) {
50+
// Prepare our context and subscriber
51+
zmq::socket_t subscriber(*ctx, zmq::socket_type::sub);
52+
subscriber.connect("inproc://#1");
53+
54+
// Thread3 opens ALL envelopes
55+
subscriber.set(zmq::sockopt::subscribe, "");
56+
57+
while (1) {
58+
// Receive all parts of the message
59+
std::vector<zmq::message_t> recv_msgs;
60+
zmq::recv_result_t result =
61+
zmq::recv_multipart(subscriber, std::back_inserter(recv_msgs));
62+
assert(result && "recv failed");
63+
64+
std::cout << "Thread3: [" << recv_msgs[0].to_string_view() << "] "
65+
<< recv_msgs[1].to_string_view() << std::endl;
66+
}
67+
}
68+
69+
int main() {
70+
/*
71+
* No I/O threads are involved in passing messages using the inproc transport.
72+
* Therefore, if you are using a ØMQ context for in-process messaging only you
73+
* can initialise the context with zero I/O threads.
74+
*
75+
* Source: http://api.zeromq.org/4-3:zmq-inproc
76+
*/
77+
zmq::context_t ctx(0);
78+
79+
auto thread1 = std::async(std::launch::async, PublisherThread, &ctx);
80+
81+
// Give the publisher a chance to bind, since inproc requires it
82+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
83+
84+
auto thread2 = std::async(std::launch::async, SubscriberThread1, &ctx);
85+
auto thread3 = std::async(std::launch::async, SubscriberThread2, &ctx);
86+
thread1.wait();
87+
thread2.wait();
88+
thread3.wait();
89+
90+
/*
91+
* Output:
92+
* An infinite loop of a mix of:
93+
* Thread2: [A] Message in A envelope
94+
* Thread2: [B] Message in B envelope
95+
* Thread3: [A] Message in A envelope
96+
* Thread3: [B] Message in B envelope
97+
* Thread3: [C] Message in C envelope
98+
*/
99+
}

0 commit comments

Comments
 (0)