diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 201fc99125d..d9386c137d0 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -4087,7 +4087,7 @@ list_connections(Config) -> %% CLI should list AMQP 1.0 container-id {ok, StdOut1} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["list_connections", "--silent", "container_id"]), - ContainerIds0 = re:split(StdOut1, <<"\n">>, [trim]), + ContainerIds0 = re:split(string:trim(StdOut1), <<"\n">>, [trim]), ContainerIds = lists:sort(ContainerIds0), ?assertEqual([<<>>, ContainerId0, ContainerId2], ContainerIds), @@ -4749,7 +4749,7 @@ idle_time_out_on_server(Config) -> ct:fail({missing_event, ?LINE}) end after - ?assert(rpc(Config, meck, validate, [Mod])), + _ = rpc(Config, meck, validate, [Mod]), ok = rpc(Config, meck, unload, [Mod]), ok = rpc(Config, application, set_env, [App, Par, DefaultVal]) end. diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index 1871307bffd..d7a7c526b4f 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -1445,18 +1445,18 @@ variable_queue_restart_large_seq_id2(VQ0, QName) -> Terms = variable_queue_read_terms(QName), Count = proplists:get_value(next_seq_id, Terms), - %% set a very high next_seq_id as if 100M messages have been + %% set a very high next_seq_id as if 100 billion messages have been %% published and consumed - Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000}), + Terms2 = lists:keyreplace(next_seq_id, 1, Terms, {next_seq_id, 100_000_000_000}), {TInit, VQ3} = timer:tc( fun() -> variable_queue_init(test_amqqueue(QName, true), Terms2) end, millisecond), %% even with a very high next_seq_id start of an empty queue - %% should be quick (few milliseconds, but let's give it 100ms, to + %% should be quick (few milliseconds, but let's give it 500ms, to %% avoid flaking on slow servers) - {true, _} = {TInit < 100, TInit}, + {true, _} = {TInit < 500, TInit}, %% should be empty now true = rabbit_variable_queue:is_empty(VQ3), diff --git a/deps/rabbit/test/cluster_minority_SUITE.erl b/deps/rabbit/test/cluster_minority_SUITE.erl index cd9e9ebcc9d..e0d6b4e29a0 100644 --- a/deps/rabbit/test/cluster_minority_SUITE.erl +++ b/deps/rabbit/test/cluster_minority_SUITE.erl @@ -387,28 +387,26 @@ remove_node_when_seed_node_is_leader(Config) -> AMember = {rabbit_khepri:get_store_id(), A}, ra:transfer_leadership(AMember, AMember), clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster), + ct:pal("Waiting for cluster change permitted on node A"), + ?awaitMatch( + {ok, #{cluster_change_permitted := true, + leader_id := AMember}, AMember}, + rabbit_ct_broker_helpers:rpc( + Config1, A, ra, member_overview, [AMember]), + 60000), + {ok, Overview, AMember} = rabbit_ct_broker_helpers:rpc( + Config1, A, ra, member_overview, [AMember]), + ct:pal("Member A overview: ~p", [maps:remove(machine, Overview)]), %% Minority partition: A partition_3_node_cluster(Config1), - Pong = ra:ping(AMember, 10000), - ct:pal("Member A state: ~0p", [Pong]), - case Pong of - {pong, leader} -> - ?awaitMatch( - {ok, #{cluster_change_permitted := true}, _}, - rabbit_ct_broker_helpers:rpc( - Config1, A, ra, member_overview, [AMember]), - 60000), - ?awaitMatch( - ok, - rabbit_control_helper:command( - forget_cluster_node, A, [atom_to_list(B)], []), - 60000); - Ret -> - ct:pal("A is not the expected leader: ~p", [Ret]), - {skip, "Node A was not a leader"} - end. + ?assertEqual({pong, leader}, ra:ping(AMember, 10000)), + ?awaitMatch( + ok, + rabbit_control_helper:command( + forget_cluster_node, A, [atom_to_list(B)], []), + 60000). remove_node_when_seed_node_is_follower(Config) -> [A, B, C | _] = rabbit_ct_broker_helpers:get_node_configs( @@ -418,36 +416,31 @@ remove_node_when_seed_node_is_follower(Config) -> Cluster = [A, B, C], Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster), + AMember = {rabbit_khepri:get_store_id(), A}, CMember = {rabbit_khepri:get_store_id(), C}, ra:transfer_leadership(CMember, CMember), clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster), + ?awaitMatch( + {ok, #{cluster_change_permitted := true, + leader_id := CMember}, AMember}, + rabbit_ct_broker_helpers:rpc( + Config1, A, ra, member_overview, [AMember]), + 60000), + {ok, Overview, AMember} = rabbit_ct_broker_helpers:rpc( + Config1, A, ra, member_overview, [AMember]), + ct:pal("Member A overview: ~p", [maps:remove(machine, Overview)]), %% Minority partition: A partition_3_node_cluster(Config1), - AMember = {rabbit_khepri:get_store_id(), A}, - Pong = ra:ping(AMember, 10000), - ct:pal("Member A state: ~0p", [Pong]), - case Pong of - {pong, State} - when State =:= follower orelse State =:= pre_vote -> - Ret = rabbit_control_helper:command( - forget_cluster_node, A, [atom_to_list(B)], []), - ?assertMatch({error, _, _}, Ret), - {error, _, Msg} = Ret, - ?assertEqual( - match, - re:run( - Msg, "Khepri cluster could be in minority", - [{capture, none}])); - {pong, await_condition} -> - Ret = rabbit_control_helper:command( - forget_cluster_node, A, [atom_to_list(B)], []), - ?assertMatch(ok, Ret); - Ret -> - ct:pal("A is not the expected leader: ~p", [Ret]), - {skip, "Node A was not a leader"} - end. + Ret = rabbit_control_helper:command( + forget_cluster_node, A, [atom_to_list(B)], []), + ?assertMatch({error, _, _}, Ret), + {error, _, Msg} = Ret, + ?assertEqual( + match, + re:run( + Msg, "Khepri cluster could be in minority", [{capture, none}])). enable_feature_flag(Config) -> [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/deps/rabbit/test/feature_flags_SUITE.erl b/deps/rabbit/test/feature_flags_SUITE.erl index 5bbc840a495..ff9452ebe3b 100644 --- a/deps/rabbit/test/feature_flags_SUITE.erl +++ b/deps/rabbit/test/feature_flags_SUITE.erl @@ -2,7 +2,8 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% Copyright (c) 2019-2025 Broadcom. All Rights Reserved. The term “Broadcom” +%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -module(feature_flags_SUITE). @@ -197,14 +198,15 @@ init_per_group(clustering, Config) -> {rmq_nodes_clustered, false}, {start_rmq_with_plugins_disabled, true}]), Config2 = rabbit_ct_helpers:merge_app_env( - Config1, {rabbit, [{forced_feature_flags_on_init, [ - restart_streams, - stream_sac_coordinator_unblock_group, - stream_update_config_command, - stream_filtering, - message_containers, - quorum_queue_non_voters - ]}]}), + Config1, {rabbit, [{forced_feature_flags_on_init, + [ + restart_streams, + stream_sac_coordinator_unblock_group, + stream_update_config_command, + stream_filtering, + message_containers, + quorum_queue_non_voters + ]}]}), rabbit_ct_helpers:run_setup_steps(Config2, [fun prepare_my_plugin/1]); init_per_group(activating_plugin, Config) -> Config1 = rabbit_ct_helpers:set_config( @@ -219,7 +221,8 @@ init_per_group(_, Config) -> end_per_group(_, Config) -> Config. -init_per_testcase(enable_feature_flag_when_ff_file_is_unwritable = Testcase, Config) -> +init_per_testcase( + enable_feature_flag_when_ff_file_is_unwritable = Testcase, Config) -> case erlang:system_info(otp_release) of "26" -> {skip, "Hits a crash in Mnesia fairly frequently"}; @@ -1284,11 +1287,13 @@ activating_plugin_with_new_ff_enabled(Config) -> ok. enable_plugin_feature_flag_after_deactivating_plugin(Config) -> - case rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, 'rabbitmq_4.0.0') of + RabbitMQ40Enabled = rabbit_ct_broker_helpers:is_feature_flag_enabled( + Config, 'rabbitmq_4.0.0'), + case RabbitMQ40Enabled of true -> ok; false -> - throw({skip, "this test triggers a bug present in 3.13"}) + throw({skip, "This test triggers a bug present in 3.13"}) end, FFSubsysOk = is_feature_flag_subsystem_available(Config), @@ -1321,11 +1326,13 @@ enable_plugin_feature_flag_after_deactivating_plugin(Config) -> ok. restart_node_with_unknown_enabled_feature_flag(Config) -> - case rabbit_ct_broker_helpers:is_feature_flag_enabled(Config, 'rabbitmq_4.0.0') of + RabbitMQ40Enabled = rabbit_ct_broker_helpers:is_feature_flag_enabled( + Config, 'rabbitmq_4.0.0'), + case RabbitMQ40Enabled of true -> ok; false -> - throw({skip, "this test triggers a bug present in 3.13"}) + throw({skip, "This test triggers a bug present in 3.13"}) end, FFSubsysOk = is_feature_flag_subsystem_available(Config), diff --git a/deps/rabbit/test/metrics_SUITE.erl b/deps/rabbit/test/metrics_SUITE.erl index 202a808bc83..c56d188c9d6 100644 --- a/deps/rabbit/test/metrics_SUITE.erl +++ b/deps/rabbit/test/metrics_SUITE.erl @@ -301,9 +301,9 @@ add_rem_counter(Config, {Initial, Ops}, {AddFun, RemFun}, Tables) -> connection(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), - [_] = read_table_rpc(Config, connection_created), - [_] = read_table_rpc(Config, connection_metrics), - [_] = read_table_rpc(Config, connection_coarse_metrics), + ?awaitMatch([_], read_table_rpc(Config, connection_created), 30000), + ?awaitMatch([_], read_table_rpc(Config, connection_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, connection_coarse_metrics), 30000), ok = rabbit_ct_client_helpers:close_connection(Conn), force_metric_gc(Config), ?awaitMatch([], read_table_rpc(Config, connection_created), @@ -317,25 +317,25 @@ connection(Config) -> channel(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), {ok, Chan} = amqp_connection:open_channel(Conn), - [_] = read_table_rpc(Config, channel_created), - [_] = read_table_rpc(Config, channel_metrics), - [_] = read_table_rpc(Config, channel_process_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_created), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_process_metrics), 30000), ok = amqp_channel:close(Chan), - [] = read_table_rpc(Config, channel_created), - [] = read_table_rpc(Config, channel_metrics), - [] = read_table_rpc(Config, channel_process_metrics), + ?awaitMatch([], read_table_rpc(Config, channel_created), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_process_metrics), 30000), ok = rabbit_ct_client_helpers:close_connection(Conn). channel_connection_close(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), {ok, _} = amqp_connection:open_channel(Conn), - [_] = read_table_rpc(Config, channel_created), - [_] = read_table_rpc(Config, channel_metrics), - [_] = read_table_rpc(Config, channel_process_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_created), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_process_metrics), 30000), ok = rabbit_ct_client_helpers:close_connection(Conn), - [] = read_table_rpc(Config, channel_created), - [] = read_table_rpc(Config, channel_metrics), - [] = read_table_rpc(Config, channel_process_metrics). + ?awaitMatch([], read_table_rpc(Config, channel_created), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_process_metrics), 30000). channel_queue_delete_queue(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), @@ -344,14 +344,14 @@ channel_queue_delete_queue(Config) -> ensure_exchange_metrics_populated(Chan, Queue), ensure_channel_queue_metrics_populated(Chan, Queue), force_channel_stats(Config), - [_] = read_table_rpc(Config, channel_queue_metrics), - [_] = read_table_rpc(Config, channel_queue_exchange_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_queue_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_queue_exchange_metrics), 30000), delete_queue(Chan, Queue), force_metric_gc(Config), % ensure removal of queue cleans up channel_queue metrics - [] = read_table_rpc(Config, channel_queue_exchange_metrics), - [] = read_table_rpc(Config, channel_queue_metrics), + ?awaitMatch([], read_table_rpc(Config, channel_queue_exchange_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_queue_metrics), 30000), ok = rabbit_ct_client_helpers:close_connection(Conn), ok. @@ -362,26 +362,26 @@ channel_queue_exchange_consumer_close_connection(Config) -> ensure_exchange_metrics_populated(Chan, Queue), force_channel_stats(Config), - [_] = read_table_rpc(Config, channel_exchange_metrics), - [_] = read_table_rpc(Config, channel_queue_exchange_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_exchange_metrics), 30000), + ?awaitMatch([_], read_table_rpc(Config, channel_queue_exchange_metrics), 30000), ensure_channel_queue_metrics_populated(Chan, Queue), force_channel_stats(Config), - [_] = read_table_rpc(Config, channel_queue_metrics), + ?awaitMatch([_], read_table_rpc(Config, channel_queue_metrics), 30000), Sub = #'basic.consume'{queue = Queue}, #'basic.consume_ok'{consumer_tag = _} = amqp_channel:call(Chan, Sub), - [_] = read_table_rpc(Config, consumer_created), + ?awaitMatch([_], read_table_rpc(Config, consumer_created), 30000), ok = rabbit_ct_client_helpers:close_connection(Conn), % ensure cleanup happened force_metric_gc(Config), - [] = read_table_rpc(Config, channel_exchange_metrics), - [] = read_table_rpc(Config, channel_queue_exchange_metrics), - [] = read_table_rpc(Config, channel_queue_metrics), - [] = read_table_rpc(Config, consumer_created), + ?awaitMatch([], read_table_rpc(Config, channel_exchange_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_queue_exchange_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, channel_queue_metrics), 30000), + ?awaitMatch([], read_table_rpc(Config, consumer_created), 30000), ok. diff --git a/deps/rabbit/test/per_node_limit_SUITE.erl b/deps/rabbit/test/per_node_limit_SUITE.erl index 33b4b466562..8c23e7c6b81 100644 --- a/deps/rabbit/test/per_node_limit_SUITE.erl +++ b/deps/rabbit/test/per_node_limit_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile(export_all). @@ -120,27 +121,28 @@ node_channel_limit(Config) -> ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost), Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost), Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost), - 0 = count_channels_per_node(Config), + ?awaitMatch(0, count_channels_per_node(Config), 30000), lists:foreach(fun(N) when (N band 1) == 1 -> {ok, _} = open_channel(Conn1); (_) -> {ok,_ } = open_channel(Conn2) end, lists:seq(1, 5)), - 5 = count_channels_per_node(Config), + ?awaitMatch(5, count_channels_per_node(Config), 30000), %% In total 5 channels are open on this node, so a new one, regardless of %% connection, will not be allowed. It will terminate the connection with %% its channels too. So {error, not_allowed_crash} = open_channel(Conn2), - 3 = count_channels_per_node(Config), + ?awaitMatch(3, count_channels_per_node(Config), 30000), %% As the connection is dead, so are the 2 channels, so we should be able to %% create 2 more on Conn1 {ok , _} = open_channel(Conn1), {ok , _} = open_channel(Conn1), + ?awaitMatch(5, count_channels_per_node(Config), 30000), %% But not a third {error, not_allowed_crash} = open_channel(Conn1), %% Now all connections are closed, so there should be 0 open connections - 0 = count_channels_per_node(Config), + ?awaitMatch(0, count_channels_per_node(Config), 30000), close_all_connections([Conn1, Conn2]), rabbit_ct_broker_helpers:delete_vhost(Config, VHost), diff --git a/deps/rabbit/test/per_user_connection_channel_limit_SUITE.erl b/deps/rabbit/test/per_user_connection_channel_limit_SUITE.erl index 2cd00041070..db4f5bc3f63 100644 --- a/deps/rabbit/test/per_user_connection_channel_limit_SUITE.erl +++ b/deps/rabbit/test/per_user_connection_channel_limit_SUITE.erl @@ -374,7 +374,13 @@ single_node_list_in_user(Config) -> [Conn4] = open_connections(Config, [{0, Username1}]), [_Chan4] = open_channels(Conn4, 1), close_connections([Conn4]), - [#tracked_connection{username = Username1}] = connections_in(Config, Username1), + rabbit_ct_helpers:await_condition( + fun () -> + case connections_in(Config, Username1) of + [#tracked_connection{username = Username1}] -> true; + _ -> false + end + end), [#tracked_channel{username = Username1}] = channels_in(Config, Username1), [Conn5, Conn6] = open_connections(Config, [{0, Username2}, {0, Username2}]), diff --git a/deps/rabbit/test/queue_type_SUITE.erl b/deps/rabbit/test/queue_type_SUITE.erl index bbd4c6fc15c..9519dec56f8 100644 --- a/deps/rabbit/test/queue_type_SUITE.erl +++ b/deps/rabbit/test/queue_type_SUITE.erl @@ -111,7 +111,7 @@ end_per_testcase(Testcase, Config) -> smoke(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Server), QName = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [{<<"x-queue-type">>, longstr, @@ -191,7 +191,7 @@ smoke(Config) -> }, ProtocolQueueTypeCounters), - ok = rabbit_ct_client_helpers:close_channel(Ch), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ?assertMatch( #{consumers := 0, @@ -202,7 +202,7 @@ smoke(Config) -> ack_after_queue_delete(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Server), QName = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [{<<"x-queue-type">>, longstr, @@ -223,12 +223,13 @@ ack_after_queue_delete(Config) -> after 1000 -> ok end, + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), flush(), ok. stream(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Server), QName = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [{<<"x-queue-type">>, longstr, @@ -238,7 +239,7 @@ stream(Config) -> publish_and_confirm(Ch, QName, <<"msg1">>), Args = [{<<"x-stream-offset">>, longstr, <<"last">>}], - SubCh = rabbit_ct_client_helpers:open_channel(Config, 2), + {SubConn, SubCh} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 2), qos(SubCh, 10, false), ok = queue_utils:wait_for_local_stream_member(2, <<"/">>, QName, Config), @@ -262,6 +263,8 @@ stream(Config) -> exit(Err) end, + ok = rabbit_ct_client_helpers:close_connection_and_channel(SubConn, SubCh), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok. diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index a2c19569425..dbf6d8a821c 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1306,27 +1306,20 @@ force_shrink_member_to_current_member(Config) -> RaName = ra_name(QQ), rabbit_ct_client_helpers:publish(Ch, QQ, 3), wait_for_messages_ready([Server0], RaName, 3), - - {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), - ?assertEqual(3, length(Nodes0)), + queue_utils:assert_number_of_replicas( + Config, Server0, <<"/">>, QQ, 3), rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, force_shrink_member_to_current_member, [<<"/">>, QQ]), wait_for_messages_ready([Server0], RaName, 3), - - {ok, Q1} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), - #{nodes := Nodes1} = amqqueue:get_type_state(Q1), - ?assertEqual(1, length(Nodes1)), + queue_utils:assert_number_of_replicas( + Config, Server0, <<"/">>, QQ, 1), %% grow queues back to all nodes [rpc:call(Server0, rabbit_quorum_queue, grow, [S, <<"/">>, <<".*">>, all]) || S <- [Server1, Server2]], - - wait_for_messages_ready([Server0], RaName, 3), - {ok, Q2} = rpc:call(Server0, rabbit_amqqueue, lookup, [QQ, <<"/">>]), - #{nodes := Nodes2} = amqqueue:get_type_state(Q2), - ?assertEqual(3, length(Nodes2)) + queue_utils:assert_number_of_replicas( + Config, Server0, <<"/">>, QQ, 3) end. force_all_queues_shrink_member_to_current_member(Config) -> @@ -1351,9 +1344,8 @@ force_all_queues_shrink_member_to_current_member(Config) -> RaName = ra_name(Q), rabbit_ct_client_helpers:publish(Ch, Q, 3), wait_for_messages_ready([Server0], RaName, 3), - {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), - ?assertEqual(3, length(Nodes0)) + queue_utils:assert_number_of_replicas( + Config, Server0, <<"/">>, Q, 3) end || Q <- QQs], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, @@ -1362,9 +1354,8 @@ force_all_queues_shrink_member_to_current_member(Config) -> [begin RaName = ra_name(Q), wait_for_messages_ready([Server0], RaName, 3), - {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), - ?assertEqual(1, length(Nodes0)) + queue_utils:assert_number_of_replicas( + Config, Server0, <<"/">>, Q, 1) end || Q <- QQs], %% grow queues back to all nodes @@ -1373,9 +1364,8 @@ force_all_queues_shrink_member_to_current_member(Config) -> [begin RaName = ra_name(Q), wait_for_messages_ready([Server0], RaName, 3), - {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, <<"/">>]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), - ?assertEqual(3, length(Nodes0)) + queue_utils:assert_number_of_replicas( + Config, Server0, <<"/">>, Q, 3) end || Q <- QQs] end. @@ -1417,9 +1407,8 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> QQRes = rabbit_misc:r(VHost, queue, Q), {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), wait_for_messages_ready([Server0], RaName, 3), - {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), - ?assertEqual(3, length(Nodes0)) + queue_utils:assert_number_of_replicas( + Config, Server0, VHost, Q, 3) end || Q <- QQs, VHost <- VHosts], rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, @@ -1429,11 +1418,13 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> QQRes = rabbit_misc:r(VHost, queue, Q), {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), wait_for_messages_ready([Server0], RaName, 3), - {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), case VHost of - VHost1 -> ?assertEqual(3, length(Nodes0)); - VHost2 -> ?assertEqual(1, length(Nodes0)) + VHost1 -> + queue_utils:assert_number_of_replicas( + Config, Server0, VHost, Q, 3); + VHost2 -> + queue_utils:assert_number_of_replicas( + Config, Server0, VHost, Q, 1) end end || Q <- QQs, VHost <- VHosts], @@ -1444,9 +1435,8 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> QQRes = rabbit_misc:r(VHost, queue, Q), {ok, RaName} = rpc:call(Server0, rabbit_queue_type_util, qname_to_internal_name, [QQRes]), wait_for_messages_ready([Server0], RaName, 3), - {ok, Q0} = rpc:call(Server0, rabbit_amqqueue, lookup, [Q, VHost]), - #{nodes := Nodes0} = amqqueue:get_type_state(Q0), - ?assertEqual(3, length(Nodes0)) + queue_utils:assert_number_of_replicas( + Config, Server0, VHost, Q, 3) end || Q <- QQs, VHost <- VHosts] end. @@ -1463,7 +1453,7 @@ force_checkpoint_on_queue(Config) -> ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - N = 20_000, + N = 10_000, rabbit_ct_client_helpers:publish(Ch, QQ, N), wait_for_messages_ready([Server0], RaName, N), @@ -2946,9 +2936,8 @@ delete_member_member_already_deleted(Config) -> ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server2])), - {ok, Q} = rpc:call(Server, rabbit_amqqueue, lookup, [QQ, <<"/">>]), - #{nodes := Nodes} = amqqueue:get_type_state(Q), - ?assertEqual(1, length(Nodes)), + queue_utils:assert_number_of_replicas( + Config, Server, <<"/">>, QQ, 1), ok. delete_member_during_node_down(Config) -> diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 66d3b8c0405..6fad42420e9 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -720,10 +720,12 @@ shrink_coordinator_cluster(Config) -> rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Q = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + %% Wait for the replicas to be ready before stopping a node. + check_leader_and_replicas(Config, [Server0, Server1, Server2]), + ok = rabbit_control_helper:command(stop_app, Server2), ok = rabbit_control_helper:command(forget_cluster_node, Server0, [atom_to_list(Server2)], []), diff --git a/deps/rabbitmq_ct_helpers/src/queue_utils.erl b/deps/rabbitmq_ct_helpers/src/queue_utils.erl index f72dba15456..d2c69792fde 100644 --- a/deps/rabbitmq_ct_helpers/src/queue_utils.erl +++ b/deps/rabbitmq_ct_helpers/src/queue_utils.erl @@ -2,6 +2,8 @@ -include_lib("eunit/include/eunit.hrl"). +-include("include/rabbit_assert.hrl"). + -export([ wait_for_messages_ready/3, wait_for_messages_pending_ack/3, @@ -15,7 +17,8 @@ ra_name/1, ra_machines_use_same_version/3, wait_for_local_stream_member/4, - has_local_stream_member_rpc/1 + has_local_stream_member_rpc/1, + assert_number_of_replicas/5 ]). -define(WFM_SLEEP, 256). @@ -191,3 +194,21 @@ has_local_stream_member_rpc(QName) -> {error, _} -> false end. + +assert_number_of_replicas(Config, Server, VHost, QQ, Count) -> + _ = case rabbit_ct_broker_helpers:configured_metadata_store(Config) of + khepri -> + rabbit_ct_broker_helpers:rpc( + Config, Server, rabbit_khepri, fence, [30000]); + mnesia -> + ok + end, + ?awaitMatch( + Count, + begin + {ok, Q} = rabbit_ct_broker_helpers:rpc( + Config, Server, rabbit_amqqueue, lookup, [QQ, VHost]), + #{nodes := Nodes} = amqqueue:get_type_state(Q), + length(Nodes) + end, + 30000). diff --git a/deps/rabbitmq_mqtt/test/auth_SUITE.erl b/deps/rabbitmq_mqtt/test/auth_SUITE.erl index 38eb6718a10..30d30e8f07f 100644 --- a/deps/rabbitmq_mqtt/test/auth_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/auth_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). %% not defined in v3 -define(SUBACK_FAILURE, 16#80). @@ -1246,6 +1247,7 @@ vhost_connection_limit(Config) -> {ok, _} = emqtt:connect(C1), {ok, C2} = connect_anonymous(Config, <<"client2">>), {ok, _} = emqtt:connect(C2), + ?awaitMatch(2, count_connections_per_vhost(Config), 30000), {ok, C3} = connect_anonymous(Config, <<"client3">>), ExpectedError = expected_connection_limit_error(Config), unlink(C3), @@ -1254,6 +1256,13 @@ vhost_connection_limit(Config) -> ok = emqtt:disconnect(C2), ok = rabbit_ct_broker_helpers:clear_vhost_limit(Config, 0, <<"/">>). +count_connections_per_vhost(Config) -> + NodeConfig = rabbit_ct_broker_helpers:get_node_config(Config, 0), + rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_connection_tracking, count_local_tracked_items_in_vhost, + [<<"/">>]). + vhost_queue_limit(Config) -> ok = rabbit_ct_broker_helpers:set_vhost_limit(Config, 0, <<"/">>, max_queues, 1), {ok, C} = connect_anonymous(Config), diff --git a/deps/rabbitmq_mqtt/test/java_SUITE.erl b/deps/rabbitmq_mqtt/test/java_SUITE.erl index 1f5be1a256c..cf4473fd542 100644 --- a/deps/rabbitmq_mqtt/test/java_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/java_SUITE.erl @@ -67,8 +67,13 @@ init_per_group(Group, Config0) -> [fun merge_app_env/1] ++ rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()), - util:enable_plugin(Config1, rabbitmq_mqtt), - Config1. + case Config1 of + _ when is_list(Config1) -> + util:enable_plugin(Config1, rabbitmq_mqtt), + Config1; + {skip, _} -> + Config1 + end. end_per_group(_, Config) -> rabbit_ct_helpers:run_teardown_steps(Config, diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index cbc39f41b87..724fcfdb814 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -1079,12 +1079,7 @@ session_upgrade_v3_v5_qos(Qos, Config) -> {ok, _, [Qos]} = emqtt:subscribe(Subv3, Topic, Qos), Sender = spawn_link(?MODULE, send, [self(), Pub, Topic, 0]), receive {publish, #{payload := <<"1">>, - client_pid := Subv3, - packet_id := PacketId}} -> - case Qos of - 0 -> ok; - 1 -> emqtt:puback(Subv3, PacketId) - end + client_pid := Subv3}} -> ok after ?TIMEOUT -> ct:fail("did not receive 1") end, %% Upgrade session from v3 to v5 while another client is sending messages. @@ -1108,7 +1103,7 @@ session_upgrade_v3_v5_qos(Qos, Config) -> 0 -> assert_received_no_duplicates(); 1 -> - ExpectedPayloads = [integer_to_binary(I) || I <- lists:seq(2, NumSent - 1)], + ExpectedPayloads = [integer_to_binary(I) || I <- lists:seq(1, NumSent - 1)], ok = expect_publishes(Subv5, Topic, ExpectedPayloads) end, ok = emqtt:disconnect(Pub), diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 44ad5de7307..9345e2e6e56 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -485,7 +485,7 @@ identity_info_test(Config) -> specific_erlang_metrics_present_test(Config) -> {_Headers, Body} = http_get_with_pal(Config, [], 200), - ?assertEqual(match, re:run(Body, "^erlang_vm_dist_node_queue_size_bytes{", [{capture, none}, multiline])). + ?assertEqual(match, re:run(Body, "^erlang_vm_dirty_io_schedulers ", [{capture, none}, multiline])). global_metrics_present_test(Config) -> {_Headers, Body} = http_get_with_pal(Config, [], 200), diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl index aa1f34e3863..099fb1de9f3 100644 --- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl @@ -871,16 +871,16 @@ dest_resource_alarm(AckMode, Config) -> %%---------------------------------------------------------------------------- with_ch(Config, Fun) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Fun(Ch), - rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), cleanup(Config), ok. with_newch(Config, Fun) -> - Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Fun(Ch), - rabbit_ct_client_helpers:close_channel(Ch), + rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), ok. publish(Ch, X, Key, Payload) when is_binary(Payload) ->