From 690038fc4c043dd27792f47d4aa03161801242f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Thu, 7 Apr 2022 15:01:24 +0000 Subject: [PATCH 1/6] Bootstrap opinionated GRCP client --- lib/util/grpc.ex | 99 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 lib/util/grpc.ex diff --git a/lib/util/grpc.ex b/lib/util/grpc.ex new file mode 100644 index 0000000..4a09f6e --- /dev/null +++ b/lib/util/grpc.ex @@ -0,0 +1,99 @@ +defmodule Util.Grpc do + + @type client_name :: String.t() + + defmodule Client do + defstruct :name, :endpoint, :timeout, :log_level, :publish_metrics + + @type t() :: { + name: String.t(), + endpoint: String.t(), + timeout: number(), + log_level: atom(), + publish_metrics: boolean() + } + + @default_timeout 30_000 # 30 seconds + + def new(name, endpoint), do: new(name, endpoint, @default_timeout, :info, true) + def new(name, endpoint, timeout), do: new(name, endpoint, timeout, :info, true) + + def new(name, endpoint, timeout, log_level, publish_metrics) do + %__MODULE__{ + name: name, + endpoint: endpoint, + timeout: timeout, + log_level: log_level, + publish_metrics: publish_metrics + } + end + + def do(client, callback) do + Wormhole.capture(fn -> + Watchman.benchmark("grpc.#{client.name}.duration", fn -> + Watchman.increment("grpc.#{client.name}.connect") + + case Grpc.Stub.connect(client.endpoint) do + {:ok, channel} -> + callback.(channel) + + {:error, error} -> + Logger.log(client.level, "Failed to connect to #{client.name} service err='#{inspect(error)}'") + Watchman.increment("grpc.#{client.name}.connect.error") + end + end) + end) + end + end + + def start_link do + Util.Grpc.State.start_link() + end + + @doc """ + Sets up Grpc connection information. + + Example: + + Util.Grpc.setup([ + Client.new(:service_a, Application.get_env("SERVICE_A_ENDPOINT"), :info, true) + Client.new(:service_b, Application.get_env("SERVICE_B_ENDPOINT"), :info, true) + ]) + + """ + def setup(clients) do + :ok = State.reset() + + Enum.each(clients, fn c -> Util.Grpc.State.add_client(c) end) + + :ok + end + + @spec rpc(client_name(), GRPC.Channel.t(), function()) :: {:ok, any()} | {:error, :failed_to_connect, any()} | {:error, :timeout, any()} | {:error, any()} + @doc """ + Executes a stateless RPC call to a remote service. + + 1. It opens a new connection + 2. Sends the RPC request + 3. Waits for the result, or times out + """ + def do(client_name, callback) do + {:ok, client} = Agent.find(client_name) + + Client.rpc(client, callback) + end + + end + + defmodule State do + use Agent + + def start_link do + Agent.start_link(fn -> %{} end, name: __MODULE__) + end + + def reset do + Agent.update(__MODULE__, %{}) + end + end +end From 48868706a930b9214258fb6da64a436608ab0f57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Thu, 7 Apr 2022 17:12:47 +0000 Subject: [PATCH 2/6] Refinments for the API --- lib/util/grpc.ex | 107 +++++++++++++++++----------------------- lib/util/grpc/client.ex | 65 ++++++++++++++++++++++++ lib/util/grpc/state.ex | 29 +++++++++++ 3 files changed, 140 insertions(+), 61 deletions(-) create mode 100644 lib/util/grpc/client.ex create mode 100644 lib/util/grpc/state.ex diff --git a/lib/util/grpc.ex b/lib/util/grpc.ex index 4a09f6e..0965523 100644 --- a/lib/util/grpc.ex +++ b/lib/util/grpc.ex @@ -1,50 +1,45 @@ defmodule Util.Grpc do + @moduledoc """ + This module provides an opiniated interface for communicating with + GRPC services. It handles starting and closing connections, logging, + metrics, and proper error handling. - @type client_name :: String.t() + Usage: - defmodule Client do - defstruct :name, :endpoint, :timeout, :log_level, :publish_metrics - - @type t() :: { - name: String.t(), - endpoint: String.t(), - timeout: number(), - log_level: atom(), - publish_metrics: boolean() - } - - @default_timeout 30_000 # 30 seconds - - def new(name, endpoint), do: new(name, endpoint, @default_timeout, :info, true) - def new(name, endpoint, timeout), do: new(name, endpoint, timeout, :info, true) - - def new(name, endpoint, timeout, log_level, publish_metrics) do - %__MODULE__{ - name: name, - endpoint: endpoint, - timeout: timeout, - log_level: log_level, - publish_metrics: publish_metrics - } - end - - def do(client, callback) do - Wormhole.capture(fn -> - Watchman.benchmark("grpc.#{client.name}.duration", fn -> - Watchman.increment("grpc.#{client.name}.connect") - - case Grpc.Stub.connect(client.endpoint) do - {:ok, channel} -> - callback.(channel) - - {:error, error} -> - Logger.log(client.level, "Failed to connect to #{client.name} service err='#{inspect(error)}'") - Watchman.increment("grpc.#{client.name}.connect.error") - end - end) - end) - end - end + 1. First, add Util.Grpc to your application's supervision tree. + This process keeps the configuration values for your Grpc clients. + + children = [ + worker(Util.Grpc, []) + ] + + Supervisor.start_link(children, opts) + + 2. Describe the outgoing connections from your service: + + Util.Grpc.setup([ + Util.Grpc.Client.new(:user_service, "localhost:50051", UserApi.Stub), + Util.Grpc.Client.new(:billing_service, "localhost:50051", BillingApi.Stub) + ]) + + 3. User Grpc.do to communicate with your upstream services: + + req = ExampleApi.DescribeRequest.new(name: "a") + + {:ok, res} = Util.Grpc.call(:user_service, :describe, req) + + During the execution of the call, the following metrics are published: + + - gprc...duration + - gprc...connect + - gprc...connect.error.count + - gprc...response.success.count + - gprc...response.error.count + + In case of errors, log messages are logged via the Logger module. + """ + + @type client_name :: String.t() def start_link do Util.Grpc.State.start_link() @@ -69,7 +64,6 @@ defmodule Util.Grpc do :ok end - @spec rpc(client_name(), GRPC.Channel.t(), function()) :: {:ok, any()} | {:error, :failed_to_connect, any()} | {:error, :timeout, any()} | {:error, any()} @doc """ Executes a stateless RPC call to a remote service. @@ -77,23 +71,14 @@ defmodule Util.Grpc do 2. Sends the RPC request 3. Waits for the result, or times out """ - def do(client_name, callback) do - {:ok, client} = Agent.find(client_name) + @spec rpc(client_name(), GRPC.Channel.t(), function()) :: + {:ok, any()} + | {:error, :failed_to_connect, any()} + | {:error, :timeout, any()} + | {:error, any()} + def call(client_name, callback) do + {:ok, client} = State.find_client(client_name) Client.rpc(client, callback) end - - end - - defmodule State do - use Agent - - def start_link do - Agent.start_link(fn -> %{} end, name: __MODULE__) - end - - def reset do - Agent.update(__MODULE__, %{}) - end - end end diff --git a/lib/util/grpc/client.ex b/lib/util/grpc/client.ex new file mode 100644 index 0000000..8a2b384 --- /dev/null +++ b/lib/util/grpc/client.ex @@ -0,0 +1,65 @@ +defmodule Util.Grpc.Client do + defstruct :name, :endpoint, :timeout, :log_level, :publish_metrics, :stub + + @type t() :: { + name: String.t(), + endpoint: String.t(), + timeout: number(), + log_level: atom(), + publish_metrics: boolean(), + stub: Grpc.Stub.t() + } + + @default_timeout 30_000 # 30 seconds + + def new(name, endpoint), do: new(name, endpoint, @default_timeout, :info, true) + def new(name, endpoint, timeout), do: new(name, endpoint, timeout, :info, true) + + def new(name, endpoint, timeout, log_level, publish_metrics) do + %__MODULE__{ + name: name, + endpoint: endpoint, + timeout: timeout, + log_level: log_level, + publish_metrics: publish_metrics + } + end + + def call(client, method_name, request, opts \\ []) do + metric_name = "grpc.#{client.name}.#{method_name}" + + result = Wormhole.capture(fn -> + Watchman.benchmark("#{metric_name}.duration", fn -> + Watchman.increment("#{metric_name}.connect.count") + + case Grpc.Stub.connect(client.endpoint) do + {:ok, channel} -> + res = GRPC.Stub.call(client.stub, method_name, channel, request, opts) + + case res do + {:ok, res} -> + Watchman.increment("#{metric_name}.response.success.count") + {:ok, res} + + {:error, err} -> + Watchman.increment("#{metric_name}.response.error.count") + {:error, err} + end + + {:error, error} -> + Logger.log(client.level, "Failed to connect to #{client.name} service err='#{inspect(error)}'") + Watchman.increment("#{metric_name}.connect.error.count") + end + end) + end) + + case result do + {:ok, result} -> + result + + {:error, err} -> + Logger.log(client.level, "Error while processing #{client.name} service err='#{inspect(error)}'") + Watchman.increment("grpc.#{client.name}.execution.error") + end + end +end diff --git a/lib/util/grpc/state.ex b/lib/util/grpc/state.ex new file mode 100644 index 0000000..1d74078 --- /dev/null +++ b/lib/util/grpc/state.ex @@ -0,0 +1,29 @@ +defmodule Util.Grpc.State do + use Agent + + @moduledoc """ + Keeps the configuration state for every Grcp client. + + Example usage: + + State.add_client(Client.new(name: "service1", endpoint: "localhost:9000", ...)) + State.find_client("service1") => %Client{} + + """ + + def start_link do + Agent.start_link(fn -> %{} end, name: __MODULE__) + end + + def add_client(client) do + Agent.update(__MODULE__, fn state -> Map.put(state, client.name, client) end) + end + + def find_client(name) do + Agent.get(__MODULE__, fn state -> Map.fetch!(state, name) end) + end + + def reset do + Agent.update(__MODULE__, %{}) + end +end From f0e12a3e9f65d38f5dcf4aa392891cfede9877a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Sat, 9 Apr 2022 12:07:28 +0000 Subject: [PATCH 3/6] Clean up code --- lib/util/grpc/client.ex | 65 ---------------------- lib/util/grpc/state.ex | 29 ---------- lib/util/{grpc.ex => grpc_x.ex} | 55 ++++++------------ lib/util/grpc_x/client.ex | 42 ++++++++++++++ lib/util/grpc_x/rpc_call.ex | 98 +++++++++++++++++++++++++++++++++ lib/util/grpc_x/state.ex | 18 ++++++ lib/util/result.ex | 83 ++++++++++++++++++++++++++++ mix.exs | 17 +++--- mix.lock | 11 +++- test/grpc_x_test.exs | 20 +++++++ 10 files changed, 297 insertions(+), 141 deletions(-) delete mode 100644 lib/util/grpc/client.ex delete mode 100644 lib/util/grpc/state.ex rename lib/util/{grpc.ex => grpc_x.ex} (60%) create mode 100644 lib/util/grpc_x/client.ex create mode 100644 lib/util/grpc_x/rpc_call.ex create mode 100644 lib/util/grpc_x/state.ex create mode 100644 lib/util/result.ex create mode 100644 test/grpc_x_test.exs diff --git a/lib/util/grpc/client.ex b/lib/util/grpc/client.ex deleted file mode 100644 index 8a2b384..0000000 --- a/lib/util/grpc/client.ex +++ /dev/null @@ -1,65 +0,0 @@ -defmodule Util.Grpc.Client do - defstruct :name, :endpoint, :timeout, :log_level, :publish_metrics, :stub - - @type t() :: { - name: String.t(), - endpoint: String.t(), - timeout: number(), - log_level: atom(), - publish_metrics: boolean(), - stub: Grpc.Stub.t() - } - - @default_timeout 30_000 # 30 seconds - - def new(name, endpoint), do: new(name, endpoint, @default_timeout, :info, true) - def new(name, endpoint, timeout), do: new(name, endpoint, timeout, :info, true) - - def new(name, endpoint, timeout, log_level, publish_metrics) do - %__MODULE__{ - name: name, - endpoint: endpoint, - timeout: timeout, - log_level: log_level, - publish_metrics: publish_metrics - } - end - - def call(client, method_name, request, opts \\ []) do - metric_name = "grpc.#{client.name}.#{method_name}" - - result = Wormhole.capture(fn -> - Watchman.benchmark("#{metric_name}.duration", fn -> - Watchman.increment("#{metric_name}.connect.count") - - case Grpc.Stub.connect(client.endpoint) do - {:ok, channel} -> - res = GRPC.Stub.call(client.stub, method_name, channel, request, opts) - - case res do - {:ok, res} -> - Watchman.increment("#{metric_name}.response.success.count") - {:ok, res} - - {:error, err} -> - Watchman.increment("#{metric_name}.response.error.count") - {:error, err} - end - - {:error, error} -> - Logger.log(client.level, "Failed to connect to #{client.name} service err='#{inspect(error)}'") - Watchman.increment("#{metric_name}.connect.error.count") - end - end) - end) - - case result do - {:ok, result} -> - result - - {:error, err} -> - Logger.log(client.level, "Error while processing #{client.name} service err='#{inspect(error)}'") - Watchman.increment("grpc.#{client.name}.execution.error") - end - end -end diff --git a/lib/util/grpc/state.ex b/lib/util/grpc/state.ex deleted file mode 100644 index 1d74078..0000000 --- a/lib/util/grpc/state.ex +++ /dev/null @@ -1,29 +0,0 @@ -defmodule Util.Grpc.State do - use Agent - - @moduledoc """ - Keeps the configuration state for every Grcp client. - - Example usage: - - State.add_client(Client.new(name: "service1", endpoint: "localhost:9000", ...)) - State.find_client("service1") => %Client{} - - """ - - def start_link do - Agent.start_link(fn -> %{} end, name: __MODULE__) - end - - def add_client(client) do - Agent.update(__MODULE__, fn state -> Map.put(state, client.name, client) end) - end - - def find_client(name) do - Agent.get(__MODULE__, fn state -> Map.fetch!(state, name) end) - end - - def reset do - Agent.update(__MODULE__, %{}) - end -end diff --git a/lib/util/grpc.ex b/lib/util/grpc_x.ex similarity index 60% rename from lib/util/grpc.ex rename to lib/util/grpc_x.ex index 0965523..8d8ff7b 100644 --- a/lib/util/grpc.ex +++ b/lib/util/grpc_x.ex @@ -1,4 +1,4 @@ -defmodule Util.Grpc do +defmodule Util.GrpcX do @moduledoc """ This module provides an opiniated interface for communicating with GRPC services. It handles starting and closing connections, logging, @@ -9,20 +9,18 @@ defmodule Util.Grpc do 1. First, add Util.Grpc to your application's supervision tree. This process keeps the configuration values for your Grpc clients. + grpc_clients = [ + Util.Grpc.Client.new(:user_service, "localhost:50051", UserApi.Stub), + Util.Grpc.Client.new(:billing_service, "localhost:50051", BillingApi.Stub) + ] + children = [ - worker(Util.Grpc, []) + worker(Util.Grpc, gprc_clients) ] Supervisor.start_link(children, opts) - 2. Describe the outgoing connections from your service: - - Util.Grpc.setup([ - Util.Grpc.Client.new(:user_service, "localhost:50051", UserApi.Stub), - Util.Grpc.Client.new(:billing_service, "localhost:50051", BillingApi.Stub) - ]) - - 3. User Grpc.do to communicate with your upstream services: + 3. Use Grpc.call to communicate with your upstream services: req = ExampleApi.DescribeRequest.new(name: "a") @@ -39,29 +37,14 @@ defmodule Util.Grpc do In case of errors, log messages are logged via the Logger module. """ - @type client_name :: String.t() - - def start_link do - Util.Grpc.State.start_link() - end - - @doc """ - Sets up Grpc connection information. - - Example: + alias Util.GrpcX.State + alias Util.GrpcX.Client - Util.Grpc.setup([ - Client.new(:service_a, Application.get_env("SERVICE_A_ENDPOINT"), :info, true) - Client.new(:service_b, Application.get_env("SERVICE_B_ENDPOINT"), :info, true) - ]) - - """ - def setup(clients) do - :ok = State.reset() - - Enum.each(clients, fn c -> Util.Grpc.State.add_client(c) end) + @type client_name :: String.t() - :ok + @spec start_link([Util.Grpc.Client.t()]) :: {:ok, pid()} | {:error, any()} + def start_link(clients) do + Util.GrpcX.State.start_link(clients) end @doc """ @@ -71,14 +54,10 @@ defmodule Util.Grpc do 2. Sends the RPC request 3. Waits for the result, or times out """ - @spec rpc(client_name(), GRPC.Channel.t(), function()) :: - {:ok, any()} - | {:error, :failed_to_connect, any()} - | {:error, :timeout, any()} - | {:error, any()} - def call(client_name, callback) do + @spec call(client_name(), Atom.t(), any(), any()) :: Util.GrpcX.RPCCall.response() + def call(client_name, method_name, request, opts \\ []) do {:ok, client} = State.find_client(client_name) - Client.rpc(client, callback) + Client.call(client, method_name, request, opts) end end diff --git a/lib/util/grpc_x/client.ex b/lib/util/grpc_x/client.ex new file mode 100644 index 0000000..43ad638 --- /dev/null +++ b/lib/util/grpc_x/client.ex @@ -0,0 +1,42 @@ +defmodule Util.GrpcX.Client do + require Logger + + alias Util.GrpcX.RPCCall + + @enforce_keys [:name, :endpoint, :timeout, :log_level, :publish_metrics, :stub] + + defstruct @enforce_keys + + @type t() :: %__MODULE__{ + name: String.t(), + endpoint: String.t(), + timeout: number(), + log_level: atom(), + publish_metrics: boolean(), + stub: Grpc.Stub.t() + } + + # 30 seconds + @default_timeout 30_000 + + def new(name, endpoint, stub), do: new(name, endpoint, stub, @default_timeout, :info, true) + def new(name, endpoint, stub, timeout), do: new(name, endpoint, stub, timeout, :info, true) + + def new(name, endpoint, stub, timeout, log_level, publish_metrics) do + %__MODULE__{ + name: name, + endpoint: endpoint, + timeout: timeout, + log_level: log_level, + stub: stub, + publish_metrics: publish_metrics + } + end + + @spec call(Client.t(), atom(), any, any) :: {:ok, any} | {:error, any} + def call(client, method_name, request, opts \\ []) do + rpc_call = RPCCall.new(client, method_name, request, opts) + + Wormhole.capture(fn -> RPCCall.execute(rpc_call) end, timeout: rpc_call.timeout) + end +end diff --git a/lib/util/grpc_x/rpc_call.ex b/lib/util/grpc_x/rpc_call.ex new file mode 100644 index 0000000..9115103 --- /dev/null +++ b/lib/util/grpc_x/rpc_call.ex @@ -0,0 +1,98 @@ +defmodule Util.GrpcX.RPCCall do + require Logger + + @type response :: + {:ok, any()} + | {:error, :failed_to_connect, any()} + | {:error, :timeout, any()} + | {:error, any()} + + def new(client, method_name, request, opts) do + default_call_opts = [timeout: client.timeout] + call_opts = Keyword.merge(default_call_opts, opts) + + %{ + endpoint: client.endpoint, + stub: client.stub, + method_name: method_name, + request: request, + log_level: client.log_level, + opts: call_opts, + publish_metrics: client.publish_metrics, + metric_prefix: "grpc.#{client.name}.#{method_name}", + timeout: client.timeout + } + end + + @spec execute(RPRCall.t()) :: response() + def execute(rpc_call) do + benchmark(rpc_call, fn -> + with {:ok, channel} <- connect(rpc_call) do + result = send_req(rpc_call, channel) + + disconnect(channel) + + result + end + end) + end + + defp connect(rpc_call) do + inc(rpc_call, "connect.count") + + case GRPC.Stub.connect(rpc_call.endpoint) do + {:ok, channel} -> + {:ok, channel} + + {:error, err} -> + inc(rpc_call, "connect.failure.count") + log(rpc_call, "Failed to connect") + + {:error, err} + end + end + + defp disconnect(channel) do + GRPC.Stub.disconnect(channel) + end + + defp send_req(rpc, channel) do + inc(rpc, "request.count") + + case do_call(rpc, channel) do + {:ok, result} -> + inc(rpc, "response.success.count") + {:ok, result} + + {:error, err} -> + inc(rpc, "response.error.count") + log(rpc, "response error err='#{inspect(err)}'") + + {:error, err} + end + end + + defp do_call(rpc, channel) do + GRPC.Stub.call(rpc.stub, rpc.method_name, channel, rpc.request, rpc.opts) + rescue + e -> {:error, e} + end + + defp inc(rpc, metric) do + if rpc.publish_metrics do + Watchman.increment("#{rpc.metric_prefix}.#{metric}") + end + end + + defp benchmark(rpc, cb) do + if rpc.publish_metrics do + Watchman.benchmark("#{rpc.metric_prefix}.duration", cb) + else + cb.() + end + end + + defp log(rpc, msg) do + Logger.log(rpc.log_level, "GrpcX: #{rpc.client_name} #{rpc.method_name} #{msg}") + end +end diff --git a/lib/util/grpc_x/state.ex b/lib/util/grpc_x/state.ex new file mode 100644 index 0000000..5e05893 --- /dev/null +++ b/lib/util/grpc_x/state.ex @@ -0,0 +1,18 @@ +defmodule Util.GrpcX.State do + use Agent + + def start_link(clients) do + state = + clients + |> Enum.map(fn c -> {c.name, c} end) + |> Enum.into(%{}) + + Agent.start_link(fn -> state end, name: __MODULE__) + end + + def find_client(name) do + state = Agent.get(__MODULE__, &Function.identity/1) + + Map.fetch(state, name) + end +end diff --git a/lib/util/result.ex b/lib/util/result.ex new file mode 100644 index 0000000..efc8a4d --- /dev/null +++ b/lib/util/result.ex @@ -0,0 +1,83 @@ +defmodule Util.Result do + @doc """ + A result monad similar to one that exists in Rust/Haskell/... + + The main use case is to have an easily pipeble {:ok, val} | {:error, val}. + + result = endpoint + |> Result.ok() + |> Result.then(fn endpoint -> connect(endpoint) end) + |> Result.then(fn channel -> send_req(channel, "hello") end) + |> Result.then(fn result -> submit_metrics(result) end) + + case result do + {:ok, val} -> val... + {:error, err} -> err... + end + + The above code would be identival to either a with statement: + + with {:ok, channel} <- connect(endpoint), + {:ok, result} <- send_req(channel, "hello"), + {:ok, result} <- submit_metrics(result) do + val + else + {:error, error} -> error... + end + + Or to a series of pipable functions that pattern match on :ok, :error: + + result = {:ok, endpoint} + |> connect(endpoint) end) + |> send_req(channel, "hello") + |> submit_metrics(result) + + def connect({:ok, endpoint}), do: ... + def connect({:error, err}), do: {:error, err} + + def send_req({:ok, channel}), do: ... + def send_req({:error, err}), do: {:error, err} + + def submit_metrics({:ok, result}), do: ... + def submit_metrics({:error, err}), do: {:error, err} + + So why would you choose Result over with or pattern matched functions: + + 1. "with" is not working hand-in-hand with pipes, and usually it prevents + you from splitting the pipeing logic into multiple functions + + 2. Functions and pattern matching is just too many boilerplate. + + """ + + @type then_function :: (ok_val() -> any) + @type ok_val :: {:ok, any} + @type error_val :: {:error, any} + + @type t() :: ok_val() | error_val() + + @spec ok(any()) :: ok_val() + def ok(val), do: {:ok, val} + + @spec wrap(any()) :: ok_val() + def wrap(val), do: ok(val) + + @spec error(any) :: error_val() + def error(val), do: {:error, val} + + @spec then(any, then_function) :: any + def then({:ok, val}, f), do: f.(val) + def then(anything_else, _f), do: anything_else + + @spec unwrap(Result.t()) :: any() + def unwrap({:ok, val}), do: val + def unwrap(any), do: any + + @spec ok?(Result.t()) :: boolean() + def ok?({:ok, _}), do: true + def ok?(_), do: false + + @spec error?(Result.t()) :: boolean() + def error?({:error, _}), do: false + def error?(_), do: true +end diff --git a/mix.exs b/mix.exs index 5dc5f8c..88ae6f6 100644 --- a/mix.exs +++ b/mix.exs @@ -2,12 +2,14 @@ defmodule Util.Mixfile do use Mix.Project def project do - [app: :util, - version: "0.0.1", - elixir: "~> 1.4", - build_embedded: Mix.env == :prod, - start_permanent: Mix.env == :prod, - deps: deps()] + [ + app: :util, + version: "0.0.1", + elixir: "~> 1.4", + build_embedded: Mix.env() == :prod, + start_permanent: Mix.env() == :prod, + deps: deps() + ] end def application do @@ -19,7 +21,8 @@ defmodule Util.Mixfile do {:watchman, github: "renderedtext/ex-watchman"}, {:wormhole, "~> 2.2"}, {:protobuf, "~> 0.5"}, - {:mock, "~> 0.3.0", only: :test}, + {:grpc, "0.5.0-beta.1", override: true}, + {:mock, "~> 0.3.0", only: :test} ] end end diff --git a/mix.lock b/mix.lock index 7fa690f..3cae27e 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,12 @@ -%{"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, +%{ + "cowboy": {:hex, :cowboy, "2.7.0", "91ed100138a764355f43316b1d23d7ff6bdb0de4ea618cb5d8677c93a7a2f115", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "04fd8c6a39edc6aaa9c26123009200fc61f92a3a94f3178c527b70b767c6e605"}, + "cowlib": {:hex, :cowlib, "2.8.0", "fd0ff1787db84ac415b8211573e9a30a3ebe71b5cbff7f720089972b2319c8a4", [:rebar3], [], "hexpm", "79f954a7021b302186a950a32869dbc185523d99d3e44ce430cd1f3289f41ed4"}, + "grpc": {:hex, :grpc, "0.5.0-beta.1", "7d43f52e138fe261f5b4981f1ada515dfc2e1bfa9dc92c7022e8f41e7e49b571", [:mix], [{:cowboy, "~> 2.7.0", [hex: :cowboy, repo: "hexpm", optional: false]}, {:gun, "~> 2.0.0", [hex: :grpc_gun, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.5", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "fbbf8872935c295b7575435fe4128372c23c6ded89c2ef8058af3c6167bb3f65"}, + "gun": {:hex, :grpc_gun, "2.0.0", "f99678a2ab975e74372a756c86ec30a8384d3ac8a8b86c7ed6243ef4e61d2729", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "03dbbca1a9c604a0267a40ea1d69986225091acb822de0b2dbea21d5815e410b"}, + "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, "mock": {:hex, :mock, "0.3.7", "75b3bbf1466d7e486ea2052a73c6e062c6256fb429d6797999ab02fa32f29e03", [:mix], [{:meck, "~> 0.9.2", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "4da49a4609e41fd99b7836945c26f373623ea968cfb6282742bcb94440cf7e5c"}, "protobuf": {:hex, :protobuf, "0.5.4", "2e1b8eec211aff034ad8a14e3674220b0158bfb9a3c7128ac9d2a1ed1b3724d3", [:mix], [], "hexpm", "994348a4592408bc99c132603b0fdb686a2b5df0321a8eb1a582ec2bd3495886"}, + "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"}, "watchman": {:git, "https://github.com/renderedtext/ex-watchman.git", "3286b9d999db11696fffbccab9637cd31e93a305", []}, - "wormhole": {:hex, :wormhole, "2.2.0", "4fa107a2a1cf4c0ada5d0ee29590fa78ce684993e6950a52c1488cb9fe313f31", [:mix], [], "hexpm", "945388051723c02a5bd2cd404f0bcc04008a415390235b1ad69e22dee8c38de3"}} + "wormhole": {:hex, :wormhole, "2.2.0", "4fa107a2a1cf4c0ada5d0ee29590fa78ce684993e6950a52c1488cb9fe313f31", [:mix], [], "hexpm", "945388051723c02a5bd2cd404f0bcc04008a415390235b1ad69e22dee8c38de3"}, +} diff --git a/test/grpc_x_test.exs b/test/grpc_x_test.exs new file mode 100644 index 0000000..0b51602 --- /dev/null +++ b/test/grpc_x_test.exs @@ -0,0 +1,20 @@ +defmodule Util.GrpcXTest do + use ExUnit.Case + + alias Util.GrpcX + + setup_all do + clients = [ + Util.GrpcX.Client.new(:service_a, "localhost:50051", :test, 1000), + Util.GrpcX.Client.new(:service_b, "localhost:50051", :test, 1000) + ] + + {:ok, pid} = Util.GrpcX.start_link(clients) + + on_exit(fn -> Process.exit(pid, :kill) end) + end + + test "first test" do + assert {:timeout, 1000} = GrpcX.call(:service_a, :describe, %{}) + end +end From 332b9f5722421e734a4b08bb6c532e754ae5a97c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Mon, 11 Apr 2022 08:53:53 +0000 Subject: [PATCH 4/6] Install stubs for calling out to a remote server --- lib/util/grpc_x/client.ex | 8 +++++--- mix.exs | 5 +++++ test/grpc_x_test.exs | 32 ++++++++++++++++++++++++-------- test/protos/helloworld.pb.ex | 31 +++++++++++++++++++++++++++++++ test/protos/helloworld.proto | 24 ++++++++++++++++++++++++ test/protos/server.ex | 12 ++++++++++++ 6 files changed, 101 insertions(+), 11 deletions(-) create mode 100644 test/protos/helloworld.pb.ex create mode 100644 test/protos/helloworld.proto create mode 100644 test/protos/server.ex diff --git a/lib/util/grpc_x/client.ex b/lib/util/grpc_x/client.ex index 43ad638..ed317db 100644 --- a/lib/util/grpc_x/client.ex +++ b/lib/util/grpc_x/client.ex @@ -18,11 +18,13 @@ defmodule Util.GrpcX.Client do # 30 seconds @default_timeout 30_000 + @default_log_level :info - def new(name, endpoint, stub), do: new(name, endpoint, stub, @default_timeout, :info, true) - def new(name, endpoint, stub, timeout), do: new(name, endpoint, stub, timeout, :info, true) + def new(name, endpoint, stub, opts \\ []) do + timeout = Keyword.get(opts, :timeout, @default_timeout) + log_level = Keyword.get(opts, :log_level, @default_log_level) + publish_metrics = Keyword.get(opts, :publish_metrics, true) - def new(name, endpoint, stub, timeout, log_level, publish_metrics) do %__MODULE__{ name: name, endpoint: endpoint, diff --git a/mix.exs b/mix.exs index 88ae6f6..c5533e4 100644 --- a/mix.exs +++ b/mix.exs @@ -6,6 +6,7 @@ defmodule Util.Mixfile do app: :util, version: "0.0.1", elixir: "~> 1.4", + elixirc_paths: elixirc_paths(Mix.env()), build_embedded: Mix.env() == :prod, start_permanent: Mix.env() == :prod, deps: deps() @@ -16,6 +17,10 @@ defmodule Util.Mixfile do [extra_applications: [:logger]] end + def elixirc_paths(:test), do: ["lib", "test/protos"] + def elixirc_paths(:dev), do: ["lib", "test/protos"] + def elixirc_paths(_), do: ["lib"] + defp deps do [ {:watchman, github: "renderedtext/ex-watchman"}, diff --git a/test/grpc_x_test.exs b/test/grpc_x_test.exs index 0b51602..b1669a2 100644 --- a/test/grpc_x_test.exs +++ b/test/grpc_x_test.exs @@ -4,17 +4,33 @@ defmodule Util.GrpcXTest do alias Util.GrpcX setup_all do - clients = [ - Util.GrpcX.Client.new(:service_a, "localhost:50051", :test, 1000), - Util.GrpcX.Client.new(:service_b, "localhost:50051", :test, 1000) - ] + server_pid = start_helloworld_server() + clients_pid = start_clients() - {:ok, pid} = Util.GrpcX.start_link(clients) + on_exit(fn -> + Process.exit(server_pid, :kill) + Process.exit(clients_pid, :kill) + end) + end - on_exit(fn -> Process.exit(pid, :kill) end) + test "connecting to an existing services works and returns an {:ok, reply} tuple" do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + + assert {:ok, reply} = GrpcX.call(:hello_service, :say_hello, req) + assert reply.message == "Hello shiroyasha" end - test "first test" do - assert {:timeout, 1000} = GrpcX.call(:service_a, :describe, %{}) + def start_helloworld_server() do + spawn_link(fn -> + GRPC.Server.start(Helloworld.Greeter.Server, 50_051) + end) + end + + def start_clients() do + clients = [ + Util.GrpcX.Client.new(:hello_service, "localhost:50051", Helloworld.Greeter.Stub) + ] + + {:ok, client_pid} = Util.GrpcX.start_link(clients) end end diff --git a/test/protos/helloworld.pb.ex b/test/protos/helloworld.pb.ex new file mode 100644 index 0000000..bc737a0 --- /dev/null +++ b/test/protos/helloworld.pb.ex @@ -0,0 +1,31 @@ +defmodule Helloworld.HelloRequest do + use Protobuf + + @type t :: %__MODULE__{ + name: String.t() + } + defstruct [:name] + + field(:name, 1, optional: true, type: :string) +end + +defmodule Helloworld.HelloReply do + use Protobuf + + @type t :: %__MODULE__{ + message: String.t() + } + defstruct [:message] + + field(:message, 1, optional: true, type: :string) +end + +defmodule Helloworld.Greeter.Service do + use GRPC.Service, name: "helloworld.Greeter" + + rpc(:SayHello, Helloworld.HelloRequest, Helloworld.HelloReply) +end + +defmodule Helloworld.Greeter.Stub do + use GRPC.Stub, service: Helloworld.Greeter.Service +end diff --git a/test/protos/helloworld.proto b/test/protos/helloworld.proto new file mode 100644 index 0000000..688974b --- /dev/null +++ b/test/protos/helloworld.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.grpc.examples.helloworld"; +option java_outer_classname = "HelloWorldProto"; +option objc_class_prefix = "HLW"; + +package helloworld; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/test/protos/server.ex b/test/protos/server.ex new file mode 100644 index 0000000..4cad11f --- /dev/null +++ b/test/protos/server.ex @@ -0,0 +1,12 @@ +defmodule Helloworld.Greeter.Server do + use GRPC.Server, service: Helloworld.Greeter.Service + + alias Helloworld.{HelloReply, HelloRequest} + + @spec say_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: HelloReply.t() + def say_hello(request, _stream) do + IO.inspect("Say Hello requested") + + Helloworld.HelloReply.new(message: "Hello #{request.name}") + end +end From a45ab9e3a506ab8e5be61b7177326d78ca9b0b17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Tue, 12 Apr 2022 08:26:17 +0000 Subject: [PATCH 5/6] Full tests --- lib/util/grpc_x.ex | 8 ++- lib/util/grpc_x/client.ex | 7 ++- lib/util/grpc_x/rpc_call.ex | 28 +++++---- test/grpc_x_test.exs | 112 +++++++++++++++++++++++++++++++++--- test/protos/server.ex | 12 +++- 5 files changed, 144 insertions(+), 23 deletions(-) diff --git a/lib/util/grpc_x.ex b/lib/util/grpc_x.ex index 8d8ff7b..279a764 100644 --- a/lib/util/grpc_x.ex +++ b/lib/util/grpc_x.ex @@ -56,8 +56,12 @@ defmodule Util.GrpcX do """ @spec call(client_name(), Atom.t(), any(), any()) :: Util.GrpcX.RPCCall.response() def call(client_name, method_name, request, opts \\ []) do - {:ok, client} = State.find_client(client_name) + case State.find_client(client_name) do + {:ok, client} -> + Client.call(client, method_name, request, opts) - Client.call(client, method_name, request, opts) + :error -> + {:error, "GrpcX client with name='#{client_name}' not registered in GrpcX"} + end end end diff --git a/lib/util/grpc_x/client.ex b/lib/util/grpc_x/client.ex index ed317db..96ef8bf 100644 --- a/lib/util/grpc_x/client.ex +++ b/lib/util/grpc_x/client.ex @@ -39,6 +39,11 @@ defmodule Util.GrpcX.Client do def call(client, method_name, request, opts \\ []) do rpc_call = RPCCall.new(client, method_name, request, opts) - Wormhole.capture(fn -> RPCCall.execute(rpc_call) end, timeout: rpc_call.timeout) + result = Wormhole.capture(fn -> RPCCall.execute(rpc_call) end, timeout: rpc_call.timeout) + + case result do + {:ok, result} -> result + {:error, result} -> {:error, result} + end end end diff --git a/lib/util/grpc_x/rpc_call.ex b/lib/util/grpc_x/rpc_call.ex index 9115103..e05a5a1 100644 --- a/lib/util/grpc_x/rpc_call.ex +++ b/lib/util/grpc_x/rpc_call.ex @@ -13,6 +13,7 @@ defmodule Util.GrpcX.RPCCall do %{ endpoint: client.endpoint, + client_name: client.name, stub: client.stub, method_name: method_name, request: request, @@ -37,16 +38,16 @@ defmodule Util.GrpcX.RPCCall do end) end - defp connect(rpc_call) do - inc(rpc_call, "connect.count") + defp connect(rpc) do + inc(rpc, "connect.count") - case GRPC.Stub.connect(rpc_call.endpoint) do + case GRPC.Stub.connect(rpc.endpoint) do {:ok, channel} -> {:ok, channel} {:error, err} -> - inc(rpc_call, "connect.failure.count") - log(rpc_call, "Failed to connect") + inc(rpc, "connect.error.count") + log_err(rpc, "failed to connect to #{rpc.endpoint}") {:error, err} end @@ -64,18 +65,22 @@ defmodule Util.GrpcX.RPCCall do inc(rpc, "response.success.count") {:ok, result} + {:error, {:unknown_rpc, _}} = e -> + e + {:error, err} -> inc(rpc, "response.error.count") - log(rpc, "response error err='#{inspect(err)}'") + log_err(rpc, "err='#{inspect(err)}'") {:error, err} end end defp do_call(rpc, channel) do - GRPC.Stub.call(rpc.stub, rpc.method_name, channel, rpc.request, rpc.opts) + apply(rpc.stub, rpc.method_name, [channel, rpc.request, rpc.opts]) rescue - e -> {:error, e} + e in UndefinedFunctionError -> + {:error, {:unknown_rpc, "no RPC method named='#{e.function}'"}} end defp inc(rpc, metric) do @@ -92,7 +97,10 @@ defmodule Util.GrpcX.RPCCall do end end - defp log(rpc, msg) do - Logger.log(rpc.log_level, "GrpcX: #{rpc.client_name} #{rpc.method_name} #{msg}") + defp log_err(rpc, msg) do + Logger.log( + rpc.log_level, + "GrpcX ERROR client='#{rpc.client_name}' rpc='#{rpc.method_name}' #{msg}" + ) end end diff --git a/test/grpc_x_test.exs b/test/grpc_x_test.exs index b1669a2..12422e9 100644 --- a/test/grpc_x_test.exs +++ b/test/grpc_x_test.exs @@ -1,11 +1,12 @@ defmodule Util.GrpcXTest do use ExUnit.Case + import Mock alias Util.GrpcX setup_all do - server_pid = start_helloworld_server() - clients_pid = start_clients() + {:ok, server_pid} = start_helloworld_server() + {:ok, clients_pid} = start_clients() on_exit(fn -> Process.exit(server_pid, :kill) @@ -13,24 +14,119 @@ defmodule Util.GrpcXTest do end) end - test "connecting to an existing services works and returns an {:ok, reply} tuple" do + test "it can connect to existing services, result is in form {:ok, reply}" do req = Helloworld.HelloRequest.new(name: "shiroyasha") assert {:ok, reply} = GrpcX.call(:hello_service, :say_hello, req) assert reply.message == "Hello shiroyasha" end + test "it raises an error if you pass an unknown service name" do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + + assert {:error, err} = GrpcX.call(:hellooooooo, :say_hello, req) + assert err == "GrpcX client with name='hellooooooo' not registered in GrpcX" + end + + test "it raises an error if you request an unknown rpc method" do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + + assert {:error, {:unknown_rpc, err}} = GrpcX.call(:hello_service, :describe, req) + + assert err == "no RPC method named='describe'" + end + + test "it timeouts long calls" do + req = Helloworld.HelloRequest.new(name: "please take a long time") + + assert {:error, err} = GrpcX.call(:hello_service, :say_hello, req, timeout: 500) + assert err == %GRPC.RPCError{message: "Deadline expired", status: 4} + end + + test "it reports connection errors" do + req = Helloworld.HelloRequest.new(name: "please take a long time") + + assert {:error, err} = GrpcX.call(:not_running_service, :say_hello, req) + assert err == "Error when opening connection: :timeout" + end + + describe "when metrics are disabled" do + test_with_mock "no increments are submitted", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + GrpcX.call(:hello_service_not_measured, :say_hello, req) + + assert_not_called(Watchman.increment(:_)) + end + + test_with_mock "no benchmarks are submitted", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + GrpcX.call(:hello_service_not_measured, :say_hello, req) + + assert_not_called(Watchman.benchmark(:_, :_)) + end + end + + describe "when metrics are enabled" do + test_with_mock "it measures number of connections", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + GrpcX.call(:hello_service, :say_hello, req) + + assert_called(Watchman.increment("grpc.hello_service.say_hello.connect.count")) + end + + test_with_mock "it measures number of requests", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + GrpcX.call(:hello_service, :say_hello, req) + + assert_called(Watchman.increment("grpc.hello_service.say_hello.request.count")) + end + + test_with_mock "it measures connection failures", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + GrpcX.call(:not_running_service, :say_hello, req) + + assert_called(Watchman.increment("grpc.not_running_service.say_hello.connect.error.count")) + end + + test_with_mock "it measures response successes", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + GrpcX.call(:hello_service, :say_hello, req) + + assert_called(Watchman.increment("grpc.hello_service.say_hello.response.success.count")) + end + + test_with_mock "it measures response errors", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "please fail") + GrpcX.call(:hello_service, :say_hello, req) + + assert_called(Watchman.increment("grpc.hello_service.say_hello.response.error.count")) + end + + test_with_mock "it measures the duration of the rpc call", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + GrpcX.call(:hello_service, :say_hello, req) + + assert_called(Watchman.benchmark("grpc.hello_service.say_hello.duration", :_)) + end + end + def start_helloworld_server() do - spawn_link(fn -> - GRPC.Server.start(Helloworld.Greeter.Server, 50_051) - end) + {:ok, pid, _} = GRPC.Server.start(Helloworld.Greeter.Server, 50_052) + {:ok, pid} end def start_clients() do clients = [ - Util.GrpcX.Client.new(:hello_service, "localhost:50051", Helloworld.Greeter.Stub) + Util.GrpcX.Client.new(:hello_service, "localhost:50052", Helloworld.Greeter.Stub), + Util.GrpcX.Client.new(:not_running_service, "localhost:60000", Helloworld.Greeter.Stub), + Util.GrpcX.Client.new( + :hello_service_not_measured, + "localhost:50052", + Helloworld.Greeter.Stub, + publish_metrics: false + ) ] - {:ok, client_pid} = Util.GrpcX.start_link(clients) + Util.GrpcX.start_link(clients) end end diff --git a/test/protos/server.ex b/test/protos/server.ex index 4cad11f..d76ce21 100644 --- a/test/protos/server.ex +++ b/test/protos/server.ex @@ -5,8 +5,16 @@ defmodule Helloworld.Greeter.Server do @spec say_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: HelloReply.t() def say_hello(request, _stream) do - IO.inspect("Say Hello requested") + case request.name do + "please take a long time" -> + :timer.sleep(60_000) + Helloworld.HelloReply.new(message: "Hello") - Helloworld.HelloReply.new(message: "Hello #{request.name}") + "please fail" -> + raise "I'm failing" + + name -> + Helloworld.HelloReply.new(message: "Hello #{name}") + end end end From 5162d2cf14818751fc0d4a03795fbcfbfd88b962 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Igor=20=C5=A0ar=C4=8Devi=C4=87?= Date: Tue, 12 Apr 2022 08:48:14 +0000 Subject: [PATCH 6/6] Introduce middlewares --- .../middlewares/response_status_to_error.ex | 23 +++++++++++++++++++ lib/util/grpc_x/rpc_call.ex | 2 ++ test/grpc_x_test.exs | 20 +++++++++++++--- 3 files changed, 42 insertions(+), 3 deletions(-) create mode 100644 lib/util/grpc_x/middlewares/response_status_to_error.ex diff --git a/lib/util/grpc_x/middlewares/response_status_to_error.ex b/lib/util/grpc_x/middlewares/response_status_to_error.ex new file mode 100644 index 0000000..d307cdf --- /dev/null +++ b/lib/util/grpc_x/middlewares/response_status_to_error.ex @@ -0,0 +1,23 @@ +defmodule Util.GrpcX.Middlewares.ResponseStatusToErorr do + def call(req, opts, next) do + resp = next.(req, opts) + + case resp do + {:ok, reply} -> + if Map.has_key?(reply, :response_status) do + status = Map.fetch!(reply, :response_status) + + if status.code == 0 do + {:ok, reply} + else + {:error, reply} + end + else + {:ok, reply} + end + + any -> + any + end + end +end diff --git a/lib/util/grpc_x/rpc_call.ex b/lib/util/grpc_x/rpc_call.ex index e05a5a1..09d5aa4 100644 --- a/lib/util/grpc_x/rpc_call.ex +++ b/lib/util/grpc_x/rpc_call.ex @@ -77,6 +77,8 @@ defmodule Util.GrpcX.RPCCall do end defp do_call(rpc, channel) do + # todo handle middlewares + apply(rpc.stub, rpc.method_name, [channel, rpc.request, rpc.opts]) rescue e in UndefinedFunctionError -> diff --git a/test/grpc_x_test.exs b/test/grpc_x_test.exs index 12422e9..19a6620 100644 --- a/test/grpc_x_test.exs +++ b/test/grpc_x_test.exs @@ -117,14 +117,28 @@ defmodule Util.GrpcXTest do def start_clients() do clients = [ - Util.GrpcX.Client.new(:hello_service, "localhost:50052", Helloworld.Greeter.Stub), - Util.GrpcX.Client.new(:not_running_service, "localhost:60000", Helloworld.Greeter.Stub), + Util.GrpcX.Client.new( + :hello_service, + "localhost:50052", + Helloworld.Greeter.Stub + ), + Util.GrpcX.Client.new( + :not_running_service, + "localhost:60000", + Helloworld.Greeter.Stub + ), Util.GrpcX.Client.new( :hello_service_not_measured, "localhost:50052", Helloworld.Greeter.Stub, publish_metrics: false - ) + ), + Util.GrpcX.Client.new( + :hello_service, + "localhost:50052", + Helloworld.Greeter.Stub + response_middlewares: [Util.GrpcX.Middlewares.ResponseStatusToStatus] + ), ] Util.GrpcX.start_link(clients)