@@ -213,6 +213,11 @@ type FirstWriterWinsPolicy =
213
213
cellLastWriteTracker : SparseArray2D < CellLastWriteTrackerItem > ;
214
214
} ;
215
215
216
+ interface PendingChanges < T > {
217
+ local : { localSeq : number ; value : MatrixItem < T > } [ ] ;
218
+ consensus ?: MatrixItem < T > | undefined ;
219
+ }
220
+
216
221
/**
217
222
* A SharedMatrix holds a rectangular 2D array of values. Supported operations
218
223
* include setting values and inserting/removing rows and columns.
@@ -252,7 +257,7 @@ export class SharedMatrix<T = any>
252
257
private readonly cols : PermutationVector ; // Map logical col to storage handle (if any)
253
258
254
259
private cells = new SparseArray2D < MatrixItem < T > > ( ) ; // Stores cell values.
255
- private readonly pending = new SparseArray2D < number [ ] > ( ) ; // Tracks pending writes.
260
+ private readonly pending = new SparseArray2D < PendingChanges < T > > ( ) ; // Tracks pending writes.
256
261
257
262
private fwwPolicy : FirstWriterWinsPolicy = {
258
263
state : "off" ,
@@ -418,21 +423,22 @@ export class SharedMatrix<T = any>
418
423
value : MatrixItem < T > ,
419
424
rowHandle = this . rows . getAllocatedHandle ( row ) ,
420
425
colHandle = this . cols . getAllocatedHandle ( col ) ,
426
+ rollback ?: boolean ,
421
427
) : void {
422
428
this . protectAgainstReentrancy ( ( ) => {
423
- if ( this . undo !== undefined ) {
424
- let oldValue = this . cells . getCell ( rowHandle , colHandle ) ;
425
- if ( oldValue === null ) {
426
- oldValue = undefined ;
427
- }
429
+ const oldValue = this . cells . getCell ( rowHandle , colHandle ) ?? undefined ;
428
430
431
+ if ( this . undo !== undefined ) {
429
432
this . undo . cellSet ( rowHandle , colHandle , oldValue ) ;
430
433
}
431
434
432
435
this . cells . setCell ( rowHandle , colHandle , value ) ;
433
436
434
- if ( this . isAttached ( ) ) {
435
- this . sendSetCellOp ( row , col , value , rowHandle , colHandle ) ;
437
+ if ( this . isAttached ( ) && rollback !== true ) {
438
+ const pending = this . sendSetCellOp ( row , col , value , rowHandle , colHandle ) ;
439
+ if ( pending . local . length === 1 ) {
440
+ pending . consensus ??= oldValue ;
441
+ }
436
442
}
437
443
438
444
// Avoid reentrancy by raising change notifications after the op is queued.
@@ -467,7 +473,7 @@ export class SharedMatrix<T = any>
467
473
rowHandle : Handle ,
468
474
colHandle : Handle ,
469
475
localSeq = this . nextLocalSeq ( ) ,
470
- ) : void {
476
+ ) : PendingChanges < T > {
471
477
assert (
472
478
this . isAttached ( ) ,
473
479
0x1e2 /* "Caller must ensure 'isAttached()' before calling 'sendSetCellOp'." */ ,
@@ -493,9 +499,12 @@ export class SharedMatrix<T = any>
493
499
} ;
494
500
495
501
this . submitLocalMessage ( op , metadata ) ;
496
- const pending = this . pending . getCell ( rowHandle , colHandle ) ?? [ ] ;
497
- pending . push ( localSeq ) ;
502
+ const pending : PendingChanges < T > = this . pending . getCell ( rowHandle , colHandle ) ?? {
503
+ local : [ ] ,
504
+ } ;
505
+ pending . local . push ( { localSeq, value } ) ;
498
506
this . pending . setCell ( rowHandle , colHandle , pending ) ;
507
+ return pending ;
499
508
}
500
509
501
510
/**
@@ -820,10 +829,12 @@ export class SharedMatrix<T = any>
820
829
821
830
const pending = this . pending . getCell ( rowHandle , colHandle ) ;
822
831
assert ( pending !== undefined , "local operation must have a pending array" ) ;
823
- const localSeqIndex = pending . indexOf ( localSeq ) ;
832
+ const { local } = pending ;
833
+ assert ( local !== undefined , "local operation must have a pending array" ) ;
834
+ const localSeqIndex = local . findIndex ( ( p ) => p . localSeq === localSeq ) ;
824
835
assert ( localSeqIndex >= 0 , "local operation must have a pending entry" ) ;
825
- const [ pendingSeq ] = pending . splice ( localSeqIndex , 1 ) ;
826
- assert ( pendingSeq === localSeq , "must match" ) ;
836
+ const [ change ] = local . splice ( localSeqIndex , 1 ) ;
837
+ assert ( change . localSeq === localSeq , "must match" ) ;
827
838
828
839
if (
829
840
row !== undefined &&
@@ -857,6 +868,47 @@ export class SharedMatrix<T = any>
857
868
}
858
869
}
859
870
871
+ protected rollback ( content : unknown , localOpMetadata : unknown ) : void {
872
+ const contents = content as MatrixSetOrVectorOp < T > ;
873
+ const target = contents . target ;
874
+
875
+ switch ( target ) {
876
+ case SnapshotPath . cols : {
877
+ this . cols . rollback ( content , localOpMetadata ) ;
878
+ break ;
879
+ }
880
+ case SnapshotPath . rows : {
881
+ this . rows . rollback ( content , localOpMetadata ) ;
882
+ break ;
883
+ }
884
+ case undefined : {
885
+ assert ( contents . type === MatrixOp . set , "only sets supported" ) ;
886
+ const setMetadata = localOpMetadata as ISetOpMetadata ;
887
+
888
+ const pending = this . pending . getCell ( setMetadata . rowHandle , setMetadata . colHandle ) ;
889
+ assert ( pending !== undefined , "must have pending" ) ;
890
+
891
+ const change = pending . local . pop ( ) ;
892
+ assert ( change ?. localSeq === setMetadata . localSeq , "must have change" ) ;
893
+
894
+ const previous =
895
+ pending . local . length > 0
896
+ ? pending . local [ pending . local . length - 1 ] . value
897
+ : pending . consensus ;
898
+
899
+ this . setCellCore (
900
+ contents . row ,
901
+ contents . col ,
902
+ previous ,
903
+ setMetadata . rowHandle ,
904
+ setMetadata . colHandle ,
905
+ true ,
906
+ ) ;
907
+ }
908
+ default :
909
+ }
910
+ }
911
+
860
912
protected onDisconnect ( ) : void { }
861
913
862
914
/**
@@ -989,14 +1041,20 @@ export class SharedMatrix<T = any>
989
1041
this . cols . removeLocalReferencePosition ( colsRef ) ;
990
1042
991
1043
const pending = this . pending . getCell ( rowHandle , colHandle ) ;
992
- assert ( pending ?. shift ( ) === localSeq , "must match" ) ;
1044
+ const ackedChange = pending ?. local . shift ( ) ;
1045
+ assert ( ackedChange ?. localSeq === localSeq , "must match" ) ;
1046
+ if ( pending ?. local . length === 0 ) {
1047
+ this . pending . setCell ( rowHandle , colHandle , undefined ) ;
1048
+ }
993
1049
994
1050
// If policy is switched and cell should be modified too based on policy, then update the tracker.
995
1051
// If policy is not switched, then also update the tracker in case it is the latest.
996
1052
if (
997
1053
this . fwwPolicy . state === "on" &&
998
1054
this . shouldSetCellBasedOnFWW ( rowHandle , colHandle , msg )
999
1055
) {
1056
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
1057
+ pending ! . consensus = ackedChange . value ;
1000
1058
this . fwwPolicy . cellLastWriteTracker . setCell ( rowHandle , colHandle , {
1001
1059
seqNum : msg . sequenceNumber ,
1002
1060
clientId : msg . clientId ,
@@ -1015,6 +1073,7 @@ export class SharedMatrix<T = any>
1015
1073
isHandleValid ( rowHandle ) && isHandleValid ( colHandle ) ,
1016
1074
0x022 /* "SharedMatrix row and/or col handles are invalid!" */ ,
1017
1075
) ;
1076
+ const pending = this . pending . getCell ( rowHandle , colHandle ) ;
1018
1077
if ( this . fwwPolicy . state === "on" ) {
1019
1078
// If someone tried to Overwrite the cell value or first write on this cell or
1020
1079
// same client tried to modify the cell or if the previous mode was LWW, then we need to still
@@ -1026,11 +1085,14 @@ export class SharedMatrix<T = any>
1026
1085
seqNum : msg . sequenceNumber ,
1027
1086
clientId : msg . clientId ,
1028
1087
} ) ;
1088
+ if ( pending !== undefined ) {
1089
+ pending . consensus = value ;
1090
+ }
1029
1091
for ( const consumer of this . consumers . values ( ) ) {
1030
1092
consumer . cellsChanged ( adjustedRow , adjustedCol , 1 , 1 , this ) ;
1031
1093
}
1032
1094
// Check is there are any pending changes, which will be rejected. If so raise conflict.
1033
- if ( ( this . pending . getCell ( rowHandle , colHandle ) ?. length ?? 0 ) > 0 ) {
1095
+ if ( pending !== undefined && pending . local . length > 0 ) {
1034
1096
// Don't reset the pending value yet, as there maybe more fww op from same client, so we want
1035
1097
// to raise conflict event for that op also.
1036
1098
this . emit (
@@ -1043,12 +1105,16 @@ export class SharedMatrix<T = any>
1043
1105
) ;
1044
1106
}
1045
1107
}
1046
- } else if ( ( this . pending . getCell ( rowHandle , colHandle ) ?. length ?? 0 ) === 0 ) {
1047
- // If there is a pending (unACKed) local write to the same cell, skip the current op
1048
- // since it "happened before" the pending write.
1049
- this . cells . setCell ( rowHandle , colHandle , value ) ;
1050
- for ( const consumer of this . consumers . values ( ) ) {
1051
- consumer . cellsChanged ( adjustedRow , adjustedCol , 1 , 1 , this ) ;
1108
+ } else {
1109
+ if ( pending === undefined || pending . local . length === 0 ) {
1110
+ // If there is a pending (unACKed) local write to the same cell, skip the current op
1111
+ // since it "happened before" the pending write.
1112
+ this . cells . setCell ( rowHandle , colHandle , value ) ;
1113
+ for ( const consumer of this . consumers . values ( ) ) {
1114
+ consumer . cellsChanged ( adjustedRow , adjustedCol , 1 , 1 , this ) ;
1115
+ }
1116
+ } else {
1117
+ pending . consensus = value ;
1052
1118
}
1053
1119
}
1054
1120
}
0 commit comments