Skip to content

Commit dbba958

Browse files
lubberland and wanglu14 authored
fix (#105)
Co-authored-by: wanglu14 <wanglu14@staff.weibo.com>
1 parent 2870421 commit dbba958

3 files changed

Lines changed: 179 additions & 2 deletions

File tree

rill-flow-dag/olympicene-traversal/src/main/java/com/weibo/rill/flow/olympicene/traversal/runners/SuspenseTaskRunner.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.google.common.collect.ImmutableSet;
2020
import com.google.common.collect.Maps;
2121
import com.google.common.collect.Sets;
22+
import com.weibo.rill.flow.interfaces.model.strategy.Timeline;
23+
import com.weibo.rill.flow.interfaces.model.task.BaseTask;
2224
import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
2325
import com.weibo.rill.flow.interfaces.model.task.TaskInvokeMsg;
2426
import com.weibo.rill.flow.interfaces.model.task.TaskStatus;
@@ -31,11 +33,11 @@
3133
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
3234
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
3335
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager;
34-
import com.weibo.rill.flow.olympicene.traversal.utils.ConditionsUtil;
3536
import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode;
3637
import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException;
3738
import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper;
3839
import com.weibo.rill.flow.olympicene.traversal.mappings.InputOutputMapping;
40+
import com.weibo.rill.flow.olympicene.traversal.utils.ConditionsUtil;
3941
import lombok.extern.slf4j.Slf4j;
4042
import org.apache.commons.collections.CollectionUtils;
4143
import org.apache.commons.collections.MapUtils;
@@ -117,7 +119,25 @@ public ExecutionResult finish(String executionId, NotifyInfo notifyInfo, Map<Str
117119
if (notifyInfo.getTaskStatus() != null && notifyInfo.getTaskStatus().isCompleted()) {
118120
taskInfo.updateInvokeMsg(notifyInfo.getTaskInvokeMsg());
119121
updateTaskInvokeEndTime(taskInfo);
120-
taskInfo.setTaskStatus(notifyInfo.getTaskStatus());
122+
TaskStatus finalStatus = notifyInfo.getTaskStatus();
123+
// Only on timeout (msg="timeout"): if skipOnTimeout=true and tolerance=true, set the status to SKIPPED instead of FAILED
124+
boolean isTimeout = java.util.Optional.ofNullable(notifyInfo.getTaskInvokeMsg())
125+
.map(TaskInvokeMsg::getMsg)
126+
.map("timeout"::equals)
127+
.orElse(false);
128+
if (finalStatus == TaskStatus.FAILED && isTimeout) {
129+
boolean tolerance = java.util.Optional.ofNullable(taskInfo.getTask())
130+
.map(BaseTask::isTolerance).orElse(false);
131+
boolean skipOnTimeout = java.util.Optional.ofNullable(taskInfo.getTask())
132+
.map(BaseTask::getTimeline)
133+
.map(Timeline::getSkipOnTimeout)
134+
.map(Boolean::parseBoolean)
135+
.orElse(false);
136+
if (tolerance && skipOnTimeout) {
137+
finalStatus = TaskStatus.SKIPPED;
138+
}
139+
}
140+
taskInfo.setTaskStatus(finalStatus);
121141
dagInfoStorage.saveTaskInfos(executionId, ImmutableSet.of(taskInfo));
122142
return;
123143
}

rill-flow-dag/olympicene-traversal/src/test/groovy/com/weibo/rill/flow/olympicene/traversal/SuspenseTaskTraversalTest.groovy

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import com.weibo.rill.flow.olympicene.core.model.NotifyInfo
77
import com.weibo.rill.flow.olympicene.core.model.dag.DAG
88
import com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus
99
import com.weibo.rill.flow.interfaces.model.task.TaskInfo
10+
import com.weibo.rill.flow.interfaces.model.task.TaskInvokeMsg
1011
import com.weibo.rill.flow.interfaces.model.task.TaskStatus
1112
import com.weibo.rill.flow.olympicene.core.runtime.DAGParser
1213
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure
@@ -219,4 +220,156 @@ class SuspenseTaskTraversalTest extends Specification {
219220
["url": "bbb"] | DAGEvent.TASK_FINISH
220221
["text": "aaa"] | DAGEvent.TASK_FAILED
221222
}
223+
// Checks that a FAILED notification carrying msg "timeout" leaves suspense task A stored as SKIPPED
// when the task declares tolerance: true and timeline.skipOnTimeout: "true".
def "suspense task should be SKIPPED on timeout when skipOnTimeout=true and tolerance=true"() {
224+
given:
225+
// With skipOnTimeout=true and tolerance=true, the node should become SKIPPED after a timeout
226+
String text = "version: 0.0.1\n" +
227+
"namespace: olympicene\n" +
228+
"service: mca\n" +
229+
"name: test\n" +
230+
"type: flow\n" +
231+
"tasks: \n" +
232+
"- category: suspense\n" +
233+
"  name: A\n" +
234+
"  tolerance: true\n" +
235+
"  timeline:\n" +
236+
"    timeoutInSeconds: \"120\"\n" +
237+
"    skipOnTimeout: \"true\"\n" +
238+
"  conditions:\n" +
239+
"    - \$.input.[?(@.url == \"bbb\")]\n" +
240+
"  next: B\n" +
241+
"- category: function\n" +
242+
"  name: B\n" +
243+
"  resourceName: \"olympicene::test::funtion1::prod\" \n" +
244+
"  pattern: task_scheduler\n" +
245+
"  inputMappings:\n" +
246+
"    - target: \$.input.gopUrls\n" +
247+
"      source: \$.context.gopUrls\n" +
248+
"  outputMappings:\n" +
249+
"    - target: \$.context.url\n" +
250+
"      source: \$.output.url"
251+
DAG dag = dagParser.parse(text)
252+
253+
when:
254+
// Submit the DAG; the suspense node enters the RUNNING state and waits to be woken up
255+
olympicene.submit('timeout_skip_1', dag, [:])
256+
// Simulate a timeout: TimeCheckRunner would call finishTaskSync with taskStatus=FAILED, msg="timeout"
257+
olympicene.wakeup('timeout_skip_1', [:],
258+
NotifyInfo.builder()
259+
.taskInfoName('A')
260+
.taskStatus(TaskStatus.FAILED)
261+
.taskInvokeMsg(TaskInvokeMsg.builder().msg("timeout").build())
262+
.build())
263+
TaskInfo taskInfo = dagStorage.getBasicTaskInfo('timeout_skip_1', 'A')
264+
265+
then:
266+
// skipOnTimeout=true && tolerance=true && timeout → the status should be SKIPPED
267+
taskInfo.getTaskStatus() == TaskStatus.SKIPPED
268+
}
269+
270+
// Checks that a FAILED notification carrying msg "timeout" results in exactly one TASK_FAILED callback
// when the task declares timeline.skipOnTimeout: "false" (tolerance is still true).
def "suspense task should be FAILED on timeout when skipOnTimeout=false"() {
271+
given:
272+
// With skipOnTimeout=false, the node should fire a TASK_FAILED event after a timeout (it is not skipped)
273+
String text = "version: 0.0.1\n" +
274+
"namespace: olympicene\n" +
275+
"service: mca\n" +
276+
"name: test\n" +
277+
"type: flow\n" +
278+
"tasks: \n" +
279+
"- category: suspense\n" +
280+
"  name: A\n" +
281+
"  tolerance: true\n" +
282+
"  timeline:\n" +
283+
"    timeoutInSeconds: \"120\"\n" +
284+
"    skipOnTimeout: \"false\"\n" +
285+
"  conditions:\n" +
286+
"    - \$.input.[?(@.url == \"bbb\")]\n"
287+
DAG dag = dagParser.parse(text)
288+
289+
when:
290+
olympicene.submit('timeout_noskip_1', dag, [:])
291+
olympicene.wakeup('timeout_noskip_1', [:],
292+
NotifyInfo.builder()
293+
.taskInfoName('A')
294+
.taskStatus(TaskStatus.FAILED)
295+
.taskInvokeMsg(TaskInvokeMsg.builder().msg("timeout").build())
296+
.build())
297+
298+
then:
299+
// skipOnTimeout=false → the TASK_FAILED callback event is fired rather than TASK_SKIPPED
300+
1 * callback.onEvent({
301+
Event event -> event.eventCode == DAGEvent.TASK_FAILED.getCode()
302+
})
303+
}
304+
305+
// Checks that a FAILED notification carrying msg "timeout" results in exactly one TASK_FAILED callback
// when the task declares tolerance: false, even though timeline.skipOnTimeout is "true".
def "suspense task should be FAILED on timeout when tolerance=false even if skipOnTimeout=true"() {
306+
given:
307+
// With tolerance=false, a timeout should still fire a TASK_FAILED event even if skipOnTimeout=true
308+
String text = "version: 0.0.1\n" +
309+
"namespace: olympicene\n" +
310+
"service: mca\n" +
311+
"name: test\n" +
312+
"type: flow\n" +
313+
"tasks: \n" +
314+
"- category: suspense\n" +
315+
"  name: A\n" +
316+
"  tolerance: false\n" +
317+
"  timeline:\n" +
318+
"    timeoutInSeconds: \"120\"\n" +
319+
"    skipOnTimeout: \"true\"\n" +
320+
"  conditions:\n" +
321+
"    - \$.input.[?(@.url == \"bbb\")]\n"
322+
DAG dag = dagParser.parse(text)
323+
324+
when:
325+
olympicene.submit('timeout_tol_false_1', dag, [:])
326+
olympicene.wakeup('timeout_tol_false_1', [:],
327+
NotifyInfo.builder()
328+
.taskInfoName('A')
329+
.taskStatus(TaskStatus.FAILED)
330+
.taskInvokeMsg(TaskInvokeMsg.builder().msg("timeout").build())
331+
.build())
332+
333+
then:
334+
// tolerance=false → the TASK_FAILED callback event is fired rather than TASK_SKIPPED
335+
1 * callback.onEvent({
336+
Event event -> event.eventCode == DAGEvent.TASK_FAILED.getCode()
337+
})
338+
}
339+
// Checks that submitting with a context matching the task's interruption expression results in exactly
// one TASK_FAILED callback, even though tolerance and skipOnTimeout are both "true"; no wakeup is sent.
def "suspense task should remain FAILED on interruption even if skipOnTimeout=true and tolerance=true"() {
340+
given:
341+
// tolerance=true and skipOnTimeout=true, but an interruption (not a timeout) is triggered, so the status should stay FAILED
342+
String text = "version: 0.0.1\n" +
343+
"namespace: olympicene\n" +
344+
"service: mca\n" +
345+
"name: test\n" +
346+
"type: flow\n" +
347+
"tasks: \n" +
348+
"- category: suspense\n" +
349+
"  name: A\n" +
350+
"  tolerance: true\n" +
351+
"  timeline:\n" +
352+
"    timeoutInSeconds: \"120\"\n" +
353+
"    skipOnTimeout: \"true\"\n" +
354+
"  inputMappings:\n" +
355+
"    - target: \$.input.url\n" +
356+
"      source: \$.context.url\n" +
357+
"    - target: \$.input.text\n" +
358+
"      source: \$.context.text\n" +
359+
"  conditions:\n" +
360+
"    - \$.input.[?(@.url == \"bbb\")]\n" +
361+
"  interruptions:\n" +
362+
"    - \$.input.[?(@.text == \"aaa\")]\n"
363+
DAG dag = dagParser.parse(text)
364+
365+
when:
366+
// Submit the DAG; the initial context triggers the interruption condition (text=aaa)
367+
olympicene.submit('interrupt_with_skip_flag_1', dag, ["text": "aaa"])
368+
369+
then:
370+
// Although tolerance=true && skipOnTimeout=true, an interruption rather than a timeout was triggered, so the event should be TASK_FAILED rather than TASK_SKIPPED
371+
1 * callback.onEvent({
372+
Event event -> event.eventCode == DAGEvent.TASK_FAILED.getCode()
373+
})
374+
}
222375
}

rill-flow-interfaces/src/main/java/com/weibo/rill/flow/interfaces/model/strategy/Timeline.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,19 @@
2626
@Getter
2727
public class Timeline {
// Timing-related task settings; each value is stored as the raw String handed to the constructor.
2828
private String timeoutInSeconds;
29+
// Flag kept as text. SuspenseTaskRunner.finish reads it through Boolean.parseBoolean and, for a task
// with tolerance=true that FAILED with msg "timeout", stores SKIPPED instead of FAILED.
private String skipOnTimeout;
2930
private String suspenseIntervalSeconds;
3031
private String suspenseTimestamp;
3132

3233
// Jackson creator: binds the JSON properties named in each @JsonProperty to the matching fields.
// NOTE(review): skipOnTimeout was inserted as the second positional parameter, which changes the
// constructor signature — confirm no caller constructs Timeline positionally with three arguments.
@JsonCreator
3334
public Timeline(@JsonProperty("timeoutInSeconds") String timeoutInSeconds,
35+
@JsonProperty("skipOnTimeout") String skipOnTimeout,
3436
@JsonProperty("suspenseIntervalSeconds") String suspenseIntervalSeconds,
3537
@JsonProperty("suspenseTimestamp") String suspenseTimestamp) {
3638
this.timeoutInSeconds = timeoutInSeconds;
39+
this.skipOnTimeout = skipOnTimeout;
3740
this.suspenseIntervalSeconds = suspenseIntervalSeconds;
3841
this.suspenseTimestamp = suspenseTimestamp;
3942
}
4043
}
44+

0 commit comments

Comments
 (0)