Skip to content

Commit 7821a27

Browse files
Merge pull request #12314 from rabbitmq/mergify/bp/v4.0.x/pr-12310
QQ: unblock publishers when queue length policy is deleted (backport #12310)
2 parents 20bec8f + b3918cd commit 7821a27

File tree

2 files changed

+53
-11
lines changed

2 files changed

+53
-11
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1913,11 +1913,21 @@ checkout0(_Meta, {_Activity, ExpiredMsg, State0, Effects0}, SendAcc) ->
19131913
Effects = add_delivery_effects(Effects0, SendAcc, State0),
19141914
{State0, ExpiredMsg, lists:reverse(Effects)}.
19151915

1916-
evaluate_limit(_Index, Result, _BeforeState,
1916+
evaluate_limit(_Index, Result,
1917+
#?STATE{cfg = #cfg{max_length = undefined,
1918+
max_bytes = undefined}},
19171919
#?STATE{cfg = #cfg{max_length = undefined,
19181920
max_bytes = undefined}} = State,
19191921
Effects) ->
19201922
{State, Result, Effects};
1923+
evaluate_limit(_Index, Result, _BeforeState,
1924+
#?STATE{cfg = #cfg{max_length = undefined,
1925+
max_bytes = undefined},
1926+
enqueuers = Enqs0} = State0,
1927+
Effects0) ->
1928+
%% max_length and/or max_bytes policies have just been deleted
1929+
{Enqs, Effects} = unblock_enqueuers(Enqs0, Effects0),
1930+
{State0#?STATE{enqueuers = Enqs}, Result, Effects};
19211931
evaluate_limit(Index, Result, BeforeState,
19221932
#?STATE{cfg = #cfg{overflow_strategy = Strategy},
19231933
enqueuers = Enqs0} = State0,
@@ -1947,16 +1957,7 @@ evaluate_limit(Index, Result, BeforeState,
19471957
case {Before, is_below_soft_limit(State0)} of
19481958
{false, true} ->
19491959
%% we have moved below the lower limit
1950-
{Enqs, Effects} =
1951-
maps:fold(
1952-
fun (P, #enqueuer{} = E0, {Enqs, Acc}) ->
1953-
E = E0#enqueuer{blocked = undefined},
1954-
{Enqs#{P => E},
1955-
[{send_msg, P, {queue_status, go}, [ra_event]}
1956-
| Acc]};
1957-
(_P, _E, Acc) ->
1958-
Acc
1959-
end, {Enqs0, Effects0}, Enqs0),
1960+
{Enqs, Effects} = unblock_enqueuers(Enqs0, Effects0),
19601961
{State0#?STATE{enqueuers = Enqs}, Result, Effects};
19611962
_ ->
19621963
{State0, Result, Effects0}
@@ -1965,6 +1966,16 @@ evaluate_limit(Index, Result, BeforeState,
19651966
{State0, Result, Effects0}
19661967
end.
19671968

1969+
unblock_enqueuers(Enqs0, Effects0) ->
1970+
maps:fold(
1971+
fun (P, #enqueuer{} = E0, {Enqs, Acc}) ->
1972+
E = E0#enqueuer{blocked = undefined},
1973+
{Enqs#{P => E},
1974+
[{send_msg, P, {queue_status, go}, [ra_event]}
1975+
| Acc]};
1976+
(_P, _E, Acc) ->
1977+
Acc
1978+
end, {Enqs0, Effects0}, Enqs0).
19681979

19691980
%% [6,5,4,3,2,1] -> [[1,2],[3,4],[5,6]]
19701981
chunk_disk_msgs([], _Bytes, [[] | Chunks]) ->

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ all_tests() ->
151151
message_bytes_metrics,
152152
queue_length_limit_drop_head,
153153
queue_length_limit_reject_publish,
154+
queue_length_limit_policy_cleared,
154155
subscribe_redelivery_limit,
155156
subscribe_redelivery_limit_disable,
156157
subscribe_redelivery_limit_many,
@@ -2973,6 +2974,36 @@ queue_length_limit_reject_publish(Config) ->
29732974
ok = publish_confirm(Ch, QQ),
29742975
ok.
29752976

2977+
queue_length_limit_policy_cleared(Config) ->
2978+
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
2979+
2980+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
2981+
QQ = ?config(queue_name, Config),
2982+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
2983+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
2984+
ok = rabbit_ct_broker_helpers:set_policy(
2985+
Config, 0, <<"max-length">>, QQ, <<"queues">>,
2986+
[{<<"max-length">>, 2},
2987+
{<<"overflow">>, <<"reject-publish">>}]),
2988+
timer:sleep(1000),
2989+
RaName = ra_name(QQ),
2990+
QueryFun = fun rabbit_fifo:overview/1,
2991+
?awaitMatch({ok, {_, #{config := #{max_length := 2}}}, _},
2992+
rpc:call(Server, ra, local_query, [RaName, QueryFun]),
2993+
?DEFAULT_AWAIT),
2994+
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
2995+
ok = publish_confirm(Ch, QQ),
2996+
ok = publish_confirm(Ch, QQ),
2997+
ok = publish_confirm(Ch, QQ), %% QQs allow one message above the limit
2998+
wait_for_messages_ready(Servers, RaName, 3),
2999+
fail = publish_confirm(Ch, QQ),
3000+
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"max-length">>),
3001+
?awaitMatch({ok, {_, #{config := #{max_length := undefined}}}, _},
3002+
rpc:call(Server, ra, local_query, [RaName, QueryFun]),
3003+
?DEFAULT_AWAIT),
3004+
ok = publish_confirm(Ch, QQ),
3005+
wait_for_messages_ready(Servers, RaName, 4).
3006+
29763007
purge(Config) ->
29773008
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
29783009

0 commit comments

Comments
 (0)