Skip to content

Commit 74b3421

Browse files
committed Jun 26, 2023
db: refactor obsolete file deletion code

The code around obsolete file deletion is very convoluted and there is one known race (described in cockroachdb#2630). This change reworks the deletion code as follows:
 - we add a cleanup manager which has a background goroutine;
 - all file and object deletions happen in the manager's background goroutine;
 - callers can enqueue cleanup jobs with the manager;
 - tests can wait for outstanding cleanup jobs to finish.

We also add a missing call to `deleteObsoleteObjects` from the ingest path (which now can cause files to be removed through excising).

Finally, we fix an incorrect `Remove` call in the error path of `Checkpoint()` (which was missing a `PathJoin`).

Fixes cockroachdb#2630.
1 parent cdb67df commit 74b3421

22 files changed

+388
-556
lines changed
 

‎checkpoint.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -215,11 +215,7 @@ func (d *DB) Checkpoint(
215215
}
216216
if ckErr != nil {
217217
// Attempt to cleanup on error.
218-
paths, _ := fs.List(destDir)
219-
for _, path := range paths {
220-
_ = fs.Remove(path)
221-
}
222-
_ = fs.Remove(destDir)
218+
_ = fs.RemoveAll(destDir)
223219
}
224220
}()
225221
dir, ckErr = mkdirAllAndSyncParents(fs, destDir)

‎checkpoint_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func TestCheckpoint(t *testing.T) {
8787
if err := d.Compact(nil, []byte("\xff"), false); err != nil {
8888
return err.Error()
8989
}
90+
d.TestOnlyWaitForCleaning()
9091
return memLog.String()
9192

9293
case "flush":
@@ -189,7 +190,7 @@ func TestCheckpointCompaction(t *testing.T) {
189190
defer close(check)
190191
defer wg.Done()
191192
for i := 0; ctx.Err() == nil && i < 200; i++ {
192-
dir := fmt.Sprintf("checkpoint%6d", i)
193+
dir := fmt.Sprintf("checkpoint%06d", i)
193194
if err := d.Checkpoint(dir); err != nil {
194195
t.Error(err)
195196
return

‎cleaner.go

+239-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,18 @@
44

55
package pebble
66

7-
import "github.com/cockroachdb/pebble/internal/base"
7+
import (
8+
"context"
9+
"runtime/pprof"
10+
"sync"
11+
"time"
12+
13+
"github.com/cockroachdb/errors/oserror"
14+
"github.com/cockroachdb/pebble/internal/base"
15+
"github.com/cockroachdb/pebble/internal/invariants"
16+
"github.com/cockroachdb/pebble/objstorage"
17+
"github.com/cockroachdb/tokenbucket"
18+
)
819

920
// Cleaner exports the base.Cleaner type.
1021
type Cleaner = base.Cleaner
@@ -14,3 +25,230 @@ type DeleteCleaner = base.DeleteCleaner
1425

1526
// ArchiveCleaner exports the base.ArchiveCleaner type.
1627
type ArchiveCleaner = base.ArchiveCleaner
28+
29+
type cleanupManager struct {
30+
opts *Options
31+
objProvider objstorage.Provider
32+
onTableDeleteFn func(fileSize uint64)
33+
deletePacer *deletionPacer
34+
35+
// jobsCh is used as the cleanup job queue.
36+
jobsCh chan *cleanupJob
37+
// waitGroup is used to wait for the background goroutine to exit.
38+
waitGroup sync.WaitGroup
39+
40+
mu struct {
41+
sync.Mutex
42+
queuedJobs int
43+
completedJobs int
44+
completedJobsCond sync.Cond
45+
}
46+
}
47+
48+
// In practice, we should rarely have more than a couple of jobs (in most cases
49+
// we Wait() after queueing a job).
50+
const jobsChLen = 1000
51+
52+
// obsoleteFile holds information about a file that needs to be deleted soon.
53+
type obsoleteFile struct {
54+
dir string
55+
fileNum base.DiskFileNum
56+
fileType fileType
57+
fileSize uint64
58+
}
59+
60+
type cleanupJob struct {
61+
jobID int
62+
obsoleteFiles []obsoleteFile
63+
}
64+
65+
// openCleanupManager creates a cleanupManager and starts its background goroutine.
66+
// The cleanupManager must be Close()d.
67+
func openCleanupManager(
68+
opts *Options,
69+
objProvider objstorage.Provider,
70+
onTableDeleteFn func(fileSize uint64),
71+
getDeletePacerInfo func() deletionPacerInfo,
72+
) *cleanupManager {
73+
cm := &cleanupManager{
74+
opts: opts,
75+
objProvider: objProvider,
76+
onTableDeleteFn: onTableDeleteFn,
77+
deletePacer: newDeletionPacer(getDeletePacerInfo),
78+
jobsCh: make(chan *cleanupJob, jobsChLen),
79+
}
80+
cm.mu.completedJobsCond.L = &cm.mu.Mutex
81+
cm.waitGroup.Add(1)
82+
83+
go func() {
84+
pprof.Do(context.Background(), gcLabels, func(context.Context) {
85+
cm.mainLoop()
86+
})
87+
}()
88+
89+
return cm
90+
}
91+
92+
// Close stops the background goroutine, waiting until all queued jobs are completed.
93+
// Delete pacing is disabled for the remaining jobs.
94+
func (cm *cleanupManager) Close() {
95+
close(cm.jobsCh)
96+
cm.waitGroup.Wait()
97+
}
98+
99+
// EnqueueJob adds a cleanup job to the manager's queue.
100+
func (cm *cleanupManager) EnqueueJob(jobID int, obsoleteFiles []obsoleteFile) {
101+
job := &cleanupJob{
102+
jobID: jobID,
103+
obsoleteFiles: obsoleteFiles,
104+
}
105+
cm.mu.Lock()
106+
cm.mu.queuedJobs++
107+
cm.mu.Unlock()
108+
109+
if invariants.Enabled && len(cm.jobsCh) >= cap(cm.jobsCh)-2 {
110+
panic("cleanup jobs queue full")
111+
}
112+
cm.jobsCh <- job
113+
}
114+
115+
// Wait until the completion of all jobs that were already queued.
116+
//
117+
// Does not wait for jobs that are enqueued during the call.
118+
//
119+
// Note that DB.mu should not be held while calling this method; the background
120+
// goroutine needs to acquire DB.mu to update deleted table metrics.
121+
func (cm *cleanupManager) Wait() {
122+
cm.mu.Lock()
123+
defer cm.mu.Unlock()
124+
n := cm.mu.queuedJobs
125+
for cm.mu.completedJobs < n {
126+
cm.mu.completedJobsCond.Wait()
127+
}
128+
}
129+
130+
// mainLoop runs the manager's background goroutine.
131+
func (cm *cleanupManager) mainLoop() {
132+
defer cm.waitGroup.Done()
133+
useLimiter := false
134+
var limiter tokenbucket.TokenBucket
135+
136+
if r := cm.opts.TargetByteDeletionRate; r != 0 {
137+
useLimiter = true
138+
limiter.Init(tokenbucket.TokensPerSecond(r), tokenbucket.Tokens(r))
139+
}
140+
141+
for job := range cm.jobsCh {
142+
for _, of := range job.obsoleteFiles {
143+
if of.fileType != fileTypeTable {
144+
path := base.MakeFilepath(cm.opts.FS, of.dir, of.fileType, of.fileNum)
145+
cm.deleteObsoleteFile(of.fileType, job.jobID, path, of.fileNum, of.fileSize)
146+
} else {
147+
if useLimiter {
148+
cm.maybePace(&limiter, of.fileType, of.fileNum, of.fileSize)
149+
}
150+
cm.onTableDeleteFn(of.fileSize)
151+
cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.fileNum)
152+
}
153+
}
154+
cm.mu.Lock()
155+
cm.mu.completedJobs++
156+
cm.mu.completedJobsCond.Broadcast()
157+
cm.mu.Unlock()
158+
}
159+
}
160+
161+
// maybePace sleeps before deleting an object if appropriate. It is always
162+
// called from the background goroutine.
163+
func (cm *cleanupManager) maybePace(
164+
limiter *tokenbucket.TokenBucket,
165+
fileType base.FileType,
166+
fileNum base.DiskFileNum,
167+
fileSize uint64,
168+
) {
169+
meta, err := cm.objProvider.Lookup(fileType, fileNum)
170+
if err != nil {
171+
// The object was already removed from the provider; we won't actually
172+
// delete anything, so we don't need to pace.
173+
return
174+
}
175+
if meta.IsShared() {
176+
// Don't throttle deletion of shared objects.
177+
return
178+
}
179+
if !cm.deletePacer.shouldPace() {
180+
// The deletion pacer decided that we shouldn't throttle; account
181+
// for the operation but don't wait for tokens.
182+
limiter.Adjust(-tokenbucket.Tokens(fileSize))
183+
return
184+
}
185+
// Wait for tokens.
186+
for {
187+
ok, d := limiter.TryToFulfill(tokenbucket.Tokens(fileSize))
188+
if ok {
189+
break
190+
}
191+
time.Sleep(d)
192+
}
193+
}
194+
195+
// deleteObsoleteFile deletes a (non-object) file that is no longer needed.
196+
func (cm *cleanupManager) deleteObsoleteFile(
197+
fileType fileType, jobID int, path string, fileNum base.DiskFileNum, fileSize uint64,
198+
) {
199+
// TODO(peter): need to handle this error, probably by re-adding the
200+
// file that couldn't be deleted to one of the obsolete slices map.
201+
err := cm.opts.Cleaner.Clean(cm.opts.FS, fileType, path)
202+
if oserror.IsNotExist(err) {
203+
return
204+
}
205+
206+
switch fileType {
207+
case fileTypeLog:
208+
cm.opts.EventListener.WALDeleted(WALDeleteInfo{
209+
JobID: jobID,
210+
Path: path,
211+
FileNum: fileNum.FileNum(),
212+
Err: err,
213+
})
214+
case fileTypeManifest:
215+
cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
216+
JobID: jobID,
217+
Path: path,
218+
FileNum: fileNum.FileNum(),
219+
Err: err,
220+
})
221+
case fileTypeTable:
222+
panic("invalid deletion of object file")
223+
}
224+
}
225+
226+
func (cm *cleanupManager) deleteObsoleteObject(
227+
fileType fileType, jobID int, fileNum base.DiskFileNum,
228+
) {
229+
if fileType != fileTypeTable {
230+
panic("not an object")
231+
}
232+
233+
var path string
234+
meta, err := cm.objProvider.Lookup(fileType, fileNum)
235+
if err != nil {
236+
path = "<nil>"
237+
} else {
238+
path = cm.objProvider.Path(meta)
239+
err = cm.objProvider.Remove(fileType, fileNum)
240+
}
241+
if cm.objProvider.IsNotExistError(err) {
242+
return
243+
}
244+
245+
switch fileType {
246+
case fileTypeTable:
247+
cm.opts.EventListener.TableDeleted(TableDeleteInfo{
248+
JobID: jobID,
249+
Path: path,
250+
FileNum: fileNum.FileNum(),
251+
Err: err,
252+
})
253+
}
254+
}

‎cleaner_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ func TestCleaner(t *testing.T) {
113113
if err != nil {
114114
return err.Error()
115115
}
116+
d.TestOnlyWaitForCleaning()
117+
d.testingAlwaysWaitForCleanup = true
116118
dbs[dir] = d
117119
return memLog.String()
118120

0 commit comments

Comments
 (0)
Please sign in to comment.