Skip to content

Commit dec8f0c

Browse files
authored
fix: multiple valid subscriptions (#1596)
1 parent b80da2d commit dec8f0c

File tree

4 files changed

+55
-3
lines changed

4 files changed

+55
-3
lines changed

lib/extensions/postgres_cdc_rls/cdc_rls.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ defmodule Extensions.PostgresCdcRls do
4343
end
4444

4545
defp subscription_list(params_list) do
46-
Enum.reduce_while(params_list, [], fn params, acc ->
46+
Enum.reduce_while(params_list, {:ok, []}, fn params, {:ok, acc} ->
4747
case Subscriptions.parse_subscription_params(params[:params]) do
4848
{:ok, subscription_params} ->
4949
{:cont, {:ok, [%{id: params.id, claims: params.claims, subscription_params: subscription_params} | acc]}}

lib/extensions/postgres_cdc_rls/subscriptions.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
7575
end)
7676
end)
7777
rescue
78-
e -> {:error, e}
78+
e in DBConnection.ConnectionError -> {:error, e}
7979
catch
8080
:exit, reason -> {:error, {:exit, reason}}
8181
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.57.2",
7+
version: "2.57.3",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime_web/channels/realtime_channel_test.exs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,58 @@ defmodule RealtimeWeb.RealtimeChannelTest do
9393
refute_receive _any
9494
end
9595

96+
test "multiple subscriptions", %{tenant: tenant} do
97+
jwt = Generators.generate_jwt_token(tenant)
98+
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt))
99+
100+
config = %{
101+
"presence" => %{"enabled" => false},
102+
"postgres_changes" => [
103+
%{"event" => "INSERT", "schema" => "public", "table" => "test"},
104+
%{"event" => "DELETE", "schema" => "public", "table" => "test"}
105+
]
106+
}
107+
108+
assert {:ok, reply, _socket} = subscribe_and_join(socket, "realtime:test", %{"config" => config})
109+
110+
assert %{
111+
postgres_changes: [
112+
%{:id => sub_id, "event" => "INSERT", "schema" => "public", "table" => "test"},
113+
%{
114+
:id => 4_845_530,
115+
"event" => "DELETE",
116+
"schema" => "public",
117+
"table" => "test"
118+
}
119+
]
120+
} =
121+
reply
122+
123+
assert_push "system",
124+
%{message: "Subscribed to PostgreSQL", status: "ok", extension: "postgres_changes", channel: "test"},
125+
5000
126+
127+
{:ok, conn} = Connect.lookup_or_start_connection(tenant.external_id)
128+
%{rows: [[id]]} = Postgrex.query!(conn, "insert into test (details) values ('test') returning id", [])
129+
130+
assert_push "postgres_changes",
131+
%{
132+
data: %Realtime.Adapters.Changes.NewRecord{
133+
table: "test",
134+
type: "INSERT",
135+
record: %{"details" => "test", "id" => ^id},
136+
columns: [%{"name" => "id", "type" => "int4"}, %{"name" => "details", "type" => "text"}],
137+
errors: nil,
138+
schema: "public",
139+
commit_timestamp: _
140+
},
141+
ids: [4_845_530, ^sub_id]
142+
},
143+
500
144+
145+
refute_receive _any
146+
end
147+
96148
test "malformed subscription params", %{tenant: tenant} do
97149
jwt = Generators.generate_jwt_token(tenant)
98150
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt))

0 commit comments

Comments
 (0)