Skip to content

Stateless grpc connection #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions lib/util/grpc_x.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
defmodule Util.GrpcX do
@moduledoc """
This module provides an opiniated interface for communicating with
GRPC services. It handles starting and closing connections, logging,
metrics, and proper error handling.

Usage:

1. First, add Util.Grpc to your application's supervision tree.
This process keeps the configuration values for your Grpc clients.

grpc_clients = [
Util.Grpc.Client.new(:user_service, "localhost:50051", UserApi.Stub),
Util.Grpc.Client.new(:billing_service, "localhost:50051", BillingApi.Stub)
]

children = [
worker(Util.Grpc, gprc_clients)
]

Supervisor.start_link(children, opts)

3. Use Grpc.call to communicate with your upstream services:

req = ExampleApi.DescribeRequest.new(name: "a")

{:ok, res} = Util.Grpc.call(:user_service, :describe, req)

During the execution of the call, the following metrics are published:

- gprc.<client_name>.<method_name>.duration
- gprc.<client_name>.<method_name>.connect
- gprc.<client_name>.<method_name>.connect.error.count
- gprc.<client_name>.<method_name>.response.success.count
- gprc.<client_name>.<method_name>.response.error.count

In case of errors, log messages are logged via the Logger module.
"""

alias Util.GrpcX.State
alias Util.GrpcX.Client

@type client_name :: String.t()

@spec start_link([Util.Grpc.Client.t()]) :: {:ok, pid()} | {:error, any()}
def start_link(clients) do
Util.GrpcX.State.start_link(clients)
end

@doc """
Executes a stateless RPC call to a remote service.

1. It opens a new connection
2. Sends the RPC request
3. Waits for the result, or times out
"""
@spec call(client_name(), Atom.t(), any(), any()) :: Util.GrpcX.RPCCall.response()
def call(client_name, method_name, request, opts \\ []) do
case State.find_client(client_name) do
{:ok, client} ->
Client.call(client, method_name, request, opts)

:error ->
{:error, "GrpcX client with name='#{client_name}' not registered in GrpcX"}
end
end
end
49 changes: 49 additions & 0 deletions lib/util/grpc_x/client.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
defmodule Util.GrpcX.Client do
require Logger

alias Util.GrpcX.RPCCall

@enforce_keys [:name, :endpoint, :timeout, :log_level, :publish_metrics, :stub]

defstruct @enforce_keys

@type t() :: %__MODULE__{
name: String.t(),
endpoint: String.t(),
timeout: number(),
log_level: atom(),
publish_metrics: boolean(),
stub: Grpc.Stub.t()
}

# 30 seconds
@default_timeout 30_000
@default_log_level :info

def new(name, endpoint, stub, opts \\ []) do
timeout = Keyword.get(opts, :timeout, @default_timeout)
log_level = Keyword.get(opts, :log_level, @default_log_level)
publish_metrics = Keyword.get(opts, :publish_metrics, true)

%__MODULE__{
name: name,
endpoint: endpoint,
timeout: timeout,
log_level: log_level,
stub: stub,
publish_metrics: publish_metrics
}
end

@spec call(Client.t(), atom(), any, any) :: {:ok, any} | {:error, any}
def call(client, method_name, request, opts \\ []) do
rpc_call = RPCCall.new(client, method_name, request, opts)

result = Wormhole.capture(fn -> RPCCall.execute(rpc_call) end, timeout: rpc_call.timeout)

case result do
{:ok, result} -> result
{:error, result} -> {:error, result}
end
end
end
23 changes: 23 additions & 0 deletions lib/util/grpc_x/middlewares/response_status_to_error.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Util.GrpcX.Middlewares.ResponseStatusToErorr do
def call(req, opts, next) do
resp = next.(req, opts)

case resp do
{:ok, reply} ->
if Map.has_key?(reply, :response_status) do
status = Map.fetch!(reply, :response_status)

if status.code == 0 do
{:ok, reply}
else
{:error, reply}
end
else
{:ok, reply}
end

any ->
any
end
end
end
108 changes: 108 additions & 0 deletions lib/util/grpc_x/rpc_call.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
defmodule Util.GrpcX.RPCCall do
require Logger

@type response ::
{:ok, any()}
| {:error, :failed_to_connect, any()}
| {:error, :timeout, any()}
| {:error, any()}

def new(client, method_name, request, opts) do
default_call_opts = [timeout: client.timeout]
call_opts = Keyword.merge(default_call_opts, opts)

%{
endpoint: client.endpoint,
client_name: client.name,
stub: client.stub,
method_name: method_name,
request: request,
log_level: client.log_level,
opts: call_opts,
publish_metrics: client.publish_metrics,
metric_prefix: "grpc.#{client.name}.#{method_name}",
timeout: client.timeout
}
end

@spec execute(RPRCall.t()) :: response()
def execute(rpc_call) do
benchmark(rpc_call, fn ->
with {:ok, channel} <- connect(rpc_call) do
result = send_req(rpc_call, channel)

disconnect(channel)

result
end
end)
end

defp connect(rpc) do
inc(rpc, "connect.count")

case GRPC.Stub.connect(rpc.endpoint) do
{:ok, channel} ->
{:ok, channel}

{:error, err} ->
inc(rpc, "connect.error.count")
log_err(rpc, "failed to connect to #{rpc.endpoint}")

{:error, err}
end
end

defp disconnect(channel) do
GRPC.Stub.disconnect(channel)
end

defp send_req(rpc, channel) do
inc(rpc, "request.count")

case do_call(rpc, channel) do
{:ok, result} ->
inc(rpc, "response.success.count")
{:ok, result}

{:error, {:unknown_rpc, _}} = e ->
e

{:error, err} ->
inc(rpc, "response.error.count")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes we send:ok response, and an error status is present in the status field. Can we check if this field is there and log accordingly?

log_err(rpc, "err='#{inspect(err)}'")

{:error, err}
end
end

defp do_call(rpc, channel) do
# todo handle middlewares

apply(rpc.stub, rpc.method_name, [channel, rpc.request, rpc.opts])
rescue
e in UndefinedFunctionError ->
{:error, {:unknown_rpc, "no RPC method named='#{e.function}'"}}
end

defp inc(rpc, metric) do
if rpc.publish_metrics do
Watchman.increment("#{rpc.metric_prefix}.#{metric}")
end
end

defp benchmark(rpc, cb) do
if rpc.publish_metrics do
Watchman.benchmark("#{rpc.metric_prefix}.duration", cb)
else
cb.()
end
end

defp log_err(rpc, msg) do
Logger.log(
rpc.log_level,
"GrpcX ERROR client='#{rpc.client_name}' rpc='#{rpc.method_name}' #{msg}"
)
end
end
18 changes: 18 additions & 0 deletions lib/util/grpc_x/state.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Util.GrpcX.State do
use Agent

def start_link(clients) do
state =
clients
|> Enum.map(fn c -> {c.name, c} end)
|> Enum.into(%{})

Agent.start_link(fn -> state end, name: __MODULE__)
end

def find_client(name) do
state = Agent.get(__MODULE__, &Function.identity/1)

Map.fetch(state, name)
end
end
83 changes: 83 additions & 0 deletions lib/util/result.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
defmodule Util.Result do
@doc """
A result monad similar to one that exists in Rust/Haskell/...

The main use case is to have an easily pipeble {:ok, val} | {:error, val}.

result = endpoint
|> Result.ok()
|> Result.then(fn endpoint -> connect(endpoint) end)
|> Result.then(fn channel -> send_req(channel, "hello") end)
|> Result.then(fn result -> submit_metrics(result) end)

case result do
{:ok, val} -> val...
{:error, err} -> err...
end

The above code would be identival to either a with statement:

with {:ok, channel} <- connect(endpoint),
{:ok, result} <- send_req(channel, "hello"),
{:ok, result} <- submit_metrics(result) do
val
else
{:error, error} -> error...
end

Or to a series of pipable functions that pattern match on :ok, :error:

result = {:ok, endpoint}
|> connect(endpoint) end)
|> send_req(channel, "hello")
|> submit_metrics(result)

def connect({:ok, endpoint}), do: ...
def connect({:error, err}), do: {:error, err}

def send_req({:ok, channel}), do: ...
def send_req({:error, err}), do: {:error, err}

def submit_metrics({:ok, result}), do: ...
def submit_metrics({:error, err}), do: {:error, err}

So why would you choose Result over with or pattern matched functions:

1. "with" is not working hand-in-hand with pipes, and usually it prevents
you from splitting the pipeing logic into multiple functions

2. Functions and pattern matching is just too many boilerplate.

"""

@type then_function :: (ok_val() -> any)
@type ok_val :: {:ok, any}
@type error_val :: {:error, any}

@type t() :: ok_val() | error_val()

@spec ok(any()) :: ok_val()
def ok(val), do: {:ok, val}

@spec wrap(any()) :: ok_val()
def wrap(val), do: ok(val)

@spec error(any) :: error_val()
def error(val), do: {:error, val}

@spec then(any, then_function) :: any
def then({:ok, val}, f), do: f.(val)
def then(anything_else, _f), do: anything_else

@spec unwrap(Result.t()) :: any()
def unwrap({:ok, val}), do: val
def unwrap(any), do: any

@spec ok?(Result.t()) :: boolean()
def ok?({:ok, _}), do: true
def ok?(_), do: false

@spec error?(Result.t()) :: boolean()
def error?({:error, _}), do: false
def error?(_), do: true
end
22 changes: 15 additions & 7 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,32 @@ defmodule Util.Mixfile do
use Mix.Project

def project do
[app: :util,
version: "0.0.1",
elixir: "~> 1.4",
build_embedded: Mix.env == :prod,
start_permanent: Mix.env == :prod,
deps: deps()]
[
app: :util,
version: "0.0.1",
elixir: "~> 1.4",
elixirc_paths: elixirc_paths(Mix.env()),
build_embedded: Mix.env() == :prod,
start_permanent: Mix.env() == :prod,
deps: deps()
]
end

def application do
[extra_applications: [:logger]]
end

def elixirc_paths(:test), do: ["lib", "test/protos"]
def elixirc_paths(:dev), do: ["lib", "test/protos"]
def elixirc_paths(_), do: ["lib"]

defp deps do
[
{:watchman, github: "renderedtext/ex-watchman"},
{:wormhole, "~> 2.2"},
{:protobuf, "~> 0.5"},
{:mock, "~> 0.3.0", only: :test},
{:grpc, "0.5.0-beta.1", override: true},
{:mock, "~> 0.3.0", only: :test}
]
end
end
Loading