Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public enum AlertType {

/**
* 0 workflow instance failure, 1 workflow instance success, 2 workflow instance blocked, 3 workflow instance timeout, 4 fault tolerance warning,
* 5 task failure, 6 task success, 7 task timeout, 8 close alert
* 5 task failure, 6 task success, 7 task timeout
*/
WORKFLOW_INSTANCE_FAILURE(0, "workflow instance failure"),
WORKFLOW_INSTANCE_SUCCESS(1, "workflow instance success"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ public void sendServerStoppedAlert(String host, String serverType) {
* @param projectUser projectUser
*/
public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectUser projectUser) {
if (projectUser == null) {
throw new IllegalArgumentException("projectUser must not be null");
}
if (workflowInstance.getWarningGroupId() == null) {
throw new IllegalArgumentException("warningGroupId of the workflow instance must not be null");
}

int alertGroupId = workflowInstance.getWarningGroupId();
Alert alert = new Alert();
List<WorkflowAlertContent> workflowAlertContentList = new ArrayList<>(1);
Expand All @@ -220,10 +227,11 @@ public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance, ProjectU
alert.setWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode());
alert.setWorkflowInstanceId(workflowInstance.getId());
alert.setAlertType(AlertType.WORKFLOW_INSTANCE_TIMEOUT);
saveTaskTimeoutAlert(alert, content, alertGroupId);

saveTimeoutAlert(alert, content, alertGroupId);
}

private void saveTaskTimeoutAlert(Alert alert, String content, int alertGroupId) {
private void saveTimeoutAlert(Alert alert, String content, int alertGroupId) {
alert.setAlertGroupId(alertGroupId);
alert.setWarningType(WarningType.FAILURE);
alert.setContent(content);
Expand Down Expand Up @@ -275,7 +283,8 @@ public void sendTaskTimeoutAlert(WorkflowInstance workflowInstance,
alert.setWorkflowDefinitionCode(workflowInstance.getWorkflowDefinitionCode());
alert.setWorkflowInstanceId(workflowInstance.getId());
alert.setAlertType(AlertType.TASK_TIMEOUT);
saveTaskTimeoutAlert(alert, content, workflowInstance.getWarningGroupId());

saveTimeoutAlert(alert, content, workflowInstance.getWarningGroupId());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static TaskTimeoutLifecycleEvent of(final ITaskExecutionRunnable taskExec
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();

checkState(timeoutStrategy != null, "The task timeoutStrategy must not be null");
checkState(timeoutInMinutes >= 0, "The task timeout: %s must >=0 minutes", timeoutInMinutes);
checkState(timeoutInMinutes > 0, "The task timeout: %s must > 0 minutes", timeoutInMinutes);

long delayTime = System.currentTimeMillis() - taskInstance.getSubmitTime().getTime()
+ TimeUnit.MINUTES.toMillis(timeoutInMinutes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,47 @@ public enum WorkflowLifecycleEventType implements ILifecycleEventType {
* Start the workflow instance
*/
START,

/**
* Notify the workflow instance that a task has finished, so the DAG topology logic transaction should be performed.
*/
TOPOLOGY_LOGICAL_TRANSACTION_WITH_TASK_FINISH,

/**
* Do Timeout strategy of the workflow instance.
*/
TIMEOUT,

/**
* Pause the workflow instance
*/
PAUSE,

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't change the unrelated code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't change the unrelated code.

ok

/**
* The workflow instance has been paused
*/
PAUSED,

/**
* Stop the workflow instance
*/
STOP,

/**
* The workflow instance has been stopped
*/
STOPPED,

/**
* The workflow instance has succeeded
*/
SUCCEED,

/**
* The workflow instance has failed
*/
FAILED,

/**
* Finalize the workflow instance.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event;

import static com.google.common.base.Preconditions.checkState;

import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.AbstractWorkflowLifecycleLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;

import java.util.concurrent.TimeUnit;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class WorkflowTimeoutLifecycleEvent extends AbstractWorkflowLifecycleLifecycleEvent {

private IWorkflowExecutionRunnable workflowExecutionRunnable;

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
AbstractWorkflowLifecycleLifecycleEvent.getWorkflowExecutionRunnable
; it is advisable to add an Override annotation.

/**
 * Creates a timeout event bound to the given workflow runnable.
 *
 * @param workflowExecutionRunnable the workflow runnable stored on this event
 * @param timeout value passed unchanged to the superclass constructor
 */
protected WorkflowTimeoutLifecycleEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final long timeout) {
super(timeout);
this.workflowExecutionRunnable = workflowExecutionRunnable;
}

public static WorkflowTimeoutLifecycleEvent of(IWorkflowExecutionRunnable workflowExecutionRunnable) {
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
checkState(workflowInstance != null, "The workflow instance must be initialized before retrying.");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


final int timeout = workflowInstance.getTimeout();
checkState(timeout > 0, "The workflow timeout: %s must > 0 minutes", timeout);

long delayTime = System.currentTimeMillis() - workflowInstance.getStartTime().getTime()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the workflow instance is rerun the startTime will not change. You should use restartTime .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the workflow instance is rerun the startTime will not change. You should use restartTime .

good

+ TimeUnit.MINUTES.toMillis(timeout);
return new WorkflowTimeoutLifecycleEvent(workflowExecutionRunnable, delayTime);
}

/**
 * Returns the lifecycle event type of this event.
 *
 * @return always {@link WorkflowLifecycleEventType#TIMEOUT}
 */
@Override
public ILifecycleEventType getEventType() {
return WorkflowLifecycleEventType.TIMEOUT;
}

/**
 * Renders the event as {@code WorkflowTimeoutLifecycleEvent{workflow=NAME}}, where NAME is the name of the
 * workflow instance obtained from the runnable's execute context.
 */
@Override
public String toString() {
    return new StringBuilder("WorkflowTimeoutLifecycleEvent{")
            .append("workflow=")
            .append(workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getName())
            .append('}')
            .toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler;

import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowStartLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction;

Expand All @@ -38,11 +40,24 @@ public void handle(final IWorkflowStateAction workflowStateAction,
final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowStartLifecycleEvent workflowStartEvent) {

workflowTimeoutMonitor(workflowExecutionRunnable);
workflowStateAction.onStartEvent(workflowExecutionRunnable, workflowStartEvent);
}

/**
 * Returns the lifecycle event type associated with this handler.
 *
 * @return always {@link WorkflowLifecycleEventType#START}
 */
@Override
public ILifecycleEventType matchEventType() {
return WorkflowLifecycleEventType.START;
}

private void workflowTimeoutMonitor(final IWorkflowExecutionRunnable workflowExecutionRunnable) {
final WorkflowInstance workflowInstance = workflowExecutionRunnable.getWorkflowInstance();
if (workflowInstance.getTimeout() <= 0) {
log.debug("The workflow {} timeout {} is invalided, so the timeout monitor will not be started.",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okok

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log is incorrect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log is incorrect.

fix it

workflowInstance.getName(),
workflowInstance.getTimeout());
return;
}
workflowExecutionRunnable.getWorkflowEventBus()
.publish(WorkflowTimeoutLifecycleEvent.of(workflowExecutionRunnable));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.handler;

import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.server.master.engine.ILifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowExecutionGraph;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.WorkflowLifecycleEventType;
import org.apache.dolphinscheduler.server.master.engine.workflow.lifecycle.event.WorkflowTimeoutLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.workflow.statemachine.IWorkflowStateAction;
import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

/**
 * Handles {@link WorkflowTimeoutLifecycleEvent}: unless every task chain in the execution graph has already
 * finished, it asks the {@link WorkflowAlertManager} to send a workflow timeout alert, provided the workflow
 * instance has a warning group id.
 */
@Slf4j
@Component
public class WorkflowTimeoutLifecycleEventHandler
        extends
            AbstractWorkflowLifecycleEventHandler<WorkflowTimeoutLifecycleEvent> {

    private final WorkflowAlertManager workflowAlertManager;

    public WorkflowTimeoutLifecycleEventHandler(final WorkflowAlertManager workflowAlertManager) {
        this.workflowAlertManager = workflowAlertManager;
    }

    /**
     * Sends the timeout alert for a still-running workflow, or logs why the alert was skipped.
     *
     * @param workflowStateAction           not used by this handler
     * @param workflowExecutionRunnable     runnable providing the execution graph and workflow instance
     * @param workflowTimeoutLifecycleEvent not used by this handler
     */
    @Override
    public void handle(final IWorkflowStateAction workflowStateAction,
                       final IWorkflowExecutionRunnable workflowExecutionRunnable,
                       final WorkflowTimeoutLifecycleEvent workflowTimeoutLifecycleEvent) {
        final IWorkflowExecutionGraph graph = workflowExecutionRunnable.getWorkflowExecutionGraph();
        if (graph.isAllTaskExecutionRunnableChainFinish()) {
            // Every task chain is done, so the workflow has already finished and no alert is needed.
            return;
        }

        final WorkflowInstance instance = workflowExecutionRunnable.getWorkflowInstance();
        if (instance.getWarningGroupId() == null) {
            log.info("Skipped sending timeout alert for workflow {} because warningGroupId is null.",
                    instance.getName());
            return;
        }

        sendTimeoutAlert(workflowExecutionRunnable);
    }

    /**
     * Returns the lifecycle event type associated with this handler.
     *
     * @return always {@link WorkflowLifecycleEventType#TIMEOUT}
     */
    @Override
    public ILifecycleEventType matchEventType() {
        return WorkflowLifecycleEventType.TIMEOUT;
    }

    /** Passes the runnable's workflow instance to the alert manager. */
    private void sendTimeoutAlert(final IWorkflowExecutionRunnable workflowExecutionRunnable) {
        workflowAlertManager.sendWorkflowTimeoutAlert(workflowExecutionRunnable.getWorkflowInstance());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -264,4 +264,10 @@ public void sendTaskTimeoutAlert(WorkflowInstance workflowInstance,
ProjectUser projectUser = projectDao.queryProjectWithUserByWorkflowInstanceId(workflowInstance.getId());
alertDao.sendTaskTimeoutAlert(workflowInstance, taskInstance, projectUser);
}

/**
 * Queries the project/user record for the workflow instance by its id and passes both to the alert DAO's
 * workflow timeout alert method.
 *
 * @param workflowInstance the workflow instance that timed out
 */
public void sendWorkflowTimeoutAlert(WorkflowInstance workflowInstance) {
    final ProjectUser owner =
            projectDao.queryProjectWithUserByWorkflowInstanceId(workflowInstance.getId());
    alertDao.sendWorkflowTimeoutAlert(workflowInstance, owner);
}

}
Loading