Skip to content

Commit b4e1fd1

Browse files
authored
fix(id-allocation): Schedule a flush when submitting ID Allocation op before replaying pending states (#24683)
In `ContainerRuntime`, we submit to the `Outbox` directly from `replayPendingStates`, and so we should also call `scheduleFlush` to ensure that op doesn't get stuck in the Outbox. Being "stuck in the Outbox" is a problem because it's assumed that all ops in a batch come during the same JS turn and have the same reference sequence number, and this can easily lead to violating that invariant. That one-line change is accompanied by some refactoring so we don't have to consider Immediate mode at that callsite (`scheduleFlush` handles it now).
1 parent daa0939 commit b4e1fd1

File tree

2 files changed

+65
-34
lines changed

2 files changed

+65
-34
lines changed

packages/runtime/container-runtime/src/containerRuntime.ts

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import {
4848
LazyPromise,
4949
PromiseCache,
5050
delay,
51+
fail,
5152
} from "@fluidframework/core-utils/internal";
5253
import type {
5354
IClientDetails,
@@ -1266,7 +1267,7 @@ export class ContainerRuntime
12661267
private readonly batchRunner = new BatchRunCounter();
12671268
private readonly _flushMode: FlushMode;
12681269
private readonly offlineEnabled: boolean;
1269-
private flushTaskExists = false;
1270+
private flushScheduled = false;
12701271

12711272
private _connected: boolean;
12721273

@@ -2541,6 +2542,7 @@ export class ContainerRuntime
25412542
// Since we don't submit ID Allocation ops when staged, any outstanding ranges would be from
25422543
// before staging mode so we can simply say staged: false.
25432544
this.submitIdAllocationOpIfNeeded({ resubmitOutstandingRanges: true, staged: false });
2545+
this.scheduleFlush();
25442546

25452547
// replay the ops
25462548
this.pendingStateManager.replayPendingStates();
@@ -3200,6 +3202,8 @@ export class ContainerRuntime
32003202
* @param resubmitInfo - If defined, indicates this is a resubmission of a batch with the given Batch info needed for resubmit.
32013203
*/
32023204
private flush(resubmitInfo?: BatchResubmitInfo): void {
3205+
this.flushScheduled = false;
3206+
32033207
try {
32043208
assert(
32053209
!this.batchRunner.running,
@@ -3431,13 +3435,6 @@ export class ContainerRuntime
34313435
);
34323436
}
34333437

3434-
/**
3435-
* Typically ops are batched and later flushed together, but in some cases we want to flush immediately.
3436-
*/
3437-
private currentlyBatching(): boolean {
3438-
return this.flushMode !== FlushMode.Immediate || this.batchRunner.running;
3439-
}
3440-
34413438
private readonly _quorum: IQuorumClients;
34423439
public getQuorum(): IQuorumClients {
34433440
return this._quorum;
@@ -4443,13 +4440,7 @@ export class ContainerRuntime
44434440
this.outbox.submit(message);
44444441
}
44454442

4446-
// Note: Technically, the system "always" batches - if this case is true we'll just have a single-message batch.
4447-
const flushImmediatelyOnSubmit = !this.currentlyBatching();
4448-
if (flushImmediatelyOnSubmit) {
4449-
this.flush();
4450-
} else {
4451-
this.scheduleFlush();
4452-
}
4443+
this.scheduleFlush();
44534444
} catch (error) {
44544445
const dpe = DataProcessingError.wrapIfUnrecognized(error, "ContainerRuntime.submit", {
44554446
referenceSequenceNumber: this.deltaManager.lastSequenceNumber,
@@ -4462,25 +4453,24 @@ export class ContainerRuntime
44624453
}
44634454

44644455
private scheduleFlush(): void {
4465-
if (this.flushTaskExists) {
4456+
if (this.flushScheduled) {
44664457
return;
44674458
}
4468-
4469-
this.flushTaskExists = true;
4470-
4471-
// TODO: hoist this out of the function scope to save unnecessary allocations
4472-
// eslint-disable-next-line unicorn/consistent-function-scoping -- Separate `flush` method already exists in outer scope
4473-
const flush = (): void => {
4474-
this.flushTaskExists = false;
4475-
this.flush();
4476-
};
4459+
this.flushScheduled = true;
44774460

44784461
switch (this.flushMode) {
4462+
case FlushMode.Immediate: {
4463+
// When in Immediate flush mode, flush immediately unless we are intentionally batching multiple ops (e.g. via orderSequentially)
4464+
if (!this.batchRunner.running) {
4465+
this.flush();
4466+
}
4467+
break;
4468+
}
44794469
case FlushMode.TurnBased: {
44804470
// When in TurnBased flush mode the runtime will buffer operations in the current turn and send them as a single
44814471
// batch at the end of the turn
4482-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
4483-
Promise.resolve().then(flush);
4472+
// eslint-disable-next-line @typescript-eslint/no-floating-promises -- Container will close if flush throws
4473+
Promise.resolve().then(() => this.flush());
44844474
break;
44854475
}
44864476

@@ -4489,16 +4479,12 @@ export class ContainerRuntime
44894479
// When in Async flush mode, the runtime will accumulate all operations across JS turns and send them as a single
44904480
// batch when all micro-tasks are complete.
44914481
// Compared to TurnBased, this flush mode will capture more ops into the same batch.
4492-
setTimeout(flush, 0);
4482+
setTimeout(() => this.flush(), 0);
44934483
break;
44944484
}
44954485

44964486
default: {
4497-
assert(
4498-
this.batchRunner.running,
4499-
0x587 /* Unreachable unless manually accumulating a batch */,
4500-
);
4501-
break;
4487+
fail(0x587 /* Unreachable unless manually accumulating a batch */);
45024488
}
45034489
}
45044490
}

packages/runtime/container-runtime/src/test/containerRuntime.spec.ts

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ describe("Runtime", () => {
221221
mockStorage?: Partial<IDocumentStorageService>;
222222
loadedFromVersion?: IVersion;
223223
baseSnapshot?: ISnapshotTree;
224+
connected?: boolean;
224225
} = {},
225226
clientId: string = mockClientId,
226227
): Partial<IContainerContext> => {
@@ -230,6 +231,7 @@ describe("Runtime", () => {
230231
mockStorage = defaultMockStorage,
231232
loadedFromVersion,
232233
baseSnapshot,
234+
connected = true,
233235
} = params;
234236

235237
const mockContext = {
@@ -260,7 +262,7 @@ describe("Runtime", () => {
260262
}); // Note: this object shape is for testing only. Not representative of real signals.
261263
},
262264
clientId,
263-
connected: true,
265+
connected,
264266
storage: mockStorage as IDocumentStorageService,
265267
baseSnapshot,
266268
} satisfies Partial<IContainerContext>;
@@ -536,6 +538,49 @@ describe("Runtime", () => {
536538
);
537539
});
538540
}
541+
542+
it("IdAllocation op from replayPendingStates is flushed, preventing outboxSequenceNumberCoherencyCheck error", async () => {
543+
// Start out disconnected since step 1 is to trigger ID Allocation op on reconnect
544+
const connected = false;
545+
const mockContext = getMockContext({ connected }) as IContainerContext;
546+
const mockDeltaManager = mockContext.deltaManager as MockDeltaManager;
547+
548+
const containerRuntime = await ContainerRuntime.loadRuntime({
549+
context: mockContext,
550+
registryEntries: [],
551+
existing: false,
552+
runtimeOptions: { enableRuntimeIdCompressor: "on" },
553+
provideEntryPoint: mockProvideEntryPoint,
554+
});
555+
556+
// 1st compressed id – queued while disconnected (goes to idAllocationBatch).
557+
containerRuntime.idCompressor?.generateCompressedId();
558+
559+
// Re-connect – replayPendingStates will submit only an idAllocation op.
560+
// It's now in the Outbox and a flush is scheduled (including this flush was a bug fix)
561+
changeConnectionState(containerRuntime, true, mockClientId);
562+
563+
// Simulate a remote op arriving before we submit anything else.
564+
// Bump refSeq and continue execution at the end of the microtask queue.
565+
// This is how Inbound Queue works, and this is necessary to simulate here to allow scheduled flush to happen
566+
++mockDeltaManager.lastSequenceNumber;
567+
await Promise.resolve();
568+
569+
// 2nd compressed id – its idAllocation op will enter Outbox *after* the ref seq# bumped.
570+
const id2 = containerRuntime.idCompressor?.generateCompressedId();
571+
572+
// This would throw a DataProcessingError from codepath "outboxSequenceNumberCoherencyCheck"
573+
// if we didn't schedule a flush after the idAllocation op submitted during the reconnect.
574+
// (On account of the two ID Allocation ops having different refSeqs but being in the same batch)
575+
submitDataStoreOp(containerRuntime, "someDS", { id: id2 });
576+
577+
// Let the Outbox flush so we can check submittedOps length
578+
await Promise.resolve();
579+
assert(
580+
submittedOps.length === 3,
581+
"Expected 3 ops to be submitted (2 ID Allocation, 1 data)",
582+
);
583+
});
539584
});
540585

541586
describe("orderSequentially", () => {

0 commit comments

Comments
 (0)