Skip to content

Commit 2d1991a

Browse files
avallete and claude authored
feat(pg-delta): retry catalog extractors when pg_get_*def returns NULL (#238)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent 3b9eb91 commit 2d1991a

22 files changed

Lines changed: 1241 additions & 44 deletions

.changeset/feat-extract-retries.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
---
2+
"@supabase/pg-delta": minor
3+
---
4+
5+
feat(pg-delta): retry catalog extractors when `pg_get_*def()` returns NULL
6+
7+
`pg_get_indexdef`, `pg_get_constraintdef`, `pg_get_viewdef`, `pg_get_triggerdef`, `pg_get_ruledef`, and `pg_get_functiondef` can transiently return NULL when the underlying catalog row is dropped concurrently or the catalog state is in flux. Previously such rows were dropped silently after one attempt; now extraction retries the affected query a configurable number of times before falling back to filtering. In practice the second attempt no longer sees the dropped object (or successfully resolves the definition), so a real CREATE/DROP racing with `createPlan` is reliably preserved or excluded rather than half-captured.
8+
9+
Configuration (precedence: option > env > default):
10+
11+
- `CreatePlanOptions.extractRetries?: number` — public API option on `createPlan`.
12+
- `PGDELTA_EXTRACT_RETRIES` env var — same value, useful for CLI usage.
13+
- Default `1` (i.e. the first attempt plus one retry, 2 attempts total).
14+
15+
After retries are exhausted, rows whose `pg_get_*def()` is still NULL are filtered out and a warning is emitted via `debug('pg-delta:extract')` (visible with `DEBUG=pg-delta:extract` or `DEBUG=pg-delta:*`). Setting `extractRetries: 0` disables retrying entirely and reproduces the previous "filter-on-first-attempt" behavior.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@supabase/pg-delta": patch
3+
---
4+
5+
fix(pg-delta): skip rows when `pg_get_viewdef`, `pg_get_triggerdef`, `pg_get_ruledef`, or `pg_get_functiondef` returns NULL instead of crashing the relevant `extract*` with a ZodError. Same race conditions as the prior `pg_get_indexdef` (#223) and `pg_get_constraintdef` fixes — the underlying catalog row can vanish (concurrent DDL, transient catalog state, recovery edges). A single unreadable view, materialized view, trigger, rule, or function no longer aborts the whole catalog extraction and `createPlan` call.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@supabase/pg-delta": patch
3+
---
4+
5+
fix(pg-delta): skip table constraints where `pg_get_constraintdef()` returns NULL instead of crashing `extractTables` with a ZodError. Like `pg_get_indexdef`, `pg_get_constraintdef` can return NULL under race conditions with concurrent DDL or transient catalog inconsistencies. Such constraints are now filtered out at extraction time so a single unreadable constraint no longer aborts the whole catalog extraction and `createPlan` call.

packages/pg-delta/src/core/catalog.model.ts

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,19 @@ export async function createEmptyCatalog(
302302
});
303303
}
304304

305-
export async function extractCatalog(pool: Pool) {
305+
interface ExtractCatalogOptions {
  /**
   * How many extra times a catalog extractor may re-run its query after
   * `pg_get_*def()` came back NULL for one or more rows. Handed to the
   * extractors as `ExtractRetryOptions.retries`.
   */
  extractRetries?: number;
}
312+
313+
export async function extractCatalog(
314+
pool: Pool,
315+
options: ExtractCatalogOptions = {},
316+
) {
317+
const retryOptions = { retries: options.extractRetries };
306318
const [
307319
aggregates,
308320
collations,
@@ -339,21 +351,21 @@ export async function extractCatalog(pool: Pool) {
339351
extractDomains(pool).then(listToRecord),
340352
extractEnums(pool).then(listToRecord),
341353
extractExtensions(pool).then(listToRecord),
342-
extractIndexes(pool).then(listToRecord),
343-
extractMaterializedViews(pool).then(listToRecord),
354+
extractIndexes(pool, retryOptions).then(listToRecord),
355+
extractMaterializedViews(pool, retryOptions).then(listToRecord),
344356
extractSubscriptions(pool).then(listToRecord),
345357
extractPublications(pool).then(listToRecord),
346-
extractProcedures(pool).then(listToRecord),
358+
extractProcedures(pool, retryOptions).then(listToRecord),
347359
extractRlsPolicies(pool).then(listToRecord),
348360
extractRoles(pool).then(listToRecord),
349361
extractSchemas(pool).then(listToRecord),
350362
extractSequences(pool).then(listToRecord),
351-
extractTables(pool).then(listToRecord),
352-
extractTriggers(pool).then(listToRecord),
363+
extractTables(pool, retryOptions).then(listToRecord),
364+
extractTriggers(pool, retryOptions).then(listToRecord),
353365
extractEventTriggers(pool).then(listToRecord),
354-
extractRules(pool).then(listToRecord),
366+
extractRules(pool, retryOptions).then(listToRecord),
355367
extractRanges(pool).then(listToRecord),
356-
extractViews(pool).then(listToRecord),
368+
extractViews(pool, retryOptions).then(listToRecord),
357369
extractForeignDataWrappers(pool).then(listToRecord),
358370
extractServers(pool).then(listToRecord),
359371
extractUserMappings(pool).then(listToRecord),
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import { afterEach, describe, expect, test } from "bun:test";
2+
import {
3+
extractWithDefinitionRetry,
4+
resolveExtractRetries,
5+
} from "./extract-with-retry.ts";
6+
7+
/** Minimal row shape used by these tests: an id plus a nullable definition. */
type Row = { id: string; definition: string | null };

/** Predicate handed to the retry helper: true when the row's definition is NULL. */
function hasNullDefinition(row: Row): boolean {
  return row.definition === null;
}
10+
11+
// Exercises the option > env > default precedence of `resolveExtractRetries`
// and its handling of negative, non-numeric and empty inputs.
describe("resolveExtractRetries", () => {
  // Snapshot of the variable taken when the suite is registered, so every test
  // may mutate it freely and still leave the process environment untouched.
  const originalEnv = process.env.PGDELTA_EXTRACT_RETRIES;
  afterEach(() => {
    if (originalEnv === undefined) {
      // `delete` is what actually unsets the variable. In Node.js, assigning
      // `undefined` to a `process.env` key stores the string "undefined".
      delete process.env.PGDELTA_EXTRACT_RETRIES;
    } else {
      process.env.PGDELTA_EXTRACT_RETRIES = originalEnv;
    }
  });

  test("defaults to 1 when option and env are unset", () => {
    delete process.env.PGDELTA_EXTRACT_RETRIES;
    expect(resolveExtractRetries()).toBe(1);
  });

  test("uses option when provided", () => {
    // The env value is set on purpose: an explicit option must win over it.
    process.env.PGDELTA_EXTRACT_RETRIES = "5";
    expect(resolveExtractRetries(0)).toBe(0);
    expect(resolveExtractRetries(1)).toBe(1);
    expect(resolveExtractRetries(7)).toBe(7);
  });

  test("falls back to env when option is undefined", () => {
    process.env.PGDELTA_EXTRACT_RETRIES = "4";
    expect(resolveExtractRetries()).toBe(4);
  });

  test("clamps negative values to 0", () => {
    delete process.env.PGDELTA_EXTRACT_RETRIES;
    expect(resolveExtractRetries(-3)).toBe(0);
    process.env.PGDELTA_EXTRACT_RETRIES = "-9";
    expect(resolveExtractRetries()).toBe(0);
  });

  test("ignores non-numeric env values", () => {
    process.env.PGDELTA_EXTRACT_RETRIES = "not-a-number";
    expect(resolveExtractRetries()).toBe(1);
  });

  test("ignores empty env string", () => {
    process.env.PGDELTA_EXTRACT_RETRIES = "";
    expect(resolveExtractRetries()).toBe(1);
  });
});
56+
57+
// Verifies the attempt count and the returned rows of
// `extractWithDefinitionRetry` for the success, recovery and exhaustion paths.
describe("extractWithDefinitionRetry", () => {
  /**
   * Drives the helper with a scripted query and reports the rows it returned
   * together with the number of times the query ran. `script` receives the
   * 1-based attempt number. Backoff is always 0 so the tests never wait.
   */
  const run = async (retries: number, script: (attempt: number) => Row[]) => {
    let attempts = 0;
    const rows = await extractWithDefinitionRetry<Row>({
      label: "test",
      query: async () => {
        attempts += 1;
        return script(attempts);
      },
      hasNullDefinition,
      options: { retries, backoffMs: 0 },
    });
    return { attempts, rows };
  };

  test("returns first attempt when no row has null definition", async () => {
    const outcome = await run(2, () => [{ id: "a", definition: "OK" }]);
    expect(outcome.attempts).toBe(1);
    expect(outcome.rows).toEqual([{ id: "a", definition: "OK" }]);
  });

  test("retries when definition is null and succeeds on attempt 2", async () => {
    const outcome = await run(2, (attempt) =>
      attempt === 1
        ? [
            { id: "a", definition: "OK" },
            { id: "b", definition: null },
          ]
        : [{ id: "a", definition: "OK" }],
    );
    expect(outcome.attempts).toBe(2);
    expect(outcome.rows).toEqual([{ id: "a", definition: "OK" }]);
  });

  test("returns last-attempt rows (with offenders) once retries are exhausted", async () => {
    const outcome = await run(2, () => [
      { id: "a", definition: "OK" },
      { id: "b", definition: null },
    ]);
    // One initial attempt plus two retries.
    expect(outcome.attempts).toBe(3);
    expect(outcome.rows).toEqual([
      { id: "a", definition: "OK" },
      { id: "b", definition: null },
    ]);
  });

  test("retries: 0 disables retrying entirely", async () => {
    const outcome = await run(0, () => [{ id: "b", definition: null }]);
    expect(outcome.attempts).toBe(1);
    expect(outcome.rows).toEqual([{ id: "b", definition: null }]);
  });

  test("retries: 5 attempts up to 6 times before giving up", async () => {
    const outcome = await run(5, () => [{ id: "b", definition: null }]);
    expect(outcome.attempts).toBe(6);
  });
});
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import debug from "debug";
2+
3+
const log = debug("pg-delta:extract");
4+
5+
const DEFAULT_RETRIES = 1;
6+
const DEFAULT_BACKOFF_MS = 50;
7+
8+
export interface ExtractRetryOptions {
  /**
   * How many times an extraction query is re-run after an attempt in which
   * `pg_get_*def()` came back NULL for one or more rows, so at most
   * `retries + 1` attempts are made. Values below 0 are treated as 0. If left
   * undefined, the `PGDELTA_EXTRACT_RETRIES` environment variable is
   * consulted; failing that, the default of 1 applies (two attempts in total).
   */
  retries?: number;
  /**
   * Base delay between attempts, in milliseconds. The wait grows linearly as
   * `backoffMs * attemptNumber`. Defaults to 50; tests pass 0 to avoid waiting.
   */
  backoffMs?: number;
}
23+
24+
/**
 * Resolves the effective number of retries, with precedence
 * option > `PGDELTA_EXTRACT_RETRIES` env var > `DEFAULT_RETRIES`.
 *
 * @param option - Caller-supplied retry count. Used when it is a finite
 *   number; fractional values are floored and negatives are clamped to 0.
 *   Anything else (undefined, NaN, ±Infinity) defers to the environment.
 * @returns A non-negative integer retry count.
 */
export function resolveExtractRetries(option?: number): number {
  if (typeof option === "number" && Number.isFinite(option)) {
    return Math.max(0, Math.floor(option));
  }
  // Trim before the blank check: `Number("   ")` is 0, so a whitespace-only
  // variable would otherwise silently disable retries instead of being
  // ignored the way an empty string is.
  const envVal = process.env.PGDELTA_EXTRACT_RETRIES?.trim();
  if (envVal !== undefined && envVal !== "") {
    const n = Number(envVal);
    // Non-numeric text parses to NaN and falls through to the default.
    if (Number.isFinite(n)) return Math.max(0, Math.floor(n));
  }
  return DEFAULT_RETRIES;
}
35+
36+
/**
 * Resolves after `ms` milliseconds. Any value that is not greater than zero
 * resolves immediately without scheduling a timer.
 */
const sleep = (ms: number): Promise<void> => {
  if (ms > 0) {
    return new Promise<void>((wake) => setTimeout(wake, ms));
  }
  return Promise.resolve();
};
38+
39+
/**
 * Calls `query()` repeatedly, at most `retries + 1` times, for as long as the
 * result still contains a row matching `hasNullDefinition`.
 *
 * Why retry at all: `pg_get_<x>def()` may yield NULL transiently when the
 * catalog row behind it is dropped concurrently or the catalog is in flux. A
 * later attempt typically either stops seeing the dropped row or gets a real
 * definition back.
 *
 * The result is the rows of the first clean attempt, or, when every attempt
 * had offenders, the rows of the last attempt with the offenders still in
 * them. Dropping those rows is left to the caller, which keeps this helper
 * usable for flat rows (definition on the row itself) and nested ones
 * (definition on a child collection such as table constraints).
 *
 * @param params.label - Prefix for the debug log lines.
 * @param params.query - Produces the rows for one attempt.
 * @param params.hasNullDefinition - Flags rows that should trigger a retry.
 * @param params.options - Retry count and backoff overrides.
 * @returns Rows from the first clean attempt or from the final attempt.
 */
export async function extractWithDefinitionRetry<TRow>(params: {
  label: string;
  query: () => Promise<TRow[]>;
  hasNullDefinition: (row: TRow) => boolean;
  options?: ExtractRetryOptions;
}): Promise<TRow[]> {
  const totalAttempts = resolveExtractRetries(params.options?.retries) + 1;
  const baseDelayMs = params.options?.backoffMs ?? DEFAULT_BACKOFF_MS;

  let latest: TRow[] = [];
  let attemptNo = 0;
  while (attemptNo < totalAttempts) {
    attemptNo += 1;
    latest = await params.query();

    const nullCount = latest.filter(params.hasNullDefinition).length;
    if (nullCount === 0) {
      return latest;
    }

    if (attemptNo === totalAttempts) {
      // Out of attempts: report it and hand the offending rows to the caller.
      log(
        "%s: pg_get_*def() returned NULL for %d row(s) after %d attempt(s); skipping",
        params.label,
        nullCount,
        totalAttempts,
      );
      break;
    }

    log(
      "%s: pg_get_*def() returned NULL for %d row(s) on attempt %d/%d; retrying",
      params.label,
      nullCount,
      attemptNo,
      totalAttempts,
    );
    // Linear backoff: the wait scales with the attempt that just failed.
    await sleep(baseDelayMs * attemptNo);
  }
  return latest;
}

packages/pg-delta/src/core/objects/index/index.model.test.ts

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,19 @@ const baseRow = {
3636
/** Builds a stub `Pool` whose `query()` always resolves with the given rows. */
const mockPool = (rows: unknown[]): Pool => {
  const stub = {
    query: async () => ({ rows }),
  };
  return stub as unknown as Pool;
};
3838

39+
/**
 * Builds a stub `Pool` whose successive `query()` calls resolve with the given
 * result sets in order. Once the script runs out, the final result set is
 * replayed for every further call. With no scripted attempts at all, every
 * call resolves with an empty row list.
 */
const mockPoolSequence = (...attempts: unknown[][]): Pool => {
  let i = 0;
  return {
    query: async () => {
      // Clamp so calls beyond the script keep replaying the last result set.
      const idx = Math.min(i++, attempts.length - 1);
      // `idx` is -1 when nothing was scripted; fall back to an empty array so
      // consumers always receive an array rather than `rows: undefined`.
      return { rows: attempts[idx] ?? [] };
    },
  } as unknown as Pool;
};
47+
48+
const NO_BACKOFF = { backoffMs: 0 } as const;
49+
3950
describe("extractIndexes", () => {
40-
test("skips rows where pg_get_indexdef returned NULL", async () => {
51+
test("skips rows where pg_get_indexdef returned NULL after exhausting retries", async () => {
4152
const indexes = await extractIndexes(
4253
mockPool([
4354
{
@@ -47,6 +58,7 @@ describe("extractIndexes", () => {
4758
},
4859
{ ...baseRow, name: '"orphan_idx"', definition: null },
4960
]),
61+
NO_BACKOFF,
5062
);
5163

5264
expect(indexes).toHaveLength(1);
@@ -59,6 +71,7 @@ describe("extractIndexes", () => {
5971
await expect(
6072
extractIndexes(
6173
mockPool([{ ...baseRow, name: '"orphan"', definition: null }]),
74+
NO_BACKOFF,
6275
),
6376
).resolves.toEqual([]);
6477
});
@@ -77,7 +90,30 @@ describe("extractIndexes", () => {
7790
definition: "CREATE INDEX b ON users (id)",
7891
},
7992
]),
93+
NO_BACKOFF,
8094
);
8195
expect(indexes.map((i) => i.name)).toEqual(['"a"', '"b"']);
8296
});
97+
98+
test("recovers when pg_get_indexdef is NULL on first attempt but resolved on retry", async () => {
99+
const indexes = await extractIndexes(
100+
mockPoolSequence(
101+
// attempt 1: definition is NULL (transient race)
102+
[{ ...baseRow, name: '"racy_idx"', definition: null }],
103+
// attempt 2: catalog scan no longer sees the dropped row, or
104+
// pg_get_indexdef successfully resolves the definition
105+
[
106+
{
107+
...baseRow,
108+
name: '"racy_idx"',
109+
definition: "CREATE INDEX racy_idx ON users (id)",
110+
},
111+
],
112+
),
113+
{ retries: 2, backoffMs: 0 },
114+
);
115+
expect(indexes).toHaveLength(1);
116+
expect(indexes[0]?.name).toBe('"racy_idx"');
117+
expect(indexes[0]?.definition).toBe("CREATE INDEX racy_idx ON users (id)");
118+
});
83119
});

0 commit comments

Comments
 (0)