Skip to content

Commit bc566d3

Browse files
Add incoming message interceptors
For plugins to provide interceptors they need to provide a cuttlefish `.schema` file specifying the parameters for the provided interceptor. Developers can take as example the existing mapping in `rabbit.schema`. The order of the interceptors depends on the order of appearance of any `message_interceptors.incoming.<interceptor>` entry in the `rabbitmq.conf` file. Co-authored-by: Péter Gömöri <[email protected]>
1 parent 6d1689c commit bc566d3

13 files changed

+229
-71
lines changed

deps/rabbit/Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_on
273273
PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_containers_deaths_v2 message_size_limit metadata_store_migration
274274
PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor
275275

276-
PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_message_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue
276+
PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_incoming_message_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue
277277
PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit rabbit_fifo_dlx_integration rabbit_fifo_int
278278
PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost
279279
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue

deps/rabbit/ct.test.spec

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@
115115
, rabbit_fifo_prop_SUITE
116116
, rabbit_fifo_v0_SUITE
117117
, rabbit_local_random_exchange_SUITE
118-
, rabbit_message_interceptor_SUITE
118+
, rabbit_incoming_message_interceptor_SUITE
119119
, rabbit_stream_coordinator_SUITE
120120
, rabbit_stream_sac_coordinator_SUITE
121121
, rabbitmq_4_0_deprecations_SUITE

deps/rabbit/priv/schema/rabbit.schema

+45-11
Original file line numberDiff line numberDiff line change
@@ -2658,22 +2658,56 @@ end}.
26582658
{mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [
26592659
{datatype, {enum, [true, false]}}]}.
26602660

2661+
% Pseudo-key to include the interceptor in the list of interceptors.
2662+
% - If any other configuration is provided for the interceptor this
2663+
% configuration is not required.
2664+
% - If no other configuration is provided, this one is required so that the
2665+
% interceptor gets invoked.
2666+
{mapping, "message_interceptors.incoming.$interceptor.enabled", "rabbit.incoming_message_interceptors", [
2667+
{datatype, {enum, [true]}}]}.
2668+
2669+
{mapping, "message_interceptors.incoming.set_header_timestamp.overwrite", "rabbit.incoming_message_interceptors", [
2670+
{datatype, {enum, [true, false]}}]}.
2671+
2672+
{mapping, "message_interceptors.incoming.set_header_routing_node.overwrite", "rabbit.incoming_message_interceptors", [
2673+
{datatype, {enum, [true, false]}}]}.
2674+
26612675
{translation, "rabbit.incoming_message_interceptors",
26622676
fun(Conf) ->
2663-
case cuttlefish_variable:filter_by_prefix("message_interceptors", Conf) of
2677+
case cuttlefish_variable:filter_by_prefix("message_interceptors.incoming", Conf) of
26642678
[] ->
26652679
cuttlefish:unset();
26662680
L ->
2667-
[begin
2668-
Interceptor = list_to_atom(Interceptor0),
2669-
case lists:member(Interceptor, [set_header_timestamp,
2670-
set_header_routing_node]) of
2671-
true ->
2672-
{Interceptor, Overwrite};
2673-
false ->
2674-
cuttlefish:invalid(io_lib:format("~p is invalid", [Interceptor]))
2675-
end
2676-
end || {["message_interceptors", "incoming", Interceptor0, "overwrite"], Overwrite} <- L]
2681+
InterceptorsConfig = [
2682+
{Module0, Config, Value}
2683+
|| {["message_interceptors", "incoming", Module0, Config], Value} <- L
2684+
],
2685+
{Result, Order0} = lists:foldl(
2686+
fun({Interceptor0, Key0, Value}, {Acc, Order}) ->
2687+
Interceptor = list_to_atom(Interceptor0),
2688+
Key = list_to_atom(Key0),
2689+
MapPutFun = fun(Key, Value) -> fun(Old) -> maps:put(Key, Value, Old) end end,
2690+
Module = case Interceptor of
2691+
set_header_timestamp ->
2692+
rabbit_header_timestamp_interceptor;
2693+
set_header_routing_node ->
2694+
rabbit_header_routing_node_interceptor;
2695+
_ ->
2696+
Interceptor
2697+
end,
2698+
NewAcc =
2699+
maps:update_with(
2700+
Module,
2701+
MapPutFun(Key, Value),
2702+
#{Key => Value},
2703+
Acc),
2704+
{NewAcc, [Module| Order]}
2705+
end,
2706+
{#{}, []},
2707+
InterceptorsConfig
2708+
),
2709+
Order = lists:uniq(Order0),
2710+
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order]
26772711
end
26782712
end
26792713
}.

deps/rabbit/src/rabbit_amqp_session.erl

+14-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
-compile({inline, [maps_update_with/4]}).
1111

1212
-behaviour(gen_server).
13+
-behaviour(rabbit_protocol_accessor).
1314

1415
-include_lib("kernel/include/logger.hrl").
1516
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -105,6 +106,9 @@
105106
handle_info/2,
106107
format_status/1]).
107108

109+
% `rabbit_protocol_accessor` behaviour callbacks
110+
-export([get_property/2]).
111+
108112
-import(rabbit_amqp_util,
109113
[protocol_error/3]).
110114
-import(serial_number,
@@ -2436,7 +2440,7 @@ incoming_link_transfer(
24362440
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
24372441
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
24382442
{ok, X, RoutingKeys, Mc1, PermCache} ->
2439-
Mc2 = rabbit_message_interceptor:intercept(Mc1),
2443+
Mc2 = rabbit_incoming_message_interceptor:intercept(Mc1, ?MODULE, State0),
24402444
check_user_id(Mc2, User),
24412445
TopicPermCache = check_write_permitted_on_topics(
24422446
X, User, RoutingKeys, TopicPermCache0),
@@ -3874,6 +3878,15 @@ format_status(
38743878
topic_permission_cache => TopicPermissionCache},
38753879
maps:update(state, State, Status).
38763880

3881+
get_property(user, #state{cfg = #cfg{user = User}}) ->
3882+
User;
3883+
get_property(vhost, #state{cfg = #cfg{vhost = VHost}}) ->
3884+
VHost;
3885+
get_property(connection_name, #state{cfg = #cfg{conn_name = ConnectionName}}) ->
3886+
ConnectionName;
3887+
get_property(_, _) ->
3888+
undefined.
3889+
38773890
-spec info(pid()) ->
38783891
{ok, rabbit_types:infos()} | {error, term()}.
38793892
info(Pid) ->

deps/rabbit/src/rabbit_channel.erl

+16-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
-module(rabbit_channel).
99

10+
-behaviour(rabbit_protocol_accessor).
11+
1012
%% rabbit_channel processes represent an AMQP 0-9-1 channels.
1113
%%
1214
%% Connections parse protocol frames coming from clients and
@@ -60,6 +62,9 @@
6062
prioritise_call/4, prioritise_cast/3, prioritise_info/3,
6163
format_message_queue/2]).
6264

65+
% `rabbit_protocol_accessor` behaviour callbacks
66+
-export([get_property/2]).
67+
6368
%% Internal
6469
-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
6570
-export([get_vhost/1, get_user/1]).
@@ -813,6 +818,16 @@ get_consumer_timeout() ->
813818
_ ->
814819
undefined
815820
end.
821+
822+
get_property(user, #ch{cfg = #conf{user = User}}) ->
823+
User;
824+
get_property(vhost, #ch{cfg = #conf{virtual_host = VHost}}) ->
825+
VHost;
826+
get_property(connection_name, #ch{cfg = #conf{conn_name = ConnectionName}}) ->
827+
ConnectionName;
828+
get_property(_, _) ->
829+
undefined.
830+
816831
%%---------------------------------------------------------------------------
817832

818833
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
@@ -1206,7 +1221,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12061221
rabbit_misc:precondition_failed("invalid message: ~tp", [Reason]);
12071222
{ok, Message0} ->
12081223
check_write_permitted_on_topics(Exchange, User, Message0, AuthzContext),
1209-
Message = rabbit_message_interceptor:intercept(Message0),
1224+
Message = rabbit_incoming_message_interceptor:intercept(Message0, ?MODULE, State),
12101225
check_user_id_header(Message, User),
12111226
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
12121227
[deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames],
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
-module(rabbit_header_routing_node_interceptor).
2+
-behaviour(rabbit_incoming_message_interceptor).
3+
4+
-define(HEADER_ROUTING_NODE, <<"x-routed-by">>).
5+
6+
-export([
7+
intercept/4
8+
]).
9+
10+
intercept(Msg, _ProtoMod, _ProtoState, Config) ->
11+
Node = atom_to_binary(node()),
12+
Overwrite = maps:get(overwrite, Config, false),
13+
rabbit_incoming_message_interceptor:set_msg_annotation(
14+
Msg,
15+
?HEADER_ROUTING_NODE,
16+
Node,
17+
Overwrite).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
-module(rabbit_header_timestamp_interceptor).
2+
-behaviour(rabbit_incoming_message_interceptor).
3+
4+
-include("mc.hrl").
5+
6+
-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>).
7+
8+
-export([
9+
intercept/4
10+
]).
11+
12+
intercept(Msg0, _ProtoMod, _ProtoState, Config) ->
13+
Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0),
14+
Overwrite = maps:get(overwrite, Config, false),
15+
Msg = rabbit_incoming_message_interceptor:set_msg_annotation(
16+
Msg0,
17+
?HEADER_TIMESTAMP,
18+
Ts,
19+
Overwrite),
20+
set_msg_timestamp(Msg, Ts, Overwrite).
21+
22+
set_msg_timestamp(Msg, Timestamp, Overwrite) ->
23+
case {mc:timestamp(Msg), Overwrite} of
24+
{Ts, false} when is_integer(Ts) ->
25+
Msg;
26+
_ ->
27+
mc:set_annotation(?ANN_TIMESTAMP, Timestamp, Msg)
28+
end.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
-module(rabbit_incoming_message_interceptor).
2+
3+
-export([
4+
intercept/3,
5+
set_msg_annotation/4
6+
]).
7+
8+
-callback intercept(
9+
Msg :: mc:state(),
10+
ProtoMod :: module(),
11+
ProtoState :: term(),
12+
Config :: #{atom() := term()}
13+
) -> mc:state().
14+
15+
-spec intercept(Msg, ProtoMod, ProtoState) -> Resp when
16+
Msg :: mc:state(),
17+
ProtoMod :: module(),
18+
ProtoState :: term(),
19+
Resp :: mc:state().
20+
intercept(Msg, ProtoMod, ProtoState) ->
21+
Interceptors = persistent_term:get(incoming_message_interceptors, []),
22+
lists:foldl(fun({Module, Config}, Msg0) ->
23+
try
24+
Module:intercept(Msg0, ProtoMod, ProtoState, Config)
25+
catch
26+
error:undef ->
27+
Msg0
28+
end
29+
end, Msg , Interceptors).
30+
31+
-spec set_msg_annotation(mc:state(), mc:ann_key(), mc:ann_value(), boolean()) -> mc:state().
32+
set_msg_annotation(Msg, Key, Value, Overwrite) ->
33+
case {mc:x_header(Key, Msg), Overwrite} of
34+
{Val, false} when Val =/= undefined ->
35+
Msg;
36+
_ ->
37+
mc:set_annotation(Key, Value, Msg)
38+
end.

deps/rabbit/src/rabbit_message_interceptor.erl

-50
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
-module(rabbit_protocol_accessor).
2+
3+
-callback get_property(Property, State) -> Resp when
4+
Property :: atom(),
5+
State :: term(),
6+
Resp :: term().
7+
8+
-export([
9+
get_proto_state_properties/3
10+
]).
11+
12+
-spec get_proto_state_properties(
13+
ProtoAccessor :: module(),
14+
ProtoState :: term(),
15+
Properties :: [Property]
16+
) -> Resp :: [{Property, term()}] when
17+
Property :: atom().
18+
get_proto_state_properties(ProtoAccessor, ProtoState, Properties) ->
19+
lists:foldl(
20+
fun(Property, Results) ->
21+
case ProtoAccessor:get_property(Property, ProtoState) of
22+
undefined -> Results;
23+
Value -> [{Property, Value} | Results]
24+
end
25+
end,
26+
[],
27+
Properties).

deps/rabbit/test/rabbit_message_interceptor_SUITE.erl deps/rabbit/test/rabbit_incoming_message_interceptor_SUITE.erl

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
%%
55
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
66

7-
-module(rabbit_message_interceptor_SUITE).
7+
-module(rabbit_incoming_message_interceptor_SUITE).
88

99
-include_lib("eunit/include/eunit.hrl").
1010
-include_lib("amqp_client/include/amqp_client.hrl").
@@ -40,9 +40,9 @@ init_per_testcase(Testcase, Config0) ->
4040
headers_no_overwrite -> false
4141
end,
4242
Val = maps:to_list(
43-
maps:from_keys([set_header_timestamp,
44-
set_header_routing_node],
45-
Overwrite)),
43+
maps:from_keys([rabbit_header_timestamp_interceptor,
44+
rabbit_header_routing_node_interceptor],
45+
#{overwrite => Overwrite})),
4646
Config = rabbit_ct_helpers:merge_app_env(
4747
Config1, {rabbit, [{incoming_message_interceptors, Val}]}),
4848
rabbit_ct_helpers:run_steps(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
-module(rabbit_mqtt_client_id_interceptor).
2+
3+
-behaviour(rabbit_incoming_message_interceptor).
4+
5+
-define(ANN_CLIENT_ID, <<"client_id">>).
6+
7+
-export([
8+
intercept/4
9+
]).
10+
11+
intercept(Msg, ProtoAccessor, ProtoState, _Config) ->
12+
case rabbit_protocol_accessor:get_proto_state_properties(ProtoAccessor, ProtoState, [client_id]) of
13+
[{client_id, ClientId}] ->
14+
rabbit_incoming_message_interceptor:set_msg_annotation(
15+
Msg,
16+
?ANN_CLIENT_ID,
17+
ClientId,
18+
true);
19+
[] ->
20+
Msg
21+
end.

0 commit comments

Comments
 (0)