Skip to content

Commit 24abf66

Browse files
committed
feat: replace credit balance cache with per-expiration buckets
Replaces the previous scalar cache + lazy-recompute design with a bucket cache keyed by (address, expiration_date), maintained eagerly by the credit_history writers under a sort-by-expiration policy. The API read path is now a single SELECT with no writes, no FIFO walk, and no cache invalidation logic. Key changes: - New credit_balances schema: (address, expiration_date, amount, last_update) with composite PK. The "no expiration" case uses a sentinel timestamptz (9999-12-31) so the column can stay NOT NULL. - Writers (distribution, expense, transfer) update buckets atomically in the same transaction as the credit_history insert. Expenses drain the soonest-expiring non-expired bucket first under SELECT ... FOR UPDATE, serialising concurrent writers per address. - Transfers compute recipient bucket expirations by capping each consumed source bucket at min(source_expiration, requested_expiration), preserving the existing re-transfer guard. - New background task (CreditExpirationTask) sleeps until the next pending expiration timestamp and deletes expired buckets when it fires. Writers wake the task via a module-level asyncio.Event when they insert a bucket whose expiration may be sooner than the one currently being awaited. Expirations remain implicit (no synthetic credit_history rows). - get_credit_balance / get_credit_balance_with_details are pure SELECTs: no FIFO recompute, no write-back. _apply_fifo_consumption and _calculate_credit_balance_fifo are removed. - Repair (aleph.repair.repair_node) now also bootstraps and reconciles the bucket cache from credit_history, runs on every startup, and is idempotent. Replaces the previous one-shot script approach. - Policy change: consumption now sorts by expiration_date (soonest first) rather than by message_timestamp (oldest first). This loses fewer credits to expiration when issuance order and expiration order diverge. FIFO scenario 1 in tests was updated accordingly; scenario 2 remains unchanged since both policies produce the same result there. The previous credit_balances rows are dropped by the migration; the repair function repopulates them on the next startup from credit_history, which is the source of truth. Test plan covered: - bucket helper unit tests (insert/increment, negative/under-funded consumption, sort-by-expiration drain order, sentinel handling) - writer tests (distribution buckets, expense drain order, transfer cap, whitelisted sender, recipient inherits capped expiration) - read path (sum non-expired buckets, with_details grouping, multi- address listing + count) - expiration task (deletes expired, ignores sentinel, wakes on signal) - repair (rebuilds buckets from history, idempotent) - existing transfer / cap / expiration tests remain green under the new semantics - removed test_cache_invalidation_on_credit_expiration (no cache to invalidate); replaced with a smaller test asserting that the read query filters expired buckets directly
1 parent 9e20d6d commit 24abf66

9 files changed

Lines changed: 1078 additions & 377 deletions

File tree

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
"""Replace credit_balances scalar cache with per-expiration buckets
2+
3+
Revision ID: b1c2d3e4f5a6
4+
Revises: 7e5a630e4b36
5+
Create Date: 2026-05-12
6+
7+
The previous credit_balances table held a single FIFO-derived scalar per
8+
address, lazily recomputed on read by an O(N^2) Python walk over
9+
credit_history. This migration replaces it with a bucket cache keyed by
10+
(address, expiration_date), eagerly maintained by the credit_history
11+
writers under a sort-by-expiration policy. The "no expiration" case is
12+
encoded with a sentinel timestamptz of 9999-12-31T23:59:59Z so that
13+
expiration_date is NOT NULL and can sit in the composite primary key.
14+
15+
The previous table is dropped rather than migrated; credit_history is the
16+
source of truth and aleph.repair.repair_credit_balances repopulates the
17+
new table from history on the next startup.
18+
"""
19+
20+
import sqlalchemy as sa
21+
from alembic import op
22+
23+
# revision identifiers, used by Alembic.
24+
revision = "b1c2d3e4f5a6"
25+
down_revision = "7e5a630e4b36"
26+
branch_labels = None
27+
depends_on = None
28+
29+
30+
def upgrade() -> None:
31+
op.drop_table("credit_balances")
32+
op.create_table(
33+
"credit_balances",
34+
sa.Column("address", sa.String(), nullable=False),
35+
sa.Column("expiration_date", sa.TIMESTAMP(timezone=True), nullable=False),
36+
sa.Column("amount", sa.BigInteger(), nullable=False),
37+
sa.Column(
38+
"last_update",
39+
sa.TIMESTAMP(timezone=True),
40+
nullable=False,
41+
server_default=sa.func.now(),
42+
),
43+
sa.PrimaryKeyConstraint(
44+
"address", "expiration_date", name="credit_balances_pkey"
45+
),
46+
)
47+
op.create_index(
48+
"credit_balances_expiration_date_idx",
49+
"credit_balances",
50+
["expiration_date"],
51+
)
52+
53+
54+
def downgrade() -> None:
55+
op.drop_table("credit_balances")
56+
op.create_table(
57+
"credit_balances",
58+
sa.Column("address", sa.String(), nullable=False),
59+
sa.Column("balance", sa.BigInteger(), nullable=False, server_default="0"),
60+
sa.Column(
61+
"last_update",
62+
sa.TIMESTAMP(timezone=True),
63+
nullable=False,
64+
server_default=sa.func.now(),
65+
),
66+
sa.PrimaryKeyConstraint("address", name="credit_balances_pkey"),
67+
)
68+
op.create_index("ix_credit_balances_address", "credit_balances", ["address"])

src/aleph/commands.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from aleph.repair import repair_node
3737
from aleph.services import p2p
3838
from aleph.services.cache.node_cache import NodeCache
39+
from aleph.services.credit_expiration import CreditExpirationTask
3940
from aleph.services.ipfs import IpfsService
4041
from aleph.services.keys import generate_keypair, save_keys
4142
from aleph.services.storage.fileystem_engine import FileSystemStorageEngine
@@ -224,6 +225,11 @@ async def main(args: List[str]) -> None:
224225
tasks.append(cron_job_task(config=config, cron_job=cron_job))
225226
LOGGER.debug("Initialized cron job task")
226227

228+
LOGGER.debug("Initializing credit expiration task")
229+
credit_expiration = CreditExpirationTask(session_factory=session_factory)
230+
tasks.append(credit_expiration.run())
231+
LOGGER.debug("Initialized credit expiration task")
232+
227233
LOGGER.debug("Running event loop")
228234
await asyncio.gather(*tasks)
229235

0 commit comments

Comments
 (0)