38 changes: 28 additions & 10 deletions flytepropeller/pkg/controller/nodes/task/handler.go
@@ -52,10 +52,11 @@ const (
)

type metrics struct {
-	pluginPanics           labeled.Counter
-	unsupportedTaskType    labeled.Counter
-	pluginExecutionLatency labeled.StopWatch
-	pluginQueueLatency     labeled.StopWatch
+	pluginPanics            labeled.Counter
+	unsupportedTaskType     labeled.Counter
+	pluginExecutionLatency  labeled.StopWatch
+	pluginQueueLatency      labeled.StopWatch
+	pluginInitializeLatency labeled.StopWatch

// TODO We should have a metric to capture custom state size
scope promutils.Scope
@@ -553,6 +554,14 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
}
}

+	// Emit the initializing latency if the task has just transitioned from Initializing to Running.
+	if ts.PluginPhase == pluginCore.PhaseInitializing &&
+		pluginTrns.pInfo.Phase() == pluginCore.PhaseRunning {
+		if !ts.LastPhaseUpdatedAt.IsZero() {
+			t.metrics.pluginInitializeLatency.Observe(ctx, ts.LastPhaseUpdatedAt, time.Now())
+		}
+	}

Member Author: This metric is new. (I want to measure progress regarding startup times, i.e. image pulls.)

if pluginTrns.pInfo.Phase() == ts.PluginPhase {
if pluginTrns.pInfo.Version() == ts.PluginPhaseVersion {
logger.Debugf(ctx, "p+Version previously seen .. no event will be sent")
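A quick aside on the metrics plumbing: labeled.StopWatch.Observe takes an explicit start and end timestamp, which is what lets the handler replay a persisted anchor (LastPhaseUpdatedAt) into the histogram on a later reconciliation round. Below is a minimal, self-contained sketch of that pattern; it assumes the flytestdlib import paths from the flyte monorepo and a one-time labeled.SetMetricKeys call at startup, neither of which is shown in this diff:

package main

import (
	"context"
	"time"

	"github.com/flyteorg/flyte/flytestdlib/contextutils"
	"github.com/flyteorg/flyte/flytestdlib/promutils"
	"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
)

func main() {
	// Labeled metrics require the label keys to be registered exactly once,
	// before any labeled metric is constructed.
	labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey)

	scope := promutils.NewTestScope()
	initLatency := labeled.NewStopWatch("plugin_initialize_latency",
		"Time spent by plugin in initializing phase", time.Microsecond, scope)

	// Pretend the task entered Initializing 30 seconds ago; in the handler
	// this anchor is the persisted LastPhaseUpdatedAt.
	enteredInitializingAt := time.Now().Add(-30 * time.Second)

	ctx := contextutils.WithProjectDomain(context.Background(), "proj", "dom")
	initLatency.Observe(ctx, enteredInitializingAt, time.Now())
}

The time.Microsecond argument only sets the unit the duration is reported in; the observed value is still end minus start.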
@@ -800,12 +809,20 @@ func (t Handler) Handle(ctx context.Context, nCtx interfaces.NodeExecutionContex
}

// STEP 6: Persist the plugin state
+	// Only refresh LastPhaseUpdatedAt when the phase itself changes; intra-phase
+	// version bumps (e.g. emitted by MaybeUpdatePhaseVersion when a plugin's
+	// reason string changes) must not reset it, otherwise duration metrics like
+	// plugin_queue_latency lose their start anchor and undercount.
+	lastPhaseUpdatedAt := ts.LastPhaseUpdatedAt
+	if ts.PluginPhase != pluginTrns.pInfo.Phase() {
+		lastPhaseUpdatedAt = time.Now()
+	}
err = nCtx.NodeStateWriter().PutTaskNodeState(handler.TaskNodeState{
PluginState: pluginTrns.pluginState,
PluginStateVersion: pluginTrns.pluginStateVersion,
PluginPhase: pluginTrns.pInfo.Phase(),
PluginPhaseVersion: pluginTrns.pInfo.Version(),
-		LastPhaseUpdatedAt:                 time.Now(),
+		LastPhaseUpdatedAt:                 lastPhaseUpdatedAt,
		PreviousNodeExecutionCheckpointURI: ts.PreviousNodeExecutionCheckpointURI,
		CleanupOnFailure:                   ts.CleanupOnFailure || pluginTrns.pInfo.CleanupOnFailure(),
	})

Member Author: We called this not only when transitioning from one phase to another but also within a single phase when the phase version was increased. This led to wrong queue latency measurements.
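To make the before/after behavior concrete: suppose a task sits in Queued from t=0s to t=60s while MaybeUpdatePhaseVersion bumps the phase version at t=20s and t=40s. With the old unconditional time.Now(), the anchor was reset at each bump, so plugin_queue_latency observed only the final 20-second segment; with the guard above it observes the full 60 seconds. The guard distills to a small pure function, sketched here as a hypothetical helper (not part of the PR; it assumes the surrounding file's time and pluginCore imports):

// nextLastPhaseUpdatedAt is a hypothetical distillation of the guard above:
// the anchor only moves when the reported phase differs from the stored one.
func nextLastPhaseUpdatedAt(stored time.Time, storedPhase, reportedPhase pluginCore.Phase, now time.Time) time.Time {
	if storedPhase != reportedPhase {
		return now // phase transition: restart the duration anchor
	}
	return stored // intra-phase version bump: keep the original anchor
}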
@@ -1060,11 +1077,12 @@ func New(ctx context.Context, kubeClient executors.Client, kubeClientset kuberne
pluginsForType: make(map[pluginCore.TaskType]map[pluginID]pluginCore.Plugin),
taskMetricsMap: make(map[MetricKey]*taskMetrics),
metrics: &metrics{
-			pluginPanics:           labeled.NewCounter("plugin_panic", "Task plugin panicked when trying to execute a Handler.", scope),
-			unsupportedTaskType:    labeled.NewCounter("unsupported_tasktype", "No Handler plugin configured for Handler type", scope),
-			pluginExecutionLatency: labeled.NewStopWatch("plugin_exec_latency", "Time taken to invoke plugin for one round", time.Microsecond, scope),
-			pluginQueueLatency:     labeled.NewStopWatch("plugin_queue_latency", "Time spent by plugin in queued phase", time.Microsecond, scope),
-			scope:                  scope,
+			pluginPanics:            labeled.NewCounter("plugin_panic", "Task plugin panicked when trying to execute a Handler.", scope),
+			unsupportedTaskType:     labeled.NewCounter("unsupported_tasktype", "No Handler plugin configured for Handler type", scope),
+			pluginExecutionLatency:  labeled.NewStopWatch("plugin_exec_latency", "Time taken to invoke plugin for one round", time.Microsecond, scope),
+			pluginQueueLatency:      labeled.NewStopWatch("plugin_queue_latency", "Time spent by plugin in queued phase", time.Microsecond, scope),
+			pluginInitializeLatency: labeled.NewStopWatch("plugin_initialize_latency", "Time spent by plugin in initializing phase", time.Microsecond, scope),
+			scope:                   scope,
},
pluginScope: scope.NewSubScope("plugin"),
kubeClient: kubeClient,
262 changes: 262 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/handler_test.go
@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
@@ -1313,3 +1314,264 @@ func Test_task_Handle_ValidateOutputErr(t *testing.T) {
assert.NoError(t, err)
assert.False(t, result.IsRecoverable)
}

// Test_task_Handle_LastPhaseUpdatedAt verifies that the persisted
// LastPhaseUpdatedAt is only refreshed when the plugin's reported phase
// actually changes. Intra-phase version bumps (which MaybeUpdatePhaseVersion
// produces when a plugin's reason string changes — e.g. as a Pending pod
// progresses through scheduler reasons during autoscaling) must preserve
// the timestamp.
//
// The plugin_queue_latency and plugin_initialize_latency metrics anchor
// off this timestamp; if version bumps reset it, both metrics undercount
// and only capture the final segment before exiting the phase.
func Test_task_Handle_LastPhaseUpdatedAt(t *testing.T) {
	inputs := &core.LiteralMap{
		Literals: map[string]*core.Literal{
			"foo": coreutils.MustMakeLiteral("bar"),
		},
	}

Member Author: The boilerplate is mostly taken from existing tests above.

createNodeContext := func(
startingPhase pluginCore.Phase,
startingVersion uint32,
startingLastPhaseUpdatedAt time.Time,
pluginResp fakeplugins.NextPhaseState,
recorder interfaces.EventRecorder,
s *taskNodeStateHolder,
) *nodeMocks.NodeExecutionContext {
wfExecID := &core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
}
nodeID := "n1"

nm := &nodeMocks.NodeExecutionMetadata{}
nm.EXPECT().GetAnnotations().Return(map[string]string{})
nm.EXPECT().GetNodeExecutionID().Return(&core.NodeExecutionIdentifier{
NodeId: nodeID,
ExecutionId: wfExecID,
})
nm.EXPECT().GetK8sServiceAccount().Return("service-account")
nm.EXPECT().GetLabels().Return(map[string]string{})
nm.EXPECT().GetNamespace().Return("namespace")
nm.EXPECT().GetOwnerID().Return(types.NamespacedName{Namespace: "namespace", Name: "name"})
nm.EXPECT().GetOwnerReference().Return(v12.OwnerReference{
Kind: "sample",
Name: "name",
})
nm.EXPECT().IsInterruptible().Return(false)

tk := &core.TaskTemplate{
Id: &core.Identifier{ResourceType: core.ResourceType_TASK, Project: "proj", Domain: "dom", Version: "ver"},
Type: "test",
Metadata: &core.TaskMetadata{
Discoverable: false,
},
Interface: &core.TypedInterface{
Inputs: &core.VariableMap{
Variables: map[string]*core.Variable{
"foo": {
Type: &core.LiteralType{
Type: &core.LiteralType_Simple{
Simple: core.SimpleType_STRING,
},
},
},
},
},
Outputs: &core.VariableMap{
Variables: map[string]*core.Variable{
"x": {
Type: &core.LiteralType{
Type: &core.LiteralType_Simple{
Simple: core.SimpleType_BOOLEAN,
},
},
},
},
},
},
}
taskID := &core.Identifier{}
tr := &nodeMocks.TaskReader{}
tr.EXPECT().GetTaskID().Return(taskID)
tr.EXPECT().GetTaskType().Return("test")
tr.EXPECT().Read(mock.Anything).Return(tk, nil)

ns := &flyteMocks.ExecutableNodeStatus{}
ns.EXPECT().GetDataDir().Return("data-dir")
ns.EXPECT().GetOutputDir().Return("data-dir")

res := &v1.ResourceRequirements{}
n := &flyteMocks.ExecutableNode{}
ma := 5
n.EXPECT().GetRetryStrategy().Return(&v1alpha1.RetryStrategy{MinAttempts: &ma})
n.EXPECT().GetResources().Return(res)

ir := &ioMocks.InputReader{}
ir.EXPECT().GetInputPath().Return("input")
ir.EXPECT().Get(mock.Anything).Return(inputs, nil)
nCtx := &nodeMocks.NodeExecutionContext{}
nCtx.EXPECT().NodeExecutionMetadata().Return(nm)
nCtx.EXPECT().Node().Return(n)
nCtx.EXPECT().InputReader().Return(ir)
ds, err := storage.NewDataStore(
&storage.Config{
Type: storage.TypeMemory,
},
promutils.NewTestScope(),
)
assert.NoError(t, err)
nCtx.EXPECT().DataStore().Return(ds)
nCtx.EXPECT().CurrentAttempt().Return(uint32(1))
nCtx.EXPECT().TaskReader().Return(tr)
nCtx.EXPECT().NodeStatus().Return(ns)
nCtx.EXPECT().NodeID().Return(nodeID)
nCtx.EXPECT().EventsRecorder().Return(recorder)
nCtx.EXPECT().EnqueueOwnerFunc().Return(nil)

nCtx.EXPECT().RawOutputPrefix().Return("s3://sandbox/")
nCtx.EXPECT().OutputShardSelector().Return(ioutils.NewConstantShardSelector([]string{"x"}))

executionContext := &mocks.ExecutionContext{}
executionContext.EXPECT().GetExecutionConfig().Return(v1alpha1.ExecutionConfig{})
executionContext.EXPECT().GetEventVersion().Return(v1alpha1.EventVersion0)
executionContext.EXPECT().GetParentInfo().Return(nil)
executionContext.EXPECT().IncrementParallelism().Return(1)
nCtx.EXPECT().ExecutionContext().Return(executionContext)

st := bytes.NewBuffer([]byte{})
cod := codex.GobStateCodec{}
assert.NoError(t, cod.Encode(pluginResp, st))
nr := &nodeMocks.NodeStateReader{}
nr.EXPECT().GetTaskNodeState().Return(handler.TaskNodeState{
PluginState: st.Bytes(),
PluginPhase: startingPhase,
PluginPhaseVersion: startingVersion,
LastPhaseUpdatedAt: startingLastPhaseUpdatedAt,
})
nCtx.EXPECT().NodeStateReader().Return(nr)
nCtx.EXPECT().NodeStateWriter().Return(s)
return nCtx
}

noopRm := CreateNoopResourceManager(context.TODO(), promutils.NewTestScope())
referenceTime := time.Now().Add(-time.Hour)

type args struct {
startingPhase pluginCore.Phase
startingVersion uint32
nextPhase pluginCore.Phase
nextVersion uint32
}

tests := []struct {
name string
args args
timestampPreserved bool
}{
{
name: "queued same phase, version bump preserves timestamp",
args: args{
startingPhase: pluginCore.PhaseQueued,
startingVersion: 0,
nextPhase: pluginCore.PhaseQueued,
nextVersion: 1,
},
timestampPreserved: true,
},
{
name: "queued -> initializing refreshes timestamp",
args: args{
startingPhase: pluginCore.PhaseQueued,
startingVersion: 0,
nextPhase: pluginCore.PhaseInitializing,
nextVersion: 0,
},
timestampPreserved: false,
},
{
name: "initializing same phase, version bump preserves timestamp",
args: args{
startingPhase: pluginCore.PhaseInitializing,
startingVersion: 0,
nextPhase: pluginCore.PhaseInitializing,
nextVersion: 1,
},
timestampPreserved: true,
},
{
name: "initializing -> running refreshes timestamp",
args: args{
startingPhase: pluginCore.PhaseInitializing,
startingVersion: 0,
nextPhase: pluginCore.PhaseRunning,
nextVersion: 0,
},
timestampPreserved: false,
},
{
name: "queued -> running refreshes timestamp",
args: args{
startingPhase: pluginCore.PhaseQueued,
startingVersion: 0,
nextPhase: pluginCore.PhaseRunning,
nextVersion: 0,
},
timestampPreserved: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
scope := promutils.NewTestScope()
state := &taskNodeStateHolder{}
ev := &fakeBufferedEventRecorder{}
nCtx := createNodeContext(
tt.args.startingPhase,
tt.args.startingVersion,
referenceTime,
fakeplugins.NextPhaseState{
Phase: tt.args.nextPhase,
PhaseVersion: tt.args.nextVersion,
TaskInfo: &pluginCore.TaskInfo{},
},
ev,
state,
)
tk := Handler{
cfg: &config.Config{MaxErrorMessageLength: 100, MaxPluginPhaseVersions: 100000},
defaultPlugins: map[pluginCore.TaskType]pluginCore.Plugin{
"test": fakeplugins.NewPhaseBasedPlugin(),
},
pluginScope: scope,
catalog: &pluginCatalogMocks.Client{},
resourceManager: noopRm,
taskMetricsMap: make(map[MetricKey]*taskMetrics),
metrics: &metrics{
pluginPanics: labeled.NewCounter("plugin_panic", "x", scope),
unsupportedTaskType: labeled.NewCounter("unsupported_tasktype", "x", scope),
pluginExecutionLatency: labeled.NewStopWatch("plugin_exec_latency", "x", time.Microsecond, scope),
pluginQueueLatency: labeled.NewStopWatch("plugin_queue_latency", "x", time.Microsecond, scope),
pluginInitializeLatency: labeled.NewStopWatch("plugin_initialize_latency", "x", time.Microsecond, scope),
scope: scope,
},
eventConfig: eventConfig,
agentService: &agent.AgentService{},
}
_, err := tk.Handle(context.TODO(), nCtx)
assert.NoError(t, err)

if tt.timestampPreserved {
assert.Equal(t, referenceTime, state.s.LastPhaseUpdatedAt,
"LastPhaseUpdatedAt must be preserved across intra-phase version bumps")
} else {
assert.True(t, state.s.LastPhaseUpdatedAt.After(referenceTime),
"LastPhaseUpdatedAt must refresh on phase change (got %v, expected after %v)",
state.s.LastPhaseUpdatedAt, referenceTime)
}
})
}
}
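As a usage note, the new cases can be exercised in isolation with the standard Go tooling, e.g. (the package path is assumed relative to the flytepropeller module root):

go test ./pkg/controller/nodes/task/ -run Test_task_Handle_LastPhaseUpdatedAt -v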