From ce47c0fd3b73a9074529c4e35838022d0a1f05b7 Mon Sep 17 00:00:00 2001
From: Kevin Su
Date: Mon, 8 Jun 2026 15:26:21 -0700
Subject: [PATCH 1/9] feat(executor): add OTel metrics for the TaskAction controller

Build on the meter provider from #7459 to instrument the TaskAction controller
with two metrics the framework doesn't provide:

- taskaction.active (Int64ObservableGauge): number of TaskAction CRDs by plugin
  phase, observed asynchronously by listing from the controller cache.
- taskaction.crd.size_bytes (Int64Histogram): serialized size of a TaskAction
  CRD, recorded per reconcile.

Both are emitted via otelutils.GetMeterProvider("executor"), so they flow
through the same OTLP pipeline as the RPC metrics. Registration is non-fatal
(degrades to no custom metrics) and observeCRDSize is nil-safe.

Reconcile rate/latency, workqueue depth, and k8s API r/w latency are already
exposed by the controller-runtime metrics server; event-proxy send latency is
already captured by the otelconnect-wrapped events client.

Signed-off-by: Kevin Su --- executor/pkg/controller/metrics.go | 85 +++++++++++++++++++ executor/pkg/controller/metrics_test.go | 39 +++++++++ .../pkg/controller/taskaction_controller.go | 8 ++ 3 files changed, 132 insertions(+) create mode 100644 executor/pkg/controller/metrics.go create mode 100644 executor/pkg/controller/metrics_test.go diff --git a/executor/pkg/controller/metrics.go b/executor/pkg/controller/metrics.go new file mode 100644 index 0000000000..c034e599c4 --- /dev/null +++ b/executor/pkg/controller/metrics.go @@ -0,0 +1,85 @@ +package controller + +import ( + "context" + "encoding/json" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "sigs.k8s.io/controller-runtime/pkg/client" + + flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1" + "github.com/flyteorg/flyte/v2/flytestdlib/otelutils" +) + +const taskActionMeterName = "taskaction-controller" + +// taskActionMetrics holds OTel instruments for the TaskAction controller. +// +// Reconcile latency and k8s API read/write latency already come from the +// controller-runtime metrics server (controller_runtime_reconcile_time_seconds, +// rest_client_request_duration_seconds), and event-proxy send latency comes from +// the otelconnect-wrapped events client (rpc_client_duration). This only adds what +// neither provides: TaskAction CRD count by phase and serialized CRD size. +type taskActionMetrics struct { + crdSizeBytes metric.Int64Histogram +} + +// registerTaskActionMetrics wires the TaskAction OTel meters onto the "executor" +// meter provider (registered in executor/setup.go). The active-by-phase gauge is +// observed asynchronously by listing TaskActions from the controller cache. 
+func registerTaskActionMetrics(c client.Client) (*taskActionMetrics, error) { + meter := otelutils.GetMeterProvider("executor").Meter(taskActionMeterName) + + crdSize, err := meter.Int64Histogram( + "taskaction.crd.size_bytes", + metric.WithDescription("Serialized (JSON) size of a TaskAction CRD"), + metric.WithUnit("By"), + ) + if err != nil { + return nil, err + } + + active, err := meter.Int64ObservableGauge( + "taskaction.active", + metric.WithDescription("Number of TaskAction CRDs, labeled by plugin phase"), + ) + if err != nil { + return nil, err + } + + _, err = meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { + var list flyteorgv1.TaskActionList + if err := c.List(ctx, &list); err != nil { + return err + } + counts := make(map[string]int64, 8) + for i := range list.Items { + phase := list.Items[i].Status.PluginPhase + if phase == "" { + phase = "Unknown" + } + counts[phase]++ + } + for phase, n := range counts { + o.ObserveInt64(active, n, metric.WithAttributes(attribute.String("phase", phase))) + } + return nil + }, active) + if err != nil { + return nil, err + } + + return &taskActionMetrics{crdSizeBytes: crdSize}, nil +} + +// observeCRDSize records the serialized size of a TaskAction CRD. No-op if metrics +// registration failed (m == nil). 
+func (m *taskActionMetrics) observeCRDSize(ctx context.Context, ta *flyteorgv1.TaskAction) { + if m == nil || m.crdSizeBytes == nil { + return + } + if b, err := json.Marshal(ta); err == nil { + m.crdSizeBytes.Record(ctx, int64(len(b))) + } +} diff --git a/executor/pkg/controller/metrics_test.go b/executor/pkg/controller/metrics_test.go new file mode 100644 index 0000000000..6623386154 --- /dev/null +++ b/executor/pkg/controller/metrics_test.go @@ -0,0 +1,39 @@ +package controller + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1" +) + +func TestRegisterTaskActionMetrics(t *testing.T) { + // The "executor" meter provider is not registered in unit tests, so this + // exercises the no-op meter path: registration must still succeed and return + // usable instruments. The async callback's client is only invoked on + // collection (never under the no-op provider), so a nil client is safe here. + m, err := registerTaskActionMetrics(nil) + require.NoError(t, err) + require.NotNil(t, m) +} + +func TestObserveCRDSize(t *testing.T) { + m, err := registerTaskActionMetrics(nil) + require.NoError(t, err) + + // Records without panicking for a populated CRD... + assert.NotPanics(t, func() { + m.observeCRDSize(context.Background(), &flyteorgv1.TaskAction{ + Status: flyteorgv1.TaskActionStatus{PluginPhase: "Executing"}, + }) + }) + + // ...and is a safe no-op when metrics registration failed (nil receiver). 
+ var nilMetrics *taskActionMetrics + assert.NotPanics(t, func() { + nilMetrics.observeCRDSize(context.Background(), &flyteorgv1.TaskAction{}) + }) +} diff --git a/executor/pkg/controller/taskaction_controller.go b/executor/pkg/controller/taskaction_controller.go index 953de9e644..3405501075 100644 --- a/executor/pkg/controller/taskaction_controller.go +++ b/executor/pkg/controller/taskaction_controller.go @@ -96,6 +96,7 @@ type TaskActionReconciler struct { eventsClient workflowconnect.EventsProxyServiceClient cluster string MaxSystemFailures uint32 + metrics *taskActionMetrics } // isSystemRetryableFailure reports whether the plugin transition is a @@ -206,6 +207,11 @@ func NewTaskActionReconciler( eventsClient workflowconnect.EventsProxyServiceClient, cluster string, ) *TaskActionReconciler { + metrics, err := registerTaskActionMetrics(c) + if err != nil { + // Non-fatal: degrade to no custom metrics rather than failing controller setup. + log.Log.Error(err, "failed to register TaskAction OTel metrics") + } return &TaskActionReconciler{ Client: c, Scheme: scheme, @@ -213,6 +219,7 @@ func NewTaskActionReconciler( DataStore: dataStore, eventsClient: eventsClient, cluster: cluster, + metrics: metrics, } } @@ -231,6 +238,7 @@ func (r *TaskActionReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err := r.Get(ctx, req.NamespacedName, taskAction); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } + r.metrics.observeCRDSize(ctx, taskAction) // Please do NOT modify `originalTaskActionInstance` in the following code. 
This is for checking // if the TaskAction instance changes From 9af0d9629971eed7bcdd809d3b55bfe755663298 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 8 Jun 2026 15:36:36 -0700 Subject: [PATCH 2/9] feat(executor): add TaskAction CRD r/w latency; drop server peer attrs - taskaction.k8s.duration{op,error}: per-operation latency (get / update / status_update) of TaskAction CRD calls to the Kubernetes API, recorded via thin timed wrappers around the embedded client. Fills the gap left by the controller-runtime metrics server, which does not register rest_client_request_duration_seconds in this version. - Add otelconnect.WithoutServerPeerAttributes() to every server-side interceptor (runs, actions, app, internal-app, dataproxy, cache, secret, events, executor). otelconnect otherwise tags rpc_server_* metrics with net.peer.port (the client's ephemeral source port), so each series is hit once -> rate() is always ~0 and the cumulative OTLP payload grows unbounded (exceeds the collector's default 4MiB). 
Signed-off-by: Kevin Su --- actions/setup.go | 1 + app/internal/setup.go | 1 + app/setup.go | 1 + cache_service/setup.go | 2 +- dataproxy/setup.go | 1 + events/setup.go | 1 + executor/pkg/controller/metrics.go | 39 +++++++++++++++---- executor/pkg/controller/metrics_test.go | 17 ++++++++ .../pkg/controller/taskaction_controller.go | 30 ++++++++++---- executor/setup.go | 1 + runs/setup.go | 1 + secret/setup.go | 1 + 12 files changed, 81 insertions(+), 15 deletions(-) diff --git a/actions/setup.go b/actions/setup.go index aa13c5f3a2..4d4e71b5ff 100644 --- a/actions/setup.go +++ b/actions/setup.go @@ -31,6 +31,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { otelInterceptor, err := otelconnect.NewInterceptor( otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)), otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)), + otelconnect.WithoutServerPeerAttributes(), ) if err != nil { return fmt.Errorf("creating otel interceptor: %w", err) diff --git a/app/internal/setup.go b/app/internal/setup.go index 9dcdb786e6..6947f3bfbd 100644 --- a/app/internal/setup.go +++ b/app/internal/setup.go @@ -51,6 +51,7 @@ func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.Inter otelInterceptor, err := otelconnect.NewInterceptor( otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)), otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)), + otelconnect.WithoutServerPeerAttributes(), ) if err != nil { return fmt.Errorf("creating otel interceptor: %w", err) diff --git a/app/setup.go b/app/setup.go index 52896fda0c..ea332789f0 100644 --- a/app/setup.go +++ b/app/setup.go @@ -39,6 +39,7 @@ func Setup(ctx context.Context, sc *stdlibapp.SetupContext) error { otelInterceptor, err := otelconnect.NewInterceptor( otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)), otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)), + 
otelconnect.WithoutServerPeerAttributes(), ) if err != nil { return fmt.Errorf("creating otel interceptor: %w", err) diff --git a/cache_service/setup.go b/cache_service/setup.go index 171d752dc1..bc4583809c 100644 --- a/cache_service/setup.go +++ b/cache_service/setup.go @@ -35,7 +35,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { tracerProvider := otelutils.GetTracerProvider(otelServiceName) meterProvider := otelutils.GetMeterProvider(otelServiceName) - otelInterceptor, err := otelconnect.NewInterceptor(otelconnect.WithTracerProvider(tracerProvider), otelconnect.WithMeterProvider(meterProvider)) + otelInterceptor, err := otelconnect.NewInterceptor(otelconnect.WithTracerProvider(tracerProvider), otelconnect.WithMeterProvider(meterProvider), otelconnect.WithoutServerPeerAttributes()) if err != nil { return fmt.Errorf("creating otel interceptor: %w", err) } diff --git a/dataproxy/setup.go b/dataproxy/setup.go index 7a1f1c8c5d..84bd65e8de 100644 --- a/dataproxy/setup.go +++ b/dataproxy/setup.go @@ -36,6 +36,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { otelInterceptor, err := otelconnect.NewInterceptor( otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)), otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)), + otelconnect.WithoutServerPeerAttributes(), ) if err != nil { return fmt.Errorf("creating otel interceptor: %w", err) diff --git a/events/setup.go b/events/setup.go index 6a5e952b95..e51663e0e0 100644 --- a/events/setup.go +++ b/events/setup.go @@ -28,6 +28,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { otelInterceptor, err := otelconnect.NewInterceptor( otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)), otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)), + otelconnect.WithoutServerPeerAttributes(), ) if err != nil { return fmt.Errorf("creating otel interceptor: %w", err) diff --git 
a/executor/pkg/controller/metrics.go b/executor/pkg/controller/metrics.go index c034e599c4..78060ba45d 100644 --- a/executor/pkg/controller/metrics.go +++ b/executor/pkg/controller/metrics.go @@ -3,6 +3,7 @@ package controller import ( "context" "encoding/json" + "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -16,13 +17,14 @@ const taskActionMeterName = "taskaction-controller" // taskActionMetrics holds OTel instruments for the TaskAction controller. // -// Reconcile latency and k8s API read/write latency already come from the -// controller-runtime metrics server (controller_runtime_reconcile_time_seconds, -// rest_client_request_duration_seconds), and event-proxy send latency comes from -// the otelconnect-wrapped events client (rpc_client_duration). This only adds what -// neither provides: TaskAction CRD count by phase and serialized CRD size. +// Reconcile throughput, active workers, and workqueue latency already come from +// the controller-runtime metrics server, and event-proxy send latency comes from +// the otelconnect-wrapped events client (rpc_client_duration). This adds what +// none of those provide: TaskAction CRD count by phase, serialized CRD size, and +// per-operation Kubernetes API read/write latency for the CRD. 
type taskActionMetrics struct { - crdSizeBytes metric.Int64Histogram + crdSizeBytes metric.Int64Histogram + crdOpDuration metric.Float64Histogram } // registerTaskActionMetrics wires the TaskAction OTel meters onto the "executor" @@ -40,6 +42,15 @@ func registerTaskActionMetrics(c client.Client) (*taskActionMetrics, error) { return nil, err } + crdOp, err := meter.Float64Histogram( + "taskaction.k8s.duration", + metric.WithDescription("Latency of TaskAction CRD operations against the Kubernetes API, labeled by op (get/update/status_update)"), + metric.WithUnit("ms"), + ) + if err != nil { + return nil, err + } + active, err := meter.Int64ObservableGauge( "taskaction.active", metric.WithDescription("Number of TaskAction CRDs, labeled by plugin phase"), @@ -70,7 +81,7 @@ func registerTaskActionMetrics(c client.Client) (*taskActionMetrics, error) { return nil, err } - return &taskActionMetrics{crdSizeBytes: crdSize}, nil + return &taskActionMetrics{crdSizeBytes: crdSize, crdOpDuration: crdOp}, nil } // observeCRDSize records the serialized size of a TaskAction CRD. No-op if metrics @@ -83,3 +94,17 @@ func (m *taskActionMetrics) observeCRDSize(ctx context.Context, ta *flyteorgv1.T m.crdSizeBytes.Record(ctx, int64(len(b))) } } + +// timeK8sOp times a Kubernetes API operation against the TaskAction CRD and records +// its latency under taskaction.k8s.duration{op,error}. It is a transparent pass-through +// when metrics registration failed (m == nil), so callers can wrap unconditionally. 
+func (m *taskActionMetrics) timeK8sOp(ctx context.Context, op string, fn func() error) error { + if m == nil || m.crdOpDuration == nil { + return fn() + } + start := time.Now() + err := fn() + m.crdOpDuration.Record(ctx, float64(time.Since(start).Microseconds())/1000.0, + metric.WithAttributes(attribute.String("op", op), attribute.Bool("error", err != nil))) + return err +} diff --git a/executor/pkg/controller/metrics_test.go b/executor/pkg/controller/metrics_test.go index 6623386154..1bce1ed3be 100644 --- a/executor/pkg/controller/metrics_test.go +++ b/executor/pkg/controller/metrics_test.go @@ -2,6 +2,7 @@ package controller import ( "context" + "errors" "testing" "github.com/stretchr/testify/assert" @@ -37,3 +38,19 @@ func TestObserveCRDSize(t *testing.T) { nilMetrics.observeCRDSize(context.Background(), &flyteorgv1.TaskAction{}) }) } + +func TestTimeK8sOp(t *testing.T) { + m, err := registerTaskActionMetrics(nil) + require.NoError(t, err) + + // Propagates the wrapped op's result. + sentinel := errors.New("boom") + assert.ErrorIs(t, m.timeK8sOp(context.Background(), "get", func() error { return sentinel }), sentinel) + assert.NoError(t, m.timeK8sOp(context.Background(), "update", func() error { return nil })) + + // Nil receiver is a transparent pass-through that still runs the op. 
+ var nilMetrics *taskActionMetrics + ran := false + assert.NoError(t, nilMetrics.timeK8sOp(context.Background(), "get", func() error { ran = true; return nil })) + assert.True(t, ran) +} diff --git a/executor/pkg/controller/taskaction_controller.go b/executor/pkg/controller/taskaction_controller.go index 3405501075..316e84648c 100644 --- a/executor/pkg/controller/taskaction_controller.go +++ b/executor/pkg/controller/taskaction_controller.go @@ -168,7 +168,7 @@ func (r *TaskActionReconciler) recordSystemError( } if taskActionStatusChanged(original.Status, taskAction.Status) { - if updErr := r.Status().Update(ctx, taskAction); updErr != nil { + if updErr := r.crdStatusUpdate(ctx, taskAction); updErr != nil { logger.Error(updErr, "failed to persist SystemFailures counter") } } @@ -223,6 +223,22 @@ func NewTaskActionReconciler( } } +// crdGet, crdUpdate, and crdStatusUpdate wrap the embedded client's TaskAction CRD +// operations so their Kubernetes API latency is recorded under taskaction.k8s.duration. +// They delegate through r.Client (not the promoted r.Get/r.Update/r.Status) so the +// timing is applied exactly once. 
+func (r *TaskActionReconciler) crdGet(ctx context.Context, key client.ObjectKey, obj client.Object) error { + return r.metrics.timeK8sOp(ctx, "get", func() error { return r.Client.Get(ctx, key, obj) }) +} + +func (r *TaskActionReconciler) crdUpdate(ctx context.Context, obj client.Object) error { + return r.metrics.timeK8sOp(ctx, "update", func() error { return r.Client.Update(ctx, obj) }) +} + +func (r *TaskActionReconciler) crdStatusUpdate(ctx context.Context, obj client.Object) error { + return r.metrics.timeK8sOp(ctx, "status_update", func() error { return r.Client.Status().Update(ctx, obj) }) +} + // +kubebuilder:rbac:groups=flyte.org,resources=taskactions,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=flyte.org,resources=taskactions/status,verbs=get;update;patch // +kubebuilder:rbac:groups=flyte.org,resources=taskactions/finalizers,verbs=update @@ -235,7 +251,7 @@ func (r *TaskActionReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Fetch the TaskAction instance taskAction := &flyteorgv1.TaskAction{} - if err := r.Get(ctx, req.NamespacedName, taskAction); err != nil { + if err := r.crdGet(ctx, req.NamespacedName, taskAction); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } r.metrics.observeCRDSize(ctx, taskAction) @@ -269,14 +285,14 @@ func (r *TaskActionReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.Recorder.Eventf(taskAction, corev1.EventTypeWarning, string(eventType), "%v", err) setCondition(taskAction, flyteorgv1.ConditionTypeFailed, metav1.ConditionTrue, reason, err.Error()) setCondition(taskAction, flyteorgv1.ConditionTypeProgressing, metav1.ConditionFalse, reason, err.Error()) - _ = r.Status().Update(ctx, taskAction) + _ = r.crdStatusUpdate(ctx, taskAction) return ctrl.Result{}, nil // terminal — do not requeue } // Ensure finalizer is present (once validation passes) if !controllerutil.ContainsFinalizer(taskAction, taskActionFinalizer) { controllerutil.AddFinalizer(taskAction, 
taskActionFinalizer) - if err := r.Update(ctx, taskAction); err != nil { + if err := r.crdUpdate(ctx, taskAction); err != nil { logger.Error(err, "Failed to update TaskAction with finalizer") return ctrl.Result{}, err } @@ -505,7 +521,7 @@ func (r *TaskActionReconciler) handleAbortAndFinalize(ctx context.Context, taskA func (r *TaskActionReconciler) removeFinalizer(ctx context.Context, taskAction *flyteorgv1.TaskAction) (ctrl.Result, error) { controllerutil.RemoveFinalizer(taskAction, taskActionFinalizer) - if err := r.Update(ctx, taskAction); err != nil { + if err := r.crdUpdate(ctx, taskAction); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil @@ -544,11 +560,11 @@ func (r *TaskActionReconciler) updateTaskActionStatus( // This will resolve the conflict error caused by k8s optimistic lock when 2 reconcile loops updating the same CRD if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { latest := &flyteorgv1.TaskAction{} - if getErr := r.Get(ctx, client.ObjectKeyFromObject(newTaskAction), latest); getErr != nil { + if getErr := r.crdGet(ctx, client.ObjectKeyFromObject(newTaskAction), latest); getErr != nil { return getErr } latest.Status = newTaskAction.Status - return r.Status().Update(ctx, latest) + return r.crdStatusUpdate(ctx, latest) }); err != nil { logger.Error(err, "Error updating status", "name", oldTaskAction.Name, "error", err, "TaskAction", newTaskAction) return err diff --git a/executor/setup.go b/executor/setup.go index 6adcfb6ac7..db7438a1fd 100644 --- a/executor/setup.go +++ b/executor/setup.go @@ -140,6 +140,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { otelInterceptor, err := otelconnect.NewInterceptor( otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)), otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)), + otelconnect.WithoutServerPeerAttributes(), ) if err != nil { return fmt.Errorf("creating otel interceptor: %w", err) diff --git 
a/runs/setup.go b/runs/setup.go index a2f53c25b3..665e8b036c 100644 --- a/runs/setup.go +++ b/runs/setup.go @@ -49,6 +49,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { otelInterceptor, err := otelconnect.NewInterceptor( otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)), otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)), + otelconnect.WithoutServerPeerAttributes(), ) if err != nil { return fmt.Errorf("creating otel interceptor: %w", err) diff --git a/secret/setup.go b/secret/setup.go index 77a87d5b42..7b33d8bd6c 100644 --- a/secret/setup.go +++ b/secret/setup.go @@ -27,6 +27,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { otelInterceptor, err := otelconnect.NewInterceptor( otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)), otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)), + otelconnect.WithoutServerPeerAttributes(), ) if err != nil { return fmt.Errorf("creating otel interceptor: %w", err) From 8a14894da8c4b3cde7c5997aaad1b41d508f7b1c Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 10 Jun 2026 12:48:15 -0700 Subject: [PATCH 3/9] refactor(executor): inject meter provider; instrument client instead of wrappers Apply review feedback on the TaskAction OTel metrics: - Inject the meter provider into registerTaskActionMetrics / NewTaskActionReconciler instead of hardcoding "executor" in metrics.go. The name lives next to otelutils.RegisterProvidersWithContext in executor/setup.go, so a rename can no longer silently route the metrics to the noop provider. - Replace the crdGet/crdUpdate/crdStatusUpdate wrappers with a client.Client decorator (newInstrumentedClient) embedded in the reconciler. Call sites use the idiomatic r.Get/r.Update/r.Status().Update again and cannot accidentally bypass the timing; non-TaskAction objects pass through untimed. Op labels become typed constants. 
- With a real (injectable) provider, tests now assert recorded data via a manual reader: CRD size sum, op/error labels on taskaction.k8s.duration, gauge phase counts through the async callback, and pass-through behaviour of the instrumented client. Signed-off-by: Kevin Su --- .../pkg/controller/instrumented_client.go | 59 +++++++ executor/pkg/controller/metrics.go | 18 +- executor/pkg/controller/metrics_test.go | 160 ++++++++++++++++-- .../pkg/controller/taskaction_controller.go | 41 ++--- executor/setup.go | 1 + 5 files changed, 229 insertions(+), 50 deletions(-) create mode 100644 executor/pkg/controller/instrumented_client.go diff --git a/executor/pkg/controller/instrumented_client.go b/executor/pkg/controller/instrumented_client.go new file mode 100644 index 0000000000..737443312a --- /dev/null +++ b/executor/pkg/controller/instrumented_client.go @@ -0,0 +1,59 @@ +package controller + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/client" + + flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1" +) + +// newInstrumentedClient wraps c so TaskAction CRD operations (Get, Update, and +// Status().Update) are timed under taskaction.k8s.duration. Operations on other +// object types pass through untimed. When metrics registration failed (m == nil) +// it returns c unchanged, so callers can wrap unconditionally. +// +// The reconciler embeds the wrapped client, which makes the instrumentation +// structural: call sites use the idiomatic r.Get/r.Update/r.Status().Update and +// cannot accidentally bypass the timing. 
+func newInstrumentedClient(c client.Client, m *taskActionMetrics) client.Client { + if m == nil { + return c + } + return &instrumentedClient{Client: c, metrics: m} +} + +type instrumentedClient struct { + client.Client + metrics *taskActionMetrics +} + +func (c *instrumentedClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + if _, ok := obj.(*flyteorgv1.TaskAction); !ok { + return c.Client.Get(ctx, key, obj, opts...) + } + return c.metrics.timeK8sOp(ctx, opGet, func() error { return c.Client.Get(ctx, key, obj, opts...) }) +} + +func (c *instrumentedClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + if _, ok := obj.(*flyteorgv1.TaskAction); !ok { + return c.Client.Update(ctx, obj, opts...) + } + return c.metrics.timeK8sOp(ctx, opUpdate, func() error { return c.Client.Update(ctx, obj, opts...) }) +} + +func (c *instrumentedClient) Status() client.SubResourceWriter { + return &instrumentedStatusWriter{SubResourceWriter: c.Client.Status(), metrics: c.metrics} +} + +type instrumentedStatusWriter struct { + client.SubResourceWriter + metrics *taskActionMetrics +} + +func (w *instrumentedStatusWriter) Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error { + if _, ok := obj.(*flyteorgv1.TaskAction); !ok { + return w.SubResourceWriter.Update(ctx, obj, opts...) + } + return w.metrics.timeK8sOp(ctx, opStatusUpdate, func() error { return w.SubResourceWriter.Update(ctx, obj, opts...) 
}) +} diff --git a/executor/pkg/controller/metrics.go b/executor/pkg/controller/metrics.go index 78060ba45d..d1b1d7f4cd 100644 --- a/executor/pkg/controller/metrics.go +++ b/executor/pkg/controller/metrics.go @@ -10,11 +10,17 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1" - "github.com/flyteorg/flyte/v2/flytestdlib/otelutils" ) const taskActionMeterName = "taskaction-controller" +// Values for the "op" attribute on taskaction.k8s.duration. +const ( + opGet = "get" + opUpdate = "update" + opStatusUpdate = "status_update" +) + // taskActionMetrics holds OTel instruments for the TaskAction controller. // // Reconcile throughput, active workers, and workqueue latency already come from @@ -27,11 +33,11 @@ type taskActionMetrics struct { crdOpDuration metric.Float64Histogram } -// registerTaskActionMetrics wires the TaskAction OTel meters onto the "executor" -// meter provider (registered in executor/setup.go). The active-by-phase gauge is -// observed asynchronously by listing TaskActions from the controller cache. -func registerTaskActionMetrics(c client.Client) (*taskActionMetrics, error) { - meter := otelutils.GetMeterProvider("executor").Meter(taskActionMeterName) +// registerTaskActionMetrics wires the TaskAction OTel meters onto the given meter +// provider (the executor's, registered in executor/setup.go). The active-by-phase +// gauge is observed asynchronously by listing TaskActions from the controller cache. 
+func registerTaskActionMetrics(provider metric.MeterProvider, c client.Client) (*taskActionMetrics, error) { + meter := provider.Meter(taskActionMeterName) crdSize, err := meter.Int64Histogram( "taskaction.crd.size_bytes", diff --git a/executor/pkg/controller/metrics_test.go b/executor/pkg/controller/metrics_test.go index 1bce1ed3be..1239b42555 100644 --- a/executor/pkg/controller/metrics_test.go +++ b/executor/pkg/controller/metrics_test.go @@ -2,37 +2,112 @@ package controller import ( "context" + "encoding/json" "errors" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1" ) +// newTestMetricsScheme builds a scheme with the TaskAction CRD and core types +// (the latter for asserting non-TaskAction operations pass through untimed). +func newTestMetricsScheme(t *testing.T) *runtime.Scheme { + t.Helper() + s := runtime.NewScheme() + require.NoError(t, flyteorgv1.AddToScheme(s)) + require.NoError(t, corev1.AddToScheme(s)) + return s +} + +// collectMetric drains the manual reader and returns the named metric. 
+func collectMetric(t *testing.T, reader *sdkmetric.ManualReader, name string) metricdata.Metrics { + t.Helper() + var rm metricdata.ResourceMetrics + require.NoError(t, reader.Collect(context.Background(), &rm)) + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + } + t.Fatalf("metric %q not found", name) + return metricdata.Metrics{} +} + +func findOpDataPoint(t *testing.T, h metricdata.Histogram[float64], op string) metricdata.HistogramDataPoint[float64] { + t.Helper() + for _, dp := range h.DataPoints { + if v, ok := dp.Attributes.Value(attribute.Key("op")); ok && v.AsString() == op { + return dp + } + } + t.Fatalf("no taskaction.k8s.duration datapoint with op=%q", op) + return metricdata.HistogramDataPoint[float64]{} +} + func TestRegisterTaskActionMetrics(t *testing.T) { - // The "executor" meter provider is not registered in unit tests, so this - // exercises the no-op meter path: registration must still succeed and return - // usable instruments. The async callback's client is only invoked on - // collection (never under the no-op provider), so a nil client is safe here. 
- m, err := registerTaskActionMetrics(nil) + reader := sdkmetric.NewManualReader() + provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + c := fake.NewClientBuilder().WithScheme(newTestMetricsScheme(t)).WithObjects( + &flyteorgv1.TaskAction{ObjectMeta: metav1.ObjectMeta{Name: "ta-1", Namespace: "default"}, + Status: flyteorgv1.TaskActionStatus{PluginPhase: "Executing"}}, + &flyteorgv1.TaskAction{ObjectMeta: metav1.ObjectMeta{Name: "ta-2", Namespace: "default"}, + Status: flyteorgv1.TaskActionStatus{PluginPhase: "Executing"}}, + &flyteorgv1.TaskAction{ObjectMeta: metav1.ObjectMeta{Name: "ta-3", Namespace: "default"}}, + ).Build() + + m, err := registerTaskActionMetrics(provider, c) require.NoError(t, err) require.NotNil(t, m) + + // Collection triggers the async gauge callback, which lists from the client + // and reports active CRD counts by phase ("" maps to Unknown). + gauge, ok := collectMetric(t, reader, "taskaction.active").Data.(metricdata.Gauge[int64]) + require.True(t, ok) + counts := map[string]int64{} + for _, dp := range gauge.DataPoints { + v, ok := dp.Attributes.Value(attribute.Key("phase")) + require.True(t, ok) + counts[v.AsString()] = dp.Value + } + assert.Equal(t, map[string]int64{"Executing": 2, "Unknown": 1}, counts) } func TestObserveCRDSize(t *testing.T) { - m, err := registerTaskActionMetrics(nil) + reader := sdkmetric.NewManualReader() + provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + c := fake.NewClientBuilder().WithScheme(newTestMetricsScheme(t)).Build() + + m, err := registerTaskActionMetrics(provider, c) require.NoError(t, err) - // Records without panicking for a populated CRD... 
- assert.NotPanics(t, func() { - m.observeCRDSize(context.Background(), &flyteorgv1.TaskAction{ - Status: flyteorgv1.TaskActionStatus{PluginPhase: "Executing"}, - }) - }) + ta := &flyteorgv1.TaskAction{ + ObjectMeta: metav1.ObjectMeta{Name: "ta-1", Namespace: "default"}, + Status: flyteorgv1.TaskActionStatus{PluginPhase: "Executing"}, + } + m.observeCRDSize(context.Background(), ta) - // ...and is a safe no-op when metrics registration failed (nil receiver). + b, err := json.Marshal(ta) + require.NoError(t, err) + hist, ok := collectMetric(t, reader, "taskaction.crd.size_bytes").Data.(metricdata.Histogram[int64]) + require.True(t, ok) + require.Len(t, hist.DataPoints, 1) + assert.Equal(t, uint64(1), hist.DataPoints[0].Count) + assert.Equal(t, int64(len(b)), hist.DataPoints[0].Sum) + + // Safe no-op when metrics registration failed (nil receiver). var nilMetrics *taskActionMetrics assert.NotPanics(t, func() { nilMetrics.observeCRDSize(context.Background(), &flyteorgv1.TaskAction{}) @@ -40,17 +115,66 @@ func TestObserveCRDSize(t *testing.T) { } func TestTimeK8sOp(t *testing.T) { - m, err := registerTaskActionMetrics(nil) + reader := sdkmetric.NewManualReader() + provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + c := fake.NewClientBuilder().WithScheme(newTestMetricsScheme(t)).Build() + + m, err := registerTaskActionMetrics(provider, c) require.NoError(t, err) - // Propagates the wrapped op's result. + // Propagates the wrapped op's result and records latency labeled by op/error. 
sentinel := errors.New("boom") - assert.ErrorIs(t, m.timeK8sOp(context.Background(), "get", func() error { return sentinel }), sentinel) - assert.NoError(t, m.timeK8sOp(context.Background(), "update", func() error { return nil })) + assert.ErrorIs(t, m.timeK8sOp(context.Background(), opGet, func() error { return sentinel }), sentinel) + assert.NoError(t, m.timeK8sOp(context.Background(), opUpdate, func() error { return nil })) + + hist, ok := collectMetric(t, reader, "taskaction.k8s.duration").Data.(metricdata.Histogram[float64]) + require.True(t, ok) + get := findOpDataPoint(t, hist, opGet) + errAttr, ok := get.Attributes.Value(attribute.Key("error")) + require.True(t, ok) + assert.True(t, errAttr.AsBool()) + update := findOpDataPoint(t, hist, opUpdate) + errAttr, ok = update.Attributes.Value(attribute.Key("error")) + require.True(t, ok) + assert.False(t, errAttr.AsBool()) // Nil receiver is a transparent pass-through that still runs the op. var nilMetrics *taskActionMetrics ran := false - assert.NoError(t, nilMetrics.timeK8sOp(context.Background(), "get", func() error { ran = true; return nil })) + assert.NoError(t, nilMetrics.timeK8sOp(context.Background(), opGet, func() error { ran = true; return nil })) assert.True(t, ran) } + +func TestInstrumentedClient(t *testing.T) { + reader := sdkmetric.NewManualReader() + provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + ta := &flyteorgv1.TaskAction{ObjectMeta: metav1.ObjectMeta{Name: "ta-1", Namespace: "default"}} + cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "cm-1", Namespace: "default"}} + base := fake.NewClientBuilder().WithScheme(newTestMetricsScheme(t)).WithObjects(ta, cm). 
+ WithStatusSubresource(&flyteorgv1.TaskAction{}).Build() + + m, err := registerTaskActionMetrics(provider, base) + require.NoError(t, err) + c := newInstrumentedClient(base, m) + + ctx := context.Background() + fetched := &flyteorgv1.TaskAction{} + require.NoError(t, c.Get(ctx, client.ObjectKeyFromObject(ta), fetched)) + require.NoError(t, c.Update(ctx, fetched)) + fetched.Status.PluginPhase = "Executing" + require.NoError(t, c.Status().Update(ctx, fetched)) + // Non-TaskAction operations pass through untimed. + require.NoError(t, c.Get(ctx, client.ObjectKeyFromObject(cm), &corev1.ConfigMap{})) + + hist, ok := collectMetric(t, reader, "taskaction.k8s.duration").Data.(metricdata.Histogram[float64]) + require.True(t, ok) + for _, op := range []string{opGet, opUpdate, opStatusUpdate} { + assert.Equal(t, uint64(1), findOpDataPoint(t, hist, op).Count, "op=%s", op) + } + // Exactly the three TaskAction ops — the ConfigMap Get recorded nothing. + assert.Len(t, hist.DataPoints, 3) + + // When metrics registration failed, the client is returned unwrapped. 
+ _, wrapped := newInstrumentedClient(base, nil).(*instrumentedClient) + assert.False(t, wrapped) +} diff --git a/executor/pkg/controller/taskaction_controller.go b/executor/pkg/controller/taskaction_controller.go index 316e84648c..4ce3940542 100644 --- a/executor/pkg/controller/taskaction_controller.go +++ b/executor/pkg/controller/taskaction_controller.go @@ -28,6 +28,7 @@ import ( "k8s.io/client-go/util/retry" "connectrpc.com/connect" + "go.opentelemetry.io/otel/metric" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -168,7 +169,7 @@ func (r *TaskActionReconciler) recordSystemError( } if taskActionStatusChanged(original.Status, taskAction.Status) { - if updErr := r.crdStatusUpdate(ctx, taskAction); updErr != nil { + if updErr := r.Status().Update(ctx, taskAction); updErr != nil { logger.Error(updErr, "failed to persist SystemFailures counter") } } @@ -198,7 +199,10 @@ func (r *TaskActionReconciler) finalizePermanentFailure( return ctrl.Result{}, nil } -// NewTaskActionReconciler creates a new TaskActionReconciler +// NewTaskActionReconciler creates a new TaskActionReconciler. meterProvider is the +// executor's OTel meter provider (otelutils.GetMeterProvider(otelServiceName) in +// executor/setup.go); the embedded client is wrapped so TaskAction CRD operations +// are timed under taskaction.k8s.duration (see newInstrumentedClient). func NewTaskActionReconciler( c client.Client, scheme *runtime.Scheme, @@ -206,14 +210,15 @@ func NewTaskActionReconciler( dataStore *storage.DataStore, eventsClient workflowconnect.EventsProxyServiceClient, cluster string, + meterProvider metric.MeterProvider, ) *TaskActionReconciler { - metrics, err := registerTaskActionMetrics(c) + metrics, err := registerTaskActionMetrics(meterProvider, c) if err != nil { // Non-fatal: degrade to no custom metrics rather than failing controller setup. 
log.Log.Error(err, "failed to register TaskAction OTel metrics") } return &TaskActionReconciler{ - Client: c, + Client: newInstrumentedClient(c, metrics), Scheme: scheme, PluginRegistry: registry, DataStore: dataStore, @@ -223,22 +228,6 @@ func NewTaskActionReconciler( } } -// crdGet, crdUpdate, and crdStatusUpdate wrap the embedded client's TaskAction CRD -// operations so their Kubernetes API latency is recorded under taskaction.k8s.duration. -// They delegate through r.Client (not the promoted r.Get/r.Update/r.Status) so the -// timing is applied exactly once. -func (r *TaskActionReconciler) crdGet(ctx context.Context, key client.ObjectKey, obj client.Object) error { - return r.metrics.timeK8sOp(ctx, "get", func() error { return r.Client.Get(ctx, key, obj) }) -} - -func (r *TaskActionReconciler) crdUpdate(ctx context.Context, obj client.Object) error { - return r.metrics.timeK8sOp(ctx, "update", func() error { return r.Client.Update(ctx, obj) }) -} - -func (r *TaskActionReconciler) crdStatusUpdate(ctx context.Context, obj client.Object) error { - return r.metrics.timeK8sOp(ctx, "status_update", func() error { return r.Client.Status().Update(ctx, obj) }) -} - // +kubebuilder:rbac:groups=flyte.org,resources=taskactions,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=flyte.org,resources=taskactions/status,verbs=get;update;patch // +kubebuilder:rbac:groups=flyte.org,resources=taskactions/finalizers,verbs=update @@ -251,7 +240,7 @@ func (r *TaskActionReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Fetch the TaskAction instance taskAction := &flyteorgv1.TaskAction{} - if err := r.crdGet(ctx, req.NamespacedName, taskAction); err != nil { + if err := r.Get(ctx, req.NamespacedName, taskAction); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } r.metrics.observeCRDSize(ctx, taskAction) @@ -285,14 +274,14 @@ func (r *TaskActionReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.Recorder.Eventf(taskAction, 
corev1.EventTypeWarning, string(eventType), "%v", err) setCondition(taskAction, flyteorgv1.ConditionTypeFailed, metav1.ConditionTrue, reason, err.Error()) setCondition(taskAction, flyteorgv1.ConditionTypeProgressing, metav1.ConditionFalse, reason, err.Error()) - _ = r.crdStatusUpdate(ctx, taskAction) + _ = r.Status().Update(ctx, taskAction) return ctrl.Result{}, nil // terminal — do not requeue } // Ensure finalizer is present (once validation passes) if !controllerutil.ContainsFinalizer(taskAction, taskActionFinalizer) { controllerutil.AddFinalizer(taskAction, taskActionFinalizer) - if err := r.crdUpdate(ctx, taskAction); err != nil { + if err := r.Update(ctx, taskAction); err != nil { logger.Error(err, "Failed to update TaskAction with finalizer") return ctrl.Result{}, err } @@ -521,7 +510,7 @@ func (r *TaskActionReconciler) handleAbortAndFinalize(ctx context.Context, taskA func (r *TaskActionReconciler) removeFinalizer(ctx context.Context, taskAction *flyteorgv1.TaskAction) (ctrl.Result, error) { controllerutil.RemoveFinalizer(taskAction, taskActionFinalizer) - if err := r.crdUpdate(ctx, taskAction); err != nil { + if err := r.Update(ctx, taskAction); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil @@ -560,11 +549,11 @@ func (r *TaskActionReconciler) updateTaskActionStatus( // This will resolve the conflict error caused by k8s optimistic lock when 2 reconcile loops updating the same CRD if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { latest := &flyteorgv1.TaskAction{} - if getErr := r.crdGet(ctx, client.ObjectKeyFromObject(newTaskAction), latest); getErr != nil { + if getErr := r.Get(ctx, client.ObjectKeyFromObject(newTaskAction), latest); getErr != nil { return getErr } latest.Status = newTaskAction.Status - return r.crdStatusUpdate(ctx, latest) + return r.Status().Update(ctx, latest) }); err != nil { logger.Error(err, "Error updating status", "name", oldTaskAction.Name, "error", err, "TaskAction", newTaskAction) return err 
diff --git a/executor/setup.go b/executor/setup.go index db7438a1fd..23634b7aeb 100644 --- a/executor/setup.go +++ b/executor/setup.go @@ -167,6 +167,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { reconciler := controller.NewTaskActionReconciler( mgr.GetClient(), mgr.GetScheme(), registry, dataStore, eventsClient, cfg.Cluster, + otelutils.GetMeterProvider(otelServiceName), ) reconciler.CatalogClient = asyncCatalogClient reconciler.Catalog = cacheClient From 80be43defa143a245043a7b50cbd2352cbf81278 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 10 Jun 2026 13:55:23 -0700 Subject: [PATCH 4/9] refactor(executor): time CRD ops inline instead of via client decorator Drop the instrumentedClient wrapper in favour of explicit inline timing at each call site: start := time.Now() before the operation, then metrics.recordK8sOp(ctx, op, start, err) right after. The timing is now visible where it happens, with no hidden interception behind the embedded client. recordK8sOp stays nil-receiver-safe so struct-literal constructed reconcilers (tests) and failed registration degrade to no-ops. 
Signed-off-by: Kevin Su --- .../pkg/controller/instrumented_client.go | 59 ------------------- executor/pkg/controller/metrics.go | 15 +++-- executor/pkg/controller/metrics_test.go | 53 +++-------------- .../pkg/controller/taskaction_controller.go | 44 ++++++++++---- 4 files changed, 48 insertions(+), 123 deletions(-) delete mode 100644 executor/pkg/controller/instrumented_client.go diff --git a/executor/pkg/controller/instrumented_client.go b/executor/pkg/controller/instrumented_client.go deleted file mode 100644 index 737443312a..0000000000 --- a/executor/pkg/controller/instrumented_client.go +++ /dev/null @@ -1,59 +0,0 @@ -package controller - -import ( - "context" - - "sigs.k8s.io/controller-runtime/pkg/client" - - flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1" -) - -// newInstrumentedClient wraps c so TaskAction CRD operations (Get, Update, and -// Status().Update) are timed under taskaction.k8s.duration. Operations on other -// object types pass through untimed. When metrics registration failed (m == nil) -// it returns c unchanged, so callers can wrap unconditionally. -// -// The reconciler embeds the wrapped client, which makes the instrumentation -// structural: call sites use the idiomatic r.Get/r.Update/r.Status().Update and -// cannot accidentally bypass the timing. -func newInstrumentedClient(c client.Client, m *taskActionMetrics) client.Client { - if m == nil { - return c - } - return &instrumentedClient{Client: c, metrics: m} -} - -type instrumentedClient struct { - client.Client - metrics *taskActionMetrics -} - -func (c *instrumentedClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { - if _, ok := obj.(*flyteorgv1.TaskAction); !ok { - return c.Client.Get(ctx, key, obj, opts...) - } - return c.metrics.timeK8sOp(ctx, opGet, func() error { return c.Client.Get(ctx, key, obj, opts...) 
}) -} - -func (c *instrumentedClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { - if _, ok := obj.(*flyteorgv1.TaskAction); !ok { - return c.Client.Update(ctx, obj, opts...) - } - return c.metrics.timeK8sOp(ctx, opUpdate, func() error { return c.Client.Update(ctx, obj, opts...) }) -} - -func (c *instrumentedClient) Status() client.SubResourceWriter { - return &instrumentedStatusWriter{SubResourceWriter: c.Client.Status(), metrics: c.metrics} -} - -type instrumentedStatusWriter struct { - client.SubResourceWriter - metrics *taskActionMetrics -} - -func (w *instrumentedStatusWriter) Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error { - if _, ok := obj.(*flyteorgv1.TaskAction); !ok { - return w.SubResourceWriter.Update(ctx, obj, opts...) - } - return w.metrics.timeK8sOp(ctx, opStatusUpdate, func() error { return w.SubResourceWriter.Update(ctx, obj, opts...) }) -} diff --git a/executor/pkg/controller/metrics.go b/executor/pkg/controller/metrics.go index d1b1d7f4cd..8115c575f0 100644 --- a/executor/pkg/controller/metrics.go +++ b/executor/pkg/controller/metrics.go @@ -101,16 +101,15 @@ func (m *taskActionMetrics) observeCRDSize(ctx context.Context, ta *flyteorgv1.T } } -// timeK8sOp times a Kubernetes API operation against the TaskAction CRD and records -// its latency under taskaction.k8s.duration{op,error}. It is a transparent pass-through -// when metrics registration failed (m == nil), so callers can wrap unconditionally. -func (m *taskActionMetrics) timeK8sOp(ctx context.Context, op string, fn func() error) error { +// recordK8sOp records the latency of a Kubernetes API operation against the +// TaskAction CRD under taskaction.k8s.duration{op,error}. Call sites time the +// operation inline (start := time.Now() before the call) and record right after. +// No-op when metrics registration failed (m == nil), so callers can record +// unconditionally. 
+func (m *taskActionMetrics) recordK8sOp(ctx context.Context, op string, start time.Time, err error) { if m == nil || m.crdOpDuration == nil { - return fn() + return } - start := time.Now() - err := fn() m.crdOpDuration.Record(ctx, float64(time.Since(start).Microseconds())/1000.0, metric.WithAttributes(attribute.String("op", op), attribute.Bool("error", err != nil))) - return err } diff --git a/executor/pkg/controller/metrics_test.go b/executor/pkg/controller/metrics_test.go index 1239b42555..f384843b88 100644 --- a/executor/pkg/controller/metrics_test.go +++ b/executor/pkg/controller/metrics_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -14,7 +15,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1" @@ -114,7 +114,7 @@ func TestObserveCRDSize(t *testing.T) { }) } -func TestTimeK8sOp(t *testing.T) { +func TestRecordK8sOp(t *testing.T) { reader := sdkmetric.NewManualReader() provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) c := fake.NewClientBuilder().WithScheme(newTestMetricsScheme(t)).Build() @@ -122,10 +122,9 @@ func TestTimeK8sOp(t *testing.T) { m, err := registerTaskActionMetrics(provider, c) require.NoError(t, err) - // Propagates the wrapped op's result and records latency labeled by op/error. - sentinel := errors.New("boom") - assert.ErrorIs(t, m.timeK8sOp(context.Background(), opGet, func() error { return sentinel }), sentinel) - assert.NoError(t, m.timeK8sOp(context.Background(), opUpdate, func() error { return nil })) + // Records latency labeled by op/error. 
+ m.recordK8sOp(context.Background(), opGet, time.Now(), errors.New("boom")) + m.recordK8sOp(context.Background(), opUpdate, time.Now(), nil) hist, ok := collectMetric(t, reader, "taskaction.k8s.duration").Data.(metricdata.Histogram[float64]) require.True(t, ok) @@ -138,43 +137,9 @@ func TestTimeK8sOp(t *testing.T) { require.True(t, ok) assert.False(t, errAttr.AsBool()) - // Nil receiver is a transparent pass-through that still runs the op. + // Safe no-op when metrics registration failed (nil receiver). var nilMetrics *taskActionMetrics - ran := false - assert.NoError(t, nilMetrics.timeK8sOp(context.Background(), opGet, func() error { ran = true; return nil })) - assert.True(t, ran) -} - -func TestInstrumentedClient(t *testing.T) { - reader := sdkmetric.NewManualReader() - provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) - ta := &flyteorgv1.TaskAction{ObjectMeta: metav1.ObjectMeta{Name: "ta-1", Namespace: "default"}} - cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "cm-1", Namespace: "default"}} - base := fake.NewClientBuilder().WithScheme(newTestMetricsScheme(t)).WithObjects(ta, cm). - WithStatusSubresource(&flyteorgv1.TaskAction{}).Build() - - m, err := registerTaskActionMetrics(provider, base) - require.NoError(t, err) - c := newInstrumentedClient(base, m) - - ctx := context.Background() - fetched := &flyteorgv1.TaskAction{} - require.NoError(t, c.Get(ctx, client.ObjectKeyFromObject(ta), fetched)) - require.NoError(t, c.Update(ctx, fetched)) - fetched.Status.PluginPhase = "Executing" - require.NoError(t, c.Status().Update(ctx, fetched)) - // Non-TaskAction operations pass through untimed. 
- require.NoError(t, c.Get(ctx, client.ObjectKeyFromObject(cm), &corev1.ConfigMap{})) - - hist, ok := collectMetric(t, reader, "taskaction.k8s.duration").Data.(metricdata.Histogram[float64]) - require.True(t, ok) - for _, op := range []string{opGet, opUpdate, opStatusUpdate} { - assert.Equal(t, uint64(1), findOpDataPoint(t, hist, op).Count, "op=%s", op) - } - // Exactly the three TaskAction ops — the ConfigMap Get recorded nothing. - assert.Len(t, hist.DataPoints, 3) - - // When metrics registration failed, the client is returned unwrapped. - _, wrapped := newInstrumentedClient(base, nil).(*instrumentedClient) - assert.False(t, wrapped) + assert.NotPanics(t, func() { + nilMetrics.recordK8sOp(context.Background(), opGet, time.Now(), nil) + }) } diff --git a/executor/pkg/controller/taskaction_controller.go b/executor/pkg/controller/taskaction_controller.go index 4ce3940542..e53ae50b3f 100644 --- a/executor/pkg/controller/taskaction_controller.go +++ b/executor/pkg/controller/taskaction_controller.go @@ -169,7 +169,10 @@ func (r *TaskActionReconciler) recordSystemError( } if taskActionStatusChanged(original.Status, taskAction.Status) { - if updErr := r.Status().Update(ctx, taskAction); updErr != nil { + start := time.Now() + updErr := r.Status().Update(ctx, taskAction) + r.metrics.recordK8sOp(ctx, opStatusUpdate, start, updErr) + if updErr != nil { logger.Error(updErr, "failed to persist SystemFailures counter") } } @@ -201,8 +204,8 @@ func (r *TaskActionReconciler) finalizePermanentFailure( // NewTaskActionReconciler creates a new TaskActionReconciler. meterProvider is the // executor's OTel meter provider (otelutils.GetMeterProvider(otelServiceName) in -// executor/setup.go); the embedded client is wrapped so TaskAction CRD operations -// are timed under taskaction.k8s.duration (see newInstrumentedClient). +// executor/setup.go). TaskAction CRD operations are timed inline at the call +// sites via metrics.recordK8sOp. 
func NewTaskActionReconciler( c client.Client, scheme *runtime.Scheme, @@ -218,7 +221,7 @@ func NewTaskActionReconciler( log.Log.Error(err, "failed to register TaskAction OTel metrics") } return &TaskActionReconciler{ - Client: newInstrumentedClient(c, metrics), + Client: c, Scheme: scheme, PluginRegistry: registry, DataStore: dataStore, @@ -240,7 +243,10 @@ func (r *TaskActionReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Fetch the TaskAction instance taskAction := &flyteorgv1.TaskAction{} - if err := r.Get(ctx, req.NamespacedName, taskAction); err != nil { + start := time.Now() + err := r.Get(ctx, req.NamespacedName, taskAction) + r.metrics.recordK8sOp(ctx, opGet, start, err) + if err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } r.metrics.observeCRDSize(ctx, taskAction) @@ -274,16 +280,21 @@ func (r *TaskActionReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.Recorder.Eventf(taskAction, corev1.EventTypeWarning, string(eventType), "%v", err) setCondition(taskAction, flyteorgv1.ConditionTypeFailed, metav1.ConditionTrue, reason, err.Error()) setCondition(taskAction, flyteorgv1.ConditionTypeProgressing, metav1.ConditionFalse, reason, err.Error()) - _ = r.Status().Update(ctx, taskAction) + start = time.Now() + updErr := r.Status().Update(ctx, taskAction) // error intentionally ignored: terminal either way + r.metrics.recordK8sOp(ctx, opStatusUpdate, start, updErr) return ctrl.Result{}, nil // terminal — do not requeue } // Ensure finalizer is present (once validation passes) if !controllerutil.ContainsFinalizer(taskAction, taskActionFinalizer) { controllerutil.AddFinalizer(taskAction, taskActionFinalizer) - if err := r.Update(ctx, taskAction); err != nil { - logger.Error(err, "Failed to update TaskAction with finalizer") - return ctrl.Result{}, err + start = time.Now() + updErr := r.Update(ctx, taskAction) + r.metrics.recordK8sOp(ctx, opUpdate, start, updErr) + if updErr != nil { + logger.Error(updErr, "Failed to update 
TaskAction with finalizer") + return ctrl.Result{}, updErr } return ctrl.Result{}, nil } @@ -510,7 +521,10 @@ func (r *TaskActionReconciler) handleAbortAndFinalize(ctx context.Context, taskA func (r *TaskActionReconciler) removeFinalizer(ctx context.Context, taskAction *flyteorgv1.TaskAction) (ctrl.Result, error) { controllerutil.RemoveFinalizer(taskAction, taskActionFinalizer) - if err := r.Update(ctx, taskAction); err != nil { + start := time.Now() + err := r.Update(ctx, taskAction) + r.metrics.recordK8sOp(ctx, opUpdate, start, err) + if err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil @@ -549,11 +563,17 @@ func (r *TaskActionReconciler) updateTaskActionStatus( // This will resolve the conflict error caused by k8s optimistic lock when 2 reconcile loops updating the same CRD if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { latest := &flyteorgv1.TaskAction{} - if getErr := r.Get(ctx, client.ObjectKeyFromObject(newTaskAction), latest); getErr != nil { + start := time.Now() + getErr := r.Get(ctx, client.ObjectKeyFromObject(newTaskAction), latest) + r.metrics.recordK8sOp(ctx, opGet, start, getErr) + if getErr != nil { return getErr } latest.Status = newTaskAction.Status - return r.Status().Update(ctx, latest) + start = time.Now() + updErr := r.Status().Update(ctx, latest) + r.metrics.recordK8sOp(ctx, opStatusUpdate, start, updErr) + return updErr }); err != nil { logger.Error(err, "Error updating status", "name", oldTaskAction.Name, "error", err, "TaskAction", newTaskAction) return err From c44b970132d703f78cc8551d6af9cd7fc411f786 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 10 Jun 2026 15:32:42 -0700 Subject: [PATCH 5/9] Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Kevin Su --- executor/pkg/controller/metrics.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/executor/pkg/controller/metrics.go 
b/executor/pkg/controller/metrics.go index 8115c575f0..716cb76331 100644 --- a/executor/pkg/controller/metrics.go +++ b/executor/pkg/controller/metrics.go @@ -37,6 +37,9 @@ type taskActionMetrics struct { // provider (the executor's, registered in executor/setup.go). The active-by-phase // gauge is observed asynchronously by listing TaskActions from the controller cache. func registerTaskActionMetrics(provider metric.MeterProvider, c client.Client) (*taskActionMetrics, error) { + if _, ok := provider.(metricnoop.MeterProvider); ok { + return nil, nil + } meter := provider.Meter(taskActionMeterName) crdSize, err := meter.Int64Histogram( @@ -66,9 +69,13 @@ func registerTaskActionMetrics(provider metric.MeterProvider, c client.Client) ( } _, err = meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { + if c == nil { + return nil + } var list flyteorgv1.TaskActionList if err := c.List(ctx, &list); err != nil { - return err + // Avoid failing the entire metrics collection cycle when the cache/API is temporarily unavailable. 
+ return nil } counts := make(map[string]int64, 8) for i := range list.Items { From 865948fe0c9024708f1189572176bcc2119e9aac Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 10 Jun 2026 15:41:37 -0700 Subject: [PATCH 6/9] Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Kevin Su --- executor/pkg/controller/metrics.go | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/pkg/controller/metrics.go b/executor/pkg/controller/metrics.go index 716cb76331..88140b8f20 100644 --- a/executor/pkg/controller/metrics.go +++ b/executor/pkg/controller/metrics.go @@ -7,6 +7,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + metricnoop "go.opentelemetry.io/otel/metric/noop" "sigs.k8s.io/controller-runtime/pkg/client" flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1" From 849800e9098ba95384c54b93a0828f053b62fe56 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 10 Jun 2026 15:42:31 -0700 Subject: [PATCH 7/9] Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Kevin Su --- executor/pkg/controller/metrics.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/executor/pkg/controller/metrics.go b/executor/pkg/controller/metrics.go index 88140b8f20..0b62b99e6e 100644 --- a/executor/pkg/controller/metrics.go +++ b/executor/pkg/controller/metrics.go @@ -98,9 +98,8 @@ func registerTaskActionMetrics(provider metric.MeterProvider, c client.Client) ( return &taskActionMetrics{crdSizeBytes: crdSize, crdOpDuration: crdOp}, nil } -// observeCRDSize records the serialized size of a TaskAction CRD. No-op if metrics -// registration failed (m == nil). -func (m *taskActionMetrics) observeCRDSize(ctx context.Context, ta *flyteorgv1.TaskAction) { +// observeCRDSize records the serialized size of a TaskAction CRD. 
No-op when +// custom metrics are disabled (m == nil). if m == nil || m.crdSizeBytes == nil { return } @@ -112,7 +111,7 @@ func (m *taskActionMetrics) observeCRDSize(ctx context.Context, ta *flyteorgv1.T // recordK8sOp records the latency of a Kubernetes API operation against the // TaskAction CRD under taskaction.k8s.duration{op,error}. Call sites time the // operation inline (start := time.Now() before the call) and record right after. -// No-op when metrics registration failed (m == nil), so callers can record +// No-op when custom metrics are disabled (m == nil), so callers can record // unconditionally. func (m *taskActionMetrics) recordK8sOp(ctx context.Context, op string, start time.Time, err error) { if m == nil || m.crdOpDuration == nil { From a78ea873beb057f9192421dad1c8275d88f259d7 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 10 Jun 2026 16:30:25 -0700 Subject: [PATCH 8/9] fix: restore observeCRDSize function signature A "code review suggestions" commit dropped the `func (m *taskActionMetrics) observeCRDSize(...)` line, leaving its body at package scope and breaking the build. Restore the signature. Signed-off-by: Kevin Su --- executor/pkg/controller/metrics.go | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/pkg/controller/metrics.go b/executor/pkg/controller/metrics.go index 0b62b99e6e..2b5ebb5ce6 100644 --- a/executor/pkg/controller/metrics.go +++ b/executor/pkg/controller/metrics.go @@ -100,6 +100,7 @@ func registerTaskActionMetrics(provider metric.MeterProvider, c client.Client) ( // observeCRDSize records the serialized size of a TaskAction CRD. No-op when // custom metrics are disabled (m == nil). 
+func (m *taskActionMetrics) observeCRDSize(ctx context.Context, ta *flyteorgv1.TaskAction) { if m == nil || m.crdSizeBytes == nil { return } From 1f52ff196b20e9e082a047afdeae159703919986 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 10 Jun 2026 17:42:49 -0700 Subject: [PATCH 9/9] refactor(executor): count active TaskActions from cache indexer (no deep-copy) The taskaction.active gauge listed CRDs via client.List on every metrics collection, which deep-copies every TaskAction out of the informer cache. At high CRD counts that is O(N) full-object copies plus GC churn each cycle. Read straight from the SharedIndexInformer's indexer instead: indexer.List() returns the cached object pointers without copying. Extract a pure countByPhase tally and inject it via cachedPhaseCounter(mgr.GetCache()), which also decouples registerTaskActionMetrics from the client and simplifies its test. Signed-off-by: Kevin Su --- executor/pkg/controller/metrics.go | 80 ++++++++++++++----- executor/pkg/controller/metrics_test.go | 51 ++++++------ .../pkg/controller/taskaction_controller.go | 9 ++- executor/setup.go | 2 +- 4 files changed, 92 insertions(+), 50 deletions(-) diff --git a/executor/pkg/controller/metrics.go b/executor/pkg/controller/metrics.go index 2b5ebb5ce6..b779e1200e 100644 --- a/executor/pkg/controller/metrics.go +++ b/executor/pkg/controller/metrics.go @@ -8,7 +8,8 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" metricnoop "go.opentelemetry.io/otel/metric/noop" - "sigs.k8s.io/controller-runtime/pkg/client" + toolscache "k8s.io/client-go/tools/cache" + ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1" ) @@ -36,8 +37,10 @@ type taskActionMetrics struct { // registerTaskActionMetrics wires the TaskAction OTel meters onto the given meter // provider (the executor's, registered in executor/setup.go). 
The active-by-phase -// gauge is observed asynchronously by listing TaskActions from the controller cache. -func registerTaskActionMetrics(provider metric.MeterProvider, c client.Client) (*taskActionMetrics, error) { +// gauge is observed asynchronously via activeByPhase, which returns the current +// TaskAction count per plugin phase (see cachedPhaseCounter for the cache-backed, +// no-copy implementation). A nil activeByPhase leaves the gauge unobserved. +func registerTaskActionMetrics(provider metric.MeterProvider, activeByPhase func(context.Context) map[string]int64) (*taskActionMetrics, error) { if _, ok := provider.(metricnoop.MeterProvider); ok { return nil, nil } @@ -70,23 +73,10 @@ func registerTaskActionMetrics(provider metric.MeterProvider, c client.Client) ( } _, err = meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { - if c == nil { + if activeByPhase == nil { return nil } - var list flyteorgv1.TaskActionList - if err := c.List(ctx, &list); err != nil { - // Avoid failing the entire metrics collection cycle when the cache/API is temporarily unavailable. - return nil - } - counts := make(map[string]int64, 8) - for i := range list.Items { - phase := list.Items[i].Status.PluginPhase - if phase == "" { - phase = "Unknown" - } - counts[phase]++ - } - for phase, n := range counts { + for phase, n := range activeByPhase(ctx) { o.ObserveInt64(active, n, metric.WithAttributes(attribute.String("phase", phase))) } return nil @@ -98,6 +88,60 @@ func registerTaskActionMetrics(provider metric.MeterProvider, c client.Client) ( return &taskActionMetrics{crdSizeBytes: crdSize, crdOpDuration: crdOp}, nil } +// countByPhase tallies TaskActions by plugin phase; an empty phase maps to "Unknown". +// nil entries are skipped so callers can pass a raw cache listing without filtering. 
+func countByPhase(items []*flyteorgv1.TaskAction) map[string]int64 { + counts := make(map[string]int64, 8) + for _, ta := range items { + if ta == nil { + continue + } + phase := ta.Status.PluginPhase + if phase == "" { + phase = "Unknown" + } + counts[phase]++ + } + return counts +} + +// cachedPhaseCounter returns a function that counts TaskAction CRDs by plugin phase +// straight from the controller's informer cache indexer. Unlike client.List, the +// indexer's List returns the cached object pointers without deep-copying every CRD, +// so a collection cycle costs O(N) pointer reads instead of O(N) full-object copies — +// this is what keeps the active gauge cheap when many TaskActions exist. The indexer +// is resolved lazily on first call, because the cache is not yet started when the +// reconciler is constructed. +func cachedPhaseCounter(c ctrlcache.Cache) func(context.Context) map[string]int64 { + var indexer toolscache.Indexer + return func(ctx context.Context) map[string]int64 { + if indexer == nil { + if c == nil { + return nil + } + // BlockUntilSynced(false): never stall the metric-collection goroutine; a + // not-yet-synced cache just yields a partial count the next cycle corrects. + informer, err := c.GetInformer(ctx, &flyteorgv1.TaskAction{}, ctrlcache.BlockUntilSynced(false)) + if err != nil { + return nil + } + sii, ok := informer.(toolscache.SharedIndexInformer) + if !ok { + return nil + } + indexer = sii.GetIndexer() + } + objs := indexer.List() + items := make([]*flyteorgv1.TaskAction, 0, len(objs)) + for _, obj := range objs { + if ta, ok := obj.(*flyteorgv1.TaskAction); ok { + items = append(items, ta) + } + } + return countByPhase(items) + } +} + // observeCRDSize records the serialized size of a TaskAction CRD. No-op when // custom metrics are disabled (m == nil). 
func (m *taskActionMetrics) observeCRDSize(ctx context.Context, ta *flyteorgv1.TaskAction) { diff --git a/executor/pkg/controller/metrics_test.go b/executor/pkg/controller/metrics_test.go index f384843b88..0545996880 100644 --- a/executor/pkg/controller/metrics_test.go +++ b/executor/pkg/controller/metrics_test.go @@ -12,24 +12,11 @@ import ( "go.opentelemetry.io/otel/attribute" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/client/fake" flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1" ) -// newTestMetricsScheme builds a scheme with the TaskAction CRD and core types -// (the latter for asserting non-TaskAction operations pass through untimed). -func newTestMetricsScheme(t *testing.T) *runtime.Scheme { - t.Helper() - s := runtime.NewScheme() - require.NoError(t, flyteorgv1.AddToScheme(s)) - require.NoError(t, corev1.AddToScheme(s)) - return s -} - // collectMetric drains the manual reader and returns the named metric. 
func collectMetric(t *testing.T, reader *sdkmetric.ManualReader, name string) metricdata.Metrics { t.Helper() @@ -60,20 +47,17 @@ func findOpDataPoint(t *testing.T, h metricdata.Histogram[float64], op string) m func TestRegisterTaskActionMetrics(t *testing.T) { reader := sdkmetric.NewManualReader() provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) - c := fake.NewClientBuilder().WithScheme(newTestMetricsScheme(t)).WithObjects( - &flyteorgv1.TaskAction{ObjectMeta: metav1.ObjectMeta{Name: "ta-1", Namespace: "default"}, - Status: flyteorgv1.TaskActionStatus{PluginPhase: "Executing"}}, - &flyteorgv1.TaskAction{ObjectMeta: metav1.ObjectMeta{Name: "ta-2", Namespace: "default"}, - Status: flyteorgv1.TaskActionStatus{PluginPhase: "Executing"}}, - &flyteorgv1.TaskAction{ObjectMeta: metav1.ObjectMeta{Name: "ta-3", Namespace: "default"}}, - ).Build() - - m, err := registerTaskActionMetrics(provider, c) + + // The active gauge reports whatever the injected phase counter returns. + activeByPhase := func(context.Context) map[string]int64 { + return map[string]int64{"Executing": 2, "Unknown": 1} + } + + m, err := registerTaskActionMetrics(provider, activeByPhase) require.NoError(t, err) require.NotNil(t, m) - // Collection triggers the async gauge callback, which lists from the client - // and reports active CRD counts by phase ("" maps to Unknown). + // Collection triggers the async gauge callback, which observes one data point per phase. gauge, ok := collectMetric(t, reader, "taskaction.active").Data.(metricdata.Gauge[int64]) require.True(t, ok) counts := map[string]int64{} @@ -85,12 +69,24 @@ func TestRegisterTaskActionMetrics(t *testing.T) { assert.Equal(t, map[string]int64{"Executing": 2, "Unknown": 1}, counts) } +// TestCountByPhase covers the pure tallying used by cachedPhaseCounter: empty phase +// maps to "Unknown" and nil entries (which a raw cache listing may contain) are skipped. 
+func TestCountByPhase(t *testing.T) { + items := []*flyteorgv1.TaskAction{ + {ObjectMeta: metav1.ObjectMeta{Name: "ta-1"}, Status: flyteorgv1.TaskActionStatus{PluginPhase: "Executing"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "ta-2"}, Status: flyteorgv1.TaskActionStatus{PluginPhase: "Executing"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "ta-3"}}, // empty phase -> Unknown + nil, + } + assert.Equal(t, map[string]int64{"Executing": 2, "Unknown": 1}, countByPhase(items)) + assert.Equal(t, map[string]int64{}, countByPhase(nil)) +} + func TestObserveCRDSize(t *testing.T) { reader := sdkmetric.NewManualReader() provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) - c := fake.NewClientBuilder().WithScheme(newTestMetricsScheme(t)).Build() - m, err := registerTaskActionMetrics(provider, c) + m, err := registerTaskActionMetrics(provider, nil) require.NoError(t, err) ta := &flyteorgv1.TaskAction{ @@ -117,9 +113,8 @@ func TestObserveCRDSize(t *testing.T) { func TestRecordK8sOp(t *testing.T) { reader := sdkmetric.NewManualReader() provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) - c := fake.NewClientBuilder().WithScheme(newTestMetricsScheme(t)).Build() - m, err := registerTaskActionMetrics(provider, c) + m, err := registerTaskActionMetrics(provider, nil) require.NoError(t, err) // Records latency labeled by op/error. 
diff --git a/executor/pkg/controller/taskaction_controller.go b/executor/pkg/controller/taskaction_controller.go index e53ae50b3f..82470dbb86 100644 --- a/executor/pkg/controller/taskaction_controller.go +++ b/executor/pkg/controller/taskaction_controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" + ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" @@ -204,8 +205,9 @@ func (r *TaskActionReconciler) finalizePermanentFailure( // NewTaskActionReconciler creates a new TaskActionReconciler. meterProvider is the // executor's OTel meter provider (otelutils.GetMeterProvider(otelServiceName) in -// executor/setup.go). TaskAction CRD operations are timed inline at the call -// sites via metrics.recordK8sOp. +// executor/setup.go). cache is the manager's cache (mgr.GetCache()); the active-by-phase +// gauge counts TaskActions straight from its indexer to avoid deep-copying every CRD. +// TaskAction CRD operations are timed inline at the call sites via metrics.recordK8sOp. func NewTaskActionReconciler( c client.Client, scheme *runtime.Scheme, @@ -214,8 +216,9 @@ func NewTaskActionReconciler( eventsClient workflowconnect.EventsProxyServiceClient, cluster string, meterProvider metric.MeterProvider, + cache ctrlcache.Cache, ) *TaskActionReconciler { - metrics, err := registerTaskActionMetrics(meterProvider, c) + metrics, err := registerTaskActionMetrics(meterProvider, cachedPhaseCounter(cache)) if err != nil { // Non-fatal: degrade to no custom metrics rather than failing controller setup. 
log.Log.Error(err, "failed to register TaskAction OTel metrics") diff --git a/executor/setup.go b/executor/setup.go index 23634b7aeb..73d7dc1ec2 100644 --- a/executor/setup.go +++ b/executor/setup.go @@ -167,7 +167,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error { reconciler := controller.NewTaskActionReconciler( mgr.GetClient(), mgr.GetScheme(), registry, dataStore, eventsClient, cfg.Cluster, - otelutils.GetMeterProvider(otelServiceName), + otelutils.GetMeterProvider(otelServiceName), mgr.GetCache(), ) reconciler.CatalogClient = asyncCatalogClient reconciler.Catalog = cacheClient