Skip to content

Commit f44287e

Browse files
authored
feat: regional broadcasting (#1580)
When regional_broadcasting is true GenRpcPubSub uses a single node in each region to broadcast to the rest of the region. Nodes communicating inside the same region have a much lower latency.
1 parent 0ffb34b commit f44287e

File tree

12 files changed

+336
-72
lines changed

12 files changed

+336
-72
lines changed

config/runtime.exs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
7171
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom()
7272
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))
7373
users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5)
74+
regional_broadcasting = Env.get_boolean("REGIONAL_BROADCASTING", false)
7475

7576
no_channel_timeout_in_ms =
7677
if config_env() == :test,
@@ -128,7 +129,8 @@ config :realtime,
128129
platform: platform,
129130
pubsub_adapter: pubsub_adapter,
130131
broadcast_pool_size: broadcast_pool_size,
131-
users_scope_shards: users_scope_shards
132+
users_scope_shards: users_scope_shards,
133+
regional_broadcasting: regional_broadcasting
132134

133135
if config_env() != :test && run_janitor? do
134136
config :realtime,

config/test.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ config :realtime, RealtimeWeb.Endpoint,
3131
server: true
3232

3333
config :realtime,
34+
regional_broadcasting: true,
3435
region: "us-east-1",
3536
db_enc_key: "1234567890123456",
3637
jwt_claim_validators: System.get_env("JWT_CLAIM_VALIDATORS", "{}"),

lib/realtime/gen_rpc/pub_sub.ex

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ defmodule Realtime.GenRpcPubSub do
55

66
@behaviour Phoenix.PubSub.Adapter
77
alias Realtime.GenRpc
8+
alias Realtime.GenRpcPubSub.Worker
9+
alias Realtime.Nodes
810
use Supervisor
911

1012
@impl true
@@ -45,36 +47,83 @@ defmodule Realtime.GenRpcPubSub do
4547
@impl true
4648
def broadcast(adapter_name, topic, message, dispatcher) do
4749
worker = worker_name(adapter_name, self())
48-
GenRpc.abcast(Node.list(), worker, forward_to_local(topic, message, dispatcher), key: worker)
50+
51+
if Application.get_env(:realtime, :regional_broadcasting, false) do
52+
my_region = Application.get_env(:realtime, :region)
53+
# broadcast to all other nodes in the region
54+
55+
other_nodes = for node <- Realtime.Nodes.region_nodes(my_region), node != node(), do: node
56+
GenRpc.abcast(other_nodes, worker, Worker.forward_to_local(topic, message, dispatcher), key: worker)
57+
58+
# send a message to a node in each region to forward to the rest of the region
59+
other_region_nodes = nodes_from_other_regions(my_region, worker)
60+
61+
GenRpc.abcast(other_region_nodes, worker, Worker.forward_to_region(topic, message, dispatcher), key: worker)
62+
else
63+
GenRpc.abcast(Node.list(), worker, Worker.forward_to_local(topic, message, dispatcher), key: worker)
64+
end
65+
66+
:ok
67+
end
68+
69+
defp nodes_from_other_regions(my_region, key) do
70+
Enum.flat_map(Nodes.all_node_regions(), fn
71+
^my_region ->
72+
[]
73+
74+
region ->
75+
case Nodes.node_from_region(region, key) do
76+
{:ok, node} -> [node]
77+
_ -> []
78+
end
79+
end)
4980
end
5081

5182
@impl true
5283
def direct_broadcast(adapter_name, node_name, topic, message, dispatcher) do
5384
worker = worker_name(adapter_name, self())
54-
GenRpc.abcast([node_name], worker, forward_to_local(topic, message, dispatcher), key: worker)
85+
GenRpc.abcast([node_name], worker, Worker.forward_to_local(topic, message, dispatcher), key: worker)
5586
end
56-
57-
defp forward_to_local(topic, message, dispatcher), do: {:ftl, topic, message, dispatcher}
5887
end
5988

6089
defmodule Realtime.GenRpcPubSub.Worker do
6190
@moduledoc false
6291
use GenServer
6392

93+
def forward_to_local(topic, message, dispatcher), do: {:ftl, topic, message, dispatcher}
94+
def forward_to_region(topic, message, dispatcher), do: {:ftr, topic, message, dispatcher}
95+
6496
@doc false
65-
def start_link({pubsub, worker}), do: GenServer.start_link(__MODULE__, pubsub, name: worker)
97+
def start_link({pubsub, worker}), do: GenServer.start_link(__MODULE__, {pubsub, worker}, name: worker)
6698

6799
@impl true
68-
def init(pubsub) do
100+
def init({pubsub, worker}) do
69101
Process.flag(:message_queue_data, :off_heap)
70102
Process.flag(:fullsweep_after, 20)
71-
{:ok, pubsub}
103+
{:ok, {pubsub, worker}}
72104
end
73105

74106
@impl true
75-
def handle_info({:ftl, topic, message, dispatcher}, pubsub) do
107+
# Forward to local
108+
def handle_info({:ftl, topic, message, dispatcher}, {pubsub, worker}) do
109+
Phoenix.PubSub.local_broadcast(pubsub, topic, message, dispatcher)
110+
{:noreply, {pubsub, worker}}
111+
end
112+
113+
# Forward to the rest of the region
114+
def handle_info({:ftr, topic, message, dispatcher}, {pubsub, worker}) do
115+
# Forward to local first
76116
Phoenix.PubSub.local_broadcast(pubsub, topic, message, dispatcher)
77-
{:noreply, pubsub}
117+
118+
# Then broadcast to the rest of my region
119+
my_region = Application.get_env(:realtime, :region)
120+
other_nodes = for node <- Realtime.Nodes.region_nodes(my_region), node != node(), do: node
121+
122+
if other_nodes != [] do
123+
Realtime.GenRpc.abcast(other_nodes, worker, forward_to_local(topic, message, dispatcher), key: worker)
124+
end
125+
126+
{:noreply, {pubsub, worker}}
78127
end
79128

80129
@impl true

lib/realtime/nodes.ex

Lines changed: 25 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,27 @@ defmodule Realtime.Nodes do
6464

6565
def region_nodes(nil), do: []
6666

67+
@doc """
68+
Picks a node from a region based on the provided key
69+
"""
70+
@spec node_from_region(String.t(), term()) :: {:ok, node} | {:error, :not_available}
71+
def node_from_region(region, key) when is_binary(region) do
72+
nodes = region_nodes(region)
73+
74+
case nodes do
75+
[] ->
76+
{:error, :not_available}
77+
78+
_ ->
79+
member_count = Enum.count(nodes)
80+
index = :erlang.phash2(key, member_count)
81+
82+
{:ok, Enum.fetch!(nodes, index)}
83+
end
84+
end
85+
86+
def node_from_region(_, _), do: {:error, :not_available}
87+
6788
@doc """
6889
Picks the node to launch the Postgres connection on.
6990
@@ -132,59 +153,9 @@ defmodule Realtime.Nodes do
132153
end
133154
end
134155

135-
@mapping_realtime_region_to_tenant_region_aws %{
136-
"ap-southeast-1" => [
137-
"ap-east-1",
138-
"ap-northeast-1",
139-
"ap-northeast-2",
140-
"ap-south-1",
141-
"ap-southeast-1"
142-
],
143-
"ap-southeast-2" => ["ap-southeast-2"],
144-
"eu-west-2" => [
145-
"eu-central-1",
146-
"eu-central-2",
147-
"eu-north-1",
148-
"eu-west-1",
149-
"eu-west-2",
150-
"eu-west-3"
151-
],
152-
"us-east-1" => [
153-
"ca-central-1",
154-
"sa-east-1",
155-
"us-east-1",
156-
"us-east-2"
157-
],
158-
"us-west-1" => ["us-west-1", "us-west-2"]
159-
}
160-
@mapping_realtime_region_to_tenant_region_fly %{
161-
"iad" => ["ca-central-1", "sa-east-1", "us-east-1"],
162-
"lhr" => ["eu-central-1", "eu-west-1", "eu-west-2", "eu-west-3"],
163-
"sea" => ["us-west-1"],
164-
"syd" => [
165-
"ap-east-1",
166-
"ap-northeast-1",
167-
"ap-northeast-2",
168-
"ap-south-1",
169-
"ap-southeast-1",
170-
"ap-southeast-2"
171-
]
172-
}
156+
@all_regions ~w(eu-west-2 us-east-1 us-west-1 ap-southeast-1 ap-southeast-2)
173157

174-
@doc """
175-
Fetches the tenant regions for a given realtime reagion
176-
"""
177-
@spec region_to_tenant_regions(String.t()) :: list() | nil
178-
def region_to_tenant_regions(region) do
179-
platform = Application.get_env(:realtime, :platform)
180-
181-
mappings =
182-
case platform do
183-
:aws -> @mapping_realtime_region_to_tenant_region_aws
184-
:fly -> @mapping_realtime_region_to_tenant_region_fly
185-
_ -> %{}
186-
end
187-
188-
Map.get(mappings, region)
189-
end
158+
@spec all_node_regions() :: [String.t()]
159+
@doc "List all the regions where nodes can be launched"
160+
def all_node_regions(), do: @all_regions
190161
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.55.2",
7+
version: "2.56.0",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
defmodule Realtime.GenRpcPubSub.WorkerTest do
2+
use ExUnit.Case, async: true
3+
alias Realtime.GenRpcPubSub.Worker
4+
alias Realtime.GenRpc
5+
alias Realtime.Nodes
6+
7+
use Mimic
8+
9+
@topic "test_topic"
10+
11+
setup do
12+
worker = start_link_supervised!({Worker, {Realtime.PubSub, __MODULE__}})
13+
%{worker: worker}
14+
end
15+
16+
describe "forward to local" do
17+
test "local broadcast", %{worker: worker} do
18+
:ok = Phoenix.PubSub.subscribe(Realtime.PubSub, @topic)
19+
send(worker, Worker.forward_to_local(@topic, "le message", Phoenix.PubSub))
20+
21+
assert_receive "le message"
22+
refute_receive _any
23+
end
24+
end
25+
26+
describe "forward to region" do
27+
setup %{worker: worker} do
28+
GenRpc
29+
|> stub()
30+
|> allow(self(), worker)
31+
32+
Nodes
33+
|> stub()
34+
|> allow(self(), worker)
35+
36+
:ok
37+
end
38+
39+
test "local broadcast + forward to other nodes", %{worker: worker} do
40+
parent = self()
41+
expect(Nodes, :region_nodes, fn "us-east-1" -> [node(), :node_us_2, :node_us_3] end)
42+
43+
expect(GenRpc, :abcast, fn [:node_us_2, :node_us_3],
44+
Realtime.GenRpcPubSub.WorkerTest,
45+
{:ftl, "test_topic", "le message", Phoenix.PubSub},
46+
[key: Realtime.GenRpcPubSub.WorkerTest] ->
47+
send(parent, :abcast_called)
48+
:ok
49+
end)
50+
51+
:ok = Phoenix.PubSub.subscribe(Realtime.PubSub, @topic)
52+
send(worker, Worker.forward_to_region(@topic, "le message", Phoenix.PubSub))
53+
54+
assert_receive "le message"
55+
assert_receive :abcast_called
56+
refute_receive _any
57+
end
58+
59+
test "local broadcast and no other nodes", %{worker: worker} do
60+
expect(Nodes, :region_nodes, fn "us-east-1" -> [node()] end)
61+
62+
reject(GenRpc, :abcast, 4)
63+
64+
:ok = Phoenix.PubSub.subscribe(Realtime.PubSub, @topic)
65+
send(worker, Worker.forward_to_region(@topic, "le message", Phoenix.PubSub))
66+
67+
assert_receive "le message"
68+
refute_receive _any
69+
end
70+
end
71+
end

0 commit comments

Comments
 (0)