15
15
*/
16
16
package io .serverlessworkflow .impl .executors ;
17
17
18
+ import io .serverlessworkflow .api .types .CallHTTP ;
18
19
import io .serverlessworkflow .api .types .CallTask ;
19
20
import io .serverlessworkflow .api .types .Task ;
20
21
import io .serverlessworkflow .api .types .TaskBase ;
21
22
import io .serverlessworkflow .impl .WorkflowDefinition ;
23
+ import java .util .ServiceLoader ;
24
+ import java .util .ServiceLoader .Provider ;
22
25
23
26
public class DefaultTaskExecutorFactory implements TaskExecutorFactory {
24
27
@@ -30,12 +33,15 @@ public static TaskExecutorFactory get() {
30
33
31
34
protected DefaultTaskExecutorFactory () {}
32
35
36
+ private ServiceLoader <CallableTask > callTasks = ServiceLoader .load (CallableTask .class );
37
+
33
38
public TaskExecutor <? extends TaskBase > getTaskExecutor (
34
39
Task task , WorkflowDefinition definition ) {
35
40
if (task .getCallTask () != null ) {
36
41
CallTask callTask = task .getCallTask ();
37
42
if (callTask .getCallHTTP () != null ) {
38
- return new HttpExecutor (callTask .getCallHTTP (), definition );
43
+ return new CallTaskExecutor <>(
44
+ callTask .getCallHTTP (), definition , findCallTask (CallHTTP .class ));
39
45
}
40
46
} else if (task .getSwitchTask () != null ) {
41
47
return new SwitchExecutor (task .getSwitchTask (), definition );
@@ -46,4 +52,15 @@ public TaskExecutor<? extends TaskBase> getTaskExecutor(
46
52
}
47
53
throw new UnsupportedOperationException (task .get ().getClass ().getName () + " not supported yet" );
48
54
}
55
+
56
+ @ SuppressWarnings ("unchecked" )
57
+ private <T extends TaskBase > CallableTask <T > findCallTask (Class <T > clazz ) {
58
+ return (CallableTask <T >)
59
+ callTasks .stream ()
60
+ .map (Provider ::get )
61
+ .filter (s -> s .accept (clazz ))
62
+ .findAny ()
63
+ .orElseThrow (
64
+ () -> new UnsupportedOperationException (clazz .getName () + " not supported yet" ));
65
+ }
49
66
}
0 commit comments