From f8fb542482368c06d4f29cfd5a59ef98deb1f9b1 Mon Sep 17 00:00:00 2001 From: Malte Rohde Date: Tue, 26 Aug 2025 13:08:22 +0200 Subject: [PATCH] Accept ecto dynamic/2 expr as :filter option --- CHANGELOG.md | 1 + lib/carbonite.ex | 12 ++++++++---- test/carbonite_test.exs | 11 +++++++++++ 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df5560a..93f7f97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Added - Allow to set the `last_transaction_id` when outbox is created +- Accept Ecto `dynamic/2` expressions for the `:filter` option on `Carbonite.process/4`. ## [0.15.2] - 2025-07-15 diff --git a/lib/carbonite.ex b/lib/carbonite.ex index 0b58b08..e212060 100644 --- a/lib/carbonite.ex +++ b/lib/carbonite.ex @@ -13,6 +13,7 @@ defmodule Carbonite do @moduledoc since: "0.1.0" + import Ecto.Query alias Carbonite.{Outbox, Prefix, Query, Schema, Transaction, Trigger} require Prefix require Schema @@ -136,7 +137,7 @@ defmodule Carbonite do @type process_option :: Carbonite.Query.outbox_queue_option() - | {:filter, (Ecto.Query.t() -> Ecto.Query.t())} + | {:filter, (Ecto.Query.t() -> Ecto.Query.t()) | Ecto.Query.dynamic_expr()} | {:chunk, pos_integer()} @type process_func_option :: {:memo, Outbox.memo()} | {:discard_last, boolean()} @@ -262,7 +263,7 @@ defmodule Carbonite do * `min_age` - the minimum age of a record, defaults to 300 seconds (set nil to disable) * `limit` - limits the query in size, defaults to 100 (set nil to disable) - * `filter` - function for refining the batch query, defaults to nil + * `filter` - Ecto `dynamic/2` or function for refining the batch query, defaults to nil * `chunk` - defines the size of the chunk passed to the process function, defaults to 1 * `carbonite_prefix` - defines the audit trail's schema, defaults to `"carbonite_default"` """ @@ -291,18 +292,21 @@ defmodule Carbonite do end defp query_func(repo, opts) do - filter = Keyword.get(opts, :filter) || (& &1) + filter = Keyword.get(opts, :filter, dynamic(true)) chunk = Keyword.get(opts, :chunk, 1) fn outbox -> outbox |> Carbonite.Query.outbox_queue(opts) - |> filter.() + |> apply_filter(filter) |> repo.all() |> Enum.chunk_every(chunk) end end + defp apply_filter(query, filter) when is_function(filter), do: filter.(query) + defp apply_filter(query, %Ecto.Query.DynamicExpr{} = filter), do: where(query, ^filter) + defp process_func(process_func) do fn chunk, outbox -> case process_func.(chunk, outbox.memo) do diff --git a/test/carbonite_test.exs b/test/carbonite_test.exs index 4e2c451..a62446b 100644 --- a/test/carbonite_test.exs +++ b/test/carbonite_test.exs @@ -226,6 +226,17 @@ defmodule CarboniteTest do end) end + test "accepts a filter dynamic expression for refining the batch query" do + max_inserted_at = DateTime.utc_now() |> DateTime.add(-9_000) + filter = dynamic([t], t.inserted_at < ^max_inserted_at) + + assert {:ok, _outbox} = + process(TestRepo, "rabbits", [chunk: 100, filter: filter], fn txs, _memo -> + assert ids(txs) == [100_000] + :cont + end) + end + test "carbonite_prefix option works as expected" do {:ok, outbox} = process(TestRepo, "alternate_outbox", [carbonite_prefix: "alternate_test_schema"], fn txs,