Skip to content

Commit 243800f

Browse files
markusgustmergify[bot]
authored andcommitted
ensures the pending returns an integer and not a list for shovels
(cherry picked from commit fe6fb4c)
1 parent 0c8337c commit 243800f

File tree

5 files changed

+200
-8
lines changed

5 files changed

+200
-8
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
ack/3,
3434
nack/3,
3535
status/1,
36-
forward/3
36+
forward/3,
37+
pending_count/1
3738
]).
3839

3940
%% Function references should not be stored on the metadata store.
@@ -360,6 +361,10 @@ status(#{dest := #{blocked_by := BlockReasons}}) when BlockReasons =/= [] ->
360361
status(_) ->
361362
running.
362363

364+
pending_count(#{dest := Dest}) ->
365+
Pending = maps:get(pending, Dest, queue:new()),
366+
queue:len(Pending).
367+
363368
add_pending(Elem, State = #{dest := Dest}) ->
364369
Pending = maps:get(pending, Dest, queue:new()),
365370
State#{dest => Dest#{pending => queue:in(Elem, Pending)}}.

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
ack/3,
3333
nack/3,
3434
status/1,
35-
forward/3
35+
forward/3,
36+
pending_count/1
3637
]).
3738

3839
-import(rabbit_misc, [pget/2, pget/3]).
@@ -317,6 +318,10 @@ status(_) ->
317318
%% Destination not yet connected
318319
ignore.
319320

321+
pending_count(#{dest := Dest}) ->
322+
Pending = maps:get(pending, Dest, []),
323+
length(Pending).
324+
320325
-spec forward(Tag :: tag(), Mc :: mc:state(), state()) ->
321326
state() | {stop, any()}.
322327
forward(_Tag, _Mc,

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@
4242
ack/3,
4343
nack/3,
4444
forward/3,
45-
status/1
45+
status/1,
46+
pending_count/1
4647
]).
4748

4849
-export([
@@ -443,6 +444,11 @@ add_routing(Msg0, Dest) ->
443444
status(_) ->
444445
running.
445446

447+
pending_count(#{source := #{current := #{unacked_message_q := UAMQ}}}) ->
448+
?QUEUE:len(UAMQ);
449+
pending_count(_State) ->
450+
0.
451+
446452
%% Internal
447453

448454
parse_parameter(_, _, none) ->

deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
-callback forward(Tag :: tag(), Msg :: mc:state(), state()) ->
8686
state() | {stop, any()}.
8787
-callback status(state()) -> rabbit_shovel_status:shovel_status().
88+
-callback pending_count(state()) -> non_neg_integer().
8889

8990
-spec parse(atom(), binary(), {source | destination, proplists:proplist()}) ->
9091
source_config() | dest_config().
@@ -164,12 +165,12 @@ incr_forwarded(State = #{dest := Dest}) ->
164165
State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}.
165166

166167
-spec metrics(state()) -> rabbit_shovel_status:metrics().
167-
metrics(_State = #{source := Source,
168-
dest := Dest}) ->
168+
metrics(#{source := Source,
169+
dest := #{module := Mod}} = State) ->
169170
#{remaining => maps:get(remaining, Source, unlimited),
170-
remaining_unacked => maps:get(remaining_unacked, Source, 0),
171-
pending => maps:get(pending, Dest, 0),
172-
forwarded => maps:get(forwarded, Dest, 0)}.
171+
remaining_unacked => maps:get(remaining_unacked, Source, 0),
172+
pending => Mod:pending_count(State),
173+
forwarded => maps:get(forwarded, maps:get(dest, State), 0)}.
173174

174175

175176
%% Common functions
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(pending_count_SUITE).
9+
10+
-compile(export_all).
11+
12+
-include_lib("eunit/include/eunit.hrl").
13+
-include_lib("rabbit/include/mc.hrl").
14+
-include("../include/rabbit_shovel.hrl").
15+
16+
%%%===================================================================
17+
%%% Common Test callbacks
18+
%%%===================================================================
19+
20+
all() ->
21+
[
22+
{group, pending_count_tests}
23+
].
24+
25+
groups() ->
26+
[
27+
{pending_count_tests, [], [
28+
amqp091_pending_count_empty_queue,
29+
amqp091_pending_count_with_messages,
30+
amqp091_pending_count_after_drain,
31+
amqp10_pending_count_empty_list,
32+
amqp10_pending_count_with_messages,
33+
amqp10_pending_count_after_clear,
34+
local_pending_count_empty_queue,
35+
local_pending_count_with_messages,
36+
local_pending_count_after_settle,
37+
behaviour_metrics_includes_pending,
38+
behaviour_pending_count_delegation
39+
]}
40+
].
41+
42+
init_per_suite(Config) ->
43+
Config.
44+
45+
end_per_suite(_Config) ->
46+
ok.
47+
48+
init_per_group(_Group, Config) ->
49+
Config.
50+
51+
end_per_group(_Group, _Config) ->
52+
ok.
53+
54+
init_per_testcase(_TestCase, Config) ->
55+
Config.
56+
57+
end_per_testcase(_TestCase, _Config) ->
58+
meck:unload(),
59+
ok.
60+
61+
%%%===================================================================
62+
%%% Test cases
63+
%%%===================================================================
64+
65+
%% Test AMQP 0.9.1 pending_count functionality
66+
amqp091_pending_count_empty_queue(_Config) ->
67+
%% Test that pending_count returns 0 when no messages are pending
68+
State = #{dest => #{}},
69+
?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)).
70+
71+
amqp091_pending_count_with_messages(_Config) ->
72+
%% Test that pending_count returns correct count when messages are pending
73+
PendingQueue = queue:from_list([{1, msg1}, {2, msg2}, {3, msg3}]),
74+
State = #{dest => #{pending => PendingQueue}},
75+
?assertEqual(3, rabbit_amqp091_shovel:pending_count(State)).
76+
77+
amqp091_pending_count_after_drain(_Config) ->
78+
%% Test that pending_count returns 0 after messages are drained
79+
EmptyQueue = queue:new(),
80+
State = #{dest => #{pending => EmptyQueue}},
81+
?assertEqual(0, rabbit_amqp091_shovel:pending_count(State)).
82+
83+
%% Test AMQP 1.0 pending_count functionality
84+
amqp10_pending_count_empty_list(_Config) ->
85+
%% Test that pending_count returns 0 when no messages are pending
86+
State = #{dest => #{}},
87+
?assertEqual(0, rabbit_amqp10_shovel:pending_count(State)).
88+
89+
amqp10_pending_count_with_messages(_Config) ->
90+
%% Test that pending_count returns correct count when messages are pending
91+
PendingList = [{1, msg1}, {2, msg2}],
92+
State = #{dest => #{pending => PendingList}},
93+
?assertEqual(2, rabbit_amqp10_shovel:pending_count(State)).
94+
95+
amqp10_pending_count_after_clear(_Config) ->
96+
%% Test that pending_count returns 0 after pending list is cleared
97+
State = #{dest => #{pending => []}},
98+
?assertEqual(0, rabbit_amqp10_shovel:pending_count(State)).
99+
100+
%% Test Local shovel pending_count functionality
101+
local_pending_count_empty_queue(_Config) ->
102+
%% Test that pending_count returns 0 when unacked message queue is empty
103+
EmptyQueue = lqueue:new(),
104+
State = #{source => #{current => #{unacked_message_q => EmptyQueue}}},
105+
?assertEqual(0, rabbit_local_shovel:pending_count(State)).
106+
107+
local_pending_count_with_messages(_Config) ->
108+
%% Test that pending_count returns correct count from unacked message queue
109+
UnackedQueue = lqueue:from_list([msg1, msg2, msg3, msg4]),
110+
State = #{source => #{current => #{unacked_message_q => UnackedQueue}}},
111+
?assertEqual(4, rabbit_local_shovel:pending_count(State)).
112+
113+
local_pending_count_after_settle(_Config) ->
114+
%% Test that pending_count returns 0 when state doesn't contain unacked queue
115+
State = #{source => #{current => #{}}},
116+
?assertEqual(0, rabbit_local_shovel:pending_count(State)).
117+
118+
%% Test behaviour module integration
119+
behaviour_metrics_includes_pending(_Config) ->
120+
%% Mock the destination module's pending_count and status functions
121+
meck:new(rabbit_amqp091_shovel, [passthrough]),
122+
meck:expect(rabbit_amqp091_shovel, pending_count, fun(_) -> 5 end),
123+
meck:expect(rabbit_amqp091_shovel, status, fun(_) -> running end),
124+
125+
State = #{source => #{remaining => 10, remaining_unacked => 3},
126+
dest => #{module => rabbit_amqp091_shovel, forwarded => 7}},
127+
128+
{_Status, Metrics} = rabbit_shovel_behaviour:status(State),
129+
130+
?assertMatch(#{remaining := 10,
131+
remaining_unacked := 3,
132+
pending := 5,
133+
forwarded := 7}, Metrics),
134+
135+
?assert(meck:validate(rabbit_amqp091_shovel)).
136+
137+
behaviour_pending_count_delegation(_Config) ->
138+
%% Test that the behaviour module correctly delegates to the specific implementation
139+
meck:new(rabbit_amqp10_shovel, [passthrough]),
140+
meck:expect(rabbit_amqp10_shovel, pending_count, fun(_State) -> 3 end),
141+
meck:expect(rabbit_amqp10_shovel, status, fun(_State) -> running end),
142+
143+
State = #{dest => #{module => rabbit_amqp10_shovel}},
144+
145+
%% This would be called indirectly through status/1
146+
{_Status, Metrics} = rabbit_shovel_behaviour:status(#{source => #{},
147+
dest => maps:get(dest, State)}),
148+
149+
?assertEqual(3, maps:get(pending, Metrics)),
150+
?assert(meck:validate(rabbit_amqp10_shovel)).
151+
152+
%%%===================================================================
153+
%%% Integration tests for pending_count behavior in different scenarios
154+
%%%===================================================================
155+
156+
%% Additional test cases to verify pending_count behavior in realistic scenarios
157+
%% These could be added if we want to test the actual message flow scenarios
158+
159+
pending_count_during_flow_control(_Config) ->
160+
%% Test case outline: Verify pending_count increases when flow control blocks forwarding
161+
%% and decreases when flow control is lifted
162+
%% This would require more complex setup with actual message handling
163+
ok.
164+
165+
pending_count_with_multiple_ack_modes(_Config) ->
166+
%% Test case outline: Verify pending_count behaves correctly across different ack modes
167+
%% (no_ack, on_publish, on_confirm)
168+
ok.
169+
170+
pending_count_edge_cases(_Config) ->
171+
%% Test case outline: Test edge cases like:
172+
%% - Missing dest/source maps
173+
%% - Malformed pending data structures
174+
%% - Very large pending counts
175+
ok.

0 commit comments

Comments
 (0)