Skip to content

Matrix: Enable rollback and local server stress #24604

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
de6a085
add matrix to stress
anthony-murphy May 6, 2025
4eadde8
Merge branch 'main' of https://github.com/microsoft/FluidFramework in…
anthony-murphy May 7, 2025
c1ab401
Add FFW to matrix fuzz
anthony-murphy May 14, 2025
011d733
rever lss
anthony-murphy May 14, 2025
874a53d
Merge branch 'main' of https://github.com/microsoft/FluidFramework in…
anthony-murphy May 14, 2025
7eb5cbf
export source maps
anthony-murphy May 14, 2025
43169e5
Merge branch 'sm/matrix-stress' into sm/matrix-stress-stFWW
anthony-murphy May 14, 2025
6219d37
fix attw
anthony-murphy May 14, 2025
0f9624b
Merge branch 'sm/matrix-stress' into sm/matrix-stress-stFWW
anthony-murphy May 14, 2025
90a2dd9
use undefined rather than -1
anthony-murphy May 16, 2025
35882b1
combine fwwPolicy
anthony-murphy May 16, 2025
fdd4196
fix backcompat
anthony-murphy May 19, 2025
4d989a0
move fww to state
anthony-murphy May 19, 2025
1e25590
add local state
anthony-murphy May 19, 2025
8e5fc5e
move cellLastWriteTracker to ffwPolicy
anthony-murphy May 19, 2025
ebd247a
clean up typing
anthony-murphy May 19, 2025
db3cf27
Merge branch 'main' of https://github.com/microsoft/FluidFramework in…
anthony-murphy May 19, 2025
a1a9a50
remove isPreviousSetCellPolicyModeFWW
anthony-murphy May 19, 2025
8954fc3
cleanup summarize code
anthony-murphy May 19, 2025
65eddcc
work around snapshot test issue
anthony-murphy May 19, 2025
45c1c97
expand comments
anthony-murphy May 19, 2025
90509e1
Matrix: Make pending an array
anthony-murphy May 19, 2025
ae207dd
Merge branch 'main' of https://github.com/microsoft/FluidFramework in…
anthony-murphy May 19, 2025
daef1fb
most tests working
anthony-murphy May 20, 2025
f92bf5f
permutation vector fix
anthony-murphy May 20, 2025
5b34434
Merge branch 'main' of https://github.com/microsoft/FluidFramework in…
anthony-murphy May 21, 2025
4d2b8e4
fix post remove allocation
anthony-murphy May 21, 2025
efcf8e0
extend walkSegments
anthony-murphy May 21, 2025
b598b71
remove undefined case
anthony-murphy May 21, 2025
c132c93
Update packages/dds/matrix/src/permutationvector.ts
anthony-murphy May 21, 2025
2db5dbc
comments and clean up
anthony-murphy May 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 163 additions & 99 deletions packages/dds/matrix/src/matrix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,22 @@ type FirstWriterWinsPolicy =
cellLastWriteTracker: SparseArray2D<CellLastWriteTrackerItem>;
};

/**
* Used to track pending local changes for a cell
*/
interface PendingCellChanges<T> {
/**
* the local changes including the local seq, and the value set at that local seq
*/
local: { localSeq: number; value: MatrixItem<T> }[];
/**
* the latest consensus value across all clients.
* this will either be a remote value or ack'd local
* value.
*/
consensus?: MatrixItem<T> | undefined;
}

/**
* A SharedMatrix holds a rectangular 2D array of values. Supported operations
* include setting values and inserting/removing rows and columns.
Expand Down Expand Up @@ -252,7 +268,7 @@ export class SharedMatrix<T = any>
private readonly cols: PermutationVector; // Map logical col to storage handle (if any)

private cells = new SparseArray2D<MatrixItem<T>>(); // Stores cell values.
private readonly pending = new SparseArray2D<number>(); // Tracks pending writes.
private readonly pending = new SparseArray2D<PendingCellChanges<T>>(); // Tracks pending writes.

private fwwPolicy: FirstWriterWinsPolicy = {
state: "off",
Expand Down Expand Up @@ -418,21 +434,22 @@ export class SharedMatrix<T = any>
value: MatrixItem<T>,
rowHandle = this.rows.getAllocatedHandle(row),
colHandle = this.cols.getAllocatedHandle(col),
rollback?: boolean,
): void {
this.protectAgainstReentrancy(() => {
if (this.undo !== undefined) {
let oldValue = this.cells.getCell(rowHandle, colHandle);
if (oldValue === null) {
oldValue = undefined;
}
const oldValue = this.cells.getCell(rowHandle, colHandle) ?? undefined;

if (this.undo !== undefined) {
this.undo.cellSet(rowHandle, colHandle, oldValue);
}

this.cells.setCell(rowHandle, colHandle, value);

if (this.isAttached()) {
this.sendSetCellOp(row, col, value, rowHandle, colHandle);
if (this.isAttached() && rollback !== true) {
const pending = this.sendSetCellOp(row, col, value, rowHandle, colHandle);
if (pending.local.length === 1) {
pending.consensus ??= oldValue;
}
}

// Avoid reentrancy by raising change notifications after the op is queued.
Expand Down Expand Up @@ -467,7 +484,7 @@ export class SharedMatrix<T = any>
rowHandle: Handle,
colHandle: Handle,
localSeq = this.nextLocalSeq(),
): void {
): PendingCellChanges<T> {
assert(
this.isAttached(),
0x1e2 /* "Caller must ensure 'isAttached()' before calling 'sendSetCellOp'." */,
Expand All @@ -493,7 +510,12 @@ export class SharedMatrix<T = any>
};

this.submitLocalMessage(op, metadata);
this.pending.setCell(rowHandle, colHandle, localSeq);
const pendingCell: PendingCellChanges<T> = this.pending.getCell(rowHandle, colHandle) ?? {
local: [],
};
pendingCell.local.push({ localSeq, value });
this.pending.setCell(rowHandle, colHandle, pendingCell);
return pendingCell;
}

/**
Expand Down Expand Up @@ -679,7 +701,16 @@ export class SharedMatrix<T = any>
| undefined
| number
| ReturnType<SparseArray2D<MatrixItem<T> | number>["snapshot"]>
)[] = [this.cells.snapshot(), this.pending.snapshot()];
)[] = [
this.cells.snapshot(),
/**
* we used to write this.pending.snapshot(). this should have never been done, as pending is only for local
* changes, and there should never be local changes in the summarizer. This was also never used on load
* as there is no way to understand a previous clients pending changes. so we just set this to a constant
* which matches an empty this.pending.snapshot() for back-compat in terms of the array length
*/
[undefined],
];

// Only need to store it in the snapshot if we have switched the policy already.
if (this.fwwPolicy.state === "on") {
Expand Down Expand Up @@ -806,22 +837,30 @@ export class SharedMatrix<T = any>
const col = this.rebasePosition(this.cols, colsRef, localSeq);
this.rows.removeLocalReferencePosition(rowsRef);
this.cols.removeLocalReferencePosition(colsRef);
if (row !== undefined && col !== undefined && row >= 0 && col >= 0) {
// If the mode is LWW, then send the op.

const pendingCell = this.pending.getCell(rowHandle, colHandle);
assert(pendingCell !== undefined, "local operation must have a pending array");
const { local } = pendingCell;
assert(local !== undefined, "local operation must have a pending array");
const localSeqIndex = local.findIndex((p) => p.localSeq === localSeq);
assert(localSeqIndex >= 0, "local operation must have a pending entry");
const [change] = local.splice(localSeqIndex, 1);
assert(change.localSeq === localSeq, "must match");

if (
row !== undefined &&
col !== undefined &&
row >= 0 &&
col >= 0 && // If the mode is LWW, then send the op.
// Otherwise if the current mode is FWW and if we generated this op, after seeing the
// last set op, or it is the first set op for the cell, then regenerate the op,
// otherwise raise conflict. We want to check the current mode here and not that
// whether op was made in FWW or not.
if (
this.fwwPolicy.state !== "on" ||
(this.fwwPolicy.state !== "on" ||
referenceSeqNumber >=
(this.fwwPolicy.cellLastWriteTracker.getCell(rowHandle, colHandle)?.seqNum ?? 0)
) {
this.sendSetCellOp(row, col, setOp.value, rowHandle, colHandle, localSeq);
} else if (this.pending.getCell(rowHandle, colHandle) !== undefined) {
// Clear the pending changes if any as we are not sending the op.
this.pending.setCell(rowHandle, colHandle, undefined);
}
(this.fwwPolicy.cellLastWriteTracker.getCell(rowHandle, colHandle)?.seqNum ?? 0))
) {
this.sendSetCellOp(row, col, setOp.value, rowHandle, colHandle, localSeq);
}
} else {
switch (content.target) {
Expand All @@ -840,6 +879,47 @@ export class SharedMatrix<T = any>
}
}

protected rollback(content: unknown, localOpMetadata: unknown): void {
const contents = content as MatrixSetOrVectorOp<T>;
const target = contents.target;

switch (target) {
case SnapshotPath.cols: {
this.cols.rollback(content, localOpMetadata);
break;
}
case SnapshotPath.rows: {
this.rows.rollback(content, localOpMetadata);
break;
}
case undefined: {
assert(contents.type === MatrixOp.set, "only sets supported");
const setMetadata = localOpMetadata as ISetOpMetadata;

const pendingCell = this.pending.getCell(setMetadata.rowHandle, setMetadata.colHandle);
assert(pendingCell !== undefined, "must have pending");

const change = pendingCell.local.pop();
assert(change?.localSeq === setMetadata.localSeq, "must have change");

const previous =
pendingCell.local.length > 0
? pendingCell.local[pendingCell.local.length - 1].value
: pendingCell.consensus;

this.setCellCore(
contents.row,
contents.col,
previous,
setMetadata.rowHandle,
setMetadata.colHandle,
true,
);
}
default:
}
}

protected onDisconnect(): void {}

/**
Expand Down Expand Up @@ -968,73 +1048,86 @@ export class SharedMatrix<T = any>
// We are receiving the ACK for a local pending set operation.
const { rowHandle, colHandle, localSeq, rowsRef, colsRef } =
localOpMetadata as ISetOpMetadata;
const isLatestPendingOp = this.isLatestPendingWrite(rowHandle, colHandle, localSeq);
this.rows.removeLocalReferencePosition(rowsRef);
this.cols.removeLocalReferencePosition(colsRef);

const pendingCell = this.pending.getCell(rowHandle, colHandle);
const ackedChange = pendingCell?.local.shift();
assert(ackedChange?.localSeq === localSeq, "must match");
if (pendingCell?.local.length === 0) {
this.pending.setCell(rowHandle, colHandle, undefined);
}

// If policy is switched and cell should be modified too based on policy, then update the tracker.
// If policy is not switched, then also update the tracker in case it is the latest.
if (
this.fwwPolicy.state === "on" &&
this.shouldSetCellBasedOnFWW(rowHandle, colHandle, msg)
) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
pendingCell!.consensus = ackedChange.value;
this.fwwPolicy.cellLastWriteTracker.setCell(rowHandle, colHandle, {
seqNum: msg.sequenceNumber,
clientId: msg.clientId,
});
}

if (isLatestPendingOp) {
this.pending.setCell(rowHandle, colHandle, undefined);
}
} else {
const adjustedRow = this.rows.adjustPosition(row, msg);
if (adjustedRow !== undefined) {
const adjustedCol = this.cols.adjustPosition(col, msg);

if (adjustedCol !== undefined) {
const rowHandle = this.rows.getAllocatedHandle(adjustedRow);
const colHandle = this.cols.getAllocatedHandle(adjustedCol);

assert(
isHandleValid(rowHandle) && isHandleValid(colHandle),
0x022 /* "SharedMatrix row and/or col handles are invalid!" */,
);
if (this.fwwPolicy.state === "on") {
// If someone tried to Overwrite the cell value or first write on this cell or
// same client tried to modify the cell or if the previous mode was LWW, then we need to still
// overwrite the cell and raise conflict if we have pending changes as our change is going to be lost.
if (this.shouldSetCellBasedOnFWW(rowHandle, colHandle, msg)) {
const previousValue = this.cells.getCell(rowHandle, colHandle);
this.cells.setCell(rowHandle, colHandle, value);
this.fwwPolicy.cellLastWriteTracker.setCell(rowHandle, colHandle, {
seqNum: msg.sequenceNumber,
clientId: msg.clientId,
});
for (const consumer of this.consumers.values()) {
consumer.cellsChanged(adjustedRow, adjustedCol, 1, 1, this);
}
// Check is there are any pending changes, which will be rejected. If so raise conflict.
if (this.pending.getCell(rowHandle, colHandle) !== undefined) {
// Don't reset the pending value yet, as there maybe more fww op from same client, so we want
// to raise conflict event for that op also.
this.emit(
"conflict",
row,
col,
value, // Current value
previousValue, // Ignored local value
this,
);
}
const adjustedCol = this.cols.adjustPosition(col, msg);

const rowHandle = adjustedRow.handle;
const colHandle = adjustedCol.handle;

assert(
isHandleValid(rowHandle) && isHandleValid(colHandle),
0x022 /* "SharedMatrix row and/or col handles are invalid!" */,
);
const pendingCell = this.pending.getCell(rowHandle, colHandle);
if (this.fwwPolicy.state === "on") {
// If someone tried to Overwrite the cell value or first write on this cell or
// same client tried to modify the cell or if the previous mode was LWW, then we need to still
// overwrite the cell and raise conflict if we have pending changes as our change is going to be lost.
if (this.shouldSetCellBasedOnFWW(rowHandle, colHandle, msg)) {
const previousValue = this.cells.getCell(rowHandle, colHandle);
this.cells.setCell(rowHandle, colHandle, value);
this.fwwPolicy.cellLastWriteTracker.setCell(rowHandle, colHandle, {
seqNum: msg.sequenceNumber,
clientId: msg.clientId,
});
if (pendingCell !== undefined) {
pendingCell.consensus = value;
}
if (adjustedRow.pos !== undefined && adjustedCol.pos !== undefined) {
for (const consumer of this.consumers.values()) {
consumer.cellsChanged(adjustedRow.pos, adjustedCol.pos, 1, 1, this);
}
// Check is there are any pending changes, which will be rejected. If so raise conflict.
if (pendingCell !== undefined && pendingCell.local.length > 0) {
// Don't reset the pending value yet, as there maybe more fww op from same client, so we want
// to raise conflict event for that op also.
this.emit(
"conflict",
row,
col,
value, // Current value
previousValue, // Ignored local value
this,
);
}
} else if (this.pending.getCell(rowHandle, colHandle) === undefined) {
// If there is a pending (unACKed) local write to the same cell, skip the current op
// since it "happened before" the pending write.
this.cells.setCell(rowHandle, colHandle, value);
}
}
} else {
if (pendingCell === undefined || pendingCell.local.length === 0) {
// If there is a pending (unACKed) local write to the same cell, skip the current op
// since it "happened before" the pending write.
this.cells.setCell(rowHandle, colHandle, value);
if (adjustedRow.pos !== undefined && adjustedCol.pos !== undefined) {
for (const consumer of this.consumers.values()) {
consumer.cellsChanged(adjustedRow, adjustedCol, 1, 1, this);
consumer.cellsChanged(adjustedRow.pos, adjustedCol.pos, 1, 1, this);
}
}
} else {
pendingCell.consensus = value;
}
}
}
Expand Down Expand Up @@ -1106,35 +1199,6 @@ export class SharedMatrix<T = any>
}
}

/**
* Returns true if the latest pending write to the cell indicated by the given row/col handles
* matches the given 'localSeq'.
*
* A return value of `true` indicates that there are no later local operations queued that will
* clobber the write op at the given 'localSeq'. This includes later ops that overwrite the cell
* with a different value as well as row/col removals that might recycled the given row/col handles.
*/
private isLatestPendingWrite(
rowHandle: Handle,
colHandle: Handle,
localSeq: number,
): boolean {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const pendingLocalSeq = this.pending.getCell(rowHandle, colHandle)!;

// Note while we're awaiting the ACK for a local set, it's possible for the row/col to be
// locally removed and the row/col handles recycled. If this happens, the pendingLocalSeq will
// be 'undefined' or > 'localSeq'.
assert(
!(pendingLocalSeq < localSeq),
0x023 /* "The 'localSeq' of pending write (if any) must be <= the localSeq of the currently processed op." */,
);

// If this is the most recent write to the cell by the local client, the stored localSeq
// will be an exact match for the given 'localSeq'.
return pendingLocalSeq === localSeq;
}

public toString(): string {
let s = `client:${
this.runtime.clientId
Expand Down
Loading
Loading