diff --git a/lib/util/grpc_x.ex b/lib/util/grpc_x.ex new file mode 100644 index 0000000..279a764 --- /dev/null +++ b/lib/util/grpc_x.ex @@ -0,0 +1,67 @@ +defmodule Util.GrpcX do + @moduledoc """ + This module provides an opiniated interface for communicating with + GRPC services. It handles starting and closing connections, logging, + metrics, and proper error handling. + + Usage: + + 1. First, add Util.Grpc to your application's supervision tree. + This process keeps the configuration values for your Grpc clients. + + grpc_clients = [ + Util.Grpc.Client.new(:user_service, "localhost:50051", UserApi.Stub), + Util.Grpc.Client.new(:billing_service, "localhost:50051", BillingApi.Stub) + ] + + children = [ + worker(Util.Grpc, gprc_clients) + ] + + Supervisor.start_link(children, opts) + + 3. Use Grpc.call to communicate with your upstream services: + + req = ExampleApi.DescribeRequest.new(name: "a") + + {:ok, res} = Util.Grpc.call(:user_service, :describe, req) + + During the execution of the call, the following metrics are published: + + - gprc...duration + - gprc...connect + - gprc...connect.error.count + - gprc...response.success.count + - gprc...response.error.count + + In case of errors, log messages are logged via the Logger module. + """ + + alias Util.GrpcX.State + alias Util.GrpcX.Client + + @type client_name :: String.t() + + @spec start_link([Util.Grpc.Client.t()]) :: {:ok, pid()} | {:error, any()} + def start_link(clients) do + Util.GrpcX.State.start_link(clients) + end + + @doc """ + Executes a stateless RPC call to a remote service. + + 1. It opens a new connection + 2. Sends the RPC request + 3. Waits for the result, or times out + """ + @spec call(client_name(), Atom.t(), any(), any()) :: Util.GrpcX.RPCCall.response() + def call(client_name, method_name, request, opts \\ []) do + case State.find_client(client_name) do + {:ok, client} -> + Client.call(client, method_name, request, opts) + + :error -> + {:error, "GrpcX client with name='#{client_name}' not registered in GrpcX"} + end + end +end diff --git a/lib/util/grpc_x/client.ex b/lib/util/grpc_x/client.ex new file mode 100644 index 0000000..96ef8bf --- /dev/null +++ b/lib/util/grpc_x/client.ex @@ -0,0 +1,49 @@ +defmodule Util.GrpcX.Client do + require Logger + + alias Util.GrpcX.RPCCall + + @enforce_keys [:name, :endpoint, :timeout, :log_level, :publish_metrics, :stub] + + defstruct @enforce_keys + + @type t() :: %__MODULE__{ + name: String.t(), + endpoint: String.t(), + timeout: number(), + log_level: atom(), + publish_metrics: boolean(), + stub: Grpc.Stub.t() + } + + # 30 seconds + @default_timeout 30_000 + @default_log_level :info + + def new(name, endpoint, stub, opts \\ []) do + timeout = Keyword.get(opts, :timeout, @default_timeout) + log_level = Keyword.get(opts, :log_level, @default_log_level) + publish_metrics = Keyword.get(opts, :publish_metrics, true) + + %__MODULE__{ + name: name, + endpoint: endpoint, + timeout: timeout, + log_level: log_level, + stub: stub, + publish_metrics: publish_metrics + } + end + + @spec call(Client.t(), atom(), any, any) :: {:ok, any} | {:error, any} + def call(client, method_name, request, opts \\ []) do + rpc_call = RPCCall.new(client, method_name, request, opts) + + result = Wormhole.capture(fn -> RPCCall.execute(rpc_call) end, timeout: rpc_call.timeout) + + case result do + {:ok, result} -> result + {:error, result} -> {:error, result} + end + end +end diff --git a/lib/util/grpc_x/middlewares/response_status_to_error.ex b/lib/util/grpc_x/middlewares/response_status_to_error.ex new file mode 100644 index 0000000..d307cdf --- /dev/null +++ b/lib/util/grpc_x/middlewares/response_status_to_error.ex @@ -0,0 +1,23 @@ +defmodule Util.GrpcX.Middlewares.ResponseStatusToErorr do + def call(req, opts, next) do + resp = next.(req, opts) + + case resp do + {:ok, reply} -> + if Map.has_key?(reply, :response_status) do + status = Map.fetch!(reply, :response_status) + + if status.code == 0 do + {:ok, reply} + else + {:error, reply} + end + else + {:ok, reply} + end + + any -> + any + end + end +end diff --git a/lib/util/grpc_x/rpc_call.ex b/lib/util/grpc_x/rpc_call.ex new file mode 100644 index 0000000..09d5aa4 --- /dev/null +++ b/lib/util/grpc_x/rpc_call.ex @@ -0,0 +1,108 @@ +defmodule Util.GrpcX.RPCCall do + require Logger + + @type response :: + {:ok, any()} + | {:error, :failed_to_connect, any()} + | {:error, :timeout, any()} + | {:error, any()} + + def new(client, method_name, request, opts) do + default_call_opts = [timeout: client.timeout] + call_opts = Keyword.merge(default_call_opts, opts) + + %{ + endpoint: client.endpoint, + client_name: client.name, + stub: client.stub, + method_name: method_name, + request: request, + log_level: client.log_level, + opts: call_opts, + publish_metrics: client.publish_metrics, + metric_prefix: "grpc.#{client.name}.#{method_name}", + timeout: client.timeout + } + end + + @spec execute(RPRCall.t()) :: response() + def execute(rpc_call) do + benchmark(rpc_call, fn -> + with {:ok, channel} <- connect(rpc_call) do + result = send_req(rpc_call, channel) + + disconnect(channel) + + result + end + end) + end + + defp connect(rpc) do + inc(rpc, "connect.count") + + case GRPC.Stub.connect(rpc.endpoint) do + {:ok, channel} -> + {:ok, channel} + + {:error, err} -> + inc(rpc, "connect.error.count") + log_err(rpc, "failed to connect to #{rpc.endpoint}") + + {:error, err} + end + end + + defp disconnect(channel) do + GRPC.Stub.disconnect(channel) + end + + defp send_req(rpc, channel) do + inc(rpc, "request.count") + + case do_call(rpc, channel) do + {:ok, result} -> + inc(rpc, "response.success.count") + {:ok, result} + + {:error, {:unknown_rpc, _}} = e -> + e + + {:error, err} -> + inc(rpc, "response.error.count") + log_err(rpc, "err='#{inspect(err)}'") + + {:error, err} + end + end + + defp do_call(rpc, channel) do + # todo handle middlewares + + apply(rpc.stub, rpc.method_name, [channel, rpc.request, rpc.opts]) + rescue + e in UndefinedFunctionError -> + {:error, {:unknown_rpc, "no RPC method named='#{e.function}'"}} + end + + defp inc(rpc, metric) do + if rpc.publish_metrics do + Watchman.increment("#{rpc.metric_prefix}.#{metric}") + end + end + + defp benchmark(rpc, cb) do + if rpc.publish_metrics do + Watchman.benchmark("#{rpc.metric_prefix}.duration", cb) + else + cb.() + end + end + + defp log_err(rpc, msg) do + Logger.log( + rpc.log_level, + "GrpcX ERROR client='#{rpc.client_name}' rpc='#{rpc.method_name}' #{msg}" + ) + end +end diff --git a/lib/util/grpc_x/state.ex b/lib/util/grpc_x/state.ex new file mode 100644 index 0000000..5e05893 --- /dev/null +++ b/lib/util/grpc_x/state.ex @@ -0,0 +1,18 @@ +defmodule Util.GrpcX.State do + use Agent + + def start_link(clients) do + state = + clients + |> Enum.map(fn c -> {c.name, c} end) + |> Enum.into(%{}) + + Agent.start_link(fn -> state end, name: __MODULE__) + end + + def find_client(name) do + state = Agent.get(__MODULE__, &Function.identity/1) + + Map.fetch(state, name) + end +end diff --git a/lib/util/result.ex b/lib/util/result.ex new file mode 100644 index 0000000..efc8a4d --- /dev/null +++ b/lib/util/result.ex @@ -0,0 +1,83 @@ +defmodule Util.Result do + @doc """ + A result monad similar to one that exists in Rust/Haskell/... + + The main use case is to have an easily pipeble {:ok, val} | {:error, val}. + + result = endpoint + |> Result.ok() + |> Result.then(fn endpoint -> connect(endpoint) end) + |> Result.then(fn channel -> send_req(channel, "hello") end) + |> Result.then(fn result -> submit_metrics(result) end) + + case result do + {:ok, val} -> val... + {:error, err} -> err... + end + + The above code would be identival to either a with statement: + + with {:ok, channel} <- connect(endpoint), + {:ok, result} <- send_req(channel, "hello"), + {:ok, result} <- submit_metrics(result) do + val + else + {:error, error} -> error... + end + + Or to a series of pipable functions that pattern match on :ok, :error: + + result = {:ok, endpoint} + |> connect(endpoint) end) + |> send_req(channel, "hello") + |> submit_metrics(result) + + def connect({:ok, endpoint}), do: ... + def connect({:error, err}), do: {:error, err} + + def send_req({:ok, channel}), do: ... + def send_req({:error, err}), do: {:error, err} + + def submit_metrics({:ok, result}), do: ... + def submit_metrics({:error, err}), do: {:error, err} + + So why would you choose Result over with or pattern matched functions: + + 1. "with" is not working hand-in-hand with pipes, and usually it prevents + you from splitting the pipeing logic into multiple functions + + 2. Functions and pattern matching is just too many boilerplate. + + """ + + @type then_function :: (ok_val() -> any) + @type ok_val :: {:ok, any} + @type error_val :: {:error, any} + + @type t() :: ok_val() | error_val() + + @spec ok(any()) :: ok_val() + def ok(val), do: {:ok, val} + + @spec wrap(any()) :: ok_val() + def wrap(val), do: ok(val) + + @spec error(any) :: error_val() + def error(val), do: {:error, val} + + @spec then(any, then_function) :: any + def then({:ok, val}, f), do: f.(val) + def then(anything_else, _f), do: anything_else + + @spec unwrap(Result.t()) :: any() + def unwrap({:ok, val}), do: val + def unwrap(any), do: any + + @spec ok?(Result.t()) :: boolean() + def ok?({:ok, _}), do: true + def ok?(_), do: false + + @spec error?(Result.t()) :: boolean() + def error?({:error, _}), do: false + def error?(_), do: true +end diff --git a/mix.exs b/mix.exs index 5dc5f8c..c5533e4 100644 --- a/mix.exs +++ b/mix.exs @@ -2,24 +2,32 @@ defmodule Util.Mixfile do use Mix.Project def project do - [app: :util, - version: "0.0.1", - elixir: "~> 1.4", - build_embedded: Mix.env == :prod, - start_permanent: Mix.env == :prod, - deps: deps()] + [ + app: :util, + version: "0.0.1", + elixir: "~> 1.4", + elixirc_paths: elixirc_paths(Mix.env()), + build_embedded: Mix.env() == :prod, + start_permanent: Mix.env() == :prod, + deps: deps() + ] end def application do [extra_applications: [:logger]] end + def elixirc_paths(:test), do: ["lib", "test/protos"] + def elixirc_paths(:dev), do: ["lib", "test/protos"] + def elixirc_paths(_), do: ["lib"] + defp deps do [ {:watchman, github: "renderedtext/ex-watchman"}, {:wormhole, "~> 2.2"}, {:protobuf, "~> 0.5"}, - {:mock, "~> 0.3.0", only: :test}, + {:grpc, "0.5.0-beta.1", override: true}, + {:mock, "~> 0.3.0", only: :test} ] end end diff --git a/mix.lock b/mix.lock index 7fa690f..3cae27e 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,12 @@ -%{"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, +%{ + "cowboy": {:hex, :cowboy, "2.7.0", "91ed100138a764355f43316b1d23d7ff6bdb0de4ea618cb5d8677c93a7a2f115", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "04fd8c6a39edc6aaa9c26123009200fc61f92a3a94f3178c527b70b767c6e605"}, + "cowlib": {:hex, :cowlib, "2.8.0", "fd0ff1787db84ac415b8211573e9a30a3ebe71b5cbff7f720089972b2319c8a4", [:rebar3], [], "hexpm", "79f954a7021b302186a950a32869dbc185523d99d3e44ce430cd1f3289f41ed4"}, + "grpc": {:hex, :grpc, "0.5.0-beta.1", "7d43f52e138fe261f5b4981f1ada515dfc2e1bfa9dc92c7022e8f41e7e49b571", [:mix], [{:cowboy, "~> 2.7.0", [hex: :cowboy, repo: "hexpm", optional: false]}, {:gun, "~> 2.0.0", [hex: :grpc_gun, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.5", [hex: :protobuf, repo: "hexpm", optional: false]}], "hexpm", "fbbf8872935c295b7575435fe4128372c23c6ded89c2ef8058af3c6167bb3f65"}, + "gun": {:hex, :grpc_gun, "2.0.0", "f99678a2ab975e74372a756c86ec30a8384d3ac8a8b86c7ed6243ef4e61d2729", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}], "hexpm", "03dbbca1a9c604a0267a40ea1d69986225091acb822de0b2dbea21d5815e410b"}, + "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, "mock": {:hex, :mock, "0.3.7", "75b3bbf1466d7e486ea2052a73c6e062c6256fb429d6797999ab02fa32f29e03", [:mix], [{:meck, "~> 0.9.2", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "4da49a4609e41fd99b7836945c26f373623ea968cfb6282742bcb94440cf7e5c"}, "protobuf": {:hex, :protobuf, "0.5.4", "2e1b8eec211aff034ad8a14e3674220b0158bfb9a3c7128ac9d2a1ed1b3724d3", [:mix], [], "hexpm", "994348a4592408bc99c132603b0fdb686a2b5df0321a8eb1a582ec2bd3495886"}, + "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"}, "watchman": {:git, "https://github.com/renderedtext/ex-watchman.git", "3286b9d999db11696fffbccab9637cd31e93a305", []}, - "wormhole": {:hex, :wormhole, "2.2.0", "4fa107a2a1cf4c0ada5d0ee29590fa78ce684993e6950a52c1488cb9fe313f31", [:mix], [], "hexpm", "945388051723c02a5bd2cd404f0bcc04008a415390235b1ad69e22dee8c38de3"}} + "wormhole": {:hex, :wormhole, "2.2.0", "4fa107a2a1cf4c0ada5d0ee29590fa78ce684993e6950a52c1488cb9fe313f31", [:mix], [], "hexpm", "945388051723c02a5bd2cd404f0bcc04008a415390235b1ad69e22dee8c38de3"}, +} diff --git a/test/grpc_x_test.exs b/test/grpc_x_test.exs new file mode 100644 index 0000000..19a6620 --- /dev/null +++ b/test/grpc_x_test.exs @@ -0,0 +1,146 @@ +defmodule Util.GrpcXTest do + use ExUnit.Case + import Mock + + alias Util.GrpcX + + setup_all do + {:ok, server_pid} = start_helloworld_server() + {:ok, clients_pid} = start_clients() + + on_exit(fn -> + Process.exit(server_pid, :kill) + Process.exit(clients_pid, :kill) + end) + end + + test "it can connect to existing services, result is in form {:ok, reply}" do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + + assert {:ok, reply} = GrpcX.call(:hello_service, :say_hello, req) + assert reply.message == "Hello shiroyasha" + end + + test "it raises an error if you pass an unknown service name" do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + + assert {:error, err} = GrpcX.call(:hellooooooo, :say_hello, req) + assert err == "GrpcX client with name='hellooooooo' not registered in GrpcX" + end + + test "it raises an error if you request an unknown rpc method" do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + + assert {:error, {:unknown_rpc, err}} = GrpcX.call(:hello_service, :describe, req) + + assert err == "no RPC method named='describe'" + end + + test "it timeouts long calls" do + req = Helloworld.HelloRequest.new(name: "please take a long time") + + assert {:error, err} = GrpcX.call(:hello_service, :say_hello, req, timeout: 500) + assert err == %GRPC.RPCError{message: "Deadline expired", status: 4} + end + + test "it reports connection errors" do + req = Helloworld.HelloRequest.new(name: "please take a long time") + + assert {:error, err} = GrpcX.call(:not_running_service, :say_hello, req) + assert err == "Error when opening connection: :timeout" + end + + describe "when metrics are disabled" do + test_with_mock "no increments are submitted", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + GrpcX.call(:hello_service_not_measured, :say_hello, req) + + assert_not_called(Watchman.increment(:_)) + end + + test_with_mock "no benchmarks are submitted", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + GrpcX.call(:hello_service_not_measured, :say_hello, req) + + assert_not_called(Watchman.benchmark(:_, :_)) + end + end + + describe "when metrics are enabled" do + test_with_mock "it measures number of connections", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + GrpcX.call(:hello_service, :say_hello, req) + + assert_called(Watchman.increment("grpc.hello_service.say_hello.connect.count")) + end + + test_with_mock "it measures number of requests", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + GrpcX.call(:hello_service, :say_hello, req) + + assert_called(Watchman.increment("grpc.hello_service.say_hello.request.count")) + end + + test_with_mock "it measures connection failures", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + GrpcX.call(:not_running_service, :say_hello, req) + + assert_called(Watchman.increment("grpc.not_running_service.say_hello.connect.error.count")) + end + + test_with_mock "it measures response successes", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + GrpcX.call(:hello_service, :say_hello, req) + + assert_called(Watchman.increment("grpc.hello_service.say_hello.response.success.count")) + end + + test_with_mock "it measures response errors", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "please fail") + GrpcX.call(:hello_service, :say_hello, req) + + assert_called(Watchman.increment("grpc.hello_service.say_hello.response.error.count")) + end + + test_with_mock "it measures the duration of the rpc call", Watchman, [:passthrough], [] do + req = Helloworld.HelloRequest.new(name: "shiroyasha") + GrpcX.call(:hello_service, :say_hello, req) + + assert_called(Watchman.benchmark("grpc.hello_service.say_hello.duration", :_)) + end + end + + def start_helloworld_server() do + {:ok, pid, _} = GRPC.Server.start(Helloworld.Greeter.Server, 50_052) + {:ok, pid} + end + + def start_clients() do + clients = [ + Util.GrpcX.Client.new( + :hello_service, + "localhost:50052", + Helloworld.Greeter.Stub + ), + Util.GrpcX.Client.new( + :not_running_service, + "localhost:60000", + Helloworld.Greeter.Stub + ), + Util.GrpcX.Client.new( + :hello_service_not_measured, + "localhost:50052", + Helloworld.Greeter.Stub, + publish_metrics: false + ), + Util.GrpcX.Client.new( + :hello_service, + "localhost:50052", + Helloworld.Greeter.Stub + response_middlewares: [Util.GrpcX.Middlewares.ResponseStatusToStatus] + ), + ] + + Util.GrpcX.start_link(clients) + end +end diff --git a/test/protos/helloworld.pb.ex b/test/protos/helloworld.pb.ex new file mode 100644 index 0000000..bc737a0 --- /dev/null +++ b/test/protos/helloworld.pb.ex @@ -0,0 +1,31 @@ +defmodule Helloworld.HelloRequest do + use Protobuf + + @type t :: %__MODULE__{ + name: String.t() + } + defstruct [:name] + + field(:name, 1, optional: true, type: :string) +end + +defmodule Helloworld.HelloReply do + use Protobuf + + @type t :: %__MODULE__{ + message: String.t() + } + defstruct [:message] + + field(:message, 1, optional: true, type: :string) +end + +defmodule Helloworld.Greeter.Service do + use GRPC.Service, name: "helloworld.Greeter" + + rpc(:SayHello, Helloworld.HelloRequest, Helloworld.HelloReply) +end + +defmodule Helloworld.Greeter.Stub do + use GRPC.Stub, service: Helloworld.Greeter.Service +end diff --git a/test/protos/helloworld.proto b/test/protos/helloworld.proto new file mode 100644 index 0000000..688974b --- /dev/null +++ b/test/protos/helloworld.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.grpc.examples.helloworld"; +option java_outer_classname = "HelloWorldProto"; +option objc_class_prefix = "HLW"; + +package helloworld; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/test/protos/server.ex b/test/protos/server.ex new file mode 100644 index 0000000..d76ce21 --- /dev/null +++ b/test/protos/server.ex @@ -0,0 +1,20 @@ +defmodule Helloworld.Greeter.Server do + use GRPC.Server, service: Helloworld.Greeter.Service + + alias Helloworld.{HelloReply, HelloRequest} + + @spec say_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: HelloReply.t() + def say_hello(request, _stream) do + case request.name do + "please take a long time" -> + :timer.sleep(60_000) + Helloworld.HelloReply.new(message: "Hello") + + "please fail" -> + raise "I'm failing" + + name -> + Helloworld.HelloReply.new(message: "Hello #{name}") + end + end +end