diff --git a/.github/workflows/test-make-target.yaml b/.github/workflows/test-make-target.yaml index 690904c211f9..bb0b0a46145b 100644 --- a/.github/workflows/test-make-target.yaml +++ b/.github/workflows/test-make-target.yaml @@ -57,7 +57,8 @@ jobs: uses: dsaltares/fetch-gh-release-asset@master if: inputs.mixed_clusters with: - version: 'tags/v4.0.5' + repo: 'rabbitmq/server-packages' + version: 'tags/alphas.1744021065493' regex: true file: "rabbitmq-server-generic-unix-\\d.+\\.tar\\.xz" target: ./ diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl index 4ff8ee36f1dc..0588a0cffd5a 100644 --- a/deps/rabbit/src/rabbit_db_binding.erl +++ b/deps/rabbit/src/rabbit_db_binding.erl @@ -837,17 +837,25 @@ delete_all_for_exchange_in_khepri(X = #exchange{name = XName}, OnlyDurable, Remo end, {deleted, X, Bindings, delete_for_destination_in_khepri(XName, OnlyDurable)}. -delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) -> - Path = khepri_route_path( - VHost, - Name, - _Kind = ?KHEPRI_WILDCARD_STAR, - _DstName = ?KHEPRI_WILDCARD_STAR, - _RoutingKey = #if_has_data{}), - {ok, Bindings} = khepri_tx_adv:delete_many(Path), - maps:fold(fun(_P, #{data := Set}, Acc) -> - sets:to_list(Set) ++ Acc - end, [], Bindings). +delete_for_source_in_khepri(#resource{virtual_host = VHost, name = SrcName}) -> + Pattern = khepri_route_path( + VHost, + SrcName, + ?KHEPRI_WILDCARD_STAR, %% Kind + ?KHEPRI_WILDCARD_STAR, %% DstName + #if_has_data{}), %% RoutingKey + {ok, Bindings} = khepri_tx_adv:delete_many(Pattern), + maps:fold( + fun(Path, Props, Acc) -> + case {Path, Props} of + {?RABBITMQ_KHEPRI_ROUTE_PATH( + VHost, SrcName, _Kind, _Name, _RoutingKey), + #{data := Set}} -> + sets:to_list(Set) ++ Acc; + {_, _} -> + Acc + end + end, [], Bindings). %% ------------------------------------------------------------------- %% delete_for_destination_in_mnesia(). @@ -892,14 +900,22 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) -> delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) -> Pattern = khepri_route_path( VHost, - _SrcName = ?KHEPRI_WILDCARD_STAR, + ?KHEPRI_WILDCARD_STAR, %% SrcName Kind, Name, - _RoutingKey = ?KHEPRI_WILDCARD_STAR), + ?KHEPRI_WILDCARD_STAR), %% RoutingKey {ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern), - Bindings = maps:fold(fun(_, #{data := Set}, Acc) -> - sets:to_list(Set) ++ Acc - end, [], BindingsMap), + Bindings = maps:fold( + fun(Path, Props, Acc) -> + case {Path, Props} of + {?RABBITMQ_KHEPRI_ROUTE_PATH( + VHost, _SrcName, Kind, Name, _RoutingKey), + #{data := Set}} -> + sets:to_list(Set) ++ Acc; + {_, _} -> + Acc + end + end, [], BindingsMap), rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4, lists:keysort(#binding.source, Bindings), OnlyDurable). diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl index 53f940c02a9a..4d4fd8046480 100644 --- a/deps/rabbit/src/rabbit_db_exchange.erl +++ b/deps/rabbit/src/rabbit_db_exchange.erl @@ -331,7 +331,7 @@ update_in_khepri(XName, Fun) -> Path = khepri_exchange_path(XName), Ret1 = rabbit_khepri:adv_get(Path), case Ret1 of - {ok, #{data := X, payload_version := Vsn}} -> + {ok, #{Path := #{data := X, payload_version := Vsn}}} -> X1 = Fun(X), UpdatePath = khepri_path:combine_with_conditions( @@ -534,8 +534,7 @@ next_serial_in_khepri(XName) -> Path = khepri_exchange_serial_path(XName), Ret1 = rabbit_khepri:adv_get(Path), case Ret1 of - {ok, #{data := Serial, - payload_version := Vsn}} -> + {ok, #{Path := #{data := Serial, payload_version := Vsn}}} -> UpdatePath = khepri_path:combine_with_conditions( Path, [#if_payload_version{version = Vsn}]), @@ -711,13 +710,20 @@ delete_all_in_khepri_tx(VHostName) -> {ok, NodeProps} = khepri_tx_adv:delete_many(Pattern), Deletions = maps:fold( - fun(_Path, #{data := X}, Deletions) -> - {deleted, #exchange{name = XName}, Bindings, XDeletions} = - rabbit_db_binding:delete_all_for_exchange_in_khepri( - X, false, true), - Deletions1 = rabbit_binding:add_deletion( - XName, X, deleted, Bindings, XDeletions), - rabbit_binding:combine_deletions(Deletions, Deletions1) + fun(Path, Props, Deletions) -> + case {Path, Props} of + {?RABBITMQ_KHEPRI_EXCHANGE_PATH(VHostName, _), + #{data := X}} -> + {deleted, + #exchange{name = XName}, Bindings, XDeletions} = + rabbit_db_binding:delete_all_for_exchange_in_khepri( + X, false, true), + Deletions1 = rabbit_binding:add_deletion( + XName, X, deleted, Bindings, XDeletions), + rabbit_binding:combine_deletions(Deletions, Deletions1); + {_, _} -> + Deletions + end end, rabbit_binding:new_deletions(), NodeProps), {ok, Deletions}. diff --git a/deps/rabbit/src/rabbit_db_msup.erl b/deps/rabbit/src/rabbit_db_msup.erl index 7c7de3c786fe..7ab072bf2b4c 100644 --- a/deps/rabbit/src/rabbit_db_msup.erl +++ b/deps/rabbit/src/rabbit_db_msup.erl @@ -135,8 +135,8 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) -> mirroring_pid = Overall, childspec = ChildSpec}, case rabbit_khepri:adv_get(Path) of - {ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid}, - payload_version := Vsn}} -> + {ok, #{Path := #{data := #mirrored_sup_childspec{mirroring_pid = Pid}, + payload_version := Vsn}}} -> case Overall of Pid -> Delegate; @@ -160,6 +160,7 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) -> end end; _ -> + %% FIXME: Not atomic with the get above. ok = rabbit_khepri:put(Path, S), start end. diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index 18590879ae0b..281cd0de3714 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -411,8 +411,18 @@ delete_in_khepri(QueueName, OnlyDurable) -> rabbit_khepri:transaction( fun () -> Path = khepri_queue_path(QueueName), + UsesUniformWriteRet = try + khepri_tx:does_api_comply_with(uniform_write_ret) + catch + error:undef -> + false + end, case khepri_tx_adv:delete(Path) of - {ok, #{data := _}} -> + {ok, #{Path := #{data := _}}} when UsesUniformWriteRet -> + %% we want to execute some things, as decided by rabbit_exchange, + %% after the transaction. + rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable); + {ok, #{data := _}} when not UsesUniformWriteRet -> %% we want to execute some things, as decided by rabbit_exchange, %% after the transaction. rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable); @@ -607,7 +617,7 @@ update_in_khepri(QName, Fun) -> Path = khepri_queue_path(QName), Ret1 = rabbit_khepri:adv_get(Path), case Ret1 of - {ok, #{data := Q, payload_version := Vsn}} -> + {ok, #{Path := #{data := Q, payload_version := Vsn}}} -> UpdatePath = khepri_path:combine_with_conditions( Path, [#if_payload_version{version = Vsn}]), Q1 = Fun(Q), @@ -658,7 +668,7 @@ update_decorators_in_khepri(QName, Decorators) -> Path = khepri_queue_path(QName), Ret1 = rabbit_khepri:adv_get(Path), case Ret1 of - {ok, #{data := Q1, payload_version := Vsn}} -> + {ok, #{Path := #{data := Q1, payload_version := Vsn}}} -> Q2 = amqqueue:set_decorators(Q1, Decorators), UpdatePath = khepri_path:combine_with_conditions( Path, [#if_payload_version{version = Vsn}]), @@ -1098,15 +1108,12 @@ delete_transient_in_khepri(FilterFun) -> case rabbit_khepri:adv_get_many(PathPattern) of {ok, Props} -> Qs = maps:fold( - fun(Path0, #{data := Q, payload_version := Vsn}, Acc) + fun(Path, #{data := Q, payload_version := Vsn}, Acc) when ?is_amqqueue(Q) -> case FilterFun(Q) of true -> - Path = khepri_path:combine_with_conditions( - Path0, - [#if_payload_version{version = Vsn}]), QName = amqqueue:get_name(Q), - [{Path, QName} | Acc]; + [{Path, Vsn, QName} | Acc]; false -> Acc end @@ -1125,20 +1132,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) -> do_delete_transient_queues_in_khepri(Qs, FilterFun) -> Res = rabbit_khepri:transaction( fun() -> - rabbit_misc:fold_while_ok( - fun({Path, QName}, Acc) -> - %% Also see `delete_in_khepri/2'. - case khepri_tx_adv:delete(Path) of - {ok, #{data := _}} -> - Deletions = rabbit_db_binding:delete_for_destination_in_khepri( - QName, false), - {ok, [{QName, Deletions} | Acc]}; - {ok, _} -> - {ok, Acc}; - {error, _} = Error -> - Error - end - end, [], Qs) + do_delete_transient_queues_in_khepri_tx(Qs, []) end), case Res of {ok, Items} -> @@ -1152,6 +1146,35 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) -> Error end. +do_delete_transient_queues_in_khepri_tx([], Acc) -> + {ok, Acc}; +do_delete_transient_queues_in_khepri_tx([{Path, Vsn, QName} | Rest], Acc) -> + %% Also see `delete_in_khepri/2'. + VersionedPath = khepri_path:combine_with_conditions( + Path, [#if_payload_version{version = Vsn}]), + UsesUniformWriteRet = try + khepri_tx:does_api_comply_with(uniform_write_ret) + catch + error:undef -> + false + end, + case khepri_tx_adv:delete(VersionedPath) of + {ok, #{Path := #{data := _}}} when UsesUniformWriteRet -> + Deletions = rabbit_db_binding:delete_for_destination_in_khepri( + QName, false), + Acc1 = [{QName, Deletions} | Acc], + do_delete_transient_queues_in_khepri_tx(Rest, Acc1); + {ok, #{data := _}} when not UsesUniformWriteRet -> + Deletions = rabbit_db_binding:delete_for_destination_in_khepri( + QName, false), + Acc1 = [{QName, Deletions} | Acc], + do_delete_transient_queues_in_khepri_tx(Rest, Acc1); + {ok, _} -> + do_delete_transient_queues_in_khepri_tx(Rest, Acc); + {error, _} = Error -> + Error + end. + %% ------------------------------------------------------------------- %% foreach_transient(). %% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_db_rtparams.erl b/deps/rabbit/src/rabbit_db_rtparams.erl index f0fe8cb2fd05..68decc6ca9c3 100644 --- a/deps/rabbit/src/rabbit_db_rtparams.erl +++ b/deps/rabbit/src/rabbit_db_rtparams.erl @@ -59,7 +59,7 @@ set_in_khepri(Key, Term) -> Record = #runtime_parameters{key = Key, value = Term}, case rabbit_khepri:adv_put(Path, Record) of - {ok, #{data := Params}} -> + {ok, #{Path := #{data := Params}}} -> {old, Params#runtime_parameters.value}; {ok, _} -> new @@ -113,8 +113,16 @@ set_in_khepri_tx(Key, Term) -> Path = khepri_rp_path(Key), Record = #runtime_parameters{key = Key, value = Term}, + UsesUniformWriteRet = try + khepri_tx:does_api_comply_with(uniform_write_ret) + catch + error:undef -> + false + end, case khepri_tx_adv:put(Path, Record) of - {ok, #{data := Params}} -> + {ok, #{Path := #{data := Params}}} when UsesUniformWriteRet -> + {old, Params#runtime_parameters.value}; + {ok, #{data := Params}} when not UsesUniformWriteRet -> {old, Params#runtime_parameters.value}; {ok, _} -> new @@ -347,11 +355,23 @@ delete_vhost_in_mnesia_tx(VHostName) -> <- mnesia:match_object(?MNESIA_TABLE, Match, read)]. delete_vhost_in_khepri(VHostName) -> - Path = khepri_vhost_rp_path( - VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR), - case rabbit_khepri:adv_delete_many(Path) of - {ok, Props} -> - {ok, rabbit_khepri:collect_payloads(Props)}; + Pattern = khepri_vhost_rp_path( + VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR), + case rabbit_khepri:adv_delete_many(Pattern) of + {ok, NodePropsMap} -> + RTParams = + maps:fold( + fun(Path, Props, Acc) -> + case {Path, Props} of + {?RABBITMQ_KHEPRI_VHOST_RUNTIME_PARAM_PATH( + VHostName, _, _), + #{data := RTParam}} -> + [RTParam | Acc]; + {_, _} -> + Acc + end + end, [], NodePropsMap), + {ok, RTParams}; {error, _} = Err -> Err end. diff --git a/deps/rabbit/src/rabbit_db_user.erl b/deps/rabbit/src/rabbit_db_user.erl index dc1b76751a8e..81deccfa6c03 100644 --- a/deps/rabbit/src/rabbit_db_user.erl +++ b/deps/rabbit/src/rabbit_db_user.erl @@ -628,20 +628,42 @@ clear_all_permissions_for_vhost_in_mnesia(VHostName) -> clear_all_permissions_for_vhost_in_khepri(VHostName) -> rabbit_khepri:transaction( fun() -> - UserPermissionsPath = khepri_user_permission_path( - ?KHEPRI_WILDCARD_STAR, VHostName), - TopicPermissionsPath = khepri_topic_permission_path( - ?KHEPRI_WILDCARD_STAR, VHostName, - ?KHEPRI_WILDCARD_STAR), - {ok, UserProps} = khepri_tx_adv:delete_many(UserPermissionsPath), - {ok, TopicProps} = khepri_tx_adv:delete_many( - TopicPermissionsPath), - Deletions = rabbit_khepri:collect_payloads( - TopicProps, - rabbit_khepri:collect_payloads(UserProps)), - {ok, Deletions} + clear_all_permissions_for_vhost_in_khepri_tx(VHostName) end, rw, #{timeout => infinity}). +clear_all_permissions_for_vhost_in_khepri_tx(VHostName) -> + UserPermissionsPattern = khepri_user_permission_path( + ?KHEPRI_WILDCARD_STAR, VHostName), + TopicPermissionsPattern = khepri_topic_permission_path( + ?KHEPRI_WILDCARD_STAR, VHostName, + ?KHEPRI_WILDCARD_STAR), + {ok, UserNodePropsMap} = khepri_tx_adv:delete_many(UserPermissionsPattern), + {ok, TopicNodePropsMap} = khepri_tx_adv:delete_many( + TopicPermissionsPattern), + Deletions0 = + maps:fold( + fun(Path, Props, Acc) -> + case {Path, Props} of + {?RABBITMQ_KHEPRI_USER_PERMISSION_PATH(VHostName, _), + #{data := Permission}} -> + [Permission | Acc]; + {_, _} -> + Acc + end + end, [], UserNodePropsMap), + Deletions1 = + maps:fold( + fun(Path, Props, Acc) -> + case {Path, Props} of + {?RABBITMQ_KHEPRI_TOPIC_PERMISSION_PATH(VHostName, _, _), + #{data := Permission}} -> + [Permission | Acc]; + {_, _} -> + Acc + end + end, Deletions0, TopicNodePropsMap), + {ok, Deletions1}. + %% ------------------------------------------------------------------- %% get_topic_permissions(). %% ------------------------------------------------------------------- diff --git a/deps/rabbit/src/rabbit_db_vhost.erl b/deps/rabbit/src/rabbit_db_vhost.erl index 9c925fcb0255..1584e764a93f 100644 --- a/deps/rabbit/src/rabbit_db_vhost.erl +++ b/deps/rabbit/src/rabbit_db_vhost.erl @@ -167,7 +167,7 @@ merge_metadata_in_khepri(VHostName, Metadata) -> Path = khepri_vhost_path(VHostName), Ret1 = rabbit_khepri:adv_get(Path), case Ret1 of - {ok, #{data := VHost0, payload_version := DVersion}} -> + {ok, #{Path := #{data := VHost0, payload_version := DVersion}}} -> VHost = vhost:merge_metadata(VHost0, Metadata), rabbit_log:debug("Updating a virtual host record ~p", [VHost]), Path1 = khepri_path:combine_with_conditions( @@ -443,10 +443,10 @@ update_in_mnesia_tx(VHostName, UpdateFun) update_in_khepri(VHostName, UpdateFun) -> Path = khepri_vhost_path(VHostName), case rabbit_khepri:adv_get(Path) of - {ok, #{data := V, payload_version := DVersion}} -> + {ok, #{Path := #{data := V, payload_version := Vsn}}} -> V1 = UpdateFun(V), Path1 = khepri_path:combine_with_conditions( - Path, [#if_payload_version{version = DVersion}]), + Path, [#if_payload_version{version = Vsn}]), case rabbit_khepri:put(Path1, V1) of ok -> V1; diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 5424917ee00c..ae43ae8e51ca 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -112,6 +112,7 @@ get_ra_cluster_name/0, get_store_id/0, transfer_leadership/1, + fence/1, is_empty/0, create/2, @@ -174,10 +175,6 @@ -export([force_shrink_member_to_current_member/0]). -%% Helpers for working with the Khepri API / types. --export([collect_payloads/1, - collect_payloads/2]). - -ifdef(TEST). -export([force_metadata_store/1, clear_forced_metadata_store/0]). @@ -620,7 +617,7 @@ members() -> %% The returned list is empty if there was an error. locally_known_members() -> - case khepri_cluster:locally_known_members(?RA_CLUSTER_NAME) of + case khepri_cluster:members(?RA_CLUSTER_NAME, #{favor => low_latency}) of {ok, Members} -> Members; {error, _Reason} -> [] end. @@ -650,7 +647,7 @@ nodes() -> %% The returned list is empty if there was an error. locally_known_nodes() -> - case khepri_cluster:locally_known_nodes(?RA_CLUSTER_NAME) of + case khepri_cluster:nodes(?RA_CLUSTER_NAME, #{favor => low_latency}) of {ok, Nodes} -> Nodes; {error, _Reason} -> [] end. @@ -1020,12 +1017,14 @@ delete(Path, Options0) -> delete_or_fail(Path) -> case khepri_adv:delete(?STORE_ID, Path, ?DEFAULT_COMMAND_OPTIONS) of - {ok, Result} -> - case maps:size(Result) of + {ok, #{Path := NodeProps}} -> + case maps:size(NodeProps) of 0 -> {error, {node_not_found, #{}}}; _ -> ok end; - Error -> + {ok, #{} = NodePropsMap} when NodePropsMap =:= #{} -> + {error, {node_not_found, #{}}}; + {error, _} = Error -> Error end. @@ -1072,48 +1071,6 @@ handle_async_ret(RaEvent) -> fence(Timeout) -> khepri:fence(?STORE_ID, Timeout). -%% ------------------------------------------------------------------- -%% collect_payloads(). -%% ------------------------------------------------------------------- - --spec collect_payloads(Props) -> Ret when - Props :: khepri:node_props(), - Ret :: [Payload], - Payload :: term(). - -%% @doc Collects all payloads from a node props map. -%% -%% This is the same as calling `collect_payloads(Props, [])'. -%% -%% @private - -collect_payloads(Props) when is_map(Props) -> - collect_payloads(Props, []). - --spec collect_payloads(Props, Acc0) -> Ret when - Props :: khepri:node_props(), - Acc0 :: [Payload], - Ret :: [Payload], - Payload :: term(). - -%% @doc Collects all payloads from a node props map into the accumulator list. -%% -%% This is meant to be used with the `khepri_adv' API to easily collect the -%% payloads from the return value of `khepri_adv:delete_many/4' for example. -%% -%% @returns all payloads in the node props map collected into a list, with -%% `Acc0' as the tail. -%% -%% @private - -collect_payloads(Props, Acc0) when is_map(Props) andalso is_list(Acc0) -> - maps:fold( - fun (_Path, #{data := Payload}, Acc) -> - [Payload | Acc]; - (_Path, _NoPayload, Acc) -> - Acc - end, Acc0, Props). - -spec unregister_legacy_projections() -> Ret when Ret :: ok | timeout_error(). %% @doc Unregisters any projections which were registered in RabbitMQ 3.13.x @@ -1557,19 +1514,31 @@ get_feature_state(Node) -> %% @private khepri_db_migration_enable(#{feature_name := FeatureName}) -> - maybe - ok ?= sync_cluster_membership_from_mnesia(FeatureName), - ?LOG_INFO( - "Feature flag `~s`: unregistering legacy projections", - [FeatureName], - #{domain => ?RMQLOG_DOMAIN_DB}), - ok ?= unregister_legacy_projections(), - ?LOG_INFO( - "Feature flag `~s`: registering projections", - [FeatureName], - #{domain => ?RMQLOG_DOMAIN_DB}), - ok ?= register_projections(), - migrate_mnesia_tables(FeatureName) + Members = locally_known_members(), + case length(Members) < 2 of + true -> + maybe + ok ?= sync_cluster_membership_from_mnesia(FeatureName), + ?LOG_INFO( + "Feature flag `~s`: unregistering legacy projections", + [FeatureName], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok ?= unregister_legacy_projections(), + ?LOG_INFO( + "Feature flag `~s`: registering projections", + [FeatureName], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok ?= register_projections(), + migrate_mnesia_tables(FeatureName) + end; + false -> + ?LOG_INFO( + "Feature flag `~s`: node ~0p already clustered (feature flag " + "enabled as part of clustering?); " + "skipping Mnesia->Khepri migration", + [node()], + #{domain => ?RMQLOG_DOMAIN_DB}), + ok end. %% @private diff --git a/deps/rabbit/test/cluster_minority_SUITE.erl b/deps/rabbit/test/cluster_minority_SUITE.erl index 83a2582a5395..4c0ea54c972b 100644 --- a/deps/rabbit/test/cluster_minority_SUITE.erl +++ b/deps/rabbit/test/cluster_minority_SUITE.erl @@ -9,14 +9,14 @@ -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). -compile([export_all, nowarn_export_all]). all() -> [ {group, client_operations}, - {group, cluster_operation_add}, - {group, cluster_operation_remove} + {group, cluster_operation} ]. groups() -> @@ -42,8 +42,10 @@ groups() -> delete_policy, export_definitions ]}, - {cluster_operation_add, [], [add_node]}, - {cluster_operation_remove, [], [remove_node]}, + {cluster_operation, [], [add_node_when_seed_node_is_leader, + add_node_when_seed_node_is_follower, + remove_node_when_seed_node_is_leader, + remove_node_when_seed_node_is_follower]}, {feature_flags, [], [enable_feature_flag]} ]. @@ -127,26 +129,49 @@ init_per_group(Group, Config0) when Group == client_operations; partition_5_node_cluster(Config1), Config1 end; -init_per_group(Group, Config0) -> +init_per_group(_Group, Config0) -> Config = rabbit_ct_helpers:set_config(Config0, [{rmq_nodes_count, 5}, - {rmq_nodename_suffix, Group}, {rmq_nodes_clustered, false}, {tcp_ports_base}, {net_ticktime, 5}]), Config1 = rabbit_ct_helpers:merge_app_env( - Config, {rabbit, [{forced_feature_flags_on_init, []}]}), - rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). + Config, {rabbit, [{forced_feature_flags_on_init, []}, + {khepri_leader_wait_retry_timeout, 30000}]}), + Config1. -end_per_group(_, Config) -> +end_per_group(Group, Config) when Group == client_operations; + Group == feature_flags -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()). - + rabbit_ct_broker_helpers:teardown_steps()); +end_per_group(_Group, Config) -> + Config. + +init_per_testcase(Testcase, Config) + when Testcase =:= add_node_when_seed_node_is_leader orelse + Testcase =:= add_node_when_seed_node_is_follower orelse + Testcase =:= remove_node_when_seed_node_is_leader orelse + Testcase =:= remove_node_when_seed_node_is_follower -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + Config1 = rabbit_ct_helpers:set_config( + Config, [{rmq_nodename_suffix, Testcase}]), + rabbit_ct_helpers:run_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). +end_per_testcase(Testcase, Config) + when Testcase =:= add_node_when_seed_node_is_leader orelse + Testcase =:= add_node_when_seed_node_is_follower orelse + Testcase =:= remove_node_when_seed_node_is_leader orelse + Testcase =:= remove_node_when_seed_node_is_follower -> + rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). @@ -271,53 +296,153 @@ set_policy(Config) -> delete_policy(Config) -> ?assertError(_, rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"policy-to-delete">>)). -add_node(Config) -> - [A, B, C, D, _E] = rabbit_ct_broker_helpers:get_node_configs( +add_node_when_seed_node_is_leader(Config) -> + [A, B, C, _D, E] = rabbit_ct_broker_helpers:get_node_configs( Config, nodename), %% Three node cluster: A, B, C - ok = rabbit_control_helper:command(stop_app, B), - ok = rabbit_control_helper:command(join_cluster, B, [atom_to_list(A)], []), - rabbit_control_helper:command(start_app, B), + Cluster = [A, B, C], + Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster), - ok = rabbit_control_helper:command(stop_app, C), - ok = rabbit_control_helper:command(join_cluster, C, [atom_to_list(A)], []), - rabbit_control_helper:command(start_app, C), + AMember = {rabbit_khepri:get_store_id(), A}, + _ = ra:transfer_leadership(AMember, AMember), + clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster), %% Minority partition: A + partition_3_node_cluster(Config1), + + Pong = ra:ping(AMember, 10000), + ct:pal("Member A state: ~0p", [Pong]), + case Pong of + {pong, State} when State =/= follower andalso State =/= candidate -> + Ret = rabbit_control_helper:command( + join_cluster, E, [atom_to_list(A)], []), + ?assertMatch({error, _, _}, Ret), + {error, _, Msg} = Ret, + ?assertEqual( + match, + re:run( + Msg, "(Khepri cluster could be in minority|\\{:rabbit, \\{\\{:error, :timeout\\})", + [{capture, none}])); + Ret -> + ct:pal("A is not the expected leader: ~p", [Ret]), + {skip, "Node A was not elected leader"} + end. + +add_node_when_seed_node_is_follower(Config) -> + [A, B, C, _D, E] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + + %% Three node cluster: A, B, C Cluster = [A, B, C], - partition_3_node_cluster(Config), - - ok = rabbit_control_helper:command(stop_app, D), - %% The command is appended to the log, but it will be dropped once the connectivity - %% is restored - ?assertMatch(ok, - rabbit_control_helper:command(join_cluster, D, [atom_to_list(A)], [])), - timer:sleep(10000), - join_3_node_cluster(Config), - clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster). - -remove_node(Config) -> + Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster), + + CMember = {rabbit_khepri:get_store_id(), C}, + ra:transfer_leadership(CMember, CMember), + clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster), + + %% Minority partition: A + partition_3_node_cluster(Config1), + + AMember = {rabbit_khepri:get_store_id(), A}, + Pong = ra:ping(AMember, 10000), + ct:pal("Member A state: ~0p", [Pong]), + case Pong of + {pong, State} + when State =:= follower orelse State =:= pre_vote -> + Ret = rabbit_control_helper:command( + join_cluster, E, [atom_to_list(A)], []), + ?assertMatch({error, _, _}, Ret), + {error, _, Msg} = Ret, + ?assertEqual( + match, + re:run( + Msg, "Khepri cluster could be in minority", + [{capture, none}])); + {pong, await_condition} -> + Ret = rabbit_control_helper:command( + join_cluster, E, [atom_to_list(A)], []), + ?assertMatch({error, _, _}, Ret), + {error, _, Msg} = Ret, + ?assertEqual( + match, + re:run( + Msg, "\\{:rabbit, \\{\\{:error, :timeout\\}", + [{capture, none}])), + clustering_utils:assert_cluster_status( + {Cluster, Cluster}, Cluster); + Ret -> + ct:pal("A is not the expected follower: ~p", [Ret]), + {skip, "Node A was not a follower"} + end. + +remove_node_when_seed_node_is_leader(Config) -> [A, B, C | _] = rabbit_ct_broker_helpers:get_node_configs( Config, nodename), %% Three node cluster: A, B, C - ok = rabbit_control_helper:command(stop_app, B), - ok = rabbit_control_helper:command(join_cluster, B, [atom_to_list(A)], []), - rabbit_control_helper:command(start_app, B), + Cluster = [A, B, C], + Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster), - ok = rabbit_control_helper:command(stop_app, C), - ok = rabbit_control_helper:command(join_cluster, C, [atom_to_list(A)], []), - rabbit_control_helper:command(start_app, C), + AMember = {rabbit_khepri:get_store_id(), A}, + ra:transfer_leadership(AMember, AMember), + clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster), %% Minority partition: A - partition_3_node_cluster(Config), + partition_3_node_cluster(Config1), + + Pong = ra:ping(AMember, 10000), + ct:pal("Member A state: ~0p", [Pong]), + case Pong of + {pong, leader} -> + ?awaitMatch( + ok, + rabbit_control_helper:command( + forget_cluster_node, A, [atom_to_list(B)], []), + 60000); + Ret -> + ct:pal("A is not the expected leader: ~p", [Ret]), + {skip, "Node A was not a leader"} + end. + +remove_node_when_seed_node_is_follower(Config) -> + [A, B, C | _] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + + %% Three node cluster: A, B, C Cluster = [A, B, C], + Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Cluster), + + CMember = {rabbit_khepri:get_store_id(), C}, + ra:transfer_leadership(CMember, CMember), + clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster), - ok = rabbit_control_helper:command(forget_cluster_node, A, [atom_to_list(B)], []), - timer:sleep(10000), - join_3_node_cluster(Config), - clustering_utils:assert_cluster_status({Cluster, Cluster}, Cluster). + %% Minority partition: A + partition_3_node_cluster(Config1), + + AMember = {rabbit_khepri:get_store_id(), A}, + Pong = ra:ping(AMember, 10000), + ct:pal("Member A state: ~0p", [Pong]), + case Pong of + {pong, State} + when State =:= follower orelse State =:= pre_vote -> + Ret = rabbit_control_helper:command( + forget_cluster_node, A, [atom_to_list(B)], []), + ?assertMatch({error, _, _}, Ret), + {error, _, Msg} = Ret, + ?assertEqual( + match, + re:run( + Msg, "Khepri cluster could be in minority", + [{capture, none}])); + {pong, await_condition} -> + Ret = rabbit_control_helper:command( + forget_cluster_node, A, [atom_to_list(B)], []), + ?assertMatch(ok, Ret); + Ret -> + ct:pal("A is not the expected leader: ~p", [Ret]), + {skip, "Node A was not a leader"} + end. enable_feature_flag(Config) -> [A | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/deps/rabbit/test/clustering_management_SUITE.erl b/deps/rabbit/test/clustering_management_SUITE.erl index 426f5e35e950..33ff6693e8e0 100644 --- a/deps/rabbit/test/clustering_management_SUITE.erl +++ b/deps/rabbit/test/clustering_management_SUITE.erl @@ -745,13 +745,13 @@ is_in_minority(Ret) -> ?assertMatch(match, re:run(Msg, ".*timed out.*minority.*", [{capture, none}])). reset_last_disc_node(Config) -> - Servers = [Rabbit, Hare | _] = cluster_members(Config), + [Rabbit, Hare | _] = cluster_members(Config), stop_app(Config, Hare), ?assertEqual(ok, change_cluster_node_type(Config, Hare, ram)), start_app(Config, Hare), - case rabbit_ct_broker_helpers:enable_feature_flag(Config, Servers, khepri_db) of + case rabbit_ct_broker_helpers:enable_feature_flag(Config, [Rabbit], khepri_db) of ok -> %% The reset works after the switch to Khepri because the RAM node was %% implicitly converted to a disc one as Khepri always writes data on disc. diff --git a/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl b/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl index ac01be7bb59d..5bb348c7dab3 100644 --- a/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl +++ b/deps/rabbit/test/peer_discovery_classic_config_SUITE.erl @@ -21,9 +21,7 @@ all() -> [ {group, non_parallel}, - {group, cluster_size_3}, - {group, cluster_size_5}, - {group, cluster_size_7} + {group, discovery} ]. groups() -> @@ -31,18 +29,24 @@ groups() -> {non_parallel, [], [ no_nodes_configured ]}, - {cluster_size_3, [], [ - successful_discovery, - successful_discovery_with_a_subset_of_nodes_coming_online - ]}, - {cluster_size_5, [], [ - successful_discovery, - successful_discovery_with_a_subset_of_nodes_coming_online - ]}, - {cluster_size_7, [], [ - successful_discovery, - successful_discovery_with_a_subset_of_nodes_coming_online - ]} + {discovery, [], + [ + {cluster_size_3, [], + [ + successful_discovery, + successful_discovery_with_a_subset_of_nodes_coming_online + ]}, + {cluster_size_5, [], + [ + successful_discovery, + successful_discovery_with_a_subset_of_nodes_coming_online + ]}, + {cluster_size_7, [], + [ + successful_discovery, + successful_discovery_with_a_subset_of_nodes_coming_online + ]} + ]} ]. suite() -> @@ -63,6 +67,24 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). +init_per_group(discovery, Config) -> + case rabbit_ct_helpers:is_mixed_versions(Config) of + false -> + Config; + true -> + %% We can't support the creation of a cluster because peer + %% discovery might select a newer node as the seed node and ask an + %% older node to join it. The creation of the cluster may fail of + %% the cluster might be degraded. Examples: + %% - a feature flag is enabled by the newer node but the older + %% node doesn't know it + %% - the newer node uses a newer Khepri machine version and the + %% older node can join but won't be able to apply Khepri + %% commands and progress. + {skip, + "Peer discovery is unsupported with a mix of old and new " + "RabbitMQ versions"} + end; init_per_group(cluster_size_3 = Group, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}, {group, Group}]); init_per_group(cluster_size_5 = Group, Config) -> diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 1a73290e463e..463445b9f474 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -298,6 +298,9 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ init_per_testcase(Testcase, Config) -> ClusterSize = ?config(rmq_nodes_count, Config), IsMixed = rabbit_ct_helpers:is_mixed_versions(), + SameKhepriMacVers = ( + rabbit_ct_broker_helpers:do_nodes_run_same_ra_machine_version( + Config, khepri_machine)), case Testcase of node_removal_is_not_quorum_critical when IsMixed -> {skip, "node_removal_is_not_quorum_critical isn't mixed versions compatible"}; @@ -325,6 +328,9 @@ init_per_testcase(Testcase, Config) -> leader_locator_balanced_random_maintenance when IsMixed -> {skip, "leader_locator_balanced_random_maintenance isn't mixed versions compatible because " "delete_declare isn't mixed versions reliable"}; + leadership_takeover when not SameKhepriMacVers -> + {skip, "leadership_takeover will fail with a mix of Khepri state " + "machine versions"}; reclaim_memory_with_wrong_queue_type when IsMixed -> {skip, "reclaim_memory_with_wrong_queue_type isn't mixed versions compatible"}; peek_with_wrong_queue_type when IsMixed -> @@ -2063,7 +2069,7 @@ recover_from_single_failure(Config) -> wait_for_messages_pending_ack(Servers, RaName, 0). recover_from_multiple_failures(Config) -> - [Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server1, Server, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -2360,7 +2366,7 @@ channel_handles_ra_event(Config) -> ?assertEqual(2, basic_get_tag(Ch1, Q2, false)). declare_during_node_down(Config) -> - [Server, DownServer, _] = Servers = rabbit_ct_broker_helpers:get_node_configs( + [DownServer, Server, _] = Servers = rabbit_ct_broker_helpers:get_node_configs( Config, nodename), stop_node(Config, DownServer), @@ -2692,7 +2698,7 @@ delete_member_member_already_deleted(Config) -> ok. delete_member_during_node_down(Config) -> - [Server, DownServer, Remove] = Servers = rabbit_ct_broker_helpers:get_node_configs( + [DownServer, Server, Remove] = Servers = rabbit_ct_broker_helpers:get_node_configs( Config, nodename), stop_node(Config, DownServer), @@ -2747,7 +2753,7 @@ cleanup_data_dir(Config) -> %% trying to delete a queue in minority. A case clause there had gone %% previously unnoticed. - [Server1, Server2, Server3] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server2, Server1, Server3] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, @@ -3594,7 +3600,12 @@ format(Config) -> %% tests rabbit_quorum_queue:format/2 Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - Server = hd(Nodes), + Server = case Nodes of + [N] -> + N; + [_, N | _] -> + N + end, Ch = rabbit_ct_client_helpers:open_channel(Config, Server), Q = ?config(queue_name, Config), @@ -3613,7 +3624,9 @@ format(Config) -> ?FUNCTION_NAME, [QRecord, #{}]), %% test all up case - ?assertEqual(<<"quorum">>, proplists:get_value(type, Fmt)), + ?assertMatch( + T when T =:= <<"quorum">> orelse T =:= quorum, + proplists:get_value(type, Fmt)), ?assertEqual(running, proplists:get_value(state, Fmt)), ?assertEqual(Server, proplists:get_value(leader, Fmt)), ?assertEqual(Server, proplists:get_value(node, Fmt)), @@ -3622,15 +3635,17 @@ format(Config) -> case length(Nodes) of 3 -> - [_, Server2, Server3] = Nodes, - ok = rabbit_control_helper:command(stop_app, Server2), + [Server1, _Server2, Server3] = Nodes, + ok = rabbit_control_helper:command(stop_app, Server1), ok = rabbit_control_helper:command(stop_app, Server3), Fmt2 = rabbit_ct_broker_helpers:rpc(Config, Server, rabbit_quorum_queue, ?FUNCTION_NAME, [QRecord, #{}]), - ok = rabbit_control_helper:command(start_app, Server2), + ok = rabbit_control_helper:command(start_app, Server1), ok = rabbit_control_helper:command(start_app, Server3), - ?assertEqual(<<"quorum">>, proplists:get_value(type, Fmt2)), + ?assertMatch( + T when T =:= <<"quorum">> orelse T =:= quorum, + proplists:get_value(type, Fmt2)), ?assertEqual(minority, proplists:get_value(state, Fmt2)), ?assertEqual(Server, proplists:get_value(leader, Fmt2)), ?assertEqual(Server, proplists:get_value(node, Fmt2)), diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 96b7ce84b9f4..9e45d0d04ff9 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -540,50 +540,48 @@ add_replica(Config) -> QQuorum = <>, ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), ?assertEqual({'queue.declare_ok', QClassic, 0, 0}, - declare(Config, Server0, QClassic, [{<<"x-queue-type">>, longstr, <<"classic">>}])), + declare(Config, Server1, QClassic, [{<<"x-queue-type">>, longstr, <<"classic">>}])), ?assertEqual({'queue.declare_ok', QQuorum, 0, 0}, - declare(Config, Server0, QQuorum, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + declare(Config, Server1, QQuorum, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), %% Not a member of the cluster, what would happen? ?assertEqual({error, node_not_running}, - rpc:call(Server0, rabbit_stream_queue, add_replica, - [<<"/">>, Q, Server1])), + rpc:call(Server1, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server0])), ?assertEqual({error, classic_queue_not_supported}, - rpc:call(Server0, rabbit_stream_queue, add_replica, - [<<"/">>, QClassic, Server1])), + rpc:call(Server1, rabbit_stream_queue, add_replica, + [<<"/">>, QClassic, Server0])), ?assertEqual({error, quorum_queue_not_supported}, - rpc:call(Server0, rabbit_stream_queue, add_replica, - [<<"/">>, QQuorum, Server1])), + rpc:call(Server1, rabbit_stream_queue, add_replica, + [<<"/">>, QQuorum, Server0])), - ok = rabbit_control_helper:command(stop_app, Server1), - ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), - rabbit_control_helper:command(start_app, Server1), + Config1 = rabbit_ct_broker_helpers:cluster_nodes( + Config, Server1, [Server0]), timer:sleep(1000), ?assertEqual({error, classic_queue_not_supported}, - rpc:call(Server0, rabbit_stream_queue, add_replica, - [<<"/">>, QClassic, Server1])), + rpc:call(Server1, rabbit_stream_queue, add_replica, + [<<"/">>, QClassic, Server0])), ?assertEqual({error, quorum_queue_not_supported}, - rpc:call(Server0, rabbit_stream_queue, add_replica, - [<<"/">>, QQuorum, Server1])), + rpc:call(Server1, rabbit_stream_queue, add_replica, + [<<"/">>, QQuorum, Server0])), ?assertEqual(ok, - rpc:call(Server0, rabbit_stream_queue, add_replica, - [<<"/">>, Q, Server1])), + rpc:call(Server1, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server0])), %% replicas must be recorded on the state, and if we publish messages then they must %% be stored on disk - check_leader_and_replicas(Config, [Server0, Server1]), + check_leader_and_replicas(Config1, [Server1, Server0]), %% And if we try again? Idempotent - ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica, - [<<"/">>, Q, Server1])), + ?assertEqual(ok, rpc:call(Server1, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server0])), %% Add another node - ok = rabbit_control_helper:command(stop_app, Server2), - ok = rabbit_control_helper:command(join_cluster, Server2, [atom_to_list(Server0)], []), - rabbit_control_helper:command(start_app, Server2), - ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica, + Config2 = rabbit_ct_broker_helpers:cluster_nodes( + Config1, Server1, [Server2]), + ?assertEqual(ok, rpc:call(Server1, rabbit_stream_queue, add_replica, [<<"/">>, Q, Server2])), - check_leader_and_replicas(Config, [Server0, Server1, Server2]), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + check_leader_and_replicas(Config2, [Server0, Server1, Server2]), + rabbit_ct_broker_helpers:rpc(Config2, Server1, ?MODULE, delete_testcase_queue, [Q]). delete_replica(Config) -> [Server0, Server1, Server2] = @@ -641,14 +639,9 @@ grow_then_shrink_coordinator_cluster(Config) -> Q = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - ok = rabbit_control_helper:command(stop_app, Server1), - ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), - ok = rabbit_control_helper:command(start_app, Server1), - ok = rabbit_control_helper:command(stop_app, Server2), - ok = rabbit_control_helper:command(join_cluster, Server2, [atom_to_list(Server0)], []), - ok = rabbit_control_helper:command(start_app, Server2), + _Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Server1, [Server0, Server2]), rabbit_ct_helpers:await_condition( fun() -> @@ -662,17 +655,17 @@ grow_then_shrink_coordinator_cluster(Config) -> end end, 60000), - ok = rabbit_control_helper:command(stop_app, Server1), - ok = rabbit_control_helper:command(forget_cluster_node, Server0, [atom_to_list(Server1)], []), + ok = rabbit_control_helper:command(stop_app, Server0), + ok = rabbit_control_helper:command(forget_cluster_node, Server1, [atom_to_list(Server0)], []), ok = rabbit_control_helper:command(stop_app, Server2), - ok = rabbit_control_helper:command(forget_cluster_node, Server0, [atom_to_list(Server2)], []), + ok = rabbit_control_helper:command(forget_cluster_node, Server1, [atom_to_list(Server2)], []), rabbit_ct_helpers:await_condition( fun() -> - case rpc:call(Server0, ra, members, - [{rabbit_stream_coordinator, Server0}]) of + case rpc:call(Server1, ra, members, + [{rabbit_stream_coordinator, Server1}]) of {_, Members, _} -> Nodes = lists:sort([N || {_, N} <- Members]), - lists:sort([Server0]) == Nodes; + lists:sort([Server1]) == Nodes; _ -> false end @@ -685,29 +678,27 @@ grow_coordinator_cluster(Config) -> Q = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - ok = rabbit_control_helper:command(stop_app, Server1), - ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), - rabbit_control_helper:command(start_app, Server1), + Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Server1, [Server0]), %% at this point there _probably_ won't be a stream coordinator member on %% Server1 %% check we can add a new stream replica for the previously declare stream ?assertEqual(ok, - rpc:call(Server1, rabbit_stream_queue, add_replica, - [<<"/">>, Q, Server1])), + rpc:call(Server0, rabbit_stream_queue, add_replica, + [<<"/">>, Q, Server0])), %% also check we can declare a new stream when calling Server1 Q2 = unicode:characters_to_binary([Q, <<"_2">>]), ?assertEqual({'queue.declare_ok', Q2, 0, 0}, - declare(Config, Server1, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + declare(Config1, Server0, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>}])), %% wait until the stream coordinator detects there is a new rabbit node %% and adds a new member on the new node rabbit_ct_helpers:await_condition( fun() -> - case rpc:call(Server0, ra, members, - [{rabbit_stream_coordinator, Server0}]) of + case rpc:call(Server1, ra, members, + [{rabbit_stream_coordinator, Server1}]) of {_, Members, _} -> Nodes = lists:sort([N || {_, N} <- Members]), lists:sort([Server0, Server1]) == Nodes; @@ -715,7 +706,7 @@ grow_coordinator_cluster(Config) -> false end end, 60000), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + rabbit_ct_broker_helpers:rpc(Config1, 1, ?MODULE, delete_testcase_queue, [Q]). shrink_coordinator_cluster(Config) -> [Server0, Server1, Server2] = @@ -981,19 +972,17 @@ consume_without_local_replica(Config) -> rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Q = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', Q, 0, 0}, - declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), %% Add another node to the cluster, but it won't have a replica - ok = rabbit_control_helper:command(stop_app, Server1), - ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), - rabbit_control_helper:command(start_app, Server1), + Config1 = rabbit_ct_broker_helpers:cluster_nodes(Config, Server1, [Server0]), timer:sleep(1000), - Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server1), + Ch1 = rabbit_ct_client_helpers:open_channel(Config1, Server0), qos(Ch1, 10, false), ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _}, amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, consumer_tag = <<"ctag">>}, self())), - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + rabbit_ct_broker_helpers:rpc(Config1, 1, ?MODULE, delete_testcase_queue, [Q]). consume(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/deps/rabbitmq_consistent_hash_exchange/src/rabbit_db_ch_exchange.erl b/deps/rabbitmq_consistent_hash_exchange/src/rabbit_db_ch_exchange.erl index 1c62af4607bf..6ffc6d16c8b6 100644 --- a/deps/rabbitmq_consistent_hash_exchange/src/rabbit_db_ch_exchange.erl +++ b/deps/rabbitmq_consistent_hash_exchange/src/rabbit_db_ch_exchange.erl @@ -104,13 +104,13 @@ create_binding_in_mnesia_tx(Src, Dst, Weight, UpdateFun) -> create_binding_in_khepri(Src, Dst, Weight, UpdateFun) -> Path = khepri_consistent_hash_path(Src), case rabbit_khepri:adv_get(Path) of - {ok, #{data := Chx0, payload_version := DVersion}} -> + {ok, #{Path := #{data := Chx0, payload_version := Vsn}}} -> case UpdateFun(Chx0, Dst, Weight) of already_exists -> already_exists; Chx -> Path1 = khepri_path:combine_with_conditions( - Path, [#if_payload_version{version = DVersion}]), + Path, [#if_payload_version{version = Vsn}]), Ret2 = rabbit_khepri:put(Path1, Chx), case Ret2 of ok -> diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl index 170bc3ddd572..4805a8f716e3 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl @@ -981,12 +981,17 @@ cluster_nodes(Config, Nodes) when is_list(Nodes) -> [Nodename]), cluster_nodes1(Config, SecNodeConfig, NodeConfigs1); false -> - [NodeConfig | NodeConfigs1] = NodeConfigs, - Nodename = ?config(nodename, NodeConfig), - ct:pal( - "Using node ~s as the cluster seed node", - [Nodename]), - cluster_nodes1(Config, NodeConfig, NodeConfigs1) + case NodeConfigs of + [NodeConfig, SeedNodeConfig | NodeConfigs1] -> + Nodename = ?config(nodename, SeedNodeConfig), + ct:pal( + "Using node ~s as the cluster seed node", + [Nodename]), + cluster_nodes1( + Config, SeedNodeConfig, [NodeConfig | NodeConfigs1]); + [_] -> + Config + end end; cluster_nodes(Config, SeedNode) -> Nodenames = get_node_configs(Config, nodename), diff --git a/deps/rabbitmq_federation/test/exchange_SUITE.erl b/deps/rabbitmq_federation/test/exchange_SUITE.erl index 9d6297f94dc8..58d617b5def1 100644 --- a/deps/rabbitmq_federation/test/exchange_SUITE.erl +++ b/deps/rabbitmq_federation/test/exchange_SUITE.erl @@ -660,7 +660,7 @@ child_id_format(Config) -> %% %% After that, the supervisors run on the new code. Config2 = rabbit_ct_broker_helpers:cluster_nodes( - Config1, [OldNodeA, NewNodeB, NewNodeD]), + Config1, OldNodeA, [NewNodeB, NewNodeD]), ok = rabbit_ct_broker_helpers:stop_broker(Config2, OldNodeA), ok = rabbit_ct_broker_helpers:reset_node(Config1, OldNodeA), ok = rabbit_ct_broker_helpers:stop_broker(Config2, OldNodeC), diff --git a/deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl b/deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl index 05d63a61566d..bc6af14bbef2 100644 --- a/deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl +++ b/deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl @@ -108,9 +108,9 @@ create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) -> update_in_khepri(XName, BindingKeyAndFun, UpdateFun, ErrorFun) -> Path = khepri_jms_topic_exchange_path(XName), case rabbit_khepri:adv_get(Path) of - {ok, #{data := BindingFuns, payload_version := DVersion}} -> + {ok, #{Path := #{data := BindingFuns, payload_version := Vsn}}} -> Path1 = khepri_path:combine_with_conditions( - Path, [#if_payload_version{version = DVersion}]), + Path, [#if_payload_version{version = Vsn}]), Ret = rabbit_khepri:put(Path1, UpdateFun(BindingFuns, BindingKeyAndFun)), case Ret of ok -> ok; diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 7d10cf13a580..6aae9c152d78 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -222,9 +222,14 @@ end_per_testcase(Testcase, Config) -> end_per_testcase0(Testcase, Config) -> rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), %% Assert that every testcase cleaned up their MQTT sessions. + _ = rpc(Config, ?MODULE, delete_queues, []), eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))), rabbit_ct_helpers:testcase_finished(Config, Testcase). +delete_queues() -> + [catch rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) + || Q <- rabbit_amqqueue:list()]. + %% ------------------------------------------------------------------- %% Testsuite cases %% ------------------------------------------------------------------- @@ -315,7 +320,7 @@ decode_basic_properties(Config) -> {ok, _, [1]} = emqtt:subscribe(C1, Topic, qos1), QuorumQueues = rpc(Config, rabbit_amqqueue, list_by_type, [rabbit_quorum_queue]), ?assertEqual(1, length(QuorumQueues)), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), amqp_channel:call(Ch, #'basic.publish'{exchange = <<"amq.topic">>, routing_key = Topic}, #amqp_msg{payload = Payload}), @@ -323,7 +328,8 @@ decode_basic_properties(Config) -> ok = emqtt:disconnect(C1), C2 = connect(ClientId, Config, [{clean_start, true}]), ok = emqtt:disconnect(C2), - ok = rpc(Config, application, unset_env, [App, Par]). + ok = rpc(Config, application, unset_env, [App, Par]), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). quorum_queue_rejects(Config) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), @@ -376,7 +382,7 @@ publish_to_all_queue_types_qos1(Config) -> publish_to_all_queue_types(Config, qos1). publish_to_all_queue_types(Config, QoS) -> - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), CQ = <<"classic-queue">>, QQ = <<"quorum-queue">>, @@ -428,7 +434,8 @@ publish_to_all_queue_types(Config, QoS) -> delete_queue(Ch, [CQ, QQ, SQ]), ok = emqtt:disconnect(C), ?awaitMatch([], - all_connection_pids(Config), 10_000, 1000). + all_connection_pids(Config), 10_000, 1000), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). publish_to_all_non_deprecated_queue_types_qos0(Config) -> publish_to_all_non_deprecated_queue_types(Config, qos0). @@ -437,7 +444,7 @@ publish_to_all_non_deprecated_queue_types_qos1(Config) -> publish_to_all_non_deprecated_queue_types(Config, qos1). publish_to_all_non_deprecated_queue_types(Config, QoS) -> - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), CQ = <<"classic-queue">>, QQ = <<"quorum-queue">>, @@ -487,7 +494,8 @@ publish_to_all_non_deprecated_queue_types(Config, QoS) -> delete_queue(Ch, [CQ, QQ, SQ]), ok = emqtt:disconnect(C), ?awaitMatch([], - all_connection_pids(Config), 10_000, 1000). + all_connection_pids(Config), 10_000, 1000), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). %% This test case does not require multiple nodes %% but it is grouped together with flow test cases for other queue types @@ -519,7 +527,7 @@ flow(Config, {App, Par, Val}, QueueType) Result = rpc_all(Config, application, set_env, [App, Par, Val]), ?assert(lists:all(fun(R) -> R =:= ok end, Result)), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), QueueName = Topic = atom_to_binary(?FUNCTION_NAME), declare_queue(Ch, QueueName, [{<<"x-queue-type">>, longstr, QueueType}]), bind(Ch, QueueName, Topic), @@ -547,7 +555,8 @@ flow(Config, {App, Par, Val}, QueueType) ?awaitMatch([], all_connection_pids(Config), 10_000, 1000), ?assertEqual(Result, - rpc_all(Config, application, set_env, [App, Par, DefaultVal])). + rpc_all(Config, application, set_env, [App, Par, DefaultVal])), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). events(Config) -> ok = rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config, event_recorder), @@ -791,9 +800,10 @@ queue_down_qos1(Config) -> ok = rabbit_ct_broker_helpers:start_node(Config, 1) end, - Ch0 = rabbit_ct_client_helpers:open_channel(Config, 0), + {Conn, Ch0} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), delete_queue(Ch0, CQ), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch0). %% Consuming classic queue on a different node goes down. consuming_classic_queue_down(Config) -> @@ -832,7 +842,7 @@ consuming_classic_queue_down(Config) -> ok. delete_create_queue(Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), CQ1 = <<"classic-queue-1-delete-create">>, CQ2 = <<"classic-queue-2-delete-create">>, QQ = <<"quorum-queue-delete-create">>, @@ -892,7 +902,8 @@ delete_create_queue(Config) -> 1000, 10), delete_queue(Ch, [CQ1, CQ2, QQ]), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). session_expiry(Config) -> App = rabbitmq_mqtt, @@ -1088,7 +1099,7 @@ large_message_amqp_to_mqtt(Config) -> C = connect(ClientId, Config), {ok, _, [1]} = emqtt:subscribe(C, {Topic, qos1}), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), Payload0 = binary:copy(<<"x">>, 8_000_000), Payload = <>, amqp_channel:call(Ch, @@ -1096,20 +1107,22 @@ large_message_amqp_to_mqtt(Config) -> routing_key = Topic}, #amqp_msg{payload = Payload}), ok = expect_publishes(C, Topic, [Payload]), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). amqp_to_mqtt_qos0(Config) -> Topic = ClientId = Payload = atom_to_binary(?FUNCTION_NAME), C = connect(ClientId, Config), {ok, _, [0]} = emqtt:subscribe(C, {Topic, qos0}), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), amqp_channel:call(Ch, #'basic.publish'{exchange = <<"amq.topic">>, routing_key = Topic}, #amqp_msg{payload = Payload}), ok = expect_publishes(C, Topic, [Payload]), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). %% Packet identifier is a non zero two byte integer. %% Test that the server wraps around the packet identifier. @@ -1590,7 +1603,7 @@ rabbit_status_connection_count(Config) -> trace(Config) -> Server = atom_to_binary(get_node_config(Config, 0, nodename)), Topic = Payload = TraceQ = atom_to_binary(?FUNCTION_NAME), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), declare_queue(Ch, TraceQ, []), #'queue.bind_ok'{} = amqp_channel:call( Ch, #'queue.bind'{queue = TraceQ, @@ -1645,11 +1658,12 @@ trace(Config) -> amqp_channel:call(Ch, #'basic.get'{queue = TraceQ})), delete_queue(Ch, TraceQ), - [ok = emqtt:disconnect(C) || C <- [Pub, Sub]]. + [ok = emqtt:disconnect(C) || C <- [Pub, Sub]], + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). trace_large_message(Config) -> TraceQ = <<"trace-queue">>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), declare_queue(Ch, TraceQ, []), #'queue.bind_ok'{} = amqp_channel:call( Ch, #'queue.bind'{queue = TraceQ, @@ -1674,7 +1688,8 @@ trace_large_message(Config) -> {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["trace_off"]), delete_queue(Ch, TraceQ), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). max_packet_size_unauthenticated(Config) -> ClientId = ?FUNCTION_NAME, @@ -1765,7 +1780,7 @@ default_queue_type(Config) -> incoming_message_interceptors(Config) -> Key = ?FUNCTION_NAME, ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]), - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), Payload = ClientId = Topic = atom_to_binary(?FUNCTION_NAME), CQName = <<"my classic queue">>, Stream = <<"my stream">>, @@ -1813,7 +1828,8 @@ incoming_message_interceptors(Config) -> delete_queue(Ch, Stream), delete_queue(Ch, CQName), true = rpc(Config, persistent_term, erase, [Key]), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). %% This test makes sure that a retained message that got written in 3.12 or earlier %% can be consumed in 3.13 or later. @@ -1853,7 +1869,7 @@ bind_exchange_to_exchange(Config) -> SourceX = <<"amq.topic">>, DestinationX = <<"destination">>, Q = <<"q">>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DestinationX, durable = true, auto_delete = true}), @@ -1871,13 +1887,14 @@ bind_exchange_to_exchange(Config) -> eventually(?_assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}}, amqp_channel:call(Ch, #'basic.get'{queue = Q}))), #'queue.delete_ok'{message_count = 0} = amqp_channel:call(Ch, #'queue.delete'{queue = Q}), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). bind_exchange_to_exchange_single_message(Config) -> SourceX = <<"amq.topic">>, DestinationX = <<"destination">>, Q = <<"q">>, - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DestinationX, durable = true, auto_delete = true}), @@ -1904,7 +1921,8 @@ bind_exchange_to_exchange_single_message(Config) -> timer:sleep(10), ?assertEqual(#'queue.delete_ok'{message_count = 0}, amqp_channel:call(Ch, #'queue.delete'{queue = Q})), - ok = emqtt:disconnect(C). + ok = emqtt:disconnect(C), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). %% ------------------------------------------------------------------- %% Internal helpers @@ -1936,7 +1954,7 @@ await_confirms_unordered(From, Left) -> end. await_consumer_count(ConsumerCount, ClientId, QoS, Config) -> - Ch = rabbit_ct_client_helpers:open_channel(Config), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), QueueName = rabbit_mqtt_util:queue_name_bin( rabbit_data_coercion:to_binary(ClientId), QoS), eventually( diff --git a/deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl b/deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl index 4e90afcb4170..96926cc07a4c 100644 --- a/deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl +++ b/deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl @@ -106,10 +106,10 @@ insert0_in_mnesia(Key, Cached, Message, Length) -> insert_in_khepri(XName, Message, Length) -> Path = khepri_recent_history_path(XName), case rabbit_khepri:adv_get(Path) of - {ok, #{data := Cached0, payload_version := DVersion}} -> + {ok, #{Path := #{data := Cached0, payload_version := Vsn}}} -> Cached = add_to_cache(Cached0, Message, Length), Path1 = khepri_path:combine_with_conditions( - Path, [#if_payload_version{version = DVersion}]), + Path, [#if_payload_version{version = Vsn}]), Ret = rabbit_khepri:put(Path1, Cached), case Ret of ok -> diff --git a/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl b/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl index 57afc089d160..5c3221febc0d 100644 --- a/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl +++ b/deps/rabbitmq_shovel/test/rolling_upgrade_SUITE.erl @@ -101,7 +101,7 @@ child_id_format(Config) -> %% Node 4: the secondary umbrella %% ... %% - %% Therefore, `Pouet' will use the primary copy, `OldNode' the secondary + %% Therefore, `NewNode' will use the primary copy, `OldNode' the secondary %% umbrella, `NewRefNode' the primary copy, and `NodeWithQueues' the %% secondary umbrella. @@ -221,7 +221,7 @@ child_id_format(Config) -> %% After that, the supervisors run on the new code. ct:pal("Clustering nodes ~s and ~s", [OldNode, NewNode]), Config1 = rabbit_ct_broker_helpers:cluster_nodes( - Config, [OldNode, NewNode]), + Config, OldNode, [NewNode]), ok = rabbit_ct_broker_helpers:stop_broker(Config1, OldNode), ok = rabbit_ct_broker_helpers:reset_node(Config1, OldNode), diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 540fe593902e..7d65ed6f8a07 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -46,8 +46,8 @@ dep_credentials_obfuscation = hex 3.5.0 dep_cuttlefish = hex 3.4.0 dep_gen_batch_server = hex 0.8.8 dep_jose = hex 1.11.10 -dep_khepri = hex 0.16.0 -dep_khepri_mnesia_migration = hex 0.7.2 +dep_khepri = hex 0.17.1 +dep_khepri_mnesia_migration = hex 0.8.0 dep_meck = hex 1.0.0 dep_osiris = git https://github.com/rabbitmq/osiris v1.8.6 dep_prometheus = hex 4.11.0