Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/catalog-export-filter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@supabase/pg-delta": minor
---

Add `--filter` option to the `catalog-export` CLI command to scope the exported catalog to matching schemas/objects.
27 changes: 26 additions & 1 deletion packages/pg-delta/src/cli/commands/catalog-export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

import { writeFile } from "node:fs/promises";
import { buildCommand, type CommandContext } from "@stricli/core";
import { filterCatalog } from "../../core/catalog.filter.ts";
import { extractCatalog } from "../../core/catalog.model.ts";
import {
serializeCatalog,
stringifyCatalogSnapshot,
} from "../../core/catalog.snapshot.ts";
import type { FilterDSL } from "../../core/integrations/filter/dsl.ts";
import { createManagedPool } from "../../core/postgres-config.ts";

export const catalogExportCommand = buildCommand({
Expand All @@ -30,6 +32,21 @@ export const catalogExportCommand = buildCommand({
parse: String,
optional: true,
},
filter: {
kind: "parsed",
brief:
Comment thread
leandrocp marked this conversation as resolved.
'Filter DSL as inline JSON to filter changes (e.g., \'{"*/schema": "app"}\').',
parse: (value: string): FilterDSL => {
try {
return JSON.parse(value) as FilterDSL;
} catch (error) {
throw new Error(
`Invalid filter JSON: ${error instanceof Error ? error.message : String(error)}`,
);
}
},
optional: true,
},
},
aliases: {
t: "target",
Expand All @@ -48,6 +65,10 @@ Use cases:
- Snapshot template1 for use as an empty-database baseline
- Snapshot a production database to generate revert migrations
- Snapshot any state for reproducible offline diffs

Pass --filter to scope the snapshot to a subset of the catalog (same
Filter DSL accepted by plan/sync). Useful when committing a baseline
snapshot to a repo and only one schema's drift is interesting.
`.trim(),
},
async func(
Expand All @@ -56,6 +77,7 @@ Use cases:
target: string;
output: string;
role?: string;
filter?: FilterDSL;
},
) {
const { pool, close } = await createManagedPool(flags.target, {
Expand All @@ -65,7 +87,10 @@ Use cases:

try {
const catalog = await extractCatalog(pool);
const snapshot = serializeCatalog(catalog);
const scoped = flags.filter
? await filterCatalog(catalog, flags.filter)
: catalog;
const snapshot = serializeCatalog(scoped);
const json = stringifyCatalogSnapshot(snapshot);
await writeFile(flags.output, json, "utf-8");
this.process.stdout.write(
Expand Down
96 changes: 96 additions & 0 deletions packages/pg-delta/src/core/catalog.filter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Prune a catalog to the objects that match a Filter DSL expression.
*
* The Filter DSL is defined over Change objects, so the catalog is
* diffed against an empty baseline first to materialize one CREATE
* change per object. The filter then evaluates against the same shape
* it would at plan time, and the surviving stableIds drive the prune.
*
* Dependency cascade is not applied. A scoped snapshot is partial by
* design: out-of-scope owners, roles, and types must exist on the
* target DB at apply time. Cascading would expand the filter beyond
* what the caller asked for and, in practice, collapse schema-scoped
* exports whose kept objects reference cluster-scoped owners.
*/

import { diffCatalogs } from "./catalog.diff.ts";
import { Catalog, createEmptyCatalog } from "./catalog.model.ts";
import { compileFilterDSL, type FilterDSL } from "./integrations/filter/dsl.ts";

export async function filterCatalog(
catalog: Catalog,
filter: FilterDSL,
): Promise<Catalog> {
if (
typeof filter === "object" &&
filter !== null &&
(filter as Record<string, unknown>).cascade === true
) {
throw new Error(
"Filter DSL `cascade: true` is not supported by catalog-export: " +
"scoped snapshots are intentionally partial. Out-of-scope owners, " +
"roles, and types must exist on the target DB at apply time.",
);
}

const empty = await createEmptyCatalog(catalog.version, catalog.currentUser);
const changes = diffCatalogs(empty, catalog);
const filterFn = compileFilterDSL(filter);
Comment thread
avallete marked this conversation as resolved.

const keep = new Set<string>();
for (const change of changes) {
if (!filterFn(change)) continue;
for (const id of change.creates ?? []) keep.add(id);
}

return pruneCatalog(catalog, keep);
}

function filterRecord<T>(
record: Record<string, T>,
keep: ReadonlySet<string>,
): Record<string, T> {
return Object.fromEntries(
Object.entries(record).filter(([id]) => keep.has(id)),
);
}

function pruneCatalog(catalog: Catalog, keep: ReadonlySet<string>): Catalog {
const tables = filterRecord(catalog.tables, keep);
const materializedViews = filterRecord(catalog.materializedViews, keep);

return new Catalog({
aggregates: filterRecord(catalog.aggregates, keep),
collations: filterRecord(catalog.collations, keep),
compositeTypes: filterRecord(catalog.compositeTypes, keep),
domains: filterRecord(catalog.domains, keep),
enums: filterRecord(catalog.enums, keep),
extensions: filterRecord(catalog.extensions, keep),
procedures: filterRecord(catalog.procedures, keep),
indexes: filterRecord(catalog.indexes, keep),
materializedViews,
subscriptions: filterRecord(catalog.subscriptions, keep),
publications: filterRecord(catalog.publications, keep),
rlsPolicies: filterRecord(catalog.rlsPolicies, keep),
roles: filterRecord(catalog.roles, keep),
schemas: filterRecord(catalog.schemas, keep),
sequences: filterRecord(catalog.sequences, keep),
tables,
triggers: filterRecord(catalog.triggers, keep),
eventTriggers: filterRecord(catalog.eventTriggers, keep),
rules: filterRecord(catalog.rules, keep),
ranges: filterRecord(catalog.ranges, keep),
views: filterRecord(catalog.views, keep),
foreignDataWrappers: filterRecord(catalog.foreignDataWrappers, keep),
servers: filterRecord(catalog.servers, keep),
userMappings: filterRecord(catalog.userMappings, keep),
foreignTables: filterRecord(catalog.foreignTables, keep),
depends: catalog.depends.filter(
(d) =>
keep.has(d.dependent_stable_id) && keep.has(d.referenced_stable_id),
),
indexableObjects: { ...tables, ...materializedViews },
version: catalog.version,
currentUser: catalog.currentUser,
});
}
6 changes: 4 additions & 2 deletions packages/pg-delta/src/core/catalog.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,10 @@ let _pg1516Baseline: Catalog | null = null;
let _pg17Baseline: Catalog | null = null;

async function loadBaselineJson(): Promise<Record<string, unknown>> {
const mod =
await import("./fixtures/empty-catalogs/postgres-15-16-baseline.json");
const mod = await import(
"./fixtures/empty-catalogs/postgres-15-16-baseline.json",
{ with: { type: "json" } }
);
return mod.default as Record<string, unknown>;
}

Expand Down
161 changes: 161 additions & 0 deletions packages/pg-delta/tests/integration/catalog-export-filter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import { describe, expect, test } from "bun:test";
import { filterCatalog } from "../../src/core/catalog.filter.ts";
import { type Catalog, extractCatalog } from "../../src/core/catalog.model.ts";
import {
deserializeCatalog,
serializeCatalog,
stringifyCatalogSnapshot,
} from "../../src/core/catalog.snapshot.ts";
import { createPlan } from "../../src/core/plan/create.ts";
import { POSTGRES_VERSIONS } from "../constants.ts";
import { withDb } from "../utils.ts";

for (const pgVersion of POSTGRES_VERSIONS) {
describe(`catalog-export --filter (pg${pgVersion})`, () => {
test(
"filterCatalog keeps only objects matching the filter",
withDb(pgVersion, async (db) => {
await db.branch.query(`
CREATE SCHEMA app;
CREATE TABLE app.users (id serial PRIMARY KEY, name text NOT NULL);
CREATE TABLE app.posts (id serial PRIMARY KEY, title text NOT NULL);
CREATE SCHEMA other;
CREATE TABLE other.config (key text PRIMARY KEY);
`);

const full = await extractCatalog(db.branch);
const scoped = await filterCatalog(full, { "*/schema": "app" });

expect(Object.keys(scoped.schemas).sort()).toEqual(["schema:app"]);
expect(Object.keys(scoped.tables).sort()).toEqual([
"table:app.posts",
"table:app.users",
]);
expect(Object.keys(scoped.tables)).not.toContain("table:other.config");
expect(Object.keys(scoped.schemas)).not.toContain("schema:other");
expect(Object.keys(scoped.schemas)).not.toContain("schema:public");
}),
);

test(
"filterCatalog drops pg_depend edges that touch pruned objects",
withDb(pgVersion, async (db) => {
await db.branch.query(`
CREATE SCHEMA app;
CREATE TABLE app.users (id serial PRIMARY KEY);
CREATE SCHEMA other;
CREATE TABLE other.t (id serial PRIMARY KEY);
`);

const full = await extractCatalog(db.branch);
const scoped = await filterCatalog(full, { "*/schema": "app" });

for (const dep of scoped.depends) {
expect(dep.dependent_stable_id).not.toContain("other");
expect(dep.referenced_stable_id).not.toContain("other");
}
}),
);

test(
"round-trip: filtered snapshot diffs to zero against live source with same filter",
withDb(pgVersion, async (db) => {
await db.branch.query(`
CREATE SCHEMA app;
CREATE TABLE app.users (id serial PRIMARY KEY, name text NOT NULL);
CREATE SCHEMA other;
CREATE TABLE other.config (key text PRIMARY KEY);
`);

const full = await extractCatalog(db.branch);
const filter = { "*/schema": "app" };
const scoped = await filterCatalog(full, filter);

// Reconstruct via the snapshot serializer to prove the prune survives
// a real save→load cycle (which is what catalog-export does).
const roundTripped = deserializeCatalog(
JSON.parse(stringifyCatalogSnapshot(serializeCatalog(scoped))),
);

const plan = await createPlan(db.branch, roundTripped, { filter });
expect(plan).toBeNull();
}),
);

test(
"schema filter keeps schema even when its owner role is filtered out",
withDb(pgVersion, async (db) => {
// Reproduces a class of bug surfaced by Supabase images: the kept
// schema's CREATE change `requires` an owner role; if filterCatalog
// ran cascadeExclusions, the filter would drop the role change,
// cascade would propagate to the schema, and the snapshot would
// come out empty. The filter must keep the schema (and its objects)
// even when out-of-scope owners exist in the live catalog.
await db.branch.query(`
CREATE ROLE app_owner;
CREATE SCHEMA realtime AUTHORIZATION app_owner;
CREATE TABLE realtime.subscription (id serial PRIMARY KEY);
`);

const full = await extractCatalog(db.branch);
const scoped = await filterCatalog(full, { "*/schema": "realtime" });

expect(Object.keys(scoped.schemas)).toContain("schema:realtime");
expect(Object.keys(scoped.tables)).toContain(
"table:realtime.subscription",
);
// The owner role itself is filtered out (no `role/schema` to match).
expect(Object.keys(scoped.roles)).not.toContain("role:app_owner");
}),
);

test(
"round-trip matches realtime usage: schema filter survives plan",
withDb(pgVersion, async (db) => {
// Mirrors the Realtime baseline workflow: snapshot a 'kitchen sink'
// database scoped to one schema, then drift-check tenant against it
// using the same filter at plan time.
await db.branch.query(`
CREATE SCHEMA realtime;
CREATE TABLE realtime.schema_migrations (
version bigint PRIMARY KEY,
inserted_at timestamp
);
CREATE TABLE realtime.subscription (
id bigserial PRIMARY KEY,
entity regclass NOT NULL,
filters jsonb DEFAULT '[]'::jsonb
);
CREATE SCHEMA auth;
CREATE TABLE auth.users (id uuid PRIMARY KEY);
CREATE TABLE auth.sessions (id uuid PRIMARY KEY);
`);

const full = await extractCatalog(db.branch);
const filter = { "*/schema": "realtime" };
const scoped = await filterCatalog(full, filter);

expect(Object.keys(scoped.schemas)).toContain("schema:realtime");
expect(Object.keys(scoped.schemas)).not.toContain("schema:auth");
expect(Object.keys(scoped.tables)).toContain(
"table:realtime.schema_migrations",
);
expect(Object.keys(scoped.tables)).not.toContain("table:auth.users");

const snapshot = deserializeCatalog(
JSON.parse(stringifyCatalogSnapshot(serializeCatalog(scoped))),
);
const plan = await createPlan(db.branch, snapshot, { filter });
expect(plan).toBeNull();
}),
);
});
}

describe("catalog-export --filter (version-independent)", () => {
test("filterCatalog rejects cascade: true with an explanatory error", async () => {
await expect(
filterCatalog({} as Catalog, { "*/schema": "app", cascade: true }),
).rejects.toThrow(/cascade: true` is not supported by catalog-export/);
});
});
Loading