Skip to content

fix(id-allocation): Schedule a flush when submitting ID Allocation op before replaying pending states #24683

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 19 additions & 33 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
LazyPromise,
PromiseCache,
delay,
fail,
} from "@fluidframework/core-utils/internal";
import type {
IClientDetails,
Expand Down Expand Up @@ -1266,7 +1267,7 @@ export class ContainerRuntime
private readonly batchRunner = new BatchRunCounter();
private readonly _flushMode: FlushMode;
private readonly offlineEnabled: boolean;
private flushTaskExists = false;
private flushScheduled = false;

private _connected: boolean;

Expand Down Expand Up @@ -2546,6 +2547,7 @@ export class ContainerRuntime
// Since we don't submit ID Allocation ops when staged, any outstanding ranges would be from
// before staging mode so we can simply say staged: false.
this.submitIdAllocationOpIfNeeded({ resubmitOutstandingRanges: true, staged: false });
this.scheduleFlush();

// replay the ops
this.pendingStateManager.replayPendingStates();
Expand Down Expand Up @@ -3215,6 +3217,8 @@ export class ContainerRuntime
* @param resubmitInfo - If defined, indicates this is a resubmission of a batch with the given Batch info needed for resubmit.
*/
private flush(resubmitInfo?: BatchResubmitInfo): void {
this.flushScheduled = false;

try {
assert(
!this.batchRunner.running,
Expand Down Expand Up @@ -3452,13 +3456,6 @@ export class ContainerRuntime
);
}

/**
* Typically ops are batched and later flushed together, but in some cases we want to flush immediately.
*/
private currentlyBatching(): boolean {
return this.flushMode !== FlushMode.Immediate || this.batchRunner.running;
}

private readonly _quorum: IQuorumClients;
public getQuorum(): IQuorumClients {
return this._quorum;
Expand Down Expand Up @@ -4486,13 +4483,7 @@ export class ContainerRuntime
this.outbox.submit(message);
}

// Note: Technically, the system "always" batches - if this case is true we'll just have a single-message batch.
const flushImmediatelyOnSubmit = !this.currentlyBatching();
if (flushImmediatelyOnSubmit) {
this.flush();
} else {
this.scheduleFlush();
}
this.scheduleFlush();
} catch (error) {
const dpe = DataProcessingError.wrapIfUnrecognized(error, "ContainerRuntime.submit", {
referenceSequenceNumber: this.deltaManager.lastSequenceNumber,
Expand All @@ -4507,25 +4498,24 @@ export class ContainerRuntime
}

private scheduleFlush(): void {
if (this.flushTaskExists) {
if (this.flushScheduled) {
return;
}

this.flushTaskExists = true;

// TODO: hoist this out of the function scope to save unnecessary allocations
// eslint-disable-next-line unicorn/consistent-function-scoping -- Separate `flush` method already exists in outer scope
const flush = (): void => {
this.flushTaskExists = false;
this.flush();
};
this.flushScheduled = true;

switch (this.flushMode) {
case FlushMode.Immediate: {
// When in Immediate flush mode, flush immediately unless we are intentionally batching multiple ops (e.g. via orderSequentially)
if (!this.batchRunner.running) {
this.flush();
}
break;
}
case FlushMode.TurnBased: {
// When in TurnBased flush mode the runtime will buffer operations in the current turn and send them as a single
// batch at the end of the turn
// eslint-disable-next-line @typescript-eslint/no-floating-promises
Promise.resolve().then(flush);
// eslint-disable-next-line @typescript-eslint/no-floating-promises -- Container will close if flush throws
Promise.resolve().then(() => this.flush());
break;
}

Expand All @@ -4534,16 +4524,12 @@ export class ContainerRuntime
// When in Async flush mode, the runtime will accumulate all operations across JS turns and send them as a single
// batch when all micro-tasks are complete.
// Compared to TurnBased, this flush mode will capture more ops into the same batch.
setTimeout(flush, 0);
setTimeout(() => this.flush(), 0);
break;
}

default: {
assert(
this.batchRunner.running,
0x587 /* Unreachable unless manually accumulating a batch */,
);
break;
fail(0x587 /* Unreachable unless manually accumulating a batch */);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ describe("Runtime", () => {
mockStorage?: Partial<IDocumentStorageService>;
loadedFromVersion?: IVersion;
baseSnapshot?: ISnapshotTree;
connected?: boolean;
} = {},
clientId: string = mockClientId,
): Partial<IContainerContext> => {
Expand All @@ -230,6 +231,7 @@ describe("Runtime", () => {
mockStorage = defaultMockStorage,
loadedFromVersion,
baseSnapshot,
connected = true,
} = params;

const mockContext = {
Expand Down Expand Up @@ -260,7 +262,7 @@ describe("Runtime", () => {
}); // Note: this object shape is for testing only. Not representative of real signals.
},
clientId,
connected: true,
connected,
storage: mockStorage as IDocumentStorageService,
baseSnapshot,
} satisfies Partial<IContainerContext>;
Expand Down Expand Up @@ -536,6 +538,49 @@ describe("Runtime", () => {
);
});
}

it("IdAllocation op from replayPendingStates is flushed, preventing outboxSequenceNumberCoherencyCheck error", async () => {
// Start out disconnected since step 1 is to trigger ID Allocation op on reconnect
const connected = false;
const mockContext = getMockContext({ connected }) as IContainerContext;
const mockDeltaManager = mockContext.deltaManager as MockDeltaManager;

const containerRuntime = await ContainerRuntime.loadRuntime({
context: mockContext,
registryEntries: [],
existing: false,
runtimeOptions: { enableRuntimeIdCompressor: "on" },
provideEntryPoint: mockProvideEntryPoint,
});

// 1st compressed id – queued while disconnected (goes to idAllocationBatch).
containerRuntime.idCompressor?.generateCompressedId();

// Re-connect – replayPendingStates will submit only an idAllocation op.
// It's now in the Outbox and a flush is scheduled (scheduling this flush was the bug fix)
changeConnectionState(containerRuntime, true, mockClientId);

// Simulate a remote op arriving before we submit anything else.
// Bump refSeq and continue execution at the end of the microtask queue.
// This is how the Inbound Queue works, and simulating it here is necessary to allow the scheduled flush to happen
++mockDeltaManager.lastSequenceNumber;
await Promise.resolve();

// 2nd compressed id – its idAllocation op will enter Outbox *after* the ref seq# bumped.
const id2 = containerRuntime.idCompressor?.generateCompressedId();

// This would throw a DataProcessingError from codepath "outboxSequenceNumberCoherencyCheck"
// if we didn't schedule a flush after the idAllocation op submitted during the reconnect.
// (On account of the two ID Allocation ops having different refSeqs but being in the same batch)
submitDataStoreOp(containerRuntime, "someDS", { id: id2 });

// Let the Outbox flush so we can check submittedOps length
await Promise.resolve();
assert(
submittedOps.length === 3,
"Expected 3 ops to be submitted (2 ID Allocation, 1 data)",
);
});
});

describe("orderSequentially", () => {
Expand Down
Loading