Skip to content

Commit 707cbf0

Browse files
author
songyongtan
committed
[fel] adapter flow case
1 parent ab700ac commit 707cbf0

File tree

5 files changed

+46
-29
lines changed

5 files changed

+46
-29
lines changed

framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public AiWhenHappen<O, D, I, RF, F> when(Operators.Whether<I> whether, Operators
5353
*
5454
* @param processor The handler to process unmatched inputs.
5555
* @return An {@link AiState} representing the terminal node of the conditional flow.
56-
* @throws IllegalArgumentException if processor is null
56+
* @throws IllegalArgumentException if processor is null.
5757
*/
5858
public AiState<O, D, O, RF, F> others(Operators.Then<I, O> processor) {
5959
Validation.notNull(processor, "Ai branch processor cannot be null.");

framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,18 +34,22 @@
3434
* @since 2025-06-11
3535
*/
3636
public class AiFlowCaseTest {
37+
private static final int SPEED = 1;
3738
@Nested
3839
class DesensitizeCase {
3940
private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> {
4041
emitter.emit(new AiMessage("<think>"));
41-
for (int i = 0; i < 10; i++) {
42+
int takeTime = 10 * SPEED;
43+
SleepUtil.sleep(takeTime);
44+
for (int i = 0; i < 48; i++) {
4245
emitter.emit(new AiMessage(String.valueOf(i)));
43-
SleepUtil.sleep(100);
46+
SleepUtil.sleep(takeTime);
4447
}
4548
emitter.emit(new AiMessage("</think>"));
46-
for (int i = 100; i < 110; i++) {
49+
SleepUtil.sleep(takeTime);
50+
for (int i = 100; i < 150; i++) {
4751
emitter.emit(new AiMessage(String.valueOf(i)));
48-
SleepUtil.sleep(100);
52+
SleepUtil.sleep(takeTime);
4953
}
5054
emitter.complete();
5155
}), ChatOption.custom().model("modelName").stream(true).build());
@@ -60,7 +64,8 @@ class DesensitizeCase {
6064
this.log(input);
6165
return input;
6266
})
63-
.map(this::mockDesensitize)
67+
.map(this::mockDesensitize1)
68+
.map(this::mockDesensitize2)
6469
.close();
6570

6671
@Test
@@ -74,7 +79,7 @@ void run() {
7479
}).offer(Tip.fromArray("hi"));
7580
result.await();
7681
System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime);
77-
Assertions.assertEquals(22, counter.get());
82+
Assertions.assertEquals(100, counter.get());
7883
}
7984

8085
private Chunk classic(ChatMessage message, StateContext ctx) {
@@ -92,10 +97,16 @@ private Chunk classic(ChatMessage message, StateContext ctx) {
9297
return new Chunk(false, message.text());
9398
}
9499

95-
private String mockDesensitize(Chunk chunk) {
100+
private String mockDesensitize1(Chunk chunk) {
101+
SleepUtil.sleep(10 * SPEED);
96102
return chunk.content.replace("3", "*");
97103
}
98104

105+
private String mockDesensitize2(String chunk) {
106+
SleepUtil.sleep(10 * SPEED);
107+
return chunk.replace("4", "*");
108+
}
109+
99110
private void log(Chunk chunk) {
100111
System.out.println("log content:" + chunk.content);
101112
}
@@ -113,17 +124,17 @@ private static class Chunk {
113124
/**
114125
* Simulates a backpressure scenario where:
115126
* <ol>
116-
* <li>The LLM generates data (50ms per item) faster than the TTS can process it.</li>
127+
* <li>The LLM generates data faster than the TTS can process it.</li>
117128
* <li>TTS processing is constrained to a single thread.</li>
118-
* <li>TTS processing speed is artificially slowed (100ms per item).</li>
129+
* <li>TTS processing speed is artificially slowed.</li>
119130
* </ol>
120131
*/
121132
@Nested
122133
class BackPressureCase {
123134
private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> {
124-
for (int i = 0; i < 10; i++) {
135+
for (int i = 0; i < 100; i++) {
125136
emitter.emit(new AiMessage(String.valueOf(i)));
126-
SleepUtil.sleep(50);
137+
SleepUtil.sleep(5 * SPEED);
127138
}
128139
emitter.complete();
129140
System.out.printf("time:%s, generate completed.\n", System.currentTimeMillis());
@@ -132,6 +143,7 @@ class BackPressureCase {
132143
private final AiProcessFlow<Tip, String> flow = AiFlows.<Tip>create()
133144
.prompt(Prompts.human("{{0}}"))
134145
.generate(model)
146+
.map(this::mockDesensitize).concurrency(1) // Limit processing to 1 concurrent thread
135147
.map(this::mockTTS).concurrency(1) // Limit processing to 1 concurrent thread
136148
.close();
137149

@@ -146,30 +158,36 @@ void run() {
146158
}).offer(Tip.fromArray("hi"));
147159
result.await();
148160
System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime);
149-
Assertions.assertEquals(10, counter.get());
161+
Assertions.assertEquals(100, counter.get());
162+
}
163+
164+
private String mockDesensitize(ChatMessage chunk) {
165+
// Simulate time-consuming operation with a delay.
166+
SleepUtil.sleep(10 * SPEED);
167+
return chunk.text().replace("3", "*");
150168
}
151169

152-
private String mockTTS(ChatMessage chunk) {
170+
private String mockTTS(String chunk) {
153171
// Simulate time-consuming operation with a delay.
154-
SleepUtil.sleep(100);
155-
return chunk.text();
172+
SleepUtil.sleep(10 * SPEED);
173+
return chunk;
156174
}
157175
}
158176

159177
/**
160178
* Demonstrates concurrent processing with balanced throughput where:
161179
* <ol>
162-
* <li>LLM generates data at moderate pace (50ms per item)</li>
163-
* <li>Downstream processing runs with 3 concurrent threads</li>
164-
* <li>Processing speed is slightly slower than generation (150ms vs 50ms)</li>
180+
* <li>LLM generates data at moderate pace.</li>
181+
* <li>Downstream processing runs with 3 concurrent threads.</li>
182+
* <li>Processing speed is slightly slower than generation (3 : 1).</li>
165183
* </ol>
166184
*/
167185
@Nested
168186
class ConcurrencyCase {
169187
private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> {
170-
for (int i = 0; i < 10; i++) {
188+
for (int i = 0; i < 100; i++) {
171189
emitter.emit(new AiMessage(String.valueOf(i)));
172-
SleepUtil.sleep(50);
190+
SleepUtil.sleep(10 * SPEED);
173191
}
174192
emitter.complete();
175193
}), ChatOption.custom().model("modelName").stream(true).build());
@@ -191,12 +209,12 @@ void run() {
191209
}).offer(Tip.fromArray("hi"));
192210
result.await();
193211
System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime);
194-
Assertions.assertEquals(10, counter.get());
212+
Assertions.assertEquals(100, counter.get());
195213
}
196214

197215
private String mockDesensitize(ChatMessage chunk) {
198216
// Simulate slower processing at 1/3 speed of LLM generation.
199-
SleepUtil.sleep(150);
217+
SleepUtil.sleep(30 * SPEED);
200218
return chunk.text().replace("3", "*");
201219
}
202220
}

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/WhenHappen.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public WhenHappen<O, D, I, F> when(Operators.Whether<I> whether, Operators.Then<
5252
*
5353
* @param processor The handler to process unmatched inputs.
5454
* @return An {@link State} representing the join node of the conditional flow.
55-
* @throws IllegalArgumentException if processor is null
55+
* @throws IllegalArgumentException if processor is null.
5656
*/
5757
public State<O, D, O, F> others(Operators.Then<I, O> processor) {
5858
this.when(null, processor);

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,6 @@ public class To<I, O> extends IdGenerator implements Subscriber<I, O> {
100100
@Getter
101101
private final FlowLocks locks;
102102

103-
private int maxConcurrency = MAX_CONCURRENCY;
104-
105103
// 默认自动流转过滤器是按batchID批次过滤contexts
106104
private final Operators.Filter<I> defaultAutoFilter = (contexts) -> {
107105
if (CollectionUtils.isEmpty(contexts)) {
@@ -140,7 +138,7 @@ public class To<I, O> extends IdGenerator implements Subscriber<I, O> {
140138
@Getter
141139
private ProcessMode processMode;
142140

143-
private final Map<String, Integer> processingSessions = new ConcurrentHashMap<>();
141+
private Map<String, Integer> processingSessions = new ConcurrentHashMap<>();
144142

145143
private Operators.Validator<I> validator = (repo, to) -> repo.requestMappingContext(to.streamId,
146144
to.froms.stream().map(Identity::getId).collect(Collectors.toList()),
@@ -162,6 +160,7 @@ public class To<I, O> extends IdGenerator implements Subscriber<I, O> {
162160
*/
163161
private Operators.Produce<FlowContext<I>, O> produce;
164162

163+
private volatile int maxConcurrency = MAX_CONCURRENCY;
165164
/**
166165
* 当前并发度,已经提交的批次
167166
*/

framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/Operators.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,8 +298,8 @@ public interface Validator<T> {
298298
* This functional interface defines the processing operation that converts an
299299
* input of one type to an output of potentially different type.
300300
*
301-
* @param <T> The type of raw material (input) to be processed
302-
* @param <R> The type of product (output) to be produced
301+
* @param <T> The type of raw material (input) to be processed.
302+
* @param <R> The type of product (output) to be produced.
303303
*/
304304
@FunctionalInterface
305305
public interface Then<T, R> {

0 commit comments

Comments
 (0)