Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
541d95e
make cancelation great again
semd Nov 12, 2025
2f6b23e
Merge branch '14649/show_execution_cancelation_fix' into 14316-Single…
skynetigor Nov 12, 2025
d52d7cf
in case of single step execution display skeletons only for being exe…
skynetigor Nov 12, 2025
6c01c3c
Merge branch 'main' into 14316-Single-step-execution-displays-skeleto…
skynetigor Nov 12, 2025
a152298
Merge branch 'main' into 14316-Single-step-execution-displays-skeleto…
skynetigor Nov 12, 2025
da151ee
add unit-tests
skynetigor Nov 12, 2025
4cc40b8
Merge branch 'main' into 14316-Single-step-execution-displays-skeleto…
skynetigor Nov 12, 2025
cb6dc64
temp
skynetigor Nov 13, 2025
95222e0
Merge branch 'main' into 14709-Workflow-having-wait-step-with-long-du…
skynetigor Nov 13, 2025
dfbc29c
fixes
skynetigor Nov 13, 2025
93cca7b
fixes
skynetigor Nov 13, 2025
a73eeb8
temp
skynetigor Nov 14, 2025
3a1a25f
fix bugs
skynetigor Nov 14, 2025
c0892c8
fixes
skynetigor Nov 14, 2025
21ff0c3
fixes
skynetigor Nov 14, 2025
e68f7bf
fixes
skynetigor Nov 14, 2025
3f7a2d6
Merge branch 'main' into 14709-Workflow-having-wait-step-with-long-du…
skynetigor Nov 14, 2025
0868cc7
Update plugin.ts
skynetigor Nov 14, 2025
332b6d8
add abortable timeout
skynetigor Nov 14, 2025
69fa57d
fixes
skynetigor Nov 14, 2025
9b8fe2a
Update plugin.ts
skynetigor Nov 17, 2025
9b78019
fixes
skynetigor Nov 17, 2025
3f3813e
refactor
skynetigor Nov 17, 2025
0a9d1b4
Update run_node.ts
skynetigor Nov 17, 2025
28ef8d6
Update process_node_stack_monitoring.ts
skynetigor Nov 17, 2025
7d107fc
fixes
skynetigor Nov 17, 2025
44b95f1
Update plugin.ts
skynetigor Nov 17, 2025
f25ea22
refactor
skynetigor Nov 17, 2025
3ded572
fix tests for wait step
skynetigor Nov 17, 2025
15c5f28
fix tests
skynetigor Nov 17, 2025
ca8fcc5
Update workflow_execution_loop.ts
skynetigor Nov 17, 2025
33572a8
Delete workflow_task.ts
skynetigor Nov 17, 2025
7732d7b
Merge branch 'main' into 14709-Workflow-having-wait-step-with-long-du…
skynetigor Nov 17, 2025
3bc9da7
Merge branch 'main' into 14709-Workflow-having-wait-step-with-long-du…
skynetigor Nov 17, 2025
6b1276b
Update workflow_task_manager.ts
skynetigor Nov 17, 2025
c1d40a9
fixes
skynetigor Nov 17, 2025
cc1bdeb
Merge branch '14709-Workflow-having-wait-step-with-long-duration-will…
skynetigor Nov 17, 2025
d361fed
add tests
skynetigor Nov 17, 2025
7fa94bf
Update workflow_task_manager.ts
skynetigor Nov 17, 2025
f038945
Update workflow_task_manager.ts
skynetigor Nov 17, 2025
229c7b9
Update workflow_task_manager.test.ts
skynetigor Nov 18, 2025
ee6c672
replace string error with object error
skynetigor Nov 18, 2025
a42a573
Update http_step_impl.ts
skynetigor Nov 18, 2025
1f01460
enrich http step errors
skynetigor Nov 18, 2025
956bd8e
fix tests
skynetigor Nov 18, 2025
07a13a7
fixes
skynetigor Nov 18, 2025
4d9513f
Merge branch 'main' into 14709-Workflow-having-wait-step-with-long-du…
skynetigor Nov 18, 2025
8c65d1e
fixes
skynetigor Nov 18, 2025
48b8f71
add continue condition support
skynetigor Nov 18, 2025
9dbdb96
add retry condition support
skynetigor Nov 18, 2025
0be6b29
Merge branch '14709-Workflow-having-wait-step-with-long-duration-will…
skynetigor Nov 18, 2025
9ac3ef3
add condition support for retry
skynetigor Nov 18, 2025
9841753
Merge branch 'main' into 14737-HTTP-Step-Error-Handling-Lacks-Structu…
skynetigor Nov 18, 2025
6991572
Update build_execution_graph.ts
skynetigor Nov 18, 2025
330d8ff
Update catch_error.ts
skynetigor Nov 18, 2025
101a05e
fixes
skynetigor Nov 19, 2025
b69d76d
fixe integ tests
skynetigor Nov 19, 2025
89d4b36
add test case to check that error is available from step context
skynetigor Nov 19, 2025
6c47430
add testcase for continue expression
skynetigor Nov 19, 2025
bc5e081
Update on_failure_retry.test.ts
skynetigor Nov 19, 2025
19c888f
Update on_failure_retry.test.ts
skynetigor Nov 19, 2025
d60d1e5
add evaluateBooleanExpressionInContext
skynetigor Nov 19, 2025
e7f5d73
Merge branch 'elastic:main' into 14737-HTTP-Step-Error-Handling-Lacks…
skynetigor Nov 19, 2025
b036c6e
add error to global context for steps inside fallback scope
skynetigor Nov 19, 2025
be95e2d
Merge branch '14737-HTTP-Step-Error-Handling-Lacks-Structured-Error-D…
skynetigor Nov 20, 2025
b9b8d6e
fixes
skynetigor Nov 20, 2025
3e1e79e
fixes
skynetigor Nov 20, 2025
4b961db
Merge branch 'main' into 14737-HTTP-Step-Error-Handling-Lacks-Structu…
skynetigor Nov 20, 2025
3db7267
Changes from node scripts/eslint_all_files --no-cache --fix
kibanamachine Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,13 @@ function visitOnFailure(
graph = createFallback(stepId, graph, onFailureConfiguration.fallback, context);
}

if (onFailureConfiguration.continue) {
graph = createContinue(stepId, graph);
// Here we can statically determine that 'continue' is needed if "continue" is boolean.
// If condition is a string (expression), we need to evaluate it at runtime, so we always create the continue node.
if (
typeof onFailureConfiguration.continue === 'string' ||
onFailureConfiguration.continue === true
) {
graph = createContinue(stepId, onFailureConfiguration.continue, graph);
}

context.stack.pop();
Expand Down Expand Up @@ -470,7 +475,11 @@ function handleWorkflowLevelOnFailure(
return result;
}

function createContinue(stepId: string, innerGraph: WorkflowGraphType): WorkflowGraphType {
function createContinue(
stepId: string,
condition: string | boolean,
innerGraph: WorkflowGraphType
): WorkflowGraphType {
const graph = createTypedGraph({ directed: true });
const enterContinueNodeId = `enterContinue_${stepId}`;
const exitNodeId = `exitContinue_${stepId}`;
Expand All @@ -480,6 +489,9 @@ function createContinue(stepId: string, innerGraph: WorkflowGraphType): Workflow
stepId,
stepType: 'continue',
exitNodeId,
configuration: {
condition,
},
};
const exitContinueNode: ExitContinueNode = {
type: 'exit-continue',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import { WorkflowRetrySchema } from '../../../spec/schema';
export const EnterContinueNodeSchema = GraphNodeSchema.extend({
id: z.string(),
type: z.literal('enter-continue'),

configuration: z.object({
condition: z.union([z.string(), z.boolean()]),
}),
exitNodeId: z.string(),
});
export type EnterContinueNode = z.infer<typeof EnterContinueNodeSchema>;
Expand Down
10 changes: 9 additions & 1 deletion src/platform/packages/shared/kbn-workflows/spec/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export const RetryPolicySchema = z.object({

export const WorkflowRetrySchema = z.object({
'max-attempts': z.number().min(1),
condition: z.string().optional(), // e.g., "${{error.type == 'NetworkError'}}" (default: always retry)
delay: z
.string()
.regex(/^\d+(ms|[smhdw])$/, 'Invalid duration format')
Expand All @@ -37,7 +38,7 @@ export type BaseStep = z.infer<typeof BaseStepSchema>;
export const WorkflowOnFailureSchema = z.object({
retry: WorkflowRetrySchema.optional(),
fallback: z.array(BaseStepSchema).min(1).optional(),
continue: z.boolean().optional(),
continue: z.union([z.boolean(), z.string()]).optional(),
});

export type WorkflowOnFailure = z.infer<typeof WorkflowOnFailureSchema>;
Expand Down Expand Up @@ -589,3 +590,10 @@ export const DynamicStepContextSchema = DynamicWorkflowContextSchema.extend({
steps: z.object({}),
});
export type DynamicStepContext = z.infer<typeof DynamicStepContextSchema>;

export const ExecutionErrorSchema = z.object({
type: z.string(),
message: z.string(),
details: z.any().optional(),
});
export type ExecutionError = z.infer<typeof ExecutionErrorSchema>;
6 changes: 3 additions & 3 deletions src/platform/packages/shared/kbn-workflows/types/v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import type { JsonValue } from '@kbn/utility-types';
import { z } from '@kbn/zod';
import type { WorkflowYaml } from '../spec/schema';
import type { ExecutionError, WorkflowYaml } from '../spec/schema';
import { WorkflowSchema } from '../spec/schema';

export enum ExecutionStatus {
Expand Down Expand Up @@ -74,7 +74,7 @@ export interface EsWorkflowExecution {
stepId?: string;
scopeStack: StackFrame[];
createdAt: string;
error: string | null;
error: ExecutionError | null;
createdBy: string;
startedAt: string;
finishedAt: string;
Expand Down Expand Up @@ -126,7 +126,7 @@ export interface EsWorkflowStepExecution {
* There might be several instances of the same stepId if it's inside loops, retries, etc.
*/
stepExecutionIndex: number;
error?: string | null;
error?: ExecutionError;
output?: JsonValue;
input?: JsonValue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,15 @@ describe('workflow with continue on failure', () => {
});

describe.each(['step level', 'workflow level'])('continue is on %s', (testCase) => {
let buildYamlFn: () => string;

beforeAll(() => {
function buildYamlFn(continueExpression: boolean | string | undefined): string {
if (testCase === 'step level') {
buildYamlFn = () => {
return `
return `
steps:
- name: constantlyFailingStep
type: ${FakeConnectors.constantlyFailing.actionTypeId}
connector-id: ${FakeConnectors.constantlyFailing.name}
on-failure:
continue: true
${continueExpression ? 'on-failure:' : ''}
${continueExpression ? `continue: ${continueExpression}` : ''}
with:
message: 'Hi there! Are you alive?'

Expand All @@ -40,13 +37,11 @@ steps:
with:
message: 'Final message!'
`;
};
} else if (testCase === 'workflow level') {
buildYamlFn = () => {
return `
}
return `
settings:
on-failure:
continue: true
${continueExpression ? 'on-failure:' : ''}
${continueExpression ? `continue: ${continueExpression}` : ''}
steps:
- name: constantlyFailingStep
type: ${FakeConnectors.constantlyFailing.actionTypeId}
Expand All @@ -60,122 +55,122 @@ steps:
with:
message: 'Final message!'
`;
};
}
});
}

describe.each([true, '${{error.type == "Error"}}'])(
'when continue is %s',
(continueTestCase) => {
beforeEach(async () => {
jest.clearAllMocks();
await workflowRunFixture.runWorkflow({
workflowYaml: buildYamlFn(continueTestCase),
});
});

describe('when continue is true', () => {
beforeEach(async () => {
jest.clearAllMocks();
await workflowRunFixture.runWorkflow({
workflowYaml: buildYamlFn(),
it('should successfully complete workflow despite error in constantlyFailingStep', async () => {
const workflowExecutionDoc =
workflowRunFixture.workflowExecutionRepositoryMock.workflowExecutions.get(
'fake_workflow_execution_id'
);
expect(workflowExecutionDoc?.status).toBe(ExecutionStatus.COMPLETED);
expect(workflowExecutionDoc?.error).toBe(undefined);
expect(workflowExecutionDoc?.scopeStack).toEqual([]);
});
});

it('should successfully complete workflow despite error in constantlyFailingStep', async () => {
const workflowExecutionDoc =
workflowRunFixture.workflowExecutionRepositoryMock.workflowExecutions.get(
'fake_workflow_execution_id'
it('should execute constantlyFailingStep once and record its failure', async () => {
const failingStepExecutions = Array.from(
workflowRunFixture.stepExecutionRepositoryMock.stepExecutions.values()
).filter(
(se) =>
se.stepId === 'constantlyFailingStep' &&
se.stepType === FakeConnectors.constantlyFailing.actionTypeId
);
expect(workflowExecutionDoc?.status).toBe(ExecutionStatus.COMPLETED);
expect(workflowExecutionDoc?.error).toBe(undefined);
expect(workflowExecutionDoc?.scopeStack).toEqual([]);
});

it('should execute constantlyFailingStep once and record its failure', async () => {
const failingStepExecutions = Array.from(
workflowRunFixture.stepExecutionRepositoryMock.stepExecutions.values()
).filter(
(se) =>
se.stepId === 'constantlyFailingStep' &&
se.stepType === FakeConnectors.constantlyFailing.actionTypeId
);
expect(failingStepExecutions.length).toBe(1);
expect(failingStepExecutions[0].status).toBe(ExecutionStatus.FAILED);
expect(failingStepExecutions[0].error).toBe('Error: Constantly failing connector');
});

it('should execute finalStep successfully after the failing step', async () => {
const finalStepExecutions = Array.from(
workflowRunFixture.stepExecutionRepositoryMock.stepExecutions.values()
).filter((se) => se.stepId === 'finalStep');
expect(finalStepExecutions.length).toBe(1);
expect(finalStepExecutions[0].status).toBe(ExecutionStatus.COMPLETED);
expect(finalStepExecutions[0].error).toBe(undefined);
});

it('should execute finalStep after constantlyFailingStep', async () => {
const failingStepExecutions = Array.from(
workflowRunFixture.stepExecutionRepositoryMock.stepExecutions.values()
).filter(
(se) =>
se.stepId === 'constantlyFailingStep' &&
se.stepType === FakeConnectors.constantlyFailing.actionTypeId
);
const finalStepExecutions = Array.from(
workflowRunFixture.stepExecutionRepositoryMock.stepExecutions.values()
).filter((se) => se.stepId === 'finalStep');

expect(failingStepExecutions.length).toBe(1);
expect(finalStepExecutions.length).toBe(1);

const failingStepCompletedAt = new Date(failingStepExecutions[0].completedAt!).getTime();
const finalStepStartedAt = new Date(finalStepExecutions[0].startedAt).getTime();

expect(finalStepStartedAt).toBeGreaterThanOrEqual(failingStepCompletedAt);
});
});
});
expect(failingStepExecutions.length).toBe(1);
expect(failingStepExecutions[0].status).toBe(ExecutionStatus.FAILED);
expect(failingStepExecutions[0].error).toEqual({
message: 'Error: Constantly failing connector',
type: 'Error',
});
});

describe('when continue is false (default behavior)', () => {
beforeEach(async () => {
jest.clearAllMocks();
await workflowRunFixture.runWorkflow({
workflowYaml: `
steps:
- name: constantlyFailingStep
type: ${FakeConnectors.constantlyFailing.actionTypeId}
connector-id: ${FakeConnectors.constantlyFailing.name}
with:
message: 'Hi there! Are you alive?'
it('should execute finalStep successfully after the failing step', async () => {
const finalStepExecutions = Array.from(
workflowRunFixture.stepExecutionRepositoryMock.stepExecutions.values()
).filter((se) => se.stepId === 'finalStep');
expect(finalStepExecutions.length).toBe(1);
expect(finalStepExecutions[0].status).toBe(ExecutionStatus.COMPLETED);
expect(finalStepExecutions[0].error).toBe(undefined);
});

- name: finalStep
type: slack
connector-id: ${FakeConnectors.slack2.name}
with:
message: 'Final message!'
`,
});
});

it('should fail the workflow due to error in constantlyFailingStep', async () => {
const workflowExecutionDoc =
workflowRunFixture.workflowExecutionRepositoryMock.workflowExecutions.get(
'fake_workflow_execution_id'
);
expect(workflowExecutionDoc?.status).toBe(ExecutionStatus.FAILED);
expect(workflowExecutionDoc?.error).toBe('Error: Constantly failing connector');
expect(workflowExecutionDoc?.scopeStack).toEqual([]);
});

it('should execute constantlyFailingStep once and record its failure', async () => {
const failingStepExecutions = Array.from(
workflowRunFixture.stepExecutionRepositoryMock.stepExecutions.values()
).filter(
(se) =>
se.stepId === 'constantlyFailingStep' &&
se.stepType === FakeConnectors.constantlyFailing.actionTypeId
);
expect(failingStepExecutions.length).toBe(1);
expect(failingStepExecutions[0].status).toBe(ExecutionStatus.FAILED);
expect(failingStepExecutions[0].error).toBe('Error: Constantly failing connector');
});

it('should not execute finalStep', async () => {
const finalStepExecutions = Array.from(
workflowRunFixture.stepExecutionRepositoryMock.stepExecutions.values()
).filter((se) => se.stepId === 'finalStep');
expect(finalStepExecutions.length).toBe(0);
});
it('should execute finalStep after constantlyFailingStep', async () => {
const failingStepExecutions = Array.from(
workflowRunFixture.stepExecutionRepositoryMock.stepExecutions.values()
).filter(
(se) =>
se.stepId === 'constantlyFailingStep' &&
se.stepType === FakeConnectors.constantlyFailing.actionTypeId
);
const finalStepExecutions = Array.from(
workflowRunFixture.stepExecutionRepositoryMock.stepExecutions.values()
).filter((se) => se.stepId === 'finalStep');

expect(failingStepExecutions.length).toBe(1);
expect(finalStepExecutions.length).toBe(1);

const failingStepCompletedAt = new Date(failingStepExecutions[0].completedAt!).getTime();
const finalStepStartedAt = new Date(finalStepExecutions[0].startedAt).getTime();

expect(finalStepStartedAt).toBeGreaterThanOrEqual(failingStepCompletedAt);
});
}
);

describe.each([undefined, false, '${{error.type == "SomeOtherError"}}'])(
'when continue is "%s"',
(continueTestCase) => {
beforeEach(async () => {
jest.clearAllMocks();
await workflowRunFixture.runWorkflow({
workflowYaml: buildYamlFn(continueTestCase),
});
});

it('should fail the workflow due to error in constantlyFailingStep', async () => {
const workflowExecutionDoc =
workflowRunFixture.workflowExecutionRepositoryMock.workflowExecutions.get(
'fake_workflow_execution_id'
);
expect(workflowExecutionDoc?.status).toBe(ExecutionStatus.FAILED);
expect(workflowExecutionDoc?.error).toEqual({
message: 'Error: Constantly failing connector',
type: 'Error',
});
expect(workflowExecutionDoc?.scopeStack).toEqual([]);
});

it('should execute constantlyFailingStep once and record its failure', async () => {
const failingStepExecutions = Array.from(
workflowRunFixture.stepExecutionRepositoryMock.stepExecutions.values()
).filter(
(se) =>
se.stepId === 'constantlyFailingStep' &&
se.stepType === FakeConnectors.constantlyFailing.actionTypeId
);
expect(failingStepExecutions.length).toBe(1);
expect(failingStepExecutions[0].status).toBe(ExecutionStatus.FAILED);
expect(failingStepExecutions[0].error).toEqual({
message: 'Error: Constantly failing connector',
type: 'Error',
});
});

it('should not execute finalStep', async () => {
const finalStepExecutions = Array.from(
workflowRunFixture.stepExecutionRepositoryMock.stepExecutions.values()
).filter((se) => se.stepId === 'finalStep');
expect(finalStepExecutions.length).toBe(0);
});
}
);
});
});
Loading