From 39e15dd6fb374118d82f0e9fe9d51ddd0405fd6c Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 9 Dec 2025 16:58:26 +0000 Subject: [PATCH] Implement super stream exchange type --- ...CLI.Ctl.Commands.AddSuperStreamCommand.erl | 82 +++----- .../src/rabbit_exchange_type_super_stream.erl | 102 +++++++++ deps/rabbitmq_stream/src/rabbit_stream.erl | 7 +- .../src/rabbit_stream_manager.erl | 199 ++++++++---------- 4 files changed, 233 insertions(+), 157 deletions(-) create mode 100644 deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl index 369963b94fd5..7d402a72e25b 100644 --- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl @@ -42,6 +42,7 @@ switches() -> [{partitions, integer}, {binding_keys, string}, {routing_keys, string}, + {exchange_type, string}, {max_length_bytes, string}, {max_age, string}, {stream_max_segment_size_bytes, string}, @@ -61,7 +62,11 @@ validate([_Name], #{partitions := _, binding_keys := _}) -> "Specify --partitions or --binding-keys, not both."}; validate([_Name], #{partitions := _, routing_keys := _}) -> {validation_failure, - "Specify --partitions or --binding-keys, not both."}; + "Specify --partitions or routing-keys, not both."}; +validate([_Name], + #{exchange_type := <<"x-super-stream">>, routing_keys := _}) -> + {validation_failure, + "Exchange type x-super-stream cannot be used with routing-keys."}; validate([_Name], #{partitions := Partitions}) when Partitions < 1 -> {validation_failure, "The partition number must be greater than 0"}; validate([_Name], Opts) -> @@ -132,6 +137,17 @@ validate_stream_arguments(#{initial_cluster_size := Value} = Opts) -> "Invalid value for --initial-cluster-size, the " "value must be a positive integer."} end; +validate_stream_arguments(#{exchange_type := Type} = Opts) -> + case Type of + <<"direct">> -> + validate_stream_arguments(maps:remove(exchange_type, Opts)); + <<"x-super-stream">> -> + validate_stream_arguments(maps:remove(exchange_type, Opts)); + _ -> + {validation_failure, + "Invalid value for --exchange_type, must be one of:" + "'direct' or 'x-super-stream'"} + end; validate_stream_arguments(_) -> ok. @@ -176,44 +192,27 @@ run([SuperStream], timeout := Timeout, partitions := Partitions} = Opts) -> - Streams = - [list_to_binary(binary_to_list(SuperStream) - ++ "-" - ++ integer_to_list(K)) - || K <- lists:seq(0, Partitions - 1)], - RoutingKeys = - [integer_to_binary(K) || K <- lists:seq(0, Partitions - 1)], - create_super_stream(NodeName, - Timeout, - VHost, - SuperStream, - Streams, - stream_arguments(Opts), - RoutingKeys); + Spec0 = maps:with([vhost, exchange_type], Opts), + Spec = + Spec0#{username => cli_acting_user(), + name => SuperStream, + partitions_source => {partition_count, Partitions}, + arguments => stream_arguments(Opts)}, + create_super_stream(NodeName, Timeout, Spec); run([SuperStream], #{node := NodeName, vhost := VHost, timeout := Timeout, binding_keys := BindingKeysStr} = Opts) -> - BindingKeys = - [rabbit_data_coercion:to_binary( - string:strip(K)) - || K - <- string:tokens( - rabbit_data_coercion:to_list(BindingKeysStr), ",")], - Streams = - [list_to_binary(binary_to_list(SuperStream) - ++ "-" - ++ binary_to_list(K)) - || K <- BindingKeys], - create_super_stream(NodeName, - Timeout, - VHost, - SuperStream, - Streams, - stream_arguments(Opts), - BindingKeys). + Spec0 = maps:with([vhost, exchange_type], Opts), + RoutingKeys = [string:trim(K) || K <- string:lexemes(BindingKeysStr, ",")], + Spec = + Spec0#{username => cli_acting_user(), + name => SuperStream, + partitions_source => {routing_keys, RoutingKeys}, + arguments => stream_arguments(Opts)}, + create_super_stream(NodeName, Timeout, Spec). stream_arguments(Opts) -> stream_arguments(#{}, Opts). @@ -258,28 +257,17 @@ duration_to_seconds([{sign, _}, {seconds, S}]) -> Y * 365 * 86400 + M * 30 * 86400 + D * 86400 + H * 3600 + Mn * 60 + S. -create_super_stream(NodeName, - Timeout, - VHost, - SuperStream, - Streams, - Arguments, - RoutingKeys) -> +create_super_stream(NodeName, Timeout, Spec) -> case rabbit_misc:rpc_call(NodeName, rabbit_stream_manager, create_super_stream, - [VHost, - SuperStream, - Streams, - Arguments, - RoutingKeys, - cli_acting_user()], + [Spec], Timeout) of ok -> {ok, rabbit_misc:format("Super stream ~ts has been created", - [SuperStream])}; + [maps:get(name, Spec)])}; Error -> Error end. diff --git a/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl b/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl new file mode 100644 index 000000000000..a498369d7286 --- /dev/null +++ b/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl @@ -0,0 +1,102 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_exchange_type_super_stream). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, + serialise_events/0, + route/2, + info/1, + info/2]). +-export([validate/1, + validate_binding/2, + create/2, + delete/2, + policy_changed/2, + add_binding/3, + remove_bindings/3, + assert_args_equivalence/2]). + +-rabbit_boot_step({rabbit_exchange_type_super_stream_registry, + [{description, "exchange type x-super-stream: registry"}, + {mfa, {rabbit_registry, register, + [exchange, <<"x-super-stream">>, ?MODULE]}}, + {cleanup, {rabbit_registry, unregister, + [exchange, <<"x-super-stream">>]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +-define(SEED, 104729). + +description() -> + [{description, <<"Super stream exchange type using murmur3 hashing">>}]. + +serialise_events() -> false. + +route(#exchange{name = Name}, + #delivery{message = #basic_message{routing_keys = [RKey | _]}}) -> + %% get all bindings for the exchange and use murmur3 to generate + %% the binding key to match on + case rabbit_binding:list_for_source(Name) of + [] -> + []; + Bindings -> + N = integer_to_binary(hash_mod(RKey, length(Bindings))), + case lists:search(fun(#binding{key = Key}) -> + Key =:= N + end, Bindings) of + {value, #binding{destination = Dest}} -> + [Dest]; + false -> + [] + end + end. + +info(_) -> + []. + +info(_, _) -> + []. + +validate(_X) -> + ok. + +validate_binding(_X, #binding{key = K}) -> + try + %% just check the Key is an integer + _ = binary_to_integer(K), + ok + catch + error:badarg -> + {error, + {binding_invalid, "The binding key must be an integer: ~tp", [K]}} + end. + +create(_Serial, _X) -> + ok. + +delete(_Serial, _X) -> + ok. + +policy_changed(_X1, _X2) -> + ok. + +add_binding(_Serial, _X, _B) -> + ok. + +remove_bindings(_Serial, _X, _Bs) -> + ok. + +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). + +hash_mod(RKey, N) -> + murmerl3:hash_32(RKey, ?SEED) rem N. diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl index e1baceb657de..bcf9c8e3a062 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -42,10 +42,9 @@ start(_Type, _Args) -> rabbit_stream_metrics:init(), - rabbit_global_counters:init(#{protocol => stream}, - ?PROTOCOL_COUNTERS), - rabbit_global_counters:init(#{protocol => stream, - queue_type => ?STREAM_QUEUE_TYPE}), + rabbit_global_counters:init([{protocol, stream}], ?PROTOCOL_COUNTERS), + rabbit_global_counters:init([{protocol, stream}, + {queue_type, ?STREAM_QUEUE_TYPE}]), rabbit_stream_sup:start_link(). tls_host() -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index d5500de2a455..246a200ca152 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -27,7 +27,7 @@ %% API -export([create/4, delete/3, - create_super_stream/6, + create_super_stream/1, delete_super_stream/3, lookup_leader/2, lookup_local_member/2, @@ -38,6 +38,23 @@ partition_index/3, reset_offset/3]). +-record(state, {configuration}). + +-type super_stream_spec() :: + #{name := binary(), + vhost := binary(), + username := binary(), + partitions_source := + {partition_count, pos_integer()} | {routing_keys, [binary()]}, + arguments => map(), + exchange_type => binary()}. + +start_link(Conf) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Conf], []). + +init([Conf]) -> + {ok, #state{configuration = Conf}}. + -spec create(binary(), binary(), #{binary() => binary()}, binary()) -> {ok, map()} | {error, reference_already_exists} | @@ -89,80 +106,41 @@ delete(VirtualHost, Reference, Username) -> {error, reference_not_found} end. --spec create_super_stream(binary(), - binary(), - [binary()], - #{binary() => binary()}, - [binary()], - binary()) -> - ok | {error, term()}. -create_super_stream(VirtualHost, - Name, - Partitions, - Arguments, - BindingKeys, - Username) -> - case validate_super_stream_creation(VirtualHost, Name, Partitions, BindingKeys) of - {error, Reason} -> - {error, Reason}; - ok -> - case declare_super_stream_exchange(VirtualHost, Name, Username) of - ok -> - RollbackOperations = - [fun() -> - delete_super_stream_exchange(VirtualHost, Name, - Username) - end], - QueueCreationsResult = - lists:foldl(fun (Partition, {ok, RollbackOps}) -> - Args = - default_super_stream_arguments(Arguments), - case create(VirtualHost, - Partition, - Args, - Username) - of - {ok, _} -> - {ok, - [fun() -> - delete(VirtualHost, - Partition, - Username) - end] - ++ RollbackOps}; - {error, Reason} -> - {{error, Reason}, - RollbackOps} - end; - (_, - {{error, _Reason}, _RollbackOps} = - Acc) -> - Acc - end, - {ok, RollbackOperations}, Partitions), - case QueueCreationsResult of - {ok, RollbackOps} -> - BindingsResult = - add_super_stream_bindings(VirtualHost, - Name, - Partitions, - BindingKeys, - Username), - case BindingsResult of - ok -> - ok; - Error -> - _ = [Fun() || Fun <- RollbackOps], - Error - end; - {{error, Reason}, RollbackOps} -> - _ = [Fun() || Fun <- RollbackOps], - {error, Reason} - end; - {error, Msg} -> - {error, Msg} - end - end. +-spec create_super_stream(super_stream_spec()) -> + ok | {error, term()}. +create_super_stream(#{exchange_type := <<"x-super-stream">>, + partitions_source := {routing_keys, _}}) -> + {error, unsupported_specification}; +create_super_stream(#{name := Name, + vhost := VHost, + username := Username, + partitions_source := PartitionSource} = + Spec) -> + Type = maps:get(exchange_type, Spec, <<"direct">>), + Arguments = maps:get(arguments, Spec, #{}), + {Partitions, RoutingKeys} = + case PartitionSource of + {partition_count, Count} -> + Streams = + [rabbit_stream_utils:partition_name(Name, K) + || K <- lists:seq(0, Count - 1)], + Keys = [integer_to_binary(K) || K <- lists:seq(0, Count - 1)], + {Streams, Keys}; + {routing_keys, Keys} -> + Streams = + [rabbit_stream_utils:partition_name(Name, K) || K <- Keys], + {Streams, Keys} + end, + + gen_server:call(?MODULE, + {create_super_stream, + VHost, + Name, + Type, + Partitions, + Arguments, + RoutingKeys, + Username}). -spec delete_super_stream(binary(), binary(), binary()) -> ok | {error, term()}. @@ -355,14 +333,14 @@ partition_index(VirtualHost, SuperStream, Stream) -> "super stream ~tp (virtual host ~tp)", [Stream, SuperStream, VirtualHost]), try - _ = rabbit_exchange:lookup_or_die(ExchangeName), + Exchange = rabbit_exchange:lookup_or_die(ExchangeName), UnorderedBindings = _ = [Binding || Binding = #binding{destination = #resource{name = Q} = D} <- rabbit_binding:list_for_source(ExchangeName), is_resource_stream_queue(D), Q == Stream], OrderedBindings = - rabbit_stream_utils:sort_partitions(UnorderedBindings), + rabbit_stream_utils:sort_partitions(Exchange, UnorderedBindings), ?LOG_DEBUG("Bindings: ~tp", [OrderedBindings]), case OrderedBindings of [] -> @@ -588,14 +566,14 @@ do_create_stream(VirtualHost, Reference, StreamQueueArguments, Username) -> super_stream_partitions(VirtualHost, SuperStream) -> ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), try - _ = rabbit_exchange:lookup_or_die(ExchangeName), + Exchange = rabbit_exchange:lookup_or_die(ExchangeName), UnorderedBindings = [Binding || Binding = #binding{destination = D} <- rabbit_binding:list_for_source(ExchangeName), is_resource_stream_queue(D)], OrderedBindings = - rabbit_stream_utils:sort_partitions(UnorderedBindings), + rabbit_stream_utils:sort_partitions(Exchange, UnorderedBindings), {ok, lists:foldl(fun (#binding{destination = #resource{kind = queue, name = Q}}, @@ -701,15 +679,16 @@ check_already_existing_queue0(VirtualHost, [Q | T], _Error) -> rabbit_misc:format("~ts is not a correct name for a queue", [Q])}} end. -declare_super_stream_exchange(VirtualHost, Name, Username) -> +declare_super_stream_exchange(VirtualHost, Name, Type, Username) -> case rabbit_stream_utils:enforce_correct_name(Name) of {ok, CorrectName} -> Args = - rabbit_misc:set_table_value([], - <<"x-super-stream">>, - bool, - true), - CheckedType = rabbit_exchange:check_type(<<"direct">>), + rabbit_misc:set_table_value([], + <<"x-super-stream">>, + bool, + true), + CheckedType = rabbit_exchange:check_type(Type), + ?LOG_DEBUG("CheckedType ~p", [CheckedType]), ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName), XResult = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> @@ -750,28 +729,30 @@ declare_super_stream_exchange(VirtualHost, Name, Username) -> add_super_stream_bindings(VirtualHost, Name, + Type, Partitions, BindingKeys, Username) -> PartitionsBindingKeys = lists:zip(Partitions, BindingKeys), BindingsResult = - lists:foldl(fun ({Partition, BindingKey}, {ok, Order}) -> - case add_super_stream_binding(VirtualHost, - Name, - Partition, - BindingKey, - Order, - Username) - of - ok -> - {ok, Order + 1}; - {error, Reason} -> - {{error, Reason}, 0} - end; - (_, {{error, _Reason}, _Order} = Acc) -> - Acc - end, - {ok, 0}, PartitionsBindingKeys), + lists:foldl(fun ({Partition, RoutingKey}, {ok, Order}) -> + case add_super_stream_binding(VirtualHost, + Name, + Type, + Partition, + RoutingKey, + Order, + Username) + of + ok -> + {ok, Order + 1}; + {error, Reason} -> + {{error, Reason}, 0} + end; + (_, {{error, _Reason}, _Order} = Acc) -> + Acc + end, + {ok, 0}, PartitionsBindingKeys), case BindingsResult of {ok, _} -> ok; @@ -781,6 +762,7 @@ add_super_stream_bindings(VirtualHost, add_super_stream_binding(VirtualHost, SuperStream, + ExchangeType, Partition, BindingKey, Order, @@ -793,10 +775,15 @@ add_super_stream_binding(VirtualHost, QueueName = rabbit_misc:r(VirtualHost, queue, QueueNameBin), Pid = self(), Arguments = - rabbit_misc:set_table_value([], - <<"x-stream-partition-order">>, - long, - Order), + case ExchangeType of + <<"direct">> -> + rabbit_misc:set_table_value([], + <<"x-stream-partition-order">>, + long, + Order); + _ -> + [] + end, case rabbit_binding:add(#binding{source = ExchangeName, destination = QueueName, key = BindingKey,