diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 8326990d9e11..c57975f0cce9 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -273,7 +273,7 @@ PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_on PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_containers_deaths_v2 message_size_limit metadata_store_migration PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor -PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_message_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue +PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_msg_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit rabbit_fifo_dlx_integration rabbit_fifo_int PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue diff --git a/deps/rabbit/ct.test.spec b/deps/rabbit/ct.test.spec index 62f63daff854..104f7f40bfda 100644 --- a/deps/rabbit/ct.test.spec +++ b/deps/rabbit/ct.test.spec @@ -115,7 +115,7 @@ , rabbit_fifo_prop_SUITE , rabbit_fifo_v0_SUITE , rabbit_local_random_exchange_SUITE -, rabbit_message_interceptor_SUITE +, rabbit_msg_interceptor_SUITE , rabbit_stream_coordinator_SUITE , rabbit_stream_sac_coordinator_SUITE , rabbitmq_4_0_deprecations_SUITE diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 1118c7827ab0..ba20e864fdb3 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2664,27 +2664,54 @@ end}. %% %% Message interceptors %% -{mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [ + +{mapping, "message_interceptors.$stage.$name.$key", "rabbit.message_interceptors", [ {datatype, {enum, [true, false]}}]}. -{translation, "rabbit.incoming_message_interceptors", - fun(Conf) -> +{translation, "rabbit.message_interceptors", + fun(Conf) -> case cuttlefish_variable:filter_by_prefix("message_interceptors", Conf) of [] -> cuttlefish:unset(); L -> - [begin - Interceptor = list_to_atom(Interceptor0), - case lists:member(Interceptor, [set_header_timestamp, - set_header_routing_node]) of - true -> - {Interceptor, Overwrite}; - false -> - cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor])) - end - end || {["message_interceptors", "incoming", Interceptor0, "overwrite"], Overwrite} <- L] + lists:foldr( + fun({["message_interceptors", "incoming", "set_header_routing_node", "overwrite"], Overwrite}, Acc) + when is_boolean(Overwrite) -> + Mod = rabbit_msg_interceptor_routing_node, + Cfg = #{overwrite => Overwrite}, + [{Mod, Cfg} | Acc]; + ({["message_interceptors", "incoming", "set_header_timestamp", "overwrite"], Overwrite}, Acc) + when is_boolean(Overwrite) -> + Mod = rabbit_msg_interceptor_timestamp, + Cfg = #{incoming => true, + overwrite => Overwrite}, + case lists:keytake(Mod, 1, Acc) of + false -> + [{Mod, Cfg} | Acc]; + {value, {Mod, Cfg1}, Acc1} -> + Cfg2 = maps:merge(Cfg1, Cfg), + [{Mod, Cfg2} | Acc1] + end; + ({["message_interceptors", "outgoing", "timestamp", "enabled"], Enabled}, Acc) -> + case Enabled of + true -> + Mod = rabbit_msg_interceptor_timestamp, + Cfg = #{outgoing => true}, + case lists:keytake(Mod, 1, Acc) of + false -> + [{Mod, Cfg} | Acc]; + {value, {Mod, Cfg1}, Acc1} -> + Cfg2 = maps:merge(Cfg1, Cfg), + [{Mod, Cfg2} | Acc1] + end; + false -> + Acc + end; + (Other, _Acc) -> + cuttlefish:invalid(io_lib:format("~p is invalid", [Other])) + end, [], L) end - end + end }. {mapping, "stream.replication.port_range.min", "osiris.port_range", [ diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index cac190e2cb5e..37602df7fed7 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -462,7 +462,6 @@ protocol_state(#content{properties = #'P_basic'{headers = H00, priority = Priority0, delivery_mode = DeliveryMode0} = B0} = C, Anns) -> - %% Add any x- annotations as headers H0 = case H00 of undefined -> []; _ -> @@ -474,6 +473,7 @@ protocol_state(#content{properties = #'P_basic'{headers = H00, _ -> H0 end, + %% Add any x- annotations as headers Headers1 = maps:fold( fun (<<"x-", _/binary>> = Key, Val, H) when is_integer(Val) -> [{Key, long, Val} | H]; diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index 525b1db835ac..20bd4765b2a3 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -1655,10 +1655,12 @@ persist_static_configuration() -> persist_static_configuration( [classic_queue_index_v2_segment_entry_count, classic_queue_store_v2_max_cache_size, - classic_queue_store_v2_check_crc32, - incoming_message_interceptors + classic_queue_store_v2_check_crc32 ]), + Interceptors = application:get_env(?MODULE, message_interceptors, []), + ok = rabbit_msg_interceptor:add(Interceptors), + %% Disallow the following two cases: %% 1. Negative values %% 2. MoreCreditAfter greater than InitialCredit diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index d72a9666fe4f..caa2024fa1e9 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -283,7 +283,8 @@ max_handle :: link_handle(), max_incoming_window :: pos_integer(), max_link_credit :: pos_integer(), - max_queue_credit :: pos_integer() + max_queue_credit :: pos_integer(), + msg_interceptor_ctx :: rabbit_msg_interceptor:context() }). -record(state, { @@ -474,7 +475,11 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId, max_handle = EffectiveHandleMax, max_incoming_window = MaxIncomingWindow, max_link_credit = MaxLinkCredit, - max_queue_credit = MaxQueueCredit + max_queue_credit = MaxQueueCredit, + msg_interceptor_ctx = #{protocol => ?PROTOCOL, + vhost => Vhost, + username => User#user.username, + connection_name => ConnName} }}}. terminate(_Reason, #state{incoming_links = IncomingLinks, @@ -2159,7 +2164,8 @@ handle_deliver(ConsumerTag, AckRequired, conn_name = ConnName, channel_num = ChannelNum, user = #user{username = Username}, - trace_state = Trace}}) -> + trace_state = Trace, + msg_interceptor_ctx = MsgIcptCtx}}) -> Handle = ctag_to_handle(ConsumerTag), case OutgoingLinks0 of #{Handle := #outgoing_link{queue_type = QType, @@ -2174,8 +2180,9 @@ handle_deliver(ConsumerTag, AckRequired, delivery_tag = {binary, Dtag}, message_format = ?UINT(?MESSAGE_FORMAT), settled = SendSettled}, - Mc1 = mc:convert(mc_amqp, Mc0), - Mc = mc:set_annotation(redelivered, Redelivered, Mc1), + Mc1 = rabbit_msg_interceptor:intercept_outgoing(Mc0, MsgIcptCtx), + Mc2 = mc:convert(mc_amqp, Mc1), + Mc = mc:set_annotation(redelivered, Redelivered, Mc2), Sections = mc:protocol_state(Mc), validate_message_size(Sections, MaxMessageSize), Frames = transfer_frames(Transfer, Sections, MaxFrameSize), @@ -2411,7 +2418,8 @@ incoming_link_transfer( trace_state = Trace, conn_name = ConnName, channel_num = ChannelNum, - max_link_credit = MaxLinkCredit}}) -> + max_link_credit = MaxLinkCredit, + msg_interceptor_ctx = MsgIcptCtx}}) -> {PayloadBin, DeliveryId, Settled} = case MultiTransfer of @@ -2436,10 +2444,10 @@ incoming_link_transfer( Mc0 = mc:init(mc_amqp, PayloadBin, #{}), case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of {ok, X, RoutingKeys, Mc1, PermCache} -> - Mc2 = rabbit_message_interceptor:intercept(Mc1), - check_user_id(Mc2, User), + check_user_id(Mc1, User), TopicPermCache = check_write_permitted_on_topics( X, User, RoutingKeys, TopicPermCache0), + Mc2 = rabbit_msg_interceptor:intercept_incoming(Mc1, MsgIcptCtx), QNames = rabbit_exchange:route(X, Mc2, #{return_binding_keys => true}), rabbit_trace:tap_in(Mc2, QNames, ConnName, ChannelNum, Username, Trace), Opts = #{correlation => {HandleInt, DeliveryId}}, diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 86d71d7af902..38614fc4de72 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -110,7 +110,8 @@ authz_context, max_consumers, % taken from rabbit.consumer_max_per_channel %% defines how ofter gc will be executed - writer_gc_threshold + writer_gc_threshold, + msg_interceptor_ctx :: rabbit_msg_interceptor:context() }). -record(pending_ack, { @@ -492,6 +493,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams), {ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold), MaxConsumers = application:get_env(rabbit, consumer_max_per_channel, infinity), + MsgIcptCtx = #{protocol => amqp091, + vhost => VHost, + username => User#user.username, + connection_name => ConnName}, State = #ch{cfg = #conf{state = starting, protocol = Protocol, channel = Channel, @@ -509,8 +514,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, consumer_timeout = ConsumerTimeout, authz_context = OptionalVariables, max_consumers = MaxConsumers, - writer_gc_threshold = GCThreshold - }, + writer_gc_threshold = GCThreshold, + msg_interceptor_ctx = MsgIcptCtx}, limiter = Limiter, tx = none, next_tag = 1, @@ -657,13 +662,14 @@ handle_cast({deliver_reply, _K, _Del}, noreply(State); handle_cast({deliver_reply, _K, _Msg}, State = #ch{reply_consumer = none}) -> noreply(State); -handle_cast({deliver_reply, Key, Msg}, - State = #ch{cfg = #conf{writer_pid = WriterPid}, +handle_cast({deliver_reply, Key, Mc}, + State = #ch{cfg = #conf{writer_pid = WriterPid, + msg_interceptor_ctx = MsgIcptCtx}, next_tag = DeliveryTag, reply_consumer = {ConsumerTag, _Suffix, Key}}) -> - Content = mc:protocol_state(mc:convert(mc_amqpl, Msg)), - ExchName = mc:exchange(Msg), - [RoutingKey | _] = mc:routing_keys(Msg), + ExchName = mc:exchange(Mc), + [RoutingKey | _] = mc:routing_keys(Mc), + Content = outgoing_content(Mc, MsgIcptCtx), ok = rabbit_writer:send_command( WriterPid, #'basic.deliver'{consumer_tag = ConsumerTag, @@ -813,6 +819,7 @@ get_consumer_timeout() -> _ -> undefined end. + %%--------------------------------------------------------------------------- reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. @@ -1167,7 +1174,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, user = #user{username = Username} = User, trace_state = TraceState, authz_context = AuthzContext, - writer_gc_threshold = GCThreshold + writer_gc_threshold = GCThreshold, + msg_interceptor_ctx = MsgIcptCtx }, tx = Tx, confirm_enabled = ConfirmEnabled, @@ -1206,8 +1214,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, rabbit_misc:precondition_failed("invalid message: ~tp", [Reason]); {ok, Message0} -> check_write_permitted_on_topics(Exchange, User, Message0, AuthzContext), - Message = rabbit_message_interceptor:intercept(Message0), - check_user_id_header(Message, User), + check_user_id_header(Message0, User), + Message = rabbit_msg_interceptor:intercept_incoming(Message0, MsgIcptCtx), QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}), [deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames], Queues = rabbit_amqqueue:lookup_many(QNames), @@ -2592,15 +2600,15 @@ handle_deliver(CTag, Ack, Msgs, State) when is_list(Msgs) -> end, State, Msgs). handle_deliver0(ConsumerTag, AckRequired, - {QName, QPid, _MsgId, Redelivered, MsgCont0} = Msg, + {QName, QPid, _MsgId, Redelivered, Mc} = Msg, State = #ch{cfg = #conf{writer_pid = WriterPid, - writer_gc_threshold = GCThreshold}, + writer_gc_threshold = GCThreshold, + msg_interceptor_ctx = MsgIcptCtx}, next_tag = DeliveryTag, queue_states = Qs}) -> - Exchange = mc:exchange(MsgCont0), - [RoutingKey | _] = mc:routing_keys(MsgCont0), - MsgCont = mc:convert(mc_amqpl, MsgCont0), - Content = mc:protocol_state(MsgCont), + Exchange = mc:exchange(Mc), + [RoutingKey | _] = mc:routing_keys(Mc), + Content = outgoing_content(Mc, MsgIcptCtx), Deliver = #'basic.deliver'{consumer_tag = ConsumerTag, delivery_tag = DeliveryTag, redelivered = Redelivered, @@ -2621,12 +2629,11 @@ handle_deliver0(ConsumerTag, AckRequired, record_sent(deliver, QueueType, ConsumerTag, AckRequired, Msg, State). handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, - Msg0 = {_QName, _QPid, _MsgId, Redelivered, MsgCont0}, + Msg0 = {_QName, _QPid, _MsgId, Redelivered, Mc}, QueueType, State) -> - Exchange = mc:exchange(MsgCont0), - [RoutingKey | _] = mc:routing_keys(MsgCont0), - MsgCont = mc:convert(mc_amqpl, MsgCont0), - Content = mc:protocol_state(MsgCont), + Exchange = mc:exchange(Mc), + [RoutingKey | _] = mc:routing_keys(Mc), + Content = outgoing_content(Mc, State#ch.cfg#conf.msg_interceptor_ctx), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, @@ -2637,6 +2644,11 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, Content), {noreply, record_sent(get, QueueType, DeliveryTag, not(NoAck), Msg0, State)}. +outgoing_content(Mc, MsgIcptCtx) -> + Mc1 = rabbit_msg_interceptor:intercept_outgoing(Mc, MsgIcptCtx), + Mc2 = mc:convert(mc_amqpl, Mc1), + mc:protocol_state(Mc2). + init_tick_timer(State = #ch{tick_timer = undefined}) -> {ok, Interval} = application:get_env(rabbit, channel_tick_interval), State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}; diff --git a/deps/rabbit/src/rabbit_message_interceptor.erl b/deps/rabbit/src/rabbit_message_interceptor.erl deleted file mode 100644 index 0d28fe6ef9af..000000000000 --- a/deps/rabbit/src/rabbit_message_interceptor.erl +++ /dev/null @@ -1,50 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. - -%% This module exists since 3.12 replacing plugins rabbitmq-message-timestamp -%% and rabbitmq-routing-node-stamp. Instead of using these plugins, RabbitMQ core can -%% now be configured to add such headers. This enables non-AMQP 0.9.1 protocols (that -%% do not use rabbit_channel) to also add AMQP 0.9.1 headers to incoming messages. --module(rabbit_message_interceptor). --include("mc.hrl"). - --export([intercept/1]). - --define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>). --define(HEADER_ROUTING_NODE, <<"x-routed-by">>). - --spec intercept(mc:state()) -> mc:state(). -intercept(Msg) -> - Interceptors = persistent_term:get(incoming_message_interceptors, []), - lists:foldl(fun({InterceptorName, Overwrite}, M) -> - intercept(M, InterceptorName, Overwrite) - end, Msg, Interceptors). - -intercept(Msg, set_header_routing_node, Overwrite) -> - Node = atom_to_binary(node()), - set_annotation(Msg, ?HEADER_ROUTING_NODE, Node, Overwrite); -intercept(Msg0, set_header_timestamp, Overwrite) -> - Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0), - Msg = set_annotation(Msg0, ?HEADER_TIMESTAMP, Ts, Overwrite), - set_timestamp(Msg, Ts, Overwrite). - --spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(), boolean()) -> mc:state(). -set_annotation(Msg, Key, Value, Overwrite) -> - case {mc:x_header(Key, Msg), Overwrite} of - {Val, false} when Val =/= undefined -> - Msg; - _ -> - mc:set_annotation(Key, Value, Msg) - end. - --spec set_timestamp(mc:state(), pos_integer(), boolean()) -> mc:state(). -set_timestamp(Msg, Timestamp, Overwrite) -> - case {mc:timestamp(Msg), Overwrite} of - {Ts, false} when is_integer(Ts) -> - Msg; - _ -> - mc:set_annotation(?ANN_TIMESTAMP, Timestamp, Msg) - end. diff --git a/deps/rabbit/src/rabbit_msg_interceptor.erl b/deps/rabbit/src/rabbit_msg_interceptor.erl new file mode 100644 index 000000000000..3854a838591f --- /dev/null +++ b/deps/rabbit/src/rabbit_msg_interceptor.erl @@ -0,0 +1,78 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +-module(rabbit_msg_interceptor). + +%% client API +-export([intercept_incoming/2, + intercept_outgoing/2, + add/1, + remove/1]). +%% helpers for behaviour implementations +-export([set_annotation/4]). + +%% same protocol names as output by Prometheus endpoint +-type protocol() :: amqp091 | amqp10 | mqtt310 | mqtt311 | mqtt50. +-type context() :: #{protocol := protocol(), + vhost := rabbit_types:vhost(), + username := rabbit_types:username(), + connection_name := binary(), + atom() => term()}. +-type config() :: #{atom() => term()}. +-type interceptor() :: {module(), config()}. +-type interceptors() :: [interceptor()]. +-type stage() :: incoming | outgoing. + +-define(KEY, message_interceptors). + +-export_type([context/0]). + +-callback intercept(mc:state(), context(), stage(), config()) -> + mc:state(). + +-spec intercept_incoming(mc:state(), context()) -> + mc:state(). +intercept_incoming(Msg, Ctx) -> + intercept(Msg, Ctx, incoming). + +-spec intercept_outgoing(mc:state(), context()) -> + mc:state(). +intercept_outgoing(Msg, Ctx) -> + intercept(Msg, Ctx, outgoing). + +intercept(Msg, Ctx, Stage) -> + Interceptors = persistent_term:get(?KEY), + lists:foldl(fun({Mod, Cfg}, Msg0) -> + Mod:intercept(Msg0, Ctx, Stage, Cfg) + end, Msg, Interceptors). + +-spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(), + Overwrite :: boolean()) -> + mc:state(). +set_annotation(Msg, Key, Value, true) -> + mc:set_annotation(Key, Value, Msg); +set_annotation(Msg, Key, Value, false) -> + case mc:x_header(Key, Msg) of + undefined -> + mc:set_annotation(Key, Value, Msg); + _ -> + Msg + end. + +-spec add(interceptors()) -> ok. +add(Interceptors) -> + %% validation + lists:foreach(fun({Mod, #{}}) -> + case erlang:function_exported(Mod, intercept, 4) of + true -> ok; + false -> error(Mod) + end + end, Interceptors), + persistent_term:put(?KEY, persistent_term:get(?KEY, []) ++ Interceptors). + +-spec remove(interceptors()) -> ok. +remove(Interceptors) -> + persistent_term:put(?KEY, persistent_term:get(?KEY, []) -- Interceptors). diff --git a/deps/rabbit/src/rabbit_msg_interceptor_routing_node.erl b/deps/rabbit/src/rabbit_msg_interceptor_routing_node.erl new file mode 100644 index 000000000000..d8b4c77c6d09 --- /dev/null +++ b/deps/rabbit/src/rabbit_msg_interceptor_routing_node.erl @@ -0,0 +1,19 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +-module(rabbit_msg_interceptor_routing_node). +-behaviour(rabbit_msg_interceptor). + +-define(KEY, <<"x-routed-by">>). + +-export([intercept/4]). + +intercept(Msg, _Ctx, incoming, Cfg) -> + Node = atom_to_binary(node()), + Overwrite = maps:get(overwrite, Cfg), + rabbit_msg_interceptor:set_annotation(Msg, ?KEY, Node, Overwrite); +intercept(Msg, _Ctx, _Stage, _Cfg) -> + Msg. diff --git a/deps/rabbit/src/rabbit_msg_interceptor_timestamp.erl b/deps/rabbit/src/rabbit_msg_interceptor_timestamp.erl new file mode 100644 index 000000000000..e07269a3c494 --- /dev/null +++ b/deps/rabbit/src/rabbit_msg_interceptor_timestamp.erl @@ -0,0 +1,38 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +-module(rabbit_msg_interceptor_timestamp). +-behaviour(rabbit_msg_interceptor). + +-include("mc.hrl"). + +%% For backwards compat, we use the key defined in the old plugin +%% https://github.com/rabbitmq/rabbitmq-message-timestamp +-define(KEY_INCOMING, <<"timestamp_in_ms">>). +-define(KEY_OUTGOING, <<"x-opt-rabbitmq-sent-time">>). + +-export([intercept/4]). + +intercept(Msg0, _Ctx, incoming, #{incoming := _True} = Cfg) -> + Overwrite = maps:get(overwrite, Cfg), + Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0), + Msg = rabbit_msg_interceptor:set_annotation(Msg0, ?KEY_INCOMING, Ts, Overwrite), + set_timestamp(Msg, Ts, Overwrite); +intercept(Msg, _Ctx, outgoing, #{outgoing := _True}) -> + Ts = os:system_time(millisecond), + mc:set_annotation(?KEY_OUTGOING, Ts, Msg); +intercept(Msg, _MsgIcptCtx, _Stage, _Cfg) -> + Msg. + +set_timestamp(Msg, Ts, true) -> + mc:set_annotation(?ANN_TIMESTAMP, Ts, Msg); +set_timestamp(Msg, Ts, false) -> + case mc:timestamp(Msg) of + undefined -> + mc:set_annotation(?ANN_TIMESTAMP, Ts, Msg); + _ -> + Msg + end. diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 4b2e5e43623c..27a6f357d027 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -119,7 +119,7 @@ groups() -> available_messages_classic_queue, available_messages_quorum_queue, available_messages_stream, - incoming_message_interceptors, + message_interceptors, trace_classic_queue, trace_stream, user_id, @@ -4378,10 +4378,12 @@ available_messages(QType, Config) -> #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). -incoming_message_interceptors(Config) -> - Key = ?FUNCTION_NAME, - ok = rpc(Config, persistent_term, put, [Key, [{set_header_routing_node, false}, - {set_header_timestamp, false}]]), +message_interceptors(Config) -> + Key = message_interceptors, + ok = rpc(Config, persistent_term, put, + [Key, [{rabbit_msg_interceptor_routing_node, #{overwrite => false}}, + {rabbit_msg_interceptor_timestamp, #{overwrite => false, + incoming => true}}]]), Stream = <<"my stream">>, QQName = <<"my quorum queue">>, {_, Session, LinkPair} = Init = init(Config), @@ -4428,7 +4430,7 @@ incoming_message_interceptors(Config) -> {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, Stream), {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QQName), ok = close(Init), - true = rpc(Config, persistent_term, erase, [Key]). + ok = rpc(Config, persistent_term, put, [Key, []]). trace_classic_queue(Config) -> trace(atom_to_binary(?FUNCTION_NAME), <<"classic">>, Config). diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 5e266656073d..b908b0786a87 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1111,21 +1111,38 @@ credential_validator.regexp = ^abc\\d+", %% Message interceptors %% - {message_interceptors, + {single_incoming_message_interceptor, "message_interceptors.incoming.set_header_timestamp.overwrite = true", [{rabbit, [ - {incoming_message_interceptors, [{set_header_timestamp, true}]} + {message_interceptors, [ + {rabbit_msg_interceptor_timestamp, #{incoming => true, + overwrite => true}} + ]} ]}], []}, - {message_interceptors, + {single_outgoing_message_interceptor, + "message_interceptors.outgoing.timestamp.enabled = true", + [{rabbit, [ + {message_interceptors, [ + {rabbit_msg_interceptor_timestamp, #{outgoing => true}} + ]} + ]}], + []}, + + {multiple_message_interceptors, " message_interceptors.incoming.set_header_routing_node.overwrite = false message_interceptors.incoming.set_header_timestamp.overwrite = false + message_interceptors.outgoing.timestamp.enabled = true ", [{rabbit, [ - {incoming_message_interceptors, [{set_header_routing_node, false}, - {set_header_timestamp, false}]} + {message_interceptors, [ + {rabbit_msg_interceptor_routing_node, #{overwrite => false}}, + {rabbit_msg_interceptor_timestamp, #{incoming => true, + overwrite => false, + outgoing => true}} + ]} ]}], []}, diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index 4b5feddb509d..3d9c9954cb78 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -347,9 +347,11 @@ amqpl_amqp_bin_amqpl(_Config) -> payload_fragments_rev = [<<"data">>]}, Msg0 = mc:init(mc_amqpl, Content, annotations()), - ok = persistent_term:put(incoming_message_interceptors, - [{set_header_timestamp, false}]), - Msg = rabbit_message_interceptor:intercept(Msg0), + ok = persistent_term:put( + message_interceptors, + [{rabbit_msg_interceptor_timestamp, #{incoming => true, + overwrite => false}}]), + Msg = rabbit_msg_interceptor:intercept_incoming(Msg0, #{}), ?assertEqual(<<"exch">>, mc:exchange(Msg)), ?assertEqual([<<"apple">>], mc:routing_keys(Msg)), @@ -450,7 +452,7 @@ amqpl_amqp_bin_amqpl(_Config) -> ?assertEqual(RoutingHeaders, maps:remove(<<"timestamp_in_ms">>, RoutingHeaders2)), - true = persistent_term:erase(incoming_message_interceptors). + ok = persistent_term:put(message_interceptors, []). amqpl_cc_amqp_bin_amqpl(_Config) -> Headers = [{<<"CC">>, array, [{longstr, <<"q1">>}, diff --git a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl b/deps/rabbit/test/rabbit_msg_interceptor_SUITE.erl similarity index 55% rename from deps/rabbit/test/rabbit_message_interceptor_SUITE.erl rename to deps/rabbit/test/rabbit_msg_interceptor_SUITE.erl index 1abc39d0b042..500be0d61383 100644 --- a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl +++ b/deps/rabbit/test/rabbit_msg_interceptor_SUITE.erl @@ -4,7 +4,7 @@ %% %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. --module(rabbit_message_interceptor_SUITE). +-module(rabbit_msg_interceptor_SUITE). -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). @@ -15,17 +15,19 @@ all() -> [ - {group, tests} + {group, cluster_size_1} ]. groups() -> [ - {tests, [shuffle], [headers_overwrite, - headers_no_overwrite - ]} + {cluster_size_1, [shuffle], + [incoming_overwrite, + incoming_no_overwrite, + outgoing]} ]. init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), rabbit_ct_helpers:log_environment(), rabbit_ct_helpers:run_setup_steps(Config). @@ -35,16 +37,20 @@ end_per_suite(Config) -> init_per_testcase(Testcase, Config0) -> Config1 = rabbit_ct_helpers:set_config( Config0, [{rmq_nodename_suffix, Testcase}]), - Overwrite = case Testcase of - headers_overwrite -> true; - headers_no_overwrite -> false - end, - Val = maps:to_list( - maps:from_keys([set_header_timestamp, - set_header_routing_node], - Overwrite)), + Val = case Testcase of + incoming_overwrite -> + [{rabbit_msg_interceptor_routing_node, #{overwrite => true}}, + {rabbit_msg_interceptor_timestamp, #{incoming => true, + overwrite => true}}]; + incoming_no_overwrite -> + [{rabbit_msg_interceptor_routing_node, #{overwrite => false}}, + {rabbit_msg_interceptor_timestamp, #{incoming => true, + overwrite => false}}]; + outgoing -> + [{rabbit_msg_interceptor_timestamp, #{outgoing => true}}] + end, Config = rabbit_ct_helpers:merge_app_env( - Config1, {rabbit, [{incoming_message_interceptors, Val}]}), + Config1, {rabbit, [{message_interceptors, Val}]}), rabbit_ct_helpers:run_steps( Config, rabbit_ct_broker_helpers:setup_steps() ++ @@ -57,13 +63,13 @@ end_per_testcase(Testcase, Config0) -> rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_broker_helpers:teardown_steps()). -headers_overwrite(Config) -> - headers(true, Config). +incoming_overwrite(Config) -> + incoming(true, Config). -headers_no_overwrite(Config) -> - headers(false, Config). +incoming_no_overwrite(Config) -> + incoming(false, Config). -headers(Overwrite, Config) -> +incoming(Overwrite, Config) -> Server = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)), Payload = QName = atom_to_binary(?FUNCTION_NAME), Ch = rabbit_ct_client_helpers:open_channel(Config), @@ -80,13 +86,13 @@ headers(Overwrite, Config) -> #amqp_msg{payload = Payload, props = #'P_basic'{ timestamp = Secs, - headers = [{<<"timestamp_in_ms">>, long, Ms}, + headers = [{<<"timestamp_in_ms">>, long, ReceivedMs}, {<<"x-routed-by">>, longstr, Server}] }}} - when Ms < NowMs + 4000 andalso - Ms > NowMs - 4000 andalso - Secs < NowSecs + 4 andalso - Secs > NowSecs - 4, + when ReceivedMs < NowMs + 5000 andalso + ReceivedMs > NowMs - 5000 andalso + Secs < NowSecs + 5 andalso + Secs > NowSecs - 5, amqp_channel:call(Ch, #'basic.get'{queue = QName}))) end, AssertHeaders(), @@ -110,3 +116,29 @@ headers(Overwrite, Config) -> #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok. + +outgoing(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + {_, Session, LinkPair} = Init = amqp_utils:init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, settled), + {ok, Sender} = amqp10_client:attach_sender_link_sync( + Session, <<"sender">>, Address, settled), + ok = amqp_utils:wait_for_credit(Sender), + + Now = os:system_time(millisecond), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag">>, <<"msg">>, true)), + + {ok, Msg} = amqp10_client:get_msg(Receiver), + #{<<"x-opt-rabbitmq-sent-time">> := Sent} = amqp10_msg:message_annotations(Msg), + ct:pal("client sent message at ~b~nRabbitMQ sent message at ~b", + [Now, Sent]), + ?assert(Sent > Now - 5000), + ?assert(Sent < Now + 5000), + + ok = amqp10_client:detach_link(Sender), + ok = amqp10_client:detach_link(Receiver), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = amqp_utils:close(Init). diff --git a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema index b69e2b06075c..1be98c757edf 100644 --- a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema +++ b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema @@ -303,3 +303,34 @@ end}. {datatype, integer}, {validators, ["non_negative_integer"]} ]}. + +%% +%% Message interceptors +%% + +{mapping, "mqtt.message_interceptors.$stage.$name.$key", "rabbitmq_mqtt.message_interceptors", [ + {datatype, {enum, [true, false]}} +]}. + +{translation, "rabbitmq_mqtt.message_interceptors", + fun(Conf) -> + case cuttlefish_variable:filter_by_prefix("mqtt.message_interceptors", Conf) of + [] -> + cuttlefish:unset(); + L -> + lists:foldr( + fun({["mqtt", "message_interceptors", "incoming", "set_client_id_annotation", "enabled"], Enabled}, Acc) -> + case Enabled of + true -> + Mod = rabbit_mqtt_msg_interceptor_client_id, + Cfg = #{}, + [{Mod, Cfg} | Acc]; + false -> + Acc + end; + (Other, _Acc) -> + cuttlefish:invalid(io_lib:format("~p is invalid", [Other])) + end, [], L) + end + end +}. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl index 3ea308bb5f5b..8ecbd85b66ab 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl @@ -35,7 +35,8 @@ start(normal, []) -> Result. stop(_) -> - rabbit_mqtt_sup:stop_listeners(). + rabbit_mqtt_sup:stop_listeners(), + rabbit_msg_interceptor:remove(mqtt_message_interceptors()). -spec emit_connection_info_all([node()], rabbit_types:info_keys(), reference(), pid()) -> term(). emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> @@ -115,9 +116,14 @@ persist_static_configuration() -> assert_valid_max_packet_size(MaxSizeAuth), {ok, MaxMsgSize} = application:get_env(rabbit, max_message_size), ?assert(MaxSizeAuth =< MaxMsgSize), - ok = persistent_term:put(?PERSISTENT_TERM_MAX_PACKET_SIZE_AUTHENTICATED, MaxSizeAuth). + ok = persistent_term:put(?PERSISTENT_TERM_MAX_PACKET_SIZE_AUTHENTICATED, MaxSizeAuth), + + ok = rabbit_msg_interceptor:add(mqtt_message_interceptors()). assert_valid_max_packet_size(Val) -> ?assert(is_integer(Val) andalso Val > 0 andalso Val =< ?MAX_PACKET_SIZE). + +mqtt_message_interceptors() -> + application:get_env(?APP_NAME, message_interceptors, []). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_msg_interceptor_client_id.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_msg_interceptor_client_id.erl new file mode 100644 index 000000000000..00864f03023b --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_msg_interceptor_client_id.erl @@ -0,0 +1,20 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +-module(rabbit_mqtt_msg_interceptor_client_id). +-behaviour(rabbit_msg_interceptor). + +-export([intercept/4]). + +-define(KEY, <<"x-opt-mqtt-client-id">>). + +intercept(Msg, #{protocol := Proto, client_id := ClientId}, incoming, _Cfg) + when Proto =:= mqtt50 orelse + Proto =:= mqtt311 orelse + Proto =:= mqtt310 -> + mc:set_annotation(?KEY, ClientId, Msg); +intercept(Msg, _Ctx, _Stage, _Cfg) -> + Msg. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 7ae0893a13eb..ac22c9044b05 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -5,7 +5,6 @@ %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -module(rabbit_mqtt_processor). - -feature(maybe_expr, enable). -export([info/2, init/4, process_packet/2, @@ -93,7 +92,8 @@ %% The database stores the MQTT subscription options in the binding arguments for: %% * v1 as Erlang record #mqtt_subscription_opts{} %% * v2 as AMQP 0.9.1 table - binding_args_v2 :: boolean() + binding_args_v2 :: boolean(), + msg_interceptor_ctx :: rabbit_msg_interceptor:context() }). -record(state, @@ -215,9 +215,15 @@ process_connect( %% To simplify logic, we decide at connection establishment time to stick %% with either binding args v1 or v2 for the lifetime of the connection. BindingArgsV2 = rabbit_feature_flags:is_enabled('rabbitmq_4.1.0'), + ProtoVerAtom = proto_integer_to_atom(ProtoVer), + MsgIcptCtx = #{protocol => ProtoVerAtom, + vhost => VHost, + username => Username, + connection_name => ConnName, + client_id => ClientId}, S = #state{ cfg = #cfg{socket = Socket, - proto_ver = proto_integer_to_atom(ProtoVer), + proto_ver = ProtoVerAtom, clean_start = CleanStart, session_expiry_interval_secs = SessionExpiry, ssl_login_name = SslLoginName, @@ -238,7 +244,8 @@ process_connect( will_msg = WillMsg, max_packet_size_outbound = MaxPacketSize, topic_alias_maximum_outbound = TopicAliasMaxOutbound, - binding_args_v2 = BindingArgsV2}, + binding_args_v2 = BindingArgsV2, + msg_interceptor_ctx = MsgIcptCtx}, auth_state = #auth_state{ user = User, authz_ctx = AuthzCtx}}, @@ -1633,14 +1640,15 @@ publish_to_queues( #state{cfg = #cfg{exchange = ExchangeName = #resource{name = ExchangeNameBin}, delivery_flow = Flow, conn_name = ConnName, - trace_state = TraceState}, + trace_state = TraceState, + msg_interceptor_ctx = MsgIcptCtx}, auth_state = #auth_state{user = #user{username = Username}}} = State) -> Anns = #{?ANN_EXCHANGE => ExchangeNameBin, ?ANN_ROUTING_KEYS => [mqtt_to_amqp(Topic)]}, Msg0 = mc:init(mc_mqtt, MqttMsg, Anns, mc_env()), - Msg = rabbit_message_interceptor:intercept(Msg0), case rabbit_exchange:lookup(ExchangeName) of {ok, Exchange} -> + Msg = rabbit_msg_interceptor:intercept_incoming(Msg0, MsgIcptCtx), QNames0 = rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}), QNames = drop_local(QNames0, State), rabbit_trace:tap_in(Msg, QNames, ConnName, Username, TraceState), @@ -2065,12 +2073,14 @@ deliver_to_client(Msgs, Ack, State) -> deliver_one_to_client(Msg, Ack, S) end, State, Msgs). -deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc} = Delivery, - AckRequired, State0) -> +deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc0} = Delivery, + AckRequired, + #state{cfg = #cfg{msg_interceptor_ctx = MsgIcptCtx}} = State0) -> SubscriberQoS = case AckRequired of true -> ?QOS_1; false -> ?QOS_0 end, + Mc = rabbit_msg_interceptor:intercept_outgoing(Mc0, MsgIcptCtx), McMqtt = mc:convert(mc_mqtt, Mc, mc_env()), MqttMsg = #mqtt_msg{qos = PublisherQos} = mc:protocol_state(McMqtt), QoS = effective_qos(PublisherQos, SubscriberQoS), diff --git a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets index 92c1b2f29c7e..a1af02451cd3 100644 --- a/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets +++ b/deps/rabbitmq_mqtt/test/config_schema_SUITE_data/rabbitmq_mqtt.snippets @@ -171,5 +171,22 @@ "mqtt.topic_alias_maximum = 0", [{rabbitmq_mqtt,[ {topic_alias_maximum, 0}]}], - [rabbitmq_mqtt]} + [rabbitmq_mqtt]}, + + {message_interceptor_enabled, + "mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = true", + [{rabbitmq_mqtt, [ + {message_interceptors, [ + {rabbit_mqtt_msg_interceptor_client_id, #{}} + ]} + ]}], + []}, + + {message_interceptor_disabled, + "mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = false", + [{rabbitmq_mqtt, [ + {message_interceptors, []} + ]}], + []} + ]. diff --git a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl index 6aae9c152d78..09bae18c37fe 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_shared_SUITE.erl @@ -120,7 +120,7 @@ cluster_size_1_tests() -> ,max_packet_size_unauthenticated ,max_packet_size_authenticated ,default_queue_type - ,incoming_message_interceptors + ,message_interceptors ,utf8 ,retained_message_conversion ,bind_exchange_to_exchange @@ -1777,11 +1777,18 @@ default_queue_type(Config) -> ok = emqtt:disconnect(C2), ok = rabbit_ct_broker_helpers:delete_vhost(Config, Vhost). -incoming_message_interceptors(Config) -> - Key = ?FUNCTION_NAME, - ok = rpc(Config, persistent_term, put, [Key, [{set_header_timestamp, false}]]), +message_interceptors(Config) -> + ok = rpc(Config, persistent_term, put, + [message_interceptors, + [ + {rabbit_mqtt_msg_interceptor_client_id, #{}}, + {rabbit_msg_interceptor_timestamp, #{overwrite => false, + incoming => true, + outgoing => true}} + ]]), {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), - Payload = ClientId = Topic = atom_to_binary(?FUNCTION_NAME), + Payload = Topic = atom_to_binary(?FUNCTION_NAME), + ClientId = <<"🆔"/utf8>>, CQName = <<"my classic queue">>, Stream = <<"my stream">>, declare_queue(Ch, CQName, [{<<"x-queue-type">>, longstr, <<"classic">>}]), @@ -1798,14 +1805,21 @@ incoming_message_interceptors(Config) -> #amqp_msg{payload = Payload, props = #'P_basic'{ timestamp = Secs, - headers = [{<<"timestamp_in_ms">>, long, Millis} | _] + headers = Headers }} } = amqp_channel:call(Ch, #'basic.get'{queue = CQName}), - ?assert(Secs < NowSecs + 4), - ?assert(Secs > NowSecs - 4), - ?assert(Millis < NowMillis + 4000), - ?assert(Millis > NowMillis - 4000), + {_, long, ReceivedTs} = lists:keyfind(<<"timestamp_in_ms">>, 1, Headers), + ?assert(Secs < NowSecs + 9), + ?assert(Secs > NowSecs - 9), + ?assert(ReceivedTs < NowMillis + 9000), + ?assert(ReceivedTs > NowMillis - 9000), + {_, long, SentTs} = lists:keyfind(<<"x-opt-rabbitmq-sent-time">>, 1, Headers), + ?assert(SentTs < NowMillis + 9000), + ?assert(SentTs > NowMillis - 9000), + + ?assertEqual({<<"x-opt-mqtt-client-id">>, longstr, ClientId}, + lists:keyfind(<<"x-opt-mqtt-client-id">>, 1, Headers)), #'basic.qos_ok'{} = amqp_channel:call(Ch, #'basic.qos'{prefetch_count = 1}), CTag = <<"my ctag">>, @@ -1819,15 +1833,20 @@ incoming_message_interceptors(Config) -> receive {#'basic.deliver'{consumer_tag = CTag}, #amqp_msg{payload = Payload, props = #'P_basic'{ - headers = [{<<"timestamp_in_ms">>, long, Millis} | _XHeaders] + headers = [{<<"timestamp_in_ms">>, long, ReceivedTs} | XHeaders] }}} -> - ok + ?assertEqual({<<"x-opt-mqtt-client-id">>, longstr, ClientId}, + lists:keyfind(<<"x-opt-mqtt-client-id">>, 1, XHeaders)), + + {_, long, SentTs1} = lists:keyfind(<<"x-opt-rabbitmq-sent-time">>, 1, XHeaders), + ?assert(SentTs1 < NowMillis + 9000), + ?assert(SentTs1 > NowMillis - 9000) after ?TIMEOUT -> ct:fail(missing_deliver) end, delete_queue(Ch, Stream), delete_queue(Ch, CQName), - true = rpc(Config, persistent_term, erase, [Key]), + ok = rpc(Config, persistent_term, put, [message_interceptors, []]), ok = emqtt:disconnect(C), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch). diff --git a/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl b/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl index 693345dc4cec..bbe37b56a9c7 100644 --- a/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl +++ b/deps/rabbitmq_web_mqtt/test/web_mqtt_shared_SUITE.erl @@ -79,7 +79,7 @@ trace_large_message(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). max_packet_size_unauthenticated(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). max_packet_size_authenticated(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). default_queue_type(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). -incoming_message_interceptors(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). +message_interceptors(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). utf8(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). retained_message_conversion(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). bind_exchange_to_exchange(Config) -> mqtt_shared_SUITE:?FUNCTION_NAME(Config). diff --git a/release-notes/4.2.0.md b/release-notes/4.2.0.md new file mode 100644 index 000000000000..f387b5143fab --- /dev/null +++ b/release-notes/4.2.0.md @@ -0,0 +1,20 @@ +## RabbitMQ 4.2.0 + +RabbitMQ 4.2.0 is a new feature release. + + +## Features + +### Incoming and Outgoing Message Interceptors for native protocols + +Incoming and outgoing messages can now be intercepted on the broker. +This works for AMQP 1.0, AMQP 0.9.1, and MQTT. + +What the interceptor does is entirely up to its implementation - it can validate message metadata, add annotations, or perform arbitrary side effects. +Custom interceptors can be developed and integrated via [plugins](./plugins). + +Two new optional built-in interceptors were added to RabbitMQ: +1. Timestamps for outgoing messages +2. Setting client ID of publishing MQTT client + +Detailed information can be found in the [Message Interceptor](https://www.rabbitmq.com/docs/next/message-inteceptor) documentation.