Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit cc6185d

Browse files
committedNov 25, 2024··
rabbit_db_*: Handle khepri_adv:node_props_map() returns from adv API
Khepri 0.17.x will change the return of functions from the khepri_adv and khepri_tx_adv modules. Previously, functions that target one specific tree node, for example `khepri_adv:delete/3`, would return the node props map (`khepri:node_props()`) for the affected node. Now all of the "adv API" returns `khepri_adv:node_props_map()` for consistency.
1 parent 692e96e commit cc6185d

File tree

11 files changed

+282
-122
lines changed

11 files changed

+282
-122
lines changed
 

‎deps/rabbit/src/rabbit_db_binding.erl

+32-16
Original file line numberDiff line numberDiff line change
@@ -837,17 +837,25 @@ delete_all_for_exchange_in_khepri(X = #exchange{name = XName}, OnlyDurable, Remo
837837
end,
838838
{deleted, X, Bindings, delete_for_destination_in_khepri(XName, OnlyDurable)}.
839839

840-
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
841-
Path = khepri_route_path(
842-
VHost,
843-
Name,
844-
_Kind = ?KHEPRI_WILDCARD_STAR,
845-
_DstName = ?KHEPRI_WILDCARD_STAR,
846-
_RoutingKey = #if_has_data{}),
847-
{ok, Bindings} = khepri_tx_adv:delete_many(Path),
848-
maps:fold(fun(_P, #{data := Set}, Acc) ->
849-
sets:to_list(Set) ++ Acc
850-
end, [], Bindings).
840+
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = SrcName}) ->
841+
Pattern = khepri_route_path(
842+
VHost,
843+
SrcName,
844+
?KHEPRI_WILDCARD_STAR, %% Kind
845+
?KHEPRI_WILDCARD_STAR, %% DstName
846+
#if_has_data{}), %% RoutingKey
847+
{ok, Bindings} = khepri_tx_adv:delete_many(Pattern),
848+
maps:fold(
849+
fun(Path, Props, Acc) ->
850+
case {Path, Props} of
851+
{?RABBITMQ_KHEPRI_ROUTE_PATH(
852+
VHost, SrcName, _Kind, _Name, _RoutingKey),
853+
#{data := Set}} ->
854+
sets:to_list(Set) ++ Acc;
855+
{_, _} ->
856+
Acc
857+
end
858+
end, [], Bindings).
851859

852860
%% -------------------------------------------------------------------
853861
%% delete_for_destination_in_mnesia().
@@ -892,14 +900,22 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
892900
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
893901
Pattern = khepri_route_path(
894902
VHost,
895-
_SrcName = ?KHEPRI_WILDCARD_STAR,
903+
?KHEPRI_WILDCARD_STAR, %% SrcName
896904
Kind,
897905
Name,
898-
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
906+
?KHEPRI_WILDCARD_STAR), %% RoutingKey
899907
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
900-
Bindings = maps:fold(fun(_, #{data := Set}, Acc) ->
901-
sets:to_list(Set) ++ Acc
902-
end, [], BindingsMap),
908+
Bindings = maps:fold(
909+
fun(Path, Props, Acc) ->
910+
case {Path, Props} of
911+
{?RABBITMQ_KHEPRI_ROUTE_PATH(
912+
VHost, _SrcName, Kind, Name, _RoutingKey),
913+
#{data := Set}} ->
914+
sets:to_list(Set) ++ Acc;
915+
{_, _} ->
916+
Acc
917+
end
918+
end, [], BindingsMap),
903919
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
904920
lists:keysort(#binding.source, Bindings), OnlyDurable).
905921

‎deps/rabbit/src/rabbit_db_exchange.erl

+40-10
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,19 @@ update_in_khepri(XName, Fun) ->
331331
Path = khepri_exchange_path(XName),
332332
Ret1 = rabbit_khepri:adv_get(Path),
333333
case Ret1 of
334-
{ok, #{data := X, payload_version := Vsn}} ->
334+
{ok, QueryRet} ->
335+
{X, Vsn} = case QueryRet of
336+
%% Khepri 0.16 and below returned
337+
%% `khepri:node_props()' for adv queries and
338+
%% commands targeting one node:
339+
#{data := Data, payload_version := V} ->
340+
{Data, V};
341+
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
342+
%% instead.
343+
#{Path := #{data := Data,
344+
payload_version := V}} ->
345+
{Data, V}
346+
end,
335347
X1 = Fun(X),
336348
UpdatePath =
337349
khepri_path:combine_with_conditions(
@@ -534,8 +546,19 @@ next_serial_in_khepri(XName) ->
534546
Path = khepri_exchange_serial_path(XName),
535547
Ret1 = rabbit_khepri:adv_get(Path),
536548
case Ret1 of
537-
{ok, #{data := Serial,
538-
payload_version := Vsn}} ->
549+
{ok, QueryRet} ->
550+
{Serial, Vsn} = case QueryRet of
551+
%% Khepri 0.16 and below returned
552+
%% `khepri:node_props()' for adv queries and
553+
%% commands targeting one node:
554+
#{data := Data, payload_version := V} ->
555+
{Data, V};
556+
%% Khepri 0.17+ return
557+
%% `khepri_adv:node_props_map()` instead.
558+
#{Path := #{data := Data,
559+
payload_version := V}} ->
560+
{Data, V}
561+
end,
539562
UpdatePath =
540563
khepri_path:combine_with_conditions(
541564
Path, [#if_payload_version{version = Vsn}]),
@@ -711,13 +734,20 @@ delete_all_in_khepri_tx(VHostName) ->
711734
{ok, NodeProps} = khepri_tx_adv:delete_many(Pattern),
712735
Deletions =
713736
maps:fold(
714-
fun(_Path, #{data := X}, Deletions) ->
715-
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
716-
rabbit_db_binding:delete_all_for_exchange_in_khepri(
717-
X, false, true),
718-
Deletions1 = rabbit_binding:add_deletion(
719-
XName, X, deleted, Bindings, XDeletions),
720-
rabbit_binding:combine_deletions(Deletions, Deletions1)
737+
fun(Path, Props, Deletions) ->
738+
case {Path, Props} of
739+
{?RABBITMQ_KHEPRI_EXCHANGE_PATH(VHostName, _),
740+
#{data := X}} ->
741+
{deleted,
742+
#exchange{name = XName}, Bindings, XDeletions} =
743+
rabbit_db_binding:delete_all_for_exchange_in_khepri(
744+
X, false, true),
745+
Deletions1 = rabbit_binding:add_deletion(
746+
XName, X, deleted, Bindings, XDeletions),
747+
rabbit_binding:combine_deletions(Deletions, Deletions1);
748+
{_, _} ->
749+
Deletions
750+
end
721751
end, rabbit_binding:new_deletions(), NodeProps),
722752
{ok, Deletions}.
723753

‎deps/rabbit/src/rabbit_db_msup.erl

+14-2
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,20 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
135135
mirroring_pid = Overall,
136136
childspec = ChildSpec},
137137
case rabbit_khepri:adv_get(Path) of
138-
{ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
139-
payload_version := Vsn}} ->
138+
{ok, QueryRet} ->
139+
{#mirrored_sup_childspec{mirroring_pid = Pid}, Vsn} =
140+
case QueryRet of
141+
%% Khepri 0.16 and below returned
142+
%% `khepri:node_props()' for adv queries and
143+
%% commands targeting one node:
144+
#{data := Data, payload_version := V} ->
145+
{Data, V};
146+
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
147+
%% instead.
148+
#{Path := #{data := Data,
149+
payload_version := V}} ->
150+
{Data, V}
151+
end,
140152
case Overall of
141153
Pid ->
142154
Delegate;

‎deps/rabbit/src/rabbit_db_queue.erl

+62-18
Original file line numberDiff line numberDiff line change
@@ -411,10 +411,16 @@ delete_in_khepri(QueueName, OnlyDurable) ->
411411
fun () ->
412412
Path = khepri_queue_path(QueueName),
413413
case khepri_tx_adv:delete(Path) of
414+
%% Khepri 0.16 and below returned `khepri:node_props()' for
415+
%% adv queries and commands targeting one node:
414416
{ok, #{data := _}} ->
415417
%% we want to execute some things, as decided by rabbit_exchange,
416418
%% after the transaction.
417419
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
420+
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
421+
%% instead.
422+
{ok, #{Path := #{data := _}}} ->
423+
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
418424
{ok, _} ->
419425
ok
420426
end
@@ -606,7 +612,19 @@ update_in_khepri(QName, Fun) ->
606612
Path = khepri_queue_path(QName),
607613
Ret1 = rabbit_khepri:adv_get(Path),
608614
case Ret1 of
609-
{ok, #{data := Q, payload_version := Vsn}} ->
615+
{ok, QueryRet} ->
616+
{Q, Vsn} = case QueryRet of
617+
%% Khepri 0.16 and below returned
618+
%% `khepri:node_props()' for adv queries and
619+
%% commands targeting one node:
620+
#{data := Data, payload_version := V} ->
621+
{Data, V};
622+
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
623+
%% instead.
624+
#{Path := #{data := Data,
625+
payload_version := V}} ->
626+
{Data, V}
627+
end,
610628
UpdatePath = khepri_path:combine_with_conditions(
611629
Path, [#if_payload_version{version = Vsn}]),
612630
Q1 = Fun(Q),
@@ -657,11 +675,23 @@ update_decorators_in_khepri(QName, Decorators) ->
657675
Path = khepri_queue_path(QName),
658676
Ret1 = rabbit_khepri:adv_get(Path),
659677
case Ret1 of
660-
{ok, #{data := Q1, payload_version := Vsn}} ->
661-
Q2 = amqqueue:set_decorators(Q1, Decorators),
678+
{ok, QueryRet} ->
679+
{Q, Vsn} = case QueryRet of
680+
%% Khepri 0.16 and below returned
681+
%% `khepri:node_props()' for adv queries and
682+
%% commands targeting one node:
683+
#{data := Data, payload_version := V} ->
684+
{Data, V};
685+
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
686+
%% instead.
687+
#{Path := #{data := Data,
688+
payload_version := V}} ->
689+
{Data, V}
690+
end,
691+
Q1 = amqqueue:set_decorators(Q, Decorators),
662692
UpdatePath = khepri_path:combine_with_conditions(
663693
Path, [#if_payload_version{version = Vsn}]),
664-
Ret2 = rabbit_khepri:put(UpdatePath, Q2),
694+
Ret2 = rabbit_khepri:put(UpdatePath, Q1),
665695
case Ret2 of
666696
ok -> ok;
667697
{error, {khepri, mismatching_node, _}} ->
@@ -1102,20 +1132,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) ->
11021132
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
11031133
Res = rabbit_khepri:transaction(
11041134
fun() ->
1105-
rabbit_misc:fold_while_ok(
1106-
fun({Path, QName}, Acc) ->
1107-
%% Also see `delete_in_khepri/2'.
1108-
case khepri_tx_adv:delete(Path) of
1109-
{ok, #{data := _}} ->
1110-
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1111-
QName, false),
1112-
{ok, [{QName, Deletions} | Acc]};
1113-
{ok, _} ->
1114-
{ok, Acc};
1115-
{error, _} = Error ->
1116-
Error
1117-
end
1118-
end, [], Qs)
1135+
do_delete_transient_queues_in_khepri_tx(Qs, [])
11191136
end),
11201137
case Res of
11211138
{ok, Items} ->
@@ -1129,6 +1146,33 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
11291146
Error
11301147
end.
11311148

1149+
do_delete_transient_queues_in_khepri_tx([], Acc) ->
1150+
{ok, Acc};
1151+
do_delete_transient_queues_in_khepri_tx([{Path, QName} | Rest], Acc) ->
1152+
%% Also see `delete_in_khepri/2'.
1153+
case khepri_tx_adv:delete(Path) of
1154+
{ok, Res} ->
1155+
Acc1 = case Res of
1156+
%% Khepri 0.16 and below returned `khepri:node_props()'
1157+
%% for adv queries and commands targeting one node:
1158+
#{data := _} ->
1159+
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1160+
QName, false),
1161+
[{QName, Deletions} | Acc];
1162+
%% Khepri 0.17+ return `khepri_adv:node_props_map()`
1163+
%% instead.
1164+
#{Path := #{data := _}} ->
1165+
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1166+
QName, false),
1167+
[{QName, Deletions} | Acc];
1168+
_ ->
1169+
Acc
1170+
end,
1171+
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
1172+
{error, _} = Error ->
1173+
Error
1174+
end.
1175+
11321176
%% -------------------------------------------------------------------
11331177
%% foreach_transient().
11341178
%% -------------------------------------------------------------------

‎deps/rabbit/src/rabbit_db_rtparams.erl

+27-5
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,13 @@ set_in_khepri(Key, Term) ->
5959
Record = #runtime_parameters{key = Key,
6060
value = Term},
6161
case rabbit_khepri:adv_put(Path, Record) of
62+
%% Khepri 0.16 and below returned `khepri:node_props()' for adv queries
63+
%% and commands targeting one node:
6264
{ok, #{data := Params}} ->
6365
{old, Params#runtime_parameters.value};
66+
%% Khepri 0.17+ return `khepri_adv:node_props_map()` instead.
67+
{ok, #{Path := #{data := Params}}} ->
68+
{old, Params#runtime_parameters.value};
6469
{ok, _} ->
6570
new
6671
end.
@@ -114,8 +119,13 @@ set_in_khepri_tx(Key, Term) ->
114119
Record = #runtime_parameters{key = Key,
115120
value = Term},
116121
case khepri_tx_adv:put(Path, Record) of
122+
%% Khepri 0.16 and below returned `khepri:node_props()' for adv
123+
%% queries and commands targeting one node:
117124
{ok, #{data := Params}} ->
118125
{old, Params#runtime_parameters.value};
126+
%% Khepri 0.17+ return `khepri_adv:node_props_map()` instead.
127+
{ok, #{Path := #{data := Params}}} ->
128+
{old, Params#runtime_parameters.value};
119129
{ok, _} ->
120130
new
121131
end.
@@ -347,11 +357,23 @@ delete_vhost_in_mnesia_tx(VHostName) ->
347357
<- mnesia:match_object(?MNESIA_TABLE, Match, read)].
348358

349359
delete_vhost_in_khepri(VHostName) ->
350-
Path = khepri_vhost_rp_path(
351-
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
352-
case rabbit_khepri:adv_delete_many(Path) of
353-
{ok, Props} ->
354-
{ok, rabbit_khepri:collect_payloads(Props)};
360+
Pattern = khepri_vhost_rp_path(
361+
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
362+
case rabbit_khepri:adv_delete_many(Pattern) of
363+
{ok, NodePropsMap} ->
364+
RTParams =
365+
maps:fold(
366+
fun(Path, Props, Acc) ->
367+
case {Path, Props} of
368+
{?RABBITMQ_KHEPRI_VHOST_RUNTIME_PARAM_PATH(
369+
VHostName, _, _),
370+
#{data := RTParam}} ->
371+
[RTParam | Acc];
372+
{_, _} ->
373+
Acc
374+
end
375+
end, [], NodePropsMap),
376+
{ok, RTParams};
355377
{error, _} = Err ->
356378
Err
357379
end.

‎deps/rabbit/src/rabbit_db_user.erl

+34-12
Original file line numberDiff line numberDiff line change
@@ -628,20 +628,42 @@ clear_all_permissions_for_vhost_in_mnesia(VHostName) ->
628628
clear_all_permissions_for_vhost_in_khepri(VHostName) ->
629629
rabbit_khepri:transaction(
630630
fun() ->
631-
UserPermissionsPath = khepri_user_permission_path(
632-
?KHEPRI_WILDCARD_STAR, VHostName),
633-
TopicPermissionsPath = khepri_topic_permission_path(
634-
?KHEPRI_WILDCARD_STAR, VHostName,
635-
?KHEPRI_WILDCARD_STAR),
636-
{ok, UserProps} = khepri_tx_adv:delete_many(UserPermissionsPath),
637-
{ok, TopicProps} = khepri_tx_adv:delete_many(
638-
TopicPermissionsPath),
639-
Deletions = rabbit_khepri:collect_payloads(
640-
TopicProps,
641-
rabbit_khepri:collect_payloads(UserProps)),
642-
{ok, Deletions}
631+
clear_all_permissions_for_vhost_in_khepri_tx(VHostName)
643632
end, rw, #{timeout => infinity}).
644633

634+
clear_all_permissions_for_vhost_in_khepri_tx(VHostName) ->
635+
UserPermissionsPattern = khepri_user_permission_path(
636+
?KHEPRI_WILDCARD_STAR, VHostName),
637+
TopicPermissionsPattern = khepri_topic_permission_path(
638+
?KHEPRI_WILDCARD_STAR, VHostName,
639+
?KHEPRI_WILDCARD_STAR),
640+
{ok, UserNodePropsMap} = khepri_tx_adv:delete_many(UserPermissionsPattern),
641+
{ok, TopicNodePropsMap} = khepri_tx_adv:delete_many(
642+
TopicPermissionsPattern),
643+
Deletions0 =
644+
maps:fold(
645+
fun(Path, Props, Acc) ->
646+
case {Path, Props} of
647+
{?RABBITMQ_KHEPRI_USER_PERMISSION_PATH(VHostName, _),
648+
#{data := Permission}} ->
649+
[Permission | Acc];
650+
{_, _} ->
651+
Acc
652+
end
653+
end, [], UserNodePropsMap),
654+
Deletions1 =
655+
maps:fold(
656+
fun(Path, Props, Acc) ->
657+
case {Path, Props} of
658+
{?RABBITMQ_KHEPRI_TOPIC_PERMISSION_PATH(VHostName, _, _),
659+
#{data := Permission}} ->
660+
[Permission | Acc];
661+
{_, _} ->
662+
Acc
663+
end
664+
end, Deletions0, TopicNodePropsMap),
665+
{ok, Deletions1}.
666+
645667
%% -------------------------------------------------------------------
646668
%% get_topic_permissions().
647669
%% -------------------------------------------------------------------

‎deps/rabbit/src/rabbit_db_vhost.erl

+31-7
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,23 @@ merge_metadata_in_khepri(VHostName, Metadata) ->
165165
Path = khepri_vhost_path(VHostName),
166166
Ret1 = rabbit_khepri:adv_get(Path),
167167
case Ret1 of
168-
{ok, #{data := VHost0, payload_version := DVersion}} ->
168+
{ok, QueryRet} ->
169+
{VHost0, Vsn} = case QueryRet of
170+
%% Khepri 0.16 and below returned
171+
%% `khepri:node_props()' for adv queries and
172+
%% commands targeting one node:
173+
#{data := Data, payload_version := V} ->
174+
{Data, V};
175+
%% Khepri 0.17+ return
176+
%% `khepri_adv:node_props_map()` instead.
177+
#{Path := #{data := Data,
178+
payload_version := V}} ->
179+
{Data, V}
180+
end,
169181
VHost = vhost:merge_metadata(VHost0, Metadata),
170182
rabbit_log:debug("Updating a virtual host record ~p", [VHost]),
171183
Path1 = khepri_path:combine_with_conditions(
172-
Path, [#if_payload_version{version = DVersion}]),
184+
Path, [#if_payload_version{version = Vsn}]),
173185
Ret2 = rabbit_khepri:put(Path1, VHost),
174186
case Ret2 of
175187
ok ->
@@ -411,13 +423,25 @@ update_in_mnesia_tx(VHostName, UpdateFun)
411423
update_in_khepri(VHostName, UpdateFun) ->
412424
Path = khepri_vhost_path(VHostName),
413425
case rabbit_khepri:adv_get(Path) of
414-
{ok, #{data := V, payload_version := DVersion}} ->
415-
V1 = UpdateFun(V),
426+
{ok, QueryRet} ->
427+
{VHost0, Vsn} = case QueryRet of
428+
%% Khepri 0.16 and below returned
429+
%% `khepri:node_props()' for adv queries and
430+
%% commands targeting one node:
431+
#{data := Data, payload_version := V} ->
432+
{Data, V};
433+
%% Khepri 0.17+ return
434+
%% `khepri_adv:node_props_map()` instead.
435+
#{Path := #{data := Data,
436+
payload_version := V}} ->
437+
{Data, V}
438+
end,
439+
VHost1 = UpdateFun(VHost0),
416440
Path1 = khepri_path:combine_with_conditions(
417-
Path, [#if_payload_version{version = DVersion}]),
418-
case rabbit_khepri:put(Path1, V1) of
441+
Path, [#if_payload_version{version = Vsn}]),
442+
case rabbit_khepri:put(Path1, VHost1) of
419443
ok ->
420-
V1;
444+
VHost1;
421445
{error, {khepri, mismatching_node, _}} ->
422446
update_in_khepri(VHostName, UpdateFun);
423447
Error ->

‎deps/rabbit/src/rabbit_khepri.erl

-46
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,6 @@
175175

176176
-export([force_shrink_member_to_current_member/0]).
177177

178-
%% Helpers for working with the Khepri API / types.
179-
-export([collect_payloads/1,
180-
collect_payloads/2]).
181-
182178
-ifdef(TEST).
183179
-export([force_metadata_store/1,
184180
clear_forced_metadata_store/0]).
@@ -1104,48 +1100,6 @@ handle_async_ret(RaEvent) ->
11041100
fence(Timeout) ->
11051101
khepri:fence(?STORE_ID, Timeout).
11061102

1107-
%% -------------------------------------------------------------------
1108-
%% collect_payloads().
1109-
%% -------------------------------------------------------------------
1110-
1111-
-spec collect_payloads(Props) -> Ret when
1112-
Props :: khepri:node_props(),
1113-
Ret :: [Payload],
1114-
Payload :: term().
1115-
1116-
%% @doc Collects all payloads from a node props map.
1117-
%%
1118-
%% This is the same as calling `collect_payloads(Props, [])'.
1119-
%%
1120-
%% @private
1121-
1122-
collect_payloads(Props) when is_map(Props) ->
1123-
collect_payloads(Props, []).
1124-
1125-
-spec collect_payloads(Props, Acc0) -> Ret when
1126-
Props :: khepri:node_props(),
1127-
Acc0 :: [Payload],
1128-
Ret :: [Payload],
1129-
Payload :: term().
1130-
1131-
%% @doc Collects all payloads from a node props map into the accumulator list.
1132-
%%
1133-
%% This is meant to be used with the `khepri_adv' API to easily collect the
1134-
%% payloads from the return value of `khepri_adv:delete_many/4' for example.
1135-
%%
1136-
%% @returns all payloads in the node props map collected into a list, with
1137-
%% `Acc0' as the tail.
1138-
%%
1139-
%% @private
1140-
1141-
collect_payloads(Props, Acc0) when is_map(Props) andalso is_list(Acc0) ->
1142-
maps:fold(
1143-
fun (_Path, #{data := Payload}, Acc) ->
1144-
[Payload | Acc];
1145-
(_Path, _NoPayload, Acc) ->
1146-
Acc
1147-
end, Acc0, Props).
1148-
11491103
-spec unregister_legacy_projections() -> Ret when
11501104
Ret :: ok | timeout_error().
11511105
%% @doc Unregisters any projections which were registered in RabbitMQ 3.13.x

‎deps/rabbitmq_consistent_hash_exchange/src/rabbit_db_ch_exchange.erl

+14-2
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,25 @@ create_binding_in_mnesia_tx(Src, Dst, Weight, UpdateFun) ->
104104
create_binding_in_khepri(Src, Dst, Weight, UpdateFun) ->
105105
Path = khepri_consistent_hash_path(Src),
106106
case rabbit_khepri:adv_get(Path) of
107-
{ok, #{data := Chx0, payload_version := DVersion}} ->
107+
{ok, QueryRet} ->
108+
{Chx0, Vsn} = case QueryRet of
109+
%% Khepri 0.16 and below returned
110+
%% `khepri:node_props()' for adv queries and
111+
%% commands targeting one node:
112+
#{data := Data, payload_version := V} ->
113+
{Data, V};
114+
%% Khepri 0.17+ return
115+
%% `khepri_adv:node_props_map()` instead.
116+
#{Path := #{data := Data,
117+
payload_version := V}} ->
118+
{Data, V}
119+
end,
108120
case UpdateFun(Chx0, Dst, Weight) of
109121
already_exists ->
110122
already_exists;
111123
Chx ->
112124
Path1 = khepri_path:combine_with_conditions(
113-
Path, [#if_payload_version{version = DVersion}]),
125+
Path, [#if_payload_version{version = Vsn}]),
114126
Ret2 = rabbit_khepri:put(Path1, Chx),
115127
case Ret2 of
116128
ok ->

‎deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl

+14-2
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,21 @@ create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) ->
108108
update_in_khepri(XName, BindingKeyAndFun, UpdateFun, ErrorFun) ->
109109
Path = khepri_jms_topic_exchange_path(XName),
110110
case rabbit_khepri:adv_get(Path) of
111-
{ok, #{data := BindingFuns, payload_version := DVersion}} ->
111+
{ok, QueryRet} ->
112+
{BindingFuns, Vsn} = case QueryRet of
113+
%% Khepri 0.16 and below returned
114+
%% `khepri:node_props()' for adv queries
115+
%% and commands targeting one node:
116+
#{data := Data, payload_version := V} ->
117+
{Data, V};
118+
%% Khepri 0.17+ return
119+
%% `khepri_adv:node_props_map()` instead.
120+
#{Path := #{data := Data,
121+
payload_version := V}} ->
122+
{Data, V}
123+
end,
112124
Path1 = khepri_path:combine_with_conditions(
113-
Path, [#if_payload_version{version = DVersion}]),
125+
Path, [#if_payload_version{version = Vsn}]),
114126
Ret = rabbit_khepri:put(Path1, UpdateFun(BindingFuns, BindingKeyAndFun)),
115127
case Ret of
116128
ok -> ok;

‎deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl

+14-2
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,22 @@ insert0_in_mnesia(Key, Cached, Message, Length) ->
106106
insert_in_khepri(XName, Message, Length) ->
107107
Path = khepri_recent_history_path(XName),
108108
case rabbit_khepri:adv_get(Path) of
109-
{ok, #{data := Cached0, payload_version := DVersion}} ->
109+
{ok, QueryRet} ->
110+
{Cached0, Vsn} = case QueryRet of
111+
%% Khepri 0.16 and below returned
112+
%% `khepri:node_props()' for adv queries and
113+
%% commands targeting one node:
114+
#{data := Data, payload_version := V} ->
115+
{Data, V};
116+
%% Khepri 0.17+ return
117+
%% `khepri_adv:node_props_map()` instead.
118+
#{Path := #{data := Data,
119+
payload_version := V}} ->
120+
{Data, V}
121+
end,
110122
Cached = add_to_cache(Cached0, Message, Length),
111123
Path1 = khepri_path:combine_with_conditions(
112-
Path, [#if_payload_version{version = DVersion}]),
124+
Path, [#if_payload_version{version = Vsn}]),
113125
Ret = rabbit_khepri:put(Path1, Cached),
114126
case Ret of
115127
ok ->

0 commit comments

Comments
 (0)
Please sign in to comment.