Skip to content

Commit

Permalink
Fix task nodes under switch task will not be skipped (apache#16108)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun authored Jun 5, 2024
1 parent 5867bee commit 5e4afe6
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,30 @@ public SwitchLogicTask(TaskExecutionContext taskExecutionContext,
this.taskInstance = workflowExecuteRunnable.getTaskInstance(taskExecutionContext.getTaskInstanceId())
.orElseThrow(() -> new LogicTaskInitializeException(
"Cannot find the task instance in workflow execute runnable"));
// Since the default branch is not in the dependTaskList, we need to add it to the end
// otherwise the default branch will never be skipped in DAGHelper
addDefaultBranchToEnd();
}

@Override
public void handle() throws MasterTaskExecuteException {
// Calculate the condition result and get the next node
if (CollectionUtils.isEmpty(taskParameters.getDependTaskList())) {
// If the branch is empty then will go into the default branch
// This case shouldn't happen, we can directly throw exception and forbid the user to set branch
moveToDefaultBranch();
} else {
calculateSwitchBranch();
}
taskInstance.setSwitchDependency(taskParameters);
log.info("Switch task execute finished");
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
log.info("Switch task execute finished: {}", taskExecutionContext.getCurrentExecutionStatus().name());
}

private void moveToDefaultBranch() {
checkIfBranchExist(taskParameters.getNextNode());

List<SwitchResultVo> switchResultVos = taskParameters.getDependTaskList();
switchResultVos.add(new SwitchResultVo(null, taskParameters.getNextNode()));
SwitchResultVo defaultSwitchResultVo = getDefaultSwitchResultVo();
checkIfBranchExist(defaultSwitchResultVo.getNextNode());

taskParameters.setResultConditionLocation(switchResultVos.size() - 1);

log.info("The condition is not satisfied, move to the default branch: {}",
Expand All @@ -90,17 +94,15 @@ private void moveToDefaultBranch() {

private void calculateSwitchBranch() {
List<SwitchResultVo> switchResultVos = taskParameters.getDependTaskList();
if (CollectionUtils.isEmpty(switchResultVos)) {
moveToDefaultBranch();
}
Map<String, Property> globalParams = taskExecutionContext.getPrepareParamsMap();
Map<String, Property> varParams = JSONUtils
.toList(taskInstance.getVarPool(), Property.class)
.stream()
.collect(Collectors.toMap(Property::getProp, Property -> Property));

int finalConditionLocation = -1;
for (int i = 0; i < switchResultVos.size(); i++) {
// The last one is the default branch, no need to calculate
for (int i = 0; i < switchResultVos.size() - 1; i++) {
SwitchResultVo switchResultVo = switchResultVos.get(i);
log.info("Begin to execute {} condition: {} ", i, switchResultVo.getCondition());
String content = SwitchTaskUtils.generateContentWithTaskParams(switchResultVo.getCondition(), globalParams,
Expand All @@ -111,22 +113,25 @@ private void calculateSwitchBranch() {
result = SwitchTaskUtils.evaluate(content);
log.info("Execute condition sentence: {} successfully: {}", content, result);
if (result) {
// If matched, break the loop
finalConditionLocation = i;
break;
}
} catch (Exception e) {
log.info("Execute condition sentence: {} failed", content, e);
}
}
// If the finalConditionLocation is -1, then the default branch will be executed
if (finalConditionLocation >= 0) {
checkIfBranchExist(switchResultVos.get(finalConditionLocation).getNextNode());
List<Long> nextNodes = switchResultVos.get(finalConditionLocation).getNextNode();
checkIfBranchExist(nextNodes);
log.info("The condition is satisfied, move to the branch: {}",
switchResultVos.get(finalConditionLocation).getNextNode().stream()
.map(node -> workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowGraph().getDag()
.getNode(node).getName())
.collect(Collectors.toList()));
taskParameters.setResultConditionLocation(finalConditionLocation);
} else {
log.info("All conditions are not satisfied, move to the default branch");
moveToDefaultBranch();
}
}
Expand All @@ -147,4 +152,16 @@ private void checkIfBranchExist(List<Long> branchNode) {
}
}

private void addDefaultBranchToEnd() {
SwitchResultVo switchResultVo = new SwitchResultVo(null, taskParameters.getNextNode());
List<SwitchResultVo> dependTaskList = taskParameters.getDependTaskList();
if (!dependTaskList.contains(switchResultVo)) {
dependTaskList.add(switchResultVo);
}
}

private SwitchResultVo getDefaultSwitchResultVo() {
return taskParameters.getDependTaskList().get(taskParameters.getDependTaskList().size() - 1);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public static String generateContentWithTaskParams(String condition, Map<String,
if (MapUtils.isNotEmpty(varParams)) {
params.putAll(varParams);
}
String originContent = content;
Pattern pattern = Pattern.compile(rgex);
Matcher m = pattern.matcher(content);
while (m.find()) {
Expand All @@ -82,10 +81,6 @@ public static String generateContentWithTaskParams(String condition, Map<String,
content = content.replace("${" + paramName + "}", value);
}

// if not replace any params, throw exception to avoid illegal condition
if (originContent.equals(content)) {
throw new IllegalArgumentException("condition is not valid, please check it. condition: " + condition);
}
return content;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testIllegalCondition() {
Map<String, Property> globalParams = new HashMap<>();
Map<String, Property> varParams = new HashMap<>();
globalParams.put("test", new Property("test", Direct.IN, DataType.INTEGER, "1"));
Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> {
Assertions.assertDoesNotThrow(() -> {
SwitchTaskUtils.generateContentWithTaskParams(content, globalParams, varParams);
});

Expand All @@ -70,15 +70,5 @@ public void testIllegalCondition() {
SwitchTaskUtils.evaluate(script);
});

String contentWithSpecify1 = "cmd.abc";
Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> {
SwitchTaskUtils.generateContentWithTaskParams(contentWithSpecify1, globalParams, varParams);
});

String contentWithSpecify2 = "cmd()";
Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> {
SwitchTaskUtils.generateContentWithTaskParams(contentWithSpecify2, globalParams, varParams);
});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
public class TaskInstanceLogHeader {

private static final List<String> INITIALIZE_TASK_CONTEXT_HEADER = Lists.newArrayList(
"***********************************************************************************************",
"\n***********************************************************************************************",
"********************************* Initialize task context ***********************************",
"***********************************************************************************************");
private static final List<String> LOAD_TASK_INSTANCE_PLUGIN_HEADER = Lists.newArrayList(
"***********************************************************************************************",
"\n***********************************************************************************************",
"********************************* Load task instance plugin *********************************",
"***********************************************************************************************");

Expand All @@ -40,12 +40,12 @@ public static void printInitializeTaskContextHeader() {
}

private static final List<String> EXECUTE_TASK_HEADER = Lists.newArrayList(
"***********************************************************************************************",
"\n***********************************************************************************************",
"********************************* Execute task instance *************************************",
"***********************************************************************************************");

private static final List<String> FINALIZE_TASK_HEADER = Lists.newArrayList(
"***********************************************************************************************",
"\n***********************************************************************************************",
"********************************* Finalize task instance ************************************",
"***********************************************************************************************");

Expand Down

0 comments on commit 5e4afe6

Please sign in to comment.