Skip to content

Commit 48b9980

Browse files
committed
fix: increment rate counter from ReplicationPoller only
We can do this because we know all subscriptions for this tenant that should receive a message
1 parent f44287e commit 48b9980

File tree

7 files changed

+97
-167
lines changed

7 files changed

+97
-167
lines changed

lib/extensions/postgres_cdc_rls/message_dispatcher.ex

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,13 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do
77
"""
88

99
alias Phoenix.Socket.Broadcast
10-
alias Realtime.GenCounter
11-
alias Realtime.RateCounter
12-
alias Realtime.Tenants
1310

1411
def dispatch([_ | _] = topic_subscriptions, _from, payload) do
1512
{sub_ids, payload} = Map.pop(payload, :subscription_ids)
1613

17-
[{_pid, {:subscriber_fastlane, _fastlane_pid, _serializer, _ids, _join_topic, tenant_id, _is_new_api}} | _] =
18-
topic_subscriptions
19-
20-
# Ensure RateCounter is started
21-
rate = Tenants.db_events_per_second_rate(tenant_id)
22-
RateCounter.new(rate)
23-
2414
_ =
2515
Enum.reduce(topic_subscriptions, %{}, fn
26-
{_pid, {:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, _tenant, is_new_api}}, cache ->
16+
{_pid, {:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, is_new_api}}, cache ->
2717
for {bin_id, id} <- ids, reduce: [] do
2818
acc ->
2919
if MapSet.member?(sub_ids, bin_id) do
@@ -36,20 +26,11 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do
3626
[_ | _] = valid_ids ->
3727
new_payload =
3828
if is_new_api do
39-
%Broadcast{
40-
topic: join_topic,
41-
event: "postgres_changes",
42-
payload: %{ids: valid_ids, data: payload}
43-
}
29+
%Broadcast{topic: join_topic, event: "postgres_changes", payload: %{ids: valid_ids, data: payload}}
4430
else
45-
%Broadcast{
46-
topic: join_topic,
47-
event: payload.type,
48-
payload: payload
49-
}
31+
%Broadcast{topic: join_topic, event: payload.type, payload: payload}
5032
end
5133

52-
GenCounter.add(rate.id)
5334
broadcast_message(cache, fastlane_pid, new_payload, serializer)
5435

5536
_ ->

lib/extensions/postgres_cdc_rls/replication_poller.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,12 +198,12 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
198198
:ok
199199

200200
_ ->
201-
Realtime.GenCounter.add(rate_counter_args.id, rows_count)
202-
203201
for row <- rows,
204202
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
205203
topic = "realtime:postgres:" <> tenant_id
206204

205+
Realtime.GenCounter.add(rate_counter_args.id, MapSet.size(change.subscription_ids))
206+
207207
case collect_subscription_nodes(subscribers_nodes_table, change.subscription_ids) do
208208
{:ok, nodes} ->
209209
for {node, subscription_ids} <- nodes do

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -721,7 +721,7 @@ defmodule RealtimeWeb.RealtimeChannel do
721721
end)
722722

723723
subscription_metadata =
724-
{:subscriber_fastlane, transport_pid, serializer, ids, topic, tenant, is_new_api}
724+
{:subscriber_fastlane, transport_pid, serializer, ids, topic, is_new_api}
725725

726726
metadata = [metadata: subscription_metadata]
727727

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.56.0",
7+
version: "2.56.1",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2240,8 +2240,8 @@ defmodule Realtime.Integration.RtChannelTest do
22402240
# 0 events as no broadcast used
22412241
assert 2 = get_count([:realtime, :rate_counter, :channel, :joins], external_id)
22422242
assert 2 = get_count([:realtime, :rate_counter, :channel, :presence_events], external_id)
2243-
# 5 + 5 + 5 (5 for each websocket and 5 while publishing)
2244-
assert 15 = get_count([:realtime, :rate_counter, :channel, :db_events], external_id)
2243+
# (5 for each websocket)
2244+
assert 10 = get_count([:realtime, :rate_counter, :channel, :db_events], external_id)
22452245
assert 0 = get_count([:realtime, :rate_counter, :channel, :events], external_id)
22462246
end
22472247

test/realtime/extensions/cdc_rls/cdc_rls_test.exs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
7272
topic = "realtime:test"
7373
serializer = Phoenix.Socket.V1.JSONSerializer
7474

75-
subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true}
75+
subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, true}
7676
metadata = [metadata: subscription_metadata]
7777
:ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata)
7878

@@ -274,7 +274,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
274274
topic = "realtime:test"
275275
serializer = Phoenix.Socket.V1.JSONSerializer
276276

277-
subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true}
277+
subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, true}
278278
metadata = [metadata: subscription_metadata]
279279
:ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata)
280280

@@ -321,11 +321,9 @@ defmodule Realtime.Extensions.CdcRlsTest do
321321

322322
rate = Realtime.Tenants.db_events_per_second_rate(tenant)
323323

324-
assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} =
325-
RateCounter.get(rate)
324+
assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate)
326325

327-
# 1 from ReplicationPoller and 1 from MessageDispatcher
328-
assert Enum.sum(bucket) == 2
326+
assert Enum.sum(bucket) == 1
329327

330328
assert_receive {
331329
:telemetry,
@@ -364,7 +362,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
364362
topic = "realtime:test"
365363
serializer = Phoenix.Socket.V1.JSONSerializer
366364

367-
subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true}
365+
subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, true}
368366
metadata = [metadata: subscription_metadata]
369367
:ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata)
370368

@@ -456,7 +454,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
456454
topic = "realtime:test"
457455
serializer = Phoenix.Socket.V1.JSONSerializer
458456

459-
subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true}
457+
subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, true}
460458
metadata = [metadata: subscription_metadata]
461459
:ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata)
462460

@@ -492,14 +490,6 @@ defmodule Realtime.Extensions.CdcRlsTest do
492490
"topic" => "realtime:test"
493491
} = message
494492

495-
# Wait for RateCounter to update
496-
Process.sleep(2000)
497-
498-
rate = Realtime.Tenants.db_events_per_second_rate(tenant)
499-
500-
assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate)
501-
assert 1 in bucket
502-
503493
:erpc.call(node, PostgresCdcRls, :handle_stop, [tenant.external_id, 10_000])
504494
end
505495
end

0 commit comments

Comments
 (0)