Skip to content

Commit 4fe97f6

Browse files
committed
backup: hooked up compaction processor logic to job
1 parent bc0f871 commit 4fe97f6

7 files changed

+258
-48
lines changed

Diff for: pkg/backup/BUILD.bazel

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ go_library(
1414
"backup_processor_planning.go",
1515
"backup_span_coverage.go",
1616
"backup_telemetry.go",
17-
"compaction.go",
17+
"compaction_job.go",
1818
"compaction_policy.go",
1919
"compaction_processor.go",
2020
"compaction_processor_planning.go",
@@ -180,6 +180,7 @@ go_test(
180180
"bench_covering_test.go",
181181
"bench_test.go",
182182
"compaction_policy_test.go",
183+
"compaction_processor_planning_test.go",
183184
"compaction_test.go",
184185
"create_scheduled_backup_test.go",
185186
"data_driven_generated_test.go", # keep

Diff for: pkg/backup/compaction.go renamed to pkg/backup/compaction_job.go

+53-8
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
3737
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
3838
"github.com/cockroachdb/cockroach/pkg/util"
39+
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
3940
"github.com/cockroachdb/cockroach/pkg/util/hlc"
4041
"github.com/cockroachdb/cockroach/pkg/util/log"
4142
"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -82,7 +83,10 @@ func maybeStartCompactionJob(
8283
execCfg.InternalDB,
8384
user,
8485
)
85-
chain, _, _, _, err := getBackupChain(ctx, execCfg, user, triggerJob, &kmsEnv)
86+
chain, _, _, _, err := getBackupChain(
87+
ctx, execCfg, user, triggerJob.Destination, triggerJob.EncryptionOptions,
88+
triggerJob.EndTime, &kmsEnv,
89+
)
8690
if err != nil {
8791
return 0, errors.Wrap(err, "failed to get backup chain")
8892
}
@@ -351,7 +355,9 @@ func (b *backupResumer) ResumeCompaction(
351355
// dying), so if we receive a retryable error, re-plan and retry the backup.
352356
// TODO (kev-cao): Add progress tracking to compactions.
353357
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
354-
if err = compactChain.Compact(ctx, execCtx, updatedDetails, backupManifest, defaultStore, kmsEnv); err == nil {
358+
if err = doCompaction(
359+
ctx, execCtx, b.job.ID(), updatedDetails, compactChain, backupManifest, defaultStore, kmsEnv,
360+
); err == nil {
355361
break
356362
}
357363

@@ -674,11 +680,19 @@ func getBackupChain(
674680
log.Warningf(ctx, "failed to cleanup incremental backup stores: %+v", err)
675681
}
676682
}()
677-
encryption, err := backupencryption.GetEncryptionFromBaseStore(
678-
ctx, baseStores[0], encryptionOpts, kmsEnv,
679-
)
680-
if err != nil {
681-
return nil, nil, nil, nil, err
683+
// If encryption keys have not already been computed, then we will compute
684+
// them using the base store.
685+
var encryption *jobspb.BackupEncryptionOptions
686+
if encryptionOpts != nil && encryptionOpts.Mode != jobspb.EncryptionMode_None &&
687+
(encryptionOpts.Key != nil || encryptionOpts.KMSInfo != nil) {
688+
encryption = encryptionOpts
689+
} else {
690+
encryption, err = backupencryption.GetEncryptionFromBaseStore(
691+
ctx, baseStores[0], encryptionOpts, kmsEnv,
692+
)
693+
if err != nil {
694+
return nil, nil, nil, nil, err
695+
}
682696
}
683697
mem := execCfg.RootMemoryMonitor.MakeBoundAccount()
684698
defer mem.Close(ctx)
@@ -738,7 +752,7 @@ func concludeBackupCompaction(
738752
func processProgress(
739753
ctx context.Context,
740754
manifest *backuppb.BackupManifest,
741-
progCh <-chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
755+
progCh <-chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
742756
) error {
743757
// When a processor is done exporting a span, it will send a progress update
744758
// to progCh.
@@ -782,6 +796,37 @@ func compactionJobDescription(details jobspb.BackupDetails) (string, error) {
782796
return fmtCtx.CloseAndGetString(), nil
783797
}
784798

799+
func doCompaction(
800+
ctx context.Context,
801+
execCtx sql.JobExecContext,
802+
jobID jobspb.JobID,
803+
details jobspb.BackupDetails,
804+
compactChain compactionChain,
805+
manifest *backuppb.BackupManifest,
806+
defaultStore cloud.ExternalStorage,
807+
kmsEnv cloud.KMSEnv,
808+
) error {
809+
progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
810+
runDistCompaction := func(ctx context.Context) error {
811+
return runCompactionPlan(
812+
ctx, execCtx, jobID, details, compactChain, manifest, defaultStore, kmsEnv, progCh,
813+
)
814+
}
815+
checkpointLoop := func(ctx context.Context) error {
816+
return processProgress(ctx, manifest, progCh)
817+
}
818+
819+
if err := ctxgroup.GoAndWait(
820+
ctx, runDistCompaction, checkpointLoop,
821+
); err != nil {
822+
return err
823+
}
824+
825+
return concludeBackupCompaction(
826+
ctx, execCtx, defaultStore, details.EncryptionOptions, kmsEnv, manifest,
827+
)
828+
}
829+
785830
func init() {
786831
builtins.StartCompactionJob = StartCompactionJob
787832
}

Diff for: pkg/backup/compaction_processor.go

+17-12
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,9 @@ func runCompactBackups(
167167
spec execinfrapb.CompactBackupsSpec,
168168
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
169169
) error {
170+
if len(spec.AssignedSpans) == 0 {
171+
return nil
172+
}
170173
user := spec.User()
171174
execCfg, ok := flowCtx.Cfg.ExecutorConfig.(*sql.ExecutorConfig)
172175
if !ok {
@@ -177,6 +180,9 @@ func runCompactBackups(
177180
return errors.Wrapf(err, "export configuration")
178181
}
179182
defaultStore, err := execCfg.DistSQLSrv.ExternalStorage(ctx, defaultConf)
183+
if err != nil {
184+
return errors.Wrapf(err, "external storage")
185+
}
180186

181187
compactChain, encryption, err := compactionChainFromSpec(ctx, execCfg, spec, user)
182188
if err != nil {
@@ -186,6 +192,7 @@ func runCompactBackups(
186192
intersectSpanCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
187193
tasks := []func(context.Context) error{
188194
func(ctx context.Context) error {
195+
defer close(intersectSpanCh)
189196
return genIntersectingSpansForCompaction(ctx, execCfg, spec, compactChain, intersectSpanCh)
190197
},
191198
func(ctx context.Context) error {
@@ -284,9 +291,7 @@ func genIntersectingSpansForCompaction(
284291
compactChain compactionChain,
285292
intersectSpanCh chan execinfrapb.RestoreSpanEntry,
286293
) error {
287-
user := spec.User()
288-
289-
backupLocalityMap, err := makeBackupLocalityMap(compactChain.compactedLocalityInfo, user)
294+
backupLocalityMap, err := makeBackupLocalityMap(compactChain.compactedLocalityInfo, spec.User())
290295
if err != nil {
291296
return err
292297
}
@@ -299,7 +304,6 @@ func genIntersectingSpansForCompaction(
299304
}
300305
defer introducedSpanFrontier.Release()
301306

302-
entryCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
303307
targetSize := targetRestoreSpanSize.Get(&execCfg.Settings.SV)
304308
maxFiles := maxFileCount.Get(&execCfg.Settings.SV)
305309

@@ -311,12 +315,12 @@ func genIntersectingSpansForCompaction(
311315
targetSize,
312316
maxFiles,
313317
)
318+
if err != nil {
319+
return err
320+
}
314321

315-
genSpan := func(ctx context.Context) error {
322+
genSpan := func(ctx context.Context, entryCh chan execinfrapb.RestoreSpanEntry) error {
316323
defer close(entryCh)
317-
if err != nil {
318-
return err
319-
}
320324
return errors.Wrapf(generateAndSendImportSpans(
321325
ctx,
322326
spec.Spans,
@@ -329,7 +333,7 @@ func genIntersectingSpansForCompaction(
329333
), "generate and send import spans")
330334
}
331335

332-
filterIntersectingSpans := func(ctx context.Context) error {
336+
filterIntersectingSpans := func(ctx context.Context, entryCh chan execinfrapb.RestoreSpanEntry) error {
333337
for {
334338
select {
335339
case <-ctx.Done():
@@ -348,12 +352,13 @@ func genIntersectingSpansForCompaction(
348352
}
349353
}
350354

355+
entryCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
351356
tasks := []func(context.Context) error{
352357
func(ctx context.Context) error {
353-
return genSpan(ctx)
358+
return genSpan(ctx, entryCh)
354359
},
355360
func(ctx context.Context) error {
356-
return filterIntersectingSpans(ctx)
361+
return filterIntersectingSpans(ctx, entryCh)
357362
},
358363
}
359364
return ctxgroup.GoAndWait(ctx, tasks...)
@@ -368,7 +373,7 @@ func openSSTs(
368373
) (mergedSST, error) {
369374
var dirs []cloud.ExternalStorage
370375
storeFiles := make([]storageccl.StoreFile, 0, len(entry.Files))
371-
for idx := 0; idx < len(entry.Files); idx++ {
376+
for idx := range entry.Files {
372377
file := entry.Files[idx]
373378
dir, err := execCfg.DistSQLSrv.ExternalStorage(ctx, file.Dir)
374379
if err != nil {

Diff for: pkg/backup/compaction_processor_planning.go

+41-26
Original file line numberDiff line numberDiff line change
@@ -32,27 +32,34 @@ import (
3232
// in the backup chain that need to be compacted. It sends updates from the
3333
// BulkProcessor to the provided progress channel. This function closes the
3434
// progress channel when it returns, so callers must not close it themselves.
35-
func (c *compactionChain) runCompactionPlan(
35+
func runCompactionPlan(
3636
ctx context.Context,
3737
execCtx sql.JobExecContext,
3838
jobID jobspb.JobID,
39-
manifest *backuppb.BackupManifest,
4039
details jobspb.BackupDetails,
40+
compactChain compactionChain,
41+
manifest *backuppb.BackupManifest,
4142
defaultStore cloud.ExternalStorage,
4243
kmsEnv cloud.KMSEnv,
4344
progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
4445
) error {
46+
defer close(progCh)
4547
log.Infof(
4648
ctx, "planning compaction of %d backups: %s",
47-
len(c.chainToCompact), util.Map(c.chainToCompact, func(m backuppb.BackupManifest) string {
49+
len(compactChain.chainToCompact),
50+
util.Map(compactChain.chainToCompact, func(m backuppb.BackupManifest) string {
4851
return m.ID.String()
4952
}),
5053
)
51-
backupLocalityMap, err := makeBackupLocalityMap(c.compactedLocalityInfo, execCtx.User())
54+
backupLocalityMap, err := makeBackupLocalityMap(
55+
compactChain.compactedLocalityInfo, execCtx.User(),
56+
)
5257
if err != nil {
5358
return err
5459
}
55-
introducedSpanFrontier, err := createIntroducedSpanFrontier(c.backupChain, manifest.EndTime)
60+
introducedSpanFrontier, err := createIntroducedSpanFrontier(
61+
compactChain.backupChain, manifest.EndTime,
62+
)
5663
if err != nil {
5764
return err
5865
}
@@ -72,18 +79,17 @@ func (c *compactionChain) runCompactionPlan(
7279
}
7380

7481
spansToCompact, err := getSpansToCompact(
75-
ctx, execCtx, manifest, c.chainToCompact, details, defaultStore, kmsEnv,
82+
ctx, execCtx, manifest, compactChain.chainToCompact, details, defaultStore, kmsEnv,
7683
)
7784
if err != nil {
7885
return err
7986
}
8087
genSpan := func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error {
81-
defer close(spanCh)
8288
return errors.Wrap(generateAndSendImportSpans(
8389
ctx,
8490
spansToCompact,
85-
c.chainToCompact,
86-
c.compactedIterFactory,
91+
compactChain.chainToCompact,
92+
compactChain.compactedIterFactory,
8793
backupLocalityMap,
8894
filter,
8995
fsc,
@@ -105,9 +111,10 @@ func (c *compactionChain) runCompactionPlan(
105111
}
106112
return nil
107113
}
114+
rowResultWriter := sql.NewRowResultWriter(nil)
108115
recv := sql.MakeDistSQLReceiver(
109116
ctx,
110-
sql.NewMetadataCallbackWriter(nil, metaFn),
117+
sql.NewMetadataCallbackWriter(rowResultWriter, metaFn),
111118
tree.Rows,
112119
nil, /* rangeCache */
113120
nil, /* txn */
@@ -125,7 +132,7 @@ func (c *compactionChain) runCompactionPlan(
125132
return nil
126133
}
127134

128-
// createCompactionPlan creates an unfinalized physical plan that will
135+
// createCompactionPlan creates an un-finalized physical plan that will
129136
// distribute spans from a generator across the cluster for compaction.
130137
func createCompactionPlan(
131138
ctx context.Context,
@@ -184,6 +191,7 @@ func countRestoreSpanEntries(
184191
return nil
185192
},
186193
func(ctx context.Context) error {
194+
defer close(countSpansCh)
187195
return genSpan(ctx, countSpansCh)
188196
},
189197
}
@@ -194,7 +202,7 @@ func countRestoreSpanEntries(
194202
}
195203

196204
// createCompactionCorePlacements takes spans from a generator and evenly
197-
// distributes them across nodes in the cluster, returning the core core placements
205+
// distributes them across nodes in the cluster, returning the core placements
198206
// reflecting that distribution.
199207
func createCompactionCorePlacements(
200208
ctx context.Context,
@@ -209,17 +217,32 @@ func createCompactionCorePlacements(
209217
) ([]physicalplan.ProcessorCorePlacement, error) {
210218
numNodes := len(sqlInstanceIDs)
211219
corePlacements := make([]physicalplan.ProcessorCorePlacement, numNodes)
220+
for i := range corePlacements {
221+
corePlacements[i].SQLInstanceID = sqlInstanceIDs[i]
222+
corePlacements[i].Core.CompactBackups = &execinfrapb.CompactBackupsSpec{
223+
JobID: int64(jobID),
224+
DefaultURI: details.URI,
225+
Destination: details.Destination,
226+
Encryption: details.EncryptionOptions,
227+
StartTime: details.StartTime,
228+
EndTime: details.EndTime,
229+
ElideMode: elideMode,
230+
UserProto: user.EncodeProto(),
231+
Spans: spansToCompact,
232+
}
233+
}
212234

213235
spanEntryCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
214236
var tasks []func(ctx context.Context) error
215237
tasks = append(tasks, func(ctx context.Context) error {
238+
defer close(spanEntryCh)
216239
return genSpan(ctx, spanEntryCh)
217240
})
218241
tasks = append(tasks, func(ctx context.Context) error {
219242
numEntriesPerNode := numEntries / numNodes
220243
leftoverEntries := numEntries % numNodes
221244
getTargetNumEntries := func(nodeIdx int) int {
222-
if nodeIdx <= leftoverEntries {
245+
if nodeIdx < leftoverEntries {
223246
// This more evenly distributes the leftover entries across the nodes
224247
// after doing integer division to assign the entries to the nodes.
225248
return numEntriesPerNode + 1
@@ -231,29 +254,21 @@ func createCompactionCorePlacements(
231254
targetNumEntries := getTargetNumEntries(currNode)
232255

233256
for entry := range spanEntryCh {
257+
currEntries = append(currEntries, entry)
234258
if len(currEntries) == targetNumEntries {
235259
corePlacements[currNode].SQLInstanceID = sqlInstanceIDs[currNode]
236-
corePlacements[currNode].Core.CompactBackups = &execinfrapb.CompactBackupsSpec{
237-
JobID: int64(jobID),
238-
Destination: details.Destination,
239-
Encryption: details.EncryptionOptions,
240-
StartTime: details.StartTime,
241-
EndTime: details.EndTime,
242-
ElideMode: elideMode,
243-
UserProto: user.EncodeProto(),
244-
Spans: spansToCompact,
245-
AssignedSpans: util.Map(currEntries, func(entry execinfrapb.RestoreSpanEntry) roachpb.Span {
260+
corePlacements[currNode].Core.CompactBackups.AssignedSpans = util.Map(
261+
currEntries,
262+
func(entry execinfrapb.RestoreSpanEntry) roachpb.Span {
246263
return entry.Span
247-
}),
248-
}
264+
})
249265
currNode++
250266
targetNumEntries = getTargetNumEntries(currNode)
251267
currEntries = currEntries[:0]
252268
}
253269
if currNode == numNodes {
254270
return nil
255271
}
256-
currEntries = append(currEntries, entry)
257272
}
258273
return nil
259274
})

0 commit comments

Comments
 (0)