Skip to content

Commit 0e9aa94

Browse files
authored
[Postgres] Improve handling of "lost" replication slots (#387)
* Add test to reproduce a "lost" replication slot. * Detect "can no longer access replication slot" error. * Check replication slot status upfront. * Limit slot health check based on time, not iterations. * Fix for invalidation_reason not on pg < 16. * Skip test on older postgres versions. * Further tweaks to postgres compatibility fallbacks. * Remove some version checks. * Add changeset. * Resolve todo.
1 parent 48a8678 commit 0e9aa94

File tree

4 files changed

+168
-35
lines changed

4 files changed

+168
-35
lines changed

.changeset/cold-dodos-cheer.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@powersync/service-module-postgres': patch
3+
'@powersync/lib-service-postgres': patch
4+
---
5+
6+
Improve replication slot health detection, automatically re-creating "lost" slots.

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ import * as lib_postgres from '@powersync/lib-service-postgres';
22
import {
33
container,
44
DatabaseConnectionError,
5+
logger as defaultLogger,
56
ErrorCode,
67
errors,
78
Logger,
8-
logger as defaultLogger,
9-
ReplicationAssertionError,
10-
ReplicationAbortedError
9+
ReplicationAbortedError,
10+
ReplicationAssertionError
1111
} from '@powersync/lib-services-framework';
1212
import {
1313
BucketStorageBatch,
@@ -33,10 +33,10 @@ import {
3333
toSyncRulesRow
3434
} from '@powersync/service-sync-rules';
3535

36+
import { ReplicationMetric } from '@powersync/service-types';
3637
import { PgManager } from './PgManager.js';
3738
import { getPgOutputRelation, getRelId, referencedColumnTypeIds } from './PgRelation.js';
3839
import { checkSourceConfiguration, checkTableRls, getReplicationIdentityColumns } from './replication-utils.js';
39-
import { ReplicationMetric } from '@powersync/service-types';
4040
import {
4141
ChunkedSnapshotQuery,
4242
IdSnapshotQuery,
@@ -295,15 +295,18 @@ export class WalStream {
295295
}
296296

297297
// Check if replication slot exists
298-
const rs = await this.connections.pool.query({
299-
statement: 'SELECT 1 FROM pg_replication_slots WHERE slot_name = $1',
300-
params: [{ type: 'varchar', value: slotName }]
301-
});
302-
const slotExists = rs.rows.length > 0;
298+
const slot = pgwire.pgwireRows(
299+
await this.connections.pool.query({
300+
// We specifically want wal_status and invalidation_reason, but it's not available on older versions,
301+
// so we just query *.
302+
statement: 'SELECT * FROM pg_replication_slots WHERE slot_name = $1',
303+
params: [{ type: 'varchar', value: slotName }]
304+
})
305+
)[0];
303306

304-
if (slotExists) {
307+
if (slot != null) {
305308
// This checks that the slot is still valid
306-
const r = await this.checkReplicationSlot();
309+
const r = await this.checkReplicationSlot(slot as any);
307310
if (snapshotDone && r.needsNewSlot) {
308311
// We keep the current snapshot, and create a new replication slot
309312
throw new MissingReplicationSlotError(`Replication slot ${slotName} is not valid anymore`);
@@ -330,24 +333,32 @@ export class WalStream {
330333
/**
331334
* If a replication slot exists, check that it is healthy.
332335
*/
333-
private async checkReplicationSlot(): Promise<{ needsNewSlot: boolean }> {
334-
let last_error = null;
336+
private async checkReplicationSlot(slot: {
337+
// postgres 13+
338+
wal_status?: string;
339+
// postgres 17+
340+
invalidation_reason?: string | null;
341+
}): Promise<{ needsNewSlot: boolean }> {
342+
// Start with a placeholder error, should be replaced if there is an actual issue.
343+
let last_error = new ReplicationAssertionError(`Slot health check failed to execute`);
344+
335345
const slotName = this.slot_name;
336346

337-
// Check that replication slot exists
338-
for (let i = 120; i >= 0; i--) {
339-
this.touch();
347+
const lost = slot.wal_status == 'lost';
348+
if (lost) {
349+
this.logger.warn(
350+
`Replication slot ${slotName} is invalidated. invalidation_reason: ${slot.invalidation_reason ?? 'unknown'}`
351+
);
352+
return {
353+
needsNewSlot: true
354+
};
355+
}
340356

341-
if (i == 0) {
342-
container.reporter.captureException(last_error, {
343-
level: errors.ErrorSeverity.ERROR,
344-
metadata: {
345-
replication_slot: slotName
346-
}
347-
});
357+
// Check that replication slot exists, trying for up to 2 minutes.
358+
const startAt = performance.now();
359+
while (performance.now() - startAt < 120_000) {
360+
this.touch();
348361

349-
throw last_error;
350-
}
351362
try {
352363
// We peek a large number of changes here, to make it more likely to pick up replication slot errors.
353364
// For example, "publication does not exist" only occurs here if the peek actually includes changes related
@@ -379,18 +390,21 @@ export class WalStream {
379390
throw e;
380391
}
381392

382-
// Could also be `publication "powersync" does not exist`, although this error may show up much later
383-
// in some cases.
384-
385393
if (
386394
/incorrect prev-link/.test(e.message) ||
387395
/replication slot.*does not exist/.test(e.message) ||
388-
/publication.*does not exist/.test(e.message)
396+
/publication.*does not exist/.test(e.message) ||
397+
// Postgres 18 - exceeded max_slot_wal_keep_size
398+
/can no longer access replication slot/.test(e.message) ||
399+
// Postgres 17 - exceeded max_slot_wal_keep_size
400+
/can no longer get changes from replication slot/.test(e.message)
389401
) {
402+
// Fatal error. In most cases since Postgres 13+, the `wal_status == 'lost'` check should pick this up, but this
403+
// works as a fallback.
404+
390405
container.reporter.captureException(e, {
391406
level: errors.ErrorSeverity.WARNING,
392407
metadata: {
393-
try_index: i,
394408
replication_slot: slotName
395409
}
396410
});
@@ -409,7 +423,14 @@ export class WalStream {
409423
}
410424
}
411425

412-
throw new ReplicationAssertionError('Unreachable');
426+
container.reporter.captureException(last_error, {
427+
level: errors.ErrorSeverity.ERROR,
428+
metadata: {
429+
replication_slot: slotName
430+
}
431+
});
432+
433+
throw last_error;
413434
}
414435

415436
async estimatedCountNumber(db: pgwire.PgConnection, table: storage.SourceTable): Promise<number> {

modules/module-postgres/test/src/wal_stream.test.ts

Lines changed: 88 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@ import { METRICS_HELPER, putOp, removeOp } from '@powersync/service-core-tests';
44
import { pgwireRows } from '@powersync/service-jpgwire';
55
import { ReplicationMetric } from '@powersync/service-types';
66
import * as crypto from 'crypto';
7-
import { describe, expect, test } from 'vitest';
7+
import { afterAll, beforeAll, describe, expect, test } from 'vitest';
88
import { describeWithStorage } from './util.js';
9-
import { WalStreamTestContext } from './wal_stream_utils.js';
9+
import { WalStreamTestContext, withMaxWalSize } from './wal_stream_utils.js';
10+
import { JSONBig } from '@powersync/service-jsonbig';
1011

1112
const BASIC_SYNC_RULES = `
1213
bucket_definitions:
@@ -321,8 +322,8 @@ bucket_definitions:
321322

322323
if (serverVersion!.compareMain('18.0.0') >= 0) {
323324
await context.replicateSnapshot();
324-
// No error expected in Postres 18
325-
// TODO: introduce new test scenario for Postgres 18 that _does_ invalidate the replication slot.
325+
// No error expected in Postres 18. Replication keeps on working depite the
326+
// publication being re-created.
326327
} else {
327328
// Postgres < 18 invalidates the replication slot when the publication is re-created.
328329
// The error is handled on a higher level, which triggers
@@ -386,6 +387,89 @@ bucket_definitions:
386387
}
387388
});
388389

390+
test('replication slot lost', async () => {
391+
await using baseContext = await WalStreamTestContext.open(factory, { doNotClear: true });
392+
393+
const serverVersion = await baseContext.connectionManager.getServerVersion();
394+
if (serverVersion!.compareMain('13.0.0') < 0) {
395+
console.warn(`max_slot_wal_keep_size not supported on postgres ${serverVersion} - skipping test.`);
396+
return;
397+
}
398+
399+
// Configure max_slot_wal_keep_size for the test, reverting afterwards.
400+
await using s = await withMaxWalSize(baseContext.pool, '100MB');
401+
402+
{
403+
await using context = await WalStreamTestContext.open(factory);
404+
const { pool } = context;
405+
await context.updateSyncRules(`
406+
bucket_definitions:
407+
global:
408+
data:
409+
- SELECT id, description FROM "test_data"`);
410+
411+
await pool.query(
412+
`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text, num int8)`
413+
);
414+
await pool.query(
415+
`INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
416+
);
417+
await context.replicateSnapshot();
418+
await context.startStreaming();
419+
420+
const data = await context.getBucketData('global[]');
421+
422+
expect(data).toMatchObject([
423+
putOp('test_data', {
424+
id: '8133cd37-903b-4937-a022-7c8294015a3a',
425+
description: 'test1'
426+
})
427+
]);
428+
429+
expect(await context.storage!.getStatus()).toMatchObject({ active: true, snapshot_done: true });
430+
}
431+
432+
{
433+
await using context = await WalStreamTestContext.open(factory, { doNotClear: true });
434+
const { pool } = context;
435+
const storage = await context.factory.getActiveStorage();
436+
const slotName = storage?.slot_name!;
437+
438+
// Here, we write data to the WAL until the replication slot is lost.
439+
const TRIES = 100;
440+
for (let i = 0; i < TRIES; i++) {
441+
// Write something to the WAL.
442+
await pool.query(`select pg_logical_emit_message(true, 'test', 'x')`);
443+
// Switch WAL file. With default settings, each WAL file is around 16MB.
444+
await pool.query(`select pg_switch_wal()`);
445+
// Checkpoint command forces the old WAL files to be archived/removed.
446+
await pool.query(`checkpoint`);
447+
// Now check if the slot is still active.
448+
const slot = pgwireRows(
449+
await context.pool.query({
450+
statement: `select slot_name, wal_status, safe_wal_size, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) as lag from pg_replication_slots where slot_name = $1`,
451+
params: [{ type: 'varchar', value: slotName }]
452+
})
453+
)[0];
454+
if (slot.wal_status == 'lost') {
455+
break;
456+
} else if (i == TRIES - 1) {
457+
throw new Error(
458+
`Could not generate test conditions to expire replication slot. Current status: ${JSONBig.stringify(slot)}`
459+
);
460+
}
461+
}
462+
463+
await context.loadActiveSyncRules();
464+
465+
// The error is handled on a higher level, which triggers
466+
// creating a new replication slot.
467+
await expect(async () => {
468+
await context.replicateSnapshot();
469+
}).rejects.toThrowError(MissingReplicationSlotError);
470+
}
471+
});
472+
389473
test('old date format', async () => {
390474
await using context = await WalStreamTestContext.open(factory);
391475
await context.updateSyncRules(BASIC_SYNC_RULES);

modules/module-postgres/test/src/wal_stream_utils.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,3 +203,25 @@ export class WalStreamTestContext implements AsyncDisposable {
203203
return batches[0]?.chunkData.data ?? [];
204204
}
205205
}
206+
207+
export async function withMaxWalSize(db: pgwire.PgClient, size: string) {
208+
try {
209+
const r1 = await db.query(`SHOW max_slot_wal_keep_size`);
210+
211+
await db.query(`ALTER SYSTEM SET max_slot_wal_keep_size = '100MB'`);
212+
await db.query(`SELECT pg_reload_conf()`);
213+
214+
const oldSize = r1.results[0].rows[0][0];
215+
216+
return {
217+
[Symbol.asyncDispose]: async () => {
218+
await db.query(`ALTER SYSTEM SET max_slot_wal_keep_size = '${oldSize}'`);
219+
await db.query(`SELECT pg_reload_conf()`);
220+
}
221+
};
222+
} catch (e) {
223+
const err = new Error(`Failed to configure max_slot_wal_keep_size for test`);
224+
err.cause = e;
225+
throw err;
226+
}
227+
}

0 commit comments

Comments
 (0)