Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public enum AlertType {

/**
* 0 workflow instance failure, 1 workflow instance success, 2 workflow instance blocked, 3 workflow instance timeout, 4 fault tolerance warning,
* 5 task failure, 6 task success, 7 task timeout, 8 close alert
* 5 task failure, 6 task success, 7 task timeout
*/
WORKFLOW_INSTANCE_FAILURE(0, "workflow instance failure"),
WORKFLOW_INSTANCE_SUCCESS(1, "workflow instance success"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ public void sendServerStoppedAlert(String host, String serverType) {
* @param projectUser projectUser
*/
public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser) {
if (projectUser == null) {
throw new IllegalArgumentException("projectUser must not be null");
}
if (workflowInstance.getWarningGroupId() == null) {
throw new IllegalArgumentException("warningGroupId of the workflow instance must not be null");
}

int alertGroupId = workflowInstance.getWarningGroupId();
Alert alert = new Alert();
List<WorkflowAlertContent> workflowAlertContentList = new ArrayList<>(1);
Expand All @@ -220,10 +227,11 @@ public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectU
alert.setWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode());
alert.setWorkflowInstanceId(workflowInstance.getId());
alert.setAlertType(AlertType.WORKFLOW_INSTANCE_TIMEOUT);
saveTaskTimeoutAlert(alert, content, alertGroupId);

saveTimeoutAlert(alert, content, alertGroupId);
}

private void saveTaskTimeoutAlert(Alert alert, String content, int alertGroupId) {
private void saveTimeoutAlert(Alert alert, String content, int alertGroupId) {
alert.setAlertGroupId(alertGroupId);
alert.setWarningType(WarningType.FAILURE);
alert.setContent(content);
Expand Down Expand Up @@ -275,7 +283,8 @@ public void sendTaskTimeoutAlert(WorkflowInstance workflowInstance,
alert.setWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode());
alert.setWorkflowInstanceId(workflowInstance.getId());
alert.setAlertType(AlertType.TASK_TIMEOUT);
saveTaskTimeoutAlert(alert, content, workflowInstance.getWarningGroupId());

saveTimeoutAlert(alert, content, workflowInstance.getWarningGroupId());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static TaskTimeoutLifecycleEvent of(final ITaskExecutionRunnable taskExec
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();

checkState(timeoutStrategy != null, "The task timeoutStrategy must not be null");
checkState(timeoutInMinutes >= 0, "The task timeout: %s must >=0 minutes", timeoutInMinutes);
checkState(timeoutInMinutes > 0, "The task timeout: %s must > 0 minutes", timeoutInMinutes);

long delayTime = System.currentTimeMillis() - taskInstance.getSubmitTime().getTime()
+ TimeUnit.MINUTES.toMillis(timeoutInMinutes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public enum WorkflowLifecycleEventType implements ILifecycleEventType {
* Notify the workflow instance there exist a task has been finished, and should do DAG topology logic transaction.
*/
TOPOLOGY_LOGICAL_TRANSACTION_WITH_TASK_FINISH,
/**
* Do Timeout strategy of the workflow instance.
*/
TIMEOUT,
/**
* Pause the workflow instance
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event;

import static com.google.common.base.Preconditions.checkState;

import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.AbstractWorkflowLifecycleLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;

import java.util.concurrent.TimeUnit;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class WorkflowTimeoutLifecycleEvent extends AbstractWorkflowLifecycleLifecycleEvent {

private IWorkflowExecutionRunnable workflowExecutionRunnable;

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
AbstractWorkflowLifecycleLifecycleEvent.getWorkflowExecutionRunnable
; it is advisable to add an Override annotation.

protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final long timeout) {
super(timeout);
this.workflowExecutionRunnable = workflowExecutionRunnable;
}

public static WorkflowTimeoutLifecycleEvent of(IWorkflowExecutionRunnable workflowExecutionRunnable) {
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
checkState(workflowInstance != null,
"The workflow instance must be initialized before creating workflow timeout event.");

final int timeout = workflowInstance.getTimeout();
checkState(timeout > 0, "The workflow timeout: %s must > 0 minutes", timeout);

long delayTime = System.currentTimeMillis() - workflowInstance.getRestartTime().getTime()
+ TimeUnit.MINUTES.toMillis(timeout);
return new WorkflowTimeoutLifecycleEvent(workflowExecutionRunnable, delayTime);
}

@Override
public ILifecycleEventType getEventType() {
return WorkflowLifecycleEventType.TIMEOUT;
}

@Override
public String toString() {
return "WorkflowTimeoutLifecycleEvent{" +
"workflow=" + workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getName() +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler;

import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction;

Expand All @@ -38,11 +40,24 @@ public void handle(final IWorkflowStateAction workflowStateAction,
final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowStartLifecycleEvent workflowStartEvent) {

workflowTimeoutMonitor(workflowExecutionRunnable);
workflowStateAction.onStartEvent(workflowExecutionRunnable, workflowStartEvent);
}

@Override
public ILifecycleEventType matchEventType() {
return WorkflowLifecycleEventType.START;
}

private void workflowTimeoutMonitor(final IWorkflowExecutionRunnable workflowExecutionRunnable) {
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
if (workflowInstance.getTimeout() <= 0) {
log.debug("The workflow {} timeout {} is not configured or invalid, skip timeout monitor.",
workflowInstance.getName(),
workflowInstance.getTimeout());
return;
}
workflowExecutionRunnable.getWorkflowEventBus()
.publish(WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler;

import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction;
import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

@Slf4j
@Component
public class WorkflowTimeoutLifecycleEventHandler
extends
AbstractWorkflowLifecycleEventHandler<WorkflowTimeoutLifecycleEvent> {

private final WorkflowAlertManager workflowAlertManager;

public WorkflowTimeoutLifecycleEventHandler(final WorkflowAlertManager workflowAlertManager) {
this.workflowAlertManager = workflowAlertManager;
}

@Override
public void handle(final IWorkflowStateAction workflowStateAction,
final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowTimeoutLifecycleEvent workflowTimeoutLifecycleEvent) {
final IWorkflowExecutionGraph workflowExecutionGraph = workflowExecutionRunnable.getWorkflowExecutionGraph();
if (workflowExecutionGraph.isAllTaskExecutionRunnableChainFinish()) {
// all the TaskExecutionRunnable chain in the graph is finish, means the workflow is already finished.
return;
}

final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
final boolean shouldSendAlert = workflowInstance.getWarningGroupId() != null;

if (shouldSendAlert) {
doWorkflowTimeoutAlert(workflowExecutionRunnable);
} else {
log.info("Skipped sending timeout alert for workflow {} because warningGroupId is null.",
workflowInstance.getName());
}

}

@Override
public ILifecycleEventType matchEventType() {
return WorkflowLifecycleEventType.TIMEOUT;
}

private void doWorkflowTimeoutAlert(final IWorkflowExecutionRunnable workflowExecutionRunnable) {
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
workflowAlertManager.sendWorkflowTimeoutAlert(workflowInstance);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.dolphinscheduler.server.master.integration;

import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.mapper.AlertMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;

Expand All @@ -39,6 +41,9 @@ public class Repository {
@Autowired
private TaskInstanceDao taskInstanceDao;

@Autowired
private AlertMapper alertMapper;

/**
* Return the list of process instances for a given workflow definition in ascending order of their IDs.
*/
Expand Down Expand Up @@ -87,4 +92,13 @@ public List<TaskInstance> queryAllTaskInstance() {
return taskInstanceDao.queryAll();
}

/**
* Return the list of alert for a given workflow instance in ascending order of their IDs.
*/
public List<Alert> queryAlert(final Integer workflowInstanceId) {
return alertMapper.selectByWorkflowInstanceId(workflowInstanceId)
.stream()
.sorted(Comparator.comparingInt(Alert::getId))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
Expand Down Expand Up @@ -67,6 +68,7 @@ public Integer manualTriggerWorkflow(final WorkflowTriggerDTO workflowTriggerDTO
.dryRun(workflowTriggerDTO.getDryRun())
.taskDependType(workflowTriggerDTO.getTaskDependType())
.failureStrategy(workflowTriggerDTO.getFailureStrategy())
.warningGroupId(workflowTriggerDTO.getWarningGroupId())
.build();

final WorkflowManualTriggerResponse manualTriggerWorkflowResponse =
Expand Down Expand Up @@ -160,6 +162,12 @@ public static class WorkflowTriggerDTO {

@Builder.Default
private FailureStrategy failureStrategy = FailureStrategy.CONTINUE;

@Builder.Default
private WarningType warningType = WarningType.NONE;

@Builder.Default
private Integer warningGroupId = null;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.truth.Truth.assertThat;
import static org.awaitility.Awaitility.await;

import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
Expand Down Expand Up @@ -1778,4 +1779,45 @@ public void testTaskRemainsSubmittedSuccess_with_noAvailableWorkerAndTimeoutDisa
// masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow when timeout should trigger alert when warningGroupId is set")
public void testWorkflowTimeout_WithAlertGroup_ShouldSendAlert() {
final String yaml = "/it/start/workflow_with_workflow_timeout_alert.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition workflow = context.getOneWorkflow();

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(workflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.warningGroupId(workflow.getWarningGroupId())
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await().atMost(Duration.ofMinutes(2))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);
Assertions
.assertThat(repository.queryTaskInstance(workflow))
.hasSize(1)
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("long_running_task");
assertThat(taskInstance.getWorkerGroup()).isEqualTo("default");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
});
Assertions
.assertThat(repository.queryAlert(workflowInstanceId))
.hasSize(1)
.anySatisfy(alert -> {
assertThat(alert.getTitle()).isEqualTo("Workflow Timeout Warn");
assertThat(alert.getProjectCode()).isEqualTo(1);
assertThat(alert.getWorkflowDefinitionCode()).isEqualTo(1);
assertThat(alert.getAlertType()).isEqualTo(AlertType.WORKFLOW_INSTANCE_TIMEOUT);
});
});

masterContainer.assertAllResourceReleased();
}
}
Loading
Loading