feat: replace credit balance cache with per-expiration buckets #1121
odesenfans wants to merge 2 commits into
foxpatch-aleph
left a comment
Well-designed and thoroughly tested PR that replaces the O(N^2) lazy-recompute credit balance system with eagerly-maintained per-expiration buckets. The architectural change is sound, the sort-by-expiration policy is a genuine improvement, and the test coverage is excellent. I found no correctness bugs or security issues. The FOR UPDATE deadlock edge case and the Python-side utc_now() evaluation in filter expressions are minor concerns worth documenting but not blocking.
src/aleph/db/accessors/balances.py (line 315): Consider adding a secondary sort on the composite PK (address, expiration_date) to the FOR UPDATE query to guarantee deterministic lock ordering and avoid any theoretical deadlock risk when two transactions drain the same address concurrently.
src/aleph/db/accessors/balances.py (line 370): utc_now() is evaluated on the Python side at expression-construction time rather than via SQL NOW() on the DB server. This is fine for single-statement queries like get_credit_balance(), but _credit_balance_amount_expr() returns a reusable expression where the cutoff is frozen. Consider using func.now() from SQLAlchemy for server-side evaluation instead.
src/aleph/services/credit_expiration.py (line 90): Missing return type annotation. Should be -> Optional[dt.datetime].
src/aleph/repair.py (line 80): The buckets: dict = {} type annotation could use Dict[dt.datetime, int] for clarity.
tests/db/test_credit_balances.py (line 2725): Consider adding a test for the edge case where _consume_address_credits returns [] (empty) and the fallback was removed, verifying that no credit history rows are created for either sender or recipient.
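The frozen-cutoff concern raised above for _credit_balance_amount_expr() can be illustrated without a database. The following is a plain-Python analogy, not SQLAlchemy code: make_frozen_filter, make_live_filter and FakeClock are hypothetical names, and the injected clock stands in for utc_now() evaluated once at construction time versus a server-side NOW() evaluated on every query.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable

Clock = Callable[[], datetime]


class FakeClock:
    """Controllable clock so the example does not depend on wall time."""

    def __init__(self, start: datetime) -> None:
        self.now = start

    def __call__(self) -> datetime:
        return self.now

    def advance(self, delta: timedelta) -> None:
        self.now += delta


def make_frozen_filter(clock: Clock) -> Callable[[datetime], bool]:
    # The cutoff is captured once, when the filter is built. This mirrors
    # calling utc_now() while constructing a reusable expression.
    cutoff = clock()
    return lambda expiration: expiration > cutoff


def make_live_filter(clock: Clock) -> Callable[[datetime], bool]:
    # The cutoff is read on every evaluation. This mirrors a server-side
    # NOW() that is resolved each time the query runs.
    return lambda expiration: expiration > clock()
```

If the clock advances past a bucket's expiration after the filters are built, the frozen filter still treats the bucket as live while the live filter rejects it. Whether that matters depends on how long the expression object is kept around.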
24abf66 to
374dac6
foxpatch-aleph
left a comment
This PR replaces the O(N^2) lazy FIFO credit balance cache with eagerly maintained per-expiration buckets. The architecture is sound and the implementation is generally clean and well-documented. However, removing the fallback in _compute_transfer_entries_by_expiration (previously if not entries: entries = [(amount, requested_expiration)]) breaks three existing tests that rely on un-funded senders still creating recipient records: test_multiple_recipients_single_transfer, test_zero_amount_edge_case, and test_self_transfer_edge_case. These tests have senders with no prior credits, and the old FIFO path had a fallback that effectively minted credits from nothing for un-tracked senders. The new code removes this — the tests will fail with fewer history rows than expected. The tests need to either be updated to pre-fund the senders (matching real-world flow), or a decision needs to be made about whether zero-balance non-whitelisted transfers should be silently dropped (and the tests updated accordingly).
src/aleph/db/accessors/balances.py (line 288): _consume_address_credits doesn't filter buckets by the expense's message_timestamp — it uses utc_now() instead. For the real-time message processing path this is fine (now > message_timestamp), but for the repair path in _rebuild_credit_buckets_for_address, the correct historical behavior is replicated with key <= record.message_timestamp. This divergence is correct, but the asymmetry between the two paths isn't obvious and could confuse future readers. Consider adding a comment explaining why the hot path intentionally uses a simpler temporal check.
src/aleph/db/accessors/balances.py (line 637): Expense does not check if _consume_address_credits actually consumed anything. When the address has no buckets (no prior credits), the history row is still written with the full negative amount and the sender is debited, but the bucket state doesn't change. This is consistent with old FIFO behaviour, but the fallback in _compute_transfer_entries_by_expiration that previously "minted" credits from nothing for unfunded senders has been removed — make sure the three existing tests that depend on this (test_multiple_recipients_single_transfer, test_zero_amount_edge_case, test_self_transfer_edge_case) are either updated or deliberately changed.
src/aleph/db/accessors/balances.py (line 304): if amount <= 0: return [] — when amount == 0, this returns empty, meaning no recipient grant occurs in update_credit_balances_transfer for non-whitelisted senders. The existing test test_zero_amount_edge_case expects 2 history records (1 recipient + 1 sender) but will get 1 (just sender). This is a behaviour change — confirm intent.
tests/db/test_credit_balances.py (line 389): test_multiple_recipients_single_transfer — sender 0xmulti_sender has no prior distribution. The old code's fallback would mint credits from nothing and create both positive and negative records. With the new code, no recipient entries are created (only sender debits). This test will fail — either pre-fund the sender or update the expected assertion to document the new behaviour.
tests/db/test_credit_balances.py (line 452): test_zero_amount_edge_case — similar issue: unfunded sender, zero amount. _consume_address_credits returns [], so the recipient loop doesn't execute. Only 1 history row vs expected 2.
tests/db/test_credit_balances.py (line 494): test_self_transfer_edge_case — same pattern: unfunded sender doing a self-transfer. Will produce 1 row instead of expected 2.
src/aleph/services/credit_expiration.py (line 88): The DSN prefix stripping (postgresql+psycopg2:// → postgresql://) is fragile: if the driver string in make_db_url ever changes, this will silently produce a malformed DSN. Consider constructing the LISTEN DSN from config values directly or using make_url from SQLAlchemy and stripping the dialect programmatically.
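One way to make the DSN handling in the last comment less dependent on a specific driver string is to drop whatever follows the "+" in the scheme, rather than matching the literal postgresql+psycopg2:// prefix. The sketch below uses only string operations; strip_driver is a hypothetical helper name, and the review's own suggestion (SQLAlchemy's make_url) is not shown here.

```python
def strip_driver(dsn: str) -> str:
    """Turn 'dialect+driver://...' into 'dialect://...'.

    A DSN without a '+driver' suffix is returned unchanged.
    """
    scheme, separator, rest = dsn.partition("://")
    if not separator:
        raise ValueError(f"not a URL-style DSN: {dsn!r}")
    # Keep only the dialect part of the scheme, whatever the driver is.
    dialect = scheme.split("+", 1)[0]
    return f"{dialect}://{rest}"
```

Because the driver name is never spelled out, a change of driver in make_db_url would not silently yield a malformed DSN under this approach.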
b87379d to
002dcbb
foxpatch-aleph
left a comment
A well-designed refactor that replaces the O(N²) lazy FIFO walk with an eagerly-maintained per-expiration bucket cache. The sentinel approach (PG infinity for no-expiration), NOTIFY/LISTEN-based expiration task, and sort-by-expiration policy are all sound. The code is clean, well-documented, and thoroughly tested. The only potential concern is a theoretical deadlock between concurrent distribution ON CONFLICT DO UPDATE and expense FOR UPDATE locks on the same address, but this is an edge case handled by Postgres deadlock detection and is not a correctness issue. All existing FIFO scenario tests had their expected values correctly updated to reflect the new policy.
src/aleph/db/accessors/balances.py (line 288): _consume_address_credits uses FOR UPDATE to lock buckets, but the distribution writer in _apply_grant_bucket uses INSERT ... ON CONFLICT DO UPDATE without FOR UPDATE. Under concurrent access, a distribution inflight on the same address can deadlock with an expense holding FOR UPDATE. Postgres deadlock detection will abort one, so this isn't a data-integrity issue, but it's worth documenting as a known interaction.
src/aleph/services/credit_expiration.py (line 112): If the event is set by a NOTIFY callback just before CancelledError propagates, the wake is lost. The idle timeout (1h) covers this in the worst case, and the NOTIFY is re-emitted on the next grant to the same address. This is acceptable defense-in-depth but worth noting.
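The wake-or-timeout pattern discussed in the last comment can be sketched as follows. This is a minimal illustration, assuming an asyncio.Event set by a notification callback; wait_for_wake and demo are hypothetical names, and the real task's loop is not reproduced.

```python
import asyncio


async def wait_for_wake(event: asyncio.Event, idle_timeout: float) -> bool:
    """Return True if woken by the event, False if the idle timeout elapsed."""
    try:
        await asyncio.wait_for(event.wait(), timeout=idle_timeout)
        woke = True
    except asyncio.TimeoutError:
        woke = False
    # Clear before the caller re-reads state, so a wake that arrives while
    # the caller is busy leaves the event set for the next wait.
    event.clear()
    return woke


async def demo() -> tuple:
    event = asyncio.Event()
    event.set()  # simulate a wake delivered before the task starts waiting
    first = await wait_for_wake(event, idle_timeout=0.05)
    second = await wait_for_wake(event, idle_timeout=0.05)
    return first, second
```

In demo(), the first wait returns immediately because the event is already set, and the second falls through on the timeout. The timeout branch plays the role of the fallback re-poll: a wake that never arrives only delays the next sweep by the idle interval.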
002dcbb to
0b3b294
foxpatch-aleph
left a comment
Well-designed refactoring replacing the lazy O(N^2) FIFO credit balance cache with an eagerly-maintained per-expiration bucket system. Uses PG infinity sentinel for never-expiring credits, LISTEN/NOTIFY for cross-process expiration wakeup, and FOR UPDATE for safe concurrent consumption. Thorough test coverage. One behavior-preserving issue: the expense history row records the intended amount, not the actual consumption. There are also a few minor style concerns around utc_now() vs func.now() in SQL expressions.
src/aleph/db/accessors/balances.py (line 649): Return value of _consume_address_credits is discarded. The history row always records the full -amount intent, but bucket consumption may be less if under-funded. Behavior-preserving with old FIFO, but consider logging actual vs. intended consumption for auditability.
src/aleph/db/accessors/balances.py (line 379): utc_now() is evaluated at query construction time (Python) rather than query execution time (PostgreSQL). Consider using func.now() so the cutoff timestamp is evaluated server-side.
src/aleph/db/accessors/balances.py (line 397): Same pattern as _credit_balance_amount_expr: utc_now() is Python-evaluated. Consider func.now() for server-side evaluation, or document that this is intentional.
src/aleph/toolkit/infinity.py (line 43): The _Infinity sentinel carries microseconds=999999. Confirm that SQLAlchemy's TIMESTAMP type processor doesn't alter precision on round-trip; the custom typecaster handles DB-to-Python, but SQLAlchemy may apply additional processing that could break == INFINITY equality.
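The precision concern in the last comment comes down to datetime equality semantics. The sketch below assumes the sentinel is a datetime subclass fixed at year 9999 with microsecond 999999 in UTC, as described for this PR; the class body here is illustrative and is not the code in aleph.toolkit.infinity.

```python
from datetime import datetime, timezone


class _Infinity(datetime):
    """Marker subclass: equality and ordering are inherited from datetime."""


INFINITY = _Infinity(9999, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc)

# What a driver would hand back if it decodes PG infinity to the same instant.
decoded = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc)

# What the value would look like if a type processor dropped the microseconds.
truncated = decoded.replace(microsecond=0)
```

A plain datetime with the same fields compares equal to the sentinel, but a value whose microseconds were truncated on round-trip does not, which is the failure mode the comment asks to rule out.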
Replaces the previous scalar cache + lazy-recompute design with a bucket
cache keyed by (address, expiration_date), maintained eagerly by the
credit_history writers under a sort-by-expiration policy. The API read
path is now a single SELECT with no writes, no FIFO walk, and no cache
invalidation logic.
Key changes:
- New credit_balances schema: (address, expiration_date, amount, last_update)
with composite PK. The "no expiration" case uses PG 'infinity'::timestamptz
via a psycopg2 adapter registered at connection-module import time, so
the column stays NOT NULL and reads can use a uniform expiration_date >
now() predicate. The INFINITY sentinel is a datetime subclass exposed
through aleph.toolkit.infinity; psycopg2 already decodes PG infinity to
year-9999 microsecond-999999 UTC, which compares equal under datetime
semantics.
- Writers (distribution, expense, transfer) update buckets atomically in
the same transaction as the credit_history insert. Expenses drain the
soonest-expiring non-expired bucket first under SELECT ... FOR UPDATE,
serialising concurrent writers per address.
- Transfers compute recipient bucket expirations by capping each
consumed source bucket at min(source_expiration, requested_expiration),
preserving the existing re-transfer guard.
- New background CreditExpirationTask deletes expired buckets via a
bounded DELETE statement. Wakes are delivered cross-process via
Postgres LISTEN/NOTIFY: writers issue NOTIFY credit_expiration_changed
inside their transaction (Postgres queues the payload until commit, so
the wake reflects committed state), and the task holds a dedicated
psycopg2 LISTEN connection whose fd is registered with the asyncio
loop via loop.add_reader. This is necessary because message processing
runs in spawned subprocesses (set_start_method("spawn")), so an
in-process asyncio.Event would never be reachable from writers. A
1-hour idle timeout serves as defence-in-depth re-poll.
- get_credit_balance / get_credit_balance_with_details are pure SELECTs:
no FIFO recompute, no write-back. _apply_fifo_consumption and
_calculate_credit_balance_fifo are removed.
- Repair (aleph.repair.repair_node) now also bootstraps and reconciles
the bucket cache from credit_history, runs on every startup, and is
idempotent. Replaces the previous one-shot script approach.
- Policy change: consumption now sorts by expiration_date (soonest
first) rather than by message_timestamp (oldest first). This loses
fewer credits to expiration when issuance order and expiration order
diverge. FIFO scenario 1 in tests was updated accordingly; scenario 2
remains unchanged since both policies produce the same result there.
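The drain order and the transfer cap described in the list above can be sketched in memory. This is an illustration under stated assumptions, not the accessor code: consume_credits and transfer_entries are hypothetical names, buckets are a plain dict keyed by expiration, and an under-funded expense simply stops when the buckets run out (the real handling of that case is not shown).

```python
from datetime import datetime, timezone
from typing import Dict, List, Tuple

DrainLog = List[Tuple[int, datetime]]


def consume_credits(buckets: Dict[datetime, int], amount: int, now: datetime) -> DrainLog:
    """Drain the soonest-expiring non-expired buckets first.

    Mutates `buckets` and returns a log of (amount taken, source expiration).
    """
    if amount <= 0:
        return []
    log: DrainLog = []
    remaining = amount
    for expiration in sorted(exp for exp in buckets if exp > now):
        take = min(buckets[expiration], remaining)
        if take <= 0:
            continue  # empty bucket, nothing to drain here
        buckets[expiration] -= take
        remaining -= take
        log.append((take, expiration))
        if remaining == 0:
            break
    return log


def transfer_entries(log: DrainLog, requested_expiration: datetime) -> DrainLog:
    """Cap each drained slice at min(source, requested) and merge equal neighbours."""
    entries: DrainLog = []
    for take, source_expiration in log:
        effective = min(source_expiration, requested_expiration)
        if entries and entries[-1][1] == effective:
            entries[-1] = (entries[-1][0] + take, effective)
        else:
            entries.append((take, effective))
    return entries
```

Draining 60 from buckets of 30 (expiring in January), 50 (June) and 100 (the following year) empties the January bucket and takes 30 from June. If the requested expiration falls before January, both slices are capped to it and merge into a single recipient grant of 60.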
The previous credit_balances rows are dropped by the migration; the
repair function repopulates them on the next startup from credit_history,
which is the source of truth.
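Rebuilding the buckets from history, as the paragraph above describes, amounts to replaying the records in timestamp order under the sort-by-expiration policy. The sketch below is a simplified model with assumed shapes: history rows are (message_timestamp, amount, expiration) tuples, rebuild_buckets is a hypothetical name, and dropping empty or already-expired buckets at the end is an assumption rather than confirmed behaviour of _repair_credit_balances.

```python
from datetime import datetime, timezone
from typing import Dict, Iterable, Optional, Tuple

# Stand-in for the "no expiration" sentinel.
NO_EXPIRATION = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc)

HistoryRow = Tuple[datetime, int, Optional[datetime]]


def rebuild_buckets(history: Iterable[HistoryRow], now: datetime) -> Dict[datetime, int]:
    """Replay history oldest-first and return the surviving buckets."""
    buckets: Dict[datetime, int] = {}
    for timestamp, amount, expiration in sorted(history, key=lambda row: row[0]):
        if amount > 0:
            key = NO_EXPIRATION if expiration is None else expiration
            buckets[key] = buckets.get(key, 0) + amount
            continue
        # Expense: drain buckets that were still valid at the time of the
        # message, soonest expiration first.
        remaining = -amount
        for key in sorted(k for k in buckets if k > timestamp):
            take = min(buckets[key], remaining)
            buckets[key] -= take
            remaining -= take
            if remaining == 0:
                break
    # Assumption: only non-empty buckets that are still valid now are kept.
    return {key: value for key, value in buckets.items() if key > now and value > 0}
```

Because the function depends only on the history rows and the cutoff, calling it again with the same inputs produces the same bucket state, which is the property that makes a rebuild of this kind idempotent.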
Test plan covered:
- bucket helper unit tests (insert/increment, negative/under-funded
consumption, sort-by-expiration drain order, sentinel handling)
- writer tests (distribution buckets, expense drain order, transfer
cap, whitelisted sender, recipient inherits capped expiration)
- read path (sum non-expired buckets, with_details grouping, multi-
address listing + count)
- expiration task (delete sweep, sentinel exclusion in peek, NOTIFY
emission on finite-expiration grants, no NOTIFY on sentinel grants,
selector callback drains and sets the event)
- repair (rebuilds buckets from history, idempotent)
- existing transfer / cap / chain tests remain green under the new
semantics
- removed test_cache_invalidation_on_credit_expiration (no cache to
invalidate); replaced with a smaller test asserting that the read
query filters expired buckets directly
0b3b294 to
f5650c3
foxpatch-aleph
left a comment
Well-designed refactor from lazy FIFO recompute to eagerly-maintained bucket cache. The approach is sound, the code is clean and well-documented, and test coverage is thorough. Two minor observations: the repair function has a TOCTOU window if invoked concurrently with writers (mitigated by startup-only invocation), and the 1-hour idle timeout in the expiration task is quite long for a fallback.
src/aleph/repair.py (line 59): TOCTOU race: the delete + select from credit_history are not atomic. If _repair_credit_balances were ever invoked while writers are active, a concurrent grant/expense between the delete and the history select would be lost. Currently runs at startup before writers start (commands.py:179), which is safe, but document this invariant or add FOR UPDATE on the history select.
src/aleph/repair.py (line 108): The key > now filter uses wall-clock utc_now() to prune expired buckets, while the eager-write path uses the historical message_timestamp. The end result is the same (expired buckets are excluded), but the subtle difference in cutoff semantics is worth a brief comment.
src/aleph/services/credit_expiration.py (line 37): _IDLE_TIMEOUT_SECONDS = 3600 — 1 hour is quite long for a fallback poll interval. If NOTIFY silently fails (e.g. connection issues, buffer overflow), expired buckets linger for up to an hour. Consider 300 seconds (5 minutes) as a safer recovery bound.
src/aleph/db/accessors/balances.py (line 730): Fallback if not entries: entries = [(amount, requested_expiration)] silently mints credits when the consumption log is empty in the non-whitelisted path. The comment says this is for test scenarios, but in production a race could cause this to fire. Not a regression (matches old behavior), but worth noting.
The bucket-cache read path filters ``expiration_date > now()``, so expired bucket rows are inert at query time and table-level cleanup is not load-bearing for correctness. The dedicated asyncio task plus LISTEN/NOTIFY plumbing existed only to keep the table compact in real time, which at current scale (one row per address per distinct historical expiration) is not worth the operational complexity:
- a dedicated psycopg2 connection outside the SQLAlchemy pool, with no liveness supervision or reconnect on driver hiccup;
- a Python-clock vs. server-clock split between ``_delete_expired`` (using ``utc_now()``) and the read predicates (using ``func.now()``);
- multi-node herd behaviour: every CCN in a cluster wakes on every NOTIFY and re-runs the same DELETE+peek;
- a transactional NOTIFY in every bucket writer.
If table bloat becomes a concern later, a periodic ``DELETE FROM credit_balances WHERE expiration_date <= now()`` driven by the existing cron job machinery covers the same need with no long-lived state.
Removes ``aleph.services.credit_expiration``, its test module, the ``NOTIFY credit_expiration_changed`` emission in ``_apply_grant_bucket``, and the task wiring in ``commands.py``. The bucket schema, INFINITY sentinel, expiration-date index, and migration are unchanged.
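The claim that cleanup is not load-bearing can be checked on a toy table. The sketch below uses an in-memory SQLite database as a stand-in for Postgres, with ISO-8601 strings as timestamps and the cutoff passed as a parameter instead of now(); the sample rows, the string sentinel, and the helper names are all illustrative assumptions.

```python
import sqlite3

# String stand-in for the "no expiration" sentinel in this toy schema.
SENTINEL = "9999-12-31T23:59:59+00:00"


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE credit_balances ("
        " address TEXT NOT NULL,"
        " expiration_date TEXT NOT NULL,"
        " amount INTEGER NOT NULL,"
        " PRIMARY KEY (address, expiration_date))"
    )
    conn.executemany(
        "INSERT INTO credit_balances VALUES (?, ?, ?)",
        [
            ("0xabc", "2025-01-01T00:00:00+00:00", 40),  # already expired
            ("0xabc", "2025-06-01T00:00:00+00:00", 25),
            ("0xabc", SENTINEL, 10),
        ],
    )
    return conn


def balance(conn: sqlite3.Connection, address: str, now: str) -> int:
    # Read path: expired rows are filtered out by the predicate itself.
    row = conn.execute(
        "SELECT COALESCE(SUM(amount), 0) FROM credit_balances"
        " WHERE address = ? AND expiration_date > ?",
        (address, now),
    ).fetchone()
    return row[0]


def delete_expired(conn: sqlite3.Connection, now: str) -> int:
    # Periodic compaction: returns the number of rows removed.
    cursor = conn.execute(
        "DELETE FROM credit_balances WHERE expiration_date <= ?", (now,)
    )
    return cursor.rowcount
```

With a cutoff of 2025-03-01, the balance is 35 both before and after delete_expired removes the one expired row, so the sweep only affects table size, not what readers see.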
foxpatch-aleph
left a comment
Well-architected refactor from O(N²) lazy FIFO to O(1) eager bucket cache with sort-by-expiration policy. The core design is sound and the code quality is high. However, the CreditExpirationTask described in the PR description is entirely absent from the codebase, meaning expired bucket rows will accumulate in the credit_balances table indefinitely. Additionally, the _Infinity sentinel type is fragile — it depends on psycopg2 global state and won't work with the asyncpg driver used by make_async_engine. These issues must be addressed before merging.
src/aleph/commands.py (line 178): CreditExpirationTask (or equivalent background task that deletes expired credit_balance rows) must be launched here as described in the PR design. Currently expired buckets accumulate in credit_balances with no cleanup path.
src/aleph/db/accessors/balances.py (line 280): The PR description states that writers should signal wake_expiration_task() after inserting a bucket with a finite expiration. This wake mechanism is not implemented. Without it, the expiration task (when added) would only fire on its periodic polling interval.
src/aleph/db/connection.py (line 79): make_async_engine uses asyncpg, which does not use psycopg2's adapter/typecaster system. The register_infinity_adapter() at line 16 only affects psycopg2 connections. The _Infinity sentinel will not round-trip correctly through asyncpg connections, causing potential StaleDataErrors or PK mismatches.
src/aleph/toolkit/infinity.py (line 70): The _Infinity typecaster replaces the default psycopg2 timestamptz typecaster globally (OID 1184). If other code depends on the default psycopg2 timestamptz parsing behavior for non-infinity values, this replacement is transparent (it delegates). However, this is a global side effect at import time. Consider using SQLAlchemy's TypeDecorator instead — it's driver-agnostic and scoped to the specific column.
Summary
- Replaces the credit_balances cache (lazy recompute on read via O(N^2) Python FIFO over credit_history) with a bucket cache keyed by (address, expiration_date), maintained eagerly by the three credit-history writers under a sort-by-expiration policy.
- New background CreditExpirationTask that sleeps until the next pending expiration timestamp and deletes expired buckets when it fires, woken early by writers when they insert a bucket with a sooner expiration. Expirations stay implicit (no synthetic credit_history rows).
- get_credit_balance and get_credit_balance_with_details become pure SELECTs with no writes and no FIFO walk. _apply_fifo_consumption and _calculate_credit_balance_fifo are removed.
- [...] _repair_credit_balances step inside the existing repair_node flow.
Motivation
The previous design ran an O(N^2) Python walk over the full credit_history for the address on every cache miss, and the API session never committed the recompute, so heavy users (many VMs accumulating aleph_credit_expense rows) re-paid that cost on every call to /api/v0/addresses/{address}/balance. The bucket cache makes the read path a single index lookup, removes the recompute-from-the-API-thread footgun entirely, and naturally serves the ?include_credit_details=true endpoint as well.
Design
Schema. Composite PK (address, expiration_date). The "no expiration" case uses a sentinel 9999-12-31T23:59:59Z so the column stays NOT NULL and queries can use expiration_date > NOW() uniformly without IS NULL branches. Index on expiration_date for the cron's MIN() and DELETE WHERE expiration_date <= NOW().
Writers.
- Distribution: INSERT ... ON CONFLICT DO UPDATE amount = amount + excluded.amount.
- Expense: SELECT ... ORDER BY expiration_date ASC FOR UPDATE, then walk in Python, subtracting per bucket. Bounded by buckets-per-address.
- Transfer: each consumed source bucket is capped at min(source_expiration, requested_expiration). Adjacent slices with the same effective expiration are merged before the recipient grant.
All three remain inside the existing _bulk_insert_credit_history chokepoint, in the same transaction as the history row insert.
Expiration task. Single long-running asyncio task launched from commands.py. Loop: query MIN(expiration_date) from non-expired non-sentinel buckets, asyncio.wait_for(event.wait(), timeout=delta); on fire or signal it runs DELETE WHERE expiration_date <= NOW() and re-evaluates. Writers signal the module-level asyncio.Event after inserting any bucket with a finite expiration. Sentinel buckets are never selected by the deletion query.
Policy change: sort by expiration, not by issuance. The old FIFO walked positive_credits in message_timestamp order. The new code drains the soonest-expiring bucket first, which loses fewer credits to expiration when issuance order and expiration order diverge. Only one of the existing FIFO scenario tests needed its expected balance updated (scenario 1); scenario 2 produces the same result under both policies.
Bootstrap. The migration drops the old credit_balances rows (they were derivable from credit_history and not reliably persisted from the API path anyway). _repair_credit_balances replays history per-address under the new policy and writes the resulting bucket state. Runs on every startup, idempotent.
Removed. _apply_fifo_consumption, _calculate_credit_balance_fifo, PositiveCredit and NegativeAmount dataclasses, the lazy-recompute block in get_credit_balance, the cache invalidation test (no cache to invalidate). _compute_transfer_entries_by_expiration is rewritten to consume the bucket-drain log instead of the per-grant FIFO state, and the no-source-credits fallback (an artifact of the lazy cache occasionally being incomplete) is gone since the validated transfer path always has the right buckets to drain.
Closes
Supersedes #1120 (which was a smaller, intermediate eager-update step on top of the old read path).
Test plan
- [...] min(source, requested), whitelisted sender creates credits from nothing.
- get_credit_balance sums non-expired buckets and floors negatives at zero; get_credit_balance_with_details returns the sentinel bucket as None and sorts it first; multi-address listing and count aggregate per-address sums.
- [...] wake_expiration_task().
- [...] credit_history after a wipe; idempotent across multiple invocations.
- [...] _repair_credit_balances populates the table to the expected balances for a sample of addresses, and that CreditExpirationTask fires at the expected timestamp for an address with a soon-to-expire bucket.