Skip to content

Commit 3259f7b

Browse files
committed
fix: malformed PG changes subscriptions should not retry
1 parent 7f28e0f commit 3259f7b

File tree

9 files changed

+291
-140
lines changed

9 files changed

+291
-140
lines changed

lib/extensions/postgres_cdc_rls/cdc_rls.ex

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ defmodule Extensions.PostgresCdcRls do
1111
alias Rls.Subscriptions
1212
alias Realtime.GenRpc
1313

14+
@impl true
1415
@spec handle_connect(map()) :: {:ok, {pid(), pid()}} | nil
1516
def handle_connect(args) do
1617
case get_manager_conn(args["id"]) do
@@ -26,22 +27,39 @@ defmodule Extensions.PostgresCdcRls do
2627
end
2728
end
2829

29-
def handle_after_connect({manager_pid, conn}, settings, params) do
30-
publication = settings["publication"]
31-
opts = [conn, publication, params, manager_pid, self()]
32-
conn_node = node(conn)
30+
@impl true
31+
def handle_after_connect({manager_pid, conn}, settings, params_list) do
32+
with {:ok, subscription_list} <- subscription_list(params_list) do
33+
publication = settings["publication"]
34+
opts = [conn, publication, subscription_list, manager_pid, self()]
35+
conn_node = node(conn)
3336

34-
if conn_node !== node() do
35-
GenRpc.call(conn_node, Subscriptions, :create, opts, timeout: 15_000)
36-
else
37-
apply(Subscriptions, :create, opts)
37+
if conn_node !== node() do
38+
GenRpc.call(conn_node, Subscriptions, :create, opts, timeout: 15_000)
39+
else
40+
apply(Subscriptions, :create, opts)
41+
end
3842
end
3943
end
4044

45+
defp subscription_list(params_list) do
46+
Enum.reduce_while(params_list, [], fn params, acc ->
47+
case Subscriptions.parse_subscription_params(params[:params]) do
48+
{:ok, subscription_params} ->
49+
{:cont, {:ok, [%{id: params.id, claims: params.claims, subscription_params: subscription_params} | acc]}}
50+
51+
{:error, reason} ->
52+
{:halt, {:error, :malformed_subscription_params, reason}}
53+
end
54+
end)
55+
end
56+
57+
@impl true
4158
def handle_subscribe(_, tenant, metadata) do
4259
Endpoint.subscribe("realtime:postgres:" <> tenant, metadata)
4360
end
4461

62+
@impl true
4563
@doc """
4664
Stops the Supervision tree for a tenant.
4765

lib/extensions/postgres_cdc_rls/subscriptions.ex

Lines changed: 51 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
77
import Postgrex, only: [transaction: 2, query: 3, rollback: 2]
88

99
@type conn() :: Postgrex.conn()
10+
@type filter :: {binary, binary, binary}
11+
@type subscription_params :: {binary, binary, [filter]}
12+
@type subscription_list :: [%{id: binary, claims: map, subscription_params: subscription_params}]
1013

1114
@filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"]
1215

13-
@spec create(conn(), String.t(), [map()], pid(), pid()) ::
16+
@spec create(conn(), String.t(), subscription_list, pid(), pid()) ::
1417
{:ok, Postgrex.Result.t()}
1518
| {:error, Exception.t() | :malformed_subscription_params | {:subscription_insert_failed, map()}}
16-
def create(conn, publication, params_list, manager, caller) do
19+
def create(conn, publication, subscription_list, manager, caller) do
1720
sql = "with sub_tables as (
1821
select
1922
rr.entity
@@ -50,37 +53,30 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
5053
id"
5154

5255
transaction(conn, fn conn ->
53-
Enum.map(params_list, fn %{id: id, claims: claims, params: params} ->
54-
case parse_subscription_params(params) do
55-
{:ok, [schema, table, filters]} ->
56-
case query(conn, sql, [publication, schema, table, id, claims, filters]) do
57-
{:ok, %{num_rows: num} = result} when num > 0 ->
58-
send(manager, {:subscribed, {caller, id}})
59-
result
56+
Enum.map(subscription_list, fn %{id: id, claims: claims, subscription_params: params = {schema, table, filters}} ->
57+
case query(conn, sql, [publication, schema, table, id, claims, filters]) do
58+
{:ok, %{num_rows: num} = result} when num > 0 ->
59+
send(manager, {:subscribed, {caller, id}})
60+
result
6061

61-
{:ok, _} ->
62-
msg =
63-
"Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [#{params_to_log(params)}]"
62+
{:ok, _} ->
63+
msg =
64+
"Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [#{params_to_log(params)}]"
6465

65-
rollback(conn, msg)
66+
rollback(conn, msg)
6667

67-
{:error, exception} ->
68-
msg =
69-
"Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [#{params_to_log(params)}]. Exception: #{Exception.message(exception)}"
68+
{:error, exception} ->
69+
msg =
70+
"Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [#{params_to_log(params)}]. Exception: #{Exception.message(exception)}"
7071

71-
rollback(conn, msg)
72-
end
73-
74-
{:error, reason} ->
75-
rollback(conn, reason)
72+
rollback(conn, msg)
7673
end
7774
end)
7875
end)
7976
end
8077

81-
defp params_to_log(map) do
82-
map
83-
|> Map.to_list()
78+
defp params_to_log({schema, table, filters}) do
79+
%{schema: schema, table: table, filters: filters}
8480
|> Enum.map_join(", ", fn {k, v} -> "#{k}: #{to_log(v)}" end)
8581
end
8682

@@ -166,39 +162,55 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
166162
167163
## Examples
168164
169-
iex> params = %{"schema" => "public", "table" => "messages", "filter" => "subject=eq.hey"}
170-
iex> Extensions.PostgresCdcRls.Subscriptions.parse_subscription_params(params)
171-
{:ok, ["public", "messages", [{"subject", "eq", "hey"}]]}
165+
iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=eq.hey"})
166+
{:ok, {"public", "messages", [{"subject", "eq", "hey"}]}}
172167
173168
`in` filter:
174169
175-
iex> params = %{"schema" => "public", "table" => "messages", "filter" => "subject=in.(hidee,ho)"}
176-
iex> Extensions.PostgresCdcRls.Subscriptions.parse_subscription_params(params)
177-
{:ok, ["public", "messages", [{"subject", "in", "{hidee,ho}"}]]}
170+
iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=in.(hidee,ho)"})
171+
{:ok, {"public", "messages", [{"subject", "in", "{hidee,ho}"}]}}
172+
173+
no filter:
174+
175+
iex> parse_subscription_params(%{"schema" => "public", "table" => "messages"})
176+
{:ok, {"public", "messages", []}}
177+
178+
only schema:
179+
180+
iex> parse_subscription_params(%{"schema" => "public"})
181+
{:ok, {"public", "*", []}}
182+
183+
only table:
184+
185+
iex> parse_subscription_params(%{"table" => "messages"})
186+
{:ok, {"public", "messages", []}}
178187
179188
An unsupported filter will respond with an error tuple:
180189
181-
iex> params = %{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"}
182-
iex> Extensions.PostgresCdcRls.Subscriptions.parse_subscription_params(params)
190+
iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"})
183191
{:error, ~s(Error parsing `filter` params: ["like", "hey"])}
184192
185193
Catch `undefined` filters:
186194
187-
iex> params = %{"schema" => "public", "table" => "messages", "filter" => "undefined"}
188-
iex> Extensions.PostgresCdcRls.Subscriptions.parse_subscription_params(params)
195+
iex> parse_subscription_params(%{"schema" => "public", "table" => "messages", "filter" => "undefined"})
189196
{:error, ~s(Error parsing `filter` params: ["undefined"])}
190197
198+
Catch `missing params`:
199+
200+
iex> parse_subscription_params(%{})
201+
{:error, ~s(No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: %{})}
202+
191203
"""
192204

193-
@spec parse_subscription_params(map()) :: {:ok, list} | {:error, binary()}
205+
@spec parse_subscription_params(map()) :: {:ok, subscription_params} | {:error, binary()}
194206
def parse_subscription_params(params) do
195207
case params do
196208
%{"schema" => schema, "table" => table, "filter" => filter} ->
197209
with [col, rest] <- String.split(filter, "=", parts: 2),
198210
[filter_type, value] when filter_type in @filter_types <-
199211
String.split(rest, ".", parts: 2),
200212
{:ok, formatted_value} <- format_filter_value(filter_type, value) do
201-
{:ok, [schema, table, [{col, filter_type, formatted_value}]]}
213+
{:ok, {schema, table, [{col, filter_type, formatted_value}]}}
202214
else
203215
{:error, msg} ->
204216
{:error, "Error parsing `filter` params: #{msg}"}
@@ -208,13 +220,13 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
208220
end
209221

210222
%{"schema" => schema, "table" => table} ->
211-
{:ok, [schema, table, []]}
223+
{:ok, {schema, table, []}}
212224

213225
%{"schema" => schema} ->
214-
{:ok, [schema, "*", []]}
226+
{:ok, {schema, "*", []}}
215227

216228
%{"table" => table} ->
217-
{:ok, ["public", table, []]}
229+
{:ok, {"public", table, []}}
218230

219231
map when is_map_key(map, "user_token") or is_map_key(map, "auth_token") ->
220232
{:error,

lib/extensions/postgres_cdc_rls/worker_supervisor.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ defmodule Extensions.PostgresCdcRls.WorkerSupervisor do
66
alias PostgresCdcRls.ReplicationPoller
77
alias PostgresCdcRls.SubscriptionManager
88
alias PostgresCdcRls.SubscriptionsChecker
9-
alias Realtime.Api
9+
alias Realtime.Tenants.Cache
1010
alias Realtime.PostgresCdc.Exception
1111

1212
def start_link(args) do
@@ -17,7 +17,7 @@ defmodule Extensions.PostgresCdcRls.WorkerSupervisor do
1717
@impl true
1818
def init(%{"id" => tenant} = args) when is_binary(tenant) do
1919
Logger.metadata(external_id: tenant, project: tenant)
20-
unless Api.get_tenant_by_external_id(tenant, :primary), do: raise(Exception)
20+
unless Cache.get_tenant_by_external_id(tenant), do: raise(Exception)
2121

2222
subscribers_pids_table = :ets.new(__MODULE__, [:public, :bag])
2323
subscribers_nodes_table = :ets.new(__MODULE__, [:public, :set])

lib/realtime/postgres_cdc.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ defmodule Realtime.PostgresCdc do
8080
end
8181

8282
@callback handle_connect(any()) :: {:ok, any()} | nil
83-
@callback handle_after_connect(any(), any(), any()) :: {:ok, any()} | {:error, any()}
83+
@callback handle_after_connect(any(), any(), any()) :: {:ok, any()} | {:error, any()} | {:error, any(), any()}
8484
@callback handle_subscribe(any(), any(), any()) :: :ok
8585
@callback handle_stop(any(), any()) :: any()
8686
end

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,12 @@ defmodule RealtimeWeb.RealtimeChannel do
297297
push_system_message("postgres_changes", socket, "ok", message, channel_name)
298298
{:noreply, assign(socket, :pg_sub_ref, nil)}
299299

300+
{:error, :malformed_subscription_params, error} ->
301+
maybe_log_warning(socket, "RealtimeDisabledForConfiguration", error)
302+
push_system_message("postgres_changes", socket, "error", error, channel_name)
303+
# No point in retrying if the params are invalid
304+
{:noreply, assign(socket, :pg_sub_ref, nil)}
305+
300306
error ->
301307
maybe_log_warning(socket, "RealtimeDisabledForConfiguration", error)
302308

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.56.5",
7+
version: "2.56.6",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime/extensions/cdc_rls/subscription_manager_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ defmodule Realtime.Extensions.CdcRls.SubscriptionManagerTest do
145145

146146
pg_change_params = %{
147147
id: uuid,
148-
params: %{"event" => "*", "schema" => "public"},
148+
subscription_params: {"public", "*", []},
149149
claims: %{
150150
"exp" => System.system_time(:second) + 100_000,
151151
"iat" => 0,

0 commit comments

Comments
 (0)