diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 21d83d27..5394abb5 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -1,4 +1,4 @@ -import { isMongoNetworkTimeoutError, isMongoServerError, mongo } from '@powersync/lib-service-mongodb'; +import { mongo } from '@powersync/lib-service-mongodb'; import { container, DatabaseConnectionError, @@ -31,7 +31,7 @@ import { STANDALONE_CHECKPOINT_ID } from './MongoRelation.js'; import { ChunkedSnapshotQuery } from './MongoSnapshotQuery.js'; -import { CHECKPOINTS_COLLECTION, timestampToDate } from './replication-utils.js'; +import { CHECKPOINTS_COLLECTION, rawChangeStreamBatches, timestampToDate } from './replication-utils.js'; export interface ChangeStreamOptions { connections: MongoManager; @@ -253,47 +253,47 @@ export class ChangeStream { const LSN_TIMEOUT_SECONDS = 60; const LSN_CREATE_INTERVAL_SECONDS = 1; - await using streamManager = this.openChangeStream({ lsn: null, maxAwaitTimeMs: 0 }); - const { stream } = streamManager; const startTime = performance.now(); let lastCheckpointCreated = -10_000; let eventsSeen = 0; + let batchesSeen = 0; + + const filters = this.getSourceNamespaceFilters(); + const iter = this.rawChangeStreamBatches({ lsn: null, maxAwaitTimeMs: 0, signal: this.abort_signal, filters }); + for await (let { eventBatch } of iter) { + if (performance.now() - startTime >= LSN_TIMEOUT_SECONDS * 1000) { + break; + } - while (performance.now() - startTime < LSN_TIMEOUT_SECONDS * 1000) { if (performance.now() - lastCheckpointCreated >= LSN_CREATE_INTERVAL_SECONDS * 1000) { await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); lastCheckpointCreated = performance.now(); } - // tryNext() doesn't block, while next() / hasNext() does block until there is data on the stream - const changeDocument = await stream.tryNext().catch((e) => { - throw mapChangeStreamError(e); - }); - if (changeDocument == null) { - continue; - } - - const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined; + for (let changeDocument of eventBatch) { + const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined; - if (ns?.coll == CHECKPOINTS_COLLECTION && 'documentKey' in changeDocument) { - const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; - if (!this.checkpointStreamId.equals(checkpointId)) { - continue; + if (ns?.coll == CHECKPOINTS_COLLECTION && 'documentKey' in changeDocument) { + const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; + if (!this.checkpointStreamId.equals(checkpointId)) { + continue; + } + const { comparable: lsn } = new MongoLSN({ + timestamp: changeDocument.clusterTime!, + resume_token: changeDocument._id + }); + return lsn; } - const { comparable: lsn } = new MongoLSN({ - timestamp: changeDocument.clusterTime!, - resume_token: changeDocument._id - }); - return lsn; - } - eventsSeen += 1; + eventsSeen += 1; + } + batchesSeen += 1; } // Could happen if there is a very large replication lag? throw new ServiceError( ErrorCode.PSYNC_S1301, - `Timeout after while waiting for checkpoint document for ${LSN_TIMEOUT_SECONDS}s. Streamed events = ${eventsSeen}` + `Timeout after while waiting for checkpoint document for ${LSN_TIMEOUT_SECONDS}s. 
Streamed events = ${eventsSeen}, batches = ${batchesSeen}` ); } @@ -301,15 +301,17 @@ export class ChangeStream { * Given a snapshot LSN, validate that we can read from it, by opening a change stream. */ private async validateSnapshotLsn(lsn: string) { - await using streamManager = this.openChangeStream({ lsn: lsn, maxAwaitTimeMs: 0 }); - const { stream } = streamManager; - try { - // tryNext() doesn't block, while next() / hasNext() does block until there is data on the stream - await stream.tryNext(); - } catch (e) { - // Note: A timeout here is not handled as a ChangeStreamInvalidatedError, even though - // we possibly cannot recover from it. - throw mapChangeStreamError(e); + const filters = this.getSourceNamespaceFilters(); + const stream = this.rawChangeStreamBatches({ + lsn: lsn, + // maxAwaitTimeMs should never actually be used here + maxAwaitTimeMs: 0, + filters + }); + for await (let batch of stream) { + // We got a response from the aggregate command, so consider the LSN valid. + // Close the stream immediately. + break; } } @@ -708,68 +710,23 @@ export class ChangeStream { } } - private openChangeStream(options: { lsn: string | null; maxAwaitTimeMs?: number }) { - const lastLsn = options.lsn ? MongoLSN.fromSerialized(options.lsn) : null; - const startAfter = lastLsn?.timestamp; - const resumeAfter = lastLsn?.resumeToken; - - const filters = this.getSourceNamespaceFilters(); - - const pipeline: mongo.Document[] = [ - { - $match: filters.$match - }, - { $changeStreamSplitLargeEvent: {} } - ]; - - let fullDocument: 'required' | 'updateLookup'; - - if (this.usePostImages) { - // 'read_only' or 'auto_configure' - // Configuration happens during snapshot, or when we see new - // collections. - fullDocument = 'required'; - } else { - fullDocument = 'updateLookup'; - } - - const streamOptions: mongo.ChangeStreamOptions = { - showExpandedEvents: true, - maxAwaitTimeMS: options.maxAwaitTimeMs ?? this.maxAwaitTimeMS, - fullDocument: fullDocument - }; - - /** - * Only one of these options can be supplied at a time. - */ - if (resumeAfter) { - streamOptions.resumeAfter = resumeAfter; - } else { - // Legacy: We don't persist lsns without resumeTokens anymore, but we do still handle the - // case if we have an old one. - streamOptions.startAtOperationTime = startAfter; - } - - let stream: mongo.ChangeStream; - if (filters.multipleDatabases) { - // Requires readAnyDatabase@admin on Atlas - stream = this.client.watch(pipeline, streamOptions); - } else { - // Same general result, but requires less permissions than the above - stream = this.defaultDb.watch(pipeline, streamOptions); - } - - this.abort_signal.addEventListener('abort', () => { - stream.close(); + private rawChangeStreamBatches(options: { + lsn: string | null; + maxAwaitTimeMs?: number; + batchSize?: number; + filters: { $match: any; multipleDatabases: boolean }; + signal?: AbortSignal; + }): AsyncIterableIterator<{ eventBatch: mongo.ChangeStreamDocument[]; resumeToken: unknown }> { + return rawChangeStreamBatches({ + client: this.client, + filters: options.filters, + db: options.filters.multipleDatabases ? this.client.db('admin') : this.defaultDb, + batchSize: options.batchSize ?? this.snapshotChunkLength, + maxAwaitTimeMs: options.maxAwaitTimeMs ?? 
this.maxAwaitTimeMS, + lsn: options.lsn, + usePostImages: this.usePostImages, + signal: options.signal }); - - return { - stream, - filters, - [Symbol.asyncDispose]: async () => { - return stream.close(); - } - }; } async streamChangesInternal() { @@ -795,12 +752,7 @@ export class ChangeStream { this.logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn} | Token age: ${tokenAgeSeconds}s`); - await using streamManager = this.openChangeStream({ lsn: resumeFromLsn }); - const { stream, filters } = streamManager; - if (this.abort_signal.aborted) { - await stream.close(); - return; - } + const filters = this.getSourceNamespaceFilters(); // Always start with a checkpoint. // This helps us to clear errors when restarting, even if there is @@ -818,38 +770,24 @@ export class ChangeStream { let lastEmptyResume = performance.now(); - while (true) { - if (this.abort_signal.aborted) { - break; - } - - const originalChangeDocument = await stream.tryNext().catch((e) => { - throw mapChangeStreamError(e); - }); - // The stream was closed, we will only ever receive `null` from it - if (!originalChangeDocument && stream.closed) { - break; - } - + // This is closed when the for loop below returns/breaks/throws + const batchStream = this.rawChangeStreamBatches({ + lsn: resumeFromLsn, + filters, + signal: this.abort_signal + }); + for await (let { eventBatch, resumeToken } of batchStream) { if (this.abort_signal.aborted) { break; } - if (originalChangeDocument == null) { - // We get a new null document after `maxAwaitTimeMS` if there were no other events. - // In this case, stream.resumeToken is the resume token associated with the last response. - // stream.resumeToken is not updated if stream.tryNext() returns data, while stream.next() - // does update it. - // From observed behavior, the actual resumeToken changes around once every 10 seconds. - // If we don't update it on empty events, we do keep consistency, but resuming the stream - // with old tokens may cause connection timeouts. - // We throttle this further by only persisting a keepalive once a minute. - // We add an additional check for waitForCheckpointLsn == null, to make sure we're not - // doing a keepalive in the middle of a transaction. + this.touch(); + if (eventBatch.length == 0) { + // No changes in this batch, but we still want to keep the connection alive. + // We do this by persisting a keepalive checkpoint. if (waitForCheckpointLsn == null && performance.now() - lastEmptyResume > 60_000) { - const { comparable: lsn, timestamp } = MongoLSN.fromResumeToken(stream.resumeToken); + const { comparable: lsn, timestamp } = MongoLSN.fromResumeToken(resumeToken); await batch.keepalive(lsn); - this.touch(); lastEmptyResume = performance.now(); // Log the token update. This helps as a general "replication is still active" message in the logs. // This token would typically be around 10s behind. @@ -858,200 +796,207 @@ export class ChangeStream { ); this.isStartingReplication = false; } - continue; - } - - this.touch(); - - if (startAfter != null && originalChangeDocument.clusterTime?.lte(startAfter)) { - continue; - } - - let changeDocument = originalChangeDocument; - if (originalChangeDocument?.splitEvent != null) { - // Handle split events from $changeStreamSplitLargeEvent. - // This is only relevant for very large update operations. 
- const splitEvent = originalChangeDocument?.splitEvent; - - if (splitDocument == null) { - splitDocument = originalChangeDocument; - } else { - splitDocument = Object.assign(splitDocument, originalChangeDocument); - } - if (splitEvent.fragment == splitEvent.of) { - // Got all fragments - changeDocument = splitDocument; - splitDocument = null; - } else { - // Wait for more fragments + // If we have no changes, we can just persist the keepalive. + // This is throttled to once per minute. + if (performance.now() - lastEmptyResume < 60_000) { continue; } - } else if (splitDocument != null) { - // We were waiting for fragments, but got a different event - throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`); } - if ( - !filters.multipleDatabases && - 'ns' in changeDocument && - changeDocument.ns.db != this.defaultDb.databaseName && - changeDocument.ns.db.endsWith(`_${this.defaultDb.databaseName}`) - ) { - // When all of the following conditions are met: - // 1. We're replicating from an Atlas Flex instance. - // 2. There were changestream events recorded while the PowerSync service is paused. - // 3. We're only replicating from a single database. - // Then we've observed an ns with for example {db: '67b83e86cd20730f1e766dde_ps'}, - // instead of the expected {db: 'ps'}. - // We correct this. - changeDocument.ns.db = this.defaultDb.databaseName; - - if (!flexDbNameWorkaroundLogged) { - flexDbNameWorkaroundLogged = true; - this.logger.warn( - `Incorrect DB name in change stream: ${changeDocument.ns.db}. Changed to ${this.defaultDb.databaseName}.` - ); + const batchStart = Date.now(); + for (let originalChangeDocument of eventBatch) { + if (this.abort_signal.aborted) { + break; } - } - const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined; - - if (ns?.coll == CHECKPOINTS_COLLECTION) { - /** - * Dropping the database does not provide an `invalidate` event. - * We typically would receive `drop` events for the collection which we - * would process below. - * - * However we don't commit the LSN after collections are dropped. - * The prevents the `startAfter` or `resumeToken` from advancing past the drop events. - * The stream also closes after the drop events. - * This causes an infinite loop of processing the collection drop events. - * - * This check here invalidates the change stream if our `_checkpoints` collection - * is dropped. This allows for detecting when the DB is dropped. - */ - if (changeDocument.operationType == 'drop') { - throw new ChangeStreamInvalidatedError( - 'Internal collections have been dropped', - new Error('_checkpoints collection was dropped') - ); + let changeDocument = originalChangeDocument; + if (originalChangeDocument?.splitEvent != null) { + // Handle split events from $changeStreamSplitLargeEvent. + // This is only relevant for very large update operations. 
+ const splitEvent = originalChangeDocument?.splitEvent; + + if (splitDocument == null) { + splitDocument = originalChangeDocument; + } else { + splitDocument = Object.assign(splitDocument, originalChangeDocument); + } + + if (splitEvent.fragment == splitEvent.of) { + // Got all fragments + changeDocument = splitDocument; + splitDocument = null; + } else { + // Wait for more fragments + continue; + } + } else if (splitDocument != null) { + // We were waiting for fragments, but got a different event + throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`); } if ( - !( - changeDocument.operationType == 'insert' || - changeDocument.operationType == 'update' || - changeDocument.operationType == 'replace' - ) + !filters.multipleDatabases && + 'ns' in changeDocument && + changeDocument.ns.db != this.defaultDb.databaseName && + changeDocument.ns.db.endsWith(`_${this.defaultDb.databaseName}`) ) { - continue; + // When all of the following conditions are met: + // 1. We're replicating from an Atlas Flex instance. + // 2. There were changestream events recorded while the PowerSync service is paused. + // 3. We're only replicating from a single database. + // Then we've observed an ns with for example {db: '67b83e86cd20730f1e766dde_ps'}, + // instead of the expected {db: 'ps'}. + // We correct this. + changeDocument.ns.db = this.defaultDb.databaseName; + + if (!flexDbNameWorkaroundLogged) { + flexDbNameWorkaroundLogged = true; + this.logger.warn( + `Incorrect DB name in change stream: ${changeDocument.ns.db}. Changed to ${this.defaultDb.databaseName}.` + ); + } } - // We handle two types of checkpoint events: - // 1. "Standalone" checkpoints, typically write checkpoints. We want to process these - // immediately, regardless of where they were created. - // 2. "Batch" checkpoints for the current stream. This is used as a form of dynamic rate - // limiting of commits, so we specifically want to exclude checkpoints from other streams. - // - // It may be useful to also throttle commits due to standalone checkpoints in the future. - // However, these typically have a much lower rate than batch checkpoints, so we don't do that for now. - - const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; - if (!(checkpointId == STANDALONE_CHECKPOINT_ID || this.checkpointStreamId.equals(checkpointId))) { - continue; - } - const { comparable: lsn } = new MongoLSN({ - timestamp: changeDocument.clusterTime!, - resume_token: changeDocument._id - }); - if (batch.lastCheckpointLsn != null && lsn < batch.lastCheckpointLsn) { - // Checkpoint out of order - should never happen with MongoDB. - // If it does happen, we throw an error to stop the replication - restarting should recover. - // Since we use batch.lastCheckpointLsn for the next resumeAfter, this should not result in an infinite loop. - // This is a workaround for the issue below, but we can keep this as a safety-check even if the issue is fixed. - // Driver issue report: https://jira.mongodb.org/browse/NODE-7042 - throw new ReplicationAssertionError( - `Change resumeToken ${(changeDocument._id as any)._data} (${timestampToDate(changeDocument.clusterTime!).toISOString()}) is less than last checkpoint LSN ${batch.lastCheckpointLsn}. Restarting replication.` - ); - } + const ns = 'ns' in changeDocument && 'coll' in changeDocument.ns ? changeDocument.ns : undefined; + + if (ns?.coll == CHECKPOINTS_COLLECTION) { + /** + * Dropping the database does not provide an `invalidate` event. 
+ * We typically would receive `drop` events for the collection which we + * would process below. + * + * However we don't commit the LSN after collections are dropped. + * The prevents the `startAfter` or `resumeToken` from advancing past the drop events. + * The stream also closes after the drop events. + * This causes an infinite loop of processing the collection drop events. + * + * This check here invalidates the change stream if our `_checkpoints` collection + * is dropped. This allows for detecting when the DB is dropped. + */ + if (changeDocument.operationType == 'drop') { + throw new ChangeStreamInvalidatedError( + 'Internal collections have been dropped', + new Error('_checkpoints collection was dropped') + ); + } - if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) { - waitForCheckpointLsn = null; - } - const didCommit = await batch.commit(lsn, { oldestUncommittedChange: this.oldestUncommittedChange }); + if ( + !( + changeDocument.operationType == 'insert' || + changeDocument.operationType == 'update' || + changeDocument.operationType == 'replace' + ) + ) { + continue; + } - if (didCommit) { - this.oldestUncommittedChange = null; - this.isStartingReplication = false; - changesSinceLastCheckpoint = 0; - } - } else if ( - changeDocument.operationType == 'insert' || - changeDocument.operationType == 'update' || - changeDocument.operationType == 'replace' || - changeDocument.operationType == 'delete' - ) { - if (waitForCheckpointLsn == null) { - waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); - } - const rel = getMongoRelation(changeDocument.ns); - const table = await this.getRelation(batch, rel, { - // In most cases, we should not need to snapshot this. But if this is the first time we see the collection - // for whatever reason, then we do need to snapshot it. - // This may result in some duplicate operations when a collection is created for the first time after - // sync rules was deployed. - snapshot: true - }); - if (table.syncAny) { - if (this.oldestUncommittedChange == null && changeDocument.clusterTime != null) { - this.oldestUncommittedChange = timestampToDate(changeDocument.clusterTime); + // We handle two types of checkpoint events: + // 1. "Standalone" checkpoints, typically write checkpoints. We want to process these + // immediately, regardless of where they were created. + // 2. "Batch" checkpoints for the current stream. This is used as a form of dynamic rate + // limiting of commits, so we specifically want to exclude checkpoints from other streams. + // + // It may be useful to also throttle commits due to standalone checkpoints in the future. + // However, these typically have a much lower rate than batch checkpoints, so we don't do that for now. + + const checkpointId = changeDocument.documentKey._id as string | mongo.ObjectId; + if (!(checkpointId == STANDALONE_CHECKPOINT_ID || this.checkpointStreamId.equals(checkpointId))) { + continue; + } + const { comparable: lsn } = new MongoLSN({ + timestamp: changeDocument.clusterTime!, + resume_token: changeDocument._id + }); + if (batch.lastCheckpointLsn != null && lsn < batch.lastCheckpointLsn) { + // Checkpoint out of order - should never happen with MongoDB. + // If it does happen, we throw an error to stop the replication - restarting should recover. + // Since we use batch.lastCheckpointLsn for the next resumeAfter, this should not result in an infinite loop. 
+ // This is a workaround for the issue below, but we can keep this as a safety-check even if the issue is fixed. + // Driver issue report: https://jira.mongodb.org/browse/NODE-7042 + throw new ReplicationAssertionError( + `Change resumeToken ${(changeDocument._id as any)._data} (${timestampToDate(changeDocument.clusterTime!).toISOString()}) is less than last checkpoint LSN ${batch.lastCheckpointLsn}. Restarting replication.` + ); } - const flushResult = await this.writeChange(batch, table, changeDocument); - changesSinceLastCheckpoint += 1; - if (flushResult != null && changesSinceLastCheckpoint >= 20_000) { - // When we are catching up replication after an initial snapshot, there may be a very long delay - // before we do a commit(). In that case, we need to periodically persist the resume LSN, so - // we don't restart from scratch if we restart replication. - // The same could apply if we need to catch up on replication after some downtime. - const { comparable: lsn } = new MongoLSN({ - timestamp: changeDocument.clusterTime!, - resume_token: changeDocument._id - }); - this.logger.info(`Updating resume LSN to ${lsn} after ${changesSinceLastCheckpoint} changes`); - await batch.setResumeLsn(lsn); + + if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) { + waitForCheckpointLsn = null; + } + const didCommit = await batch.commit(lsn, { oldestUncommittedChange: this.oldestUncommittedChange }); + + if (didCommit) { + this.oldestUncommittedChange = null; + this.isStartingReplication = false; changesSinceLastCheckpoint = 0; } + } else if ( + changeDocument.operationType == 'insert' || + changeDocument.operationType == 'update' || + changeDocument.operationType == 'replace' || + changeDocument.operationType == 'delete' + ) { + if (waitForCheckpointLsn == null) { + waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb, this.checkpointStreamId); + } + const rel = getMongoRelation(changeDocument.ns); + const table = await this.getRelation(batch, rel, { + // In most cases, we should not need to snapshot this. But if this is the first time we see the collection + // for whatever reason, then we do need to snapshot it. + // This may result in some duplicate operations when a collection is created for the first time after + // sync rules was deployed. + snapshot: true + }); + if (table.syncAny) { + if (this.oldestUncommittedChange == null && changeDocument.clusterTime != null) { + this.oldestUncommittedChange = timestampToDate(changeDocument.clusterTime); + } + const flushResult = await this.writeChange(batch, table, changeDocument); + changesSinceLastCheckpoint += 1; + if (flushResult != null && changesSinceLastCheckpoint >= 20_000) { + // When we are catching up replication after an initial snapshot, there may be a very long delay + // before we do a commit(). In that case, we need to periodically persist the resume LSN, so + // we don't restart from scratch if we restart replication. + // The same could apply if we need to catch up on replication after some downtime. 
+ const { comparable: lsn } = new MongoLSN({ + timestamp: changeDocument.clusterTime!, + resume_token: changeDocument._id + }); + this.logger.info(`Updating resume LSN to ${lsn} after ${changesSinceLastCheckpoint} changes`); + await batch.setResumeLsn(lsn); + changesSinceLastCheckpoint = 0; + } + } + } else if (changeDocument.operationType == 'drop') { + const rel = getMongoRelation(changeDocument.ns); + const table = await this.getRelation(batch, rel, { + // We're "dropping" this collection, so never snapshot it. + snapshot: false + }); + if (table.syncAny) { + await batch.drop([table]); + this.relationCache.delete(table); + } + } else if (changeDocument.operationType == 'rename') { + const relFrom = getMongoRelation(changeDocument.ns); + const relTo = getMongoRelation(changeDocument.to); + const tableFrom = await this.getRelation(batch, relFrom, { + // We're "dropping" this collection, so never snapshot it. + snapshot: false + }); + if (tableFrom.syncAny) { + await batch.drop([tableFrom]); + this.relationCache.delete(relFrom); + } + // Here we do need to snapshot the new table + const collection = await this.getCollectionInfo(relTo.schema, relTo.name); + await this.handleRelation(batch, relTo, { + // This is a new (renamed) collection, so always snapshot it. + snapshot: true, + collectionInfo: collection + }); } - } else if (changeDocument.operationType == 'drop') { - const rel = getMongoRelation(changeDocument.ns); - const table = await this.getRelation(batch, rel, { - // We're "dropping" this collection, so never snapshot it. - snapshot: false - }); - if (table.syncAny) { - await batch.drop([table]); - this.relationCache.delete(table); - } - } else if (changeDocument.operationType == 'rename') { - const relFrom = getMongoRelation(changeDocument.ns); - const relTo = getMongoRelation(changeDocument.to); - const tableFrom = await this.getRelation(batch, relFrom, { - // We're "dropping" this collection, so never snapshot it. - snapshot: false - }); - if (tableFrom.syncAny) { - await batch.drop([tableFrom]); - this.relationCache.delete(relFrom); - } - // Here we do need to snapshot the new table - const collection = await this.getCollectionInfo(relTo.schema, relTo.name); - await this.handleRelation(batch, relTo, { - // This is a new (renamed) collection, so always snapshot it. - snapshot: true, - collectionInfo: collection - }); } + this.logger.info(`Processed batch of ${eventBatch.length} changes in ${Date.now() - batchStart}ms`); } } ); @@ -1082,21 +1027,3 @@ export class ChangeStream { } } } - -function mapChangeStreamError(e: any) { - if (isMongoNetworkTimeoutError(e)) { - // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out". - // We wrap the error to make it more useful. 
- throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); - } else if ( - isMongoServerError(e) && - e.codeName == 'NoMatchingDocument' && - e.errmsg?.includes('post-image was not found') - ) { - throw new ChangeStreamInvalidatedError(e.errmsg, e); - } else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) { - throw new ChangeStreamInvalidatedError(e.message, e); - } else { - throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e); - } -} diff --git a/modules/module-mongodb/src/replication/replication-utils.ts b/modules/module-mongodb/src/replication/replication-utils.ts index f3fbb4f4..a3989537 100644 --- a/modules/module-mongodb/src/replication/replication-utils.ts +++ b/modules/module-mongodb/src/replication/replication-utils.ts @@ -1,7 +1,11 @@ -import { ErrorCode, ServiceError } from '@powersync/lib-services-framework'; +import { DatabaseConnectionError, ErrorCode, ServiceError } from '@powersync/lib-services-framework'; import { MongoManager } from './MongoManager.js'; import { PostImagesOption } from '../types/types.js'; import * as bson from 'bson'; +import { mongo } from '@powersync/lib-service-mongodb'; +import { isMongoNetworkTimeoutError, isMongoServerError } from '@powersync/lib-service-mongodb'; +import { ChangeStreamInvalidatedError } from './ChangeStream.js'; +import { MongoLSN } from '../common/MongoLSN.js'; export const CHECKPOINTS_COLLECTION = '_powersync_checkpoints'; @@ -91,3 +95,144 @@ export async function checkSourceConfiguration(connectionManager: MongoManager): export function timestampToDate(timestamp: bson.Timestamp) { return new Date(timestamp.getHighBitsUnsigned() * 1000); } + +export function mapChangeStreamError(e: any) { + if (isMongoNetworkTimeoutError(e)) { + // This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out". + // We wrap the error to make it more useful. + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e); + } else if ( + isMongoServerError(e) && + e.codeName == 'NoMatchingDocument' && + e.errmsg?.includes('post-image was not found') + ) { + throw new ChangeStreamInvalidatedError(e.errmsg, e); + } else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) { + throw new ChangeStreamInvalidatedError(e.message, e); + } else { + throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e); + } +} + +export async function* rawChangeStreamBatches(options: { + client: mongo.MongoClient; + db: mongo.Db; + usePostImages: boolean; + lsn: string | null; + maxAwaitTimeMs?: number; + batchSize?: number; + filters: { $match: any; multipleDatabases: boolean }; + signal?: AbortSignal; +}): AsyncIterableIterator<{ eventBatch: mongo.ChangeStreamDocument[]; resumeToken: unknown }> { + const lastLsn = options.lsn ? MongoLSN.fromSerialized(options.lsn) : null; + const startAfter = lastLsn?.timestamp; + const resumeAfter = lastLsn?.resumeToken; + + const filters = options.filters; + + let fullDocument: 'required' | 'updateLookup'; + + if (options.usePostImages) { + // 'read_only' or 'auto_configure' + // Configuration happens during snapshot, or when we see new + // collections. 
+ fullDocument = 'required'; + } else { + fullDocument = 'updateLookup'; + } + + const streamOptions: mongo.ChangeStreamOptions = { + showExpandedEvents: true, + fullDocument: fullDocument + }; + /** + * Only one of these options can be supplied at a time. + */ + if (resumeAfter) { + streamOptions.resumeAfter = resumeAfter; + } else { + // Legacy: We don't persist lsns without resumeTokens anymore, but we do still handle the + // case if we have an old one. + streamOptions.startAtOperationTime = startAfter; + } + + const pipeline: mongo.Document[] = [ + { + $changeStream: streamOptions + }, + { + $match: filters.$match + }, + { $changeStreamSplitLargeEvent: {} } + ]; + + let cursorId: bigint | null = null; + + const db = options.db; + const maxTimeMS = options.maxAwaitTimeMs; + const batchSize = options.batchSize; + options?.signal?.addEventListener('abort', () => { + if (cursorId != null && cursorId !== 0n) { + // This would result in a CursorKilled error. + db.command({ + killCursors: '$cmd.aggregate', + cursors: [cursorId] + }); + } + }); + + const session = options.client.startSession(); + try { + // Step 1: Send the aggregate command to start the change stream + const aggregateResult = await db + .command( + { + aggregate: 1, + pipeline, + cursor: { batchSize } + }, + { session } + ) + .catch((e) => { + throw mapChangeStreamError(e); + }); + + cursorId = BigInt(aggregateResult.cursor.id); + let batch = aggregateResult.cursor.firstBatch; + + yield { eventBatch: batch, resumeToken: aggregateResult.cursor.postBatchResumeToken }; + + // Step 2: Poll using getMore until the cursor is closed + while (cursorId && cursorId !== 0n) { + if (options.signal?.aborted) { + break; + } + const getMoreResult: mongo.Document = await db + .command( + { + getMore: cursorId, + collection: '$cmd.aggregate', + batchSize, + maxTimeMS + }, + { session } + ) + .catch((e) => { + throw mapChangeStreamError(e); + }); + + cursorId = BigInt(getMoreResult.cursor.id); + const nextBatch = getMoreResult.cursor.nextBatch; + + yield { eventBatch: nextBatch, resumeToken: getMoreResult.cursor.postBatchResumeToken }; + } + } finally { + if (cursorId != null && cursorId !== 0n) { + await db.command({ + killCursors: '$cmd.aggregate', + cursors: [cursorId] + }); + } + await session.endSession(); + } +}
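
A minimal consumer sketch for the new `rawChangeStreamBatches` helper follows, mirroring how `streamChangesInternal` drives it in this change. Everything outside the helper's own signature is hypothetical: the function name `tailChanges`, the empty `$match`, the literal batch/await values, and the `console.log` calls are illustrative only; the service itself builds the filters via `getSourceNamespaceFilters()` and persists keepalives through its storage batch rather than logging them.

import { mongo } from '@powersync/lib-service-mongodb';
import { MongoLSN } from '../common/MongoLSN.js';
import { rawChangeStreamBatches } from './replication-utils.js';

// Hypothetical consumer: iterate raw change stream batches for a single database,
// using the post-batch resume token as a keepalive LSN whenever a batch is empty.
async function tailChanges(client: mongo.MongoClient, db: mongo.Db, signal: AbortSignal) {
  const iter = rawChangeStreamBatches({
    client,
    db,
    usePostImages: false, // stream with fullDocument: 'updateLookup'
    lsn: null, // no stored LSN: start from the current point in the oplog
    maxAwaitTimeMs: 1_000, // how long each getMore may block when there is no data
    batchSize: 500,
    // Illustrative only: the service derives this from its source namespace filters.
    filters: { $match: {}, multipleDatabases: false },
    signal
  });

  for await (const { eventBatch, resumeToken } of iter) {
    if (eventBatch.length == 0) {
      // Empty batch: the postBatchResumeToken still advances, so it can be
      // persisted as a keepalive LSN.
      const { comparable } = MongoLSN.fromResumeToken(resumeToken);
      console.log(`keepalive at ${comparable}`);
      continue;
    }
    for (const change of eventBatch) {
      // Each element is a mongo.ChangeStreamDocument, as yielded by the helper.
      console.log(change.operationType);
    }
  }
}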