Commit cd1dadb

Use entry path for garbage collection (#822)
* use entry path for garbage collection
* entriesPath test
* test WithGarbageCollectionInterval
* more comments
* address comments
1 parent 3b422a8 commit cd1dadb

File tree

7 files changed: +498 −27 lines changed


storage/aws/aws.go

Lines changed: 4 additions & 4 deletions
@@ -120,7 +120,7 @@ type sequencer interface {
 	publishCheckpoint(ctx context.Context, minStaleActive, minStaleRepub time.Duration, f func(ctx context.Context, size uint64, root []byte) error) error
 
 	// garbageCollect coordinates the removal of unneeded partial tiles/entry bundles for the provided tree size, up to a maximum number of deletes per invocation.
-	garbageCollect(ctx context.Context, treeSize uint64, maxDeletes uint, removePrefix func(ctx context.Context, prefix string) error) error
+	garbageCollect(ctx context.Context, treeSize uint64, maxDeletes uint, removePrefix func(ctx context.Context, prefix string) error, entriesPath func(uint64, uint8) string) error
 }
 
 // consumeFunc is the signature of a function which can consume entries from the sequencer.
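The new entriesPath parameter exists because the location of an entry bundle depends on the log's layout: the default tlog-tiles layout serves entry bundles under tile/entries/..., while the WithCTLayout option exercised by the new test below uses static-ct-api style tile/data/... paths. A minimal, self-contained sketch of the idea follows; the path formats are simplified illustrations and the helper names (entriesPathFunc, tlogEntriesPath, ctEntriesPath) are hypothetical, not tessera's real layout encoding.

// A sketch of why garbageCollect now takes an entriesPath function: the path
// of an entry bundle depends on which layout the log was configured with, so
// GC can no longer hard-code layout.EntriesPath.
package main

import "fmt"

// entriesPathFunc mirrors the new parameter's shape: it returns the object
// path for entry bundle `index`; p > 0 denotes a partial bundle of p entries.
type entriesPathFunc func(index uint64, p uint8) string

// tlogEntriesPath sketches a default tlog-tiles style location (simplified).
func tlogEntriesPath(index uint64, p uint8) string {
	path := fmt.Sprintf("tile/entries/%d", index)
	if p > 0 {
		path = fmt.Sprintf("%s.p/%d", path, p)
	}
	return path
}

// ctEntriesPath sketches a static-ct-api style location ("tile/data/...");
// the exact encoding used by tessera's WithCTLayout option differs.
func ctEntriesPath(index uint64, p uint8) string {
	path := fmt.Sprintf("tile/data/%d", index)
	if p > 0 {
		path = fmt.Sprintf("%s.p/%d", path, p)
	}
	return path
}

func main() {
	// The appender hands the sequencer whichever function matches its layout,
	// as a.logStore.entriesPath does in the diff above.
	for _, ep := range []entriesPathFunc{tlogEntriesPath, ctEntriesPath} {
		// GC removes every partial version of bundle 7 by prefix.
		fmt.Println("delete prefix:", ep(7, 0)+".p/")
	}
}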
@@ -341,7 +341,7 @@ func (a *Appender) garbageCollectorJob(ctx context.Context, i time.Duration) {
 			return
 		}
 
-		if err := a.sequencer.garbageCollect(ctx, pubSize, maxBundlesPerRun, a.logStore.objStore.deleteObjectsWithPrefix); err != nil {
+		if err := a.sequencer.garbageCollect(ctx, pubSize, maxBundlesPerRun, a.logStore.objStore.deleteObjectsWithPrefix, a.logStore.entriesPath); err != nil {
 			klog.Warningf("GarbageCollect failed: %v", err)
 			return
 		}
@@ -1279,7 +1279,7 @@ func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive,
 //
 // Uses the `GCCoord` table to ensure that only one binary is actively garbage collecting at any given time, and to track progress so that we don't
 // needlessly attempt to GC over regions which have already been cleaned.
-func (s *mySQLSequencer) garbageCollect(ctx context.Context, treeSize uint64, maxBundles uint, deleteWithPrefix func(ctx context.Context, prefix string) error) error {
+func (s *mySQLSequencer) garbageCollect(ctx context.Context, treeSize uint64, maxBundles uint, deleteWithPrefix func(ctx context.Context, prefix string) error, entriesPath func(uint64, uint8) string) error {
 	tx, err := s.dbPool.Begin()
 	if err != nil {
 		return err
@@ -1310,7 +1310,7 @@ func (s *mySQLSequencer) garbageCollect(ctx context.Context, treeSize uint64, ma
 			}
 
 			// GC any partial versions of the entry bundle itself and the tile which sits immediately above it.
-			eg.Go(func() error { return deleteWithPrefix(ctx, layout.EntriesPath(ri.Index, 0)+".p/") })
+			eg.Go(func() error { return deleteWithPrefix(ctx, entriesPath(ri.Index, 0)+".p/") })
 			eg.Go(func() error { return deleteWithPrefix(ctx, layout.TilePath(0, ri.Index, 0)+".p/") })
 			fromSize += uint64(ri.N)
 			d++
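The loop above fans each bundle's cleanup out onto an errgroup, deleting the partial entry bundle and the partial level-0 tile above it by prefix. Below is a compact sketch of that pattern with stand-in path and delete functions; the names and simplified paths are illustrative only, not tessera's API.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// deletePartials mirrors the two eg.Go calls above: remove all partial
// versions of entry bundle `idx` and of the tile immediately above it.
// entriesPath, tilePath and deleteWithPrefix are injected, just as the real
// garbageCollect now receives entriesPath from its caller.
func deletePartials(ctx context.Context, idx uint64,
	entriesPath func(uint64, uint8) string,
	tilePath func(uint64, uint64, uint8) string,
	deleteWithPrefix func(context.Context, string) error) error {

	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error { return deleteWithPrefix(ctx, entriesPath(idx, 0)+".p/") })
	eg.Go(func() error { return deleteWithPrefix(ctx, tilePath(0, idx, 0)+".p/") })
	return eg.Wait()
}

func main() {
	entries := func(n uint64, _ uint8) string { return fmt.Sprintf("tile/entries/%d", n) }
	tiles := func(l, n uint64, _ uint8) string { return fmt.Sprintf("tile/%d/%d", l, n) }
	del := func(_ context.Context, prefix string) error {
		fmt.Println("deleting objects under", prefix)
		return nil
	}
	if err := deletePartials(context.Background(), 42, entries, tiles, del); err != nil {
		fmt.Println("GC failed:", err)
	}
}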

storage/aws/aws_test.go

Lines changed: 146 additions & 4 deletions
@@ -536,15 +536,15 @@ func TestGarbageCollect(t *testing.T) {
 		}
 
 		t.Logf("Running GC at size %d", size)
-		if err := s.garbageCollect(ctx, size, 1000, m.deleteObjectsWithPrefix); err != nil {
+		if err := s.garbageCollect(ctx, size, 1000, m.deleteObjectsWithPrefix, appender.logStore.entriesPath); err != nil {
 			t.Fatalf("garbageCollect: %v", err)
 		}
 		t.Logf("GC complete at size %d", size)
 
 		// Compare any remaining partial resources to the list of places
 		// we'd expect them to be, given the tree size.
 		wantPartialPrefixes := make(map[string]struct{})
-		for _, p := range expectedPartialPrefixes(size) {
+		for _, p := range expectedPartialPrefixes(size, appender.logStore.entriesPath) {
			wantPartialPrefixes[p] = struct{}{}
 		}
 		for k := range m.mem {
@@ -565,17 +565,159 @@ func TestGarbageCollect(t *testing.T) {
 	}
 }
 
+func TestGarbageCollectOption(t *testing.T) {
+	batchSize := uint64(60000)
+	integrateEvery := uint64(31234)
+	garbageCollectionInterval := 100 * time.Millisecond
+
+	for _, test := range []struct {
+		name                          string
+		withCTLayout                  bool
+		withGarbageCollectionInterval time.Duration
+	}{
+		{
+			name:                          "on",
+			withGarbageCollectionInterval: garbageCollectionInterval,
+			withCTLayout:                  false,
+		},
+		{
+			name:                          "on-ct",
+			withGarbageCollectionInterval: garbageCollectionInterval,
+			withCTLayout:                  true,
+		},
+		{
+			name:                          "off",
+			withGarbageCollectionInterval: time.Duration(0),
+			withCTLayout:                  false,
+		},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+
+			ctx := t.Context()
+			if canSkipMySQLTest(t, ctx) {
+				klog.Warningf("MySQL not available, skipping %s", t.Name())
+				t.Skip("MySQL not available, skipping test")
+			}
+			// Clean tables in case there's already something in there.
+			mustDropTables(t, ctx)
+
+			s, err := newMySQLSequencer(ctx, *mySQLURI, batchSize, 0, 0)
+			if err != nil {
+				t.Fatalf("newMySQLSequencer: %v", err)
+			}
+			defer func() {
+				if err := s.dbPool.Close(); err != nil {
+					t.Fatalf("Close: %v", err)
+				}
+			}()
+
+			sk, vk := mustGenerateKeys(t)
+
+			m := newMemObjStore()
+			storage := &Storage{}
+
+			opts := tessera.NewAppendOptions().
+				WithCheckpointInterval(1200*time.Millisecond).
+				WithBatching(uint(batchSize), 100*time.Millisecond).
+				// Set the garbage collection interval under test (0 disables GC).
+				WithGarbageCollectionInterval(test.withGarbageCollectionInterval).
+				WithCheckpointSigner(sk)
+
+			if test.withCTLayout {
+				opts.WithCTLayout()
+			}
+
+			appender, lr, err := storage.newAppender(ctx, m, s, opts)
+			if err != nil {
+				t.Fatalf("newAppender: %v", err)
+			}
+			if err := appender.publishCheckpoint(ctx, 0, []byte("")); err != nil {
+				t.Fatalf("publishCheckpoint: %v", err)
+			}
+
+			// Build a reasonably-sized tree with a bunch of partial resources present, and wait for
+			// it to be published.
+			treeSize := uint64(256 * 384)
+
+			a := tessera.NewPublicationAwaiter(ctx, lr.ReadCheckpoint, 100*time.Millisecond)
+			wantPartialPrefixes := make(map[string]struct{})
+
+			// Grow the tree several times to check continued correct operation over lifetime of the log.
+			// Let garbage collection happen in the background.
+			for size := uint64(0); size < treeSize; {
+				t.Logf("Adding entries from %d", size)
+				for range batchSize {
+					f := appender.Add(ctx, tessera.NewEntry(fmt.Appendf(nil, "entry %d", size)))
+					if size%integrateEvery == 0 {
+						t.Logf("Awaiting entry %d", size)
+						if _, _, err := a.Await(ctx, f); err != nil {
+							t.Fatalf("Await: %v", err)
+						}
+						// If garbage collection is off, we want partial tiles and bundles to stick around.
+						if test.withGarbageCollectionInterval == time.Duration(0) {
+							for _, p := range expectedPartialPrefixes(size, appender.logStore.entriesPath) {
+								wantPartialPrefixes[p] = struct{}{}
+							}
+						}
+					}
+					size++
+				}
+				t.Logf("Awaiting tree at size %d", size)
+				if _, _, err := a.Await(ctx, func() (tessera.Index, error) { return tessera.Index{Index: size - 1}, nil }); err != nil {
+					t.Fatalf("Await final tree: %v", err)
+				}
+
+				// Leave a bit of time for Garbage Collection to run.
+				time.Sleep(3 * garbageCollectionInterval)
+
+				// Compare any remaining partial resources to the list of places
+				// we'd expect them to be, given the tree size.
+
+				// Regardless of whether garbage collection is on, partial tiles corresponding to the last
+				// checkpoint should always be here.
+				for _, p := range expectedPartialPrefixes(size, appender.logStore.entriesPath) {
+					wantPartialPrefixes[p] = struct{}{}
+				}
+				allPartialDirs := make(map[string]struct{})
+				for k := range m.mem {
+					if strings.Contains(k, ".p/") {
+						allPartialDirs[strings.SplitAfter(k, ".p/")[0]] = struct{}{}
+					}
+				}
+				// If garbage collection is on, no partial tiles other than the ones we expect should be
+				// present.
+				for p := range allPartialDirs {
+					if _, ok := wantPartialPrefixes[p]; !ok && test.withGarbageCollectionInterval > 0 {
+						t.Errorf("Found unwanted partial: %s", p)
+					}
+					delete(wantPartialPrefixes, p)
+				}
+				for p := range wantPartialPrefixes {
+					t.Errorf("Did not find expected partial: %s", p)
+				}
+			}
+
+			// And finally, for good measure, assert that all the resources implied by the log's checkpoint
+			// are present.
+			f := fsck.New(vk.Name(), vk, lr, defaultMerkleLeafHasher, fsck.Opts{N: 1})
+			if err := f.Check(ctx); err != nil {
+				t.Fatalf("FSCK failed: %v", err)
+			}
+		})
+	}
+}
+
 // expectedPartialPrefixes returns a slice containing resource prefixes where it's acceptable for a
 // tree of the provided size to have partial resources.
 //
 // These are really just the right-hand tiles/entry bundle in the tree.
-func expectedPartialPrefixes(size uint64) []string {
+func expectedPartialPrefixes(size uint64, entriesPath func(uint64, uint8) string) []string {
 	r := []string{}
 	for l, c := uint64(0), size; c > 0; l, c = l+1, c>>8 {
 		idx, p := c/256, c%256
 		if p != 0 {
 			if l == 0 {
-				r = append(r, layout.EntriesPath(idx, 0)+".p/")
+				r = append(r, entriesPath(idx, 0)+".p/")
 			}
 			r = append(r, layout.TilePath(l, idx, 0)+".p/")
 		}
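expectedPartialPrefixes, now layout-aware via the injected entriesPath, captures the invariant both tests rely on: only the rightmost entry bundle and the rightmost tile on each level may legitimately remain partial. The worked example below reproduces the same loop with simplified path strings (not tessera's real layout encoding) for a tree of size 1000.

package main

import "fmt"

// expectedPartialPrefixes mirrors the test helper's logic: walk up the tree
// one level at a time (c >> 8), and wherever the rightmost node at that level
// is not a full multiple of 256, record the prefix under which a partial
// resource is allowed to exist.
func expectedPartialPrefixes(size uint64, entriesPath func(uint64, uint8) string) []string {
	r := []string{}
	for l, c := uint64(0), size; c > 0; l, c = l+1, c>>8 {
		idx, p := c/256, c%256
		if p != 0 {
			if l == 0 {
				r = append(r, entriesPath(idx, 0)+".p/")
			}
			r = append(r, fmt.Sprintf("tile/%d/%d.p/", l, idx))
		}
	}
	return r
}

func main() {
	entries := func(n uint64, _ uint8) string { return fmt.Sprintf("tile/entries/%d", n) }
	// Size 1000 = 3 full bundles of 256 entries plus a partial bundle of 232,
	// so only bundle 3, tile(0, 3) and tile(1, 0) may have partial versions:
	// [tile/entries/3.p/ tile/0/3.p/ tile/1/0.p/]
	fmt.Println(expectedPartialPrefixes(1000, entries))
}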

storage/gcp/gcp.go

Lines changed: 4 additions & 4 deletions
@@ -114,7 +114,7 @@ type sequencer interface {
 	// publishCheckpoint coordinates the publication of new checkpoints based on the current integrated tree.
 	publishCheckpoint(ctx context.Context, minStaleActive, minStaleRepub time.Duration, f func(ctx context.Context, size uint64, root []byte) error) error
 	// garbageCollect coordinates the removal of unneeded partial tiles/entry bundles for the provided tree size, up to a maximum number of deletes per invocation.
-	garbageCollect(ctx context.Context, treeSize uint64, maxDeletes uint, removePrefix func(ctx context.Context, prefix string) error) error
+	garbageCollect(ctx context.Context, treeSize uint64, maxDeletes uint, removePrefix func(ctx context.Context, prefix string) error, entriesPath func(uint64, uint8) string) error
 }
 
 // consumeFunc is the signature of a function which can consume entries from the sequencer and integrate
@@ -394,7 +394,7 @@ func (a *Appender) garbageCollectorJob(ctx context.Context, i time.Duration) {
 			return
 		}
 
-		if err := a.sequencer.garbageCollect(ctx, pubSize, maxBundlesPerRun, a.logStore.objStore.deleteObjectsWithPrefix); err != nil {
+		if err := a.sequencer.garbageCollect(ctx, pubSize, maxBundlesPerRun, a.logStore.objStore.deleteObjectsWithPrefix, a.logStore.entriesPath); err != nil {
 			klog.Warningf("GarbageCollect failed: %v", err)
 			return
 		}
@@ -1072,7 +1072,7 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi
 //
 // Uses the `GCCoord` table to ensure that only one binary is actively garbage collecting at any given time, and to track progress so that we don't
 // needlessly attempt to GC over regions which have already been cleaned.
-func (s *spannerCoordinator) garbageCollect(ctx context.Context, treeSize uint64, maxBundles uint, deleteWithPrefix func(ctx context.Context, prefix string) error) error {
+func (s *spannerCoordinator) garbageCollect(ctx context.Context, treeSize uint64, maxBundles uint, deleteWithPrefix func(ctx context.Context, prefix string) error, entriesPath func(uint64, uint8) string) error {
 	_, err := s.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
 		row, err := txn.ReadRowWithOptions(ctx, "GCCoord", spanner.Key{0}, []string{"fromSize"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE})
 		if err != nil {
@@ -1099,7 +1099,7 @@ func (s *spannerCoordinator) garbageCollect(ctx context.Context, treeSize uint64
 		}
 
 		// GC any partial versions of the entry bundle itself and the tile which sits immediately above it.
-		eg.Go(func() error { return deleteWithPrefix(ctx, layout.EntriesPath(ri.Index, 0)+".p/") })
+		eg.Go(func() error { return deleteWithPrefix(ctx, entriesPath(ri.Index, 0)+".p/") })
 		eg.Go(func() error { return deleteWithPrefix(ctx, layout.TilePath(0, ri.Index, 0)+".p/") })
 		fromSize += uint64(ri.N)
 		d++
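On both backends, garbageCollect is guarded by the single GCCoord record: the coordinator locks it, reads how far previous runs got (fromSize), deletes forward from there, and writes back the new position so only one binary collects at a time and cleaned regions are never re-scanned. The sketch below is a database-agnostic illustration of that coordination using database/sql; the GCCoord(id, fromSize) schema, SQL statements, and the single gc callback are assumptions for the example, not the Spanner/MySQL code in this commit.

package gcsketch

import (
	"context"
	"database/sql"
)

// garbageCollectOnce sketches the GCCoord pattern: holding a lock on the
// coordination row means at most one binary collects at a time, and
// persisting fromSize means already-cleaned regions are skipped next run.
func garbageCollectOnce(ctx context.Context, db *sql.DB, treeSize uint64, gc func(from, to uint64) error) error {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer func() { _ = tx.Rollback() }() // No-op if the transaction committed.

	var fromSize uint64
	// Lock the coordination row for the duration of this GC run.
	if err := tx.QueryRowContext(ctx, "SELECT fromSize FROM GCCoord WHERE id = 0 FOR UPDATE").Scan(&fromSize); err != nil {
		return err
	}
	if fromSize >= treeSize {
		return tx.Commit() // Nothing new below the published tree size to clean.
	}
	// Delete partial resources in [fromSize, treeSize); the real code walks
	// bundle-by-bundle, bounded by maxBundles per invocation.
	if err := gc(fromSize, treeSize); err != nil {
		return err
	}
	// Record progress so the next run resumes where this one stopped.
	if _, err := tx.ExecContext(ctx, "UPDATE GCCoord SET fromSize = ? WHERE id = 0", treeSize); err != nil {
		return err
	}
	return tx.Commit()
}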
