Skip to content

Commit

Permalink
Merge pull request #10 from Logflare/feat/logger-backend
Browse files Browse the repository at this point in the history
feat: Batched requesting, Logger backend, telemetry reporter
  • Loading branch information
Ziinc authored Nov 22, 2023
2 parents 29e8c9e + 3623272 commit 912358c
Show file tree
Hide file tree
Showing 25 changed files with 1,773 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
nodejs 18.13.0
elixir 1.15.5-otp-26
erlang 26.0.2
erlang 26.1.2
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,15 @@ Supported languages and integrations:
- Javascript/Typescript
- Elixir

## Elixir

The Elixir SDK has the following features:

- LoggerBackend - a Logger backend for Logger
- TelemetryReporter - a telemetry reporter module for `:telemetry_metrics`

## Development

Release tags are prefixed with the language followed by the version, such as `js/v0.1.0`.

Publishing to registry will be triggered after release is created via Github Actions.
Publishing to registry will be triggered after release is created via Github Actions.
4 changes: 3 additions & 1 deletion logflare-ex/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import Config
# metadata: [:user_id]
#

config :logflare_ex, api_url: "https://api.logflare.app", api_key: "some-key"
config :logflare_ex,
api_url: "https://api.logflare.app",
api_key: "some-key"

import_config "#{Mix.env()}.exs"
6 changes: 6 additions & 0 deletions logflare-ex/config/dev.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import Config

config :logflare_ex,
api_url: "https://api.logflare.app",
api_key: "some-key",
env: :test
5 changes: 5 additions & 0 deletions logflare-ex/config/test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import Config

config :logflare_ex,
env: :test,
api_url: "https://localhost:4006"
28 changes: 20 additions & 8 deletions logflare-ex/lib/application.ex
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
defmodule LogflareEx.Application do
# See https://hexdocs.pm/elixir/Application.html
# for more information on OTP Applications
@moduledoc false

use Application

@impl true
def start(_type, _args) do
children = [
# Starts a worker by calling: LogflareApiClient.Worker.start_link(arg)
# {LogflareApiClient.Worker, arg}
{Finch, name: LogflareEx.Finch}
]
env = Application.get_env(:logflare_ex, :env)

children = get_children(env)

# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: LogflareEx.Supervisor]
Supervisor.start_link(children, opts)
end

defp get_children(:test) do
[
LogflareEx.Repo,
{Registry, keys: :unique, name: LogflareEx.BatcherRegistry},
{Finch, name: LogflareEx.Finch}
]
end

defp get_children(_) do
[
LogflareEx.Repo,
{DynamicSupervisor, name: LogflareEx.BatcherSup},
{Registry, keys: :unique, name: LogflareEx.BatcherRegistry},
{Finch, name: LogflareEx.Finch}
]
end
end
31 changes: 31 additions & 0 deletions logflare-ex/lib/batched_event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule LogflareEx.BatchedEvent do
@moduledoc false
use TypedEctoSchema

import Ecto.Changeset

typed_schema "logflare_events" do
field(:source_token, :string)
field(:source_name, :string)
field(:body, :map)
field(:created_at, :naive_datetime_usec, enforce: true)
field(:inflight_at, :naive_datetime_usec)
end

def changeset(struct, params) do
struct
|> cast(params, [:body, :source_token, :source_name, :inflight_at])
|> validate_required([:body])
|> then(fn change ->
missing_fields = Enum.filter([:source_name, :source_token], &field_missing?(change, &1))

if length(missing_fields) > 1 do
change
|> add_error(:source_name, "either source token or name must be provided")
|> add_error(:source_token, "either source token or name must be provided")
else
change
end
end)
end
end
238 changes: 238 additions & 0 deletions logflare-ex/lib/batcher.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
defmodule LogflareEx.Batcher do
@moduledoc """
Batching cache is an Etso repo, `LogflareEx.Repo`, and stores all events to be sent to the Logflare service.
There are 2 states that an event can be in:
- pending
- inflight
If an event is inflight, it will have an `:inflight_at` timestamp stored on the struct.
"""
use GenServer

import Ecto.Query
alias LogflareEx.BatchedEvent
alias LogflareEx.BatcherRegistry
alias LogflareEx.Client
alias LogflareEx.Repo

# API

@doc """
Creates an event in the batching cache. This event will be considered as pending if it does not have an `:inflight_at` value set.
An event should only be created after all payload manipulations has been performed. The payload will be stored on the `:body` key.
All timestamp fields internally on the struct are NaiveDateTime.
Required fields:
- :body
- :source_token or :source_name
"""
@spec create_event(map()) :: {:ok, BatchedEvent.t()}
def create_event(attrs) do
%BatchedEvent{created_at: NaiveDateTime.utc_now()}
|> BatchedEvent.changeset(attrs)
|> Repo.insert()
end

@doc """
Lists all events within the cache. All arguments provided are considered additive filters.
### Example
```elixir
list_events_by(:pending)
list_events_by(:all)
list_events_by(:inflight)
list_events_by(:all, source_token: "...")
list_events_by(:all, source_name: "...")
list_events_by(:all, source_name: "...", limit: 5)
```
### Limitations
Etso does not support the Ecto.Query `:limit` option, hence filtering is done post result fetch.
"""
@typep list_opts :: [
{:source_name, String.t()}
| {:source_token, String.t()}
| {:limit, non_neg_integer()}
]
@typep status_filter :: :all | :pending | :inflight
@spec list_events_by(status_filter(), list_opts()) :: [BatchedEvent.t()]
def list_events_by(type, opts \\ []) when type in [:all, :pending, :inflight] do
opts =
Enum.into(opts, %{
source_name: nil,
source_token: nil,
limit: nil
})

from(e in BatchedEvent)
|> then(fn
q when type == :pending -> where(q, [e], is_nil(e.inflight_at))
q when type == :inflight -> where(q, [e], not is_nil(e.inflight_at))
q -> q
end)
|> then(fn
q when opts.source_token != nil -> where(q, [e], e.source_token == ^opts.source_token)
q when opts.source_name != nil -> where(q, [e], e.source_name == ^opts.source_name)
q -> q
end)
|> Repo.all()
|> then(fn
data when opts.limit != nil ->
Enum.take(data, opts.limit)

data ->
data
end)
end

@doc """
Updates the event within the batching cache.
"""
@spec update_event(BatchedEvent.t(), map()) :: {:ok, BatchedEvent.t()}
def update_event(event, attrs) do
event
|> BatchedEvent.changeset(attrs)
|> Repo.update()
end

@doc """
Deletes all events in the cache, regardless of the status.
"""
@spec delete_all_events() :: :ok
def delete_all_events do
Repo.delete_all(BatchedEvent)
:ok
end

@doc """
Performs a flush for the given source.
Accepts the following filters: `:source_name` or `:source_token`
Flush is performed asyncronously.
"""
@typep kw_filter :: [{:source_name, String.t()} | {:source_token, String.t()}]
@spec flush(kw_filter()) :: :ok
def flush(%Client{source_name: source_name}), do: flush(source_name: source_name)
def flush(%Client{source_token: source_token}), do: flush(source_token: source_token)

def flush(kw) do
kw
|> via()
|> GenServer.cast(:flush)
end

@doc """
Deletes a single event in the cache.
### Example
```elixir
iex> delete_event(event)
{:ok, %BatchedEvent{...}}
```
"""
@spec delete_event(BatchedEvent.t()) :: {:ok, BatchedEvent.t()}
def delete_event(%BatchedEvent{} = event) do
Repo.delete(event)
end

@doc """
Returns the via for each partitioned Batcher. Accepts a `source_token` or `source_name` filter or a `%LogflareEx.Client{}` struct.
### Example
```elixir
via(source_name: "my source")
via(source_token: "some-uuid")
via(%LogflareEx.Client{...})
```
"""
@spec via(Client.t() | kw_filter()) :: identifier()
def via(%Client{source_token: "" <> token}), do: via(source_token: token)
def via(%Client{source_name: "" <> name}), do: via(source_name: name)
def via(source_name: name), do: {:via, Registry, {BatcherRegistry, {:source_name, name}}}
def via(source_token: token), do: {:via, Registry, {BatcherRegistry, {:source_token, token}}}

# GenServer

def start_link(opts) when is_list(opts) do
opts
|> Client.new()
|> start_link()
end

def start_link(%Client{} = client) do
GenServer.start_link(__MODULE__, client, name: via(client))
end

@impl GenServer
def init(%Client{source_name: name, source_token: token} = client) do
partition_key =
cond do
token != nil -> {:source_token, token}
name != nil -> {:source_name, name}
true -> nil
end

state = %{
client: client,
key: partition_key
}

schedule_flush(state)
{:ok, state}
end

@impl GenServer
def handle_cast(:flush, state) do
flush_events(state)
{:noreply, state}
end

# Flushes the cache of all items matching the Batcher's key.
@impl GenServer
def handle_info(:flush, state) do
flush_events(state)
schedule_flush(state)
{:noreply, state}
end

defp flush_events(state) do
events =
case state.key do
{:source_name, name} ->
list_events_by(:pending, source_name: name, limit: state.client.batch_size)

{:source_token, token} ->
list_events_by(:pending, source_token: token, limit: state.client.batch_size)
end

event_ids = for e <- events, do: e.id

batch =
for event <- events do
{:ok, e} = update_event(event, %{inflight_at: NaiveDateTime.utc_now()})
e.body
end

# Task to send batch
Task.start_link(fn ->
LogflareEx.send_events(state.client, batch)
Repo.delete_all(from(e in BatchedEvent, where: e.id in ^event_ids))
end)

:ok
end

defp schedule_flush(%{client: %{auto_flush: false}} = state), do: state

defp schedule_flush(state) do
Process.send_after(self(), :flush, state.client.flush_interval)
state
end
end
Loading

0 comments on commit 912358c

Please sign in to comment.