Skip to content

Commit bbfd731

Browse files
committed
Limit slot health check based on time, not iterations.
1 parent 8bca52f commit bbfd731

File tree

1 file changed

+21
-22
lines changed

1 file changed

+21
-22
lines changed

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ import * as lib_postgres from '@powersync/lib-service-postgres';
22
import {
33
container,
44
DatabaseConnectionError,
5+
logger as defaultLogger,
56
ErrorCode,
67
errors,
78
Logger,
8-
logger as defaultLogger,
9-
ReplicationAssertionError,
10-
ReplicationAbortedError
9+
ReplicationAbortedError,
10+
ReplicationAssertionError
1111
} from '@powersync/lib-services-framework';
1212
import {
1313
BucketStorageBatch,
@@ -33,10 +33,10 @@ import {
3333
toSyncRulesRow
3434
} from '@powersync/service-sync-rules';
3535

36+
import { ReplicationMetric } from '@powersync/service-types';
3637
import { PgManager } from './PgManager.js';
3738
import { getPgOutputRelation, getRelId, referencedColumnTypeIds } from './PgRelation.js';
3839
import { checkSourceConfiguration, checkTableRls, getReplicationIdentityColumns } from './replication-utils.js';
39-
import { ReplicationMetric } from '@powersync/service-types';
4040
import {
4141
ChunkedSnapshotQuery,
4242
IdSnapshotQuery,
@@ -335,7 +335,9 @@ export class WalStream {
335335
wal_status: string;
336336
invalidation_reason: string | null;
337337
}): Promise<{ needsNewSlot: boolean }> {
338-
let last_error = null;
338+
// Start with a placeholder error, should be replaced if there is an actual issue.
339+
let last_error = new ReplicationAssertionError(`Slot health check failed to execute`);
340+
339341
const slotName = this.slot_name;
340342

341343
const lost = slot.wal_status == 'lost';
@@ -346,20 +348,11 @@ export class WalStream {
346348
};
347349
}
348350

349-
// Check that replication slot exists
350-
for (let i = 120; i >= 0; i--) {
351+
// Check that replication slot exists, trying for up to 2 minutes.
352+
const startAt = performance.now();
353+
while (performance.now() - startAt < 120_000) {
351354
this.touch();
352355

353-
if (i == 0) {
354-
container.reporter.captureException(last_error, {
355-
level: errors.ErrorSeverity.ERROR,
356-
metadata: {
357-
replication_slot: slotName
358-
}
359-
});
360-
361-
throw last_error;
362-
}
363356
try {
364357
// We peek a large number of changes here, to make it more likely to pick up replication slot errors.
365358
// For example, "publication does not exist" only occurs here if the peek actually includes changes related
@@ -391,19 +384,18 @@ export class WalStream {
391384
throw e;
392385
}
393386

394-
// Could also be `publication "powersync" does not exist`, although this error may show up much later
395-
// in some cases.
396-
397387
if (
398388
/incorrect prev-link/.test(e.message) ||
399389
/replication slot.*does not exist/.test(e.message) ||
400390
/publication.*does not exist/.test(e.message) ||
401391
/can no longer access replication slot/.test(e.message)
402392
) {
393+
// Fatal error. In most cases, the `wal_status == 'lost'` check should pick this up, but this
394+
// works as a fallback.
395+
403396
container.reporter.captureException(e, {
404397
level: errors.ErrorSeverity.WARNING,
405398
metadata: {
406-
try_index: i,
407399
replication_slot: slotName
408400
}
409401
});
@@ -422,7 +414,14 @@ export class WalStream {
422414
}
423415
}
424416

425-
throw new ReplicationAssertionError('Unreachable');
417+
container.reporter.captureException(last_error, {
418+
level: errors.ErrorSeverity.ERROR,
419+
metadata: {
420+
replication_slot: slotName
421+
}
422+
});
423+
424+
throw last_error;
426425
}
427426

428427
async estimatedCountNumber(db: pgwire.PgConnection, table: storage.SourceTable): Promise<number> {

0 commit comments

Comments
 (0)