Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Added

- Allow to set the `last_transaction_id` when outbox is created
- Accept Ecto `dynamic/2` expressions for the `:filter` option on `Carbonite.process/4`.

## [0.15.2] - 2025-07-15

Expand Down
12 changes: 8 additions & 4 deletions lib/carbonite.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Carbonite do

@moduledoc since: "0.1.0"

import Ecto.Query
alias Carbonite.{Outbox, Prefix, Query, Schema, Transaction, Trigger}
require Prefix
require Schema
Expand Down Expand Up @@ -136,7 +137,7 @@ defmodule Carbonite do

@type process_option ::
Carbonite.Query.outbox_queue_option()
| {:filter, (Ecto.Query.t() -> Ecto.Query.t())}
| {:filter, (Ecto.Query.t() -> Ecto.Query.t()) | Ecto.Query.dynamic_expr()}
| {:chunk, pos_integer()}

@type process_func_option :: {:memo, Outbox.memo()} | {:discard_last, boolean()}
Expand Down Expand Up @@ -262,7 +263,7 @@ defmodule Carbonite do

* `min_age` - the minimum age of a record, defaults to 300 seconds (set nil to disable)
* `limit` - limits the query in size, defaults to 100 (set nil to disable)
* `filter` - function for refining the batch query, defaults to nil
* `filter` - Ecto `dynamic/2` or function for refining the batch query, defaults to nil
* `chunk` - defines the size of the chunk passed to the process function, defaults to 1
* `carbonite_prefix` - defines the audit trail's schema, defaults to `"carbonite_default"`
"""
Expand Down Expand Up @@ -291,18 +292,21 @@ defmodule Carbonite do
end

defp query_func(repo, opts) do
filter = Keyword.get(opts, :filter) || (& &1)
filter = Keyword.get(opts, :filter, dynamic(true))
chunk = Keyword.get(opts, :chunk, 1)

fn outbox ->
outbox
|> Carbonite.Query.outbox_queue(opts)
|> filter.()
|> apply_filter(filter)
|> repo.all()
|> Enum.chunk_every(chunk)
end
end

defp apply_filter(query, filter) when is_function(filter), do: filter.(query)
defp apply_filter(query, %Ecto.Query.DynamicExpr{} = filter), do: where(query, ^filter)

defp process_func(process_func) do
fn chunk, outbox ->
case process_func.(chunk, outbox.memo) do
Expand Down
11 changes: 11 additions & 0 deletions test/carbonite_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,17 @@ defmodule CarboniteTest do
end)
end

test "accepts a filter dynamic expression for refining the batch query" do
max_inserted_at = DateTime.utc_now() |> DateTime.add(-9_000)
filter = dynamic([t], t.inserted_at < ^max_inserted_at)

assert {:ok, _outbox} =
process(TestRepo, "rabbits", [chunk: 100, filter: filter], fn txs, _memo ->
assert ids(txs) == [100_000]
:cont
end)
end

test "carbonite_prefix option works as expected" do
{:ok, outbox} =
process(TestRepo, "alternate_outbox", [carbonite_prefix: "alternate_test_schema"], fn txs,
Expand Down