Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion executor/pkg/controller/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,48 @@ import (
"context"
"time"

"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1"
"github.com/flyteorg/flyte/v2/flytestdlib/promutils"
)

// gcMetrics holds the Prometheus instruments for the garbage collector. They are
// created once (in newGCMetrics) and only updated thereafter, never re-registered.
type gcMetrics struct {
deleted prometheus.Counter
errors prometheus.Counter
sweepTime promutils.StopWatch
}

// newGCMetrics builds the garbage collector instruments under the given scope.
// Call exactly once per scope to avoid duplicate registration panics.
func newGCMetrics(scope promutils.Scope) gcMetrics {
return gcMetrics{
deleted: scope.MustNewCounter("objects_deleted", "Total TaskActions deleted by the garbage collector"),
errors: scope.MustNewCounter("deletion_errors", "Total errors encountered while deleting expired TaskActions"),
sweepTime: scope.MustNewStopWatch("sweep_duration", "Duration of a full garbage collection sweep", time.Millisecond),
}
}

// GarbageCollector periodically deletes terminal TaskActions that have exceeded their TTL.
// It implements the controller-runtime manager.Runnable interface.
type GarbageCollector struct {
client client.Client
interval time.Duration
maxTTL time.Duration
metrics gcMetrics
}

// NewGarbageCollector creates a new GarbageCollector.
func NewGarbageCollector(c client.Client, interval, maxTTL time.Duration) *GarbageCollector {
func NewGarbageCollector(c client.Client, interval, maxTTL time.Duration, scope promutils.Scope) *GarbageCollector {
return &GarbageCollector{
client: c,
interval: interval,
maxTTL: maxTTL,
metrics: newGCMetrics(scope),
}
}

Expand Down Expand Up @@ -55,6 +77,10 @@ const gcPageSize = 500
func (gc *GarbageCollector) collect(ctx context.Context) error {
logger := log.FromContext(ctx).WithName("gc")

// Time the full sweep, defer guarantees it records on every return path.
timer := gc.metrics.sweepTime.Start()
defer timer.Stop()

cutoff := time.Now().UTC().Add(-gc.maxTTL).Format(labelTimeFormat)
deleted := 0
total := 0
Expand Down Expand Up @@ -87,11 +113,13 @@ func (gc *GarbageCollector) collect(ctx context.Context) error {
// The minute-precision format is lexicographically ordered, so string comparison works.
if completedTime < cutoff {
if err := gc.client.Delete(ctx, ta); err != nil {
gc.metrics.errors.Inc()
logger.Error(err, "failed to delete expired TaskAction",
"name", ta.Name, "namespace", ta.Namespace, "completedTime", completedTime)
continue
}
deleted++
gc.metrics.deleted.Inc()
}
}

Expand Down
97 changes: 97 additions & 0 deletions executor/pkg/controller/garbage_collector_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package controller

import (
"context"
"errors"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"

flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1"
"github.com/flyteorg/flyte/v2/flytestdlib/promutils"
)

// newExpiredTaskAction builds a terminated TaskAction whose completed time is well past
// any sane maxTTL, so the GC will treat it as expired and delete it.
func newExpiredTaskAction() *flyteorgv1.TaskAction {
expired := time.Now().UTC().Add(-2 * time.Hour).Format(labelTimeFormat)
return &flyteorgv1.TaskAction{
ObjectMeta: metav1.ObjectMeta{
Name: "gc-expired",
Namespace: "default",
Labels: map[string]string{
LabelTerminationStatus: LabelValueTerminated,
LabelCompletedTime: expired,
},
},
}
}

// newGCTestClient returns an in-memory fake client seeded with one expired TaskAction.
// If deleteErr is non-nil, every Delete fails with it, so we can exercise the error path.
func newGCTestClient(t *testing.T, deleteErr error) client.Client {
scheme := runtime.NewScheme()
require.NoError(t, flyteorgv1.AddToScheme(scheme))
builder := fake.NewClientBuilder().WithScheme(scheme).WithObjects(newExpiredTaskAction())
if deleteErr != nil {
builder = builder.WithInterceptorFuncs(interceptor.Funcs{
Delete: func(context.Context, client.WithWatch, client.Object, ...client.DeleteOption) error {
return deleteErr
},
})
}
return builder.Build()
}

// sweepObservations reports how many times the sweep_duration stopwatch has recorded.
// sweep_duration is a Summary, so testutil.ToFloat64 can not read it.
// Instead, we collect the metric and read its sample count directly.
func sweepObservations(t *testing.T, gc *GarbageCollector) uint64 {
t.Helper()
ch := make(chan prometheus.Metric, 1)
gc.metrics.sweepTime.Observer.(prometheus.Collector).Collect(ch)
close(ch)
var m dto.Metric
require.NoError(t, (<-ch).Write(&m))
return m.GetSummary().GetSampleCount()
}

// TestGarbageCollectorDeletedMetric: a successful delete moves objects_deleted
// from 0 to 1, records one sweep, and leaves deletion_errors at 0.
func TestGarbageCollectorDeletedMetric(t *testing.T) {
gc := NewGarbageCollector(newGCTestClient(t, nil), time.Minute, time.Hour, promutils.NewTestScope())

require.Equal(t, 0.0, testutil.ToFloat64(gc.metrics.deleted))
require.Equal(t, 0.0, testutil.ToFloat64(gc.metrics.errors))

require.NoError(t, gc.collect(context.Background()))

require.Equal(t, 1.0, testutil.ToFloat64(gc.metrics.deleted))
require.Equal(t, 0.0, testutil.ToFloat64(gc.metrics.errors))
require.GreaterOrEqual(t, sweepObservations(t, gc), uint64(1))
}

// TestGarbageCollectorDeletionErrorMetric: when a delete fails, deletion_errors moves
// 0 to 1 and objects_deleted stays at 0.
func TestGarbageCollectorDeletionErrorMetric(t *testing.T) {
gc := NewGarbageCollector(
newGCTestClient(t, errors.New("simulated delete failure")),
time.Minute, time.Hour, promutils.NewTestScope(),
)

require.Equal(t, 0.0, testutil.ToFloat64(gc.metrics.errors))

require.NoError(t, gc.collect(context.Background()))

require.Equal(t, 1.0, testutil.ToFloat64(gc.metrics.errors))
require.Equal(t, 0.0, testutil.ToFloat64(gc.metrics.deleted))
}
9 changes: 5 additions & 4 deletions executor/pkg/controller/garbage_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1"
"github.com/flyteorg/flyte/v2/flytestdlib/promutils"
)

func createTaskAction(ctx context.Context, name string, labels map[string]string) *flyteorgv1.TaskAction {
Expand Down Expand Up @@ -62,7 +63,7 @@ var _ = Describe("GarbageCollector", func() {
LabelCompletedTime: expiredTime,
})

gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour)
gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour, promutils.NewTestScope())
Expect(gc.collect(ctx)).To(Succeed())

ta := &flyteorgv1.TaskAction{}
Expand All @@ -78,7 +79,7 @@ var _ = Describe("GarbageCollector", func() {
LabelCompletedTime: recentTime,
})

gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour)
gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour, promutils.NewTestScope())
Expect(gc.collect(ctx)).To(Succeed())

ta := &flyteorgv1.TaskAction{}
Expand All @@ -89,7 +90,7 @@ var _ = Describe("GarbageCollector", func() {
It("should retain non-terminated TaskActions", func() {
createTaskAction(ctx, "gc-active", nil)

gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour)
gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour, promutils.NewTestScope())
Expect(gc.collect(ctx)).To(Succeed())

ta := &flyteorgv1.TaskAction{}
Expand All @@ -98,7 +99,7 @@ var _ = Describe("GarbageCollector", func() {
})

It("should handle empty list gracefully", func() {
gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour)
gc := NewGarbageCollector(k8sClient, 1*time.Minute, 1*time.Hour, promutils.NewTestScope())
Expect(gc.collect(ctx)).To(Succeed())
})
})
Expand Down
2 changes: 1 addition & 1 deletion executor/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
if cfg.GC.MaxTTL.Duration <= 0 {
return fmt.Errorf("executor: gc.maxTTL must be positive when gc is enabled, got %v", cfg.GC.MaxTTL.Duration)
}
gc := controller.NewGarbageCollector(mgr.GetClient(), cfg.GC.Interval.Duration, cfg.GC.MaxTTL.Duration)
gc := controller.NewGarbageCollector(mgr.GetClient(), cfg.GC.Interval.Duration, cfg.GC.MaxTTL.Duration, executorScope.NewSubScope("gc"))
if err := mgr.Add(gc); err != nil {
return fmt.Errorf("executor: failed to add garbage collector: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ require (
require (
github.com/Masterminds/semver/v3 v3.5.0
github.com/alicebob/miniredis/v2 v2.38.0
github.com/prometheus/client_model v0.6.2
)

require (
Expand Down Expand Up @@ -189,7 +190,6 @@ require (
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/procfs v0.20.1 // indirect
github.com/spf13/afero v1.15.0 // indirect
github.com/spf13/cast v1.7.1 // indirect
Expand Down
Loading