diff --git a/coordinator/internal/store/postgres.go b/coordinator/internal/store/postgres.go index dbea9e43..c184a2d6 100644 --- a/coordinator/internal/store/postgres.go +++ b/coordinator/internal/store/postgres.go @@ -195,6 +195,7 @@ func (s *PostgresStore) migrate(ctx context.Context) error { created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() )`, `CREATE INDEX IF NOT EXISTS idx_ledger_account ON ledger_entries(account_id, created_at DESC)`, + `CREATE UNIQUE INDEX IF NOT EXISTS idx_ledger_reference ON ledger_entries(account_id, entry_type, reference) WHERE reference <> ''`, // Referral system tables `CREATE TABLE IF NOT EXISTS referrers ( @@ -837,7 +838,29 @@ func nullableCreatedAt(ts time.Time) any { return ts } +// ErrAlreadyCredited is returned by creditTx when the (account_id, entry_type, +// reference) tuple already exists, indicating an idempotent duplicate. +var ErrAlreadyCredited = errors.New("store: credit already applied for this reference") + func creditTx(ctx context.Context, tx pgx.Tx, accountID string, amountMicroUSD int64, entryType LedgerEntryType, reference string, createdAt time.Time) error { + // Idempotency guard: check before touching balances so a duplicate returns + // ErrAlreadyCredited without any side-effects. The index is scoped to + // (account_id, entry_type, reference) so reusing a reference string across + // different accounts (e.g. "reservation_refund") is safe. + if reference != "" { + var exists bool + err := tx.QueryRow(ctx, + `SELECT EXISTS(SELECT 1 FROM ledger_entries WHERE account_id = $1 AND entry_type = $2 AND reference = $3)`, + accountID, string(entryType), reference, + ).Scan(&exists) + if err != nil { + return fmt.Errorf("store: check ledger reference: %w", err) + } + if exists { + return ErrAlreadyCredited + } + } + _, err := tx.Exec(ctx, `INSERT INTO balances (account_id, balance_micro_usd, updated_at) VALUES ($1, $2, NOW()) @@ -858,6 +881,10 @@ func creditTx(ctx context.Context, tx pgx.Tx, accountID string, amountMicroUSD i return fmt.Errorf("store: read balance: %w", err) } + // No ON CONFLICT here: the pre-check above is the idempotency gate. + // A race that slips past the pre-check will hit the unique index and + // return a constraint error, causing the caller's transaction to roll back + // (including the balance update), so balances stay consistent. _, err = tx.Exec(ctx, `INSERT INTO ledger_entries (account_id, entry_type, amount_micro_usd, balance_after, reference, created_at) VALUES ($1, $2, $3, $4, $5, COALESCE($6, NOW()))`, @@ -916,6 +943,9 @@ func (s *PostgresStore) Credit(accountID string, amountMicroUSD int64, entryType defer tx.Rollback(ctx) if err := creditTx(ctx, tx, accountID, amountMicroUSD, entryType, reference, time.Time{}); err != nil { + if errors.Is(err, ErrAlreadyCredited) { + return nil + } return err }