Skip to content

Commit 5f8c094

Browse files
committed
QQ: remove use of rabbit_fifo_index
From the main state machine (still used in the dlx module). This can be done as we no longer need to super efficiently query the smallest raft index. Removing it will reduce peak memory use somewhat as well as simplifying the code.
1 parent 1daec5d commit 5f8c094

File tree

3 files changed

+36
-140
lines changed

3 files changed

+36
-140
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 29 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
query_messages_checked_out/1,
6161
query_messages_total/1,
6262
query_processes/1,
63-
query_ra_indexes/1,
6463
query_waiting_consumers/1,
6564
query_consumer_count/1,
6665
query_consumers/1,
@@ -309,12 +308,10 @@ apply(Meta, #modify{consumer_key = ConsumerKey,
309308
apply(#{index := Idx} = Meta,
310309
#requeue{consumer_key = ConsumerKey,
311310
msg_id = MsgId,
312-
index = OldIdx,
311+
index = _OldIdx,
313312
header = Header0},
314313
#?STATE{consumers = Cons,
315-
messages = Messages,
316-
ra_indexes = Indexes0,
317-
enqueue_count = EnqCount} = State00) ->
314+
messages = Messages} = State00) ->
318315
%% the actual consumer key was looked up in the aux handler so we
319316
%% dont need to use find_consumer/2 here
320317
case Cons of
@@ -326,12 +323,9 @@ apply(#{index := Idx} = Meta,
326323
State0 = add_bytes_return(Header, State00),
327324
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked0),
328325
credit = increase_credit(Con0, 1)},
329-
State1 = State0#?STATE{ra_indexes = rabbit_fifo_index:delete(OldIdx,
330-
Indexes0),
331-
messages = rabbit_fifo_q:in(no,
326+
State1 = State0#?STATE{messages = rabbit_fifo_q:in(no,
332327
?MSG(Idx, Header),
333-
Messages),
334-
enqueue_count = EnqCount + 1},
328+
Messages)},
335329
State2 = update_or_remove_con(Meta, ConsumerKey, Con, State1),
336330
{State3, Effects} = activate_next_consumer({State2, []}),
337331
checkout(Meta, State0, State3, Effects);
@@ -467,31 +461,10 @@ apply(#{index := Idx} = Meta,
467461
checkout(Meta, State0, State2, [{monitor, process, Pid} | Effs], Reply);
468462
apply(#{index := Index}, #purge{},
469463
#?STATE{messages_total = Total,
470-
returns = Returns,
471-
ra_indexes = Indexes0,
472464
msg_bytes_enqueue = MsgBytesEnqueue
473465
} = State0) ->
474466
NumReady = messages_ready(State0),
475-
Indexes = case Total of
476-
NumReady ->
477-
%% All messages are either in 'messages' queue or
478-
%% 'returns' queue.
479-
%% No message is awaiting acknowledgement.
480-
%% Optimization: empty all 'ra_indexes'.
481-
rabbit_fifo_index:empty();
482-
_ ->
483-
%% Some messages are checked out to consumers
484-
%% awaiting acknowledgement.
485-
%% Therefore we cannot empty all 'ra_indexes'.
486-
%% We only need to delete the indexes from the 'returns'
487-
%% queue because messages of the 'messages' queue are
488-
%% not part of the 'ra_indexes'.
489-
lqueue:fold(fun(?MSG(I, _), Acc) ->
490-
rabbit_fifo_index:delete(I, Acc)
491-
end, Indexes0, Returns)
492-
end,
493-
State1 = State0#?STATE{ra_indexes = Indexes,
494-
messages = rabbit_fifo_q:new(),
467+
State1 = State0#?STATE{messages = rabbit_fifo_q:new(),
495468
messages_total = Total - NumReady,
496469
returns = lqueue:new(),
497470
msg_bytes_enqueue = 0
@@ -557,7 +530,7 @@ apply(#{system_time := Ts} = Meta,
557530
apply(#{system_time := Ts} = Meta,
558531
{down, Pid, noconnection},
559532
#?STATE{consumers = Cons0,
560-
enqueuers = Enqs0} = State0) ->
533+
enqueuers = Enqs0} = State0) ->
561534
%% A node has been disconnected. This doesn't necessarily mean that
562535
%% any processes on this node are down, they _may_ come back so here
563536
%% we just mark them as suspected (effectively deactivated)
@@ -724,7 +697,8 @@ snapshot_installed(_Meta, #?MODULE{cfg = #cfg{resource = QR},
724697

725698
convert_v7_to_v8(#{} = _Meta, StateV7) ->
726699
StateV8 = StateV7,
727-
StateV8.
700+
StateV8#?STATE{discarded_bytes = 0,
701+
unused_0 = ?NIL}.
728702

729703
purge_node(Meta, Node, State, Effects) ->
730704
lists:foldl(fun(Pid, {S0, E0}) ->
@@ -864,7 +838,6 @@ tick(Ts, #?STATE{cfg = #cfg{resource = QName}} = State) ->
864838
-spec overview(state()) -> map().
865839
overview(#?STATE{consumers = Cons,
866840
enqueuers = Enqs,
867-
% enqueue_count = EnqCount,
868841
msg_bytes_enqueue = EnqueueBytes,
869842
msg_bytes_checkout = CheckoutBytes,
870843
cfg = Cfg,
@@ -947,9 +920,7 @@ which_module(8) -> ?MODULE.
947920

948921
-record(snapshot, {index :: ra:index(),
949922
timestamp :: milliseconds(),
950-
% smallest_index :: undefined | ra:index(),
951923
messages_total = 0 :: non_neg_integer(),
952-
% indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(),
953924
bytes_out = 0 :: non_neg_integer()}).
954925
-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
955926
-record(?AUX, {name :: atom(),
@@ -1235,9 +1206,6 @@ query_processes(#?STATE{enqueuers = Enqs, consumers = Cons0}) ->
12351206
maps:keys(maps:merge(Enqs, Cons)).
12361207

12371208

1238-
query_ra_indexes(#?STATE{ra_indexes = RaIndexes}) ->
1239-
RaIndexes.
1240-
12411209
query_waiting_consumers(#?STATE{waiting_consumers = WaitingConsumers}) ->
12421210
WaitingConsumers.
12431211

@@ -1593,14 +1561,12 @@ apply_enqueue(#{index := RaftIdx,
15931561
decr_total(#?STATE{messages_total = Tot} = State) ->
15941562
State#?STATE{messages_total = Tot - 1}.
15951563

1596-
drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects) ->
1564+
drop_head(#?STATE{} = State0, Effects) ->
15971565
case take_next_msg(State0) of
1598-
{?MSG(Idx, Header) = Msg, State1} ->
1599-
Indexes = rabbit_fifo_index:delete(Idx, Indexes0),
1600-
State2 = State1#?STATE{ra_indexes = Indexes},
1601-
State3 = decr_total(add_bytes_drop(Header, State2)),
1566+
{?MSG(_Idx, Header) = Msg, State1} ->
1567+
State = decr_total(add_bytes_drop(Header, State1)),
16021568
#?STATE{cfg = #cfg{dead_letter_handler = DLH},
1603-
dlx = DlxState} = State = State3,
1569+
dlx = DlxState} = State,
16041570
{_, DlxEffects} = rabbit_fifo_dlx:discard([Msg], maxlen, DLH, DlxState),
16051571
{State, combine_effects(DlxEffects, Effects)};
16061572
empty ->
@@ -1668,7 +1634,6 @@ update_expiry_header(ExpiryTs, Header) ->
16681634
maybe_enqueue(RaftIdx, Ts, undefined, undefined, RawMsg,
16691635
{_MetaSize, BodySize},
16701636
Effects, #?STATE{msg_bytes_enqueue = Enqueue,
1671-
enqueue_count = EnqCount,
16721637
messages = Messages,
16731638
messages_total = Total} = State0) ->
16741639
% direct enqueue without tracking
@@ -1678,15 +1643,13 @@ maybe_enqueue(RaftIdx, Ts, undefined, undefined, RawMsg,
16781643
Msg = ?MSG(RaftIdx, Header),
16791644
PTag = priority_tag(RawMsg),
16801645
State = State0#?STATE{msg_bytes_enqueue = Enqueue + Size,
1681-
enqueue_count = EnqCount + 1,
16821646
messages_total = Total + 1,
16831647
messages = rabbit_fifo_q:in(PTag, Msg, Messages)
16841648
},
16851649
{ok, State, Effects};
16861650
maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg,
16871651
{_MetaSize, BodySize} = Size,
16881652
Effects0, #?STATE{msg_bytes_enqueue = Enqueue,
1689-
enqueue_count = EnqCount,
16901653
enqueuers = Enqueuers0,
16911654
messages = Messages,
16921655
messages_total = Total} = State0) ->
@@ -1712,7 +1675,6 @@ maybe_enqueue(RaftIdx, Ts, From, MsgSeqNo, RawMsg,
17121675
end,
17131676
PTag = priority_tag(RawMsg),
17141677
State = State0#?STATE{msg_bytes_enqueue = Enqueue + BodySize,
1715-
enqueue_count = EnqCount + 1,
17161678
messages_total = Total + 1,
17171679
messages = rabbit_fifo_q:in(PTag, Msg, Messages),
17181680
enqueuers = Enqueuers0#{From => Enq},
@@ -1755,47 +1717,42 @@ return(Meta, ConsumerKey,
17551717
% used to process messages that are finished
17561718
complete(Meta, ConsumerKey, [MsgId],
17571719
#consumer{checked_out = Checked0} = Con0,
1758-
#?STATE{ra_indexes = Indexes0,
1759-
msg_bytes_checkout = BytesCheckout,
1720+
#?STATE{msg_bytes_checkout = BytesCheckout,
17601721
messages_total = Tot} = State0,
17611722
Effects) ->
17621723
case maps:take(MsgId, Checked0) of
1763-
{?MSG(Idx, Hdr), Checked} ->
1724+
{?MSG(_Idx, Hdr), Checked} ->
17641725
SettledSize = get_header(size, Hdr),
1765-
Indexes = rabbit_fifo_index:delete(Idx, Indexes0),
17661726
Con = Con0#consumer{checked_out = Checked,
17671727
credit = increase_credit(Con0, 1)},
17681728
State1 = update_or_remove_con(Meta, ConsumerKey, Con, State0),
1769-
{State1#?STATE{ra_indexes = Indexes,
1770-
msg_bytes_checkout = BytesCheckout - SettledSize,
1771-
messages_total = Tot - 1},
1729+
{State1#?STATE{msg_bytes_checkout = BytesCheckout - SettledSize,
1730+
messages_total = Tot - 1},
17721731
[{aux, {bytes_out, SettledSize}} | Effects]};
17731732
error ->
17741733
{State0, Effects}
17751734
end;
17761735
complete(Meta, ConsumerKey, MsgIds,
17771736
#consumer{checked_out = Checked0} = Con0,
1778-
#?STATE{ra_indexes = Indexes0,
1779-
msg_bytes_checkout = BytesCheckout,
1737+
#?STATE{msg_bytes_checkout = BytesCheckout,
17801738
messages_total = Tot} = State0, Effects) ->
1781-
{SettledSize, Checked, Indexes}
1739+
{SettledSize, Checked}
17821740
= lists:foldl(
1783-
fun (MsgId, {S0, Ch0, Idxs}) ->
1741+
fun (MsgId, {S0, Ch0}) ->
17841742
case maps:take(MsgId, Ch0) of
1785-
{?MSG(Idx, Hdr), Ch} ->
1743+
{?MSG(_Idx, Hdr), Ch} ->
17861744
S = get_header(size, Hdr) + S0,
1787-
{S, Ch, rabbit_fifo_index:delete(Idx, Idxs)};
1745+
{S, Ch};
17881746
error ->
1789-
{S0, Ch0, Idxs}
1747+
{S0, Ch0}
17901748
end
1791-
end, {0, Checked0, Indexes0}, MsgIds),
1749+
end, {0, Checked0}, MsgIds),
17921750
Len = map_size(Checked0) - map_size(Checked),
17931751
Con = Con0#consumer{checked_out = Checked,
17941752
credit = increase_credit(Con0, Len)},
17951753
State1 = update_or_remove_con(Meta, ConsumerKey, Con, State0),
1796-
{State1#?STATE{ra_indexes = Indexes,
1797-
msg_bytes_checkout = BytesCheckout - SettledSize,
1798-
messages_total = Tot - Len},
1754+
{State1#?STATE{msg_bytes_checkout = BytesCheckout - SettledSize,
1755+
messages_total = Tot - Len},
17991756
[{aux, {bytes_out, SettledSize}} | Effects]}.
18001757

18011758
increase_credit(#consumer{cfg = #consumer_cfg{lifetime = once},
@@ -2072,21 +2029,16 @@ add_delivery_effects(Effects0, AccMap, State) ->
20722029
end, Effects0, AccMap).
20732030

20742031
take_next_msg(#?STATE{returns = Returns0,
2075-
messages = Messages0,
2076-
ra_indexes = Indexes0
2077-
} = State) ->
2032+
messages = Messages0} = State) ->
20782033
case lqueue:out(Returns0) of
20792034
{{value, NextMsg}, Returns} ->
20802035
{NextMsg, State#?STATE{returns = Returns}};
20812036
{empty, _} ->
20822037
case rabbit_fifo_q:out(Messages0) of
20832038
empty ->
20842039
empty;
2085-
{?MSG(RaftIdx, _) = Msg, Messages} ->
2086-
%% add index here
2087-
Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0),
2088-
{Msg, State#?STATE{messages = Messages,
2089-
ra_indexes = Indexes}}
2040+
{?MSG(_RaftIdx, _) = Msg, Messages} ->
2041+
{Msg, State#?STATE{messages = Messages}}
20902042
end
20912043
end.
20922044

@@ -2224,18 +2176,15 @@ expire_msgs(RaCmdTs, Result, State, Effects) ->
22242176
end.
22252177

22262178
expire(RaCmdTs, State0, Effects) ->
2227-
{?MSG(Idx, Header) = Msg,
2179+
{?MSG(_Idx, Header) = Msg,
22282180
#?STATE{cfg = #cfg{dead_letter_handler = DLH},
22292181
dlx = DlxState0,
2230-
ra_indexes = Indexes0,
22312182
messages_total = Tot,
22322183
msg_bytes_enqueue = MsgBytesEnqueue} = State1} =
22332184
take_next_msg(State0),
22342185
{DlxState, DlxEffects} = rabbit_fifo_dlx:discard([Msg], expired,
22352186
DLH, DlxState0),
2236-
Indexes = rabbit_fifo_index:delete(Idx, Indexes0),
22372187
State = State1#?STATE{dlx = DlxState,
2238-
ra_indexes = Indexes,
22392188
messages_total = Tot - 1,
22402189
msg_bytes_enqueue =
22412190
MsgBytesEnqueue - get_header(size, Header)},

deps/rabbit/src/rabbit_fifo.hrl

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,11 @@
187187
messages_total = 0 :: non_neg_integer(),
188188
% queue of returned msg_in_ids - when checking out it picks from
189189
returns = lqueue:new() :: lqueue:lqueue(term()),
190-
% a counter of enqueues - used to trigger shadow copy points
190+
% discareded bytes - a counter that is incremented every time a command
191+
% is procesesed that does not need to be kept (live indexes).
192+
% Approximate, used for triggering snapshots
191193
% reset to 0 when release_cursor gets stored
192-
enqueue_count = 0 :: non_neg_integer(),
194+
discarded_bytes = 0,
193195
% a map containing all the live processes that have ever enqueued
194196
% a message to this queue
195197
enqueuers = #{} :: #{pid() => #enqueuer{}},
@@ -198,7 +200,7 @@
198200
% rabbit_fifo_index can be slow when calculating the smallest
199201
% index when there are large gaps but should be faster than gb_trees
200202
% for normal appending operations as it's backed by a map
201-
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
203+
unused_0 = ?NIL,
202204
unused_1 = ?NIL,
203205
% consumers need to reflect consumer state at time of snapshot
204206
consumers = #{} :: #{consumer_key() => consumer()},
@@ -211,6 +213,8 @@
211213
%% one is picked if active consumer is cancelled or dies
212214
%% used only when single active consumer is on
213215
waiting_consumers = [] :: [{consumer_key(), consumer()}],
216+
%% records the timestamp whenever the queue was last considered
217+
%% active in terms of consumer activity
214218
last_active :: option(non_neg_integer()),
215219
msg_cache :: option({ra:index(), raw_msg()}),
216220
unused_2 = ?NIL

deps/rabbit/test/rabbit_fifo_prop_SUITE.erl

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ all_tests() ->
6161
scenario32,
6262
upgrade,
6363
messages_total,
64-
ra_indexes,
6564
simple_prefetch,
6665
simple_prefetch_without_checkout_cancel,
6766
simple_prefetch_01,
@@ -910,30 +909,6 @@ messages_total(_Config) ->
910909
end)
911910
end, [], Size).
912911

913-
ra_indexes(_Config) ->
914-
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> false end),
915-
Size = 256,
916-
run_proper(
917-
fun () ->
918-
?FORALL({Length, Bytes, DeliveryLimit, SingleActive},
919-
frequency([{5, {undefined, undefined, undefined, false}},
920-
{5, {oneof([range(1, 10), undefined]),
921-
oneof([range(1, 1000), undefined]),
922-
oneof([range(1, 3), undefined]),
923-
oneof([true, false])
924-
}}]),
925-
begin
926-
Config = config(?FUNCTION_NAME,
927-
Length,
928-
Bytes,
929-
SingleActive,
930-
DeliveryLimit),
931-
?FORALL(O, ?LET(Ops, log_gen(Size), expand(Ops, Config)),
932-
collect({log_size, length(O)},
933-
ra_indexes_prop(Config, O)))
934-
end)
935-
end, [], Size).
936-
937912
simple_prefetch(_Config) ->
938913
Size = 500,
939914
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> true end),
@@ -1592,38 +1567,6 @@ messages_total_invariant() ->
15921567
end
15931568
end.
15941569

1595-
ra_indexes_prop(Conf0, Commands) ->
1596-
Conf = Conf0#{release_cursor_interval => 100},
1597-
Indexes = lists:seq(1, length(Commands)),
1598-
Entries = lists:zip(Indexes, Commands),
1599-
InitState = test_init(Conf),
1600-
run_log(InitState, Entries, ra_indexes_invariant()),
1601-
true.
1602-
1603-
ra_indexes_invariant() ->
1604-
%% The raft indexes contained in the `ra_indexes` `rabbit_fifo_index` must
1605-
%% be the same as all indexes checked out by consumers plus those in the
1606-
%% `returns` queue.
1607-
fun(#rabbit_fifo{ra_indexes = Index,
1608-
consumers = C,
1609-
returns = R}) ->
1610-
RIdxs = lqueue:fold(fun(?MSG(I, _), Acc) -> [I | Acc] end, [], R),
1611-
CIdxs = maps:fold(fun(_, #consumer{checked_out = Ch}, Acc0) ->
1612-
maps:fold(fun(_, ?MSG(I, _), Acc) ->
1613-
[I | Acc]
1614-
end, Acc0, Ch)
1615-
end, [], C),
1616-
ActualIdxs = lists:sort(RIdxs ++ CIdxs),
1617-
IndexIdxs = lists:sort(rabbit_fifo_index:to_list(Index)),
1618-
case ActualIdxs == IndexIdxs of
1619-
true -> true;
1620-
false ->
1621-
ct:pal("ra_indexes invariant failed Expected ~b Got ~b",
1622-
[ActualIdxs, IndexIdxs]),
1623-
false
1624-
end
1625-
end.
1626-
16271570
simple_prefetch_prop(Conf0, Commands, WithCheckoutCancel) ->
16281571
Conf = Conf0#{release_cursor_interval => 100},
16291572
Indexes = lists:seq(1, length(Commands)),

0 commit comments

Comments
 (0)