Commit 59341b5

backup: hooked up compaction processor logic to job
1 parent a62654c commit 59341b5

6 files changed: +97 -28 lines


Diff for: pkg/backup/backup_job.go (+1 -1)

@@ -246,7 +246,7 @@ func backup(
 	progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
 	checkpointLoop := func(ctx context.Context) error {
 		// When a processor is done exporting a span, it will send a progress update
-		// to progCh.
+		// to .
 		defer close(requestFinishedCh)
 		defer close(perNodeProgressCh)
 		var numBackedUpFiles int64

Diff for: pkg/backup/compaction.go (+36 -2)

@@ -36,6 +36,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/util"
+	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"

@@ -341,7 +342,9 @@ func (b *backupResumer) ResumeCompaction(
 	// dying), so if we receive a retryable error, re-plan and retry the backup.
 	// TODO (kev-cao): Add progress tracking to compactions.
 	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
-		if err = compactChain.Compact(ctx, execCtx, updatedDetails, backupManifest, defaultStore, kmsEnv); err == nil {
+		if err = doCompaction(
+			ctx, execCtx, b.job.ID(), updatedDetails, compactChain, backupManifest, defaultStore, kmsEnv,
+		); err == nil {
 			break
 		}

@@ -728,7 +731,7 @@ func concludeBackupCompaction(
 func processProgress(
 	ctx context.Context,
 	manifest *backuppb.BackupManifest,
-	progCh <-chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
+	progCh <-chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
 ) error {
 	// When a processor is done exporting a span, it will send a progress update
 	// to progCh.

@@ -772,6 +775,37 @@ func compactionJobDescription(details jobspb.BackupDetails) (string, error) {
 	return fmtCtx.CloseAndGetString(), nil
 }

+func doCompaction(
+	ctx context.Context,
+	execCtx sql.JobExecContext,
+	jobID jobspb.JobID,
+	details jobspb.BackupDetails,
+	compactChain compactionChain,
+	manifest *backuppb.BackupManifest,
+	defaultStore cloud.ExternalStorage,
+	kmsEnv cloud.KMSEnv,
+) error {
+	progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
+	runDistCompaction := func(ctx context.Context) error {
+		return runCompactionPlan(
+			ctx, execCtx, jobID, details, compactChain, manifest, defaultStore, kmsEnv, progCh,
+		)
+	}
+	checkpointLoop := func(ctx context.Context) error {
+		return processProgress(ctx, manifest, progCh)
+	}
+
+	if err := ctxgroup.GoAndWait(
+		ctx, runDistCompaction, checkpointLoop,
+	); err != nil {
+		return err
+	}
+
+	return concludeBackupCompaction(
+		ctx, execCtx, defaultStore, details.EncryptionOptions, kmsEnv, manifest,
+	)
+}
+
 func init() {
 	builtins.StartCompactionJob = StartCompactionJob
 }
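
For readers following the wiring: doCompaction mirrors the backup job's own checkpoint loop. One task drives the distributed plan and emits progress, a second drains that progress, and ctxgroup.GoAndWait tears both down together if either fails. Below is a minimal, self-contained sketch of that shape in plain Go; the progress type and helper names are stand-ins rather than CockroachDB APIs, and errgroup is used only to play the role ctxgroup.GoAndWait plays in the diff.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// progressUpdate stands in for RemoteProducerMetadata_BulkProcessorProgress.
type progressUpdate struct{ spansDone int }

// runPlan plays the role of runCompactionPlan: as the producer it owns the
// channel and closes it once the distributed work is finished.
func runPlan(ctx context.Context, progCh chan<- *progressUpdate) error {
	defer close(progCh)
	for i := 1; i <= 3; i++ {
		select {
		case progCh <- &progressUpdate{spansDone: i}:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// checkpointLoop plays the role of processProgress: it drains progCh and
// would fold each update into the job's progress record.
func checkpointLoop(ctx context.Context, progCh <-chan *progressUpdate) error {
	for p := range progCh {
		fmt.Println("spans completed so far:", p.spansDone)
	}
	return ctx.Err()
}

func main() {
	progCh := make(chan *progressUpdate)
	// errgroup stands in for ctxgroup.GoAndWait: run producer and consumer
	// together and surface the first error from either.
	g, ctx := errgroup.WithContext(context.Background())
	g.Go(func() error { return runPlan(ctx, progCh) })
	g.Go(func() error { return checkpointLoop(ctx, progCh) })
	if err := g.Wait(); err != nil {
		fmt.Println("compaction failed:", err)
		return
	}
	fmt.Println("all processors done; conclude the compacted backup")
}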

Diff for: pkg/backup/compaction_processor.go (+13 -11)

@@ -177,6 +177,9 @@ func runCompactBackups(
 		return errors.Wrapf(err, "export configuration")
 	}
 	defaultStore, err := execCfg.DistSQLSrv.ExternalStorage(ctx, defaultConf)
+	if err != nil {
+		return errors.Wrapf(err, "external storage")
+	}

 	compactChain, encryption, err := compactionChainFromSpec(ctx, execCfg, spec, user)
 	if err != nil {

@@ -186,6 +189,7 @@ func runCompactBackups(
 	intersectSpanCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
 	tasks := []func(context.Context) error{
 		func(ctx context.Context) error {
+			defer close(intersectSpanCh)
 			return genIntersectingSpansForCompaction(ctx, execCfg, spec, compactChain, intersectSpanCh)
 		},
 		func(ctx context.Context) error {

@@ -284,9 +288,7 @@ func genIntersectingSpansForCompaction(
 	compactChain compactionChain,
 	intersectSpanCh chan execinfrapb.RestoreSpanEntry,
 ) error {
-	user := spec.User()
-
-	backupLocalityMap, err := makeBackupLocalityMap(compactChain.compactedLocalityInfo, user)
+	backupLocalityMap, err := makeBackupLocalityMap(compactChain.compactedLocalityInfo, spec.User())
 	if err != nil {
 		return err
 	}

@@ -299,7 +301,6 @@ func genIntersectingSpansForCompaction(
 	}
 	defer introducedSpanFrontier.Release()

-	entryCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
 	targetSize := targetRestoreSpanSize.Get(&execCfg.Settings.SV)
 	maxFiles := maxFileCount.Get(&execCfg.Settings.SV)

@@ -311,12 +312,12 @@ func genIntersectingSpansForCompaction(
 		targetSize,
 		maxFiles,
 	)
+	if err != nil {
+		return err
+	}

-	genSpan := func(ctx context.Context) error {
+	genSpan := func(ctx context.Context, entryCh chan execinfrapb.RestoreSpanEntry) error {
 		defer close(entryCh)
-		if err != nil {
-			return err
-		}
 		return errors.Wrapf(generateAndSendImportSpans(
 			ctx,
 			spec.Spans,

@@ -329,7 +330,7 @@ func genIntersectingSpansForCompaction(
 		), "generate and send import spans")
 	}

-	filterIntersectingSpans := func(ctx context.Context) error {
+	filterIntersectingSpans := func(ctx context.Context, entryCh chan execinfrapb.RestoreSpanEntry) error {
 		for {
 			select {
 			case <-ctx.Done():

@@ -348,12 +349,13 @@ func genIntersectingSpansForCompaction(
 		}
 	}

+	entryCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
 	tasks := []func(context.Context) error{
 		func(ctx context.Context) error {
-			return genSpan(ctx)
+			return genSpan(ctx, entryCh)
 		},
 		func(ctx context.Context) error {
-			return filterIntersectingSpans(ctx)
+			return filterIntersectingSpans(ctx, entryCh)
 		},
 	}
 	return ctxgroup.GoAndWait(ctx, tasks...)
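
The restructuring above settles channel ownership for the span pipeline: entryCh is created next to the task list, both closures take it as an explicit parameter, and only the producer closes it. A rough standalone sketch of that generate-then-filter pattern follows; the entry type, the predicate, and the function names are illustrative, not the real runCompactBackups helpers, and the out channel plus its close exist only to make the sketch observable.

package main

import (
	"context"
	"fmt"
)

// entry stands in for execinfrapb.RestoreSpanEntry.
type entry struct{ key string }

// genEntries is the producer: it owns entryCh and closes it when done,
// mirroring the defer close(entryCh) inside genSpan.
func genEntries(ctx context.Context, entryCh chan<- entry) error {
	defer close(entryCh)
	for _, k := range []string{"a", "b", "c", "d"} {
		select {
		case entryCh <- entry{key: k}:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// filterEntries is the consumer: it drains entryCh and forwards only entries
// that pass the predicate, the same role filterIntersectingSpans plays.
func filterEntries(
	ctx context.Context, entryCh <-chan entry, out chan<- entry, keep func(entry) bool,
) error {
	defer close(out)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case e, ok := <-entryCh:
			if !ok {
				return nil
			}
			if keep(e) {
				out <- e
			}
		}
	}
}

func main() {
	ctx := context.Background()
	entryCh := make(chan entry, 4)
	out := make(chan entry, 4)
	go func() { _ = genEntries(ctx, entryCh) }()
	go func() { _ = filterEntries(ctx, entryCh, out, func(e entry) bool { return e.key != "b" }) }()
	for e := range out {
		fmt.Println("kept:", e.key)
	}
}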

Diff for: pkg/backup/compaction_processor_planning.go (+23 -13)

@@ -32,27 +32,34 @@ import (
 // in the backup chain that need to be compacted. It sends updates from the
 // BulkProcessor to the provided progress channel. It is the caller's
 // responsibility to close the progress channel.
-func (c *compactionChain) runCompactionPlan(
+func runCompactionPlan(
 	ctx context.Context,
 	execCtx sql.JobExecContext,
 	jobID jobspb.JobID,
-	manifest *backuppb.BackupManifest,
 	details jobspb.BackupDetails,
+	compactChain compactionChain,
+	manifest *backuppb.BackupManifest,
 	defaultStore cloud.ExternalStorage,
 	kmsEnv cloud.KMSEnv,
 	progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
 ) error {
+	defer close(progCh)
 	log.Infof(
 		ctx, "planning compaction of %d backups: %s",
-		len(c.chainToCompact), util.Map(c.chainToCompact, func(m backuppb.BackupManifest) string {
+		len(compactChain.chainToCompact),
+		util.Map(compactChain.chainToCompact, func(m backuppb.BackupManifest) string {
 			return m.ID.String()
 		}),
 	)
-	backupLocalityMap, err := makeBackupLocalityMap(c.compactedLocalityInfo, execCtx.User())
+	backupLocalityMap, err := makeBackupLocalityMap(
+		compactChain.compactedLocalityInfo, execCtx.User(),
+	)
 	if err != nil {
 		return err
 	}
-	introducedSpanFrontier, err := createIntroducedSpanFrontier(c.backupChain, manifest.EndTime)
+	introducedSpanFrontier, err := createIntroducedSpanFrontier(
+		compactChain.backupChain, manifest.EndTime,
+	)
 	if err != nil {
 		return err
 	}

@@ -72,18 +79,17 @@ func (c *compactionChain) runCompactionPlan(
 	}

 	spansToCompact, err := getSpansToCompact(
-		ctx, execCtx, manifest, c.chainToCompact, details, defaultStore, kmsEnv,
+		ctx, execCtx, manifest, compactChain.chainToCompact, details, defaultStore, kmsEnv,
 	)
 	if err != nil {
 		return err
 	}
 	genSpan := func(ctx context.Context, spanCh chan execinfrapb.RestoreSpanEntry) error {
-		defer close(spanCh)
 		return errors.Wrap(generateAndSendImportSpans(
 			ctx,
 			spansToCompact,
-			c.chainToCompact,
-			c.compactedIterFactory,
+			compactChain.chainToCompact,
+			compactChain.compactedIterFactory,
 			backupLocalityMap,
 			filter,
 			fsc,

@@ -105,9 +111,10 @@ func (c *compactionChain) runCompactionPlan(
 		}
 		return nil
 	}
+	rowResultWriter := sql.NewRowResultWriter(nil)
 	recv := sql.MakeDistSQLReceiver(
 		ctx,
-		sql.NewMetadataCallbackWriter(nil, metaFn),
+		sql.NewMetadataCallbackWriter(rowResultWriter, metaFn),
 		tree.Rows,
 		nil, /* rangeCache */
 		nil, /* txn */

@@ -125,7 +132,7 @@ func (c *compactionChain) runCompactionPlan(
 	return nil
 }

-// createCompactionPlan creates an unfinalized physical plan that will
+// createCompactionPlan creates an un-finalized physical plan that will
 // distribute spans from a generator across the cluster for compaction.
 func createCompactionPlan(
 	ctx context.Context,

@@ -184,6 +191,7 @@ func countRestoreSpanEntries(
 			return nil
 		},
 		func(ctx context.Context) error {
+			defer close(countSpansCh)
 			return genSpan(ctx, countSpansCh)
 		},
 	}

@@ -213,13 +221,14 @@ func createCompactionCorePlacements(
 	spanEntryCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
 	var tasks []func(ctx context.Context) error
 	tasks = append(tasks, func(ctx context.Context) error {
+		defer close(spanEntryCh)
 		return genSpan(ctx, spanEntryCh)
 	})
 	tasks = append(tasks, func(ctx context.Context) error {
 		numEntriesPerNode := numEntries / numNodes
 		leftoverEntries := numEntries % numNodes
 		getTargetNumEntries := func(nodeIdx int) int {
-			if nodeIdx <= leftoverEntries {
+			if nodeIdx < leftoverEntries {
 				// This more evenly distributes the leftover entries across the nodes
 				// after doing integer division to assign the entries to the nodes.
 				return numEntriesPerNode + 1

@@ -231,10 +240,12 @@ func createCompactionCorePlacements(
 		targetNumEntries := getTargetNumEntries(currNode)

 		for entry := range spanEntryCh {
+			currEntries = append(currEntries, entry)
 			if len(currEntries) == targetNumEntries {
 				corePlacements[currNode].SQLInstanceID = sqlInstanceIDs[currNode]
 				corePlacements[currNode].Core.CompactBackups = &execinfrapb.CompactBackupsSpec{
 					JobID:       int64(jobID),
+					DefaultURI:  details.URI,
 					Destination: details.Destination,
 					Encryption:  details.EncryptionOptions,
 					StartTime:   details.StartTime,

@@ -253,7 +264,6 @@ func createCompactionCorePlacements(
 			if currNode == numNodes {
 				return nil
 			}
-			currEntries = append(currEntries, entry)
 		}
 		return nil
 	})
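
The getTargetNumEntries change is an off-by-one fix: nodes are 0-indexed, so exactly numEntries % numNodes of them should receive one extra entry, which requires a strict comparison (with <=, node 0 would take an extra entry even when there is no remainder, and one node too many would be padded otherwise). A small standalone illustration of the same integer-division scheme, using made-up numbers:

package main

import "fmt"

// targetEntries returns how many span entries node nodeIdx (0-indexed) should
// receive when numEntries are split across numNodes. The first
// numEntries%numNodes nodes each take one leftover entry, so the check must
// be <, not <=.
func targetEntries(nodeIdx, numEntries, numNodes int) int {
	perNode := numEntries / numNodes
	leftover := numEntries % numNodes
	if nodeIdx < leftover {
		return perNode + 1
	}
	return perNode
}

func main() {
	// 11 entries across 4 nodes: leftover is 3, so nodes 0-2 get 3 entries
	// each and node 3 gets 2.
	total := 0
	for node := 0; node < 4; node++ {
		n := targetEntries(node, 11, 4)
		total += n
		fmt.Printf("node %d -> %d entries\n", node, n)
	}
	fmt.Println("total:", total) // 11, nothing dropped or double-counted
}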

Diff for: pkg/sql/execinfrapb/flow_diagram.go (+23)

@@ -771,6 +771,29 @@ func (i *IngestStoppedSpec) summary() (string, []string) {
 	return "IngestStoppedSpec", []string{detail}
 }

+// summary implements the diagramCellType interface.
+func (m *CompactBackupsSpec) summary() (string, []string) {
+	var spanStr strings.Builder
+	if len(m.Spans) > 0 {
+		spanStr.WriteString(fmt.Sprintf("Spans [%d]: ", len(m.AssignedSpans)))
+		const limit = 3
+		for i := 0; i < len(m.AssignedSpans) && i < limit; i++ {
+			if i > 0 {
+				spanStr.WriteString(", ")
+			}
+			spanStr.WriteString(m.AssignedSpans[i].String())
+		}
+		if len(m.Spans) > limit {
+			spanStr.WriteString("...")
+		}
+	}
+
+	details := []string{
+		spanStr.String(),
+	}
+	return "CompactBackupsSpec", details
+}
+
 type diagramCell struct {
 	Title   string   `json:"title"`
 	Details []string `json:"details"`
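
The new CompactBackupsSpec summary follows the usual convention in flow_diagram.go: show a bounded preview of the processor's spans so the diagram cell stays readable. A standalone sketch of that truncation pattern, with plain strings standing in for the span protos (the example keys below are invented):

package main

import (
	"fmt"
	"strings"
)

// previewSpans renders at most limit items followed by "..." so a flow
// diagram cell stays short even when a processor is assigned many spans.
func previewSpans(spans []string, limit int) string {
	var b strings.Builder
	b.WriteString(fmt.Sprintf("Spans [%d]: ", len(spans)))
	for i := 0; i < len(spans) && i < limit; i++ {
		if i > 0 {
			b.WriteString(", ")
		}
		b.WriteString(spans[i])
	}
	if len(spans) > limit {
		b.WriteString("...")
	}
	return b.String()
}

func main() {
	spans := []string{
		"/Table/106/1-/Table/106/2",
		"/Table/106/2-/Table/106/3",
		"/Table/106/3-/Table/106/4",
		"/Table/106/4-/Table/106/5",
	}
	fmt.Println(previewSpans(spans, 3))
	// Output: Spans [4]: /Table/106/1-/Table/106/2, /Table/106/2-/Table/106/3, /Table/106/3-/Table/106/4...
}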

Diff for: pkg/sql/rowexec/processors.go (+1 -1)

@@ -412,7 +412,7 @@ func NewProcessor(
 		if NewCompactBackupsProcessor == nil {
 			return nil, errors.New("CompactBackups processor unimplemented")
 		}
-		return NewCompactBackupsProcessor(ctx, flowCtx, processorID, *core.CompactBackups, post, inputs[0])
+		return NewCompactBackupsProcessor(ctx, flowCtx, processorID, *core.CompactBackups, post)
 	}
 	return nil, errors.Errorf("unsupported processor core %q", core)
 }

0 commit comments
