Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions lib/util/loader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
defmodule Util.Loader do
alias __MODULE__.Task
alias __MODULE__.Results

def load(definitions, opts \\ []) do
global_timeout = Keyword.get(opts, :whole_operation_timeout, :infinity)
per_resource_timeout = Keyword.get(opts, :per_resource_timeout, :infinity)

Wormhole.capture(fn ->
tasks = Enum.map(definitions, fn definition ->
{id, fun, opts} = definition

Task.new(id, fun, opts)
end)

with(
:ok <- check_unknown_deps(tasks),
:ok <- check_deps_cycle(tasks)
) do
execute(tasks, Results.new())
end
end, timeout: global_timeout)
|> case do
{:ok, res} -> res
e -> e
end
end

defp execute(tasks, results) do
{runnable, rest} = find_runnable(tasks, already_executed: Results.executed_task_ids(results))

if runnable == [] && rest != [] do
{:error, :unprocessed, Enum.map(rest, fn r -> r.id end)}
else
new_results =
try do
runnable
|> Enum.map(fn t -> Task.execute_async(t, Results.fetch(results, t.deps)) end)
|> Task.await_many()
|> Enum.into(%{})
rescue
e ->
IO.inspect("AAAAAAAAAAAAAAAAA", label: "\n\e[33m=== DEBUG (#{__ENV__.module}:#{__ENV__.line}) ===\e[0m\n")
IO.inspect(e, label: "\n\e[33m=== DEBUG (#{__ENV__.module}:#{__ENV__.line}) ===\e[0m\n")
%{}
end

case process(new_results) do
{:ok, new_results} ->
results = Map.merge(results, new_results)

if rest == [] do
{:ok, results}
else
execute(rest, results)
end

{:error, new_results} ->
results = Map.merge(results, new_results)

{:error, results}
end
end
end

defp process(raw_results) do
res = Enum.reduce(raw_results, {:ok, []}, fn r, {type, acc} ->
case r do
{name, {:ok, val}} -> {type, acc ++ [{name, val}]}
{name, e} -> {:error, acc ++ [{name, e}]}
end
end)

{elem(res, 0), Enum.into(elem(res, 1), %{})}
end

defp check_deps_cycle(tasks, visited \\ []) do
{visitable, rest} = find_runnable(tasks, already_executed: visited)

cond do
visitable == [] && rest == [] ->
:ok

visitable == [] && rest != [] ->
{:error, :dependency_cycle}

true ->
check_deps_cycle(rest, visited ++ Enum.map(visitable, &(&1.id)))
end
end

defp check_unknown_deps(tasks) do
names = Enum.map(tasks, &(&1.id))

tasks
|> Enum.map(fn task ->
unknown_deps = Enum.filter(task.deps, fn d -> d not in names end)

{task.id, unknown_deps}
end)
|> Enum.filter(fn {id, unknown_deps} ->
unknown_deps != []
end)
|> case do
[] -> :ok
e -> {:error, :unknown_dependency, Enum.into(e, %{})}
end
end

defp subset?(arr1, arr2) do
MapSet.subset?(MapSet.new(arr1), MapSet.new(arr2))
end

defp find_runnable(tasks, already_executed: ids) do
Enum.split_with(tasks, fn t -> subset?(t.deps, ids) end)
end


defmodule Results do
def new(), do: %{}
def executed_task_ids(r), do: Map.keys(r)

def fetch(r, ids) do
Enum.filter(r, fn {k, v} -> k in ids end) |> Enum.into(%{})
end
end

end
42 changes: 42 additions & 0 deletions lib/util/loader/task.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule Util.Loader.Task do
@moduledoc """
An individual loading task.
"""

defstruct [
:id,
:fun,
:deps,
:timeout
]

def new(id, fun, opts) do
timeout = Keyword.get(opts, :timeout, :infinity)
deps = Keyword.get(opts, :depends_on, [])

{:ok, struct(__MODULE__,
id: id,
fun: fun,
deps: deps,
timeout: timeout
)}
end

def execute(task, deps) do
Wormhole.capture(fn -> dispatch_call(task.fun, deps) end, [timeout: task.timeout])
|> case do
{:ok, {:ok, res}} -> {:ok, res}
{:ok, {:error, err}} -> {:error, err}
{:ok, other} -> {:error, :unexpected_result_type, other}
{:error, err} -> {:error, err}
end
end

defp dispatch_call(fun, deps) do
case :erlang.fun_info(fun)[:arity] do
0 -> fun.()
1 -> fun.(deps)
2 -> fun.(deps, [])
end
end
end
52 changes: 52 additions & 0 deletions test/util/loader/task_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Util.Loader.TaskTest do
use ExUnit.Case, async: true

test "construction" do
assert {:ok, _} = Util.Loader.Task.new(:a, fn -> {:ok, "A"} end, depends_on: [:b, :d])
end

describe "execution" do
test "ok result" do
assert {:ok, task} = Util.Loader.Task.new(:a, fn -> {:ok, "A"} end, [])
assert {:ok, "A"} = Util.Loader.Task.execute(task, %{})
end

test "error result" do
assert {:ok, task} = Util.Loader.Task.new(:a, fn -> {:error, "B"} end, [])
assert {:error, "B"} = Util.Loader.Task.execute(task, %{})
end

test "exception result" do
fun = fn -> raise "AAA" end

assert {:ok, task} = Util.Loader.Task.new(:a, fun, [])
assert {:error, {:shutdown, %RuntimeError{message: "AAA"}}} = Util.Loader.Task.execute(task, %{})
end

test "non-tuple result" do
assert {:ok, task} = Util.Loader.Task.new(:a, fn -> "A" end, [])
assert {:error, :unexpected_result_type, "A"} = Util.Loader.Task.execute(task, %{})
end

test "timeout result" do
fun = fn ->
:timer.sleep(1000)
{:ok, "A"}
end

assert {:ok, task} = Util.Loader.Task.new(:a, fun, timeout: 100)
assert {:error, {:timeout, 100}} = Util.Loader.Task.execute(task, [])
end

test "with deps" do
deps = %{a: "A", b: "B"}

fun = fn deps ->
{:ok, deps.a <> deps.b}
end

assert {:ok, task} = Util.Loader.Task.new(:a, fun, timeout: 100)
assert {:ok, "AB"} = Util.Loader.Task.execute(task, deps)
end
end
end
122 changes: 122 additions & 0 deletions test/util/loader_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
defmodule Util.LoaderTest do
use ExUnit.Case, async: true

alias Util.Loader

test "empty loaders return :ok" do
assert {:ok, %{}} = Loader.load([])
end

test "it can laod things in parallel" do
assert {:ok, resources} = Loader.load([
{:user, fn -> {:ok, "Mike"} end},
{:org, fn -> {:ok, "Acme"} end}
])

assert resources.user == "Mike"
assert resources.org == "Acme"
end

test "it can wait on dependencies" do
assert {:ok, resources} = Loader.load([
{:user, fn -> {:ok, "Mike"} end},
{:permissions, fn deps -> {:ok, "#{deps.user} is an admin"} end, depends_on: [:user]}
])

assert resources.user == "Mike"
assert resources.permissions == "Mike is an admin"
end

test "multiple tasks with dependencies" do
assert {:ok, resources} = Loader.load([
{:a, fn -> {:ok, "a"} end},
{:b, fn deps -> {:ok, deps.a <> "b"} end, depends_on: [:a]},
{:c, fn deps -> {:ok, deps.b <> "c"} end, depends_on: [:b]},
])

assert resources.a == "a"
assert resources.b == "ab"
assert resources.c == "abc"
end

test "it can return errors" do
assert {:error, resources} = Loader.load([
{:user, fn -> {:ok, "Mike"} end},
{:org, fn -> {:error, :not_found} end}
])

assert resources.org == {:error, :not_found}
end

test "it returns an error if an unknown dependency is required" do
resources = [
{:a, fn -> {:ok, nil} end},
{:b, fn -> {:ok, nil} end, depends_on: [:c]},
]

assert {:error, :unknown_dependency, %{b: [:c]}} = Loader.load(resources)
end

test "it returns an error if there is a cycle in the deps" do
resources = [
{:a, fn -> {:ok, nil} end},
{:b, fn -> {:ok, nil} end, depends_on: [:c]},
{:c, fn -> {:ok, nil} end, depends_on: [:b]},
]

assert {:error, :dependency_cycle} = Loader.load(resources)

resources = [
{:a, fn -> {:ok, nil} end},
{:b, fn -> {:ok, nil} end, depends_on: [:d]},
{:c, fn -> {:ok, nil} end, depends_on: [:b]},
{:d, fn -> {:ok, nil} end, depends_on: [:c]},
]

assert {:error, :dependency_cycle} = Loader.load(resources)
end

test "it handles raised exceptions" do
assert {:error, resources} = Loader.load([
{:a, fn -> raise "failure" end},
{:b, fn -> {:ok, nil} end, depends_on: [:a]},
])

assert resources.a == {:error, {:shutdown, %RuntimeError{message: "failure"}}}
end

test "it respects the timeout" do
assert {:error, {:timeout, 100}} = Loader.load([
{:a, fn -> :timer.sleep(300) end},
{:b, fn -> {:ok, nil} end, depends_on: [:a]},
], whole_operation_timeout: 100)
end

test "it respects per task timeout" do
assert {:error, resources} = Loader.load([
{:a, fn -> :timer.sleep(300) end, timeout: 100},
{:b, fn -> {:ok, nil} end, depends_on: [:a]},
])

assert resources.a == {:error, {:timeout, 100}}
end

test "it respects per task timeout defined on global level" do
assert {:error, resources} = Loader.load([
{:a, fn -> :timer.sleep(300) end},
{:b, fn -> {:ok, nil} end, depends_on: [:a]},
], per_resource_timeout: 100)

assert resources.a == {:error, {:timeout, 100}}
end

test "it support fail-fast" do
assert {:error, resources} = Loader.load([
{:a, fn -> :timer.sleep(10000) end},
{:b, fn -> raise "aaa" end}
])

assert resources.b == {:error, {:shutdown, %RuntimeError{message: "aaa"}}}
assert resources.a == "AAA"
end
end