Skip to content

Commit efe59e8

Browse files
committed
Add outgoing messages interception call
1 parent c7c7cd1 commit efe59e8

File tree

3 files changed

+23
-7
lines changed

3 files changed

+23
-7
lines changed

Diff for: deps/rabbit/src/rabbit_amqp_session.erl

+6-2
Original file line numberDiff line numberDiff line change
@@ -2164,7 +2164,8 @@ handle_deliver(ConsumerTag, AckRequired,
21642164
conn_name = ConnName,
21652165
channel_num = ChannelNum,
21662166
user = #user{username = Username},
2167-
trace_state = Trace}}) ->
2167+
trace_state = Trace,
2168+
msg_interceptor_ctx = MsgInterceptorCtx}}) ->
21682169
Handle = ctag_to_handle(ConsumerTag),
21692170
case OutgoingLinks0 of
21702171
#{Handle := #outgoing_link{queue_type = QType,
@@ -2180,7 +2181,10 @@ handle_deliver(ConsumerTag, AckRequired,
21802181
message_format = ?UINT(?MESSAGE_FORMAT),
21812182
settled = SendSettled},
21822183
Mc1 = mc:convert(mc_amqp, Mc0),
2183-
Mc = mc:set_annotation(redelivered, Redelivered, Mc1),
2184+
Mc2 = rabbit_message_interceptor:intercept(Mc1,
2185+
MsgInterceptorCtx,
2186+
outgoing_message_interceptors),
2187+
Mc = mc:set_annotation(redelivered, Redelivered, Mc2),
21842188
Sections = mc:protocol_state(Mc),
21852189
validate_message_size(Sections, MaxMessageSize),
21862190
Frames = transfer_frames(Transfer, Sections, MaxFrameSize),

Diff for: deps/rabbit/src/rabbit_channel.erl

+13-4
Original file line numberDiff line numberDiff line change
@@ -1255,7 +1255,8 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
12551255
conn_pid = ConnPid,
12561256
user = User,
12571257
virtual_host = VHostPath,
1258-
authz_context = AuthzContext
1258+
authz_context = AuthzContext,
1259+
msg_interceptor_ctx = MsgInterceptorCtx
12591260
},
12601261
limiter = Limiter,
12611262
next_tag = DeliveryTag,
@@ -1270,8 +1271,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
12701271
Q, NoAck, rabbit_limiter:pid(Limiter),
12711272
DeliveryTag, QueueStates0)
12721273
end) of
1273-
{ok, MessageCount, Msg, QueueStates} ->
1274+
{ok, MessageCount, Msg0, QueueStates} ->
12741275
{ok, QueueType} = rabbit_queue_type:module(QueueName, QueueStates),
1276+
Msg = rabbit_message_interceptor:intercept(Msg0,
1277+
MsgInterceptorCtx,
1278+
outgoing_message_interceptors),
12751279
handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, Msg,
12761280
QueueType, State#ch{queue_states = QueueStates});
12771281
{empty, QueueStates} ->
@@ -2601,11 +2605,16 @@ handle_deliver(CTag, Ack, Msgs, State) when is_list(Msgs) ->
26012605
end, State, Msgs).
26022606

26032607
handle_deliver0(ConsumerTag, AckRequired,
2604-
{QName, QPid, _MsgId, Redelivered, MsgCont0} = Msg,
2608+
{QName, QPid, _MsgId, Redelivered, MsgCont0} = Msg0,
26052609
State = #ch{cfg = #conf{writer_pid = WriterPid,
2606-
writer_gc_threshold = GCThreshold},
2610+
writer_gc_threshold = GCThreshold,
2611+
msg_interceptor_ctx = MsgInterceptorCtx},
26072612
next_tag = DeliveryTag,
26082613
queue_states = Qs}) ->
2614+
2615+
Msg = rabbit_message_interceptor:intercept(Msg0,
2616+
MsgInterceptorCtx,
2617+
outgoing_message_interceptors),
26092618
Exchange = mc:exchange(MsgCont0),
26102619
[RoutingKey | _] = mc:routing_keys(MsgCont0),
26112620
MsgCont = mc:convert(mc_amqpl, MsgCont0),

Diff for: deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

+4-1
Original file line numberDiff line numberDiff line change
@@ -2073,7 +2073,10 @@ deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc} = Delivery,
20732073
true -> ?QOS_1;
20742074
false -> ?QOS_0
20752075
end,
2076-
McMqtt = mc:convert(mc_mqtt, Mc, mc_env()),
2076+
McMqtt0 = mc:convert(mc_mqtt, Mc, mc_env()),
2077+
McMqtt = rabbit_message_interceptor:intercept(McMqtt0,
2078+
build_msg_interceptor_ctx(State0),
2079+
outgoing_message_interceptors),
20772080
MqttMsg = #mqtt_msg{qos = PublisherQos} = mc:protocol_state(McMqtt),
20782081
QoS = effective_qos(PublisherQos, SubscriberQoS),
20792082
{SettleOp, State1} = maybe_publish_to_client(MqttMsg, Delivery, QoS, State0),

0 commit comments

Comments
 (0)