Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incoming message interceptors #13641

Open
wants to merge 11 commits into
base: main
Choose a base branch
from

Conversation

LoisSotoLopez
Copy link
Contributor

Proposed Changes

In this PR we add two new behaviours related to the incoming message interceptors feature mentioned in #10051.

  • A behaviour for Protocol Accessors, i.e.: modules processing messages for some protocol. This rabbit_protocol_accessor behaviour defines the get_property/2 callback. Through this callback Protocol Accessors should expose properties on their state relevant when intercepting messages.

  • A behaviour for Incoming Message Interceptors. This rabbit_incoming_message_interceptor behaviour defines the intercept/4 callback. Modules implementing this behaviour will, if properly configured as described below, process incoming messages. The intercept/4 callback receives (1. Message) the intercepted message, (2. ProtoMod) the name of the rabbit_protocol_processor processing the message, (3. ProtoState) the state of said rabbit_protocol_processor (which the interceptor should use only through ProtoState:get_property/2) and (4. Config) the interceptor configuration.

The rabbit dep. cuttlefish schema has been modified to handle any new interceptor. Configuration entries for incoming message interceptors should look like message_interceptors.incoming.<interceptor_module>.<config_key>=<value>. Applications providing new interceptors must also provide cuttlefish schema files with mappings for each of the configurations the interceptor might need, taking as example the ones provided for existing interceptors in deps/rabbit/priv/schema/rabbit.schema.

For an interceptor to be used by rabbit it needs to have at least one entry associated to it in the rabbit.config file. If an interceptor has no parameter to be configured we can set the enabled <config_key> for it. This will add the <interceptor_module> to the list of interceptors, even if the generated configuration (which will not include enable because this is a fake config key) is empty.

The Cuttlefish translation handling these configurations accepts as <interceptor_module> the set_header_timestamp and set_header_routing_node names for the already existing message interceptors. Those two values get translated to the new rabbit_header_timestamp_interceptor and rabbit_header_routing_node_interceptor respectively.

The order in which configuration keys appear for the different configured interceptors in the rabbit.config file is relevant. They will be invoked in the same order. Check out the example below.

For a rabbit.config file like this:

...
message_interceptors.incoming.set_header_timestamp.overwrite=true
message_interceptors.incoming.foo_interceptor.timeout=3000
message_interceptors.incoming.faa_interceptor.enabled=true
...

The generated configuration would look like this:

...
{rabbit, [
    ...
    {incoming_message_interceptors, [
        {rabbit_header_timestamp_interceptor, #{overwrite => true},
        {foo_interceptor, #{timeout => 3000}},
        {faa_interceptor, #{}}
    ],
    ...
]},
...

and for any incoming message, those three interceptors will be invoked in the same order: rabbit_header_timestamp_interceptor first, foo_interceptor second and faa_interceptor third.

Types of Changes

  • Bug fix (non-breaking change which fixes issue #NNNN)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause an observable behavior change in existing systems)
  • Documentation improvements (corrections, new content, etc)
  • Cosmetic change (whitespace, formatting, etc)
  • Build system and/or CI

Checklist

  • I have read the CONTRIBUTING.md document
  • I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
  • I have added tests that prove my fix is effective or that my feature works
  • All tests pass locally with my changes.
  • If relevant, I have added necessary documentation to https://github.com/rabbitmq/rabbitmq-website
  • If relevant, I have added this change to the first version(s) in release-notes that I expect to introduce it

Further Comments

@LoisSotoLopez LoisSotoLopez changed the title Message interceptors Incoming message interceptors Mar 27, 2025
@mergify mergify bot added the make label Mar 27, 2025
@kjnilsson kjnilsson self-requested a review March 31, 2025 06:55
Copy link
Contributor

@kjnilsson kjnilsson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the behaviour can be reduced to a single behaviour for both incoming and outgoing interceptors and the protocol behaviour can be replaced by a context map.

The formatting is all over the place. Please use a standard editor such as vim or emacs, most come with a reasonable indentation implementation. case | fun keywords should line up with their respective end markers. Try to keep to the 80 char per line limit also.

@@ -0,0 +1,27 @@
-module(rabbit_protocol_accessor).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be replaced with a map that the intercept function takes.

e.g.

#{protocol => mqtt,
    vhost => <<"/">>,
    client_id =><<"test">>}

@@ -0,0 +1,38 @@
-module(rabbit_incoming_message_interceptor).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think we need to distinguish incoming and outgoing interceptors are the module / behaviour level.
We should have a single behaviour rabbit_message_interceptor with a single intercept function that takes the message container, the protocol context map (see another comment) and the config.

Instead they would be distinguined by which configuration they are added to. It is fine if this PR only implements incoming interceptors.

@LoisSotoLopez
Copy link
Contributor Author

I think the behaviour can be reduced to a single behaviour for both incoming and outgoing interceptors and the protocol behaviour can be replaced by a context map.

Sounds good to me. The context map could be set on initialization by each protocol accessor (AMQP 0.9.1 channel, AMQP 1.0 session, or MQTT connection process), and then passed to rabbit_incoming_message_interceptor (or rabbit_message_interceptor). If we go for the rabbit_message_interceptor (both incoming and outgoing messages) a keyword incoming | outgoing should be passed along the context map so that the right list of interceptors gets invoked.

Any additional comments on not using the getters approach and going for both a incoming+outgoing approach, @ansd ?

For plugins to provide interceptors they need to provide a cuttlefish
`.schema` file specifying the parameters for the provided interceptor.
Developers can take as example the existing mapping in `rabbit.schema`.
The order of the interceptors depends on the order of appearance of any
`message_interceptors.incoming.<interceptor>` entry in the
`rabbitmq.conf` file.

Co-authored-by: Péter Gömöri <[email protected]>
@LoisSotoLopez LoisSotoLopez force-pushed the message-interceptors branch from 45385e1 to bc566d3 Compare April 6, 2025 09:29
@LoisSotoLopez LoisSotoLopez force-pushed the message-interceptors branch from c164817 to c52fb11 Compare April 9, 2025 07:49
Copy link
Contributor

@kjnilsson kjnilsson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're getting closer I think a few suggested changes and some formatting advise.

MsgDirection:: incoming | outgoing,
Resp :: mc:state().
intercept(Msg, MsgInterceptorCtx, MsgDirection) ->
InterceptorsList = list_to_atom(atom_to_list(MsgDirection)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to use a tuple key for persistent term than doing this. e.g. {message_interceptors, incoming | outgoing}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs for ad hoc code to be put in rabbit:persist_static_configuration

    persist_static_configuration(
      [classic_queue_index_v2_segment_entry_count,
       classic_queue_store_v2_max_cache_size,
       classic_queue_store_v2_check_crc32,
       incoming_message_interceptors,
       outgoing_message_interceptors
      ]),

needs to become something like

    persist_static_configuration(
      [classic_queue_index_v2_segment_entry_count,
       classic_queue_store_v2_max_cache_size,
       classic_queue_store_v2_check_crc32,
       {incoming_message_interceptors, {message_interceptors, incoming}
       outgoing_message_interceptors, {message_interceptors, outgoing}
      ]),

and

persist_static_configuration(Params) ->
    lists:foreach(
      fun(Param) ->
              case application:get_env(?MODULE, Param) of
                  {ok, Value} ->
                      ok = persistent_term:put(Param, Value);
                  undefined ->
                      ok
              end
      end, Params).

needs to become

persist_static_configuration(Params) ->
    lists:foreach(
        fun({Param, Alias}) ->
            case application:get_env(?MODULE, Param) of
                {ok, Value} ->
                    ok = persistent_term:put(Alias, Value);
                undefined ->
                    ok
            end;
           (Param) ->
            case application:get_env(?MODULE, Param) of
                {ok, Value} ->
                    ok = persistent_term:put(Param, Value);
                undefined ->
                    ok
            end
        end, Params).

Is that fine? What about this other option:

The cuttlefish translations could generate a rabbit.message_interceptors.<incoming|outgoing> config instead of rabbit.<incoming_message_interceptors|outgoing_message_interceptors> and the rabbit:persist_static_configuration would only receive message_interceptors instead of both <incoming|outgoing>_message_interceptors. The interceptor module would read app conf. with message_interceptors and then then get either the incoming or the outgoing ones from the read term.

The app conf would look like

[{rabbit, [{message_interceptors, #{incoming => [...], outgoing => [...] ...

instead of

[{rabbit, [...
    {incoming_message_interceptors, ...},
    {outgoing_message_interceptors,...},
...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. that seems complicated. Perhaps just resolve incoming to incoming_message_interceptors etc in a function rather than doing atom to list conversions.

resolve(incoming) ->
   incoming_message_interceptors;
resolve(outgoing) ->
   outgoing_message_interceptors.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, calling atom_to_list for each message that RabbitMQ receives creates lots of garbage, I wouldn't be surprised if performance drops by a few percent.

What Karl suggested is great.

Alternatively, it might be simpler to omit these translations entirely. For example instead of

-spec intercept(Msg, MsgInterceptorCtx, MsgDirection) -> Resp when
    Msg :: mc:state(),
    MsgInterceptorCtx :: map(),
    MsgDirection:: incoming | outgoing,
    Resp :: mc:state().

we could have

-spec intercept(Msg, Context, Group) -> Msg when
    Msg :: mc:state(),
    Context :: map(),
    Group :: incoming_message_interceptors | outgoing_message_interceptors.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the group option actually. I think it should be it's own argument to intercept/4 though so we don't have to modify the context map for each call.

Interceptors = persistent_term:get(InterceptorsList, []),
lists:foldl(fun({Module, Config}, Msg0) ->
try
Module:intercept(Msg0, MsgInterceptorCtx, Config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel the direction should be passed to the interceptor, you may want to just have a single interceptor for both incoming and outgoing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assumed an interceptor would do the same independently of the direction of the message. It is the user the one that decides (if the interceptor allows it by providing the appropriate message_interceptors.<direction>. cuttlefish mapping) in which direction should the interceptor be applied, or if it should be applied in both.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As suggested by @gomoripeti , the rabbit_message_interceptor module could also add the message direction in the MsgInterceptorCtx map. How does that sound?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that sounds good.

end, Msg, Interceptors).
-callback intercept(
Msg :: mc:state(),
MsgInterceptorCtx :: map(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we define some context properties that we'd always pass, e.g. vhost, username and the current protocol itself?

e.g.

#{protocol := amqp091 | amqp | mqttv3 | mqttv5, %% tags to be defined
   vhost := binary(),
   username := binary(),
   conn_name => binary() % optional
   atom() -> term()}

@LoisSotoLopez
Copy link
Contributor Author

@kjnilsson Thanks on the formatting advices :) . I'm using neovim and I've tried to find out a proper solution on this, but I haven't found yet a solution that would allow me to format my contributions w/out re-formatting previous code. Will try to figure this out.

@kjnilsson
Copy link
Contributor

@kjnilsson Thanks on the formatting advices :) . I'm using neovim and I've tried to find out a proper solution on this, but I haven't found yet a solution that would allow me to format my contributions w/out re-formatting previous code. Will try to figure this out.

It is an art rather than a science really. I use nvim too and it is fine for indentation - just hit '=' for your selection and it mostly does the right thing.

@ansd
Copy link
Member

ansd commented Apr 9, 2025

Thank you @LoisSotoLopez for this contribution!

Any additional comments on not using the getters approach and going for both a incoming+outgoing approach, @ansd ?

The only downside of the context map I see is that we duplicate some state information, for example each MQTT connection has higher memory usage after this PR. However, this context map is cleaner and simpler, so I prefer the context map.

Feel free to go with outgoing interceptors as well.

@kjnilsson
Copy link
Contributor

he only downside of the context map I see is that we duplicate some state information, for example each MQTT connection has higher memory usage after this PR. However, this context map is cleaner and simpler, so I prefer the context map.

MQTT could add the context map to a persistent term to avoid this if necessary

@kjnilsson
Copy link
Contributor

kjnilsson commented Apr 9, 2025

he only downside of the context map I see is that we duplicate some state information, for example each MQTT connection has higher memory usage after this PR. However, this context map is cleaner and simpler, so I prefer the context map.

MQTT could add the context map to a persistent term to avoid this if necessary

no scrap that, the context map is per vhost / user.


-behaviour(rabbit_message_interceptor).

-define(ANN_CLIENT_ID, <<"client_id">>).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be x-opt-client-id or x-opt-mqtt-client-id or x-opt-mqtt-publisher-client-id such that this message annotation will be received by an AMQP 1.0 client in the message-annotation section.
The current annotation client_id also gets lost when the MQTT client publishes into a stream. Maybe this key should be a configuration option by the plugin.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

x-opt-mqtt-client-id gets my vote.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I suppose it's best to include mqtt so that it's clear that this is an MQTT client ID.

"Client ID" is otherwise a broad term which could for example refer to a JMS client ID (https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#client-identifier) whose purpose is similar though:

The purpose of client identifier is to associate a connection and its objects with a state maintained on behalf of the client by a provider. By definition, the client state identified by a client identifier can be ‘in use’ by only one client at a time.

@kjnilsson
Copy link
Contributor

The only downside of the context map I see is that we duplicate some state information, for example each MQTT connection has higher memory usage after this PR. However, this context map is cleaner and simpler, so I prefer the context map.

MQTT could consider building it on the fly to keep memory overhead low

Comment on lines 2680 to 2748
case cuttlefish_variable:filter_by_prefix("message_interceptors.incoming", Conf) of
[] ->
cuttlefish:unset();
L ->
InterceptorsConfig = [
{Module0, Config, Value}
|| {["message_interceptors", "incoming", Module0, Config], Value} <- L
],
{Result, Order0} = lists:foldl(
fun({Interceptor0, Key0, Value}, {Acc, Order}) ->
Interceptor = list_to_atom(Interceptor0),
Key = list_to_atom(Key0),
MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end,
% This Interceptor -> Module alias exists for
% compatibility reasons
Module = case Interceptor of
set_header_timestamp ->
rabbit_header_timestamp_interceptor;
set_header_routing_node ->
rabbit_header_routing_node_interceptor;
_ ->
Interceptor
end,
NewAcc =
maps:update_with(
Module,
MapPutFun,
#{Key => Value},
Acc),
{NewAcc, [Module| Order]}
end,
{#{}, []},
InterceptorsConfig
),
Order = lists:uniq(Order0),
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order]
end
end
}.

{translation, "rabbit.outgoing_message_interceptors",
fun(Conf) ->
case cuttlefish_variable:filter_by_prefix("message_interceptors.outgoing", Conf) of
[] ->
cuttlefish:unset();
L ->
InterceptorsConfig = [
{Module0, Config, Value}
|| {["message_interceptors", "outgoing", Module0, Config], Value} <- L
],
{Result, Order0} = lists:foldl(
fun({Interceptor0, Key0, Value}, {Acc, Order}) ->
Module = list_to_atom(Interceptor0),
Key = list_to_atom(Key0),
MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end,
NewAcc =
maps:update_with(
Module,
MapPutFun,
#{Key => Value},
Acc),
{NewAcc, [Module| Order]}
end,
{#{}, []},
InterceptorsConfig
),
Order = lists:uniq(Order0),
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order]
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add unit tests?
deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets is the file to add these tests.

@LoisSotoLopez LoisSotoLopez requested review from kjnilsson and ansd April 11, 2025 08:02
Copy link
Contributor

@kjnilsson kjnilsson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nearly there! couple of tweaks

@@ -2607,3 +2609,15 @@ mc_env() ->
MqttX ->
#{mqtt_x => MqttX}
end.

build_msg_interceptor_ctx(#state{cfg = #cfg{client_id = ClientId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is great and what we discussed as we think lower memory overhead is preferrable. It would be good if it at all possible you could run some throughput tests so we can get a view on how much this might affect performance. https://www.rabbitmq.com/blog/2023/03/21/native-mqtt#latency-and-throughput

Copy link
Contributor

@kjnilsson kjnilsson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me, a couple of lines to indent with =.

Then just squash all commits and write a nice single commit for the feature and update PR description and we can merge I think.

Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0),
Overwrite = maps:get(overwrite, Config, false),
Msg = rabbit_message_interceptor:set_msg_annotation(
Msg0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

select this funcction and hit = in nvim


-define(HEADER_ROUTING_NODE, <<"x-routed-by">>).

-export([
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

select and hit = in nvim


-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>).

-export([
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

=

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants