diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index e3fdc9847500..3efabd9fad91 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2658,23 +2658,102 @@ end}. {mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [ {datatype, {enum, [true, false]}}]}. +% Pseudo-key to include the interceptor in the list of interceptors. +% - If any other configuration is provided for the interceptor this +% configuration is not required. +% - If no other configuration is provided, this one is required so that the +% interceptor gets invoked. +{mapping, "message_interceptors.incoming.$interceptor.enabled", "rabbit.incoming_message_interceptors", [ + {datatype, {enum, [true]}}]}. + +{mapping, "message_interceptors.outgoing.$interceptor.enabled", "rabbit.outgoing_message_interceptors", [ + {datatype, {enum, [true]}}]}. + +{mapping, + "message_interceptors.incoming.set_header_timestamp.overwrite", + "rabbit.incoming_message_interceptors", + [{datatype, {enum, [true, false]}}]}. +{mapping, + "message_interceptors.incoming.rabbit_message_interceptor_routing_node.overwrite", + "rabbit.incoming_message_interceptors", + [{datatype, {enum, [true, false]}}]}. + +{mapping, + "message_interceptors.incoming.set_header_routing_node.overwrite", + "rabbit.incoming_message_interceptors", + [{datatype, {enum, [true, false]}}]}. +{mapping, + "message_interceptors.incoming.rabbit_message_interceptor_timestamp.overwrite", + "rabbit.incoming_message_interceptors", + [{datatype, {enum, [true, false]}}]}. + {translation, "rabbit.incoming_message_interceptors", fun(Conf) -> - case cuttlefish_variable:filter_by_prefix("message_interceptors", Conf) of - [] -> - cuttlefish:unset(); - L -> - [begin - Interceptor = list_to_atom(Interceptor0), - case lists:member(Interceptor, [set_header_timestamp, - set_header_routing_node]) of - true -> - {Interceptor, Overwrite}; - false -> - cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor])) - end - end || {["message_interceptors", "incoming", Interceptor0, "overwrite"], Overwrite} <- L] - end + case cuttlefish_variable:filter_by_prefix("message_interceptors.incoming", Conf) of + [] -> + cuttlefish:unset(); + L -> + InterceptorsConfig = [ + {Module0, Config, Value} + || {["message_interceptors", "incoming", Module0, Config], Value} <- L + ], + {Result, Order0} = lists:foldl( + fun({Interceptor0, Key0, Value}, {Acc, Order}) -> + Interceptor = list_to_atom(Interceptor0), + Key = list_to_atom(Key0), + MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end, + % This Interceptor -> Module alias exists for + % compatibility reasons + Module = case Interceptor of + set_header_timestamp -> + rabbit_message_interceptor_timestamp; + set_header_routing_node -> + rabbit_message_interceptor_routing_node; + _ -> + Interceptor + end, + NewAcc = maps:update_with(Module, + MapPutFun, + #{Key => Value}, + Acc), + {NewAcc, [Module| Order]} + end, + {#{}, []}, + InterceptorsConfig + ), + Order = lists:uniq(Order0), + [{O, maps:without([enabled], maps:get(O, Result))} || O <- Order] + end + end +}. + +{translation, "rabbit.outgoing_message_interceptors", + fun(Conf) -> + case cuttlefish_variable:filter_by_prefix("message_interceptors.outgoing", Conf) of + [] -> + cuttlefish:unset(); + L -> + InterceptorsConfig = [ + {Module0, Config, Value} + || {["message_interceptors", "outgoing", Module0, Config], Value} <- L + ], + {Result, Order0} = lists:foldl( + fun({Interceptor0, Key0, Value}, {Acc, Order}) -> + Module = list_to_atom(Interceptor0), + Key = list_to_atom(Key0), + MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end, + NewAcc = maps:update_with(Module, + MapPutFun, + #{Key => Value}, + Acc), + {NewAcc, [Module| Order]} + end, + {#{}, []}, + InterceptorsConfig + ), + Order = lists:uniq(Order0), + [{O, maps:without([enabled], maps:get(O, Result))} || O <- Order] + end end }. diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index 525b1db835ac..fee70422b0b2 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -1656,7 +1656,8 @@ persist_static_configuration() -> [classic_queue_index_v2_segment_entry_count, classic_queue_store_v2_max_cache_size, classic_queue_store_v2_check_crc32, - incoming_message_interceptors + incoming_message_interceptors, + outgoing_message_interceptors ]), %% Disallow the following two cases: diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index d72a9666fe4f..606c23aef211 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -283,7 +283,8 @@ max_handle :: link_handle(), max_incoming_window :: pos_integer(), max_link_credit :: pos_integer(), - max_queue_credit :: pos_integer() + max_queue_credit :: pos_integer(), + msg_interceptor_ctx :: map() }). -record(state, { @@ -474,7 +475,11 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId, max_handle = EffectiveHandleMax, max_incoming_window = MaxIncomingWindow, max_link_credit = MaxLinkCredit, - max_queue_credit = MaxQueueCredit + max_queue_credit = MaxQueueCredit, + msg_interceptor_ctx = #{protocol => ?PROTOCOL, + username => User#user.username, + vhost => Vhost, + conn_name => ConnName} }}}. terminate(_Reason, #state{incoming_links = IncomingLinks, @@ -2411,7 +2416,8 @@ incoming_link_transfer( trace_state = Trace, conn_name = ConnName, channel_num = ChannelNum, - max_link_credit = MaxLinkCredit}}) -> + max_link_credit = MaxLinkCredit, + msg_interceptor_ctx = MsgInterceptorCtx}}) -> {PayloadBin, DeliveryId, Settled} = case MultiTransfer of @@ -2436,7 +2442,9 @@ incoming_link_transfer( Mc0 = mc:init(mc_amqp, PayloadBin, #{}), case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of {ok, X, RoutingKeys, Mc1, PermCache} -> - Mc2 = rabbit_message_interceptor:intercept(Mc1), + Mc2 = rabbit_message_interceptor:intercept(Mc1, + MsgInterceptorCtx, + incoming_message_interceptors), check_user_id(Mc2, User), TopicPermCache = check_write_permitted_on_topics( X, User, RoutingKeys, TopicPermCache0), diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 86d71d7af902..c188fd70bbd7 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -110,7 +110,8 @@ authz_context, max_consumers, % taken from rabbit.consumer_max_per_channel %% defines how ofter gc will be executed - writer_gc_threshold + writer_gc_threshold, + msg_interceptor_ctx }). -record(pending_ack, { @@ -509,7 +510,11 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, consumer_timeout = ConsumerTimeout, authz_context = OptionalVariables, max_consumers = MaxConsumers, - writer_gc_threshold = GCThreshold + writer_gc_threshold = GCThreshold, + msg_interceptor_ctx = #{protocol => amqp091, + username => User#user.username, + vhost => VHost, + conn_name => ConnName} }, limiter = Limiter, tx = none, @@ -813,6 +818,7 @@ get_consumer_timeout() -> _ -> undefined end. + %%--------------------------------------------------------------------------- reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. @@ -1167,7 +1173,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, user = #user{username = Username} = User, trace_state = TraceState, authz_context = AuthzContext, - writer_gc_threshold = GCThreshold + writer_gc_threshold = GCThreshold, + msg_interceptor_ctx = MsgInterceptorCtx }, tx = Tx, confirm_enabled = ConfirmEnabled, @@ -1206,7 +1213,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, rabbit_misc:precondition_failed("invalid message: ~tp", [Reason]); {ok, Message0} -> check_write_permitted_on_topics(Exchange, User, Message0, AuthzContext), - Message = rabbit_message_interceptor:intercept(Message0), + Message = rabbit_message_interceptor:intercept(Message0, + MsgInterceptorCtx, + incoming_message_interceptors), check_user_id_header(Message, User), QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}), [deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames], diff --git a/deps/rabbit/src/rabbit_message_interceptor.erl b/deps/rabbit/src/rabbit_message_interceptor.erl index 0d28fe6ef9af..b218c46955e8 100644 --- a/deps/rabbit/src/rabbit_message_interceptor.erl +++ b/deps/rabbit/src/rabbit_message_interceptor.erl @@ -1,50 +1,49 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. - -%% This module exists since 3.12 replacing plugins rabbitmq-message-timestamp -%% and rabbitmq-routing-node-stamp. Instead of using these plugins, RabbitMQ core can -%% now be configured to add such headers. This enables non-AMQP 0.9.1 protocols (that -%% do not use rabbit_channel) to also add AMQP 0.9.1 headers to incoming messages. -module(rabbit_message_interceptor). --include("mc.hrl"). --export([intercept/1]). +-export([intercept/3, + set_msg_annotation/4]). + +-type protocol() :: amqp091 | amqp10 | mqtt310 | mqtt311 | mqtt50. --define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>). --define(HEADER_ROUTING_NODE, <<"x-routed-by">>). +-type msg_interceptor_ctx() :: #{protocol := protocol(), + vhost := binary(), + username := binary(), + conn_name => binary(), + atom() => term()}. --spec intercept(mc:state()) -> mc:state(). -intercept(Msg) -> - Interceptors = persistent_term:get(incoming_message_interceptors, []), - lists:foldl(fun({InterceptorName, Overwrite}, M) -> - intercept(M, InterceptorName, Overwrite) - end, Msg, Interceptors). +-callback intercept(Msg, MsgInterceptorCtx, Group, Config) -> Msg when + Msg :: mc:state(), + MsgInterceptorCtx :: msg_interceptor_ctx(), + Group :: incoming_message_interceptors | outgoing_message_interceptors, + Config :: #{atom() := term()}. -intercept(Msg, set_header_routing_node, Overwrite) -> - Node = atom_to_binary(node()), - set_annotation(Msg, ?HEADER_ROUTING_NODE, Node, Overwrite); -intercept(Msg0, set_header_timestamp, Overwrite) -> - Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0), - Msg = set_annotation(Msg0, ?HEADER_TIMESTAMP, Ts, Overwrite), - set_timestamp(Msg, Ts, Overwrite). +-spec intercept(Msg, MsgInterceptorCtx, Group) -> Msg when + Msg :: mc:state(), + MsgInterceptorCtx :: map(), + Group :: incoming_message_interceptors | outgoing_message_interceptors. +intercept(Msg, MsgInterceptorCtx, Group) -> + Interceptors = persistent_term:get(Group, []), + lists:foldl(fun({Module, Config}, Msg0) -> + try + Module:intercept(Msg0, + MsgInterceptorCtx, + Group, + Config) + catch + error:undef -> + Msg0 + end + end, Msg , Interceptors). --spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(), boolean()) -> mc:state(). -set_annotation(Msg, Key, Value, Overwrite) -> +-spec set_msg_annotation(mc:state(), + mc:ann_key(), + mc:ann_value(), + boolean() + ) -> mc:state(). +set_msg_annotation(Msg, Key, Value, Overwrite) -> case {mc:x_header(Key, Msg), Overwrite} of {Val, false} when Val =/= undefined -> Msg; _ -> mc:set_annotation(Key, Value, Msg) end. - --spec set_timestamp(mc:state(), pos_integer(), boolean()) -> mc:state(). -set_timestamp(Msg, Timestamp, Overwrite) -> - case {mc:timestamp(Msg), Overwrite} of - {Ts, false} when is_integer(Ts) -> - Msg; - _ -> - mc:set_annotation(?ANN_TIMESTAMP, Timestamp, Msg) - end. diff --git a/deps/rabbit/src/rabbit_message_interceptor_routing_node.erl b/deps/rabbit/src/rabbit_message_interceptor_routing_node.erl new file mode 100644 index 000000000000..1b3f384bf904 --- /dev/null +++ b/deps/rabbit/src/rabbit_message_interceptor_routing_node.erl @@ -0,0 +1,14 @@ +-module(rabbit_message_interceptor_routing_node). +-behaviour(rabbit_message_interceptor). + +-define(HEADER_ROUTING_NODE, <<"x-routed-by">>). + +-export([intercept/4]). + +intercept(Msg, _MsgInterceptorCtx, _Group, Config) -> + Node = atom_to_binary(node()), + Overwrite = maps:get(overwrite, Config, false), + rabbit_message_interceptor:set_msg_annotation(Msg, + ?HEADER_ROUTING_NODE, + Node, + Overwrite). diff --git a/deps/rabbit/src/rabbit_message_interceptor_timestamp.erl b/deps/rabbit/src/rabbit_message_interceptor_timestamp.erl new file mode 100644 index 000000000000..058fd757f5ca --- /dev/null +++ b/deps/rabbit/src/rabbit_message_interceptor_timestamp.erl @@ -0,0 +1,26 @@ +-module(rabbit_message_interceptor_timestamp). +-behaviour(rabbit_message_interceptor). + +-include("mc.hrl"). + +-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>). + +-export([intercept/4]). + +intercept(Msg0, _MsgInterceptorCtx, _Group, Config) -> + Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0), + Overwrite = maps:get(overwrite, Config, false), + Msg = rabbit_message_interceptor:set_msg_annotation( + Msg0, + ?HEADER_TIMESTAMP, + Ts, + Overwrite), + set_msg_timestamp(Msg, Ts, Overwrite). + +set_msg_timestamp(Msg, Timestamp, Overwrite) -> + case {mc:timestamp(Msg), Overwrite} of + {Ts, false} when is_integer(Ts) -> + Msg; + _ -> + mc:set_annotation(?ANN_TIMESTAMP, Timestamp, Msg) + end. diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 4b2e5e43623c..db060329f207 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -4380,8 +4380,11 @@ available_messages(QType, Config) -> incoming_message_interceptors(Config) -> Key = ?FUNCTION_NAME, - ok = rpc(Config, persistent_term, put, [Key, [{set_header_routing_node, false}, - {set_header_timestamp, false}]]), + ok = rpc(Config, + persistent_term, + put, + [Key, [{rabbit_message_interceptor_routing_node, #{overwrite => false}}, + {rabbit_message_interceptor_timestamp, #{overwrite => false}}]]), Stream = <<"my stream">>, QQName = <<"my quorum queue">>, {_, Session, LinkPair} = Init = init(Config), diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index cc353e23337f..1add57451c74 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1100,7 +1100,9 @@ credential_validator.regexp = ^abc\\d+", {message_interceptors, "message_interceptors.incoming.set_header_timestamp.overwrite = true", [{rabbit, [ - {incoming_message_interceptors, [{set_header_timestamp, true}]} + {incoming_message_interceptors, [ + {rabbit_message_interceptor_timestamp, #{overwrite => true}} + ]} ]}], []}, @@ -1110,8 +1112,54 @@ credential_validator.regexp = ^abc\\d+", message_interceptors.incoming.set_header_timestamp.overwrite = false ", [{rabbit, [ - {incoming_message_interceptors, [{set_header_routing_node, false}, - {set_header_timestamp, false}]} + {incoming_message_interceptors, [ + {rabbit_message_interceptor_routing_node, #{overwrite => false}}, + {rabbit_message_interceptor_timestamp, #{overwrite => false}} + ]} + ]}], + []}, + + % Enable key allows to configure interceptors with empty conf + {message_interceptors, + " + message_interceptors.incoming.set_header_routing_node.enabled = true + ", + [{rabbit, [ + {incoming_message_interceptors, [ + {rabbit_message_interceptor_routing_node, #{}} + ]} + ]}], + []}, + + % An interceptor can be configured twice, with different options, both in + % incoming and outgoing group of interceptors + {message_interceptors, + " + message_interceptors.incoming.rabbit_message_interceptor_routing_node.overwrite = true + message_interceptors.outgoing.rabbit_message_interceptor_routing_node.enabled = true + ", + [{rabbit, [ + {incoming_message_interceptors, [ + {rabbit_message_interceptor_routing_node, #{overwrite => true}} + ]}, + {outgoing_message_interceptors, [ + {rabbit_message_interceptor_routing_node, #{}} + ]} + ]}], + []}, + + % Given a parameter gets configured multiple times, last value prevails + {message_interceptors, + " + message_interceptors.incoming.set_header_routing_node.overwrite = true + message_interceptors.incoming.set_header_routing_node.overwrite = false + message_interceptors.incoming.set_header_routing_node.overwrite = true + message_interceptors.incoming.set_header_routing_node.overwrite = false + ", + [{rabbit, [ + {incoming_message_interceptors, [ + {rabbit_message_interceptor_routing_node, #{overwrite => false}} + ]} ]}], []}, diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 4b5feddb509d..00d73d719d88 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -348,8 +348,10 @@ amqpl_amqp_bin_amqpl(_Config) -> Msg0 = mc:init(mc_amqpl, Content, annotations()), ok = persistent_term:put(incoming_message_interceptors, - [{set_header_timestamp, false}]), - Msg = rabbit_message_interceptor:intercept(Msg0), + [{rabbit_message_interceptor_timestamp, #{overwrite => false}}]), + Msg = rabbit_message_interceptor:intercept(Msg0, + #{}, + incoming_message_interceptors), ?assertEqual(<<"exch">>, mc:exchange(Msg)), ?assertEqual([<<"apple">>], mc:routing_keys(Msg)), diff --git a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl index 1abc39d0b042..37183408e68f 100644 --- a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl +++ b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl @@ -40,9 +40,9 @@ init_per_testcase(Testcase, Config0) -> headers_no_overwrite -> false end, Val = maps:to_list( - maps:from_keys([set_header_timestamp, - set_header_routing_node], - Overwrite)), + maps:from_keys([rabbit_message_interceptor_timestamp, + rabbit_message_interceptor_routing_node], + #{overwrite => Overwrite})), Config = rabbit_ct_helpers:merge_app_env( Config1, {rabbit, [{incoming_message_interceptors, Val}]}), rabbit_ct_helpers:run_steps( diff --git a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema index b69e2b06075c..89f15fed3ea7 100644 --- a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema +++ b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema @@ -303,3 +303,9 @@ end}. {datatype, integer}, {validators, ["non_negative_integer"]} ]}. + +{mapping, "message_interceptor.incoming.rabbit_mqtt_message_interceptor_client_id.annotation_key", + "rabbit.incoming_message_interceptors", + [{datatype, string}, + {default, "x-opt-mqtt-client-id"}] +}. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_message_interceptor_client_id.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_message_interceptor_client_id.erl new file mode 100644 index 000000000000..eda84589d920 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_message_interceptor_client_id.erl @@ -0,0 +1,17 @@ +-module(rabbit_mqtt_message_interceptor_client_id). + +-behaviour(rabbit_message_interceptor). + +-export([intercept/4]). + +intercept(Msg, + #{client_id := ClientId}, + incoming_message_interceptors, + #{annotation_key := AnnotationKey} + ) -> + rabbit_message_interceptor:set_msg_annotation(Msg, + AnnotationKey, + ClientId, + true); +intercept(Msg, _MsgInterceptorCtx, _Group, _Config) -> + Msg. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 7ae0893a13eb..cce8499e7e93 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -5,7 +5,6 @@ %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -module(rabbit_mqtt_processor). - -feature(maybe_expr, enable). -export([info/2, init/4, process_packet/2, @@ -1635,10 +1634,13 @@ publish_to_queues( conn_name = ConnName, trace_state = TraceState}, auth_state = #auth_state{user = #user{username = Username}}} = State) -> + MsgInterceptorCtx = build_msg_interceptor_ctx(State), Anns = #{?ANN_EXCHANGE => ExchangeNameBin, ?ANN_ROUTING_KEYS => [mqtt_to_amqp(Topic)]}, Msg0 = mc:init(mc_mqtt, MqttMsg, Anns, mc_env()), - Msg = rabbit_message_interceptor:intercept(Msg0), + Msg = rabbit_message_interceptor:intercept(Msg0, + MsgInterceptorCtx, + incoming_message_interceptors), case rabbit_exchange:lookup(ExchangeName) of {ok, Exchange} -> QNames0 = rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}), @@ -2607,3 +2609,15 @@ mc_env() -> MqttX -> #{mqtt_x => MqttX} end. + +build_msg_interceptor_ctx(#state{cfg = #cfg{client_id = ClientId, + conn_name = ConnName, + vhost = VHost, + proto_ver = ProtoVer + }, + auth_state = #auth_state{user = #user{username = Username}}}) -> + #{protocol => ProtoVer, + username => Username, + vhost => VHost, + conn_name => ConnName, + client_id => ClientId}. diff --git a/deps/rabbitmq_mqtt/test/client_id_interceptor_SUITE.erl b/deps/rabbitmq_mqtt/test/client_id_interceptor_SUITE.erl new file mode 100644 index 000000000000..4796ec744197 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/client_id_interceptor_SUITE.erl @@ -0,0 +1,97 @@ +-module(client_id_interceptor_SUITE). +-compile([export_all, + nowarn_export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-import(util, + [connect/2]). + +all() -> + [{group, intercept}]. + +groups() -> + [ + {intercept, [], [incoming]} + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + Config. +init_per_testcase(Testcase, Config0) -> + Config1 = rabbit_ct_helpers:set_config( + Config0, [{rmq_nodename_suffix, Testcase}]), + Val = maps:to_list( + maps:from_keys([rabbit_mqtt_message_interceptor_client_id], + #{annotation_key => <<"x-client_id">>})), + Config2 = rabbit_ct_helpers:merge_app_env( + Config1, {rabbit, [{incoming_message_interceptors, Val}]}), + Config3 = rabbit_ct_helpers:run_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + [fun start_amqp10_client_app/1]), + rabbit_ct_helpers:testcase_started(Config3, Testcase). + +end_per_testcase(Testcase, Config0) -> + Config = rabbit_ct_helpers:testcase_finished(Config0, Testcase), + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). + +start_amqp10_client_app(Config) -> + ?assertMatch({ok, _}, application:ensure_all_started(amqp10_client)), + Config. + +incoming(Config) -> + Host = ?config(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + ClientId = Container = atom_to_binary(?FUNCTION_NAME), + + %% With AMQP 1.0 + OpnConf = #{address => Host, + port => Port, + container_id => Container, + sasl => {plain, <<"guest">>, <<"guest">>}}, + {ok, Connection1} = amqp10_client:open_connection(OpnConf), + {ok, Session1} = amqp10_client:begin_session(Connection1), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session1, <<"pair">>), + QName = <<"queue for AMQP 1.0 client">>, + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, <<"amq.topic">>, <<"topic.1">>, #{}), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session1, <<"test-receiver">>, + rabbitmq_amqp_address:queue(QName), + unsettled, configuration), + + C = connect(ClientId, Config), + Correlation = <<"some correlation ID">>, + ContentType = <<"text/plain">>, + RequestPayload = <<"my request">>, + {ok, _} = emqtt:publish(C, <<"topic/1">>, + #{'Content-Type' => ContentType, + 'Correlation-Data' => Correlation}, + RequestPayload, [{qos, 1}]), + + {ok, Msg1} = amqp10_client:get_msg(Receiver), + Props = amqp10_msg:message_annotations(Msg1), + ?assertMatch(ClientId, maps:get(<<"x-client_id">>, Props)), + + % With AMQP 0.9 + Ch = rabbit_ct_client_helpers:open_channel(Config), + + ?_assertMatch({#'basic.get_ok'{}, + #amqp_msg{props = #'P_basic'{headers = [{<<"x-client_id">>, + longstr, + <<"incoming">>}]} }}, + amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true})), + + rabbit_ct_client_helpers:close_channel(Ch), + emqtt:disconnect(C). diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 6aae9c152d78..1f151651f5a1 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -1779,7 +1779,10 @@ default_queue_type(Config) -> incoming_message_interceptors(Config) -> Key = ?FUNCTION_NAME, - ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]), + ok = rpc(Config, + persistent_term, + put, + [Key, [{rabbit_message_interceptor_timestamp, #{overwrite => false}}]]), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), Payload = ClientId = Topic = atom_to_binary(?FUNCTION_NAME), CQName = <<"my classic queue">>,