diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 819845bc7847..bb6fde474e27 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -48,6 +48,7 @@ import { LazyPromise, PromiseCache, delay, + fail, } from "@fluidframework/core-utils/internal"; import type { IClientDetails, @@ -1266,7 +1267,7 @@ export class ContainerRuntime private readonly batchRunner = new BatchRunCounter(); private readonly _flushMode: FlushMode; private readonly offlineEnabled: boolean; - private flushTaskExists = false; + private flushScheduled = false; private _connected: boolean; @@ -2546,6 +2547,7 @@ export class ContainerRuntime // Since we don't submit ID Allocation ops when staged, any outstanding ranges would be from // before staging mode so we can simply say staged: false. this.submitIdAllocationOpIfNeeded({ resubmitOutstandingRanges: true, staged: false }); + this.scheduleFlush(); // replay the ops this.pendingStateManager.replayPendingStates(); @@ -3215,6 +3217,8 @@ export class ContainerRuntime * @param resubmitInfo - If defined, indicates this is a resubmission of a batch with the given Batch info needed for resubmit. */ private flush(resubmitInfo?: BatchResubmitInfo): void { + this.flushScheduled = false; + try { assert( !this.batchRunner.running, @@ -3452,13 +3456,6 @@ export class ContainerRuntime ); } - /** - * Typically ops are batched and later flushed together, but in some cases we want to flush immediately. 
- */ - private currentlyBatching(): boolean { - return this.flushMode !== FlushMode.Immediate || this.batchRunner.running; - } - private readonly _quorum: IQuorumClients; public getQuorum(): IQuorumClients { return this._quorum; @@ -4486,13 +4483,7 @@ export class ContainerRuntime this.outbox.submit(message); } - // Note: Technically, the system "always" batches - if this case is true we'll just have a single-message batch. - const flushImmediatelyOnSubmit = !this.currentlyBatching(); - if (flushImmediatelyOnSubmit) { - this.flush(); - } else { - this.scheduleFlush(); - } + this.scheduleFlush(); } catch (error) { const dpe = DataProcessingError.wrapIfUnrecognized(error, "ContainerRuntime.submit", { referenceSequenceNumber: this.deltaManager.lastSequenceNumber, @@ -4507,25 +4498,24 @@ export class ContainerRuntime } private scheduleFlush(): void { - if (this.flushTaskExists) { + if (this.flushScheduled) { return; } - - this.flushTaskExists = true; - - // TODO: hoist this out of the function scope to save unnecessary allocations - // eslint-disable-next-line unicorn/consistent-function-scoping -- Separate `flush` method already exists in outer scope - const flush = (): void => { - this.flushTaskExists = false; - this.flush(); - }; + this.flushScheduled = true; switch (this.flushMode) { + case FlushMode.Immediate: { + // When in Immediate flush mode, flush immediately unless we are intentionally batching multiple ops (e.g. 
via orderSequentially) + if (!this.batchRunner.running) { + this.flush(); + } + break; + } case FlushMode.TurnBased: { // When in TurnBased flush mode the runtime will buffer operations in the current turn and send them as a single // batch at the end of the turn - // eslint-disable-next-line @typescript-eslint/no-floating-promises - Promise.resolve().then(flush); + // eslint-disable-next-line @typescript-eslint/no-floating-promises -- Container will close if flush throws + Promise.resolve().then(() => this.flush()); break; } @@ -4534,16 +4524,12 @@ export class ContainerRuntime // When in Async flush mode, the runtime will accumulate all operations across JS turns and send them as a single // batch when all micro-tasks are complete. // Compared to TurnBased, this flush mode will capture more ops into the same batch. - setTimeout(flush, 0); + setTimeout(() => this.flush(), 0); break; } default: { - assert( - this.batchRunner.running, - 0x587 /* Unreachable unless manually accumulating a batch */, - ); - break; + fail(0x587 /* Unreachable unless manually accumulating a batch */); } } } diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 7e48edac4355..67d94a159547 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -221,6 +221,7 @@ describe("Runtime", () => { mockStorage?: Partial; loadedFromVersion?: IVersion; baseSnapshot?: ISnapshotTree; + connected?: boolean; } = {}, clientId: string = mockClientId, ): Partial => { @@ -230,6 +231,7 @@ describe("Runtime", () => { mockStorage = defaultMockStorage, loadedFromVersion, baseSnapshot, + connected = true, } = params; const mockContext = { @@ -260,7 +262,7 @@ describe("Runtime", () => { }); // Note: this object shape is for testing only. Not representative of real signals. 
}, clientId, - connected: true, + connected, storage: mockStorage as IDocumentStorageService, baseSnapshot, } satisfies Partial; }; @@ -536,6 +538,49 @@ describe("Runtime", () => { ); }); } + + it("IdAllocation op from replayPendingStates is flushed, preventing outboxSequenceNumberCoherencyCheck error", async () => { + // Start out disconnected since step 1 is to trigger ID Allocation op on reconnect + const connected = false; + const mockContext = getMockContext({ connected }) as IContainerContext; + const mockDeltaManager = mockContext.deltaManager as MockDeltaManager; + + const containerRuntime = await ContainerRuntime.loadRuntime({ + context: mockContext, + registryEntries: [], + existing: false, + runtimeOptions: { enableRuntimeIdCompressor: "on" }, + provideEntryPoint: mockProvideEntryPoint, + }); + + // 1st compressed id – queued while disconnected (goes to idAllocationBatch). + containerRuntime.idCompressor?.generateCompressedId(); + + // Re-connect – replayPendingStates will submit only an idAllocation op. + // It's now in the Outbox and a flush is scheduled (scheduling this flush was the bug fix) + changeConnectionState(containerRuntime, true, mockClientId); + + // Simulate a remote op arriving before we submit anything else. + // Bump refSeq and continue execution at the end of the microtask queue. + // This is how Inbound Queue works, and this is necessary to simulate here to allow scheduled flush to happen + ++mockDeltaManager.lastSequenceNumber; + await Promise.resolve(); + + // 2nd compressed id – its idAllocation op will enter Outbox *after* the ref seq# bumped. + const id2 = containerRuntime.idCompressor?.generateCompressedId(); + + // This would throw a DataProcessingError from codepath "outboxSequenceNumberCoherencyCheck" + // if we didn't schedule a flush after the idAllocation op submitted during the reconnect. 
+ // (On account of the two ID Allocation ops having different refSeqs but being in the same batch) + submitDataStoreOp(containerRuntime, "someDS", { id: id2 }); + + // Let the Outbox flush so we can check submittedOps length + await Promise.resolve(); + assert( + submittedOps.length === 3, + "Expected 3 ops to be submitted (2 ID Allocation, 1 data)", + ); + }); }); describe("orderSequentially", () => {