Skip to content

Outbox/no partial batch #24576

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 8 additions & 26 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1398,13 +1398,6 @@ export class ContainerRuntime
return runtimeCompatDetailsForLoader;
}

/**
* If true, will skip Outbox flushing before processing an incoming message (and on DeltaManager "op" event for loader back-compat),
* and instead the Outbox will check for a split batch on every submit.
* This is a kill-bit switch for this simplification of logic, in case it causes unexpected issues.
*/
private readonly skipSafetyFlushDuringProcessStack: boolean;

/***/
protected constructor(
context: IContainerContext,
Expand Down Expand Up @@ -1831,10 +1824,6 @@ export class ContainerRuntime

const legacySendBatchFn = makeLegacySendBatchFn(submitFn, this.innerDeltaManager);

this.skipSafetyFlushDuringProcessStack =
// Keep the old flag name even though we renamed the class member (it shipped in 2.31.0)
this.mc.config.getBoolean("Fluid.ContainerRuntime.DisableFlushBeforeProcess") === true;

this.outbox = new Outbox({
shouldSend: () => this.canSendOps(),
pendingStateManager: this.pendingStateManager,
Expand All @@ -1845,8 +1834,6 @@ export class ContainerRuntime
config: {
compressionOptions,
maxBatchSizeInBytes: runtimeOptions.maxBatchSizeInBytes,
// If we disable flush before process, we must be ready to flush partial batches
flushPartialBatches: this.skipSafetyFlushDuringProcessStack,
},
logger: this.mc.logger,
groupingManager: opGroupingManager,
Expand Down Expand Up @@ -1901,14 +1888,12 @@ export class ContainerRuntime
this.attachState !== AttachState.Attached || this.hasPendingMessages();
context.updateDirtyContainerState(this.dirtyContainer);

if (!this.skipSafetyFlushDuringProcessStack) {
// Reference Sequence Number may have just changed, and it must be consistent across a batch,
// so we should flush now to clear the way for the next ops.
// NOTE: This will be redundant whenever CR.process was called for the op (since we flush there too) -
// But we need this coverage for old loaders that don't call ContainerRuntime.process for non-runtime messages.
// (We have to call flush _before_ processing a runtime op, but after is ok for non-runtime op)
this.deltaManager.on("op", () => this.flush());
}
// Reference Sequence Number may have just changed, and it must be consistent across a batch,
// so we should flush now to clear the way for the next ops.
// NOTE: This will be redundant whenever CR.process was called for the op (since we flush there too) -
// But we need this coverage for old loaders that don't call ContainerRuntime.process for non-runtime messages.
// (We have to call flush _before_ processing a runtime op, but after is ok for non-runtime op)
this.deltaManager.on("op", () => this.flush());

// logging hardware telemetry
this.baseLogger.send({
Expand All @@ -1932,7 +1917,6 @@ export class ContainerRuntime
featureGates: JSON.stringify({
...featureGatesForTelemetry,
closeSummarizerDelayOverride,
disableFlushBeforeProcess: this.skipSafetyFlushDuringProcessStack,
}),
telemetryDocumentId: this.telemetryDocumentId,
groupedBatchingEnabled: this.groupedBatchingEnabled,
Expand Down Expand Up @@ -2752,10 +2736,8 @@ export class ContainerRuntime

this.verifyNotClosed();

if (!this.skipSafetyFlushDuringProcessStack) {
// Reference Sequence Number may be about to change, and it must be consistent across a batch, so flush now
this.flush();
}
// Reference Sequence Number may be about to change, and it must be consistent across a batch, so flush now
this.flush();

this.ensureNoDataModelChanges(() => {
this.processInboundMessageOrBatch(messageCopy, local);
Expand Down
27 changes: 7 additions & 20 deletions packages/runtime/container-runtime/src/opLifecycle/outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@ export interface IOutboxConfig {
* The maximum size of a batch that we can send over the wire.
*/
readonly maxBatchSizeInBytes: number;
/**
* If true, maybeFlushPartialBatch will flush the batch if the reference sequence number changed
* since the batch started. Otherwise, it will throw in this case (apart from reentrancy which is handled elsewhere).
* Once the new throw-based flow is proved in a production environment, this option will be removed.
*/
readonly flushPartialBatches: boolean;
}

export interface IOutboxParameters {
Expand Down Expand Up @@ -236,6 +230,7 @@ export class Outbox {
return this.messageCount === 0;
}

/**
 * Detect whether batching has been interrupted by an incoming message being processed. In this case,
 * we will throw (apart from op reentrancy, which is tolerated), since a partial batch must not be flushed with the new
Expand All @@ -247,7 +242,7 @@ export class Outbox {
* last message processed by the ContainerRuntime. In the absence of op reentrancy, this
* pair will remain stable during a single JS turn during which the batch is being built up.
*/
private maybeFlushPartialBatch(): void {
private validateSequenceNumberCoherency(): void {
const mainBatchSeqNums = this.mainBatch.sequenceNumbers;
const blobAttachSeqNums = this.blobAttachBatch.sequenceNumbers;
const idAllocSeqNums = this.idAllocationBatch.sequenceNumbers;
Expand All @@ -257,6 +252,7 @@ export class Outbox {
0x58d /* Reference sequence numbers from both batches must be in sync */,
);

//* Can we prove we don't need to track CSN here anymore? (aka sequence within batch)
const currentSequenceNumbers = this.params.getCurrentSequenceNumbers();

if (
Expand Down Expand Up @@ -285,10 +281,7 @@ export class Outbox {
this.logger.sendTelemetryEvent(
{
// Only log error if this is truly unexpected
category:
expectedDueToReentrancy || this.params.config.flushPartialBatches
? "generic"
: "error",
category: expectedDueToReentrancy ? "generic" : "error",
eventName: "ReferenceSequenceNumberMismatch",
details: {
expectedDueToReentrancy,
Expand All @@ -304,12 +297,6 @@ export class Outbox {
);
}

// If we're configured to flush partial batches, do that now and return (don't throw)
if (this.params.config.flushPartialBatches) {
this.flushAll();
return;
}

// If we are in a reentrant context, we know this can happen without causing any harm.
if (expectedDueToReentrancy) {
return;
Expand All @@ -319,19 +306,19 @@ export class Outbox {
}

public submit(message: LocalBatchMessage): void {
this.maybeFlushPartialBatch();
this.validateSequenceNumberCoherency();

this.addMessageToBatchManager(this.mainBatch, message);
}

public submitBlobAttach(message: LocalBatchMessage): void {
this.maybeFlushPartialBatch();
this.validateSequenceNumberCoherency();

this.addMessageToBatchManager(this.blobAttachBatch, message);
}

public submitIdAllocation(message: LocalBatchMessage): void {
this.maybeFlushPartialBatch();
this.validateSequenceNumberCoherency();

this.addMessageToBatchManager(this.idAllocationBatch, message);
}
Expand Down
Loading