Skip to content

Commit 8caf8c0

Browse files
committed
rabbit_db_*: Handle breaking change in khepri adv API return type
All callers of `khepri_adv` and `khepri_tx_adv` need updates to handle the now consistent return type of `khepri:node_props_map()` in Khepri 0.17. We don't need any compatibility code to handle "either the old return type or the new return type" because the translation is done entirely in the "client side" code in Khepri - meaning that the return value from the Ra server is the same but it is translated differently by the functions in `khepri_adv` and `khepri_tx_adv`.
1 parent 2dd94c7 commit 8caf8c0

File tree

11 files changed

+138
-126
lines changed

11 files changed

+138
-126
lines changed

deps/rabbit/src/rabbit_db_binding.erl

+32-16
Original file line numberDiff line numberDiff line change
@@ -837,17 +837,25 @@ delete_all_for_exchange_in_khepri(X = #exchange{name = XName}, OnlyDurable, Remo
837837
end,
838838
{deleted, X, Bindings, delete_for_destination_in_khepri(XName, OnlyDurable)}.
839839

840-
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
841-
Path = khepri_route_path(
842-
VHost,
843-
Name,
844-
_Kind = ?KHEPRI_WILDCARD_STAR,
845-
_DstName = ?KHEPRI_WILDCARD_STAR,
846-
_RoutingKey = #if_has_data{}),
847-
{ok, Bindings} = khepri_tx_adv:delete_many(Path),
848-
maps:fold(fun(_P, #{data := Set}, Acc) ->
849-
sets:to_list(Set) ++ Acc
850-
end, [], Bindings).
840+
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = SrcName}) ->
841+
Pattern = khepri_route_path(
842+
VHost,
843+
SrcName,
844+
?KHEPRI_WILDCARD_STAR, %% Kind
845+
?KHEPRI_WILDCARD_STAR, %% DstName
846+
#if_has_data{}), %% RoutingKey
847+
{ok, Bindings} = khepri_tx_adv:delete_many(Pattern),
848+
maps:fold(
849+
fun(Path, Props, Acc) ->
850+
case {Path, Props} of
851+
{?RABBITMQ_KHEPRI_ROUTE_PATH(
852+
VHost, SrcName, _Kind, _Name, _RoutingKey),
853+
#{data := Set}} ->
854+
sets:to_list(Set) ++ Acc;
855+
{_, _} ->
856+
Acc
857+
end
858+
end, [], Bindings).
851859

852860
%% -------------------------------------------------------------------
853861
%% delete_for_destination_in_mnesia().
@@ -892,14 +900,22 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
892900
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
893901
Pattern = khepri_route_path(
894902
VHost,
895-
_SrcName = ?KHEPRI_WILDCARD_STAR,
903+
?KHEPRI_WILDCARD_STAR, %% SrcName
896904
Kind,
897905
Name,
898-
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
906+
?KHEPRI_WILDCARD_STAR), %% RoutingKey
899907
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
900-
Bindings = maps:fold(fun(_, #{data := Set}, Acc) ->
901-
sets:to_list(Set) ++ Acc
902-
end, [], BindingsMap),
908+
Bindings = maps:fold(
909+
fun(Path, Props, Acc) ->
910+
case {Path, Props} of
911+
{?RABBITMQ_KHEPRI_ROUTE_PATH(
912+
VHost, _SrcName, Kind, Name, _RoutingKey),
913+
#{data := Set}} ->
914+
sets:to_list(Set) ++ Acc;
915+
{_, _} ->
916+
Acc
917+
end
918+
end, [], BindingsMap),
903919
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
904920
lists:keysort(#binding.source, Bindings), OnlyDurable).
905921

deps/rabbit/src/rabbit_db_exchange.erl

+16-10
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ update_in_khepri(XName, Fun) ->
331331
Path = khepri_exchange_path(XName),
332332
Ret1 = rabbit_khepri:adv_get(Path),
333333
case Ret1 of
334-
{ok, #{data := X, payload_version := Vsn}} ->
334+
{ok, #{Path := #{data := X, payload_version := Vsn}}} ->
335335
X1 = Fun(X),
336336
UpdatePath =
337337
khepri_path:combine_with_conditions(
@@ -534,8 +534,7 @@ next_serial_in_khepri(XName) ->
534534
Path = khepri_exchange_serial_path(XName),
535535
Ret1 = rabbit_khepri:adv_get(Path),
536536
case Ret1 of
537-
{ok, #{data := Serial,
538-
payload_version := Vsn}} ->
537+
{ok, #{Path := #{data := Serial, payload_version := Vsn}}} ->
539538
UpdatePath =
540539
khepri_path:combine_with_conditions(
541540
Path, [#if_payload_version{version = Vsn}]),
@@ -711,13 +710,20 @@ delete_all_in_khepri_tx(VHostName) ->
711710
{ok, NodeProps} = khepri_tx_adv:delete_many(Pattern),
712711
Deletions =
713712
maps:fold(
714-
fun(_Path, #{data := X}, Deletions) ->
715-
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
716-
rabbit_db_binding:delete_all_for_exchange_in_khepri(
717-
X, false, true),
718-
Deletions1 = rabbit_binding:add_deletion(
719-
XName, X, deleted, Bindings, XDeletions),
720-
rabbit_binding:combine_deletions(Deletions, Deletions1)
713+
fun(Path, Props, Deletions) ->
714+
case {Path, Props} of
715+
{?RABBITMQ_KHEPRI_EXCHANGE_PATH(VHostName, _),
716+
#{data := X}} ->
717+
{deleted,
718+
#exchange{name = XName}, Bindings, XDeletions} =
719+
rabbit_db_binding:delete_all_for_exchange_in_khepri(
720+
X, false, true),
721+
Deletions1 = rabbit_binding:add_deletion(
722+
XName, X, deleted, Bindings, XDeletions),
723+
rabbit_binding:combine_deletions(Deletions, Deletions1);
724+
{_, _} ->
725+
Deletions
726+
end
721727
end, rabbit_binding:new_deletions(), NodeProps),
722728
{ok, Deletions}.
723729

deps/rabbit/src/rabbit_db_msup.erl

+2-2
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
135135
mirroring_pid = Overall,
136136
childspec = ChildSpec},
137137
case rabbit_khepri:adv_get(Path) of
138-
{ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
139-
payload_version := Vsn}} ->
138+
{ok, #{Path := #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
139+
payload_version := Vsn}}} ->
140140
case Overall of
141141
Pid ->
142142
Delegate;

deps/rabbit/src/rabbit_db_queue.erl

+24-22
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ delete_in_khepri(QueueName, OnlyDurable) ->
411411
fun () ->
412412
Path = khepri_queue_path(QueueName),
413413
case khepri_tx_adv:delete(Path) of
414-
{ok, #{data := _}} ->
414+
{ok, #{Path := #{data := _}}} ->
415415
%% we want to execute some things, as decided by rabbit_exchange,
416416
%% after the transaction.
417417
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
@@ -606,7 +606,7 @@ update_in_khepri(QName, Fun) ->
606606
Path = khepri_queue_path(QName),
607607
Ret1 = rabbit_khepri:adv_get(Path),
608608
case Ret1 of
609-
{ok, #{data := Q, payload_version := Vsn}} ->
609+
{ok, #{Path := #{data := Q, payload_version := Vsn}}} ->
610610
UpdatePath = khepri_path:combine_with_conditions(
611611
Path, [#if_payload_version{version = Vsn}]),
612612
Q1 = Fun(Q),
@@ -657,7 +657,7 @@ update_decorators_in_khepri(QName, Decorators) ->
657657
Path = khepri_queue_path(QName),
658658
Ret1 = rabbit_khepri:adv_get(Path),
659659
case Ret1 of
660-
{ok, #{data := Q1, payload_version := Vsn}} ->
660+
{ok, #{Path := #{data := Q1, payload_version := Vsn}}} ->
661661
Q2 = amqqueue:set_decorators(Q1, Decorators),
662662
UpdatePath = khepri_path:combine_with_conditions(
663663
Path, [#if_payload_version{version = Vsn}]),
@@ -1075,15 +1075,12 @@ delete_transient_in_khepri(FilterFun) ->
10751075
case rabbit_khepri:adv_get_many(PathPattern) of
10761076
{ok, Props} ->
10771077
Qs = maps:fold(
1078-
fun(Path0, #{data := Q, payload_version := Vsn}, Acc)
1078+
fun(Path, #{data := Q, payload_version := Vsn}, Acc)
10791079
when ?is_amqqueue(Q) ->
10801080
case FilterFun(Q) of
10811081
true ->
1082-
Path = khepri_path:combine_with_conditions(
1083-
Path0,
1084-
[#if_payload_version{version = Vsn}]),
10851082
QName = amqqueue:get_name(Q),
1086-
[{Path, QName} | Acc];
1083+
[{Path, Vsn, QName} | Acc];
10871084
false ->
10881085
Acc
10891086
end
@@ -1102,20 +1099,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) ->
11021099
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
11031100
Res = rabbit_khepri:transaction(
11041101
fun() ->
1105-
rabbit_misc:fold_while_ok(
1106-
fun({Path, QName}, Acc) ->
1107-
%% Also see `delete_in_khepri/2'.
1108-
case khepri_tx_adv:delete(Path) of
1109-
{ok, #{data := _}} ->
1110-
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1111-
QName, false),
1112-
{ok, [{QName, Deletions} | Acc]};
1113-
{ok, _} ->
1114-
{ok, Acc};
1115-
{error, _} = Error ->
1116-
Error
1117-
end
1118-
end, [], Qs)
1102+
do_delete_transient_queues_in_khepri_tx(Qs, [])
11191103
end),
11201104
case Res of
11211105
{ok, Items} ->
@@ -1129,6 +1113,24 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
11291113
Error
11301114
end.
11311115

1116+
do_delete_transient_queues_in_khepri_tx([], Acc) ->
1117+
{ok, Acc};
1118+
do_delete_transient_queues_in_khepri_tx([{Path, Vsn, QName} | Rest], Acc) ->
1119+
%% Also see `delete_in_khepri/2'.
1120+
VersionedPath = khepri_path:combine_with_conditions(
1121+
Path, [#if_payload_version{version = Vsn}]),
1122+
case khepri_tx_adv:delete(VersionedPath) of
1123+
{ok, #{Path := #{data := _}}} ->
1124+
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1125+
QName, false),
1126+
Acc1 = [{QName, Deletions} | Acc],
1127+
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
1128+
{ok, _} ->
1129+
do_delete_transient_queues_in_khepri_tx(Rest, Acc);
1130+
{error, _} = Error ->
1131+
Error
1132+
end.
1133+
11321134
%% -------------------------------------------------------------------
11331135
%% foreach_transient().
11341136
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_db_rtparams.erl

+19-7
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ set_in_khepri(Key, Term) ->
5959
Record = #runtime_parameters{key = Key,
6060
value = Term},
6161
case rabbit_khepri:adv_put(Path, Record) of
62-
{ok, #{data := Params}} ->
62+
{ok, #{Path := #{data := Params}}} ->
6363
{old, Params#runtime_parameters.value};
6464
{ok, _} ->
6565
new
@@ -114,7 +114,7 @@ set_in_khepri_tx(Key, Term) ->
114114
Record = #runtime_parameters{key = Key,
115115
value = Term},
116116
case khepri_tx_adv:put(Path, Record) of
117-
{ok, #{data := Params}} ->
117+
{ok, #{Path := #{data := Params}}} ->
118118
{old, Params#runtime_parameters.value};
119119
{ok, _} ->
120120
new
@@ -347,11 +347,23 @@ delete_vhost_in_mnesia_tx(VHostName) ->
347347
<- mnesia:match_object(?MNESIA_TABLE, Match, read)].
348348

349349
delete_vhost_in_khepri(VHostName) ->
350-
Path = khepri_vhost_rp_path(
351-
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
352-
case rabbit_khepri:adv_delete_many(Path) of
353-
{ok, Props} ->
354-
{ok, rabbit_khepri:collect_payloads(Props)};
350+
Pattern = khepri_vhost_rp_path(
351+
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
352+
case rabbit_khepri:adv_delete_many(Pattern) of
353+
{ok, NodePropsMap} ->
354+
RTParams =
355+
maps:fold(
356+
fun(Path, Props, Acc) ->
357+
case {Path, Props} of
358+
{?RABBITMQ_KHEPRI_VHOST_RUNTIME_PARAM_PATH(
359+
VHostName, _, _),
360+
#{data := RTParam}} ->
361+
[RTParam | Acc];
362+
{_, _} ->
363+
Acc
364+
end
365+
end, [], NodePropsMap),
366+
{ok, RTParams};
355367
{error, _} = Err ->
356368
Err
357369
end.

deps/rabbit/src/rabbit_db_user.erl

+34-12
Original file line numberDiff line numberDiff line change
@@ -628,20 +628,42 @@ clear_all_permissions_for_vhost_in_mnesia(VHostName) ->
628628
clear_all_permissions_for_vhost_in_khepri(VHostName) ->
629629
rabbit_khepri:transaction(
630630
fun() ->
631-
UserPermissionsPath = khepri_user_permission_path(
632-
?KHEPRI_WILDCARD_STAR, VHostName),
633-
TopicPermissionsPath = khepri_topic_permission_path(
634-
?KHEPRI_WILDCARD_STAR, VHostName,
635-
?KHEPRI_WILDCARD_STAR),
636-
{ok, UserProps} = khepri_tx_adv:delete_many(UserPermissionsPath),
637-
{ok, TopicProps} = khepri_tx_adv:delete_many(
638-
TopicPermissionsPath),
639-
Deletions = rabbit_khepri:collect_payloads(
640-
TopicProps,
641-
rabbit_khepri:collect_payloads(UserProps)),
642-
{ok, Deletions}
631+
clear_all_permissions_for_vhost_in_khepri_tx(VHostName)
643632
end, rw, #{timeout => infinity}).
644633

634+
clear_all_permissions_for_vhost_in_khepri_tx(VHostName) ->
635+
UserPermissionsPattern = khepri_user_permission_path(
636+
?KHEPRI_WILDCARD_STAR, VHostName),
637+
TopicPermissionsPattern = khepri_topic_permission_path(
638+
?KHEPRI_WILDCARD_STAR, VHostName,
639+
?KHEPRI_WILDCARD_STAR),
640+
{ok, UserNodePropsMap} = khepri_tx_adv:delete_many(UserPermissionsPattern),
641+
{ok, TopicNodePropsMap} = khepri_tx_adv:delete_many(
642+
TopicPermissionsPattern),
643+
Deletions0 =
644+
maps:fold(
645+
fun(Path, Props, Acc) ->
646+
case {Path, Props} of
647+
{?RABBITMQ_KHEPRI_USER_PERMISSION_PATH(VHostName, _),
648+
#{data := Permission}} ->
649+
[Permission | Acc];
650+
{_, _} ->
651+
Acc
652+
end
653+
end, [], UserNodePropsMap),
654+
Deletions1 =
655+
maps:fold(
656+
fun(Path, Props, Acc) ->
657+
case {Path, Props} of
658+
{?RABBITMQ_KHEPRI_TOPIC_PERMISSION_PATH(VHostName, _, _),
659+
#{data := Permission}} ->
660+
[Permission | Acc];
661+
{_, _} ->
662+
Acc
663+
end
664+
end, Deletions0, TopicNodePropsMap),
665+
{ok, Deletions1}.
666+
645667
%% -------------------------------------------------------------------
646668
%% get_topic_permissions().
647669
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_db_vhost.erl

+3-3
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ merge_metadata_in_khepri(VHostName, Metadata) ->
167167
Path = khepri_vhost_path(VHostName),
168168
Ret1 = rabbit_khepri:adv_get(Path),
169169
case Ret1 of
170-
{ok, #{data := VHost0, payload_version := DVersion}} ->
170+
{ok, #{Path := #{data := VHost0, payload_version := DVersion}}} ->
171171
VHost = vhost:merge_metadata(VHost0, Metadata),
172172
rabbit_log:debug("Updating a virtual host record ~p", [VHost]),
173173
Path1 = khepri_path:combine_with_conditions(
@@ -443,10 +443,10 @@ update_in_mnesia_tx(VHostName, UpdateFun)
443443
update_in_khepri(VHostName, UpdateFun) ->
444444
Path = khepri_vhost_path(VHostName),
445445
case rabbit_khepri:adv_get(Path) of
446-
{ok, #{data := V, payload_version := DVersion}} ->
446+
{ok, #{Path := #{data := V, payload_version := Vsn}}} ->
447447
V1 = UpdateFun(V),
448448
Path1 = khepri_path:combine_with_conditions(
449-
Path, [#if_payload_version{version = DVersion}]),
449+
Path, [#if_payload_version{version = Vsn}]),
450450
case rabbit_khepri:put(Path1, V1) of
451451
ok ->
452452
V1;

0 commit comments

Comments
 (0)