Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 3 additions & 22 deletions lib/extensions/postgres_cdc_rls/message_dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,13 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do
"""

alias Phoenix.Socket.Broadcast
alias Realtime.GenCounter
alias Realtime.RateCounter
alias Realtime.Tenants

def dispatch([_ | _] = topic_subscriptions, _from, payload) do
{sub_ids, payload} = Map.pop(payload, :subscription_ids)

[{_pid, {:subscriber_fastlane, _fastlane_pid, _serializer, _ids, _join_topic, tenant_id, _is_new_api}} | _] =
topic_subscriptions

# Ensure RateCounter is started
rate = Tenants.db_events_per_second_rate(tenant_id)
RateCounter.new(rate)

_ =
Enum.reduce(topic_subscriptions, %{}, fn
{_pid, {:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, _tenant, is_new_api}}, cache ->
{_pid, {:subscriber_fastlane, fastlane_pid, serializer, ids, join_topic, is_new_api}}, cache ->
for {bin_id, id} <- ids, reduce: [] do
acc ->
if MapSet.member?(sub_ids, bin_id) do
Expand All @@ -36,20 +26,11 @@ defmodule Extensions.PostgresCdcRls.MessageDispatcher do
[_ | _] = valid_ids ->
new_payload =
if is_new_api do
%Broadcast{
topic: join_topic,
event: "postgres_changes",
payload: %{ids: valid_ids, data: payload}
}
%Broadcast{topic: join_topic, event: "postgres_changes", payload: %{ids: valid_ids, data: payload}}
else
%Broadcast{
topic: join_topic,
event: payload.type,
payload: payload
}
%Broadcast{topic: join_topic, event: payload.type, payload: payload}
end

GenCounter.add(rate.id)
broadcast_message(cache, fastlane_pid, new_payload, serializer)

_ ->
Expand Down
4 changes: 2 additions & 2 deletions lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,12 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
:ok

_ ->
Realtime.GenCounter.add(rate_counter_args.id, rows_count)

for row <- rows,
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
topic = "realtime:postgres:" <> tenant_id

Realtime.GenCounter.add(rate_counter_args.id, MapSet.size(change.subscription_ids))

case collect_subscription_nodes(subscribers_nodes_table, change.subscription_ids) do
{:ok, nodes} ->
for {node, subscription_ids} <- nodes do
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ defmodule RealtimeWeb.RealtimeChannel do
end)

subscription_metadata =
{:subscriber_fastlane, transport_pid, serializer, ids, topic, tenant, is_new_api}
{:subscriber_fastlane, transport_pid, serializer, ids, topic, is_new_api}

metadata = [metadata: subscription_metadata]

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.56.0",
version: "2.56.1",
elixir: "~> 1.18",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
4 changes: 2 additions & 2 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2240,8 +2240,8 @@ defmodule Realtime.Integration.RtChannelTest do
# 0 events as no broadcast used
assert 2 = get_count([:realtime, :rate_counter, :channel, :joins], external_id)
assert 2 = get_count([:realtime, :rate_counter, :channel, :presence_events], external_id)
# 5 + 5 + 5 (5 for each websocket and 5 while publishing)
assert 15 = get_count([:realtime, :rate_counter, :channel, :db_events], external_id)
# (5 for each websocket)
assert 10 = get_count([:realtime, :rate_counter, :channel, :db_events], external_id)
assert 0 = get_count([:realtime, :rate_counter, :channel, :events], external_id)
end

Expand Down
22 changes: 6 additions & 16 deletions test/realtime/extensions/cdc_rls/cdc_rls_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
topic = "realtime:test"
serializer = Phoenix.Socket.V1.JSONSerializer

subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true}
subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, true}
metadata = [metadata: subscription_metadata]
:ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata)

Expand Down Expand Up @@ -274,7 +274,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
topic = "realtime:test"
serializer = Phoenix.Socket.V1.JSONSerializer

subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true}
subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, true}
metadata = [metadata: subscription_metadata]
:ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata)

Expand Down Expand Up @@ -321,11 +321,9 @@ defmodule Realtime.Extensions.CdcRlsTest do

rate = Realtime.Tenants.db_events_per_second_rate(tenant)

assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} =
RateCounter.get(rate)
assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate)

# 1 from ReplicationPoller and 1 from MessageDispatcher
assert Enum.sum(bucket) == 2
assert Enum.sum(bucket) == 1

assert_receive {
:telemetry,
Expand Down Expand Up @@ -364,7 +362,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
topic = "realtime:test"
serializer = Phoenix.Socket.V1.JSONSerializer

subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true}
subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, true}
metadata = [metadata: subscription_metadata]
:ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata)

Expand Down Expand Up @@ -456,7 +454,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
topic = "realtime:test"
serializer = Phoenix.Socket.V1.JSONSerializer

subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, external_id, true}
subscription_metadata = {:subscriber_fastlane, self(), serializer, ids, topic, true}
metadata = [metadata: subscription_metadata]
:ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata)

Expand Down Expand Up @@ -492,14 +490,6 @@ defmodule Realtime.Extensions.CdcRlsTest do
"topic" => "realtime:test"
} = message

# Wait for RateCounter to update
Process.sleep(2000)

rate = Realtime.Tenants.db_events_per_second_rate(tenant)

assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate)
assert 1 in bucket

:erpc.call(node, PostgresCdcRls, :handle_stop, [tenant.external_id, 10_000])
end
end
Expand Down
Loading