Skip to content

Commit de07119

Browse files
authored
Merge pull request #370 from gummif/gfa/multipartn
Problem: Missing recv multipart to fixed buffers
2 parents 0f8601c + 30fdfe0 commit de07119

File tree

2 files changed

+138
-16
lines changed

2 files changed

+138
-16
lines changed

tests/recv_multipart.cpp

+85
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,89 @@ TEST_CASE("recv_multipart test", "[recv_multipart]")
5656
CHECK_THROWS_AS(zmq::recv_multipart(zmq::socket_ref(), std::back_inserter(msgs)), const zmq::error_t &);
5757
}
5858
}
59+
60+
TEST_CASE("recv_multipart_n test", "[recv_multipart]")
61+
{
62+
zmq::context_t context(1);
63+
zmq::socket_t output(context, ZMQ_PAIR);
64+
zmq::socket_t input(context, ZMQ_PAIR);
65+
output.bind("inproc://multipart.test");
66+
input.connect("inproc://multipart.test");
67+
68+
SECTION("send 1 message")
69+
{
70+
input.send(zmq::str_buffer("hello"));
71+
72+
std::array<zmq::message_t, 1> msgs;
73+
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size());
74+
REQUIRE(ret);
75+
CHECK(*ret == 1);
76+
CHECK(msgs[0].size() == 5);
77+
}
78+
SECTION("send 1 message 2")
79+
{
80+
input.send(zmq::str_buffer("hello"));
81+
82+
std::array<zmq::message_t, 2> msgs;
83+
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size());
84+
REQUIRE(ret);
85+
CHECK(*ret == 1);
86+
CHECK(msgs[0].size() == 5);
87+
CHECK(msgs[1].size() == 0);
88+
}
89+
SECTION("send 2 messages, recv 1")
90+
{
91+
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
92+
input.send(zmq::str_buffer("world!"));
93+
94+
std::array<zmq::message_t, 1> msgs;
95+
CHECK_THROWS_AS(
96+
zmq::recv_multipart_n(output, msgs.data(), msgs.size()),
97+
const std::runtime_error&);
98+
}
99+
SECTION("recv 0")
100+
{
101+
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
102+
input.send(zmq::str_buffer("world!"));
103+
104+
std::array<zmq::message_t, 1> msgs;
105+
CHECK_THROWS_AS(
106+
zmq::recv_multipart_n(output, msgs.data(), 0),
107+
const std::runtime_error&);
108+
}
109+
SECTION("send 2 messages")
110+
{
111+
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
112+
input.send(zmq::str_buffer("world!"));
113+
114+
std::array<zmq::message_t, 2> msgs;
115+
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size());
116+
REQUIRE(ret);
117+
CHECK(*ret == 2);
118+
CHECK(msgs[0].size() == 5);
119+
CHECK(msgs[1].size() == 6);
120+
}
121+
SECTION("send no messages, dontwait")
122+
{
123+
std::array<zmq::message_t, 1> msgs;
124+
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size(), zmq::recv_flags::dontwait);
125+
CHECK_FALSE(ret);
126+
REQUIRE(msgs[0].size() == 0);
127+
}
128+
SECTION("send 1 partial message, dontwait")
129+
{
130+
input.send(zmq::str_buffer("hello"), zmq::send_flags::sndmore);
131+
132+
std::array<zmq::message_t, 1> msgs;
133+
auto ret = zmq::recv_multipart_n(output, msgs.data(), msgs.size(), zmq::recv_flags::dontwait);
134+
CHECK_FALSE(ret);
135+
REQUIRE(msgs[0].size() == 0);
136+
}
137+
SECTION("recv with invalid socket")
138+
{
139+
std::array<zmq::message_t, 1> msgs;
140+
CHECK_THROWS_AS(zmq::recv_multipart_n(zmq::socket_ref(), msgs.data(), msgs.size()), const zmq::error_t &);
141+
}
142+
}
143+
59144
#endif

zmq_addon.hpp

+53-16
Original file line numberDiff line numberDiff line change
@@ -40,27 +40,21 @@ namespace zmq
4040

4141
#ifdef ZMQ_CPP11
4242

43-
/* Receive a multipart message.
44-
45-
Writes the zmq::message_t objects to OutputIterator out.
46-
The out iterator must handle an unspecified number of writes,
47-
e.g. by using std::back_inserter.
48-
49-
Returns: the number of messages received or nullopt (on EAGAIN).
50-
Throws: if recv throws. Any exceptions thrown
51-
by the out iterator will be propagated and the message
52-
may have been only partially received with pending
53-
message parts. It is adviced to close this socket in that event.
54-
*/
55-
template<class OutputIt>
56-
ZMQ_NODISCARD
57-
recv_result_t recv_multipart(socket_ref s, OutputIt out,
58-
recv_flags flags = recv_flags::none)
43+
namespace detail
44+
{
45+
template<bool CheckN, class OutputIt>
46+
recv_result_t recv_multipart_n(socket_ref s, OutputIt out, size_t n,
47+
recv_flags flags)
5948
{
6049
size_t msg_count = 0;
6150
message_t msg;
6251
while (true)
6352
{
53+
if (CheckN)
54+
{
55+
if (msg_count >= n)
56+
throw std::runtime_error("Too many message parts in recv_multipart_n");
57+
}
6458
if (!s.recv(msg, flags))
6559
{
6660
// zmq ensures atomic delivery of messages
@@ -75,6 +69,49 @@ recv_result_t recv_multipart(socket_ref s, OutputIt out,
7569
}
7670
return msg_count;
7771
}
72+
} // namespace detail
73+
74+
/* Receive a multipart message.
75+
76+
Writes the zmq::message_t objects to OutputIterator out.
77+
The out iterator must handle an unspecified number of writes,
78+
e.g. by using std::back_inserter.
79+
80+
Returns: the number of messages received or nullopt (on EAGAIN).
81+
Throws: if recv throws. Any exceptions thrown
82+
by the out iterator will be propagated and the message
83+
may have been only partially received with pending
84+
message parts. It is adviced to close this socket in that event.
85+
*/
86+
template<class OutputIt>
87+
ZMQ_NODISCARD
88+
recv_result_t recv_multipart(socket_ref s, OutputIt out,
89+
recv_flags flags = recv_flags::none)
90+
{
91+
return detail::recv_multipart_n<false>(s, std::move(out), 0, flags);
92+
}
93+
94+
/* Receive a multipart message.
95+
96+
Writes at most n zmq::message_t objects to OutputIterator out.
97+
If the number of message parts of the incoming message exceeds n
98+
then an exception will be thrown.
99+
100+
Returns: the number of messages received or nullopt (on EAGAIN).
101+
Throws: if recv throws. Throws std::runtime_error if the number
102+
of message parts exceeds n (exactly n messages will have been written
103+
to out). Any exceptions thrown
104+
by the out iterator will be propagated and the message
105+
may have been only partially received with pending
106+
message parts. It is adviced to close this socket in that event.
107+
*/
108+
template<class OutputIt>
109+
ZMQ_NODISCARD
110+
recv_result_t recv_multipart_n(socket_ref s, OutputIt out, size_t n,
111+
recv_flags flags = recv_flags::none)
112+
{
113+
return detail::recv_multipart_n<true>(s, std::move(out), n, flags);
114+
}
78115

79116
/* Send a multipart message.
80117

0 commit comments

Comments
 (0)