From b12b82be667cdd10288c9cd47360876add9d80d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Fri, 13 Mar 2026 10:55:01 +0800 Subject: [PATCH 1/6] add workflow timeout --- .../dolphinscheduler/common/enums/AlertType.java | 2 +- .../org/apache/dolphinscheduler/dao/AlertDao.java | 15 ++++++++++++--- .../event/TaskTimeoutLifecycleEvent.java | 2 +- .../lifecycle/WorkflowLifecycleEventType.java | 13 +++++++++++++ .../WorkflowStartLifecycleEventHandler.java | 15 +++++++++++++++ .../service/alert/WorkflowAlertManager.java | 6 ++++++ 6 files changed, 48 insertions(+), 5 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java index f7a5ba0d3906..058afcb3fc71 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java @@ -29,7 +29,7 @@ public enum AlertType { /** * 0 workflow instance failure, 1 workflow instance success, 2 workflow instance blocked, 3 workflow instance timeout, 4 fault tolerance warning, - * 5 task failure, 6 task success, 7 task timeout, 8 close alert + * 5 task failure, 6 task success, 7 task timeout */ WORKFLOW_INSTANCE_FAILURE(0, "workflow instance failure"), WORKFLOW_INSTANCE_SUCCESS(1, "workflow instance success"), diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java index 708c8ad6b8a0..3b1c45816ae1 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java @@ -195,6 +195,13 @@ public void sendServerStoppedAlert(String host, String serverType) { * @param projectUser projectUser */ public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser) { + if (projectUser == null) { + throw new IllegalArgumentException("projectUser must not be null"); + } + if (workflowInstance.getWarningGroupId() == null) { + throw new IllegalArgumentException("warningGroupId of the workflow instance must not be null"); + } + int alertGroupId = workflowInstance.getWarningGroupId(); Alert alert = new Alert(); List workflowAlertContentList = new ArrayList<>(1); @@ -220,10 +227,11 @@ public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectU alert.setWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode()); alert.setWorkflowInstanceId(workflowInstance.getId()); alert.setAlertType(AlertType.WORKFLOW_INSTANCE_TIMEOUT); - saveTaskTimeoutAlert(alert, content, alertGroupId); + + saveTimeoutAlert(alert, content, alertGroupId); } - private void saveTaskTimeoutAlert(Alert alert, String content, int alertGroupId) { + private void saveTimeoutAlert(Alert alert, String content, int alertGroupId) { alert.setAlertGroupId(alertGroupId); alert.setWarningType(WarningType.FAILURE); alert.setContent(content); @@ -275,7 +283,8 @@ public void sendTaskTimeoutAlert(WorkflowInstance workflowInstance, alert.setWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode()); alert.setWorkflowInstanceId(workflowInstance.getId()); alert.setAlertType(AlertType.TASK_TIMEOUT); - saveTaskTimeoutAlert(alert, content, workflowInstance.getWarningGroupId()); + + saveTimeoutAlert(alert, content, workflowInstance.getWarningGroupId()); } /** diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java index 5ce6d109a62e..29905b71ea81 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskTimeoutLifecycleEvent.java @@ -51,7 +51,7 @@ public static TaskTimeoutLifecycleEvent of(final ITaskExecutionRunnable taskExec final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance(); checkState(timeoutStrategy != null, "The task timeoutStrategy must not be null"); - checkState(timeoutInMinutes >= 0, "The task timeout: %s must >=0 minutes", timeoutInMinutes); + checkState(timeoutInMinutes > 0, "The task timeout: %s must > 0 minutes", timeoutInMinutes); long delayTime = System.currentTimeMillis() - taskInstance.getSubmitTime().getTime() + TimeUnit.MINUTES.toMillis(timeoutInMinutes); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java index 95070d62296e..87e40e1d3f97 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java @@ -25,34 +25,47 @@ public enum WorkflowLifecycleEventType implements ILifecycleEventType { * Start the workflow instance */ START, + /** * Notify the workflow instance there exist a task has been finished, and should do DAG topology logic transaction. */ TOPOLOGY_LOGICAL_TRANSACTION_WITH_TASK_FINISH, + + /** + * Do Timeout strategy of the workflow instance. + */ + TIMEOUT, + /** * Pause the workflow instance */ PAUSE, + /** * The workflow instance has been paused */ PAUSED, + /** * Stop the workflow instance */ STOP, + /** * The workflow instance has been stopped */ STOPPED, + /** * The workflow instance has been success */ SUCCEED, + /** * The workflow instance has been failed */ FAILED, + /** * Finalize the workflow instance. */ diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java index 2a3aba95e31a..0381f5a03ee6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java @@ -17,9 +17,11 @@ package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; @@ -38,6 +40,7 @@ public void handle(final IWorkflowStateAction workflowStateAction, final IWorkflowExecutionRunnable workflowExecutionRunnable, final WorkflowStartLifecycleEvent workflowStartEvent) { + workflowTimeoutMonitor(workflowExecutionRunnable); workflowStateAction.onStartEvent(workflowExecutionRunnable, workflowStartEvent); } @@ -45,4 +48,16 @@ public void handle(final IWorkflowStateAction workflowStateAction, public ILifecycleEventType matchEventType() { return WorkflowLifecycleEventType.START; } + + private void workflowTimeoutMonitor(final IWorkflowExecutionRunnable workflowExecutionRunnable) { + final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + if (workflowInstance.getTimeout() <= 0) { + log.debug("The workflow {} timeout {} is invalided, so the timeout monitor will not be started.", + workflowInstance.getName(), + workflowInstance.getTimeout()); + return; + } + workflowExecutionRunnable.getWorkflowEventBus() + .publish(WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable)); + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java index bbe3ec3b07b7..dcb5307c8b7e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/WorkflowAlertManager.java @@ -264,4 +264,10 @@ public void sendTaskTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser = projectDao.queryProjectWithUserByWorkflowInstanceId(workflowInstance.getId()); alertDao.sendTaskTimeoutAlert(workflowInstance, taskInstance, projectUser); } + + public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance) { + ProjectUser projectUser = projectDao.queryProjectWithUserByWorkflowInstanceId(workflowInstance.getId()); + alertDao.sendWorkflowTimeoutAlert(workflowInstance, projectUser); + } + } From 6c304669ef7ad3f0573e53daf1a4346eec39f533 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Fri, 13 Mar 2026 14:46:10 +0800 Subject: [PATCH 2/6] add workflow timeout event and handle --- .../event/WorkflowTimeoutLifecycleEvent.java | 69 +++++++++++++++++ .../WorkflowTimeoutLifecycleEventHandler.java | 76 +++++++++++++++++++ 2 files changed, 145 insertions(+) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java new file mode 100644 index 000000000000..acbb69da13b5 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event; + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.AbstractWorkflowLifecycleLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; + +import java.util.concurrent.TimeUnit; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor(access = AccessLevel.PRIVATE) +public class WorkflowTimeoutLifecycleEvent extends AbstractWorkflowLifecycleLifecycleEvent { + + private IWorkflowExecutionRunnable workflowExecutionRunnable; + + protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable, + final long timeout) { + super(timeout); + this.workflowExecutionRunnable = workflowExecutionRunnable; + } + + public static WorkflowTimeoutLifecycleEvent of(IWorkflowExecutionRunnable workflowExecutionRunnable) { + final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + checkState(workflowInstance != null, "The workflow instance must be initialized before retrying."); + + final int timeout = workflowInstance.getTimeout(); + checkState(timeout > 0, "The workflow timeout: %s must > 0 minutes", timeout); + + long delayTime = System.currentTimeMillis() - workflowInstance.getStartTime().getTime() + + TimeUnit.MINUTES.toMillis(timeout); + return new WorkflowTimeoutLifecycleEvent(workflowExecutionRunnable, delayTime); + } + + @Override + public ILifecycleEventType getEventType() { + return WorkflowLifecycleEventType.TIMEOUT; + } + + @Override + public String toString() { + return "WorkflowTimeoutLifecycleEvent{" + + "workflow=" + workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getName() + + '}'; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java new file mode 100644 index 000000000000..34e405125af3 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowTimeoutLifecycleEventHandler.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler; + +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType; +import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent; +import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction; +import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowTimeoutLifecycleEventHandler + extends + AbstractWorkflowLifecycleEventHandler { + + private final WorkflowAlertManager workflowAlertManager; + + public WorkflowTimeoutLifecycleEventHandler(final WorkflowAlertManager workflowAlertManager) { + this.workflowAlertManager = workflowAlertManager; + } + + @Override + public void handle(final IWorkflowStateAction workflowStateAction, + final IWorkflowExecutionRunnable workflowExecutionRunnable, + final WorkflowTimeoutLifecycleEvent workflowTimeoutLifecycleEvent) { + final IWorkflowExecutionGraph workflowExecutionGraph = workflowExecutionRunnable.getWorkflowExecutionGraph(); + if (workflowExecutionGraph.isAllTaskExecutionRunnableChainFinish()) { + // all the TaskExecutionRunnable chain in the graph is finish, means the workflow is already finished. + return; + } + + final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + final boolean shouldSendAlert = workflowInstance.getWarningGroupId() != null; + + if (shouldSendAlert) { + doWorkflowTimeoutAlert(workflowExecutionRunnable); + } else { + log.info("Skipped sending timeout alert for workflow {} because warningGroupId is null.", + workflowInstance.getName()); + } + + } + + @Override + public ILifecycleEventType matchEventType() { + return WorkflowLifecycleEventType.TIMEOUT; + } + + private void doWorkflowTimeoutAlert(final IWorkflowExecutionRunnable workflowExecutionRunnable) { + final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); + workflowAlertManager.sendWorkflowTimeoutAlert(workflowInstance); + } +} From d31aa6ef252e593abb6502f15c5858ef0d13b688 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Fri, 20 Mar 2026 11:36:56 +0800 Subject: [PATCH 3/6] use restartTime --- .../workflow/lifecycle/WorkflowLifecycleEventType.java | 9 --------- .../lifecycle/event/WorkflowTimeoutLifecycleEvent.java | 4 ++-- .../handler/WorkflowStartLifecycleEventHandler.java | 2 +- 3 files changed, 3 insertions(+), 12 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java index 87e40e1d3f97..aa16f8405b10 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/WorkflowLifecycleEventType.java @@ -25,47 +25,38 @@ public enum WorkflowLifecycleEventType implements ILifecycleEventType { * Start the workflow instance */ START, - /** * Notify the workflow instance there exist a task has been finished, and should do DAG topology logic transaction. */ TOPOLOGY_LOGICAL_TRANSACTION_WITH_TASK_FINISH, - /** * Do Timeout strategy of the workflow instance. */ TIMEOUT, - /** * Pause the workflow instance */ PAUSE, - /** * The workflow instance has been paused */ PAUSED, - /** * Stop the workflow instance */ STOP, - /** * The workflow instance has been stopped */ STOPPED, - /** * The workflow instance has been success */ SUCCEED, - /** * The workflow instance has been failed */ FAILED, - /** * Finalize the workflow instance. */ diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java index acbb69da13b5..e98406e554f2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java @@ -45,12 +45,12 @@ protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflo public static WorkflowTimeoutLifecycleEvent of(IWorkflowExecutionRunnable workflowExecutionRunnable) { final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); - checkState(workflowInstance != null, "The workflow instance must be initialized before retrying."); + checkState(workflowInstance != null, "The workflow instance must be initialized."); final int timeout = workflowInstance.getTimeout(); checkState(timeout > 0, "The workflow timeout: %s must > 0 minutes", timeout); - long delayTime = System.currentTimeMillis() - workflowInstance.getStartTime().getTime() + long delayTime = System.currentTimeMillis() - workflowInstance.getRestartTime().getTime() + TimeUnit.MINUTES.toMillis(timeout); return new WorkflowTimeoutLifecycleEvent(workflowExecutionRunnable, delayTime); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java index 0381f5a03ee6..ce78aa30de9b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/handler/WorkflowStartLifecycleEventHandler.java @@ -52,7 +52,7 @@ public ILifecycleEventType matchEventType() { private void workflowTimeoutMonitor(final IWorkflowExecutionRunnable workflowExecutionRunnable) { final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); if (workflowInstance.getTimeout() <= 0) { - log.debug("The workflow {} timeout {} is invalided, so the timeout monitor will not be started.", + log.debug("The workflow {} timeout {} is not configured or invalid, skip timeout monitor.", workflowInstance.getName(), workflowInstance.getTimeout()); return; From 1a18b36d260f43d71ea0d43b0ae9f5ac8099a445 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Fri, 20 Mar 2026 11:40:05 +0800 Subject: [PATCH 4/6] use restartTime --- .../workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java index e98406e554f2..a1427ab2c6e4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java @@ -45,7 +45,7 @@ protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflo public static WorkflowTimeoutLifecycleEvent of(IWorkflowExecutionRunnable workflowExecutionRunnable) { final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); - checkState(workflowInstance != null, "The workflow instance must be initialized."); + checkState(workflowInstance != null, "The workflow instance must be initialized before creating workflow timeout event."); final int timeout = workflowInstance.getTimeout(); checkState(timeout > 0, "The workflow timeout: %s must > 0 minutes", timeout); From e4f43dfe2222b6387c93b06c7256e848cbafb13e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Fri, 20 Mar 2026 15:59:36 +0800 Subject: [PATCH 5/6] add it test case for timeout --- .../event/WorkflowTimeoutLifecycleEvent.java | 3 +- .../server/master/integration/Repository.java | 14 +++++ .../cases/WorkflowStartTestCase.java | 41 ++++++++++++ .../workflow_with_workflow_timeout_alert.yaml | 63 +++++++++++++++++++ 4 files changed, 120 insertions(+), 1 deletion(-) create mode 100644 dolphinscheduler-master/src/test/resources/it/start/workflow_with_workflow_timeout_alert.yaml diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java index a1427ab2c6e4..dc97d5d34e71 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/lifecycle/event/WorkflowTimeoutLifecycleEvent.java @@ -45,7 +45,8 @@ protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflo public static WorkflowTimeoutLifecycleEvent of(IWorkflowExecutionRunnable workflowExecutionRunnable) { final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance(); - checkState(workflowInstance != null, "The workflow instance must be initialized before creating workflow timeout event."); + checkState(workflowInstance != null, + "The workflow instance must be initialized before creating workflow timeout event."); final int timeout = workflowInstance.getTimeout(); checkState(timeout > 0, "The workflow timeout: %s must > 0 minutes", timeout); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/Repository.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/Repository.java index 15c31c0940da..4659fbfaaf3e 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/Repository.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/Repository.java @@ -17,9 +17,11 @@ package org.apache.dolphinscheduler.server.master.integration; +import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.mapper.AlertMapper; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; @@ -39,6 +41,9 @@ public class Repository { @Autowired private TaskInstanceDao taskInstanceDao; + @Autowired + private AlertMapper alertMapper; + /** * Return the list of process instances for a given workflow definition in ascending order of their IDs. */ @@ -87,4 +92,13 @@ public List queryAllTaskInstance() { return taskInstanceDao.queryAll(); } + /** + * Return the list of alert for a given workflow instance in ascending order of their IDs. + */ + public List queryAlert(final Integer workflowInstanceId) { + return alertMapper.selectByWorkflowInstanceId(workflowInstanceId) + .stream() + .sorted(Comparator.comparingInt(Alert::getId)) + .collect(Collectors.toList()); + } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java index b19b27ebe8a2..22f0f24886c4 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java @@ -20,6 +20,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.awaitility.Awaitility.await; +import org.apache.dolphinscheduler.common.enums.AlertType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.TaskDependType; @@ -1778,4 +1779,44 @@ public void testTaskRemainsSubmittedSuccess_with_noAvailableWorkerAndTimeoutDisa // masterContainer.assertAllResourceReleased(); } + @Test + @DisplayName("Test start a workflow when timeout should trigger alert when warningGroupId is set") + public void testWorkflowTimeout_WithAlertGroup_ShouldSendAlert() { + final String yaml = "/it/start/workflow_with_workflow_timeout_alert.yaml"; + final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml); + final WorkflowDefinition workflow = context.getOneWorkflow(); + + final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder() + .workflowDefinition(workflow) + .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .build(); + final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO); + + await().atMost(Duration.ofMinutes(2)) + .untilAsserted(() -> { + Assertions + .assertThat(repository.queryWorkflowInstance(workflowInstanceId)) + .matches( + workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS); + Assertions + .assertThat(repository.queryTaskInstance(workflow)) + .hasSize(1) + .anySatisfy(taskInstance -> { + assertThat(taskInstance.getName()).isEqualTo("long_running_task"); + assertThat(taskInstance.getWorkerGroup()).isEqualTo("default"); + assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS); + }); + Assertions + .assertThat(repository.queryAlert(workflowInstanceId)) + .hasSize(1) + .anySatisfy(alert -> { + assertThat(alert.getTitle()).isEqualTo("Workflow Timeout Warn"); + assertThat(alert.getProjectCode()).isEqualTo(1); + assertThat(alert.getWorkflowDefinitionCode()).isEqualTo(1); + assertThat(alert.getAlertType()).isEqualTo(AlertType.WORKFLOW_INSTANCE_TIMEOUT); + }); + }); + + masterContainer.assertAllResourceReleased(); + } } diff --git a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_workflow_timeout_alert.yaml b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_workflow_timeout_alert.yaml new file mode 100644 index 000000000000..a2b1cd09d40f --- /dev/null +++ b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_workflow_timeout_alert.yaml @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +project: + name: MasterIntegrationTest + code: 1 + description: This is a fake project for timeout alert testing + userId: 1 + userName: admin + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + +workflows: + - name: workflow_with_timeout_alert + code: 1 + version: 1 + projectCode: 1 + description: This is a fake workflow with timeout alert configuration + releaseState: ONLINE + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + userId: 1 + executionType: PARALLEL + timeout: 1 + warningGroupId: 1 + +tasks: + - name: long_running_task + code: 1 + version: 1 + projectCode: 1 + userId: 1 + taskType: LogicFakeTask + taskParams: '{"localParams":[],"shellScript":"sleep 90","resourceList":[]}' + workerGroup: default + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 + taskExecuteType: BATCH + +taskRelations: + - projectCode: 1 + workflowDefinitionCode: 1 + workflowDefinitionVersion: 1 + preTaskCode: 0 + preTaskVersion: 0 + postTaskCode: 1 + postTaskVersion: 1 + createTime: 2024-08-12 00:00:00 + updateTime: 2024-08-12 00:00:00 \ No newline at end of file From 65e9b5ee5e5258c46ea12ccf2491af7e51def9a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Mon, 23 Mar 2026 14:33:20 +0800 Subject: [PATCH 6/6] add warningGroupId --- .../server/master/integration/WorkflowOperator.java | 8 ++++++++ .../master/integration/cases/WorkflowStartTestCase.java | 1 + 2 files changed, 9 insertions(+) diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java index aaac0b459deb..062df236d9e4 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/WorkflowOperator.java @@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; @@ -67,6 +68,7 @@ public Integer manualTriggerWorkflow(final WorkflowTriggerDTO workflowTriggerDTO .dryRun(workflowTriggerDTO.getDryRun()) .taskDependType(workflowTriggerDTO.getTaskDependType()) .failureStrategy(workflowTriggerDTO.getFailureStrategy()) + .warningGroupId(workflowTriggerDTO.getWarningGroupId()) .build(); final WorkflowManualTriggerResponse manualTriggerWorkflowResponse = @@ -160,6 +162,12 @@ public static class WorkflowTriggerDTO { @Builder.Default private FailureStrategy failureStrategy = FailureStrategy.CONTINUE; + + @Builder.Default + private WarningType warningType = WarningType.NONE; + + @Builder.Default + private Integer warningGroupId = null; } @Data diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java index 22f0f24886c4..e5a12c78c276 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java @@ -1789,6 +1789,7 @@ public void testWorkflowTimeout_WithAlertGroup_ShouldSendAlert() { final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder() .workflowDefinition(workflow) .runWorkflowCommandParam(new RunWorkflowCommandParam()) + .warningGroupId(workflow.getWarningGroupId()) .build(); final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);