Skip to content
1 change: 1 addition & 0 deletions actions/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
1 change: 1 addition & 0 deletions app/internal/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.Inter
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
1 change: 1 addition & 0 deletions app/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func Setup(ctx context.Context, sc *stdlibapp.SetupContext) error {
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
Comment thread
pingsutw marked this conversation as resolved.
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cache_service/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
tracerProvider := otelutils.GetTracerProvider(otelServiceName)
meterProvider := otelutils.GetMeterProvider(otelServiceName)

otelInterceptor, err := otelconnect.NewInterceptor(otelconnect.WithTracerProvider(tracerProvider), otelconnect.WithMeterProvider(meterProvider))
otelInterceptor, err := otelconnect.NewInterceptor(otelconnect.WithTracerProvider(tracerProvider), otelconnect.WithMeterProvider(meterProvider), otelconnect.WithoutServerPeerAttributes())
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions dataproxy/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
1 change: 1 addition & 0 deletions events/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
167 changes: 167 additions & 0 deletions executor/pkg/controller/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package controller

import (
"context"
"encoding/json"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
metricnoop "go.opentelemetry.io/otel/metric/noop"
toolscache "k8s.io/client-go/tools/cache"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"

flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1"
)
Comment thread
pingsutw marked this conversation as resolved.
Comment thread
Copilot marked this conversation as resolved.

const taskActionMeterName = "taskaction-controller"

// Values for the "op" attribute on taskaction.k8s.duration.
const (
opGet = "get"
opUpdate = "update"
opStatusUpdate = "status_update"
)

// taskActionMetrics holds OTel instruments for the TaskAction controller.
//
// Reconcile throughput, active workers, and workqueue latency already come from
// the controller-runtime metrics server, and event-proxy send latency comes from
// the otelconnect-wrapped events client (rpc_client_duration). This adds what
// none of those provide: TaskAction CRD count by phase, serialized CRD size, and
// per-operation Kubernetes API read/write latency for the CRD.
type taskActionMetrics struct {
crdSizeBytes metric.Int64Histogram
crdOpDuration metric.Float64Histogram
}

// registerTaskActionMetrics wires the TaskAction OTel meters onto the given meter
// provider (the executor's, registered in executor/setup.go). The active-by-phase
// gauge is observed asynchronously via activeByPhase, which returns the current
// TaskAction count per plugin phase (see cachedPhaseCounter for the cache-backed,
// no-copy implementation). A nil activeByPhase leaves the gauge unobserved.
func registerTaskActionMetrics(provider metric.MeterProvider, activeByPhase func(context.Context) map[string]int64) (*taskActionMetrics, error) {
if _, ok := provider.(metricnoop.MeterProvider); ok {
return nil, nil
}
meter := provider.Meter(taskActionMeterName)

crdSize, err := meter.Int64Histogram(
"taskaction.crd.size_bytes",
metric.WithDescription("Serialized (JSON) size of a TaskAction CRD"),
metric.WithUnit("By"),
)
if err != nil {
return nil, err
}

crdOp, err := meter.Float64Histogram(
"taskaction.k8s.duration",
metric.WithDescription("Latency of TaskAction CRD operations against the Kubernetes API, labeled by op (get/update/status_update)"),
metric.WithUnit("ms"),
)
Comment thread
pingsutw marked this conversation as resolved.
Comment thread
pingsutw marked this conversation as resolved.
Comment thread
pingsutw marked this conversation as resolved.
if err != nil {
return nil, err
}

active, err := meter.Int64ObservableGauge(
"taskaction.active",
metric.WithDescription("Number of TaskAction CRDs, labeled by plugin phase"),
)
if err != nil {
return nil, err
}

_, err = meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
if activeByPhase == nil {
return nil
}
for phase, n := range activeByPhase(ctx) {
o.ObserveInt64(active, n, metric.WithAttributes(attribute.String("phase", phase)))
}
return nil
}, active)
if err != nil {
return nil, err
}

return &taskActionMetrics{crdSizeBytes: crdSize, crdOpDuration: crdOp}, nil
}

// countByPhase tallies TaskActions by plugin phase; an empty phase maps to "Unknown".
// nil entries are skipped so callers can pass a raw cache listing without filtering.
func countByPhase(items []*flyteorgv1.TaskAction) map[string]int64 {
counts := make(map[string]int64, 8)
for _, ta := range items {
if ta == nil {
continue
}
phase := ta.Status.PluginPhase
if phase == "" {
phase = "Unknown"
}
counts[phase]++
}
return counts
}

// cachedPhaseCounter returns a function that counts TaskAction CRDs by plugin phase
// straight from the controller's informer cache indexer. Unlike client.List, the
// indexer's List returns the cached object pointers without deep-copying every CRD,
// so a collection cycle costs O(N) pointer reads instead of O(N) full-object copies —
// this is what keeps the active gauge cheap when many TaskActions exist. The indexer
// is resolved lazily on first call, because the cache is not yet started when the
// reconciler is constructed.
func cachedPhaseCounter(c ctrlcache.Cache) func(context.Context) map[string]int64 {
var indexer toolscache.Indexer
return func(ctx context.Context) map[string]int64 {
if indexer == nil {
if c == nil {
return nil
}
// BlockUntilSynced(false): never stall the metric-collection goroutine; a
// not-yet-synced cache just yields a partial count the next cycle corrects.
informer, err := c.GetInformer(ctx, &flyteorgv1.TaskAction{}, ctrlcache.BlockUntilSynced(false))
if err != nil {
return nil
}
sii, ok := informer.(toolscache.SharedIndexInformer)
if !ok {
return nil
}
indexer = sii.GetIndexer()
}
objs := indexer.List()
items := make([]*flyteorgv1.TaskAction, 0, len(objs))
for _, obj := range objs {
if ta, ok := obj.(*flyteorgv1.TaskAction); ok {
items = append(items, ta)
}
}
return countByPhase(items)
}
}

// observeCRDSize records the serialized size of a TaskAction CRD. No-op when
// custom metrics are disabled (m == nil).
func (m *taskActionMetrics) observeCRDSize(ctx context.Context, ta *flyteorgv1.TaskAction) {
if m == nil || m.crdSizeBytes == nil {
return
}
if b, err := json.Marshal(ta); err == nil {
m.crdSizeBytes.Record(ctx, int64(len(b)))
}
}

// recordK8sOp records the latency of a Kubernetes API operation against the
Comment on lines +141 to +156

@pingsutw pingsutw Jun 11, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm going to run some load tests, will remove this if it causes too much overhead

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will definitely cause a good amount of load. We find that de(serialization) is most of the CPU load in propeller v1

// TaskAction CRD under taskaction.k8s.duration{op,error}. Call sites time the
// operation inline (start := time.Now() before the call) and record right after.
// No-op when custom metrics are disabled (m == nil), so callers can record
// unconditionally.
func (m *taskActionMetrics) recordK8sOp(ctx context.Context, op string, start time.Time, err error) {
if m == nil || m.crdOpDuration == nil {
return
}
m.crdOpDuration.Record(ctx, float64(time.Since(start).Microseconds())/1000.0,
metric.WithAttributes(attribute.String("op", op), attribute.Bool("error", err != nil)))
}
140 changes: 140 additions & 0 deletions executor/pkg/controller/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package controller

import (
"context"
"encoding/json"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1"
)

// collectMetric drains the manual reader and returns the named metric.
func collectMetric(t *testing.T, reader *sdkmetric.ManualReader, name string) metricdata.Metrics {
t.Helper()
var rm metricdata.ResourceMetrics
require.NoError(t, reader.Collect(context.Background(), &rm))
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
if m.Name == name {
return m
}
}
}
t.Fatalf("metric %q not found", name)
return metricdata.Metrics{}
}

func findOpDataPoint(t *testing.T, h metricdata.Histogram[float64], op string) metricdata.HistogramDataPoint[float64] {
t.Helper()
for _, dp := range h.DataPoints {
if v, ok := dp.Attributes.Value(attribute.Key("op")); ok && v.AsString() == op {
return dp
}
}
t.Fatalf("no taskaction.k8s.duration datapoint with op=%q", op)
return metricdata.HistogramDataPoint[float64]{}
}

func TestRegisterTaskActionMetrics(t *testing.T) {
reader := sdkmetric.NewManualReader()
provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))

// The active gauge reports whatever the injected phase counter returns.
activeByPhase := func(context.Context) map[string]int64 {
return map[string]int64{"Executing": 2, "Unknown": 1}
}

m, err := registerTaskActionMetrics(provider, activeByPhase)
require.NoError(t, err)
require.NotNil(t, m)

// Collection triggers the async gauge callback, which observes one data point per phase.
gauge, ok := collectMetric(t, reader, "taskaction.active").Data.(metricdata.Gauge[int64])
require.True(t, ok)
counts := map[string]int64{}
for _, dp := range gauge.DataPoints {
v, ok := dp.Attributes.Value(attribute.Key("phase"))
require.True(t, ok)
counts[v.AsString()] = dp.Value
}
assert.Equal(t, map[string]int64{"Executing": 2, "Unknown": 1}, counts)
}

// TestCountByPhase covers the pure tallying used by cachedPhaseCounter: empty phase
// maps to "Unknown" and nil entries (which a raw cache listing may contain) are skipped.
func TestCountByPhase(t *testing.T) {
items := []*flyteorgv1.TaskAction{
{ObjectMeta: metav1.ObjectMeta{Name: "ta-1"}, Status: flyteorgv1.TaskActionStatus{PluginPhase: "Executing"}},
{ObjectMeta: metav1.ObjectMeta{Name: "ta-2"}, Status: flyteorgv1.TaskActionStatus{PluginPhase: "Executing"}},
{ObjectMeta: metav1.ObjectMeta{Name: "ta-3"}}, // empty phase -> Unknown
nil,
}
assert.Equal(t, map[string]int64{"Executing": 2, "Unknown": 1}, countByPhase(items))
assert.Equal(t, map[string]int64{}, countByPhase(nil))
}

func TestObserveCRDSize(t *testing.T) {
reader := sdkmetric.NewManualReader()
provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))

m, err := registerTaskActionMetrics(provider, nil)
require.NoError(t, err)

ta := &flyteorgv1.TaskAction{
ObjectMeta: metav1.ObjectMeta{Name: "ta-1", Namespace: "default"},
Status: flyteorgv1.TaskActionStatus{PluginPhase: "Executing"},
}
m.observeCRDSize(context.Background(), ta)

b, err := json.Marshal(ta)
require.NoError(t, err)
hist, ok := collectMetric(t, reader, "taskaction.crd.size_bytes").Data.(metricdata.Histogram[int64])
require.True(t, ok)
require.Len(t, hist.DataPoints, 1)
assert.Equal(t, uint64(1), hist.DataPoints[0].Count)
assert.Equal(t, int64(len(b)), hist.DataPoints[0].Sum)

// Safe no-op when metrics registration failed (nil receiver).
var nilMetrics *taskActionMetrics
assert.NotPanics(t, func() {
nilMetrics.observeCRDSize(context.Background(), &flyteorgv1.TaskAction{})
})
}

func TestRecordK8sOp(t *testing.T) {
reader := sdkmetric.NewManualReader()
provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))

m, err := registerTaskActionMetrics(provider, nil)
require.NoError(t, err)

// Records latency labeled by op/error.
m.recordK8sOp(context.Background(), opGet, time.Now(), errors.New("boom"))
m.recordK8sOp(context.Background(), opUpdate, time.Now(), nil)

hist, ok := collectMetric(t, reader, "taskaction.k8s.duration").Data.(metricdata.Histogram[float64])
require.True(t, ok)
get := findOpDataPoint(t, hist, opGet)
errAttr, ok := get.Attributes.Value(attribute.Key("error"))
require.True(t, ok)
assert.True(t, errAttr.AsBool())
update := findOpDataPoint(t, hist, opUpdate)
errAttr, ok = update.Attributes.Value(attribute.Key("error"))
require.True(t, ok)
assert.False(t, errAttr.AsBool())

// Safe no-op when metrics registration failed (nil receiver).
var nilMetrics *taskActionMetrics
assert.NotPanics(t, func() {
nilMetrics.recordK8sOp(context.Background(), opGet, time.Now(), nil)
})
}
Loading
Loading