Skip to content
1 change: 1 addition & 0 deletions actions/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
1 change: 1 addition & 0 deletions app/internal/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func Setup(ctx context.Context, sc *stdlibapp.SetupContext, cfg *appconfig.Inter
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
1 change: 1 addition & 0 deletions app/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func Setup(ctx context.Context, sc *stdlibapp.SetupContext) error {
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
Comment thread
pingsutw marked this conversation as resolved.
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cache_service/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
tracerProvider := otelutils.GetTracerProvider(otelServiceName)
meterProvider := otelutils.GetMeterProvider(otelServiceName)

otelInterceptor, err := otelconnect.NewInterceptor(otelconnect.WithTracerProvider(tracerProvider), otelconnect.WithMeterProvider(meterProvider))
otelInterceptor, err := otelconnect.NewInterceptor(otelconnect.WithTracerProvider(tracerProvider), otelconnect.WithMeterProvider(meterProvider), otelconnect.WithoutServerPeerAttributes())
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions dataproxy/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
1 change: 1 addition & 0 deletions events/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func Setup(ctx context.Context, sc *app.SetupContext) error {
otelInterceptor, err := otelconnect.NewInterceptor(
otelconnect.WithTracerProvider(otelutils.GetTracerProvider(otelServiceName)),
otelconnect.WithMeterProvider(otelutils.GetMeterProvider(otelServiceName)),
otelconnect.WithoutServerPeerAttributes(),
)
if err != nil {
return fmt.Errorf("creating otel interceptor: %w", err)
Expand Down
59 changes: 59 additions & 0 deletions executor/pkg/controller/instrumented_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package controller

import (
"context"

"sigs.k8s.io/controller-runtime/pkg/client"

flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1"
)

// newInstrumentedClient returns a client.Client that records latency for
// TaskAction CRD operations (Get, Update, and Status().Update) through m.
// Calls made with any other object type are forwarded to c without timing.
//
// A nil m yields c itself, so the caller does not need to branch on whether
// metrics are available before wrapping.
func newInstrumentedClient(c client.Client, m *taskActionMetrics) client.Client {
	if m != nil {
		return &instrumentedClient{Client: c, metrics: m}
	}
	// No metrics to record into: hand back the undecorated client.
	return c
}

// instrumentedClient decorates a client.Client. The embedded Client supplies
// every method that is not explicitly overridden on this type; the overrides
// (Get, Update, Status) add timing for TaskAction objects.
type instrumentedClient struct {
	// Client is the wrapped client that performs the real work.
	client.Client

	// metrics receives the per-operation latency samples.
	metrics *taskActionMetrics
}

// Get forwards to the wrapped client. When obj is a *TaskAction the call is
// timed and recorded with the opGet label; any other object type is fetched
// without timing. The wrapped client's error is returned unchanged.
func (c *instrumentedClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
	fetch := func() error {
		return c.Client.Get(ctx, key, obj, opts...)
	}

	switch obj.(type) {
	case *flyteorgv1.TaskAction:
		return c.metrics.timeK8sOp(ctx, opGet, fetch)
	default:
		return fetch()
	}
}

// Update forwards to the wrapped client. When obj is a *TaskAction the call is
// timed and recorded with the opUpdate label; any other object type is updated
// without timing. The wrapped client's error is returned unchanged.
func (c *instrumentedClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
	write := func() error {
		return c.Client.Update(ctx, obj, opts...)
	}

	switch obj.(type) {
	case *flyteorgv1.TaskAction:
		return c.metrics.timeK8sOp(ctx, opUpdate, write)
	default:
		return write()
	}
}

// Status returns the wrapped client's status writer decorated with the same
// metrics, so Status().Update on a TaskAction is timed as well. A new wrapper
// is built on every call.
func (c *instrumentedClient) Status() client.SubResourceWriter {
	inner := c.Client.Status()
	return &instrumentedStatusWriter{
		SubResourceWriter: inner,
		metrics:           c.metrics,
	}
}

// instrumentedStatusWriter decorates a client.SubResourceWriter. Only Update
// is overridden; every other method comes from the embedded writer untouched.
type instrumentedStatusWriter struct {
	// SubResourceWriter is the wrapped status writer.
	client.SubResourceWriter

	// metrics receives the latency samples for status updates.
	metrics *taskActionMetrics
}

// Update forwards to the wrapped status writer. When obj is a *TaskAction the
// call is timed and recorded with the opStatusUpdate label; any other object
// type is written without timing. The wrapped writer's error is returned
// unchanged.
func (w *instrumentedStatusWriter) Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error {
	write := func() error {
		return w.SubResourceWriter.Update(ctx, obj, opts...)
	}

	switch obj.(type) {
	case *flyteorgv1.TaskAction:
		return w.metrics.timeK8sOp(ctx, opStatusUpdate, write)
	default:
		return write()
	}
}
116 changes: 116 additions & 0 deletions executor/pkg/controller/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package controller

import (
"context"
"encoding/json"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"sigs.k8s.io/controller-runtime/pkg/client"

flyteorgv1 "github.com/flyteorg/flyte/v2/executor/api/v1"
)
Comment thread
pingsutw marked this conversation as resolved.
Comment thread
Copilot marked this conversation as resolved.

// taskActionMeterName is the name passed to MeterProvider.Meter when the
// TaskAction controller's instruments are created.
const (
	taskActionMeterName = "taskaction-controller"
)

// Allowed values of the "op" attribute attached to taskaction.k8s.duration
// samples. Each one names the Kubernetes client call that was timed.
const (
	// opGet labels a Get of a TaskAction.
	opGet = "get"
	// opUpdate labels an Update of a TaskAction.
	opUpdate = "update"
	// opStatusUpdate labels a Status().Update of a TaskAction.
	opStatusUpdate = "status_update"
)

// taskActionMetrics bundles the synchronous OTel instruments used by the
// TaskAction controller: a histogram of serialized CRD sizes and a histogram
// of per-operation Kubernetes API latency. The asynchronous taskaction.active
// gauge is registered alongside them but is driven by its own callback, so it
// is not stored here.
//
// Reconcile throughput, active workers, and workqueue latency are already
// reported by the controller-runtime metrics server, and event-proxy send
// latency by the otelconnect-wrapped events client (rpc_client_duration);
// these instruments cover what those sources do not.
//
// Methods on *taskActionMetrics tolerate a nil receiver.
type taskActionMetrics struct {
	// crdSizeBytes backs taskaction.crd.size_bytes.
	crdSizeBytes metric.Int64Histogram

	// crdOpDuration backs taskaction.k8s.duration.
	crdOpDuration metric.Float64Histogram
}

// registerTaskActionMetrics wires the TaskAction OTel meters onto the given meter
// provider (the executor's, registered in executor/setup.go). The active-by-phase
// gauge is observed asynchronously by listing TaskActions from the controller cache.
func registerTaskActionMetrics(provider metric.MeterProvider, c client.Client) (*taskActionMetrics, error) {
meter := provider.Meter(taskActionMeterName)
Comment thread
pingsutw marked this conversation as resolved.
Outdated

crdSize, err := meter.Int64Histogram(
"taskaction.crd.size_bytes",
metric.WithDescription("Serialized (JSON) size of a TaskAction CRD"),
metric.WithUnit("By"),
)
if err != nil {
return nil, err
}

crdOp, err := meter.Float64Histogram(
"taskaction.k8s.duration",
metric.WithDescription("Latency of TaskAction CRD operations against the Kubernetes API, labeled by op (get/update/status_update)"),
metric.WithUnit("ms"),
)
Comment thread
pingsutw marked this conversation as resolved.
Comment thread
pingsutw marked this conversation as resolved.
Comment thread
pingsutw marked this conversation as resolved.
if err != nil {
return nil, err
}

active, err := meter.Int64ObservableGauge(
"taskaction.active",
metric.WithDescription("Number of TaskAction CRDs, labeled by plugin phase"),
)
if err != nil {
return nil, err
}

_, err = meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
var list flyteorgv1.TaskActionList
if err := c.List(ctx, &list); err != nil {
return err
}
Comment thread
pingsutw marked this conversation as resolved.
Outdated
Comment thread
pingsutw marked this conversation as resolved.
Outdated
counts := make(map[string]int64, 8)
Comment thread
pingsutw marked this conversation as resolved.
Outdated
for i := range list.Items {
phase := list.Items[i].Status.PluginPhase
if phase == "" {
phase = "Unknown"
}
counts[phase]++
}
for phase, n := range counts {
o.ObserveInt64(active, n, metric.WithAttributes(attribute.String("phase", phase)))
}
return nil
}, active)
if err != nil {
return nil, err
}

return &taskActionMetrics{crdSizeBytes: crdSize, crdOpDuration: crdOp}, nil
}

// observeCRDSize records the serialized size of a TaskAction CRD. No-op if metrics
// registration failed (m == nil).
func (m *taskActionMetrics) observeCRDSize(ctx context.Context, ta *flyteorgv1.TaskAction) {
Comment thread
pingsutw marked this conversation as resolved.
Outdated
if m == nil || m.crdSizeBytes == nil {
return
}
if b, err := json.Marshal(ta); err == nil {
m.crdSizeBytes.Record(ctx, int64(len(b)))
}
}

// timeK8sOp runs fn, measures how long it took, and records the elapsed time
// in milliseconds in taskaction.k8s.duration with two attributes: "op" (the
// given op string) and "error" (true when fn returned a non-nil error).
//
// fn's error is always returned unchanged. When the receiver is nil or the
// histogram was never created, fn is simply invoked and nothing is recorded,
// so callers can wrap operations unconditionally.
func (m *taskActionMetrics) timeK8sOp(ctx context.Context, op string, fn func() error) error {
	if m == nil || m.crdOpDuration == nil {
		return fn()
	}

	begin := time.Now()
	opErr := fn()

	// Elapsed time is taken at whole-microsecond resolution, then scaled to ms.
	elapsedMs := float64(time.Since(begin).Microseconds()) / 1000.0
	labels := metric.WithAttributes(
		attribute.String("op", op),
		attribute.Bool("error", opErr != nil),
	)
	m.crdOpDuration.Record(ctx, elapsedMs, labels)

	return opErr
}
Loading
Loading