Skip to content

Commit 35023f8

Browse files
authored
fix: add tenant_id tag to GenRpc calls from CdcRls (#1598)
1 parent 4653393 commit 35023f8

File tree

5 files changed

+39
-14
lines changed

5 files changed

+39
-14
lines changed

lib/extensions/postgres_cdc_rls/cdc_rls.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@ defmodule Extensions.PostgresCdcRls do
2828
end
2929

3030
@impl true
31-
def handle_after_connect({manager_pid, conn}, settings, params_list) do
31+
def handle_after_connect({manager_pid, conn}, settings, params_list, tenant) do
3232
with {:ok, subscription_list} <- subscription_list(params_list) do
3333
publication = settings["publication"]
3434
opts = [conn, publication, subscription_list, manager_pid, self()]
3535
conn_node = node(conn)
3636

3737
if conn_node !== node() do
38-
GenRpc.call(conn_node, Subscriptions, :create, opts, timeout: 15_000)
38+
GenRpc.call(conn_node, Subscriptions, :create, opts, timeout: 15_000, tenant_id: tenant)
3939
else
4040
apply(Subscriptions, :create, opts)
4141
end
@@ -90,7 +90,7 @@ defmodule Extensions.PostgresCdcRls do
9090
"Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}"
9191
)
9292

93-
case GenRpc.call(launch_node, __MODULE__, :start, [args], timeout: 30_000, tenant: tenant) do
93+
case GenRpc.call(launch_node, __MODULE__, :start, [args], timeout: 30_000, tenant_id: tenant) do
9494
{:ok, _pid} = ok ->
9595
ok
9696

lib/realtime/postgres_cdc.ex

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ defmodule Realtime.PostgresCdc do
1616
apply(module, :handle_connect, [opts])
1717
end
1818

19-
def after_connect(module, connect_response, extension, params) do
20-
apply(module, :handle_after_connect, [connect_response, extension, params])
19+
def after_connect(module, connect_response, extension, params, tenant) do
20+
apply(module, :handle_after_connect, [connect_response, extension, params, tenant])
2121
end
2222

2323
def subscribe(module, pg_change_params, tenant, metadata) do
@@ -80,7 +80,8 @@ defmodule Realtime.PostgresCdc do
8080
end
8181

8282
@callback handle_connect(any()) :: {:ok, any()} | nil
83-
@callback handle_after_connect(any(), any(), any()) :: {:ok, any()} | {:error, any()} | {:error, any(), any()}
83+
@callback handle_after_connect(any(), any(), any(), tenant_id :: String.t()) ::
84+
{:ok, any()} | {:error, any()} | {:error, any(), any()}
8485
@callback handle_subscribe(any(), any(), any()) :: :ok
8586
@callback handle_stop(any(), any()) :: any()
8687
end

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ defmodule RealtimeWeb.RealtimeChannel do
290290

291291
case PostgresCdc.connect(module, args) do
292292
{:ok, response} ->
293-
case PostgresCdc.after_connect(module, response, postgres_extension, pg_change_params) do
293+
case PostgresCdc.after_connect(module, response, postgres_extension, pg_change_params, tenant) do
294294
{:ok, _response} ->
295295
message = "Subscribed to PostgreSQL"
296296
maybe_log_info(socket, message)

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.57.4",
7+
version: "2.57.5",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime/extensions/cdc_rls/cdc_rls_test.exs

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
8686
{:ok, response} = PostgresCdcRls.handle_connect(args)
8787

8888
# Now subscribe to the Postgres Changes
89-
{:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params)
89+
{:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params, external_id)
9090

9191
RealtimeWeb.Endpoint.unsubscribe(Realtime.Syn.PostgresCdc.syn_topic(tenant.external_id))
9292
%{tenant: tenant}
@@ -242,9 +242,9 @@ defmodule Realtime.Extensions.CdcRlsTest do
242242

243243
on_exit(fn -> :telemetry.detach(__MODULE__) end)
244244

245-
:telemetry.attach(
245+
:telemetry.attach_many(
246246
__MODULE__,
247-
[:realtime, :tenants, :payload, :size],
247+
[[:realtime, :tenants, :payload, :size], [:realtime, :rpc]],
248248
&__MODULE__.handle_telemetry/4,
249249
pid: self()
250250
)
@@ -291,8 +291,19 @@ defmodule Realtime.Extensions.CdcRlsTest do
291291
Process.sleep(3000)
292292
{:ok, response} = PostgresCdcRls.handle_connect(args)
293293

294+
assert_receive {
295+
:telemetry,
296+
[:realtime, :rpc],
297+
%{latency: _},
298+
%{
299+
tenant: "dev_tenant",
300+
mechanism: :gen_rpc,
301+
success: true
302+
}
303+
}
304+
294305
# Now subscribe to the Postgres Changes
295-
{:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params)
306+
{:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params, external_id)
296307
assert %Postgrex.Result{rows: [[1]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", [])
297308

298309
# Insert a record
@@ -382,7 +393,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
382393
{:ok, response} = PostgresCdcRls.handle_connect(args)
383394

384395
# Now subscribe to the Postgres Changes
385-
{:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params)
396+
{:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params, external_id)
386397
assert %Postgrex.Result{rows: [[1]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", [])
387398

388399
log =
@@ -468,7 +479,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
468479
:ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata)
469480

470481
# Now subscribe to the Postgres Changes
471-
{:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params)
482+
{:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params, external_id)
472483
assert %Postgrex.Result{rows: [[1]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", [])
473484

474485
# Insert a record
@@ -507,6 +518,19 @@ defmodule Realtime.Extensions.CdcRlsTest do
507518
assert {:ok, %RateCounter{id: {:channel, :db_events, "dev_tenant"}, bucket: bucket}} = RateCounter.get(rate)
508519
assert 1 in bucket
509520

521+
assert_receive {
522+
:telemetry,
523+
[:realtime, :rpc],
524+
%{latency: _},
525+
%{
526+
tenant: "dev_tenant",
527+
mechanism: :gen_rpc,
528+
origin_node: _,
529+
success: true,
530+
target_node: ^node
531+
}
532+
}
533+
510534
:erpc.call(node, PostgresCdcRls, :handle_stop, [tenant.external_id, 10_000])
511535
end
512536
end

0 commit comments

Comments
 (0)