Skip to content

Commit b3918cd

Browse files
dcorbachomergify[bot]
authored andcommitted
Quorum queues: unblock publishers when clearing max-length policy
(cherry picked from commit 05f0e03)
1 parent c447701 commit b3918cd

File tree

1 file changed

+22
-11
lines changed

1 file changed

+22
-11
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1913,11 +1913,21 @@ checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) ->
19131913
Effects = add_delivery_effects(Effects0, SendAcc, State0),
19141914
{State0, ExpiredMsg, lists:reverse(Effects)}.
19151915

1916-
evaluate_limit(_Index, Result, _BeforeState,
1916+
evaluate_limit(_Index, Result,
1917+
#?STATE{cfg = #cfg{max_length = undefined,
1918+
max_bytes = undefined}},
19171919
#?STATE{cfg = #cfg{max_length = undefined,
19181920
max_bytes = undefined}} = State,
19191921
Effects) ->
19201922
{State, Result, Effects};
1923+
evaluate_limit(_Index, Result, _BeforeState,
1924+
#?STATE{cfg = #cfg{max_length = undefined,
1925+
max_bytes = undefined},
1926+
enqueuers = Enqs0} = State0,
1927+
Effects0) ->
1928+
%% max_length and/or max_bytes policies have just been deleted
1929+
{Enqs, Effects} = unblock_enqueuers(Enqs0, Effects0),
1930+
{State0#?STATE{enqueuers = Enqs}, Result, Effects};
19211931
evaluate_limit(Index, Result, BeforeState,
19221932
#?STATE{cfg = #cfg{overflow_strategy = Strategy},
19231933
enqueuers = Enqs0} = State0,
@@ -1947,16 +1957,7 @@ evaluate_limit(Index, Result, BeforeState,
19471957
case {Before, is_below_soft_limit(State0)} of
19481958
{false, true} ->
19491959
%% we have moved below the lower limit
1950-
{Enqs, Effects} =
1951-
maps:fold(
1952-
fun (P, #enqueuer{} = E0, {Enqs, Acc}) ->
1953-
E = E0#enqueuer{blocked = undefined},
1954-
{Enqs#{P => E},
1955-
[{send_msg, P, {queue_status, go}, [ra_event]}
1956-
| Acc]};
1957-
(_P, _E, Acc) ->
1958-
Acc
1959-
end, {Enqs0, Effects0}, Enqs0),
1960+
{Enqs, Effects} = unblock_enqueuers(Enqs0, Effects0),
19601961
{State0#?STATE{enqueuers = Enqs}, Result, Effects};
19611962
_ ->
19621963
{State0, Result, Effects0}
@@ -1965,6 +1966,16 @@ evaluate_limit(Index, Result, BeforeState,
19651966
{State0, Result, Effects0}
19661967
end.
19671968

1969+
unblock_enqueuers(Enqs0, Effects0) ->
1970+
maps:fold(
1971+
fun (P, #enqueuer{} = E0, {Enqs, Acc}) ->
1972+
E = E0#enqueuer{blocked = undefined},
1973+
{Enqs#{P => E},
1974+
[{send_msg, P, {queue_status, go}, [ra_event]}
1975+
| Acc]};
1976+
(_P, _E, Acc) ->
1977+
Acc
1978+
end, {Enqs0, Effects0}, Enqs0).
19681979

19691980
%% [6,5,4,3,2,1] -> [[1,2],[3,4],[5,6]]
19701981
chunk_disk_msgs([], _Bytes, [[] | Chunks]) ->

0 commit comments

Comments
 (0)