diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 4e192df874f5..12a115da7e38 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -68,6 +68,8 @@ notify_decorators/3, spawn_notify_decorators/3]). +-export([get_member_with_highest_index/3]). + -export([is_enabled/0, is_compatible/3, declare/2, @@ -1245,7 +1247,7 @@ key_metrics_rpc(ServerId) -> Metrics = ra:key_metrics(ServerId), Metrics#{machine_version => rabbit_fifo:version()}. --spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> +-spec status(rabbit_types:vhost(), rabbit_misc:resource_name()) -> [[{binary(), term()}]] | {error, term()}. status(Vhost, QueueName) -> %% Handle not found queues @@ -1335,6 +1337,38 @@ get_sys_status(Proc) -> end. +-spec get_member_with_highest_index(rabbit_types:vhost(), rabbit_misc:resource_name(), atom()) -> + [[{binary(), term()}]] | {error, term()}. +get_member_with_highest_index(Vhost, QueueName, IndexName) -> + case ?MODULE:status(Vhost, QueueName) of + Status when is_list(Status) -> + IndexNameInternal = rabbit_data_coercion:to_atom(IndexName), + case index_name_to_status_key(IndexNameInternal) of + Key when is_binary(Key) -> + {_HighestIndexValue, HighestEntry} = + lists:foldl( + fun(Entry, {PreviousIndexValue, _PreviousEntry} = Acc) -> + case rabbit_misc:pget(Key, Entry) of + CurrentIndexValue when is_integer(CurrentIndexValue), + CurrentIndexValue > PreviousIndexValue -> + {CurrentIndexValue, Entry}; + _ -> + Acc + end + end, {-100, []}, Status), + [HighestEntry]; + undefined -> + [] + end; + {error, _} = Error -> + Error + end. + +index_name_to_status_key(I) when I =:= commit; I =:= commit_index -> <<"Commit Index">>; +index_name_to_status_key(I) when I =:= log; I =:= log_index -> <<"Last Log Index">>; +index_name_to_status_key(I) when I =:= snapshot; I =:= snapshot_index -> <<"Snapshot Index">>; +index_name_to_status_key(_I) -> undefined. + add_member(VHost, Name, Node, Membership, Timeout) when is_binary(VHost) andalso is_binary(Name) andalso diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 2ae9f23d4060..ef50ab861cd1 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -115,7 +115,8 @@ groups() -> node_removal_is_not_quorum_critical, select_nodes_with_least_replicas, select_nodes_with_least_replicas_node_down, - subscribe_from_each + subscribe_from_each, + get_member_with_highest_index ]}, @@ -365,6 +366,8 @@ init_per_testcase(Testcase, Config) -> {skip, "peek_with_wrong_queue_type isn't mixed versions compatible"}; cancel_consumer_gh_3729 when IsMixed andalso RabbitMQ3 -> {skip, "this test is not compatible with RabbitMQ 3.13.x"}; + get_member_with_highest_index when IsMixed -> + {skip, "get_member_with_highest_index isn't mixed versions compatible"}; _ -> Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), @@ -4576,6 +4579,95 @@ leader_health_check(Config) -> amqp_connection:close(Conn1), amqp_connection:close(Conn2). +get_member_with_highest_index(Config) -> + [Node1, Node2, Node3, Node4, Node5] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Q = ?config(queue_name, Config), + VHost = <<"/">>, + + Statuses = + %% [{Node, Member, LogIdx, CommitIdx, SnapshotIdx}, ...] + [{Node1, leader, 1015, 1010, 1010}, %% highest SnapshotIdx + {Node2, follower, 1015, 1010, 1010}, %% highest SnapshotIdx (duplicate) + {Node3, follower, 1013, 1013, 1009}, %% highest CommitIdx + {Node4, follower, 1016, 1009, 1008}, %% highest LogIdx + {Node5, follower, 1013, 1012, undefined}], + + Term = 1, + MachineVersion = 7, + + meck:new(rabbit_quorum_queue, [passthrough, no_link]), + meck:expect( + rabbit_quorum_queue, status, + fun(_, _) -> + [[{<<"Node Name">>, Node}, + {<<"Raft State">>, Member}, + {<<"Last Log Index">>, LogIndex}, + {<<"Last Written">>, LogIndex}, + {<<"Last Applied">>, LogIndex}, + {<<"Commit Index">>, CommitIndex}, + {<<"Snapshot Index">>, SnapshotIdx}, + {<<"Term">>, Term}, + {<<"Machine Version">>, MachineVersion}] + || {Node, Member, LogIndex, CommitIndex, SnapshotIdx} <- Statuses] + end), + + ct:pal("quorum status: ~tp", [rabbit_quorum_queue:status(VHost, Q)]), + + ExpectedHighestLogIdx = + [[{<<"Node Name">>, Node4}, + {<<"Raft State">>, follower}, + {<<"Last Log Index">>, 1016}, + {<<"Last Written">>,1016}, + {<<"Last Applied">>,1016}, + {<<"Commit Index">>, 1009}, + {<<"Snapshot Index">>, 1008}, + {<<"Term">>, Term}, + {<<"Machine Version">>, MachineVersion}]], + + [?assertEqual(ExpectedHighestLogIdx, + rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [log, log_index]], + + ExpectedHighestCommitIdx = + [[{<<"Node Name">>, Node3}, + {<<"Raft State">>, follower}, + {<<"Last Log Index">>, 1013}, + {<<"Last Written">>,1013}, + {<<"Last Applied">>,1013}, + {<<"Commit Index">>, 1013}, + {<<"Snapshot Index">>, 1009}, + {<<"Term">>, Term}, + {<<"Machine Version">>, MachineVersion}]], + + [?assertEqual(ExpectedHighestCommitIdx, + rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [commit, commit_index]], + + ExpectedHighestSnapshotIdx = + [[{<<"Node Name">>, Node1}, + {<<"Raft State">>, leader}, + {<<"Last Log Index">>, 1015}, + {<<"Last Written">>,1015}, + {<<"Last Applied">>,1015}, + {<<"Commit Index">>, 1010}, + {<<"Snapshot Index">>, 1010}, + {<<"Term">>, Term}, + {<<"Machine Version">>, MachineVersion}]], + % Duplicate: + % [{<<"Node Name">>, Node2}, + % {<<"Raft State">>, follower}, + % {<<"Last Log Index">>, 1015}, + % {<<"Last Written">>,1015}, + % {<<"Last Applied">>,1015}, + % {<<"Commit Index">>, 1010}, + % {<<"Snapshot Index">>, 1010}, + % {<<"Term">>, Term}, + % {<<"Machine Version">>, MachineVersion}], + + [?assertEqual(ExpectedHighestSnapshotIdx, + rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [snapshot, snapshot_index]], + + ok. leader_locator_client_local(Config) -> [Server1 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex new file mode 100644 index 000000000000..84a554d78bfc --- /dev/null +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex @@ -0,0 +1,73 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommand do + alias RabbitMQ.CLI.Core.DocGuide + import RabbitMQ.CLI.Core.DataCoercion + + @behaviour RabbitMQ.CLI.CommandBehaviour + + use RabbitMQ.CLI.Core.AcceptsOnePositionalArgument + use RabbitMQ.CLI.Core.RequiresRabbitAppRunning + + def switches(), do: [index: :string, timeout: :integer] + def aliases(), do: [i: :index, t: :timeout] + + def merge_defaults(args, opts) do + {args, Map.merge(%{vhost: "/", index: "log"}, opts)} + end + + def run([name] = _args, %{vhost: vhost, index: index, node: node_name}) do + case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :get_member_with_highest_index, [ + vhost, + name, + to_atom(String.downcase(index)) + ]) do + {:error, :classic_queue_not_supported} -> + index = format_index(String.downcase(index)) + {:error, "Cannot get #{index} index from a classic queue"} + + {:error, :not_found} -> + {:error, {:not_found, :queue, vhost, name}} + + other -> + other + end + end + + use RabbitMQ.CLI.DefaultOutput + + def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable + + def usage, do: "member_with_highest_index [--vhost ] [--index ]" + + def usage_additional do + [ + ["", "quorum queue name"], + ["--index ", "name of the index to use to lookup highest member"] + ] + end + + def usage_doc_guides() do + [ + DocGuide.quorum_queues() + ] + end + + def help_section, do: :replication + + def description, do: "Look up first member of a quorum queue with the highest commit, log or snapshot index." + + def banner([name], %{node: node, index: index, vhost: vhost}) do + index = format_index(String.downcase(index)) + "Member with highest #{index} index for queue #{name} in vhost #{vhost} on node #{node}..." + end + + defp format_index("log_index"), do: "log" + defp format_index("commit_index"), do: "commit" + defp format_index("snapshot_index"), do: "snapshot" + defp format_index(index_name), do: index_name +end diff --git a/deps/rabbitmq_cli/test/queues/member_with_highest_index_test.exs b/deps/rabbitmq_cli/test/queues/member_with_highest_index_test.exs new file mode 100644 index 000000000000..e56d8eecfea3 --- /dev/null +++ b/deps/rabbitmq_cli/test/queues/member_with_highest_index_test.exs @@ -0,0 +1,55 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommandTest do + use ExUnit.Case, async: false + import TestHelper + + @command RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommand + + setup_all do + RabbitMQ.CLI.Core.Distribution.start() + + :ok + end + + setup context do + {:ok, + opts: %{ + node: get_rabbit_hostname(), + timeout: context[:test_timeout] || 30000 + }} + end + + test "validate: when no arguments are provided, returns a failure" do + assert @command.validate([], %{}) == {:validation_failure, :not_enough_args} + end + + test "validate: when two or more arguments are provided, returns a failure" do + assert @command.validate(["quorum-queue-a", "one-extra-arg"], %{}) == + {:validation_failure, :too_many_args} + + assert @command.validate( + ["quorum-queue-a", "extra-arg", "another-extra-arg"], + %{} + ) == {:validation_failure, :too_many_args} + end + + test "validate: treats one positional arguments and default switches as a success" do + assert @command.validate(["quorum-queue-a"], %{}) == :ok + end + + @tag test_timeout: 3000 + test "run: targeting an unreachable node throws a badrpc" do + assert match?( + {:badrpc, _}, + @command.run( + ["quorum-queue-a"], + %{node: :jake@thedog, vhost: "/", index: "log", timeout: 200} + ) + ) + end +end