Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/catalog-export-filter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@supabase/pg-delta": minor
---

Add `--filter` option to the `catalog-export` CLI command to scope the exported catalog to matching schemas/objects.
27 changes: 26 additions & 1 deletion packages/pg-delta/src/cli/commands/catalog-export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

import { writeFile } from "node:fs/promises";
import { buildCommand, type CommandContext } from "@stricli/core";
import { filterCatalog } from "../../core/catalog.filter.ts";
import { extractCatalog } from "../../core/catalog.model.ts";
import {
serializeCatalog,
stringifyCatalogSnapshot,
} from "../../core/catalog.snapshot.ts";
import type { FilterDSL } from "../../core/integrations/filter/dsl.ts";
import { createManagedPool } from "../../core/postgres-config.ts";

export const catalogExportCommand = buildCommand({
Expand All @@ -30,6 +32,21 @@ export const catalogExportCommand = buildCommand({
parse: String,
optional: true,
},
filter: {
kind: "parsed",
brief:
Comment thread
leandrocp marked this conversation as resolved.
'Filter DSL as inline JSON to filter changes (e.g., \'{"schema":"public"}\').',
parse: (value: string): FilterDSL => {
try {
return JSON.parse(value) as FilterDSL;
} catch (error) {
throw new Error(
`Invalid filter JSON: ${error instanceof Error ? error.message : String(error)}`,
);
}
},
optional: true,
},
},
aliases: {
t: "target",
Expand All @@ -48,6 +65,10 @@ Use cases:
- Snapshot template1 for use as an empty-database baseline
- Snapshot a production database to generate revert migrations
- Snapshot any state for reproducible offline diffs

Pass --filter to scope the snapshot to a subset of the catalog (same
Filter DSL accepted by plan/sync). Useful when committing a baseline
snapshot to a repo and only one schema's drift is interesting.
`.trim(),
},
async func(
Expand All @@ -56,6 +77,7 @@ Use cases:
target: string;
output: string;
role?: string;
filter?: FilterDSL;
},
) {
const { pool, close } = await createManagedPool(flags.target, {
Expand All @@ -65,7 +87,10 @@ Use cases:

try {
const catalog = await extractCatalog(pool);
const snapshot = serializeCatalog(catalog);
const scoped = flags.filter
? await filterCatalog(catalog, flags.filter)
: catalog;
const snapshot = serializeCatalog(scoped);
const json = stringifyCatalogSnapshot(snapshot);
await writeFile(flags.output, json, "utf-8");
this.process.stdout.write(
Expand Down
84 changes: 84 additions & 0 deletions packages/pg-delta/src/core/catalog.filter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Prune a catalog to the objects that match a Filter DSL expression.
*
* The Filter DSL is defined over Change objects, so the catalog is
* diffed against an empty baseline first to materialize one CREATE
* change per object. The filter then evaluates against the same shape
* it would at plan time, and the surviving stableIds drive the prune.
*
* Dependency cascade is not applied. A scoped snapshot is partial by
* design: out-of-scope owners, roles, and types must exist on the
* target DB at apply time. Cascading would expand the filter beyond
* what the caller asked for and, in practice, collapse schema-scoped
* exports whose kept objects reference cluster-scoped owners.
*/

import { diffCatalogs } from "./catalog.diff.ts";
import { Catalog, createEmptyCatalog } from "./catalog.model.ts";
import { compileFilterDSL, type FilterDSL } from "./integrations/filter/dsl.ts";

export async function filterCatalog(
catalog: Catalog,
filter: FilterDSL,
): Promise<Catalog> {
const empty = await createEmptyCatalog(catalog.version, catalog.currentUser);
const changes = diffCatalogs(empty, catalog);
const filterFn = compileFilterDSL(filter);
Comment thread
avallete marked this conversation as resolved.

const keep = new Set<string>();
for (const change of changes) {
if (!filterFn(change)) continue;
for (const id of change.creates ?? []) keep.add(id);
}

return pruneCatalog(catalog, keep);
}

function filterRecord<T>(
record: Record<string, T>,
keep: ReadonlySet<string>,
): Record<string, T> {
return Object.fromEntries(
Object.entries(record).filter(([id]) => keep.has(id)),
);
}

function pruneCatalog(catalog: Catalog, keep: ReadonlySet<string>): Catalog {
const tables = filterRecord(catalog.tables, keep);
const materializedViews = filterRecord(catalog.materializedViews, keep);

return new Catalog({
aggregates: filterRecord(catalog.aggregates, keep),
collations: filterRecord(catalog.collations, keep),
compositeTypes: filterRecord(catalog.compositeTypes, keep),
domains: filterRecord(catalog.domains, keep),
enums: filterRecord(catalog.enums, keep),
extensions: filterRecord(catalog.extensions, keep),
procedures: filterRecord(catalog.procedures, keep),
indexes: filterRecord(catalog.indexes, keep),
materializedViews,
subscriptions: filterRecord(catalog.subscriptions, keep),
publications: filterRecord(catalog.publications, keep),
rlsPolicies: filterRecord(catalog.rlsPolicies, keep),
roles: filterRecord(catalog.roles, keep),
schemas: filterRecord(catalog.schemas, keep),
sequences: filterRecord(catalog.sequences, keep),
tables,
triggers: filterRecord(catalog.triggers, keep),
eventTriggers: filterRecord(catalog.eventTriggers, keep),
rules: filterRecord(catalog.rules, keep),
ranges: filterRecord(catalog.ranges, keep),
views: filterRecord(catalog.views, keep),
foreignDataWrappers: filterRecord(catalog.foreignDataWrappers, keep),
servers: filterRecord(catalog.servers, keep),
userMappings: filterRecord(catalog.userMappings, keep),
foreignTables: filterRecord(catalog.foreignTables, keep),
depends: catalog.depends.filter(
(d) =>
keep.has(d.dependent_stable_id) && keep.has(d.referenced_stable_id),
),
indexableObjects: { ...tables, ...materializedViews },
version: catalog.version,
currentUser: catalog.currentUser,
});
}
6 changes: 4 additions & 2 deletions packages/pg-delta/src/core/catalog.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,10 @@ let _pg1516Baseline: Catalog | null = null;
let _pg17Baseline: Catalog | null = null;

async function loadBaselineJson(): Promise<Record<string, unknown>> {
const mod =
await import("./fixtures/empty-catalogs/postgres-15-16-baseline.json");
const mod = await import(
"./fixtures/empty-catalogs/postgres-15-16-baseline.json",
{ with: { type: "json" } }
);
return mod.default as Record<string, unknown>;
}

Expand Down
153 changes: 153 additions & 0 deletions packages/pg-delta/tests/integration/catalog-export-filter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import { describe, expect, test } from "bun:test";
import { extractCatalog } from "../../src/core/catalog.model.ts";
import { filterCatalog } from "../../src/core/catalog.filter.ts";
import {
deserializeCatalog,
serializeCatalog,
stringifyCatalogSnapshot,
} from "../../src/core/catalog.snapshot.ts";
import { createPlan } from "../../src/core/plan/create.ts";
import { POSTGRES_VERSIONS } from "../constants.ts";
import { withDb } from "../utils.ts";

for (const pgVersion of POSTGRES_VERSIONS) {
describe(`catalog-export --filter (pg${pgVersion})`, () => {
test(
"filterCatalog keeps only objects matching the filter",
withDb(pgVersion, async (db) => {
await db.branch.query(`
CREATE SCHEMA app;
CREATE TABLE app.users (id serial PRIMARY KEY, name text NOT NULL);
CREATE TABLE app.posts (id serial PRIMARY KEY, title text NOT NULL);
CREATE SCHEMA other;
CREATE TABLE other.config (key text PRIMARY KEY);
`);

const full = await extractCatalog(db.branch);
const scoped = await filterCatalog(full, { "*/schema": "app" });

expect(Object.keys(scoped.schemas).sort()).toEqual(["schema:app"]);
expect(Object.keys(scoped.tables).sort()).toEqual([
"table:app.posts",
"table:app.users",
]);
expect(Object.keys(scoped.tables)).not.toContain("table:other.config");
expect(Object.keys(scoped.schemas)).not.toContain("schema:other");
expect(Object.keys(scoped.schemas)).not.toContain("schema:public");
}),
);

test(
"filterCatalog drops pg_depend edges that touch pruned objects",
withDb(pgVersion, async (db) => {
await db.branch.query(`
CREATE SCHEMA app;
CREATE TABLE app.users (id serial PRIMARY KEY);
CREATE SCHEMA other;
CREATE TABLE other.t (id serial PRIMARY KEY);
`);

const full = await extractCatalog(db.branch);
const scoped = await filterCatalog(full, { "*/schema": "app" });

for (const dep of scoped.depends) {
expect(dep.dependent_stable_id).not.toContain("other");
expect(dep.referenced_stable_id).not.toContain("other");
}
}),
);

test(
"round-trip: filtered snapshot diffs to zero against live source with same filter",
withDb(pgVersion, async (db) => {
await db.branch.query(`
CREATE SCHEMA app;
CREATE TABLE app.users (id serial PRIMARY KEY, name text NOT NULL);
CREATE SCHEMA other;
CREATE TABLE other.config (key text PRIMARY KEY);
`);

const full = await extractCatalog(db.branch);
const filter = { "*/schema": "app" };
const scoped = await filterCatalog(full, filter);

// Reconstruct via the snapshot serializer to prove the prune survives
// a real save→load cycle (which is what catalog-export does).
const roundTripped = deserializeCatalog(
JSON.parse(stringifyCatalogSnapshot(serializeCatalog(scoped))),
);

const plan = await createPlan(db.branch, roundTripped, { filter });
expect(plan).toBeNull();
}),
);

test(
"schema filter keeps schema even when its owner role is filtered out",
withDb(pgVersion, async (db) => {
// Reproduces a class of bug surfaced by Supabase images: the kept
// schema's CREATE change `requires` an owner role; if filterCatalog
// ran cascadeExclusions, the filter would drop the role change,
// cascade would propagate to the schema, and the snapshot would
// come out empty. The filter must keep the schema (and its objects)
// even when out-of-scope owners exist in the live catalog.
await db.branch.query(`
CREATE ROLE app_owner;
CREATE SCHEMA realtime AUTHORIZATION app_owner;
CREATE TABLE realtime.subscription (id serial PRIMARY KEY);
`);

const full = await extractCatalog(db.branch);
const scoped = await filterCatalog(full, { "*/schema": "realtime" });

expect(Object.keys(scoped.schemas)).toContain("schema:realtime");
expect(Object.keys(scoped.tables)).toContain(
"table:realtime.subscription",
);
// The owner role itself is filtered out (no `role/schema` to match).
expect(Object.keys(scoped.roles)).not.toContain("role:app_owner");
}),
);

test(
"round-trip matches realtime usage: schema filter survives plan",
withDb(pgVersion, async (db) => {
// Mirrors the Realtime baseline workflow: snapshot a 'kitchen sink'
// database scoped to one schema, then drift-check tenant against it
// using the same filter at plan time.
await db.branch.query(`
CREATE SCHEMA realtime;
CREATE TABLE realtime.schema_migrations (
version bigint PRIMARY KEY,
inserted_at timestamp
);
CREATE TABLE realtime.subscription (
id bigserial PRIMARY KEY,
entity regclass NOT NULL,
filters jsonb DEFAULT '[]'::jsonb
);
CREATE SCHEMA auth;
CREATE TABLE auth.users (id uuid PRIMARY KEY);
CREATE TABLE auth.sessions (id uuid PRIMARY KEY);
`);

const full = await extractCatalog(db.branch);
const filter = { "*/schema": "realtime" };
const scoped = await filterCatalog(full, filter);

expect(Object.keys(scoped.schemas)).toContain("schema:realtime");
expect(Object.keys(scoped.schemas)).not.toContain("schema:auth");
expect(Object.keys(scoped.tables)).toContain(
"table:realtime.schema_migrations",
);
expect(Object.keys(scoped.tables)).not.toContain("table:auth.users");

const snapshot = deserializeCatalog(
JSON.parse(stringifyCatalogSnapshot(serializeCatalog(scoped))),
);
const plan = await createPlan(db.branch, snapshot, { filter });
expect(plan).toBeNull();
}),
);
});
}
Loading