From 9b86e3cdeaf21e61a31d76fcabf61ccddff3c1c4 Mon Sep 17 00:00:00 2001 From: Mike Nomitch Date: Mon, 22 Aug 2022 19:50:01 -0700 Subject: [PATCH 1/2] Adding nomad strategy --- CHANGELOG.md | 2 +- README.md | 6 +- lib/strategy/nomad.ex | 218 ++++++++++++++++++++++++++++++++++++++++++ test/nomat_test.exs | 0 4 files changed, 224 insertions(+), 2 deletions(-) create mode 100644 lib/strategy/nomad.ex create mode 100644 test/nomat_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f929ae..3aea87d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - Use new cypher names +- Adds clustering strategy for Nomad orchestratory (see: github.com/hashicorp/nomad) ### 3.3.0 @@ -10,7 +11,6 @@ - Default multicast address is now 233.252.1.32, was 230.1.1.251, [commit](https://github.com/bitwalker/libcluster/commit/449a65e14f152a83a0f8ee371f05743610cd292f) - ### 2.3.0 ### Added diff --git a/README.md b/README.md index c22a1a2..95333cf 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ You can find supporting documentation [here](https://hexdocs.pm/libcluster). - Kubernetes via its metadata API using via a configurable label selector and node basename; or alternatively, using DNS. - Rancher, via its [metadata API][rancher-api] + - Nomad, via its [services API][nomad-api] - Easy to provide your own custom clustering strategies for your specific environment. - Easy to use provide your own distribution plumbing (i.e. something other than Distributed Erlang), by implementing a small set of callbacks. This allows @@ -120,7 +121,9 @@ You have a handful of choices with regards to cluster management out of the box: nodes based on a label selector and basename. - `Cluster.Strategy.Kubernetes.DNS`, which uses DNS to join nodes under a shared headless service in a given namespace. -- `Cluster.Strategy.Rancher`, which like the Kubernetes strategy, uses a +- `Cluster.Strategy.Rancher`, which like the Kubernetes and Nomad strategies, uses a + metadata API to query nodes to cluster with. +- `Cluster.Strategy.Nomad`, which like the Kubernetes and Ranches strategies, uses a metadata API to query nodes to cluster with. You can also define your own strategy implementation, by implementing the @@ -153,3 +156,4 @@ This library is MIT licensed. See the [LICENSE.md](https://github.com/bitwalker/libcluster/blob/master/LICENSE.md) for details. [rancher-api]: http://rancher.com/docs/rancher/latest/en/rancher-services/metadata-service/ +[nomad-api]: https://www.nomadproject.io/api-docs/services diff --git a/lib/strategy/nomad.ex b/lib/strategy/nomad.ex new file mode 100644 index 0000000..45175ba --- /dev/null +++ b/lib/strategy/nomad.ex @@ -0,0 +1,218 @@ +defmodule Cluster.Strategy.Nomad do + @moduledoc """ + This clustering strategy works by querying Nomad for a service specified + by name. It will poll for new service addresses based on the polling interval + specified (in milliseconds). + + ## Options + + * `service_name` - The name of the Nomad service you wish to get the addresses for (required; e.g. "my-elixir-app") + * `namespace` - The Nomad namespace to query (optional; default: "default") + * `nomad_server_url` - The short name of the nodes you wish to connect to (required; e.g. "https://127.0.0.1:4646") + * `node_basename` - The erland node basename (required; e.g. "app") + * `poll_interval` - How often to poll in milliseconds (optional; default: 5_000) + + ## Usage + + config :libcluster, + topologies: [ + dns_poll_example: [ + strategy: #{__MODULE__}, + config: [ + service_name: "my-elixir-app", + nomad_server_url: "https://my-nomad-url:4646", + namespace: "engineering", + node_basename: "app", + polling_interval: 5_000]]] + """ + + use GenServer + import Cluster.Logger + + alias Cluster.Strategy.State + alias Cluster.Strategy + + @default_polling_interval 5_000 + @default_namespace "default" + @default_token "" + + def start_link(args), do: GenServer.start_link(__MODULE__, args) + + @impl true + def init([%State{meta: nil} = state]) do + init([%State{state | :meta => MapSet.new()}]) + end + + def init([%State{} = state]) do + {:ok, do_poll(state)} + end + + @impl true + def handle_info(:timeout, state), do: handle_info(:poll, state) + def handle_info(:poll, state), do: {:noreply, do_poll(state)} + def handle_info(_, state), do: {:noreply, state} + + defp do_poll( + %State{ + topology: topology, + connect: connect, + disconnect: disconnect, + list_nodes: list_nodes + } = state + ) do + new_nodelist = state |> get_nodes() |> MapSet.new() + removed = MapSet.difference(state.meta, new_nodelist) + + new_nodelist = + case Strategy.disconnect_nodes( + topology, + disconnect, + list_nodes, + MapSet.to_list(removed) + ) do + :ok -> + new_nodelist + + {:error, bad_nodes} -> + # Add back the nodes which should have been removed, but which couldn't be for some reason + Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc -> + MapSet.put(acc, n) + end) + end + + new_nodelist = + case Strategy.connect_nodes( + topology, + connect, + list_nodes, + MapSet.to_list(new_nodelist) + ) do + :ok -> + new_nodelist + + {:error, bad_nodes} -> + # Remove the nodes which should have been added, but couldn't be for some reason + Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc -> + MapSet.delete(acc, n) + end) + end + + Process.send_after(self(), :poll, polling_interval(state)) + + %{state | :meta => new_nodelist} + end + + defp polling_interval(%{config: config}) do + Keyword.get(config, :polling_interval, @default_polling_interval) + end + + defp get_namespace(config) do + Keyword.get(config, :namespace, @default_namespace) + end + + defp get_token(config) do + Keyword.get(config, :token, @default_token) + end + + @spec get_nodes(State.t()) :: [atom()] + defp get_nodes(%State{config: config} = state) do + server_url = Keyword.fetch(config, :nomad_server_url) + service_name = Keyword.fetch(config, :service_name) + node_basename = Keyword.fetch(config, :node_basename) + namespace = get_namespace(config) + token = get_token(config) + + fetch_nodes(server_url, service_name, node_basename, namespace, token, state) + end + + defp fetch_nodes( + {:ok, server_url}, + {:ok, service_name}, + {:ok, node_basename}, + namespace, + token, + %State{ + topology: topology + } + ) + when server_url != "" and service_name != "" and node_basename != "" do + debug(topology, "polling nomad for '#{service_name}' in namespace '#{namespace}'") + + headers = [{'X-Nomad-Token', '#{token}'}] + http_options = [] + url = '#{server_url}/v1/service/#{service_name}' + + case :httpc.request(:get, {url, headers}, http_options, []) do + {:ok, {{_version, 200, _status}, _headers, body}} -> + Jason.decode!(body) + |> Enum.map(fn %{"Address" => addr} -> :"#{node_basename}@#{addr}" end) + + {:ok, {{_version, 403, _status}, _headers, _body}} -> + warn(topology, "cannot query nomad (unauthorized)") + [] + + {:ok, {{_version, code, status}, _headers, body}} -> + warn(topology, "cannot query nomad (#{code} #{status}): #{inspect(body)}") + [] + + {:error, reason} -> + error(topology, "request to nomad failed!: #{inspect(reason)}") + [] + + _ -> + error(topology, "unknown error fetching nomad service info") + [] + end + end + + defp fetch_nodes( + {:ok, invalid_server_url}, + {:ok, invalid_service_name}, + {:ok, invalid_node_base_name}, + _namespace, + _token, + %State{ + topology: topology + } + ) do + warn( + topology, + "nomad strategy is selected, but server_url, service_name, or node_base_name param is invalid: #{inspect(%{nomad_server_url: invalid_server_url, service_name: invalid_service_name, node_basename: invalid_node_base_name})}" + ) + + [] + end + + defp fetch_nodes(:error, _service_name, _node_base_name, _namespace, _token, %State{ + topology: topology + }) do + warn( + topology, + "nomad polling strategy is selected, but nomad_server_url param missed" + ) + + [] + end + + defp fetch_nodes(_server_url, :error, _node_base_name, _namespace, _token, %State{ + topology: topology + }) do + warn( + topology, + "nomad polling strategy is selected, but service_name param missed" + ) + + [] + end + + defp fetch_nodes(_server_url, _service_name, :error, _namespace, _token, %State{ + topology: topology + }) do + warn( + topology, + "nomad polling strategy is selected, but node_base_name param missed" + ) + + [] + end +end diff --git a/test/nomat_test.exs b/test/nomat_test.exs new file mode 100644 index 0000000..e69de29 From 4f8b1ec14f5e33111cacfcb6ce7f069a5a2490df Mon Sep 17 00:00:00 2001 From: Mike Nomitch Date: Thu, 13 Oct 2022 19:53:11 -0700 Subject: [PATCH 2/2] charlist debugging --- lib/strategy/nomad.ex | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/strategy/nomad.ex b/lib/strategy/nomad.ex index 45175ba..d0801b2 100644 --- a/lib/strategy/nomad.ex +++ b/lib/strategy/nomad.ex @@ -140,7 +140,11 @@ defmodule Cluster.Strategy.Nomad do headers = [{'X-Nomad-Token', '#{token}'}] http_options = [] - url = '#{server_url}/v1/service/#{service_name}' + url_string = "#{server_url}/v1/service/#{service_name}" + IO.puts("url_string") + IO.puts(url_string) + + url = to_charlist(url_string) case :httpc.request(:get, {url, headers}, http_options, []) do {:ok, {{_version, 200, _status}, _headers, body}} ->