Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
[{partitions, integer},
{binding_keys, string},
{routing_keys, string},
{exchange_type, string},
{max_length_bytes, string},
{max_age, string},
{stream_max_segment_size_bytes, string},
Expand All @@ -61,7 +62,11 @@
"Specify --partitions or --binding-keys, not both."};
validate([_Name], #{partitions := _, routing_keys := _}) ->
{validation_failure,
"Specify --partitions or --binding-keys, not both."};
"Specify --partitions or routing-keys, not both."};
validate([_Name],
#{exchange_type := <<"x-super-stream">>, routing_keys := _}) ->
{validation_failure,
"Exchange type x-super-stream cannot be used with routing-keys."};
validate([_Name], #{partitions := Partitions}) when Partitions < 1 ->
{validation_failure, "The partition number must be greater than 0"};
validate([_Name], Opts) ->
Expand Down Expand Up @@ -132,6 +137,17 @@
"Invalid value for --initial-cluster-size, the "
"value must be a positive integer."}
end;
validate_stream_arguments(#{exchange_type := Type} = Opts) ->
case Type of
<<"direct">> ->
validate_stream_arguments(maps:remove(exchange_type, Opts));
<<"x-super-stream">> ->
validate_stream_arguments(maps:remove(exchange_type, Opts));
_ ->
{validation_failure,
"Invalid value for --exchange_type, must be one of:"
"'direct' or 'x-super-stream'"}
end;
validate_stream_arguments(_) ->
ok.

Expand Down Expand Up @@ -172,48 +188,31 @@

run([SuperStream],
#{node := NodeName,
vhost := VHost,

Check warning on line 191 in deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl

View workflow job for this annotation

GitHub Actions / Test mixed clusters (28, 1.18, khepri) / Test plugins (rabbitmq_stream) / rabbitmq_stream (tests)

variable 'VHost' is unused

Check warning on line 191 in deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl

View workflow job for this annotation

GitHub Actions / Test mixed clusters (28, 1.18, mnesia) / Test plugins (rabbitmq_stream) / rabbitmq_stream (tests)

variable 'VHost' is unused

Check warning on line 191 in deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl

View workflow job for this annotation

GitHub Actions / Test (28, 1.18, khepri) / Test plugins (rabbitmq_stream) / rabbitmq_stream (tests)

variable 'VHost' is unused

Check warning on line 191 in deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl

View workflow job for this annotation

GitHub Actions / Test (28, 1.18, mnesia) / Test plugins (rabbitmq_stream) / rabbitmq_stream (tests)

variable 'VHost' is unused
timeout := Timeout,
partitions := Partitions} =
Opts) ->
Streams =
[list_to_binary(binary_to_list(SuperStream)
++ "-"
++ integer_to_list(K))
|| K <- lists:seq(0, Partitions - 1)],
RoutingKeys =
[integer_to_binary(K) || K <- lists:seq(0, Partitions - 1)],
create_super_stream(NodeName,
Timeout,
VHost,
SuperStream,
Streams,
stream_arguments(Opts),
RoutingKeys);
Spec0 = maps:with([vhost, exchange_type], Opts),
Spec =
Spec0#{username => cli_acting_user(),
name => SuperStream,
partitions_source => {partition_count, Partitions},
arguments => stream_arguments(Opts)},
create_super_stream(NodeName, Timeout, Spec);
run([SuperStream],
#{node := NodeName,
vhost := VHost,

Check warning on line 204 in deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl

View workflow job for this annotation

GitHub Actions / Test mixed clusters (28, 1.18, khepri) / Test plugins (rabbitmq_stream) / rabbitmq_stream (tests)

variable 'VHost' is unused

Check warning on line 204 in deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl

View workflow job for this annotation

GitHub Actions / Test mixed clusters (28, 1.18, mnesia) / Test plugins (rabbitmq_stream) / rabbitmq_stream (tests)

variable 'VHost' is unused

Check warning on line 204 in deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl

View workflow job for this annotation

GitHub Actions / Test (28, 1.18, khepri) / Test plugins (rabbitmq_stream) / rabbitmq_stream (tests)

variable 'VHost' is unused

Check warning on line 204 in deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl

View workflow job for this annotation

GitHub Actions / Test (28, 1.18, mnesia) / Test plugins (rabbitmq_stream) / rabbitmq_stream (tests)

variable 'VHost' is unused
timeout := Timeout,
binding_keys := BindingKeysStr} =
Opts) ->
BindingKeys =
[rabbit_data_coercion:to_binary(
string:strip(K))
|| K
<- string:tokens(
rabbit_data_coercion:to_list(BindingKeysStr), ",")],
Streams =
[list_to_binary(binary_to_list(SuperStream)
++ "-"
++ binary_to_list(K))
|| K <- BindingKeys],
create_super_stream(NodeName,
Timeout,
VHost,
SuperStream,
Streams,
stream_arguments(Opts),
BindingKeys).
Spec0 = maps:with([vhost, exchange_type], Opts),
RoutingKeys = [string:trim(K) || K <- string:lexemes(BindingKeysStr, ",")],
Spec =
Spec0#{username => cli_acting_user(),
name => SuperStream,
partitions_source => {routing_keys, RoutingKeys},
arguments => stream_arguments(Opts)},
create_super_stream(NodeName, Timeout, Spec).

stream_arguments(Opts) ->
stream_arguments(#{}, Opts).
Expand Down Expand Up @@ -258,28 +257,17 @@
{seconds, S}]) ->
Y * 365 * 86400 + M * 30 * 86400 + D * 86400 + H * 3600 + Mn * 60 + S.

create_super_stream(NodeName,
Timeout,
VHost,
SuperStream,
Streams,
Arguments,
RoutingKeys) ->
create_super_stream(NodeName, Timeout, Spec) ->
case rabbit_misc:rpc_call(NodeName,
rabbit_stream_manager,
create_super_stream,
[VHost,
SuperStream,
Streams,
Arguments,
RoutingKeys,
cli_acting_user()],
[Spec],
Timeout)
of
ok ->
{ok,
rabbit_misc:format("Super stream ~ts has been created",
[SuperStream])};
[maps:get(name, Spec)])};
Error ->
Error
end.
Expand Down
102 changes: 102 additions & 0 deletions deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_exchange_type_super_stream).

-include_lib("rabbit_common/include/rabbit.hrl").

-behaviour(rabbit_exchange_type).

Check warning on line 12 in deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl

View workflow job for this annotation

GitHub Actions / Test mixed clusters (28, 1.18, khepri) / Test plugins (rabbitmq_stream) / rabbitmq_stream (tests)

undefined callback function route/3 (behaviour 'rabbit_exchange_type')

Check warning on line 12 in deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl

View workflow job for this annotation

GitHub Actions / Test mixed clusters (28, 1.18, mnesia) / Test plugins (rabbitmq_stream) / rabbitmq_stream (tests)

undefined callback function route/3 (behaviour 'rabbit_exchange_type')

Check warning on line 12 in deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl

View workflow job for this annotation

GitHub Actions / Test (28, 1.18, khepri) / Test plugins (rabbitmq_stream) / rabbitmq_stream (tests)

undefined callback function route/3 (behaviour 'rabbit_exchange_type')

Check warning on line 12 in deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl

View workflow job for this annotation

GitHub Actions / Test (28, 1.18, mnesia) / Test plugins (rabbitmq_stream) / rabbitmq_stream (tests)

undefined callback function route/3 (behaviour 'rabbit_exchange_type')

-export([description/0,
serialise_events/0,
route/2,
info/1,
info/2]).
-export([validate/1,
validate_binding/2,
create/2,
delete/2,
policy_changed/2,
add_binding/3,
remove_bindings/3,
assert_args_equivalence/2]).

-rabbit_boot_step({rabbit_exchange_type_super_stream_registry,
[{description, "exchange type x-super-stream: registry"},
{mfa, {rabbit_registry, register,
[exchange, <<"x-super-stream">>, ?MODULE]}},
{cleanup, {rabbit_registry, unregister,
[exchange, <<"x-super-stream">>]}},
{requires, rabbit_registry},
{enables, kernel_ready}]}).

-define(SEED, 104729).

description() ->
[{description, <<"Super stream exchange type using murmur3 hashing">>}].

serialise_events() -> false.

route(#exchange{name = Name},
#delivery{message = #basic_message{routing_keys = [RKey | _]}}) ->
%% get all bindings for the exchange and use murmur3 to generate
%% the binding key to match on
case rabbit_binding:list_for_source(Name) of
[] ->
[];
Bindings ->
N = integer_to_binary(hash_mod(RKey, length(Bindings))),
case lists:search(fun(#binding{key = Key}) ->
Key =:= N
end, Bindings) of
{value, #binding{destination = Dest}} ->
[Dest];
false ->
[]
end
end.

info(_) ->
[].

info(_, _) ->
[].

validate(_X) ->
ok.

validate_binding(_X, #binding{key = K}) ->
try
%% just check the Key is an integer
_ = binary_to_integer(K),
ok
catch
error:badarg ->
{error,
{binding_invalid, "The binding key must be an integer: ~tp", [K]}}
end.

create(_Serial, _X) ->
ok.

delete(_Serial, _X) ->
ok.

policy_changed(_X1, _X2) ->
ok.

add_binding(_Serial, _X, _B) ->
ok.

remove_bindings(_Serial, _X, _Bs) ->
ok.

assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).

hash_mod(RKey, N) ->
murmerl3:hash_32(RKey, ?SEED) rem N.
7 changes: 3 additions & 4 deletions deps/rabbitmq_stream/src/rabbit_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@

start(_Type, _Args) ->
rabbit_stream_metrics:init(),
rabbit_global_counters:init(#{protocol => stream},
?PROTOCOL_COUNTERS),
rabbit_global_counters:init(#{protocol => stream,
queue_type => ?STREAM_QUEUE_TYPE}),
rabbit_global_counters:init([{protocol, stream}], ?PROTOCOL_COUNTERS),
rabbit_global_counters:init([{protocol, stream},
{queue_type, ?STREAM_QUEUE_TYPE}]),
rabbit_stream_sup:start_link().

tls_host() ->
Expand Down
Loading
Loading