Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion flytepropeller/pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F
ctx = contextutils.WithResourceVersion(ctx, mutableW.GetResourceVersion())

maxRetries := uint32(p.cfg.MaxWorkflowRetries) // #nosec G115
if !mutableW.GetDeletionTimestamp().IsZero() || mutableW.Status.FailedAttempts > maxRetries {
if !mutableW.GetDeletionTimestamp().IsZero() ||
(mutableW.Status.FailedAttempts > maxRetries &&
mutableW.GetExecutionStatus().GetPhase() != v1alpha1.WorkflowPhaseHandlingFailureNode) {
var err error
func() {
defer func() {
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (d dynamicNodeTaskNodeHandler) produceDynamicWorkflow(ctx context.Context,
if stdErrors.IsCausedBy(err, utils.ErrorCodeUser) {
return handler.DoTransition(handler.TransitionTypeEphemeral,
handler.PhaseInfoFailure(core.ExecutionError_USER, "DynamicWorkflowBuildFailed", err.Error(), nil),
), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: err.Error()}, nil
), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: err.Error(), IsFailurePermanent: true}, nil
}
return handler.Transition{}, handler.DynamicNodeState{}, err
}
Expand All @@ -125,7 +125,7 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n
if stdErrors.IsCausedBy(err, utils.ErrorCodeUser) {
return handler.DoTransition(handler.TransitionTypeEphemeral,
handler.PhaseInfoFailure(core.ExecutionError_USER, "DynamicWorkflowBuildFailed", err.Error(), nil),
), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: err.Error()}, nil
), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: err.Error(), IsFailurePermanent: true}, nil
}
// Mostly a system error or unknown
return handler.Transition{}, handler.DynamicNodeState{}, err
Expand Down
29 changes: 20 additions & 9 deletions flytepropeller/pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,17 +499,28 @@ func (c *workflowExecutor) HandleAbortedWorkflow(ctx context.Context, w *v1alpha
}

var status Status
if err != nil {
// This workflow failed, record that phase and corresponding error message.
status = StatusFailed(&core.ExecutionError{
Code: "Workflow abort failed",
if w.GetDeletionTimestamp() != nil {
// Explicit deletion: abort without running failure node.
if err != nil {
status = StatusFailed(&core.ExecutionError{
Code: "Workflow abort failed",
Message: err.Error(),
Kind: core.ExecutionError_SYSTEM,
})
} else {
status = Status{TransitionToPhase: v1alpha1.WorkflowPhaseAborted}
}
} else {
// Max system retries exhausted: give on_failure a chance to run.
execErr := &core.ExecutionError{
Code: errors.RuntimeExecutionError.String(),
Message: err.Error(),
Kind: core.ExecutionError_SYSTEM,
})
} else {
// Otherwise, this workflow is aborted.
status = Status{
TransitionToPhase: v1alpha1.WorkflowPhaseAborted,
}
if failureNode := w.GetOnFailureNode(); failureNode != nil {
status = StatusFailureNode(execErr)
} else {
status = StatusFailed(execErr)
}
}
if err := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, w.GetExecutionStatus(), status); err != nil {
Expand Down
Loading