From 6425a944c6712c6576d85d3c1de4c744ed677c04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Wed, 9 Jul 2025 18:09:59 +0200 Subject: [PATCH 01/15] feature_flags_SUITE: Fix style [Why] Several lines were crossing the 80-columns boundary, plus messages without a capital first letter. --- deps/rabbit/test/feature_flags_SUITE.erl | 35 ++++++++++++++---------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/deps/rabbit/test/feature_flags_SUITE.erl b/deps/rabbit/test/feature_flags_SUITE.erl index 5bbc840a495..ff9452ebe3b 100644 --- a/deps/rabbit/test/feature_flags_SUITE.erl +++ b/deps/rabbit/test/feature_flags_SUITE.erl @@ -2,7 +2,8 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% Copyright (c) 2019-2025 Broadcom. All Rights Reserved. The term “Broadcom” +%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -module(feature_flags_SUITE). 
@@ -197,14 +198,15 @@ init_per_group(clustering, Config) -> {rmq_nodes_clustered, false}, {start_rmq_with_plugins_disabled, true}]), Config2 = rabbit_ct_helpers:merge_app_env( - Config1, {rabbit, [{forced_feature_flags_on_init, [ - restart_streams, - stream_sac_coordinator_unblock_group, - stream_update_config_command, - stream_filtering, - message_containers, - quorum_queue_non_voters - ]}]}), + Config1, {rabbit, [{forced_feature_flags_on_init, + [ + restart_streams, + stream_sac_coordinator_unblock_group, + stream_update_config_command, + stream_filtering, + message_containers, + quorum_queue_non_voters + ]}]}), rabbit_ct_helpers:run_setup_steps(Config2, [fun prepare_my_plugin/1]); init_per_group(activating_plugin, Config) -> Config1 = rabbit_ct_helpers:set_config( @@ -219,7 +221,8 @@ init_per_group(_, Config) -> end_per_group(_, Config) -> Config. -init_per_testcase(enable_feature_flag_when_ff_file_is_unwritable = Testcase, Config) -> +init_per_testcase( + enable_feature_flag_when_ff_file_is_unwritable = Testcase, Config) -> case erlang:system_info(otp_release) of "26" -> {skip, "Hits a crash in Mnesia fairly frequently"}; @@ -1284,11 +1287,13 @@ activating_plugin_with_new_ff_enabled(Config) -> ok. enable_plugin_feature_flag_after_deactivating_plugin(Config) -> - case rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, 'rabbitmq_4.0.0') of + RabbitMQ40Enabled = rabbit_ct_broker_helpers:is_feature_flag_enabled( + Config, 'rabbitmq_4.0.0'), + case RabbitMQ40Enabled of true -> ok; false -> - throw({skip, "this test triggers a bug present in 3.13"}) + throw({skip, "This test triggers a bug present in 3.13"}) end, FFSubsysOk = is_feature_flag_subsystem_available(Config), @@ -1321,11 +1326,13 @@ enable_plugin_feature_flag_after_deactivating_plugin(Config) -> ok. 
restart_node_with_unknown_enabled_feature_flag(Config) -> - case rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, 'rabbitmq_4.0.0') of + RabbitMQ40Enabled = rabbit_ct_broker_helpers:is_feature_flag_enabled( + Config, 'rabbitmq_4.0.0'), + case RabbitMQ40Enabled of true -> ok; false -> - throw({skip, "this test triggers a bug present in 3.13"}) + throw({skip, "This test triggers a bug present in 3.13"}) end, FFSubsysOk = is_feature_flag_subsystem_available(Config), From 804874282d37b5ab35856a02c3b0db12936f51cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 10 Jul 2025 11:59:26 +0200 Subject: [PATCH 02/15] quorum_queue_SUITE: Use Khepri fence before checking number of replicas [Why] When `wait_for_messages_ready/3` returns, we are sure that the replicas are in the expected state. However, the `#amqqueue{}` record is updated in Khepri, and we don't know when all Khepri store members will be up-to-date. It can happen that `Server0` is not up-to-date when we query that record to get the list of replicas, leading to a test failure. [How] First, the check is moved to its own function in `queue_utils`. Then, if Khepri is being used, we use a Khepri fence to ensure previous operations were applied on the given server. This way, we get a consistent view of the `#amqqueue{}` record and thus the list of replicas.
--- deps/rabbit/test/quorum_queue_SUITE.erl | 59 ++++++++------------ deps/rabbitmq_ct_helpers/src/queue_utils.erl | 23 +++++++- 2 files changed, 46 insertions(+), 36 deletions(-) diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 2ae9f23d406..5383a077d39 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1306,27 +1306,20 @@ force_shrink_member_to_current_member(Config) -> RaName = ra_name(QQ), rabbit_ct_client_helpers:publish(Ch, QQ, 3), wait_for_messages_ready([Server0], RaName, 3), - - {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), - ?assertEqual(3, length(Nodes0)), + queue_utils:assert_number_of_replicas( + Config, Server0, <<"/">>, QQ, 3), rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, force_shrink_member_to_current_member, [<<"/">>, QQ]), wait_for_messages_ready([Server0], RaName, 3), - - {ok, Q1} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), - #{nodes := Nodes1} = amqqueue:get_type_state(Q1), - ?assertEqual(1, length(Nodes1)), + queue_utils:assert_number_of_replicas( + Config, Server0, <<"/">>, QQ, 1), %% grow queues back to all nodes [rpc:call(Server0, rabbit_quorum_queue, grow, [S, <<"/">>, <<".*">>, all]) || S <- [Server1, Server2]], - - wait_for_messages_ready([Server0], RaName, 3), - {ok, Q2} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), - #{nodes := Nodes2} = amqqueue:get_type_state(Q2), - ?assertEqual(3, length(Nodes2)) + queue_utils:assert_number_of_replicas( + Config, Server0, <<"/">>, QQ, 3) end. 
force_all_queues_shrink_member_to_current_member(Config) -> @@ -1351,9 +1344,8 @@ force_all_queues_shrink_member_to_current_member(Config) -> RaName = ra_name(Q), rabbit_ct_client_helpers:publish(Ch, Q, 3), wait_for_messages_ready([Server0], RaName, 3), - {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), - ?assertEqual(3, length(Nodes0)) + queue_utils:assert_number_of_replicas( + Config, Server0, <<"/">>, Q, 3) end || Q <- QQs], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, @@ -1362,9 +1354,8 @@ force_all_queues_shrink_member_to_current_member(Config) -> [begin RaName = ra_name(Q), wait_for_messages_ready([Server0], RaName, 3), - {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), - ?assertEqual(1, length(Nodes0)) + queue_utils:assert_number_of_replicas( + Config, Server0, <<"/">>, Q, 1) end || Q <- QQs], %% grow queues back to all nodes @@ -1373,9 +1364,8 @@ force_all_queues_shrink_member_to_current_member(Config) -> [begin RaName = ra_name(Q), wait_for_messages_ready([Server0], RaName, 3), - {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), - ?assertEqual(3, length(Nodes0)) + queue_utils:assert_number_of_replicas( + Config, Server0, <<"/">>, Q, 3) end || Q <- QQs] end. 
@@ -1417,9 +1407,8 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> QQRes = rabbit_misc:r(VHost, queue, Q), {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), wait_for_messages_ready([Server0], RaName, 3), - {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), - ?assertEqual(3, length(Nodes0)) + queue_utils:assert_number_of_replicas( + Config, Server0, VHost, Q, 3) end || Q <- QQs, VHost <- VHosts], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, @@ -1429,11 +1418,13 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> QQRes = rabbit_misc:r(VHost, queue, Q), {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), wait_for_messages_ready([Server0], RaName, 3), - {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), case VHost of - VHost1 -> ?assertEqual(3, length(Nodes0)); - VHost2 -> ?assertEqual(1, length(Nodes0)) + VHost1 -> + queue_utils:assert_number_of_replicas( + Config, Server0, VHost, Q, 3); + VHost2 -> + queue_utils:assert_number_of_replicas( + Config, Server0, VHost, Q, 1) end end || Q <- QQs, VHost <- VHosts], @@ -1444,9 +1435,8 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> QQRes = rabbit_misc:r(VHost, queue, Q), {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), wait_for_messages_ready([Server0], RaName, 3), - {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), - ?assertEqual(3, length(Nodes0)) + queue_utils:assert_number_of_replicas( + Config, Server0, VHost, Q, 3) end || Q <- QQs, VHost <- VHosts] end. 
@@ -2946,9 +2936,8 @@ delete_member_member_already_deleted(Config) -> ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server2])), - {ok, Q} = rpc:call(Server, rabbit_amqqueue, lookup, [QQ, <<"/">>]), - #{nodes := Nodes} = amqqueue:get_type_state(Q), - ?assertEqual(1, length(Nodes)), + queue_utils:assert_number_of_replicas( + Config, Server, <<"/">>, QQ, 1), ok. delete_member_during_node_down(Config) -> diff --git a/deps/rabbitmq_ct_helpers/src/queue_utils.erl b/deps/rabbitmq_ct_helpers/src/queue_utils.erl index f72dba15456..d2c69792fde 100644 --- a/deps/rabbitmq_ct_helpers/src/queue_utils.erl +++ b/deps/rabbitmq_ct_helpers/src/queue_utils.erl @@ -2,6 +2,8 @@ -include_lib("eunit/include/eunit.hrl"). +-include("include/rabbit_assert.hrl"). + -export([ wait_for_messages_ready/3, wait_for_messages_pending_ack/3, @@ -15,7 +17,8 @@ ra_name/1, ra_machines_use_same_version/3, wait_for_local_stream_member/4, - has_local_stream_member_rpc/1 + has_local_stream_member_rpc/1, + assert_number_of_replicas/5 ]). -define(WFM_SLEEP, 256). @@ -191,3 +194,21 @@ has_local_stream_member_rpc(QName) -> {error, _} -> false end. + +assert_number_of_replicas(Config, Server, VHost, QQ, Count) -> + _ = case rabbit_ct_broker_helpers:configured_metadata_store(Config) of + khepri -> + rabbit_ct_broker_helpers:rpc( + Config, Server, rabbit_khepri, fence, [30000]); + mnesia -> + ok + end, + ?awaitMatch( + Count, + begin + {ok, Q} = rabbit_ct_broker_helpers:rpc( + Config, Server, rabbit_amqqueue, lookup, [QQ, VHost]), + #{nodes := Nodes} = amqqueue:get_type_state(Q), + length(Nodes) + end, + 30000). From 3a3ea2a55910a282ea468d1572b5ba855a01cebe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 10 Jul 2025 15:22:26 +0200 Subject: [PATCH 03/15] cluster_minority_SUITE: Ensure cluster can be changed before partition ... in `remove_node_when_seed_node_is_leader/1` and `remove_node_when_seed_node_is_follower/1`. 
[Why] Until now, the check was performed after the partition. It was incorrect because if a cluster change was not permitted at the time of the partition, it would not be afterwards. Thus there was a race condition here. [How] Now, the check is performed before the partition. Thanks to this new approach, we are sure of the state of node A and don't need the case block near the end of the test cases. This should fix some test flakes we see locally and in CI. --- deps/rabbit/test/cluster_minority_SUITE.erl | 75 ++++++++++----------- 1 file changed, 34 insertions(+), 41 deletions(-) diff --git a/deps/rabbit/test/cluster_minority_SUITE.erl b/deps/rabbit/test/cluster_minority_SUITE.erl index cd9e9ebcc9d..e0d6b4e29a0 100644 --- a/deps/rabbit/test/cluster_minority_SUITE.erl +++ b/deps/rabbit/test/cluster_minority_SUITE.erl @@ -387,28 +387,26 @@ remove_node_when_seed_node_is_leader(Config) -> AMember = {rabbit_khepri:get_store_id(), A}, ra:transfer_leadership(AMember, AMember), clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster), + ct:pal("Waiting for cluster change permitted on node A"), + ?awaitMatch( + {ok, #{cluster_change_permitted := true, + leader_id := AMember}, AMember}, + rabbit_ct_broker_helpers:rpc( + Config1, A, ra, member_overview, [AMember]), + 60000), + {ok, Overview, AMember} = rabbit_ct_broker_helpers:rpc( + Config1, A, ra, member_overview, [AMember]), + ct:pal("Member A overview: ~p", [maps:remove(machine, Overview)]), %% Minority partition: A partition_3_node_cluster(Config1), - Pong = ra:ping(AMember, 10000), - ct:pal("Member A state: ~0p", [Pong]), - case Pong of - {pong, leader} -> - ?awaitMatch( - {ok, #{cluster_change_permitted := true}, _}, - rabbit_ct_broker_helpers:rpc( - Config1, A, ra, member_overview, [AMember]), - 60000), - ?awaitMatch( - ok, - rabbit_control_helper:command( - forget_cluster_node, A, [atom_to_list(B)], []), - 60000); - Ret -> - ct:pal("A is not the expected leader: ~p", [Ret]), - {skip, "Node A was not a leader"}
- end. + ?assertEqual({pong, leader}, ra:ping(AMember, 10000)), + ?awaitMatch( + ok, + rabbit_control_helper:command( + forget_cluster_node, A, [atom_to_list(B)], []), + 60000). remove_node_when_seed_node_is_follower(Config) -> [A, B, C | _] = rabbit_ct_broker_helpers:get_node_configs( @@ -418,36 +416,31 @@ remove_node_when_seed_node_is_follower(Config) -> Cluster = [A, B, C], Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster), + AMember = {rabbit_khepri:get_store_id(), A}, CMember = {rabbit_khepri:get_store_id(), C}, ra:transfer_leadership(CMember, CMember), clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster), + ?awaitMatch( + {ok, #{cluster_change_permitted := true, + leader_id := CMember}, AMember}, + rabbit_ct_broker_helpers:rpc( + Config1, A, ra, member_overview, [AMember]), + 60000), + {ok, Overview, AMember} = rabbit_ct_broker_helpers:rpc( + Config1, A, ra, member_overview, [AMember]), + ct:pal("Member A overview: ~p", [maps:remove(machine, Overview)]), %% Minority partition: A partition_3_node_cluster(Config1), - AMember = {rabbit_khepri:get_store_id(), A}, - Pong = ra:ping(AMember, 10000), - ct:pal("Member A state: ~0p", [Pong]), - case Pong of - {pong, State} - when State =:= follower orelse State =:= pre_vote -> - Ret = rabbit_control_helper:command( - forget_cluster_node, A, [atom_to_list(B)], []), - ?assertMatch({error, _, _}, Ret), - {error, _, Msg} = Ret, - ?assertEqual( - match, - re:run( - Msg, "Khepri cluster could be in minority", - [{capture, none}])); - {pong, await_condition} -> - Ret = rabbit_control_helper:command( - forget_cluster_node, A, [atom_to_list(B)], []), - ?assertMatch(ok, Ret); - Ret -> - ct:pal("A is not the expected leader: ~p", [Ret]), - {skip, "Node A was not a leader"} - end. 
+ Ret = rabbit_control_helper:command( + forget_cluster_node, A, [atom_to_list(B)], []), + ?assertMatch({error, _, _}, Ret), + {error, _, Msg} = Ret, + ?assertEqual( + match, + re:run( + Msg, "Khepri cluster could be in minority", [{capture, none}])). enable_feature_flag(Config) -> [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), From fb6a03d5504fe8db17739b8ea1d185b75e17b70b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 10 Jul 2025 19:00:44 +0200 Subject: [PATCH 04/15] quorum_queue_SUITE: Use less messages in `force_checkpoint_on_queue` [Why] The default checkpoint interval is 16384. Therefore with 20,000 messages published by the testcase, there is a chance a checkpoint is created. This would hit an assertion in the testcase which expects no checkpoints before it forces the creation of one. We see this happening in CI. Not locally because the testcase runs fast enough. [How] The testcase now sends 10,000 messages. This is still a lot of messages while staying under the default checkpoint interval. 
--- deps/rabbit/test/quorum_queue_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 5383a077d39..8ed59776f2b 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1453,7 +1453,7 @@ force_checkpoint_on_queue(Config) -> ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - N = 20_000, + N = 10_000, rabbit_ct_client_helpers:publish(Ch, QQ, N), wait_for_messages_ready([Server0], RaName, N), From 5f15baaa694cb27207d005a89ea83a57f9ea84ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Fri, 11 Jul 2025 10:00:53 +0200 Subject: [PATCH 05/15] per_user_connection_channel_limit_SUITE: Fix test flake in `single_node_list_in_user` [Why] This was the only place where a condition was checked once after a connection close, instead of waiting for it to become true. This caused some transient failures in CI when the connection tracking took a bit of time to update and the check was performed before that. 
--- .../test/per_user_connection_channel_limit_SUITE.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/test/per_user_connection_channel_limit_SUITE.erl b/deps/rabbit/test/per_user_connection_channel_limit_SUITE.erl index 2cd00041070..db4f5bc3f63 100644 --- a/deps/rabbit/test/per_user_connection_channel_limit_SUITE.erl +++ b/deps/rabbit/test/per_user_connection_channel_limit_SUITE.erl @@ -374,7 +374,13 @@ single_node_list_in_user(Config) -> [Conn4] = open_connections(Config, [{0, Username1}]), [_Chan4] = open_channels(Conn4, 1), close_connections([Conn4]), - [#tracked_connection{username = Username1}] = connections_in(Config, Username1), + rabbit_ct_helpers:await_condition( + fun () -> + case connections_in(Config, Username1) of + [#tracked_connection{username = Username1}] -> true; + _ -> false + end + end), [#tracked_channel{username = Username1}] = channels_in(Config, Username1), [Conn5, Conn6] = open_connections(Config, [{0, Username2}, {0, Username2}]), From f2a70d2c1803dd77ac0d354443fce8009b4fb0ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Fri, 11 Jul 2025 10:20:00 +0200 Subject: [PATCH 06/15] queue_type_SUITE: Be explicit about connection open+close [Why] The tests relied on the `rabbit_ct_client_helpers` connection and channel manager, which doesn't seem to be robust. So far, it has caused more harm than good. Hopefully, this will fix some test flakes in CI.
--- deps/rabbit/test/queue_type_SUITE.erl | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/deps/rabbit/test/queue_type_SUITE.erl b/deps/rabbit/test/queue_type_SUITE.erl index 6de4a29d2fc..b777ad3222d 100644 --- a/deps/rabbit/test/queue_type_SUITE.erl +++ b/deps/rabbit/test/queue_type_SUITE.erl @@ -111,7 +111,7 @@ end_per_testcase(Testcase, Config) -> smoke(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Server), QName = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [{<<"x-queue-type">>, longstr, @@ -191,7 +191,7 @@ smoke(Config) -> }, ProtocolQueueTypeCounters), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ?assertMatch( #{consumers := 0, @@ -202,7 +202,7 @@ smoke(Config) -> ack_after_queue_delete(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Server), QName = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [{<<"x-queue-type">>, longstr, @@ -223,12 +223,13 @@ ack_after_queue_delete(Config) -> after 1000 -> ok end, + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), flush(), ok. 
stream(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Server), QName = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [{<<"x-queue-type">>, longstr, @@ -238,7 +239,7 @@ stream(Config) -> publish_and_confirm(Ch, QName, <<"msg1">>), Args = [{<<"x-stream-offset">>, longstr, <<"last">>}], - SubCh = rabbit_ct_client_helpers:open_channel(Config, 2), + {SubConn, SubCh} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 2), qos(SubCh, 10, false), ok = queue_utils:wait_for_local_stream_member(2, <<"/">>, QName, Config), @@ -262,6 +263,8 @@ stream(Config) -> exit(Err) end, + ok = rabbit_ct_client_helpers:close_connection_and_channel(SubConn, SubCh), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok. From be84ef96b5172c0e4fcd5025eb08af511290986b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Fri, 11 Jul 2025 10:57:04 +0200 Subject: [PATCH 07/15] dynamic_SUITE: Be explicit about connection open+close [Why] The tests relied on the `rabbit_ct_client_helpers` connection and channel manager, which doesn't seem to be robust. So far, it has caused more harm than good. Hopefully, this will fix some test flakes in CI.
--- deps/rabbitmq_shovel/test/dynamic_SUITE.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl index aa1f34e3863..099fb1de9f3 100644 --- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl @@ -871,16 +871,16 @@ dest_resource_alarm(AckMode, Config) -> %%---------------------------------------------------------------------------- with_ch(Config, Fun) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Fun(Ch), - rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), cleanup(Config), ok. with_newch(Config, Fun) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Fun(Ch), - rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok. publish(Ch, X, Key, Payload) when is_binary(Payload) -> From 60908d47c76a8b85cf959c924010e542aacc4534 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Fri, 11 Jul 2025 11:28:09 +0200 Subject: [PATCH 08/15] rabbit_prometheus_http_SUITE: Use another Erlang metric [Why] It looks like `erlang_vm_dist_node_queue_size_bytes` is not always present, even though other Erlang-specific metrics are present. [How] The goal is to ensure Erlang metrics are present in the output, so just use another one that is likely to be there. 
--- deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index e37db1296a8..7f99331d451 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -473,7 +473,7 @@ identity_info_test(Config) -> specific_erlang_metrics_present_test(Config) -> {_Headers, Body} = http_get_with_pal(Config, [], 200), - ?assertEqual(match, re:run(Body, "^erlang_vm_dist_node_queue_size_bytes{", [{capture, none}, multiline])). + ?assertEqual(match, re:run(Body, "^erlang_vm_dirty_io_schedulers ", [{capture, none}, multiline])). global_metrics_present_test(Config) -> {_Headers, Body} = http_get_with_pal(Config, [], 200), From c72946abaaf8f191b5c2e80fefa4c609d14d5168 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Mon, 14 Jul 2025 16:28:28 +0200 Subject: [PATCH 09/15] backing_queue_SUITE: Increase the restart time boundary [Why] This test flaked today since the restart took 309ms, thus above the allowed 100ms (outside of CI, it takes single-digit ms). [How] Increase the allowed time but also significantly increase next_seq_id. This test exists because in the past we had an O(n) algorithm in CQ recovery, leading to a slow recovery of even empty queues, if they had a very large next_seq_id. Now that this operation is O(1), a much larger next_seq_id shouldn't affect the time it takes to run this test, while accidentally re-introducing an O(n) algorithm should fail this test consistently.
--- deps/rabbit/test/backing_queue_SUITE.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 1871307bffd..d7a7c526b4f 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -1445,18 +1445,18 @@ variable_queue_restart_large_seq_id2(VQ0, QName) -> Terms = variable_queue_read_terms(QName), Count = proplists:get_value(next_seq_id, Terms), - %% set a very high next_seq_id as if 100M messages have been + %% set a very high next_seq_id as if 100 billion messages have been %% published and consumed - Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000}), + Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000_000}), {TInit, VQ3} = timer:tc( fun() -> variable_queue_init(test_amqqueue(QName, true), Terms2) end, millisecond), %% even with a very high next_seq_id start of an empty queue - %% should be quick (few milliseconds, but let's give it 100ms, to + %% should be quick (few milliseconds, but let's give it 500ms, to %% avoid flaking on slow servers) - {true, _} = {TInit < 100, TInit}, + {true, _} = {TInit < 500, TInit}, %% should be empty now true = rabbit_variable_queue:is_empty(VQ3), From b24a5cd5994b16f94232ce36babab610bf232d64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Tue, 15 Jul 2025 11:32:00 +0200 Subject: [PATCH 10/15] per_node_limit_SUITE: Wait for the channel count to be up-to-date [Why] In the `node_channel_limit` testcase, we open several channels and verify the count of opened channels in all places but one: after the first connection failure, when we try to open 3 channels. Opening 3 channels in a row might not be tracked in time to reject the third channel because the counter is updated asynchronously. [How] We simply wait for the counter to reach 5 before opening the third channel. 
We change all checks to use `?awaitMatch/3` in the process to be more robust with timing issues. --- deps/rabbit/test/per_node_limit_SUITE.erl | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/test/per_node_limit_SUITE.erl b/deps/rabbit/test/per_node_limit_SUITE.erl index 33b4b466562..8c23e7c6b81 100644 --- a/deps/rabbit/test/per_node_limit_SUITE.erl +++ b/deps/rabbit/test/per_node_limit_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile(export_all). @@ -120,27 +121,28 @@ node_channel_limit(Config) -> ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost), Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost), Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost), - 0 = count_channels_per_node(Config), + ?awaitMatch(0, count_channels_per_node(Config), 30000), lists:foreach(fun(N) when (N band 1) == 1 -> {ok, _} = open_channel(Conn1); (_) -> {ok,_ } = open_channel(Conn2) end, lists:seq(1, 5)), - 5 = count_channels_per_node(Config), + ?awaitMatch(5, count_channels_per_node(Config), 30000), %% In total 5 channels are open on this node, so a new one, regardless of %% connection, will not be allowed. It will terminate the connection with %% its channels too. 
So {error, not_allowed_crash} = open_channel(Conn2), - 3 = count_channels_per_node(Config), + ?awaitMatch(3, count_channels_per_node(Config), 30000), %% As the connection is dead, so are the 2 channels, so we should be able to %% create 2 more on Conn1 {ok , _} = open_channel(Conn1), {ok , _} = open_channel(Conn1), + ?awaitMatch(5, count_channels_per_node(Config), 30000), %% But not a third {error, not_allowed_crash} = open_channel(Conn1), %% Now all connections are closed, so there should be 0 open connections - 0 = count_channels_per_node(Config), + ?awaitMatch(0, count_channels_per_node(Config), 30000), close_all_connections([Conn1, Conn2]), rabbit_ct_broker_helpers:delete_vhost(Config, VHost), From 0308e0cce3149bdda500256b12a81154014601c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Tue, 15 Jul 2025 16:41:23 +0200 Subject: [PATCH 11/15] auth_SUITE: Wait for connection tracking to be up-to-date ... when testing vhost limits [Why] The tracking is asynchronous, thus the third MQTT connection might be opened before the tracking is up-to-date, which the testcase doesn't expect. --- deps/rabbitmq_mqtt/test/auth_SUITE.erl | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl index 94c0af330b9..6c0fb9e2b8d 100644 --- a/deps/rabbitmq_mqtt/test/auth_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). %% not defined in v3 -define(SUBACK_FAILURE, 16#80).
@@ -1246,6 +1247,7 @@ vhost_connection_limit(Config) -> {ok, _} = emqtt:connect(C1), {ok, C2} = connect_anonymous(Config, <<"client2">>), {ok, _} = emqtt:connect(C2), + ?awaitMatch(2, count_connections_per_vhost(Config), 30000), {ok, C3} = connect_anonymous(Config, <<"client3">>), ExpectedError = expected_connection_limit_error(Config), unlink(C3), @@ -1254,6 +1256,13 @@ vhost_connection_limit(Config) -> ok = emqtt:disconnect(C2), ok = rabbit_ct_broker_helpers:clear_vhost_limit(Config, 0, <<"/">>). +count_connections_per_vhost(Config) -> + NodeConfig = rabbit_ct_broker_helpers:get_node_config(Config, 0), + rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_connection_tracking, count_local_tracked_items_in_vhost, + [<<"/">>]). + vhost_queue_limit(Config) -> ok = rabbit_ct_broker_helpers:set_vhost_limit(Config, 0, <<"/">>, max_queues, 1), {ok, C} = connect_anonymous(Config), From 7766698ccb85d20c5abb4222fb43ef463eb12477 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Tue, 15 Jul 2025 17:13:29 +0200 Subject: [PATCH 12/15] java_SUITE: Add missing error handling --- deps/rabbitmq_mqtt/test/java_SUITE.erl | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/deps/rabbitmq_mqtt/test/java_SUITE.erl b/deps/rabbitmq_mqtt/test/java_SUITE.erl index 1f5be1a256c..cf4473fd542 100644 --- a/deps/rabbitmq_mqtt/test/java_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/java_SUITE.erl @@ -67,8 +67,13 @@ init_per_group(Group, Config0) -> [fun merge_app_env/1] ++ rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()), - util:enable_plugin(Config1, rabbitmq_mqtt), - Config1. + case Config1 of + _ when is_list(Config1) -> + util:enable_plugin(Config1, rabbitmq_mqtt), + Config1; + {skip, _} -> + Config1 + end. 
end_per_group(_, Config) -> rabbit_ct_helpers:run_teardown_steps(Config, From 4696e3eda46717c83ec84e86b251e48e00961ebb Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 16 Jul 2025 12:52:21 +0200 Subject: [PATCH 13/15] v5_SUITE: session_upgrade_v3_v5_qos1 Prior to this commit, the following test case flaked: ``` make -C deps/rabbitmq_mqtt ct-v5 t=cluster_size_1:session_upgrade_v3_v5_qos1 ``` The test case failed with: ``` {v5_SUITE,session_upgrade_v3_v5_qos,1112} {test_case_failed,Received unexpected PUBLISH payload. Expected: <<"2">> Got: <<"1">>} ``` The broker logs showed: ``` 2025-07-15 15:50:23.914152+00:00 [debug] <0.758.0> MQTT accepting TCP connection <0.758.0> (127.0.0.1:38594 -> 127.0.0.1:27005) 2025-07-15 15:50:23.914289+00:00 [debug] <0.758.0> Received a CONNECT, client ID: session_upgrade_v3_v5_qos, username: undefined, clean start: false, protocol version: 3, keepalive: 60, property names: [] 2025-07-15 15:50:23.914403+00:00 [debug] <0.758.0> MQTT connection 127.0.0.1:38594 -> 127.0.0.1:27005 picked vhost using plugin_configuration_or_default_vhost 2025-07-15 15:50:23.914480+00:00 [debug] <0.758.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal 2025-07-15 15:50:23.914641+00:00 [info] <0.758.0> Accepted MQTT connection 127.0.0.1:38594 -> 127.0.0.1:27005 for client ID session_upgrade_v3_v5_qos 2025-07-15 15:50:23.914977+00:00 [debug] <0.758.0> Received a SUBSCRIBE with subscription(s) [{mqtt_subscription, 2025-07-15 15:50:23.914977+00:00 [debug] <0.758.0> <<"session_upgrade_v3_v5_qos">>, 2025-07-15 15:50:23.914977+00:00 [debug] <0.758.0> {mqtt_subscription_opts,1,false, 2025-07-15 15:50:23.914977+00:00 [debug] <0.758.0> false,0,undefined}}] 2025-07-15 15:50:23.924503+00:00 [debug] <0.764.0> MQTT accepting TCP connection <0.764.0> (127.0.0.1:38608 -> 127.0.0.1:27005) 2025-07-15 15:50:23.924922+00:00 [debug] <0.764.0> Received a CONNECT, client ID: session_upgrade_v3_v5_qos, username: undefined, clean start: false, 
protocol version: 5, keepalive: 60, property names: [] 2025-07-15 15:50:23.925589+00:00 [error] <0.758.0> writing to MQTT socket #Port<0.63> failed: closed 2025-07-15 15:50:23.925635+00:00 [debug] <0.764.0> MQTT connection 127.0.0.1:38608 -> 127.0.0.1:27005 picked vhost using plugin_configuration_or_default_vhost 2025-07-15 15:50:23.925670+00:00 [info] <0.758.0> MQTT connection <<"127.0.0.1:38594 -> 127.0.0.1:27005">> will terminate because peer closed TCP connection 2025-07-15 15:50:23.925727+00:00 [debug] <0.764.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal 2025-07-15 15:50:24.000790+00:00 [info] <0.764.0> Accepted MQTT connection 127.0.0.1:38608 -> 127.0.0.1:27005 for client ID session_upgrade_v3_v5_qos 2025-07-15 15:50:24.016553+00:00 [warning] <0.764.0> MQTT disconnecting client <<"127.0.0.1:38608 -> 127.0.0.1:27005">> with client ID 'session_upgrade_v3_v5_qos', reason: normal ``` This shows evidence that the MQTT server connection did not process the DISCONNECT packet. The hypothesis is that the server connection did not even process the PUBACK packet from the client. Hence, the first message got requeued and re-delivered to the new v5 client. This commit fixes this flake by not acking the first message. Hence, we always expect that the first message will be redelivered to the new v5 client. 
--- deps/rabbitmq_mqtt/test/v5_SUITE.erl | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index 87483af840f..0a9f12db6f2 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -1079,12 +1079,7 @@ session_upgrade_v3_v5_qos(Qos, Config) -> {ok, _, [Qos]} = emqtt:subscribe(Subv3, Topic, Qos), Sender = spawn_link(?MODULE, send, [self(), Pub, Topic, 0]), receive {publish, #{payload := <<"1">>, - client_pid := Subv3, - packet_id := PacketId}} -> - case Qos of - 0 -> ok; - 1 -> emqtt:puback(Subv3, PacketId) - end + client_pid := Subv3}} -> ok after ?TIMEOUT -> ct:fail("did not receive 1") end, %% Upgrade session from v3 to v5 while another client is sending messages. @@ -1108,7 +1103,7 @@ session_upgrade_v3_v5_qos(Qos, Config) -> 0 -> assert_received_no_duplicates(); 1 -> - ExpectedPayloads = [integer_to_binary(I) || I <- lists:seq(2, NumSent - 1)], + ExpectedPayloads = [integer_to_binary(I) || I <- lists:seq(1, NumSent - 1)], ok = expect_publishes(Subv5, Topic, ExpectedPayloads) end, ok = emqtt:disconnect(Pub), From 63d06c15c42ef9eac863eb07d33143968e7f2b88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Wed, 16 Jul 2025 13:35:18 +0200 Subject: [PATCH 14/15] amqp_client_SUITE: Trim "list_connections" output before parsing it [Why] Sometimes, at least in CI, it looks like the output of the CLI is prepended with a newline, sometimes not. This breaks the check of that output. [How] We just trim the output before parsing it. The parsing already takes care of trimming internal whitespaces. 
--- deps/rabbit/test/amqp_client_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 99b1ab64906..455da467f45 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -4087,7 +4087,7 @@ list_connections(Config) -> %% CLI should list AMQP 1.0 container-id {ok, StdOut1} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["list_connections", "--silent", "container_id"]), - ContainerIds0 = re:split(StdOut1, <<"\n">>, [trim]), + ContainerIds0 = re:split(string:trim(StdOut1), <<"\n">>, [trim]), ContainerIds = lists:sort(ContainerIds0), ?assertEqual([<<>>, ContainerId0, ContainerId2], ContainerIds), From 87fcd0bc47037026377f3ded970040da5c97bd17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Wed, 16 Jul 2025 14:59:07 +0200 Subject: [PATCH 15/15] metrics_SUITE: Wait for ETS table to be up-to-date ... in several test cases. [Why] In CI or any slow and/or busy environment, it may take time for the ETS tables to be updated.
--- deps/rabbit/test/metrics_SUITE.erl | 54 +++++++++++++++--------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/deps/rabbit/test/metrics_SUITE.erl b/deps/rabbit/test/metrics_SUITE.erl index 202a808bc83..c56d188c9d6 100644 --- a/deps/rabbit/test/metrics_SUITE.erl +++ b/deps/rabbit/test/metrics_SUITE.erl @@ -301,9 +301,9 @@ add_rem_counter(Config, {Initial, Ops}, {AddFun, RemFun}, Tables) -> connection(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), - [_] = read_table_rpc(Config, connection_created), - [_] = read_table_rpc(Config, connection_metrics), - [_] = read_table_rpc(Config, connection_coarse_metrics), + ?awaitMatch([_], read_table_rpc(Config, connection_created), 30000), + ?awaitMatch([_], read_table_rpc(Config, connection_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, connection_coarse_metrics), 30000), ok = rabbit_ct_client_helpers:close_connection(Conn), force_metric_gc(Config), ?awaitMatch([], read_table_rpc(Config, connection_created), @@ -317,25 +317,25 @@ connection(Config) -> channel(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), {ok, Chan} = amqp_connection:open_channel(Conn), - [_] = read_table_rpc(Config, channel_created), - [_] = read_table_rpc(Config, channel_metrics), - [_] = read_table_rpc(Config, channel_process_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_created), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_process_metrics), 30000), ok = amqp_channel:close(Chan), - [] = read_table_rpc(Config, channel_created), - [] = read_table_rpc(Config, channel_metrics), - [] = read_table_rpc(Config, channel_process_metrics), + ?awaitMatch([], read_table_rpc(Config, channel_created), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_process_metrics), 30000), ok = 
rabbit_ct_client_helpers:close_connection(Conn). channel_connection_close(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), {ok, _} = amqp_connection:open_channel(Conn), - [_] = read_table_rpc(Config, channel_created), - [_] = read_table_rpc(Config, channel_metrics), - [_] = read_table_rpc(Config, channel_process_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_created), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_process_metrics), 30000), ok = rabbit_ct_client_helpers:close_connection(Conn), - [] = read_table_rpc(Config, channel_created), - [] = read_table_rpc(Config, channel_metrics), - [] = read_table_rpc(Config, channel_process_metrics). + ?awaitMatch([], read_table_rpc(Config, channel_created), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_process_metrics), 30000). channel_queue_delete_queue(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), @@ -344,14 +344,14 @@ channel_queue_delete_queue(Config) -> ensure_exchange_metrics_populated(Chan, Queue), ensure_channel_queue_metrics_populated(Chan, Queue), force_channel_stats(Config), - [_] = read_table_rpc(Config, channel_queue_metrics), - [_] = read_table_rpc(Config, channel_queue_exchange_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_queue_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_queue_exchange_metrics), 30000), delete_queue(Chan, Queue), force_metric_gc(Config), % ensure removal of queue cleans up channel_queue metrics - [] = read_table_rpc(Config, channel_queue_exchange_metrics), - [] = read_table_rpc(Config, channel_queue_metrics), + ?awaitMatch([], read_table_rpc(Config, channel_queue_exchange_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_queue_metrics), 30000), ok = rabbit_ct_client_helpers:close_connection(Conn), ok. 
@@ -362,26 +362,26 @@ channel_queue_exchange_consumer_close_connection(Config) -> ensure_exchange_metrics_populated(Chan, Queue), force_channel_stats(Config), - [_] = read_table_rpc(Config, channel_exchange_metrics), - [_] = read_table_rpc(Config, channel_queue_exchange_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_exchange_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_queue_exchange_metrics), 30000), ensure_channel_queue_metrics_populated(Chan, Queue), force_channel_stats(Config), - [_] = read_table_rpc(Config, channel_queue_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_queue_metrics), 30000), Sub = #'basic.consume'{queue = Queue}, #'basic.consume_ok'{consumer_tag = _} = amqp_channel:call(Chan, Sub), - [_] = read_table_rpc(Config, consumer_created), + ?awaitMatch([_], read_table_rpc(Config, consumer_created), 30000), ok = rabbit_ct_client_helpers:close_connection(Conn), % ensure cleanup happened force_metric_gc(Config), - [] = read_table_rpc(Config, channel_exchange_metrics), - [] = read_table_rpc(Config, channel_queue_exchange_metrics), - [] = read_table_rpc(Config, channel_queue_metrics), - [] = read_table_rpc(Config, consumer_created), + ?awaitMatch([], read_table_rpc(Config, channel_exchange_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_queue_exchange_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_queue_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, consumer_created), 30000), ok.