@@ -347,21 +347,32 @@ basic_roundtrip_ibmmq(Config) ->
347
347
Hostname = ? config (rmq_hostname , Config ),
348
348
Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
349
349
OpenConf = #{address => Hostname , port => Port , sasl => ? config (sasl , Config )},
350
- roundtrip (OpenConf , [{body , <<" banana" >>}, {destination , <<" DEV.QUEUE.3" >>}]).
350
+ roundtrip (OpenConf , [
351
+ {body , <<" banana" >>},
352
+ {destination , <<" DEV.QUEUE.3" >>},
353
+ {sender_capabilities , <<" queue" >>},
354
+ {receiver_capabilities , <<" queue" >>},
355
+ {message_annotations , #{}}
356
+ ]).
351
357
352
358
roundtrip (OpenConf ) ->
353
359
roundtrip (OpenConf , []).
354
360
355
361
roundtrip (OpenConf , Args ) ->
356
362
Body = proplists :get_value (body , Args , <<" banana" >>),
357
363
Destination = proplists :get_value (destination , Args , <<" test1" >>),
364
+ SenderCapabilities = proplists :get_value (sender_capabilities , Args , <<>>),
365
+ ReceiverCapabilities = proplists :get_value (receiver_capabilities , Args , <<>>),
366
+ MessageAnnotations = proplists :get_value (message_annotations , Args ,
367
+ #{<<" x-key" >> => <<" x-value" >>,
368
+ <<" x_key" >> => <<" x_value" >>}),
369
+
358
370
{ok , Connection } = amqp10_client :open_connection (OpenConf ),
359
371
{ok , Session } = amqp10_client :begin_session (Connection ),
360
- ct :log (" Session attached " ),
361
- SenderAttachArgs = #{name => <<" banana-sender:DEV.QUEUE.3" >>,
372
+ SenderAttachArgs = #{name => <<" banana-sender" >>,
362
373
role => {sender , #{address => Destination ,
363
- durable => unsettled_state ,
364
- capabilities => << " queue " >> }},
374
+ durable => unsettled_state ,
375
+ capabilities => SenderCapabilities }},
365
376
snd_settle_mode => settled ,
366
377
rcv_settle_mode => first ,
367
378
filter => #{},
@@ -370,7 +381,6 @@ roundtrip(OpenConf, Args) ->
370
381
{ok , Sender } = amqp10_client :attach_link (Session , SenderAttachArgs ),
371
382
% %await_link(Sender, credited, link_credit_timeout),
372
383
await_link (Sender , attached , attached_timeout ),
373
- ct :log (" Sender attached " ),
374
384
375
385
Now = os :system_time (millisecond ),
376
386
Props = #{content_encoding => <<" my content encoding" >>,
@@ -379,13 +389,12 @@ roundtrip(OpenConf, Args) ->
379
389
creation_time => Now ,
380
390
group_id => <<" my group ID" >>,
381
391
message_id => <<" my message ID" >>,
382
- to => Destination
392
+ to => << " localhost " >>
383
393
},
384
394
Msg0 = amqp10_msg :new (<<" my-tag" >>, Body , true ),
385
395
Msg1 = amqp10_msg :set_application_properties (#{" a_key" => " a_value" }, Msg0 ),
386
396
Msg2 = amqp10_msg :set_properties (Props , Msg1 ),
387
- Msg = amqp10_msg :set_message_annotations (#{<<" x-key" >> => " x-value" ,
388
- <<" x_key" >> => " x_value" }, Msg2 ),
397
+ Msg = amqp10_msg :set_message_annotations (MessageAnnotations , Msg2 ),
389
398
ok = amqp10_client :send_msg (Sender , Msg ),
390
399
ok = amqp10_client :detach_link (Sender ),
391
400
await_link (Sender , {detached , normal }, link_detach_timeout ),
@@ -395,24 +404,25 @@ roundtrip(OpenConf, Args) ->
395
404
name => <<" banana-receiver" >>,
396
405
role => {receiver , #{address => Destination ,
397
406
durable => unsettled_state ,
398
- capabilities => << " queue " >> }, self ()},
407
+ capabilities => ReceiverCapabilities }, self ()},
399
408
snd_settle_mode => settled ,
400
409
rcv_settle_mode => first ,
401
410
filter => #{},
402
411
properties => #{}
403
412
},
404
413
{ok , Receiver } = amqp10_client :attach_link (Session , ReceiverAttachArgs ),
405
- {ok , OutMsg } = amqp10_client :get_msg (Receiver , 4_000 ),
414
+ {ok , OutMsg } = amqp10_client :get_msg (Receiver , 4 * 60_000 ),
406
415
ok = amqp10_client :end_session (Session ),
407
416
ok = amqp10_client :close_connection (Connection ),
408
417
409
418
% ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]),
410
419
ActualProps = amqp10_msg :properties (OutMsg ),
411
- [ ? assertMatch (V , maps :get (K , ActualProps )) || K := V <- Props , K =/= creation_time ],
420
+ [ ? assertEqual (V , maps :get (K , ActualProps )) || K := V <- Props , K =/= creation_time ],
412
421
413
422
? assertEqual (#{<<" a_key" >> => <<" a_value" >>}, amqp10_msg :application_properties (OutMsg )),
414
- ? assertMatch (#{<<" x-key" >> := <<" x-value" >>,
415
- <<" x_key" >> := <<" x_value" >>}, amqp10_msg :message_annotations (OutMsg )),
423
+ ActualMessageAnnotations = amqp10_msg :message_annotations (OutMsg ),
424
+ [ ? assertEqual (V , maps :get (K , ActualMessageAnnotations )) || K := V <- MessageAnnotations ],
425
+
416
426
? assertEqual ([Body ], amqp10_msg :body (OutMsg )),
417
427
ok .
418
428
@@ -455,12 +465,14 @@ filtered_roundtrip(OpenConf, Args) ->
455
465
456
466
ok = amqp10_client :send_msg (Sender , Msg2 ),
457
467
468
+ SelectorFilter = #{<<" apache.org:selector-filter:string" >> =>
469
+ <<" amqp.annotation.x-opt-enqueuedtimeutc > " , Now2Binary /binary >>},
458
470
{ok , FilteredReceiver } = amqp10_client :attach_receiver_link (Session ,
459
471
<<" filtered-receiver" >>,
460
472
Destination ,
461
473
settled ,
462
474
unsettled_state ,
463
- #{<< " apache.org:selector-filter:string " >> => << " amqp.annotation.x-opt-enqueuedtimeutc > " , Now2Binary / binary >>} ),
475
+ SelectorFilter ),
464
476
465
477
{ok , OutMsg2 } = amqp10_client :get_msg (DefaultReceiver , 60_000 * 4 ),
466
478
? assertEqual (<<" msg-2-tag" >>, amqp10_msg :delivery_tag (OutMsg2 )),
0 commit comments