Skip to content
1 change: 1 addition & 0 deletions actions/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
1 change: 1 addition & 0 deletions app/internal/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.Inter
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
1 change: 1 addition & 0 deletions app/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func Setup(ctx context.Context, sc *stdlibapp.SetupContext) error {
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
Comment thread
pingsutw marked this conversation as resolved.
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cache_service/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
tracerProvider := otelutils.GetTracerProvider(otelServiceName)
meterProvider := otelutils.GetMeterProvider(otelServiceName)

otelInterceptor, err := otelconnect.NewInterceptor(otelconnect.WithTracerProvider(tracerProvider), otelconnect.WithMeterProvider(meterProvider))
otelInterceptor, err := otelconnect.NewInterceptor(otelconnect.WithTracerProvider(tracerProvider), otelconnect.WithMeterProvider(meterProvider), otelconnect.WithoutServerPeerAttributes())
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions dataproxy/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
1 change: 1 addition & 0 deletions events/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
122 changes: 122 additions & 0 deletions executor/pkg/controller/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package controller

import (
	"context"
	"encoding/json"
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	metricnoop "go.opentelemetry.io/otel/metric/noop"
	"sigs.k8s.io/controller-runtime/pkg/client"

	flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1"
)
Comment thread
pingsutw marked this conversation as resolved.
Comment thread
Copilot marked this conversation as resolved.

// taskActionMeterName is the instrumentation scope name handed to
// MeterProvider.Meter when the TaskAction instruments are created.
const taskActionMeterName = "taskaction-controller"

// Values for the "op" attribute on taskaction.k8s.duration.
const (
// opGet labels a "get" operation on the TaskAction CRD.
opGet = "get"
// opUpdate labels an "update" operation on the TaskAction CRD.
opUpdate = "update"
// opStatusUpdate labels a "status_update" operation (presumably a write to
// the status subresource; confirm at the call sites).
opStatusUpdate = "status_update"
)

// taskActionMetrics holds OTel instruments for the TaskAction controller.
//
// Reconcile throughput, active workers, and workqueue latency already come from
// the controller-runtime metrics server, and event-proxy send latency comes from
// the otelconnect-wrapped events client (rpc_client_duration). This adds what
// none of those provide: TaskAction CRD count by phase, serialized CRD size, and
// per-operation Kubernetes API read/write latency for the CRD.
//
// The methods on *taskActionMetrics check for a nil receiver and for unset
// instruments before recording, so a nil *taskActionMetrics is usable.
type taskActionMetrics struct {
// crdSizeBytes is the "taskaction.crd.size_bytes" histogram (unit "By"):
// the JSON-serialized size of a TaskAction CRD.
crdSizeBytes metric.Int64Histogram
// crdOpDuration is the "taskaction.k8s.duration" histogram (unit "ms"),
// recorded with "op" and "error" attributes.
crdOpDuration metric.Float64Histogram
}

// registerTaskActionMetrics wires the TaskAction OTel meters onto the given meter
// provider (the executor's, registered in executor/setup.go). The active-by-phase
// gauge is observed asynchronously by listing TaskActions from the controller cache.
func registerTaskActionMetrics(provider metric.MeterProvider, c client.Client) (*taskActionMetrics, error) {
if _, ok := provider.(metricnoop.MeterProvider); ok {

Check failure on line 40 in executor/pkg/controller/metrics.go

View workflow job for this annotation

GitHub Actions / test (manager)

undefined: metricnoop

Check failure on line 40 in executor/pkg/controller/metrics.go

View workflow job for this annotation

GitHub Actions / test (executor)

undefined: metricnoop
return nil, nil
}
Comment thread
pingsutw marked this conversation as resolved.
Outdated
meter := provider.Meter(taskActionMeterName)

crdSize, err := meter.Int64Histogram(
"taskaction.crd.size_bytes",
metric.WithDescription("Serialized (JSON) size of a TaskAction CRD"),
metric.WithUnit("By"),
)
if err != nil {
return nil, err
}

crdOp, err := meter.Float64Histogram(
"taskaction.k8s.duration",
metric.WithDescription("Latency of TaskAction CRD operations against the Kubernetes API, labeled by op (get/update/status_update)"),
metric.WithUnit("ms"),
)
Comment thread
pingsutw marked this conversation as resolved.
Comment thread
pingsutw marked this conversation as resolved.
Comment thread
pingsutw marked this conversation as resolved.
if err != nil {
return nil, err
}

active, err := meter.Int64ObservableGauge(
"taskaction.active",
metric.WithDescription("Number of TaskAction CRDs, labeled by plugin phase"),
)
if err != nil {
return nil, err
}

_, err = meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
if c == nil {
return nil
}
var list flyteorgv1.TaskActionList
if err := c.List(ctx, &list); err != nil {
// Avoid failing the entire metrics collection cycle when the cache/API is temporarily unavailable.
return nil
}
counts := make(map[string]int64, 8)
for i := range list.Items {
phase := list.Items[i].Status.PluginPhase
if phase == "" {
phase = "Unknown"
}
counts[phase]++
}
for phase, n := range counts {
o.ObserveInt64(active, n, metric.WithAttributes(attribute.String("phase", phase)))
}
return nil
}, active)
if err != nil {
return nil, err
}

return &taskActionMetrics{crdSizeBytes: crdSize, crdOpDuration: crdOp}, nil
}

// observeCRDSize records the serialized size of a TaskAction CRD. No-op if metrics
// registration failed (m == nil).
func (m *taskActionMetrics) observeCRDSize(ctx context.Context, ta *flyteorgv1.TaskAction) {
Comment thread
pingsutw marked this conversation as resolved.
Outdated
if m == nil || m.crdSizeBytes == nil {
return
}
if b, err := json.Marshal(ta); err == nil {
m.crdSizeBytes.Record(ctx, int64(len(b)))
}
}

// recordK8sOp records the latency of a Kubernetes API operation against the
Comment on lines +141 to +156

@pingsutw pingsutw Jun 11, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm going to run some load tests, will remove this if it causes too much overhead

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will definitely cause a good amount of load. We find that de(serialization) is most of the CPU load in propeller v1

// TaskAction CRD under taskaction.k8s.duration{op,error}. Call sites time the
// operation inline (start := time.Now() before the call) and record right after.
// No-op when metrics registration failed (m == nil), so callers can record
// unconditionally.
Comment thread
pingsutw marked this conversation as resolved.
Outdated
func (m *taskActionMetrics) recordK8sOp(ctx context.Context, op string, start time.Time, err error) {
	// Nil receiver or unset histogram: nothing to record.
	if m == nil || m.crdOpDuration == nil {
		return
	}
	// Elapsed time is taken in whole microseconds and expressed as fractional
	// milliseconds, matching the instrument's "ms" unit.
	elapsedMs := float64(time.Since(start).Microseconds()) / 1000.0
	failed := err != nil
	attrs := metric.WithAttributes(
		attribute.String("op", op),
		attribute.Bool("error", failed),
	)
	m.crdOpDuration.Record(ctx, elapsedMs, attrs)
}
145 changes: 145 additions & 0 deletions executor/pkg/controller/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package controller

import (
"context"
"encoding/json"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1"
)

// newTestMetricsScheme returns a fresh runtime.Scheme with the TaskAction API
// types and the core/v1 types registered. The test fails immediately if either
// registration returns an error.
func newTestMetricsScheme(t *testing.T) *runtime.Scheme {
	t.Helper()
	scheme := runtime.NewScheme()

	err := flyteorgv1.AddToScheme(scheme)
	require.NoError(t, err)

	err = corev1.AddToScheme(scheme)
	require.NoError(t, err)

	return scheme
}

// collectMetric runs one collection on the manual reader and returns the first
// metric, across all scopes, whose name equals name. The test fails if the
// collection errors or no such metric was produced.
func collectMetric(t *testing.T, reader *sdkmetric.ManualReader, name string) metricdata.Metrics {
	t.Helper()

	snapshot := metricdata.ResourceMetrics{}
	err := reader.Collect(context.Background(), &snapshot)
	require.NoError(t, err)

	for i := range snapshot.ScopeMetrics {
		scope := &snapshot.ScopeMetrics[i]
		for j := range scope.Metrics {
			if scope.Metrics[j].Name != name {
				continue
			}
			return scope.Metrics[j]
		}
	}

	t.Fatalf("metric %q not found", name)
	return metricdata.Metrics{}
}

// findOpDataPoint returns the first data point of h whose "op" attribute equals
// op, failing the test when there is none.
func findOpDataPoint(t *testing.T, h metricdata.Histogram[float64], op string) metricdata.HistogramDataPoint[float64] {
	t.Helper()

	const opKey = attribute.Key("op")
	for i := range h.DataPoints {
		got, present := h.DataPoints[i].Attributes.Value(opKey)
		if !present || got.AsString() != op {
			continue
		}
		return h.DataPoints[i]
	}

	t.Fatalf("no taskaction.k8s.duration datapoint with op=%q", op)
	return metricdata.HistogramDataPoint[float64]{}
}

func TestRegisterTaskActionMetrics(t *testing.T) {
reader := sdkmetric.NewManualReader()
provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
c := fake.NewClientBuilder().WithScheme(newTestMetricsScheme(t)).WithObjects(
&flyteorgv1.TaskAction{ObjectMeta: metav1.ObjectMeta{Name: "ta-1", Namespace: "default"},
Status: flyteorgv1.TaskActionStatus{PluginPhase: "Executing"}},
&flyteorgv1.TaskAction{ObjectMeta: metav1.ObjectMeta{Name: "ta-2", Namespace: "default"},
Status: flyteorgv1.TaskActionStatus{PluginPhase: "Executing"}},
&flyteorgv1.TaskAction{ObjectMeta: metav1.ObjectMeta{Name: "ta-3", Namespace: "default"}},
).Build()

m, err := registerTaskActionMetrics(provider, c)
require.NoError(t, err)
require.NotNil(t, m)

// Collection triggers the async gauge callback, which lists from the client
// and reports active CRD counts by phase ("" maps to Unknown).
gauge, ok := collectMetric(t, reader, "taskaction.active").Data.(metricdata.Gauge[int64])
require.True(t, ok)
counts := map[string]int64{}
for _, dp := range gauge.DataPoints {
v, ok := dp.Attributes.Value(attribute.Key("phase"))
require.True(t, ok)
counts[v.AsString()] = dp.Value
}
assert.Equal(t, map[string]int64{"Executing": 2, "Unknown": 1}, counts)
}

// TestObserveCRDSize checks that observeCRDSize records exactly one sample
// whose value equals the JSON-encoded length of the TaskAction, and that
// calling it on a nil *taskActionMetrics does not panic.
func TestObserveCRDSize(t *testing.T) {
	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	kube := fake.NewClientBuilder().WithScheme(newTestMetricsScheme(t)).Build()

	metrics, err := registerTaskActionMetrics(provider, kube)
	require.NoError(t, err)

	ctx := context.Background()
	action := &flyteorgv1.TaskAction{
		ObjectMeta: metav1.ObjectMeta{Name: "ta-1", Namespace: "default"},
		Status:     flyteorgv1.TaskActionStatus{PluginPhase: "Executing"},
	}
	metrics.observeCRDSize(ctx, action)

	// Expected sample value: the encoded length of the same object.
	encoded, err := json.Marshal(action)
	require.NoError(t, err)
	wantSize := int64(len(encoded))

	data := collectMetric(t, reader, "taskaction.crd.size_bytes").Data
	hist, isHist := data.(metricdata.Histogram[int64])
	require.True(t, isHist)
	require.Len(t, hist.DataPoints, 1)

	point := hist.DataPoints[0]
	assert.Equal(t, uint64(1), point.Count)
	assert.Equal(t, wantSize, point.Sum)

	// A nil receiver must be a safe no-op.
	var unregistered *taskActionMetrics
	assert.NotPanics(t, func() {
		unregistered.observeCRDSize(ctx, &flyteorgv1.TaskAction{})
	})
}

// TestRecordK8sOp checks that recordK8sOp emits one data point per op, that the
// "error" attribute reflects whether a non-nil error was passed, and that
// calling it on a nil *taskActionMetrics does not panic.
func TestRecordK8sOp(t *testing.T) {
	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	kube := fake.NewClientBuilder().WithScheme(newTestMetricsScheme(t)).Build()

	metrics, err := registerTaskActionMetrics(provider, kube)
	require.NoError(t, err)

	// One failed get and one successful update.
	ctx := context.Background()
	metrics.recordK8sOp(ctx, opGet, time.Now(), errors.New("boom"))
	metrics.recordK8sOp(ctx, opUpdate, time.Now(), nil)

	data := collectMetric(t, reader, "taskaction.k8s.duration").Data
	hist, isHist := data.(metricdata.Histogram[float64])
	require.True(t, isHist)

	const errorKey = attribute.Key("error")

	getPoint := findOpDataPoint(t, hist, opGet)
	getFailed, found := getPoint.Attributes.Value(errorKey)
	require.True(t, found)
	assert.True(t, getFailed.AsBool())

	updatePoint := findOpDataPoint(t, hist, opUpdate)
	updateFailed, found := updatePoint.Attributes.Value(errorKey)
	require.True(t, found)
	assert.False(t, updateFailed.AsBool())

	// A nil receiver must be a safe no-op.
	var unregistered *taskActionMetrics
	assert.NotPanics(t, func() {
		unregistered.recordK8sOp(ctx, opGet, time.Now(), nil)
	})
}
Loading
Loading