diff --git a/lib/extensions/postgres_cdc_rls/message_dispatcher.ex b/lib/extensions/postgres_cdc_rls/message_dispatcher.ex index 8e7ae7f5f..a5f488bc4 100644 --- a/lib/extensions/postgres_cdc_rls/message_dispatcher.ex +++ b/lib/extensions/postgres_cdc_rls/message_dispatcher.ex @@ -7,23 +7,13 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do """ alias Phoenix.Socket.Broadcast - alias Realtime.GenCounter - alias Realtime.RateCounter - alias Realtime.Tenants def dispatch([_ | _] = topic_subscriptions, _from, payload) do {sub_ids, payload} = Map.pop(payload, :subscription_ids) - [{_pid, {:subscriber_fastlane, _fastlane_pid, _serializer, _ids, _join_topic, tenant_id, _is_new_api}} | _] = - topic_subscriptions - - # Ensure RateCounter is started - rate = Tenants.db_events_per_second_rate(tenant_id) - RateCounter.new(rate) - _ = Enum.reduce(topic_subscriptions, %{}, fn - {_pid, {:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, _tenant, is_new_api}}, cache -> + {_pid, {:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, is_new_api}}, cache -> for {bin_id, id} <- ids, reduce: [] do acc -> if MapSet.member?(sub_ids, bin_id) do @@ -36,20 +26,11 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do [_ | _] = valid_ids -> new_payload = if is_new_api do - %Broadcast{ - topic: join_topic, - event: "postgres_changes", - payload: %{ids: valid_ids, data: payload} - } + %Broadcast{topic: join_topic, event: "postgres_changes", payload: %{ids: valid_ids, data: payload}} else - %Broadcast{ - topic: join_topic, - event: payload.type, - payload: payload - } + %Broadcast{topic: join_topic, event: payload.type, payload: payload} end - GenCounter.add(rate.id) broadcast_message(cache, fastlane_pid, new_payload, serializer) _ -> diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index 271f7e5ea..88e920c6d 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -198,12 +198,12 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do :ok _ -> - Realtime.GenCounter.add(rate_counter_args.id, rows_count) - for row <- rows, change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do topic = "realtime:postgres:" <> tenant_id + Realtime.GenCounter.add(rate_counter_args.id, MapSet.size(change.subscription_ids)) + case collect_subscription_nodes(subscribers_nodes_table, change.subscription_ids) do {:ok, nodes} -> for {node, subscription_ids} <- nodes do diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index a140f5417..70a426357 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -727,7 +727,7 @@ defmodule RealtimeWeb.RealtimeChannel do end) subscription_metadata = - {:subscriber_fastlane, transport_pid, serializer, ids, topic, tenant, is_new_api} + {:subscriber_fastlane, transport_pid, serializer, ids, topic, is_new_api} metadata = [metadata: subscription_metadata] diff --git a/mix.exs b/mix.exs index 0eff491f0..897768ef0 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.58.0", + version: "2.58.1", elixir: "~> 1.18", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index 863c29b60..df442f32d 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -2240,8 +2240,8 @@ defmodule Realtime.Integration.RtChannelTest do # 0 events as no broadcast used assert 2 = get_count([:realtime, :rate_counter, :channel, :joins], external_id) assert 2 = get_count([:realtime, :rate_counter, :channel, :presence_events], external_id) - # 5 + 5 + 5 (5 for each websocket and 5 while publishing) - assert 15 = get_count([:realtime, :rate_counter, :channel, :db_events], external_id) + # (5 for each websocket) + assert 10 = get_count([:realtime, :rate_counter, :channel, :db_events], external_id) assert 0 = get_count([:realtime, :rate_counter, :channel, :events], external_id) end diff --git a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs index 246682ea8..9ab74b0c3 100644 --- a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs +++ b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs @@ -72,7 +72,7 @@ defmodule Realtime.Extensions.CdcRlsTest do topic = "realtime:test" serializer = Phoenix.Socket.V1.JSONSerializer - subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true} + subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, true} metadata = [metadata: subscription_metadata] :ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata) @@ -281,7 +281,7 @@ defmodule Realtime.Extensions.CdcRlsTest do topic = "realtime:test" serializer = Phoenix.Socket.V1.JSONSerializer - subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true} + subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, true} metadata = [metadata: subscription_metadata] :ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata) @@ -339,11 +339,9 @@ defmodule Realtime.Extensions.CdcRlsTest do rate = Realtime.Tenants.db_events_per_second_rate(tenant) - assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = - RateCounter.get(rate) + assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate) - # 1 from ReplicationPoller and 1 from MessageDispatcher - assert Enum.sum(bucket) == 2 + assert Enum.sum(bucket) == 1 assert_receive { :telemetry, @@ -382,7 +380,7 @@ defmodule Realtime.Extensions.CdcRlsTest do topic = "realtime:test" serializer = Phoenix.Socket.V1.JSONSerializer - subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true} + subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, true} metadata = [metadata: subscription_metadata] :ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata) @@ -474,7 +472,7 @@ defmodule Realtime.Extensions.CdcRlsTest do topic = "realtime:test" serializer = Phoenix.Socket.V1.JSONSerializer - subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true} + subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, true} metadata = [metadata: subscription_metadata] :ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata) @@ -510,14 +508,6 @@ defmodule Realtime.Extensions.CdcRlsTest do "topic" => "realtime:test" } = message - # Wait for RateCounter to update - Process.sleep(2000) - - rate = Realtime.Tenants.db_events_per_second_rate(tenant) - - assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate) - assert 1 in bucket - assert_receive { :telemetry, [:realtime, :rpc], diff --git a/test/realtime/extensions/cdc_rls/replication_poller_test.exs b/test/realtime/extensions/cdc_rls/replication_poller_test.exs index 5824c880b..251506047 100644 --- a/test/realtime/extensions/cdc_rls/replication_poller_test.exs +++ b/test/realtime/extensions/cdc_rls/replication_poller_test.exs @@ -2,9 +2,9 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do # Tweaking application env use Realtime.DataCase, async: false - alias Extensions.PostgresCdcRls.MessageDispatcher use Mimic + alias Extensions.PostgresCdcRls.MessageDispatcher alias Extensions.PostgresCdcRls.ReplicationPoller, as: Poller alias Extensions.PostgresCdcRls.Replications @@ -14,6 +14,8 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do UpdatedRecord } + alias Realtime.RateCounter + alias RealtimeWeb.TenantBroadcaster import Poller, only: [generate_record: 1] @@ -46,10 +48,10 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do empty_results = {:ok, %Postgrex.Result{rows: [], num_rows: 0}} stub(Replications, :list_changes, fn _, _, _, _, _ -> empty_results end) - %{args: args} + %{args: args, tenant: tenant} end - test "handles no new changes", %{args: args} do + test "handles no new changes", %{args: args, tenant: tenant} do tenant_id = args["id"] reject(&TenantBroadcaster.pubsub_direct_broadcast/6) reject(&TenantBroadcaster.pubsub_broadcast/5) @@ -64,43 +66,25 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do 500 refute_receive _any + + # Wait for RateCounter to update + Process.sleep(1100) + + rate = Realtime.Tenants.db_events_per_second_rate(tenant) + assert {:ok, %RateCounter{sum: sum}} = RateCounter.get(rate) + assert sum == 0 end - test "handles new changes with missing ets table", %{args: args} do + test "handles new changes with missing ets table", %{args: args, tenant: tenant} do tenant_id = args["id"] :ets.delete(args["subscribers_nodes_table"]) results = - {:ok, - %Postgrex.Result{ - command: :select, - columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"], - rows: [ - [ - %{ - "columns" => [ - %{"name" => "id", "type" => "int4"}, - %{"name" => "details", "type" => "text"} - ], - "commit_timestamp" => "2025-10-13T07:50:28.066Z", - "record" => %{"details" => "test", "id" => 55}, - "schema" => "public", - "table" => "test", - "type" => "INSERT" - }, - false, - [ - <<71, 36, 83, 212, 168, 9, 17, 240, 165, 186, 118, 202, 193, 157, 232, 187>>, - <<251, 188, 190, 118, 168, 119, 17, 240, 188, 87, 118, 202, 193, 157, 232, 187>> - ], - [] - ] - ], - num_rows: 1, - connection_id: 123, - messages: [] - }} + build_result([ + <<71, 36, 83, 212, 168, 9, 17, 240, 165, 186, 118, 202, 193, 157, 232, 187>>, + <<251, 188, 190, 118, 168, 119, 17, 240, 188, 87, 118, 202, 193, 157, 232, 187>> + ]) expect(Replications, :list_changes, fn _, _, _, _, _ -> results end) reject(&TenantBroadcaster.pubsub_direct_broadcast/6) @@ -133,41 +117,23 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do %{tenant: ^tenant_id} }, 500 + + # Wait for RateCounter to update + Process.sleep(1100) + + rate = Realtime.Tenants.db_events_per_second_rate(tenant) + assert {:ok, %RateCounter{sum: sum}} = RateCounter.get(rate) + assert sum == 2 end - test "handles new changes with no subscription nodes", %{args: args} do + test "handles new changes with no subscription nodes", %{args: args, tenant: tenant} do tenant_id = args["id"] results = - {:ok, - %Postgrex.Result{ - command: :select, - columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"], - rows: [ - [ - %{ - "columns" => [ - %{"name" => "id", "type" => "int4"}, - %{"name" => "details", "type" => "text"} - ], - "commit_timestamp" => "2025-10-13T07:50:28.066Z", - "record" => %{"details" => "test", "id" => 55}, - "schema" => "public", - "table" => "test", - "type" => "INSERT" - }, - false, - [ - <<71, 36, 83, 212, 168, 9, 17, 240, 165, 186, 118, 202, 193, 157, 232, 187>>, - <<251, 188, 190, 118, 168, 119, 17, 240, 188, 87, 118, 202, 193, 157, 232, 187>> - ], - [] - ] - ], - num_rows: 1, - connection_id: 123, - messages: [] - }} + build_result([ + <<71, 36, 83, 212, 168, 9, 17, 240, 165, 186, 118, 202, 193, 157, 232, 187>>, + <<251, 188, 190, 118, 168, 119, 17, 240, 188, 87, 118, 202, 193, 157, 232, 187>> + ]) expect(Replications, :list_changes, fn _, _, _, _, _ -> results end) reject(&TenantBroadcaster.pubsub_direct_broadcast/6) @@ -200,41 +166,23 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do %{tenant: ^tenant_id} }, 500 + + # Wait for RateCounter to update + Process.sleep(1100) + + rate = Realtime.Tenants.db_events_per_second_rate(tenant) + assert {:ok, %RateCounter{sum: sum}} = RateCounter.get(rate) + assert sum == 2 end - test "handles new changes with missing subscription nodes", %{args: args} do + test "handles new changes with missing subscription nodes", %{args: args, tenant: tenant} do tenant_id = args["id"] results = - {:ok, - %Postgrex.Result{ - command: :select, - columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"], - rows: [ - [ - %{ - "columns" => [ - %{"name" => "id", "type" => "int4"}, - %{"name" => "details", "type" => "text"} - ], - "commit_timestamp" => "2025-10-13T07:50:28.066Z", - "record" => %{"details" => "test", "id" => 55}, - "schema" => "public", - "table" => "test", - "type" => "INSERT" - }, - false, - [ - sub1 = <<71, 36, 83, 212, 168, 9, 17, 240, 165, 186, 118, 202, 193, 157, 232, 187>>, - <<251, 188, 190, 118, 168, 119, 17, 240, 188, 87, 118, 202, 193, 157, 232, 187>> - ], - [] - ] - ], - num_rows: 1, - connection_id: 123, - messages: [] - }} + build_result([ + sub1 = <<71, 36, 83, 212, 168, 9, 17, 240, 165, 186, 118, 202, 193, 157, 232, 187>>, + <<251, 188, 190, 118, 168, 119, 17, 240, 188, 87, 118, 202, 193, 157, 232, 187>> + ]) # Only one subscription has node information :ets.insert(args["subscribers_nodes_table"], {sub1, node()}) @@ -270,42 +218,24 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do %{tenant: ^tenant_id} }, 500 + + # Wait for RateCounter to update + Process.sleep(1100) + + rate = Realtime.Tenants.db_events_per_second_rate(tenant) + assert {:ok, %RateCounter{sum: sum}} = RateCounter.get(rate) + assert sum == 2 end - test "handles new changes with subscription nodes information", %{args: args} do + test "handles new changes with subscription nodes information", %{args: args, tenant: tenant} do tenant_id = args["id"] results = - {:ok, - %Postgrex.Result{ - command: :select, - columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"], - rows: [ - [ - %{ - "columns" => [ - %{"name" => "id", "type" => "int4"}, - %{"name" => "details", "type" => "text"} - ], - "commit_timestamp" => "2025-10-13T07:50:28.066Z", - "record" => %{"details" => "test", "id" => 55}, - "schema" => "public", - "table" => "test", - "type" => "INSERT" - }, - false, - [ - sub1 = <<71, 36, 83, 212, 168, 9, 17, 240, 165, 186, 118, 202, 193, 157, 232, 187>>, - sub2 = <<251, 188, 190, 118, 168, 119, 17, 240, 188, 87, 118, 202, 193, 157, 232, 187>>, - sub3 = <<49, 59, 209, 112, 173, 77, 17, 240, 191, 41, 118, 202, 193, 157, 232, 187>> - ], - [] - ] - ], - num_rows: 1, - connection_id: 123, - messages: [] - }} + build_result([ + sub1 = <<71, 36, 83, 212, 168, 9, 17, 240, 165, 186, 118, 202, 193, 157, 232, 187>>, + sub2 = <<251, 188, 190, 118, 168, 119, 17, 240, 188, 87, 118, 202, 193, 157, 232, 187>>, + sub3 = <<49, 59, 209, 112, 173, 77, 17, 240, 191, 41, 118, 202, 193, 157, 232, 187>> + ]) # All subscriptions have node information :ets.insert(args["subscribers_nodes_table"], {sub1, node()}) @@ -351,6 +281,13 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do assert {node(), MapSet.new([sub1, sub3])} in node_subs assert {:"someothernode@127.0.0.1", MapSet.new([sub2])} in node_subs + + # Wait for RateCounter to update + Process.sleep(1100) + + rate = Realtime.Tenants.db_events_per_second_rate(tenant) + assert {:ok, %RateCounter{sum: sum}} = RateCounter.get(rate) + assert sum == 3 end end @@ -651,4 +588,33 @@ defmodule Realtime.Extensions.PostgresCdcRls.ReplicationPollerTest do end def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {:telemetry, event, measures, metadata}) + + defp build_result(subscription_ids) do + {:ok, + %Postgrex.Result{ + command: :select, + columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"], + rows: [ + [ + %{ + "columns" => [ + %{"name" => "id", "type" => "int4"}, + %{"name" => "details", "type" => "text"} + ], + "commit_timestamp" => "2025-10-13T07:50:28.066Z", + "record" => %{"details" => "test", "id" => 55}, + "schema" => "public", + "table" => "test", + "type" => "INSERT" + }, + false, + subscription_ids, + [] + ] + ], + num_rows: 1, + connection_id: 123, + messages: [] + }} + end end