Skip to content
This repository was archived by the owner on Jan 29, 2019. It is now read-only.

Commit 46051ca

Browse files
committed
All local elements of PG work correctly
1 parent 87ebdc8 commit 46051ca

File tree

2 files changed

+178
-53
lines changed

2 files changed

+178
-53
lines changed

lib/firenest/pg.ex

Lines changed: 69 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,40 @@ defmodule Firenest.PG do
55

66
defdelegate child_spec(opts), to: Firenest.PG.Supervisor
77

8-
def track(pg, pid, group, key, meta) when node(pid) == node() do
8+
def join(pg, group, key, pid, meta) when node(pid) == node() do
99
server = partition_info!(pg, group)
10-
GenServer.call(server, {:track, pid, group, key, meta})
10+
GenServer.call(server, {:join, group, key, pid, meta})
1111
end
1212

13-
def untrack(pg, pid, group, key) when node(pid) == node() do
13+
def leave(pg, group, key, pid) when node(pid) == node() do
1414
server = partition_info!(pg, group)
15-
GenServer.call(server, {:untrack, pid, group, key})
15+
GenServer.call(server, {:leave, group, key, pid})
1616
end
1717

18-
def untrack(pg, pid) when node(pid) == node() do
18+
def leave(pg, pid) when node(pid) == node() do
1919
servers = partition_infos!(pg)
20-
multicall(servers, {:untrack, pid}, 5_000)
20+
replies = multicall(servers, {:leave, pid}, 5_000)
21+
22+
if :ok in replies do
23+
:ok
24+
else
25+
{:error, :not_member}
26+
end
2127
end
2228

23-
def update(pg, pid, group, key, meta) when node(pid) == node() do
29+
def update(pg, group, key, pid, update) when node(pid) == node() and is_function(update, 1) do
2430
server = partition_info!(pg, group)
25-
GenServer.call(server, {:update, pid, group, key, meta})
31+
GenServer.call(server, {:update, group, key, pid, update})
2632
end
2733

28-
def list(pg, group) do
34+
def replace(pg, group, key, pid, meta) when node(pid) == node() do
2935
server = partition_info!(pg, group)
30-
GenServer.call(server, {:list, group}).()
36+
GenServer.call(server, {:replace, group, key, pid, meta})
37+
end
38+
39+
def members(pg, group) do
40+
server = partition_info!(pg, group)
41+
GenServer.call(server, {:members, group}).()
3142
end
3243

3344
# TODO
@@ -41,7 +52,7 @@ defmodule Firenest.PG do
4152
send(pid, {:"$gen_call", {self(), ref}, request})
4253
ref
4354
end)
44-
|> Enum.each(fn ref ->
55+
|> Enum.map(fn ref ->
4556
receive do
4657
{^ref, reply} ->
4758
Process.demonitor(ref, [:flush])
@@ -134,22 +145,28 @@ defmodule Firenest.PG.Server do
134145
end
135146

136147
@impl true
137-
def handle_call({:track, pid, group, key, meta}, _from, state) do
148+
def handle_call({:join, group, key, pid, meta}, _from, state) do
138149
%{values: values, pids: pids} = state
139150
Process.link(pid)
140-
:ets.insert(values, {{group, pid, key}, meta})
141-
:ets.insert(pids, {group, pid, key})
142-
{:reply, :ok, state}
151+
ets_key = {group, pid, key}
152+
153+
if :ets.member(values, ets_key) do
154+
{:reply, {:error, :already_joined}, state}
155+
else
156+
:ets.insert(values, {{group, pid, key}, meta})
157+
:ets.insert(pids, {group, pid, key})
158+
{:reply, :ok, state}
159+
end
143160
end
144161

145-
def handle_call({:untrack, pid, group, key}, _from, state) do
162+
def handle_call({:leave, group, key, pid}, _from, state) do
146163
%{values: values, pids: pids} = state
147164
key = {group, pid, key}
148165
ms = [{key, [], [true]}]
149166

150167
case :ets.select_delete(pids, ms) do
151168
0 ->
152-
{:reply, {:error, :not_tracked}, state}
169+
{:reply, {:error, :not_member}, state}
153170

154171
1 ->
155172
unless :ets.member(pids, pid) do
@@ -161,18 +178,44 @@ defmodule Firenest.PG.Server do
161178
end
162179
end
163180

164-
def handle_call({:untrack, pid}, _from, state) do
181+
def handle_call({:update, group, key, pid, update}, _from, state) do
182+
%{values: values} = state
183+
ets_key = {group, pid, key}
184+
185+
case ets_fetch_element(values, ets_key, 2) do
186+
{:ok, value} ->
187+
:ets.insert(values, {ets_key, update.(value)})
188+
{:reply, :ok, state}
189+
190+
:error ->
191+
{:reply, {:error, :not_member}, state}
192+
end
193+
end
194+
195+
def handle_call({:replace, group, key, pid, meta}, _from, state) do
196+
%{values: values} = state
197+
ets_key = {group, pid, key}
198+
199+
if :ets.member(values, ets_key) do
200+
:ets.insert(values, {ets_key, meta})
201+
{:reply, :ok, state}
202+
else
203+
{:reply, {:error, :not_member}, state}
204+
end
205+
end
206+
207+
def handle_call({:leave, pid}, _from, state) do
165208
%{values: values, pids: pids} = state
166209

167210
if untrack_pid(pids, values, pid) do
168211
Process.unlink(pid)
169212
{:reply, :ok, state}
170213
else
171-
{:reply, {:error, :not_tracked}, state}
214+
{:reply, {:error, :not_member}, state}
172215
end
173216
end
174217

175-
def handle_call({:list, group}, _from, state) do
218+
def handle_call({:members, group}, _from, state) do
176219
%{values: values} = state
177220

178221
read = fn ->
@@ -205,4 +248,10 @@ defmodule Firenest.PG.Server do
205248
true
206249
end
207250
end
251+
252+
defp ets_fetch_element(table, key, pos) do
253+
{:ok, :ets.lookup_element(table, key, pos)}
254+
catch
255+
:error, :badarg -> :error
256+
end
208257
end

test/firenest/pg_test.exs

Lines changed: 109 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,49 +12,125 @@ defmodule Firenest.PGTest do
1212
{:ok, pg: test}
1313
end
1414

15-
test "tracks processes", %{pg: pg} do
16-
PG.track(pg, self(), :foo, :bar, :baz)
17-
assert [{:bar, :baz}] == PG.list(pg, :foo)
15+
describe "join/5" do
16+
test "adds process", %{pg: pg} do
17+
assert PG.join(pg, :foo, :bar, self(), :baz) == :ok
18+
assert [{:bar, :baz}] == PG.members(pg, :foo)
19+
end
20+
21+
test "rejects double joins", %{pg: pg} do
22+
assert PG.join(pg, :foo, :bar, self(), :baz) == :ok
23+
assert PG.join(pg, :foo, :bar, self(), :baz) == {:error, :already_joined}
24+
end
25+
26+
test "cleans up entries after process dies", %{pg: pg} do
27+
{pid, ref} = spawn_monitor(Process, :sleep, [:infinity])
28+
PG.join(pg, :foo, :bar, pid, :baz)
29+
assert [_] = PG.members(pg, :foo)
30+
Process.exit(pid, :kill)
31+
assert_receive {:DOWN, ^ref, _, _, _}
32+
assert [] = PG.members(pg, :foo)
33+
end
34+
35+
test "pg dies if other linked process dies", %{pg: pg} do
36+
parent = self()
37+
[{_, pid, _, _}] = Supervisor.which_children(Module.concat(pg, "Supervisor"))
38+
ref = Process.monitor(pid)
39+
40+
temp =
41+
spawn(fn ->
42+
Process.link(pid)
43+
send(parent, :continue)
44+
Process.sleep(:infinity)
45+
end)
46+
47+
assert_receive :continue
48+
49+
Process.exit(temp, :shutdown)
50+
assert_receive {:DOWN, ^ref, _, _, _}
51+
end
1852
end
1953

20-
test "processes are clened up when they die", %{pg: pg} do
21-
{pid, ref} = spawn_monitor(Process, :sleep, [:infinity])
22-
PG.track(pg, pid, :foo, :bar, :baz)
23-
assert [_] = PG.list(pg, :foo)
24-
Process.exit(pid, :kill)
25-
assert_receive {:DOWN, ^ref, _, _, _}
26-
assert [] = PG.list(pg, :foo)
54+
describe "leave/2" do
55+
test "removes entry", %{pg: pg} do
56+
PG.join(pg, :foo, :bar, self(), :baz)
57+
58+
assert [_] = PG.members(pg, :foo)
59+
assert PG.leave(pg, self()) == :ok
60+
assert [] == PG.members(pg, :foo)
61+
end
62+
63+
test "does not remove non members", %{pg: pg} do
64+
[{_, pid, _, _}] = Supervisor.which_children(Module.concat(pg, "Supervisor"))
65+
Process.link(pid)
66+
67+
assert PG.leave(pg, self()) == {:error, :not_member}
68+
{:links, links} = Process.info(self(), :links)
69+
assert pid in links
70+
end
2771
end
2872

29-
test "pg dies if linked, untracked process terminates", %{pg: pg} do
30-
parent = self()
31-
[{_, pid, _, _}] = Supervisor.which_children(Module.concat(pg, "Supervisor"))
32-
ref = Process.monitor(pid)
73+
describe "leave/4" do
74+
test "removes single entry", %{pg: pg} do
75+
PG.join(pg, :foo, :bar, self(), :baz)
76+
assert [_] = PG.members(pg, :foo)
77+
78+
assert PG.leave(pg, :foo, :bar, self()) == :ok
79+
assert [] == PG.members(pg, :foo)
80+
end
3381

34-
temp =
35-
spawn(fn ->
36-
Process.link(pid)
37-
send(parent, :continue)
38-
Process.sleep(:infinity)
39-
end)
82+
test "leaves other entries intact", %{pg: pg} do
83+
PG.join(pg, :foo, :bar, self(), :baz)
84+
PG.join(pg, :foo, :baar, self(), :baz)
85+
assert [_, _] = PG.members(pg, :foo)
4086

41-
assert_receive :continue
87+
assert PG.leave(pg, :foo, :bar, self()) == :ok
88+
assert [{:baar, :baz}] == PG.members(pg, :foo)
89+
end
4290

43-
Process.exit(temp, :shutdown)
44-
assert_receive {:DOWN, ^ref, _, _, _}
91+
test "does not remove non members", %{pg: pg} do
92+
[{_, pid, _, _}] = Supervisor.which_children(Module.concat(pg, "Supervisor"))
93+
Process.link(pid)
94+
95+
assert PG.leave(pg, :foo, :bar, self()) == {:error, :not_member}
96+
{:links, links} = Process.info(self(), :links)
97+
assert pid in links
98+
end
4599
end
46100

47-
test "untrack/2", %{pg: pg} do
48-
PG.track(pg, self(), :foo, :bar, :baz)
49-
assert [_] = PG.list(pg, :foo)
50-
PG.untrack(pg, self())
51-
assert [] == PG.list(pg, :foo)
101+
describe "update/5" do
102+
test "executes the update if entry is present", %{pg: pg} do
103+
parent = self()
104+
PG.join(pg, :foo, :bar, self(), 1)
105+
assert [{:bar, 1}] == PG.members(pg, :foo)
106+
107+
update = fn value ->
108+
send(parent, value)
109+
value + 1
110+
end
111+
112+
assert PG.update(pg, :foo, :bar, self(), update) == :ok
113+
assert_received 1
114+
assert [{:bar, 2}] == PG.members(pg, :foo)
115+
end
116+
117+
test "does not execute update if entry is absent", %{pg: pg} do
118+
parent = self()
119+
update = fn value -> Process.exit(parent, {:unexpected_update, value}) end
120+
assert PG.update(pg, :foo, :bar, self(), update) == {:error, :not_member}
121+
end
52122
end
53123

54-
test "untrack/4", %{pg: pg} do
55-
PG.track(pg, self(), :foo, :bar, :baz)
56-
assert [_] = PG.list(pg, :foo)
57-
PG.untrack(pg, self(), :foo, :bar)
58-
assert [] == PG.list(pg, :foo)
124+
describe "replace/5" do
125+
test "updates value if entry is present", %{pg: pg} do
126+
PG.join(pg, :foo, :bar, self(), 1)
127+
assert [{:bar, 1}] == PG.members(pg, :foo)
128+
assert PG.replace(pg, :foo, :bar, self(), 2) == :ok
129+
assert [{:bar, 2}] == PG.members(pg, :foo)
130+
end
131+
132+
test "does not update value if entry is absent", %{pg: pg} do
133+
assert PG.replace(pg, :foo, :bar, self(), 2) == {:error, :not_member}
134+
end
59135
end
60136
end

0 commit comments

Comments
 (0)