feat(executor): add OTel metrics for the TaskAction controller#7483
Conversation
There was a problem hiding this comment.
Pull request overview
Adds OpenTelemetry business metrics to the executor’s TaskAction controller to improve operator visibility into TaskAction CRD volume, phases, and CRD sizes, integrating with the existing otelutils.GetMeterProvider("executor") pipeline.
Changes:
- Registers OTel instruments for TaskAction metrics: an active-by-phase observable gauge and a CRD-size histogram.
- Records TaskAction CRD JSON size once per reconcile.
- Adds unit tests validating metric registration and nil-safe CRD size observation.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| executor/pkg/controller/taskaction_controller.go | Initializes controller metrics and records CRD size during reconcile. |
| executor/pkg/controller/metrics.go | Implements OTel metric registration and observation logic for TaskAction controller. |
| executor/pkg/controller/metrics_test.go | Adds unit tests for metric registration and CRD-size observation behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
2fc6796 to
81e9c1f
Compare
Build on the meter provider from #7459 to instrument the TaskAction controller with two metrics the framework doesn't provide: - taskaction.active (Int64ObservableGauge): number of TaskAction CRDs by plugin phase, observed asynchronously by listing from the controller cache. - taskaction.crd.size_bytes (Int64Histogram): serialized size of a TaskAction CRD, recorded per reconcile. Both are emitted via otelutils.GetMeterProvider("executor"), so they flow through the same OTLP pipeline as the RPC metrics. Registration is non-fatal (degrades to no custom metrics) and observeCRDSize is nil-safe. Reconcile rate/latency, workqueue depth, and k8s API r/w latency are already exposed by the controller-runtime metrics server; event-proxy send latency is already captured by the otelconnect-wrapped events client. Signed-off-by: Kevin Su <pingsutw@apache.org>
- taskaction.k8s.duration{op,error}: per-operation latency (get / update /
status_update) of TaskAction CRD calls to the Kubernetes API, recorded via
thin timed wrappers around the embedded client. Fills the gap left by the
controller-runtime metrics server, which does not register
rest_client_request_duration_seconds in this version.
- Add otelconnect.WithoutServerPeerAttributes() to every server-side interceptor
(runs, actions, app, internal-app, dataproxy, cache, secret, events, executor).
otelconnect otherwise tags rpc_server_* metrics with net.peer.port (the client's
ephemeral source port), so each series is hit once -> rate() is always ~0 and the
cumulative OTLP payload grows unbounded (exceeds the collector's default 4MiB).
Signed-off-by: Kevin Su <pingsutw@apache.org>
81e9c1f to
9af0d96
Compare
…of wrappers Apply review feedback on the TaskAction OTel metrics: - Inject the meter provider into registerTaskActionMetrics / NewTaskActionReconciler instead of hardcoding "executor" in metrics.go. The name lives next to otelutils.RegisterProvidersWithContext in executor/setup.go, so a rename can no longer silently route the metrics to the noop provider. - Replace the crdGet/crdUpdate/crdStatusUpdate wrappers with a client.Client decorator (newInstrumentedClient) embedded in the reconciler. Call sites use the idiomatic r.Get/r.Update/r.Status().Update again and cannot accidentally bypass the timing; non-TaskAction objects pass through untimed. Op labels become typed constants. - With a real (injectable) provider, tests now assert recorded data via a manual reader: CRD size sum, op/error labels on taskaction.k8s.duration, gauge phase counts through the async callback, and pass-through behaviour of the instrumented client. Signed-off-by: Kevin Su <pingsutw@apache.org>
| // newInstrumentedClient wraps c so TaskAction CRD operations (Get, Update, and | ||
| // Status().Update) are timed under taskaction.k8s.duration. Operations on other | ||
| // object types pass through untimed. When metrics registration failed (m == nil) | ||
| // it returns c unchanged, so callers can wrap unconditionally. | ||
| // | ||
| // The reconciler embeds the wrapped client, which makes the instrumentation | ||
| // structural: call sites use the idiomatic r.Get/r.Update/r.Status().Update and | ||
| // cannot accidentally bypass the timing. |
Drop the instrumentedClient wrapper in favour of explicit inline timing at each call site: start := time.Now() before the operation, then metrics.recordK8sOp(ctx, op, start, err) right after. The timing is now visible where it happens, with no hidden interception behind the embedded client. recordK8sOp stays nil-receiver-safe so struct-literal constructed reconcilers (tests) and failed registration degrade to no-ops. Signed-off-by: Kevin Su <pingsutw@apache.org>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Kevin Su <pingsutw@gmail.com>
| otelInterceptor, err := otelconnect.NewInterceptor( | ||
| otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)), | ||
| otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)), | ||
| otelconnect.WithoutServerPeerAttributes(), | ||
| ) |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Kevin Su <pingsutw@gmail.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Kevin Su <pingsutw@gmail.com>
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "errors" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| "go.opentelemetry.io/otel/attribute" | ||
| sdkmetric "go.opentelemetry.io/otel/sdk/metric" | ||
| "go.opentelemetry.io/otel/sdk/metric/metricdata" | ||
| corev1 "k8s.io/api/core/v1" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "k8s.io/apimachinery/pkg/runtime" | ||
| "sigs.k8s.io/controller-runtime/pkg/client/fake" | ||
|
|
||
| flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1" | ||
| ) | ||
|
|
||
| // newTestMetricsScheme builds a scheme with the TaskAction CRD and core types | ||
| // (the latter for asserting non-TaskAction operations pass through untimed). | ||
| func newTestMetricsScheme(t *testing.T) *runtime.Scheme { |
A "code review suggestions" commit dropped the `func (m *taskActionMetrics) observeCRDSize(...)` line, leaving its body at package scope and breaking the build. Restore the signature. Signed-off-by: Kevin Su <pingsutw@apache.org>
…eep-copy) The taskaction.active gauge listed CRDs via client.List on every metrics collection, which deep-copies every TaskAction out of the informer cache. At high CRD counts that is O(N) full-object copies plus GC churn each cycle. Read straight from the SharedIndexInformer's indexer instead: indexer.List() returns the cached object pointers without copying. Extract a pure countByPhase tally and inject it via cachedPhaseCounter(mgr.GetCache()), which also decouples registerTaskActionMetrics from the client and simplifies its test. Signed-off-by: Kevin Su <pingsutw@apache.org>
| return countByPhase(items) | ||
| } | ||
| } | ||
|
|
||
| // observeCRDSize records the serialized size of a TaskAction CRD. No-op when | ||
| // custom metrics are disabled (m == nil). | ||
| func (m *taskActionMetrics) observeCRDSize(ctx context.Context, ta *flyteorgv1.TaskAction) { | ||
| if m == nil || m.crdSizeBytes == nil { | ||
| return | ||
| } | ||
| if b, err := json.Marshal(ta); err == nil { | ||
| m.crdSizeBytes.Record(ctx, int64(len(b))) | ||
| } | ||
| } | ||
|
|
||
| // recordK8sOp records the latency of a Kubernetes API operation against the |
There was a problem hiding this comment.
i'm going to run some load tests, will remove this if it causes too much overhead
There was a problem hiding this comment.
This will definitely cause a good amount of load. We find that de(serialization) is most of the CPU load in propeller v1
| metrics, err := registerTaskActionMetrics(meterProvider, cachedPhaseCounter(cache)) | ||
| if err != nil { | ||
| // Non-fatal: degrade to no custom metrics rather than failing controller setup. | ||
| log.Log.Error(err, "failed to register TaskAction OTel metrics") |
There was a problem hiding this comment.
You could argue it should fail. I think something really bad would have to happen for this to error
Tracking issue
Related to #7459
Why are the changes needed?
#7459 added the OpenTelemetry meter-provider plumbing (
otelutils.GetMeterProvider)and instrumented the connect RPC clients/servers, but added no business metrics for
the TaskAction controller — operators have no visibility into how many TaskAction CRDs
exist, what phase they're in, how large they are, or how long their Kubernetes API
operations take.
It also surfaced a cardinality problem: the otelconnect server interceptors tag every
rpc_server_*metric with the caller's ephemeral source port, which is unbounded.What changes were proposed in this pull request?
1. TaskAction controller metrics
Three custom OTel meters, registered on the executor meter provider that is injected
into the reconciler (
NewTaskActionReconciler(... , meterProvider, cache)→registerTaskActionMetrics(provider, cachedPhaseCounter(cache))), so they flow throughthe same OTLP pipeline as the RPC metrics and the helper is testable without globals:
taskaction.active(Int64ObservableGauge) — number of TaskAction CRDs byphase. Observed asynchronously via a callback that counts straight from thecontroller cache's informer indexer (
indexer.List()returns the cached objectpointers, so there is no per-collection deep-copy of every CRD — unlike
client.List,this stays O(N) pointer reads even with many TaskActions). Off the reconcile hot path;
runs on the SDK's collection goroutine ~once per export.
taskaction.crd.size_bytes(Int64Histogram) — serialized (JSON) size of aTaskAction CRD, recorded once per reconcile.
taskaction.k8s.duration(Float64Histogram,op/errorattributes) —per-operation latency of TaskAction CRD calls to the Kubernetes API
(
get/update/status_update), timed inline at the call sites and recorded viarecordK8sOp.Registration short-circuits to a no-op when metrics are disabled (the meter provider is
a noop provider), and every record helper is nil-safe, so the controller degrades to no
custom metrics rather than failing setup.
2. Reduce server RPC metric cardinality
Add
otelconnect.WithoutServerPeerAttributes()to the server interceptors across allservices (runs, actions, app, internal-app, dataproxy, cache, secret, events, executor).
otelconnect otherwise tags
rpc_server_*metrics/spans withnet.peer.nameandnet.peer.port— on the server side these are the caller's IP and ephemeral sourceport, so the port is a new value per connection. That makes each series a one-shot
(
rate()≈ 0), explodes cardinality, and grows the cumulative OTLP payload past thecollector's default 4 MiB gRPC limit. This drops those two attributes from server-side
telemetry only; client-side telemetry (where
net.peer.*is the server's fixed address)is untouched, and no metric is removed — only two high-cardinality labels.
Deliberately not re-implemented (already provided)
:10254)rpc_client_duration{rpc_method="Record"}How was this patch tested?
metrics_test.go):TestRegisterTaskActionMetrics(gauge observesthe injected phase counter),
TestCountByPhase(pure tally: empty phase →Unknown,nil entries skipped),
TestObserveCRDSize(records and is nil-safe),TestRecordK8sOp(records latency and is nil-safe).
go build ./...,go vet, andgofmtclean.taskaction_*metrics flow through an OTelcollector → Prometheus and render in Grafana next to the controller-runtime and RPC
panels.
Labels
added
Check all the applicable boxes