forked from keybase/kbfs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconflict_resolver.go
3381 lines (3079 loc) · 107 KB
/
conflict_resolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2016 Keybase Inc. All rights reserved.
// Use of this source code is governed by a BSD
// license that can be found in the LICENSE file.
package libkbfs
import (
"encoding/json"
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/keybase/kbfs/kbfsblock"
"github.com/keybase/kbfs/kbfscrypto"
"github.com/keybase/kbfs/kbfsmd"
"github.com/keybase/kbfs/kbfssync"
"golang.org/x/net/context"
)
// CtxCRTagKey is the type used for unique context tags related to
// conflict resolution
type CtxCRTagKey int

const (
	// CtxCRIDKey is the type of the tag for unique operation IDs
	// related to conflict resolution
	CtxCRIDKey CtxCRTagKey = iota

	// If the number of outstanding unmerged revisions that need to be
	// resolved together is greater than this number, then block
	// unmerged writes to make sure we don't get *too* unmerged.
	// TODO: throttle unmerged writes before resorting to complete
	// blockage.
	crMaxRevsThresholdDefault = 500

	// How long we're allowed to block writes for if we exceed the max
	// revisions threshold.
	crMaxWriteLockTime = 10 * time.Second
)

// CtxCROpID is the display name for the unique operation
// conflict resolution ID tag.
const CtxCROpID = "CRID"

// conflictInput identifies a single resolution job: the latest known
// unmerged revision and the latest known merged revision at the time
// the job was submitted.
type conflictInput struct {
	unmerged kbfsmd.Revision
	merged   kbfsmd.Revision
}
// ConflictResolver is responsible for resolving conflicts in the
// background.
type ConflictResolver struct {
	config           Config
	fbo              *folderBranchOps
	prepper          folderUpdatePrepper
	log              traceLogger
	deferLog         traceLogger
	// maxRevsThreshold is the number of unmerged revisions above
	// which updateCurrInput asks for the write lock next time.
	maxRevsThreshold int

	// inputChanLock guards inputChan, which is non-nil only while
	// background processing is running (see startProcessing /
	// stopProcessing).
	inputChanLock sync.RWMutex
	inputChan     chan conflictInput

	// resolveGroup tracks the outstanding resolves.
	resolveGroup kbfssync.RepeatedWaitGroup

	// inputLock guards currInput, currCancel, and lockNextTime (per
	// the usage visible in this file).
	inputLock    sync.Mutex
	currInput    conflictInput
	currCancel   context.CancelFunc
	lockNextTime bool
	// canceledCount is not used within this chunk; presumably a
	// count of canceled resolution attempts — verify elsewhere.
	canceledCount int
}
// NewConflictResolver constructs a new ConflictResolver (and launches
// any necessary background goroutines).
func NewConflictResolver(
	config Config, fbo *folderBranchOps) *ConflictResolver {
	// Build a logger whose module name carries a short TLF prefix
	// and, for non-master branches, the branch name.
	suffix := ""
	if fbo.branch() != MasterBranch {
		suffix = " " + string(fbo.branch())
	}
	fullTlfID := fbo.id().String()
	log := config.MakeLogger(
		fmt.Sprintf("CR %s%s", fullTlfID[:8], suffix))

	resolver := &ConflictResolver{
		config: config,
		fbo:    fbo,
		prepper: folderUpdatePrepper{
			config:       config,
			folderBranch: fbo.folderBranch,
			blocks:       &fbo.blocks,
			log:          log,
		},
		log:              traceLogger{log},
		deferLog:         traceLogger{log.CloneWithAddedDepth(1)},
		maxRevsThreshold: crMaxRevsThresholdDefault,
		currInput: conflictInput{
			unmerged: kbfsmd.RevisionUninitialized,
			merged:   kbfsmd.RevisionUninitialized,
		},
	}

	// Only spin up the background goroutine in modes where CR is
	// actually enabled.
	if config.Mode().ConflictResolutionEnabled() {
		resolver.startProcessing(BackgroundContextWithCancellationDelayer())
	}
	return resolver
}
// startProcessing creates the input channel and launches the
// background goroutine that services it. It is a no-op if processing
// is already running.
func (cr *ConflictResolver) startProcessing(baseCtx context.Context) {
	cr.inputChanLock.Lock()
	defer cr.inputChanLock.Unlock()

	if cr.inputChan != nil {
		return // already running
	}
	ch := make(chan conflictInput)
	cr.inputChan = ch
	go cr.processInput(baseCtx, ch)
}
// stopProcessing closes the input channel (ending the background
// goroutine's range loop) and marks processing as stopped. It is a
// no-op if processing isn't running.
func (cr *ConflictResolver) stopProcessing() {
	cr.inputChanLock.Lock()
	defer cr.inputChanLock.Unlock()

	ch := cr.inputChan
	if ch == nil {
		return // not running
	}
	cr.inputChan = nil
	close(ch)
}
// cancelExistingLocked must be called while holding cr.inputLock.
// It reports whether ci supersedes the current input, and if so,
// cancels any in-flight resolution.
func (cr *ConflictResolver) cancelExistingLocked(ci conflictInput) bool {
	// The input is only interesting if it advances at least one of
	// the two revisions past what we've already looked at.
	interesting := ci.unmerged > cr.currInput.unmerged ||
		ci.merged > cr.currInput.merged
	if !interesting {
		return false
	}
	if cancel := cr.currCancel; cancel != nil {
		cancel()
	}
	return true
}
// ForceCancel cancels any currently-running CR, regardless of what
// its inputs were.
func (cr *ConflictResolver) ForceCancel() {
	cr.inputLock.Lock()
	defer cr.inputLock.Unlock()

	if cancel := cr.currCancel; cancel != nil {
		cancel()
	}
}
// processInput processes conflict resolution jobs from the given
// channel until it is closed. This function uses a parameter for the
// channel instead of accessing cr.inputChan directly so that it
// doesn't have to hold inputChanLock.
func (cr *ConflictResolver) processInput(baseCtx context.Context,
	inputChan <-chan conflictInput) {
	// Start off with a closed prevCRDone, so that the first CR call
	// doesn't have to wait.
	prevCRDone := make(chan struct{})
	close(prevCRDone)

	// On shutdown, cancel any in-flight resolution and clean up the
	// base context's cancellation delayer.
	defer func() {
		cr.inputLock.Lock()
		defer cr.inputLock.Unlock()
		if cr.currCancel != nil {
			cr.currCancel()
		}
		CleanupCancellationDelayer(baseCtx)
	}()
	for ci := range inputChan {
		ctx := CtxWithRandomIDReplayable(baseCtx, CtxCRIDKey, CtxCROpID, cr.log)
		// Decide under inputLock whether this input supersedes the
		// current one; if so, record it and attach a cancel func.
		valid := func() bool {
			cr.inputLock.Lock()
			defer cr.inputLock.Unlock()
			valid := cr.cancelExistingLocked(ci)
			if !valid {
				return false
			}
			cr.log.CDebugf(ctx, "New conflict input %v following old "+
				"input %v", ci, cr.currInput)
			cr.currInput = ci
			// Note: this reassigns the *outer* ctx, so the goroutine
			// spawned below sees the cancelable context.
			ctx, cr.currCancel = context.WithCancel(ctx)
			return true
		}()
		if !valid {
			cr.log.CDebugf(ctx, "Ignoring uninteresting input: %v", ci)
			// Still balance the Add(1) done by Resolve.
			cr.resolveGroup.Done()
			continue
		}

		// Chain each resolution behind the previous one, so at most
		// one doResolve runs at a time.
		waitChan := prevCRDone
		prevCRDone = make(chan struct{}) // closed when doResolve finishes
		go func(ci conflictInput, done chan<- struct{}) {
			defer cr.resolveGroup.Done()
			defer close(done)
			// Wait for the previous CR without blocking any
			// Resolve callers, as that could result in deadlock
			// (KBFS-1001).
			select {
			case <-waitChan:
			case <-ctx.Done():
				cr.log.CDebugf(ctx, "Resolution canceled before starting")
				return
			}
			cr.doResolve(ctx, ci)
		}(ci, prevCRDone)
	}
}
// Resolve takes the latest known unmerged and merged revision
// numbers, and kicks off the resolution process.
func (cr *ConflictResolver) Resolve(ctx context.Context,
	unmerged kbfsmd.Revision, merged kbfsmd.Revision) {
	cr.inputChanLock.RLock()
	defer cr.inputChanLock.RUnlock()

	// CR can end up trying to cancel itself via the SyncAll call, so
	// prevent that from happening.
	if crOpID := ctx.Value(CtxCRIDKey); crOpID != nil {
		cr.log.CDebugf(ctx, "Ignoring self-resolve during CR")
		return
	}

	// A nil channel means processing is paused or shut down.
	if cr.inputChan == nil {
		return
	}

	ci := conflictInput{unmerged, merged}
	func() {
		cr.inputLock.Lock()
		defer cr.inputLock.Unlock()
		// Cancel any running CR before we return, so the caller can be
		// confident any ongoing CR superseded by this new input will be
		// canceled before it releases any locks it holds.
		//
		// TODO: return early if this returns false, and log something
		// using a newly-pass-in context.
		_ = cr.cancelExistingLocked(ci)
	}()

	// Add to the wait group before the (possibly blocking) channel
	// send, so Wait() covers this job from the moment of submission.
	cr.resolveGroup.Add(1)
	cr.inputChan <- ci
}
// Wait blocks until every resolution submitted so far has completed
// (though not necessarily successfully), or until the given context
// is canceled.
func (cr *ConflictResolver) Wait(ctx context.Context) error {
	return cr.resolveGroup.Wait(ctx)
}
// Shutdown cancels any ongoing resolutions and stops the background
// processing goroutine.
func (cr *ConflictResolver) Shutdown() {
	cr.stopProcessing()
}
// Pause cancels any ongoing resolutions and prevents new ones from
// starting until Restart is called.
func (cr *ConflictResolver) Pause() {
	cr.stopProcessing()
}
// Restart re-enables conflict resolution, with a base context for CR
// operations. baseCtx must have a cancellation delayer.
func (cr *ConflictResolver) Restart(baseCtx context.Context) {
	cr.startProcessing(baseCtx)
}
// BeginNewBranch resets any internal state to be ready to accept
// resolutions from a new branch.
func (cr *ConflictResolver) BeginNewBranch() {
	cr.inputLock.Lock()
	defer cr.inputLock.Unlock()

	// Forget the recorded input; otherwise a later CR request that
	// reuses the same revision numbers (e.g., after the previous CR
	// failed to flush because of a conflict) would be dismissed as
	// uninteresting.
	cr.currInput = conflictInput{}
}
// checkDone returns the context's error if it has been canceled,
// and nil otherwise, without ever blocking.
func (cr *ConflictResolver) checkDone(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	return nil
}
// getMDs fetches all outstanding unmerged MDs for this device, plus
// the corresponding merged MDs written since the branch point. For a
// pending local squash it returns only the unmerged MDs (merged is
// nil). The returned merged slice excludes the branch point itself.
func (cr *ConflictResolver) getMDs(ctx context.Context, lState *lockState,
	writerLocked bool) (unmerged []ImmutableRootMetadata,
	merged []ImmutableRootMetadata, err error) {
	// First get all outstanding unmerged MDs for this device.
	var branchPoint kbfsmd.Revision
	if writerLocked {
		branchPoint, unmerged, err =
			cr.fbo.getUnmergedMDUpdatesLocked(ctx, lState)
	} else {
		branchPoint, unmerged, err =
			cr.fbo.getUnmergedMDUpdates(ctx, lState)
	}
	if err != nil {
		return nil, nil, err
	}

	// A local squash has no competing merged updates to fetch.
	if len(unmerged) > 0 && unmerged[0].BID() == kbfsmd.PendingLocalSquashBranchID {
		cr.log.CDebugf(ctx, "Squashing local branch")
		return unmerged, nil, nil
	}

	// Now get all the merged MDs, starting from after the branch
	// point. We fetch the branch point (if possible) to make sure
	// it's the right predecessor of the unmerged branch. TODO: stop
	// fetching the branch point and remove the successor check below
	// once we fix KBFS-1664.
	fetchFrom := branchPoint + 1
	if branchPoint >= kbfsmd.RevisionInitial {
		fetchFrom = branchPoint
	}
	merged, err = getMergedMDUpdates(
		ctx, cr.fbo.config, cr.fbo.id(), fetchFrom, nil)
	if err != nil {
		return nil, nil, err
	}

	if len(unmerged) > 0 {
		// NOTE(review): merged[0] is indexed without a len(merged)>0
		// guard; presumably getMergedMDUpdates always returns at
		// least the branch point when there are unmerged revisions —
		// confirm.
		err := merged[0].CheckValidSuccessor(
			merged[0].mdID, unmerged[0].ReadOnly())
		if err != nil {
			cr.log.CDebugf(ctx, "Branch point (rev=%d, mdID=%s) is not a "+
				"valid successor for unmerged rev %d (mdID=%s, bid=%s)",
				merged[0].Revision(), merged[0].mdID, unmerged[0].Revision(),
				unmerged[0].mdID, unmerged[0].BID())
			return nil, nil, err
		}
	}

	// Remove branch point (only present when we fetched from the
	// branch point itself above).
	if len(merged) > 0 && fetchFrom == branchPoint {
		merged = merged[1:]
	}

	return unmerged, merged, nil
}
// updateCurrInput assumes that both unmerged and merged are
// non-empty. (In practice the code below also tolerates an empty
// merged slice, e.g. for local squashes, by recording an
// uninitialized merged revision.)
func (cr *ConflictResolver) updateCurrInput(ctx context.Context,
	unmerged, merged []ImmutableRootMetadata) (err error) {
	cr.inputLock.Lock()
	defer cr.inputLock.Unlock()
	// check done while holding the lock, so we know for sure if
	// we've already been canceled and replaced by a new input.
	err = cr.checkDone(ctx)
	if err != nil {
		return err
	}

	// Roll currInput back if any of the checks below fail.
	prevInput := cr.currInput
	defer func() {
		// reset the currInput if we get an error below
		if err != nil {
			cr.currInput = prevInput
		}
	}()

	rev := unmerged[len(unmerged)-1].bareMd.RevisionNumber()
	if rev < cr.currInput.unmerged {
		return fmt.Errorf("Unmerged revision %d is lower than the "+
			"expected unmerged revision %d", rev, cr.currInput.unmerged)
	}
	cr.currInput.unmerged = rev

	if len(merged) > 0 {
		rev = merged[len(merged)-1].bareMd.RevisionNumber()
		if rev < cr.currInput.merged {
			return fmt.Errorf("Merged revision %d is lower than the "+
				"expected merged revision %d", rev, cr.currInput.merged)
		}
	} else {
		rev = kbfsmd.RevisionUninitialized
	}
	// Note: rev here is either the last merged revision or
	// RevisionUninitialized from the else branch above.
	cr.currInput.merged = rev

	// Take the lock right away next time if either there are lots of
	// unmerged revisions, or this is a local squash and we won't
	// block for very long.
	//
	// TODO: if there are a lot of merged revisions, and they keep
	// coming, we might consider doing a "partial" resolution, writing
	// the result back to the unmerged branch (basically "rebasing"
	// it). See KBFS-1896.
	if (len(unmerged) > cr.maxRevsThreshold) ||
		(len(unmerged) > 0 && unmerged[0].BID() == kbfsmd.PendingLocalSquashBranchID) {
		cr.lockNextTime = true
	}
	return nil
}
// makeChains builds crChains for the unmerged and merged branches
// from their respective MD lists. Blocks GC'd on the merged branch
// are excluded from unmerged unrefs. When there are no merged
// changes, an empty merged chain set is returned. Chain summaries
// are published to the folder status except for pending local
// squashes.
func (cr *ConflictResolver) makeChains(ctx context.Context,
	unmerged, merged []ImmutableRootMetadata) (
	unmergedChains, mergedChains *crChains, err error) {
	unmergedChains, err = newCRChainsForIRMDs(
		ctx, cr.config.Codec(), unmerged, &cr.fbo.blocks, true)
	if err != nil {
		return nil, nil, err
	}

	// Make sure we don't try to unref any blocks that have already
	// been GC'd in the merged branch.
	for _, md := range merged {
		for _, op := range md.data.Changes.Ops {
			_, isGCOp := op.(*GCOp)
			if !isGCOp {
				continue
			}
			for _, ptr := range op.Unrefs() {
				unmergedChains.doNotUnrefPointers[ptr] = true
			}
		}
	}

	// If there are no new merged changes, don't make any merged
	// chains.
	if len(merged) == 0 {
		return unmergedChains, newCRChainsEmpty(), nil
	}

	mergedChains, err = newCRChainsForIRMDs(
		ctx, cr.config.Codec(), merged, &cr.fbo.blocks, true)
	if err != nil {
		return nil, nil, err
	}

	// Make the chain summaries. Identify using the unmerged chains,
	// since those are most likely to be able to identify a node in
	// the cache.
	unmergedSummary := unmergedChains.summary(unmergedChains, cr.fbo.nodeCache)
	mergedSummary := mergedChains.summary(unmergedChains, cr.fbo.nodeCache)

	// Ignore CR summaries for pending local squashes.
	if len(unmerged) == 0 || unmerged[0].BID() != kbfsmd.PendingLocalSquashBranchID {
		cr.fbo.status.setCRSummary(unmergedSummary, mergedSummary)
	}
	return unmergedChains, mergedChains, nil
}
// crSortedPaths implements sort.Interface, ordering paths by
// descending path length (deepest paths first).
type crSortedPaths []path

// Len implements sort.Interface for crSortedPaths.
func (sp crSortedPaths) Len() int { return len(sp) }

// Less implements sort.Interface for crSortedPaths; a longer path
// sorts before a shorter one.
func (sp crSortedPaths) Less(i, j int) bool {
	return len(sp[i].path) > len(sp[j].path)
}

// Swap implements sort.Interface for crSortedPaths.
func (sp crSortedPaths) Swap(i, j int) {
	sp[i], sp[j] = sp[j], sp[i]
}
// createdFileWithConflictingWrite reports whether a file created in
// both branches has genuinely conflicting writes. As a side effect,
// it strips the sync ops from whichever chain can be safely ignored:
// when the write ranges are equivalent, or when one side is just a
// single truncate-to-zero.
func createdFileWithConflictingWrite(unmergedChains, mergedChains *crChains,
	unmergedOriginal, mergedOriginal BlockPointer) bool {
	uChain := unmergedChains.byOriginal[unmergedOriginal]
	mChain := mergedChains.byOriginal[mergedOriginal]
	if uChain == nil || mChain == nil {
		return false
	}

	uRange := uChain.getCollapsedWriteRange()
	mRange := mChain.getCollapsedWriteRange()

	// Exactly equivalent writes on both sides: no real conflict, so
	// drop the unmerged sync ops.
	if writeRangesEquivalent(uRange, mRange) {
		uChain.removeSyncOps()
		return false
	}

	// A lone zero-offset truncate carries no data; such a side can be
	// safely ignored.
	if len(uRange) == 1 && uRange[0].isTruncate() && uRange[0].Off == 0 {
		uChain.removeSyncOps()
		return false
	}
	if len(mRange) == 1 && mRange[0].isTruncate() && mRange[0].Off == 0 {
		mChain.removeSyncOps()
		return false
	}

	return true
}
// createdFileWithNonzeroSizes checks two possibly-conflicting
// createOps and returns true if the corresponding file has non-zero
// directory entry sizes in both the unmerged and merged branch. We
// need to check this sometimes, because a call to
// `createdFileWithConflictingWrite` might not have access to syncOps
// that have been resolved away in a previous iteration. See
// KBFS-2825 for details.
func (cr *ConflictResolver) createdFileWithNonzeroSizes(
	ctx context.Context, unmergedChains, mergedChains *crChains,
	unmergedChain *crChain, mergedChain *crChain,
	unmergedCop, mergedCop *createOp) (bool, error) {
	lState := makeFBOLockState()
	// Read the parent directory block on the merged side.
	kmd := mergedChains.mostRecentChainMDInfo.kmd
	mergedDirBlock, err := cr.fbo.blocks.GetDirBlockForReading(
		ctx, lState, kmd, mergedChain.mostRecent,
		mergedCop.getFinalPath().Branch, path{})
	if err != nil {
		return false, err
	}
	// And the parent directory block on the unmerged side.
	kmd = unmergedChains.mostRecentChainMDInfo.kmd
	unmergedDirBlock, err := cr.fbo.blocks.GetDirBlockForReading(
		ctx, lState, kmd, unmergedChain.mostRecent,
		unmergedCop.getFinalPath().Branch, path{})
	if err != nil {
		return false, err
	}
	mergedEntry, mergedOk :=
		mergedDirBlock.Children[mergedCop.NewName]
	// Fix: look up the unmerged entry under the *unmerged* create's
	// name. The original indexed by mergedCop.NewName, which only
	// worked because the caller matches same-named creates; using the
	// unmerged name is correct in general.
	unmergedEntry, unmergedOk :=
		unmergedDirBlock.Children[unmergedCop.NewName]
	if mergedOk && unmergedOk &&
		mergedEntry.Size > 0 && unmergedEntry.Size > 0 {
		cr.log.CDebugf(ctx,
			"Not merging files named %s with non-zero sizes "+
				"(merged=%d unmerged=%d)",
			unmergedCop.NewName, mergedEntry.Size, unmergedEntry.Size)
		return true, nil
	}
	return false, nil
}
// checkPathForMerge checks whether the given unmerged chain and path
// contains any newly-created subdirectories that were created
// simultaneously in the merged branch as well. If so, it recursively
// checks that directory as well. It returns a slice of any new
// unmerged paths that need to be checked for conflicts later in
// conflict resolution, for all subdirectories of the given path.
func (cr *ConflictResolver) checkPathForMerge(ctx context.Context,
	unmergedChain *crChain, unmergedPath path,
	unmergedChains, mergedChains *crChains) ([]path, error) {
	mergedChain, ok := mergedChains.byOriginal[unmergedChain.original]
	if !ok {
		// No corresponding merged chain means we don't have to merge
		// any directories.
		return nil, nil
	}

	// Find instances of the same directory being created in both
	// branches. Only merge completely new directories -- anything
	// involving a rename will result in a conflict for now.
	//
	// TODO: have a better merge strategy for renamed directories!
	mergedCreates := make(map[string]*createOp)
	for _, op := range mergedChain.ops {
		cop, ok := op.(*createOp)
		if !ok || len(cop.Refs()) == 0 || cop.renamed {
			continue
		}
		mergedCreates[cop.NewName] = cop
	}
	if len(mergedCreates) == 0 {
		return nil, nil
	}

	var newUnmergedPaths []path
	// toDrop records indices of unmerged create ops that duplicate
	// merged creates; they are removed at the end.
	toDrop := make(map[int]bool)
	for i, op := range unmergedChain.ops {
		cop, ok := op.(*createOp)
		if !ok || len(cop.Refs()) == 0 || cop.renamed {
			continue
		}

		// Is there a corresponding merged create with the same type?
		mergedCop, ok := mergedCreates[cop.NewName]
		if !ok || mergedCop.Type != cop.Type {
			continue
		}

		unmergedOriginal := cop.Refs()[0]
		mergedOriginal := mergedCop.Refs()[0]
		if cop.Type != Dir {
			// Only merge files if they don't both have writes.
			// Double-check the directory blocks to see if the files
			// have non-zero sizes, because an earlier resolution
			// might have collapsed all the sync ops away.
			if createdFileWithConflictingWrite(unmergedChains, mergedChains,
				unmergedOriginal, mergedOriginal) {
				continue
			}
			conflicts, err := cr.createdFileWithNonzeroSizes(
				ctx, unmergedChains, mergedChains, unmergedChain, mergedChain,
				cop, mergedCop)
			if err != nil {
				return nil, err
			}
			if conflicts {
				continue
			}
		}

		toDrop[i] = true

		cr.log.CDebugf(ctx, "Merging name %s (%s) in %v (unmerged original %v "+
			"changed to %v)", cop.NewName, cop.Type, unmergedChain.mostRecent,
			unmergedOriginal, mergedOriginal)

		// Change the original to match the merged original, so we can
		// check for conflicts later. Note that the most recent will
		// stay the same, so we can still match the unmerged path
		// correctly.
		err := unmergedChains.changeOriginal(unmergedOriginal, mergedOriginal)
		if _, notFound := err.(NoChainFoundError); notFound {
			unmergedChains.toUnrefPointers[unmergedOriginal] = true
			continue
		} else if err != nil {
			return nil, err
		}

		// Note: this deliberately shadows the outer unmergedChain for
		// the remainder of this loop iteration, pointing it at the
		// re-originaled chain.
		unmergedChain, ok := unmergedChains.byOriginal[mergedOriginal]
		if !ok {
			return nil, fmt.Errorf("Change original (%v -> %v) didn't work",
				unmergedOriginal, mergedOriginal)
		}
		newPath := unmergedPath.ChildPath(cop.NewName, unmergedChain.mostRecent)
		if cop.Type == Dir {
			// recurse for this chain
			newPaths, err := cr.checkPathForMerge(ctx, unmergedChain, newPath,
				unmergedChains, mergedChains)
			if err != nil {
				return nil, err
			}
			// Add any further subdirectories that need merging under this
			// subdirectory.
			newUnmergedPaths = append(newUnmergedPaths, newPaths...)
		} else {
			// Set the path for all child ops
			unrefedOrig := false
			for _, op := range unmergedChain.ops {
				op.setFinalPath(newPath)
				_, isSyncOp := op.(*syncOp)
				// If a later write overwrites the original, take it
				// out of the unmerged created list so it can be
				// properly unreferenced.
				if !unrefedOrig && isSyncOp {
					unrefedOrig = true
					delete(unmergedChains.createdOriginals, mergedOriginal)
				}
			}
		}

		// Then add this create's path.
		newUnmergedPaths = append(newUnmergedPaths, newPath)
	}

	// Remove the unneeded create ops
	if len(toDrop) > 0 {
		newOps := make([]op, 0, len(unmergedChain.ops)-len(toDrop))
		for i, op := range unmergedChain.ops {
			if toDrop[i] {
				cr.log.CDebugf(ctx,
					"Dropping double create unmerged operation: %s", op)
			} else {
				newOps = append(newOps, op)
			}
		}
		unmergedChain.ops = newOps
	}

	return newUnmergedPaths, nil
}
// findCreatedDirsToMerge finds directories that were created in both
// the unmerged and merged branches, and resets the original unmerged
// pointer to match the original merged pointer. It returns a slice of
// new unmerged paths that need to be combined with the unmergedPaths
// slice.
func (cr *ConflictResolver) findCreatedDirsToMerge(ctx context.Context,
	unmergedPaths []path, unmergedChains, mergedChains *crChains) (
	[]path, error) {
	var extraPaths []path
	for _, p := range unmergedPaths {
		chain, ok := unmergedChains.byMostRecent[p.tailPointer()]
		if !ok {
			return nil, fmt.Errorf("findCreatedDirsToMerge: No unmerged chain "+
				"for most recent %v", p.tailPointer())
		}
		found, err := cr.checkPathForMerge(
			ctx, chain, p, unmergedChains, mergedChains)
		if err != nil {
			return nil, err
		}
		extraPaths = append(extraPaths, found...)
	}
	return extraPaths, nil
}
// createMapKey identifies a create operation by its parent directory
// pointer and the name of the created entry.
type createMapKey struct {
	ptr  BlockPointer
	name string
}
// addChildBlocksIfIndirectFile adds refblocks for all child blocks of
// the given file. It will return an error if called with a pointer
// that doesn't represent a file.
func (cr *ConflictResolver) addChildBlocksIfIndirectFile(ctx context.Context,
	lState *lockState, unmergedChains *crChains, currPath path, op op) error {
	// An indirect file's data lives in child blocks; each one must be
	// ref'd by the op that re-creates the file.
	infos, err := cr.fbo.blocks.GetIndirectFileBlockInfos(
		ctx, lState, unmergedChains.mostRecentChainMDInfo.kmd, currPath)
	if err != nil {
		return err
	}
	if len(infos) == 0 {
		return nil
	}
	cr.log.CDebugf(ctx, "Adding child pointers for recreated "+
		"file %s", currPath)
	for _, info := range infos {
		op.AddRefBlock(info.BlockPointer)
	}
	return nil
}
// resolvedMergedPathTail takes an unmerged path, and returns as much
// of the tail-end of the corresponding merged path that it can, using
// only information within the chains. It may not be able to return a
// complete chain if, for example, a directory was changed in the
// unmerged branch but not in the merged branch, and so the merged
// chain would not have enough information to construct the merged
// branch completely. This function returns the partial path, as well
// as the most recent pointer to the first changed node in the merged
// chains (which can be subsequently used to find the beginning of the
// merged path).
//
// The given unmerged path should be for a node that wasn't created
// during the unmerged branch.
//
// It is also possible for directories used in the unmerged path to
// have been completely removed from the merged path. In this case,
// they need to be recreated. So this function also returns a slice
// of create ops that will need to be replayed in the merged branch
// for the conflicts to be resolved; all of these ops have their
// writer info set to the given one.
func (cr *ConflictResolver) resolveMergedPathTail(ctx context.Context,
	lState *lockState, unmergedPath path,
	unmergedChains, mergedChains *crChains,
	currUnmergedWriterInfo writerInfo) (
	path, BlockPointer, []*createOp, error) {
	unmergedOriginal, err :=
		unmergedChains.originalFromMostRecent(unmergedPath.tailPointer())
	if err != nil {
		cr.log.CDebugf(ctx, "Couldn't find original pointer for %v",
			unmergedPath.tailPointer())
		return path{}, BlockPointer{}, nil, err
	}

	var recreateOps []*createOp // fill in backwards, and reverse at the end
	currOriginal := unmergedOriginal
	currPath := unmergedPath
	mergedPath := path{
		FolderBranch: unmergedPath.FolderBranch,
		path:         nil, // fill in backwards, and reverse at the end
	}

	// First find the earliest merged parent. Walk up the unmerged
	// path until we reach a node that still exists on the merged
	// branch, queueing a recreate op for each deleted ancestor.
	for mergedChains.isDeleted(currOriginal) {
		cr.log.CDebugf(ctx, "%v was deleted in the merged branch (%s)",
			currOriginal, currPath)
		if !currPath.hasValidParent() {
			return path{}, BlockPointer{}, nil,
				fmt.Errorf("Couldn't find valid merged parent path for %v",
					unmergedOriginal)
		}

		// If this node has been deleted, we need to search
		// backwards in the path to find the latest node that
		// hasn't been deleted and re-recreate nodes upward from
		// there.
		name := currPath.tailName()
		mergedPath.path = append(mergedPath.path, pathNode{
			BlockPointer: currOriginal,
			Name:         name,
		})
		parentPath := *currPath.parentPath()
		parentOriginal, err :=
			unmergedChains.originalFromMostRecent(parentPath.tailPointer())
		if err != nil {
			cr.log.CDebugf(ctx, "Couldn't find original pointer for %v",
				parentPath.tailPointer())
			return path{}, BlockPointer{}, nil, err
		}

		// Drop the merged rmOp since we're recreating it, and we
		// don't want to replay that notification locally.
		if mergedChain, ok := mergedChains.byOriginal[parentOriginal]; ok {
			mergedMostRecent, err :=
				mergedChains.mostRecentFromOriginalOrSame(currOriginal)
			if err != nil {
				return path{}, BlockPointer{}, nil, err
			}
		outer:
			for i, op := range mergedChain.ops {
				ro, ok := op.(*rmOp)
				if !ok {
					continue
				}
				// Use the unref'd pointer, and not the name, to identify
				// the operation, since renames might have happened on the
				// merged branch.
				for _, unref := range ro.Unrefs() {
					if unref != mergedMostRecent {
						continue
					}
					mergedChain.ops =
						append(mergedChain.ops[:i], mergedChain.ops[i+1:]...)
					break outer
				}
			}
		} else {
			// If there's no chain, then likely a previous resolution
			// removed an entire directory tree, and so the individual
			// rm operations aren't listed. In that case, there's no
			// rm op to remove.
			cr.log.CDebugf(ctx, "No corresponding merged chain for parent "+
				"%v; skipping rm removal", parentOriginal)
		}

		de, err := cr.fbo.blocks.GetDirtyEntry(ctx, lState,
			unmergedChains.mostRecentChainMDInfo.kmd,
			currPath)
		if err != nil {
			return path{}, BlockPointer{}, nil, err
		}
		// Build the create op that will resurrect this node under its
		// (still-existing or soon-recreated) parent.
		co, err := newCreateOp(name, parentOriginal, de.Type)
		if err != nil {
			return path{}, BlockPointer{}, nil, err
		}
		co.AddSelfUpdate(parentOriginal)
		co.setFinalPath(parentPath)
		co.AddRefBlock(currOriginal)
		co.setWriterInfo(currUnmergedWriterInfo)

		if co.Type != Dir {
			err = cr.addChildBlocksIfIndirectFile(ctx, lState,
				unmergedChains, currPath, co)
			if err != nil {
				return path{}, BlockPointer{}, nil, err
			}
		}

		// If this happens to have been renamed on the unmerged
		// branch, drop the rm half of the rename operation; just
		// leave it as a create.
		if ri, ok := unmergedChains.renamedOriginals[currOriginal]; ok {
			oldParent, ok := unmergedChains.byOriginal[ri.originalOldParent]
			if !ok {
				cr.log.CDebugf(ctx, "Couldn't find chain for original "+
					"old parent: %v", ri.originalOldParent)
				return path{}, BlockPointer{}, nil,
					NoChainFoundError{ri.originalOldParent}
			}
			for _, op := range oldParent.ops {
				ro, ok := op.(*rmOp)
				if !ok {
					continue
				}
				if ro.OldName == ri.oldName {
					ro.dropThis = true
					break
				}
			}

			// Replace the create op with the new recreate op,
			// which contains the proper refblock.
			newParent, ok := unmergedChains.byOriginal[ri.originalNewParent]
			if !ok {
				cr.log.CDebugf(ctx, "Couldn't find chain for original new "+
					"parent: %v", ri.originalNewParent)
				return path{}, BlockPointer{}, nil,
					NoChainFoundError{ri.originalNewParent}
			}
			for i, op := range newParent.ops {
				oldCo, ok := op.(*createOp)
				if !ok {
					continue
				}
				if oldCo.NewName == ri.newName {
					newParent.ops[i] = co
					break
				}
			}
		} else {
			recreateOps = append(recreateOps, co)
		}

		currOriginal = parentOriginal
		currPath = parentPath
	}

	// Now we have the latest pointer along the path that is
	// shared between the branches. Our next step is to find the
	// current merged path to the most recent version of that
	// original. We can do that as follows:
	// * If the pointer has been changed in the merged branch, we
	//   can search for it later using fbo.blocks.SearchForNodes
	// * If it hasn't been changed, check if it has been renamed to
	//   somewhere else. If so, use fbo.blocks.SearchForNodes on
	//   that parent later.
	// * Otherwise, iterate up the path towards the root.
	var mostRecent BlockPointer
	for i := len(currPath.path) - 1; i >= 0; i-- {
		// Note: this shadows the outer currOriginal within the loop.
		currOriginal, err := unmergedChains.originalFromMostRecent(
			currPath.path[i].BlockPointer)
		if err != nil {
			cr.log.CDebugf(ctx, "Couldn't find original pointer for %v",
				currPath.path[i])
			return path{}, BlockPointer{}, nil, err
		}

		// Has it changed in the merged branch?
		mostRecent, err = mergedChains.mostRecentFromOriginal(currOriginal)
		if err == nil {
			break
		}

		mergedPath.path = append(mergedPath.path, pathNode{
			BlockPointer: currOriginal,
			Name:         currPath.path[i].Name,
		})

		// Has it been renamed?
		if originalParent, newName, ok :=
			mergedChains.renamedParentAndName(currOriginal); ok {
			cr.log.CDebugf(ctx, "%v has been renamed in the merged branch",
				currOriginal)
			mostRecentParent, err :=
				mergedChains.mostRecentFromOriginal(originalParent)
			if err != nil {
				cr.log.CDebugf(ctx, "Couldn't find original pointer for %v",
					originalParent)
				return path{}, BlockPointer{}, nil, err
			}
			mostRecent = mostRecentParent
			// update the name for this renamed node
			mergedPath.path[len(mergedPath.path)-1].Name = newName
			break
		}
	}

	// reverse the merged path
	for i, j := 0, len(mergedPath.path)-1; i < j; i, j = i+1, j-1 {
		mergedPath.path[i], mergedPath.path[j] =
			mergedPath.path[j], mergedPath.path[i]
	}

	// reverse recreateOps
	for i, j := 0, len(recreateOps)-1; i < j; i, j = i+1, j-1 {
		recreateOps[i], recreateOps[j] = recreateOps[j], recreateOps[i]
	}

	return mergedPath, mostRecent, recreateOps, nil
}
// resolveMergedPaths maps each tail most recent pointer for all the
// given unmerged paths to a corresponding path in the merged branch.
// The merged branch may be missing some nodes that have been deleted;
// in that case, the merged path will contain placeholder path nodes
// using the original pointers for those directories.
//
// This function also returns a set of createOps that can be used to
// recreate the missing directories in the merged branch. If the
// parent directory needing the create has been deleted, then the
// unref ptr in the createOp contains the original pointer for the
// directory rather than the most recent merged pointer.
//
// It also potentially returns a new slice of unmerged paths that the
// caller should combine with the existing slice, corresponding to
// deleted unmerged chains that still have relevant operations to
// resolve.