1818
1919-define (TIMEOUT , 30_000 ).
2020
21+ % % This is the pseudo queue that is specially interpreted by RabbitMQ.
22+ -define (REPLY_QUEUE , <<" amq.rabbitmq.reply-to" >>).
23+
2124all () ->
2225 [
2326 {group , cluster_size_1 },
@@ -28,7 +31,11 @@ groups() ->
2831 [
2932 {cluster_size_1 , [shuffle ],
3033 [
31- trace
34+ trace ,
35+ failure_ack_mode ,
36+ failure_multiple_consumers ,
37+ failure_reuse_consumer_tag ,
38+ failure_publish
3239 ]},
3340 {cluster_size_3 , [shuffle ],
3441 [
@@ -82,8 +89,6 @@ trace(Config) ->
8289 Node = atom_to_binary (rabbit_ct_broker_helpers :get_node_config (Config , 0 , nodename )),
8390 TraceQueue = <<" tests.amqpl_direct_reply_to.trace.tracing" >>,
8491 RequestQueue = <<" tests.amqpl_direct_reply_to.trace.requests" >>,
85- % % This is the pseudo queue that is specially interpreted by RabbitMQ.
86- ReplyQueue = <<" amq.rabbitmq.reply-to" >>,
8792 RequestPayload = <<" my request" >>,
8893 ReplyPayload = <<" my reply" >>,
8994 CorrelationId = <<" my correlation ID" >>,
@@ -102,7 +107,7 @@ trace(Config) ->
102107
103108 % % There is no need to declare this pseudo queue first.
104109 amqp_channel :subscribe (RequesterCh ,
105- # 'basic.consume' {queue = ReplyQueue ,
110+ # 'basic.consume' {queue = ? REPLY_QUEUE ,
106111 no_ack = true },
107112 self ()),
108113 CTag = receive # 'basic.consume_ok' {consumer_tag = CTag0 } -> CTag0
@@ -114,7 +119,7 @@ trace(Config) ->
114119 amqp_channel :cast (
115120 RequesterCh ,
116121 # 'basic.publish' {routing_key = RequestQueue },
117- # amqp_msg {props = # 'P_basic' {reply_to = ReplyQueue ,
122+ # amqp_msg {props = # 'P_basic' {reply_to = ? REPLY_QUEUE ,
118123 correlation_id = CorrelationId },
119124 payload = RequestPayload }),
120125 receive # 'basic.ack' {} -> ok
@@ -182,6 +187,85 @@ trace(Config) ->
182187 [# 'queue.delete_ok' {} = amqp_channel :call (Ch , # 'queue.delete' {queue = Q0 }) || Q0 <- Qs ],
183188 {ok , _ } = rabbit_ct_broker_helpers :rabbitmqctl (Config , 0 , [" trace_off" ]).
184189
190+ % % A consumer must consume in no-ack mode.
191+ failure_ack_mode (Config ) ->
192+ {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
193+ Consume = # 'basic.consume' {queue = ? REPLY_QUEUE ,
194+ no_ack = false },
195+ try amqp_channel :subscribe (Ch , Consume , self ()) of
196+ _ ->
197+ ct :fail (" expected subscribe in ack mode to fail" )
198+ catch exit :Reason ->
199+ ? assertMatch (
200+ {{_ , {_ , _ , <<" PRECONDITION_FAILED - reply consumer cannot acknowledge" >>}}, _ },
201+ Reason )
202+ end ,
203+ ok = rabbit_ct_client_helpers :close_connection (Conn ).
204+
205+ % % In AMQP 0.9.1 there can be at most one reply consumer per channel.
206+ failure_multiple_consumers (Config ) ->
207+ {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
208+ Consume = # 'basic.consume' {queue = ? REPLY_QUEUE ,
209+ no_ack = true },
210+ amqp_channel :subscribe (Ch , Consume , self ()),
211+ receive # 'basic.consume_ok' {} -> ok
212+ end ,
213+
214+ try amqp_channel :subscribe (Ch , Consume , self ()) of
215+ _ ->
216+ ct :fail (" expected second subscribe to fail" )
217+ catch exit :Reason ->
218+ ? assertMatch (
219+ {{_ , {_ , _ , <<" PRECONDITION_FAILED - reply consumer already set" >>}}, _ },
220+ Reason )
221+ end ,
222+ ok = rabbit_ct_client_helpers :close_connection (Conn ).
223+
224+ % % Reusing the same consumer tag should fail.
225+ failure_reuse_consumer_tag (Config ) ->
226+ {_ , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
227+ Ctag = <<" my-tag" >>,
228+
229+ # 'queue.declare_ok' {queue = Q } = amqp_channel :call (Ch , # 'queue.declare' {exclusive = true }),
230+ amqp_channel :subscribe (Ch , # 'basic.consume' {queue = Q ,
231+ consumer_tag = Ctag }, self ()),
232+ receive # 'basic.consume_ok' {} -> ok
233+ end ,
234+
235+ try amqp_channel :subscribe (Ch , # 'basic.consume' {queue = ? REPLY_QUEUE ,
236+ consumer_tag = Ctag ,
237+ no_ack = true }, self ()) of
238+ _ ->
239+ ct :fail (" expected reusing consumer tag to fail" )
240+ catch exit :Reason ->
241+ ? assertMatch (
242+ {{_ , {connection_closing ,
243+ {_ , _ , <<" NOT_ALLOWED - attempt to reuse consumer tag 'my-tag'" >>}
244+ }}, _ },
245+ Reason )
246+ end .
247+
248+ % % Publishing with reply_to header set but without consuming from the pseudo queue should fail.
249+ failure_publish (Config ) ->
250+ {Conn , Ch } = rabbit_ct_client_helpers :open_connection_and_channel (Config ),
251+
252+ Ref = monitor (process , Ch ),
253+ amqp_channel :cast (
254+ Ch ,
255+ # 'basic.publish' {routing_key = <<" some request queue" >>},
256+ # amqp_msg {props = # 'P_basic' {reply_to = ? REPLY_QUEUE ,
257+ correlation_id = <<" some correlation ID" >>},
258+ payload = <<" some payload" >>}),
259+
260+ receive {'DOWN' , Ref , process , Ch , Reason } ->
261+ ? assertMatch (
262+ {_ , {_ , _ , <<" PRECONDITION_FAILED - fast reply consumer does not exist" >>}},
263+ Reason )
264+ after ? TIMEOUT ->
265+ ct :fail (" expected channel error" )
266+ end ,
267+ ok = rabbit_ct_client_helpers :close_connection (Conn ).
268+
185269% % "new" and "old" refers to new and old RabbitMQ versions in mixed version tests.
186270rpc_new_to_old_node (Config ) ->
187271 rpc (0 , 1 , Config ).
@@ -190,36 +274,40 @@ rpc_old_to_new_node(Config) ->
190274 rpc (1 , 0 , Config ).
191275
192276rpc (RequesterNode , ResponderNode , Config ) ->
193- RequestQueue = <<" tests.amqpl_direct_reply_to.rpc.requests" >>,
194- % % This is the pseudo queue that is specially interpreted by RabbitMQ.
195- ReplyQueue = <<" amq.rabbitmq.reply-to" >>,
277+ RequestQueue = <<" request queue" >>,
196278 RequestPayload = <<" my request" >>,
197- ReplyPayload = <<" my reply" >>,
198279 CorrelationId = <<" my correlation ID" >>,
199280 RequesterCh = rabbit_ct_client_helpers :open_channel (Config , RequesterNode ),
200281 ResponderCh = rabbit_ct_client_helpers :open_channel (Config , ResponderNode ),
201282
202283 % % There is no need to declare this pseudo queue first.
203284 amqp_channel :subscribe (RequesterCh ,
204- # 'basic.consume' {queue = ReplyQueue ,
285+ # 'basic.consume' {queue = ? REPLY_QUEUE ,
205286 no_ack = true },
206287 self ()),
207288 CTag = receive # 'basic.consume_ok' {consumer_tag = CTag0 } -> CTag0
208289 end ,
290+
291+ ? assertEqual (# 'queue.declare_ok' {queue = ? REPLY_QUEUE ,
292+ message_count = 0 ,
293+ consumer_count = 1 },
294+ amqp_channel :call (RequesterCh ,
295+ # 'queue.declare' {queue = ? REPLY_QUEUE })),
296+
209297 # 'queue.declare_ok' {} = amqp_channel :call (
210298 RequesterCh ,
211299 # 'queue.declare' {queue = RequestQueue }),
212300 # 'confirm.select_ok' {} = amqp_channel :call (RequesterCh , # 'confirm.select' {}),
213301 amqp_channel :register_confirm_handler (RequesterCh , self ()),
302+
214303 % % Send the request.
215304 amqp_channel :cast (
216305 RequesterCh ,
217306 # 'basic.publish' {routing_key = RequestQueue },
218- # amqp_msg {props = # 'P_basic' {reply_to = ReplyQueue ,
307+ # amqp_msg {props = # 'P_basic' {reply_to = ? REPLY_QUEUE ,
219308 correlation_id = CorrelationId },
220309 payload = RequestPayload }),
221310 receive # 'basic.ack' {} -> ok
222- after ? TIMEOUT -> ct :fail (confirm_timeout )
223311 end ,
224312
225313 ok = wait_for_queue_declared (RequestQueue , ResponderNode , Config ),
@@ -229,20 +317,101 @@ rpc(RequesterNode, ResponderNode, Config) ->
229317 correlation_id = CorrelationId },
230318 payload = RequestPayload }
231319 } = amqp_channel :call (ResponderCh , # 'basic.get' {queue = RequestQueue }),
320+
321+ % % Test what the docs state:
322+ % % "If the RPC server is going to perform some expensive computation it might wish
323+ % % to check if the client has gone away. To do this the server can declare the
324+ % % generated reply name first on a disposable channel in order to determine whether
325+ % % it still exists."
326+ ? assertEqual (# 'queue.declare_ok' {queue = ReplyTo ,
327+ message_count = 0 ,
328+ consumer_count = 1 },
329+ amqp_channel :call (ResponderCh ,
330+ # 'queue.declare' {queue = ReplyTo })),
331+
232332 % % Send the reply.
233333 amqp_channel :cast (
234334 ResponderCh ,
235335 # 'basic.publish' {routing_key = ReplyTo },
236336 # amqp_msg {props = # 'P_basic' {correlation_id = CorrelationId },
237- payload = ReplyPayload }),
337+ payload = << " reply 1 " >> }),
238338
239- % % Receive the reply.
339+ % % Let's assume the RPC server sends multiple replies for a single request.
340+ % % (This is a bit unusual but should work.)
341+ amqp_channel :cast (
342+ ResponderCh ,
343+ # 'basic.publish' {routing_key = ReplyTo },
344+ # amqp_msg {props = # 'P_basic' {correlation_id = CorrelationId },
345+ payload = <<" reply 2" >>}),
346+
347+ % % Receive the frst reply.
348+ receive {# 'basic.deliver' {consumer_tag = CTag ,
349+ redelivered = false ,
350+ exchange = <<>>,
351+ routing_key = ReplyTo },
352+ # amqp_msg {payload = P1 ,
353+ props = # 'P_basic' {correlation_id = CorrelationId }}} ->
354+ ? assertEqual (<<" reply 1" >>, P1 )
355+ after ? TIMEOUT -> ct :fail ({missing_reply , ? LINE })
356+ end ,
357+
358+ % % Receive the second reply.
240359 receive {# 'basic.deliver' {consumer_tag = CTag },
241- # amqp_msg {payload = ReplyPayload ,
360+ # amqp_msg {payload = P2 ,
242361 props = # 'P_basic' {correlation_id = CorrelationId }}} ->
243- ok
244- after ? TIMEOUT -> ct :fail (missing_reply )
245- end .
362+ ? assertEqual (<<" reply 2" >>, P2 )
363+ after ? TIMEOUT -> ct :fail ({missing_reply , ? LINE })
364+ end ,
365+
366+ % % The requester sends a reply to itself.
367+ % % (Really odd, but should work.)
368+ amqp_channel :cast (
369+ RequesterCh ,
370+ # 'basic.publish' {routing_key = ReplyTo },
371+ # amqp_msg {props = # 'P_basic' {correlation_id = CorrelationId },
372+ payload = <<" reply 3" >>}),
373+
374+ receive {# 'basic.deliver' {consumer_tag = CTag },
375+ # amqp_msg {payload = P3 ,
376+ props = # 'P_basic' {correlation_id = CorrelationId }}} ->
377+ ? assertEqual (<<" reply 3" >>, P3 )
378+ after ? TIMEOUT -> ct :fail ({missing_reply , ? LINE })
379+ end ,
380+
381+ % % Requester cancels consumption.
382+ ? assertMatch (# 'basic.cancel_ok' {consumer_tag = CTag },
383+ amqp_channel :call (RequesterCh , # 'basic.cancel' {consumer_tag = CTag })),
384+
385+ % % Send a final reply.
386+ amqp_channel :cast (
387+ ResponderCh ,
388+ # 'basic.publish' {routing_key = ReplyTo },
389+ # amqp_msg {props = # 'P_basic' {correlation_id = CorrelationId },
390+ payload = <<" reply 4" >>}),
391+
392+ % % The final reply shouldn't be delivered since the requester cancelled consumption.
393+ receive {# 'basic.deliver' {}, # amqp_msg {}} ->
394+ ct :fail (" did not expect delivery after cancellation" )
395+ after 100 -> ok
396+ end ,
397+
398+ % % Responder checks again if the requester is still there.
399+ % % This time, the requester and its queue should be gone.
400+ try amqp_channel :call (ResponderCh , # 'queue.declare' {queue = ReplyTo }) of
401+ _ ->
402+ ct :fail (" expected queue.declare to fail" )
403+ catch exit :Reason ->
404+ ? assertMatch (
405+ {{_ , {_ , _ , <<" NOT_FOUND - no queue '" ,
406+ ReplyTo :(byte_size (ReplyTo ))/binary ,
407+ " ' in vhost '/'" >>}}, _ },
408+ Reason )
409+ end ,
410+
411+ % % Clean up.
412+ # 'queue.delete_ok' {} = amqp_channel :call (RequesterCh ,
413+ # 'queue.delete' {queue = RequestQueue }),
414+ ok = rabbit_ct_client_helpers :close_channel (RequesterCh ).
246415
247416wait_for_queue_declared (Queue , Node , Config ) ->
248417 eventually (
0 commit comments