From 0024e6515db4c3e2c0f50029b28bab7d0aa4911a Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Tue, 15 Jul 2025 17:12:22 +0100 Subject: [PATCH 1/7] implement rabbitmq-queues pick_member_with_highest_index cli command for quorum queues --- deps/rabbit/src/rabbit_quorum_queue.erl | 36 +++++++++++- .../pick_member_with_highest_index_test.exs | 55 +++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 deps/rabbitmq_cli/test/queues/pick_member_with_highest_index_test.exs diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 4e192df874f5..12a115da7e38 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -68,6 +68,8 @@ notify_decorators/3, spawn_notify_decorators/3]). +-export([get_member_with_highest_index/3]). + -export([is_enabled/0, is_compatible/3, declare/2, @@ -1245,7 +1247,7 @@ key_metrics_rpc(ServerId) -> Metrics = ra:key_metrics(ServerId), Metrics#{machine_version => rabbit_fifo:version()}. --spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> +-spec status(rabbit_types:vhost(), rabbit_misc:resource_name()) -> [[{binary(), term()}]] | {error, term()}. status(Vhost, QueueName) -> %% Handle not found queues @@ -1335,6 +1337,38 @@ get_sys_status(Proc) -> end. +-spec get_member_with_highest_index(rabbit_types:vhost(), rabbit_misc:resource_name(), atom()) -> + [[{binary(), term()}]] | {error, term()}. +get_member_with_highest_index(Vhost, QueueName, IndexName) -> + case ?MODULE:status(Vhost, QueueName) of + Status when is_list(Status) -> + IndexNameInternal = rabbit_data_coercion:to_atom(IndexName), + case index_name_to_status_key(IndexNameInternal) of + Key when is_binary(Key) -> + {_HighestIndexValue, HighestEntry} = + lists:foldl( + fun(Entry, {PreviousIndexValue, _PreviousEntry} = Acc) -> + case rabbit_misc:pget(Key, Entry) of + CurrentIndexValue when is_integer(CurrentIndexValue), + CurrentIndexValue > PreviousIndexValue -> + {CurrentIndexValue, Entry}; + _ -> + Acc + end + end, {-100, []}, Status), + [HighestEntry]; + undefined -> + [] + end; + {error, _} = Error -> + Error + end. + +index_name_to_status_key(I) when I =:= commit; I =:= commit_index -> <<"Commit Index">>; +index_name_to_status_key(I) when I =:= log; I =:= log_index -> <<"Last Log Index">>; +index_name_to_status_key(I) when I =:= snapshot; I =:= snapshot_index -> <<"Snapshot Index">>; +index_name_to_status_key(_I) -> undefined. + add_member(VHost, Name, Node, Membership, Timeout) when is_binary(VHost) andalso is_binary(Name) andalso diff --git a/deps/rabbitmq_cli/test/queues/pick_member_with_highest_index_test.exs b/deps/rabbitmq_cli/test/queues/pick_member_with_highest_index_test.exs new file mode 100644 index 000000000000..817c853a70a0 --- /dev/null +++ b/deps/rabbitmq_cli/test/queues/pick_member_with_highest_index_test.exs @@ -0,0 +1,55 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Queues.Commands.PickMemberWithHighestIndexCommandTest do + use ExUnit.Case, async: false + import TestHelper + + @command RabbitMQ.CLI.Queues.Commands.PickMemberWithHighestIndexCommand + + setup_all do + RabbitMQ.CLI.Core.Distribution.start() + + :ok + end + + setup context do + {:ok, + opts: %{ + node: get_rabbit_hostname(), + timeout: context[:test_timeout] || 30000 + }} + end + + test "validate: when no arguments are provided, returns a failure" do + assert @command.validate([], %{}) == {:validation_failure, :not_enough_args} + end + + test "validate: when two or more arguments are provided, returns a failure" do + assert @command.validate(["quorum-queue-a", "one-extra-arg"], %{}) == + {:validation_failure, :too_many_args} + + assert @command.validate( + ["quorum-queue-a", "extra-arg", "another-extra-arg"], + %{} + ) == {:validation_failure, :too_many_args} + end + + test "validate: treats one positional arguments and default switches as a success" do + assert @command.validate(["quorum-queue-a"], %{}) == :ok + end + + @tag test_timeout: 3000 + test "run: targeting an unreachable node throws a badrpc" do + assert match?( + {:badrpc, _}, + @command.run( + ["quorum-queue-a"], + %{node: :jake@thedog, vhost: "/", index: "log", timeout: 200} + ) + ) + end +end From 2d0f9a61761e48082646caf1770867175a2763c6 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Tue, 15 Jul 2025 17:18:07 +0100 Subject: [PATCH 2/7] add tests for acquiring qq member with highest index --- deps/rabbit/test/quorum_queue_SUITE.erl | 94 ++++++++++++++++++- .../pick_member_with_highest_index.ex | 73 ++++++++++++++ 2 files changed, 166 insertions(+), 1 deletion(-) create mode 100644 deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/pick_member_with_highest_index.ex diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 2ae9f23d4060..ef50ab861cd1 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -115,7 +115,8 @@ groups() -> node_removal_is_not_quorum_critical, select_nodes_with_least_replicas, select_nodes_with_least_replicas_node_down, - subscribe_from_each + subscribe_from_each, + get_member_with_highest_index ]}, @@ -365,6 +366,8 @@ init_per_testcase(Testcase, Config) -> {skip, "peek_with_wrong_queue_type isn't mixed versions compatible"}; cancel_consumer_gh_3729 when IsMixed andalso RabbitMQ3 -> {skip, "this test is not compatible with RabbitMQ 3.13.x"}; + get_member_with_highest_index when IsMixed -> + {skip, "get_member_with_highest_index isn't mixed versions compatible"}; _ -> Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), @@ -4576,6 +4579,95 @@ leader_health_check(Config) -> amqp_connection:close(Conn1), amqp_connection:close(Conn2). +get_member_with_highest_index(Config) -> + [Node1, Node2, Node3, Node4, Node5] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Q = ?config(queue_name, Config), + VHost = <<"/">>, + + Statuses = + %% [{Node, Member, LogIdx, CommitIdx, SnapshotIdx}, ...] + [{Node1, leader, 1015, 1010, 1010}, %% highest SnapshotIdx + {Node2, follower, 1015, 1010, 1010}, %% highest SnapshotIdx (duplicate) + {Node3, follower, 1013, 1013, 1009}, %% highest CommitIdx + {Node4, follower, 1016, 1009, 1008}, %% highest LogIdx + {Node5, follower, 1013, 1012, undefined}], + + Term = 1, + MachineVersion = 7, + + meck:new(rabbit_quorum_queue, [passthrough, no_link]), + meck:expect( + rabbit_quorum_queue, status, + fun(_, _) -> + [[{<<"Node Name">>, Node}, + {<<"Raft State">>, Member}, + {<<"Last Log Index">>, LogIndex}, + {<<"Last Written">>, LogIndex}, + {<<"Last Applied">>, LogIndex}, + {<<"Commit Index">>, CommitIndex}, + {<<"Snapshot Index">>, SnapshotIdx}, + {<<"Term">>, Term}, + {<<"Machine Version">>, MachineVersion}] + || {Node, Member, LogIndex, CommitIndex, SnapshotIdx} <- Statuses] + end), + + ct:pal("quorum status: ~tp", [rabbit_quorum_queue:status(VHost, Q)]), + + ExpectedHighestLogIdx = + [[{<<"Node Name">>, Node4}, + {<<"Raft State">>, follower}, + {<<"Last Log Index">>, 1016}, + {<<"Last Written">>,1016}, + {<<"Last Applied">>,1016}, + {<<"Commit Index">>, 1009}, + {<<"Snapshot Index">>, 1008}, + {<<"Term">>, Term}, + {<<"Machine Version">>, MachineVersion}]], + + [?assertEqual(ExpectedHighestLogIdx, + rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [log, log_index]], + + ExpectedHighestCommitIdx = + [[{<<"Node Name">>, Node3}, + {<<"Raft State">>, follower}, + {<<"Last Log Index">>, 1013}, + {<<"Last Written">>,1013}, + {<<"Last Applied">>,1013}, + {<<"Commit Index">>, 1013}, + {<<"Snapshot Index">>, 1009}, + {<<"Term">>, Term}, + {<<"Machine Version">>, MachineVersion}]], + + [?assertEqual(ExpectedHighestCommitIdx, + rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [commit, commit_index]], + + ExpectedHighestSnapshotIdx = + [[{<<"Node Name">>, Node1}, + {<<"Raft State">>, leader}, + {<<"Last Log Index">>, 1015}, + {<<"Last Written">>,1015}, + {<<"Last Applied">>,1015}, + {<<"Commit Index">>, 1010}, + {<<"Snapshot Index">>, 1010}, + {<<"Term">>, Term}, + {<<"Machine Version">>, MachineVersion}]], + % Duplicate: + % [{<<"Node Name">>, Node2}, + % {<<"Raft State">>, follower}, + % {<<"Last Log Index">>, 1015}, + % {<<"Last Written">>,1015}, + % {<<"Last Applied">>,1015}, + % {<<"Commit Index">>, 1010}, + % {<<"Snapshot Index">>, 1010}, + % {<<"Term">>, Term}, + % {<<"Machine Version">>, MachineVersion}], + + [?assertEqual(ExpectedHighestSnapshotIdx, + rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [snapshot, snapshot_index]], + + ok. leader_locator_client_local(Config) -> [Server1 | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/pick_member_with_highest_index.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/pick_member_with_highest_index.ex new file mode 100644 index 000000000000..0f451e30bdfe --- /dev/null +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/pick_member_with_highest_index.ex @@ -0,0 +1,73 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Queues.Commands.PickMemberWithHighestIndexCommand do + alias RabbitMQ.CLI.Core.DocGuide + import RabbitMQ.CLI.Core.DataCoercion + + @behaviour RabbitMQ.CLI.CommandBehaviour + + use RabbitMQ.CLI.Core.AcceptsOnePositionalArgument + use RabbitMQ.CLI.Core.RequiresRabbitAppRunning + + def switches(), do: [index: :string, timeout: :integer] + def aliases(), do: [i: :index, t: :timeout] + + def merge_defaults(args, opts) do + {args, Map.merge(%{vhost: "/", index: "log"}, opts)} + end + + def run([name] = _args, %{vhost: vhost, index: index, node: node_name}) do + case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :get_member_with_highest_index, [ + vhost, + name, + to_atom(String.downcase(index)) + ]) do + {:error, :classic_queue_not_supported} -> + index = format_index(String.downcase(index)) + {:error, "Cannot get #{index} index from a classic queue"} + + {:error, :not_found} -> + {:error, {:not_found, :queue, vhost, name}} + + other -> + other + end + end + + use RabbitMQ.CLI.DefaultOutput + + def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable + + def usage, do: "pick_member_with_highest_index [--vhost ] [--index ]" + + def usage_additional do + [ + ["", "quorum queue name"], + ["--index ", "name of the index to use to lookup highest member"] + ] + end + + def usage_doc_guides() do + [ + DocGuide.quorum_queues() + ] + end + + def help_section, do: :replication + + def description, do: "Look up the member of a quorum queue with the highest commit, log or snapshot index." + + def banner([name], %{node: node, index: index, vhost: vhost}) do + index = format_index(String.downcase(index)) + "Member with highest #{index} index for queue #{name} in vhost #{vhost} on node #{node}..." + end + + defp format_index("log_index"), do: "log" + defp format_index("commit_index"), do: "commit" + defp format_index("snapshot_index"), do: "snapshot" + defp format_index(index_name), do: index_name +end From e94e296f4e4d561a33d7082e7c3c2d8422cdfdd4 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Wed, 16 Jul 2025 10:32:40 +0100 Subject: [PATCH 3/7] rename cli command to rabbitmq-queues member_with_highest_index --- ...r_with_highest_index.ex => member_with_highest_index.ex} | 6 +++--- ...st_index_test.exs => member_with_highest_index_test.exs} | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) rename deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/{pick_member_with_highest_index.ex => member_with_highest_index.ex} (86%) rename deps/rabbitmq_cli/test/queues/{pick_member_with_highest_index_test.exs => member_with_highest_index_test.exs} (91%) diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/pick_member_with_highest_index.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex similarity index 86% rename from deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/pick_member_with_highest_index.ex rename to deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex index 0f451e30bdfe..84a554d78bfc 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/pick_member_with_highest_index.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex @@ -4,7 +4,7 @@ ## ## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -defmodule RabbitMQ.CLI.Queues.Commands.PickMemberWithHighestIndexCommand do +defmodule RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommand do alias RabbitMQ.CLI.Core.DocGuide import RabbitMQ.CLI.Core.DataCoercion @@ -42,7 +42,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.PickMemberWithHighestIndexCommand do def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable - def usage, do: "pick_member_with_highest_index [--vhost ] [--index ]" + def usage, do: "member_with_highest_index [--vhost ] [--index ]" def usage_additional do [ @@ -59,7 +59,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.PickMemberWithHighestIndexCommand do def help_section, do: :replication - def description, do: "Look up the member of a quorum queue with the highest commit, log or snapshot index." + def description, do: "Look up first member of a quorum queue with the highest commit, log or snapshot index." def banner([name], %{node: node, index: index, vhost: vhost}) do index = format_index(String.downcase(index)) diff --git a/deps/rabbitmq_cli/test/queues/pick_member_with_highest_index_test.exs b/deps/rabbitmq_cli/test/queues/member_with_highest_index_test.exs similarity index 91% rename from deps/rabbitmq_cli/test/queues/pick_member_with_highest_index_test.exs rename to deps/rabbitmq_cli/test/queues/member_with_highest_index_test.exs index 817c853a70a0..e56d8eecfea3 100644 --- a/deps/rabbitmq_cli/test/queues/pick_member_with_highest_index_test.exs +++ b/deps/rabbitmq_cli/test/queues/member_with_highest_index_test.exs @@ -4,11 +4,11 @@ ## ## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. -defmodule RabbitMQ.CLI.Queues.Commands.PickMemberWithHighestIndexCommandTest do +defmodule RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommandTest do use ExUnit.Case, async: false import TestHelper - @command RabbitMQ.CLI.Queues.Commands.PickMemberWithHighestIndexCommand + @command RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommand setup_all do RabbitMQ.CLI.Core.Distribution.start() From 98edd0201408edf29c9fbec82463ec1818babc7b Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Mon, 21 Jul 2025 11:28:25 +0100 Subject: [PATCH 4/7] make commit index default option for member_with_highest_index command --- .../rabbitmq/cli/queues/commands/member_with_highest_index.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex index 84a554d78bfc..c1c6e877b893 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex @@ -17,7 +17,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommand do def aliases(), do: [i: :index, t: :timeout] def merge_defaults(args, opts) do - {args, Map.merge(%{vhost: "/", index: "log"}, opts)} + {args, Map.merge(%{vhost: "/", index: "commit"}, opts)} end def run([name] = _args, %{vhost: vhost, index: index, node: node_name}) do From b35f7c8a72d0cc3fcf3b6329d5159d81beb51e9d Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Thu, 24 Jul 2025 18:43:59 +0100 Subject: [PATCH 5/7] add --offline-members flag to control including down members in rabbitmq-queues member_with_highest_index command --- deps/rabbit/src/rabbit_quorum_queue.erl | 17 +++++++++------ deps/rabbit/test/quorum_queue_SUITE.erl | 21 ++++++++++++++++--- .../commands/member_with_highest_index.ex | 14 +++++++------ .../queues/member_with_highest_index_test.exs | 2 +- 4 files changed, 38 insertions(+), 16 deletions(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 12a115da7e38..b9de5a7fd4eb 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -68,7 +68,7 @@ notify_decorators/3, spawn_notify_decorators/3]). --export([get_member_with_highest_index/3]). +-export([get_member_with_highest_index/4]). -export([is_enabled/0, is_compatible/3, @@ -1337,9 +1337,9 @@ get_sys_status(Proc) -> end. --spec get_member_with_highest_index(rabbit_types:vhost(), rabbit_misc:resource_name(), atom()) -> +-spec get_member_with_highest_index(rabbit_types:vhost(), rabbit_misc:resource_name(), atom(), boolean()) -> [[{binary(), term()}]] | {error, term()}. -get_member_with_highest_index(Vhost, QueueName, IndexName) -> +get_member_with_highest_index(Vhost, QueueName, IndexName, IncludeOfflineMembers) -> case ?MODULE:status(Vhost, QueueName) of Status when is_list(Status) -> IndexNameInternal = rabbit_data_coercion:to_atom(IndexName), @@ -1348,9 +1348,14 @@ get_member_with_highest_index(Vhost, QueueName, IndexName) -> {_HighestIndexValue, HighestEntry} = lists:foldl( fun(Entry, {PreviousIndexValue, _PreviousEntry} = Acc) -> - case rabbit_misc:pget(Key, Entry) of - CurrentIndexValue when is_integer(CurrentIndexValue), - CurrentIndexValue > PreviousIndexValue -> + State = rabbit_misc:pget(<<"Raft State">>, Entry), + case {rabbit_misc:pget(Key, Entry), IncludeOfflineMembers} of + {CurrentIndexValue, false} when is_integer(CurrentIndexValue), + CurrentIndexValue > PreviousIndexValue, + State /= noproc -> + {CurrentIndexValue, Entry}; + {CurrentIndexValue, true} when is_integer(CurrentIndexValue), + CurrentIndexValue > PreviousIndexValue -> {CurrentIndexValue, Entry}; _ -> Acc diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index ef50ab861cd1..502d7c4ae6b2 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -4591,8 +4591,8 @@ get_member_with_highest_index(Config) -> [{Node1, leader, 1015, 1010, 1010}, %% highest SnapshotIdx {Node2, follower, 1015, 1010, 1010}, %% highest SnapshotIdx (duplicate) {Node3, follower, 1013, 1013, 1009}, %% highest CommitIdx - {Node4, follower, 1016, 1009, 1008}, %% highest LogIdx - {Node5, follower, 1013, 1012, undefined}], + {Node4, follower, 1016, 1009, undefined}, %% highest LogIdx + {Node5, noproc, 1050, 1050, 1050}], %% highest but noproc Term = 1, MachineVersion = 7, @@ -4622,7 +4622,7 @@ get_member_with_highest_index(Config) -> {<<"Last Written">>,1016}, {<<"Last Applied">>,1016}, {<<"Commit Index">>, 1009}, - {<<"Snapshot Index">>, 1008}, + {<<"Snapshot Index">>, undefined}, {<<"Term">>, Term}, {<<"Machine Version">>, MachineVersion}]], @@ -4667,6 +4667,21 @@ get_member_with_highest_index(Config) -> [?assertEqual(ExpectedHighestSnapshotIdx, rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I)) || I <- [snapshot, snapshot_index]], + ExpectedHighestIdxForAll = + [[{<<"Node Name">>, Node5}, + {<<"Raft State">>, noproc}, + {<<"Last Log Index">>, 1050}, + {<<"Last Written">>,1050}, + {<<"Last Applied">>,1050}, + {<<"Commit Index">>, 1050}, + {<<"Snapshot Index">>, 1050}, + {<<"Term">>, Term}, + {<<"Machine Version">>, MachineVersion}]], + + [?assertEqual(ExpectedHighestIdxForAll, + rabbit_quorum_queue:get_member_with_highest_index(VHost, Q, I, true)) + || I <- [log, log_index, commit, commit_index, snapshot, snapshot_index]], + ok. leader_locator_client_local(Config) -> diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex index c1c6e877b893..3cd9560cf359 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex @@ -13,18 +13,19 @@ defmodule RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommand do use RabbitMQ.CLI.Core.AcceptsOnePositionalArgument use RabbitMQ.CLI.Core.RequiresRabbitAppRunning - def switches(), do: [index: :string, timeout: :integer] - def aliases(), do: [i: :index, t: :timeout] + def switches(), do: [offline_members: :boolean, index: :string, timeout: :integer] + def aliases(), do: [o: :offline_members, i: :index, t: :timeout] def merge_defaults(args, opts) do - {args, Map.merge(%{vhost: "/", index: "commit"}, opts)} + {args, Map.merge(%{vhost: "/", index: "commit", offline_members: true}, opts)} end - def run([name] = _args, %{vhost: vhost, index: index, node: node_name}) do + def run([name] = _args, %{vhost: vhost, index: index, node: node_name, offline_members: offline_members}) do case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :get_member_with_highest_index, [ vhost, name, - to_atom(String.downcase(index)) + to_atom(String.downcase(index)), + offline_members ]) do {:error, :classic_queue_not_supported} -> index = format_index(String.downcase(index)) @@ -42,11 +43,12 @@ defmodule RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommand do def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable - def usage, do: "member_with_highest_index [--vhost ] [--index ]" + def usage, do: "member_with_highest_index [--vhost ] [--offline-members] [--index ]" def usage_additional do [ ["", "quorum queue name"], + ["--offline-members", "include members which are down (in noproc state)"], ["--index ", "name of the index to use to lookup highest member"] ] end diff --git a/deps/rabbitmq_cli/test/queues/member_with_highest_index_test.exs b/deps/rabbitmq_cli/test/queues/member_with_highest_index_test.exs index e56d8eecfea3..faac79be3437 100644 --- a/deps/rabbitmq_cli/test/queues/member_with_highest_index_test.exs +++ b/deps/rabbitmq_cli/test/queues/member_with_highest_index_test.exs @@ -48,7 +48,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommandTest do {:badrpc, _}, @command.run( ["quorum-queue-a"], - %{node: :jake@thedog, vhost: "/", index: "log", timeout: 200} + %{node: :jake@thedog, vhost: "/", index: "log", offline_members: true, timeout: 200} ) ) end From 58dc6f2ae77578cffef726cc64ca95612ce4483c Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Thu, 24 Jul 2025 18:50:41 +0100 Subject: [PATCH 6/7] default --offline-members flag in member_with_highest_index cli command to false --- .../rabbitmq/cli/queues/commands/member_with_highest_index.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex index 3cd9560cf359..2f5d67020868 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/member_with_highest_index.ex @@ -17,7 +17,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.MemberWithHighestIndexCommand do def aliases(), do: [o: :offline_members, i: :index, t: :timeout] def merge_defaults(args, opts) do - {args, Map.merge(%{vhost: "/", index: "commit", offline_members: true}, opts)} + {args, Map.merge(%{vhost: "/", index: "commit", offline_members: false}, opts)} end def run([name] = _args, %{vhost: vhost, index: index, node: node_name, offline_members: offline_members}) do From 3a1e3eeab68ef9f219bcd68beffee79d218a0f33 Mon Sep 17 00:00:00 2001 From: Ayanda Dube Date: Thu, 24 Jul 2025 19:01:13 +0100 Subject: [PATCH 7/7] add rabbit_quorum_queue:get_member_with_highest_index/3 clause --- deps/rabbit/src/rabbit_quorum_queue.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index b9de5a7fd4eb..9dce4784e1da 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -68,7 +68,8 @@ notify_decorators/3, spawn_notify_decorators/3]). --export([get_member_with_highest_index/4]). +-export([get_member_with_highest_index/3, + get_member_with_highest_index/4]). -export([is_enabled/0, is_compatible/3, @@ -1337,6 +1338,11 @@ get_sys_status(Proc) -> end. +-spec get_member_with_highest_index(rabbit_types:vhost(), rabbit_misc:resource_name(), atom()) -> + [[{binary(), term()}]] | {error, term()}. +get_member_with_highest_index(Vhost, QueueName, IndexName) -> + get_member_with_highest_index(Vhost, QueueName, IndexName, false). + -spec get_member_with_highest_index(rabbit_types:vhost(), rabbit_misc:resource_name(), atom(), boolean()) -> [[{binary(), term()}]] | {error, term()}. get_member_with_highest_index(Vhost, QueueName, IndexName, IncludeOfflineMembers) ->