Skip to content

Commit 0ffb34b

Browse files
authored
fix: on ReplicationPoller send only subscription IDs relevant to the node (#1583)
1 parent 9541666 commit 0ffb34b

File tree

3 files changed

+21
-11
lines changed

3 files changed

+21
-11
lines changed

lib/extensions/postgres_cdc_rls/replication_poller.ex

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,12 +206,13 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
206206

207207
case collect_subscription_nodes(subscribers_nodes_table, change.subscription_ids) do
208208
{:ok, nodes} ->
209-
for node <- nodes do
209+
for {node, subscription_ids} <- nodes do
210210
TenantBroadcaster.pubsub_direct_broadcast(
211211
node,
212212
tenant_id,
213213
topic,
214-
change,
214+
# Send only the subscription IDs relevant to this node
215+
%{change | subscription_ids: MapSet.new(subscription_ids)},
215216
MessageDispatcher,
216217
:postgres_changes
217218
)
@@ -236,10 +237,16 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
236237
defp handle_list_changes_result({:error, reason}, _, _, _), do: {:error, reason}
237238

238239
defp collect_subscription_nodes(subscribers_nodes_table, subscription_ids) do
239-
Enum.reduce_while(subscription_ids, {:ok, MapSet.new()}, fn subscription_id, {:ok, acc} ->
240+
Enum.reduce_while(subscription_ids, {:ok, %{}}, fn subscription_id, {:ok, acc} ->
240241
case :ets.lookup(subscribers_nodes_table, subscription_id) do
241-
[{_, node}] -> {:cont, {:ok, MapSet.put(acc, node)}}
242-
_ -> {:halt, {:error, :node_not_found}}
242+
[{_, node}] ->
243+
updated_acc =
244+
Map.update(acc, node, [subscription_id], fn existing_ids -> [subscription_id | existing_ids] end)
245+
246+
{:cont, {:ok, updated_acc}}
247+
248+
_ ->
249+
{:halt, {:error, :node_not_found}}
243250
end
244251
end)
245252
rescue

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.55.1",
7+
version: "2.55.2",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime/extensions/cdc_rls/replication_poller_test.exs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,8 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
296296
false,
297297
[
298298
sub1 = <<71, 36, 83, 212, 168, 9, 17, 240, 165, 186, 118, 202, 193, 157, 232, 187>>,
299-
sub2 = <<251, 188, 190, 118, 168, 119, 17, 240, 188, 87, 118, 202, 193, 157, 232, 187>>
299+
sub2 = <<251, 188, 190, 118, 168, 119, 17, 240, 188, 87, 118, 202, 193, 157, 232, 187>>,
300+
sub3 = <<49, 59, 209, 112, 173, 77, 17, 240, 191, 41, 118, 202, 193, 157, 232, 187>>
300301
],
301302
[]
302303
]
@@ -306,9 +307,10 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
306307
messages: []
307308
}}
308309

309-
# Both subscriptions have node information
310+
# All subscriptions have node information
310311
:ets.insert(args["subscribers_nodes_table"], {sub1, node()})
311312
:ets.insert(args["subscribers_nodes_table"], {sub2, :"[email protected]"})
313+
:ets.insert(args["subscribers_nodes_table"], {sub3, node()})
312314

313315
expect(Replications, :list_changes, fn _, _, _, _, _ -> results end)
314316
reject(&TenantBroadcaster.pubsub_broadcast/5)
@@ -345,9 +347,10 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do
345347

346348
assert Enum.count(calls) == 2
347349

348-
Enum.each(calls, fn [node, _, _, _, _, _] ->
349-
assert node in [node(), :"[email protected]"]
350-
end)
350+
node_subs = Enum.map(calls, fn [node, _, _, change, _, _] -> {node, change.subscription_ids} end)
351+
352+
assert {node(), MapSet.new([sub1, sub3])} in node_subs
353+
assert {:"[email protected]", MapSet.new([sub2])} in node_subs
351354
end
352355
end
353356

0 commit comments

Comments
 (0)