Skip to content

Commit 7521565

Browse files
Add exception handling around the doProgress call in HandlerContextImpl to prevent unhandled errors (#556)
* Add exception handling around the `doProgress` call in `HandlerContextImpl` to prevent unhandled errors. This seems to be the cause of many unhandled suspension exceptions, and possibly the reason threads were left hanging. * When `doProgress` replies with either `ReadFromInput` or `WaitingPendingRun`, wait for whichever of the two events happens first.
1 parent 7591272 commit 7521565

File tree

6 files changed

+63
-52
lines changed

6 files changed

+63
-52
lines changed

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,5 +230,5 @@
230230
restate = '2.5.0-SNAPSHOT'
231231
schema-kenerator = '2.1.2'
232232
spring-boot = '3.4.9'
233-
vertx = '4.5.18'
233+
vertx = '4.5.22'
234234
victools-json-schema = '4.38.0'

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ internal constructor(
123123
retryPolicy: RetryPolicy?,
124124
block: suspend () -> T
125125
): DurableFuture<T> {
126-
var serde: Serde<T> = resolveSerde(typeTag)
127-
var coroutineCtx = currentCoroutineContext()
126+
val serde: Serde<T> = resolveSerde(typeTag)
127+
val coroutineCtx = currentCoroutineContext()
128128
val javaRetryPolicy =
129129
retryPolicy?.let {
130130
dev.restate.sdk.common.RetryPolicy.exponential(

sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ public RequestProcessor processorForRequest(
173173

174174
String fullyQualifiedServiceMethod = serviceName + "/" + handlerName;
175175

176+
// If we got it, set it
177+
String invocationIdHeader = headersAccessor.get("x-restate-invocation-id");
178+
if (invocationIdHeader != null) {
179+
loggingContextSetter.set(LoggingContextSetter.INVOCATION_ID_KEY, invocationIdHeader);
180+
}
181+
176182
// Instantiate state machine
177183
StateMachine stateMachine = StateMachine.init(headersAccessor, loggingContextSetter);
178184

sdk-core/src/main/java/dev/restate/sdk/core/HandlerContextImpl.java

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ class HandlerContextImpl implements HandlerContextInternal {
4242
private final @Nullable String objectKey;
4343
private final String fullyQualifiedHandlerName;
4444

45-
private CompletableFuture<Void> nextProcessedRun;
4645
private final List<AsyncResultInternal<String>> invocationIdsToCancel;
4746
private final HashMap<Integer, Consumer<RunCompleter>> scheduledRuns;
4847

@@ -349,6 +348,10 @@ public void pollAsyncResult(AsyncResultInternal<?> asyncResult) {
349348

350349
private void pollAsyncResultInner(AsyncResultInternal<?> asyncResult) {
351350
while (true) {
351+
if (this.stateMachine.state() == InvocationState.CLOSED) {
352+
asyncResult.publicFuture().completeExceptionally(AbortedExecutionException.INSTANCE);
353+
return;
354+
}
352355
if (asyncResult.isDone()) {
353356
return;
354357
}
@@ -399,21 +402,26 @@ private void pollAsyncResultInner(AsyncResultInternal<?> asyncResult) {
399402
}
400403

401404
// Not ready yet, let's try to do some progress
402-
StateMachine.DoProgressResponse response = this.stateMachine.doProgress(uncompletedLeaves);
405+
StateMachine.DoProgressResponse response;
406+
try {
407+
response = this.stateMachine.doProgress(uncompletedLeaves);
408+
} catch (Throwable e) {
409+
this.failWithoutContextSwitch(e);
410+
asyncResult.publicFuture().completeExceptionally(AbortedExecutionException.INSTANCE);
411+
return;
412+
}
403413

404414
if (response instanceof StateMachine.DoProgressResponse.AnyCompleted) {
405415
// Let it loop now
406-
} else if (response instanceof StateMachine.DoProgressResponse.ReadFromInput) {
407-
this.stateMachine
408-
.waitNextInputSignal()
409-
.thenAccept(v -> this.pollAsyncResultInner(asyncResult));
416+
} else if (response instanceof StateMachine.DoProgressResponse.ReadFromInput
417+
|| response instanceof StateMachine.DoProgressResponse.WaitingPendingRun) {
418+
this.stateMachine.onNextEvent(
419+
() -> this.pollAsyncResultInner(asyncResult),
420+
response instanceof StateMachine.DoProgressResponse.ReadFromInput);
410421
return;
411422
} else if (response instanceof StateMachine.DoProgressResponse.ExecuteRun) {
412423
triggerScheduledRun(((StateMachine.DoProgressResponse.ExecuteRun) response).handle());
413424
// Let it loop now
414-
} else if (response instanceof StateMachine.DoProgressResponse.WaitingPendingRun) {
415-
this.waitNextProcessedRun().thenAccept(v -> this.pollAsyncResultInner(asyncResult));
416-
return;
417425
}
418426
}
419427
}
@@ -425,7 +433,6 @@ public void proposeRunSuccess(int runHandle, Slice toWrite) {
425433
} catch (Exception e) {
426434
this.failWithoutContextSwitch(e);
427435
}
428-
triggerNextProcessedRun();
429436
}
430437

431438
@Override
@@ -439,15 +446,6 @@ public void proposeRunFailure(
439446
} catch (Exception e) {
440447
this.failWithoutContextSwitch(e);
441448
}
442-
triggerNextProcessedRun();
443-
}
444-
445-
private void triggerNextProcessedRun() {
446-
if (this.nextProcessedRun != null) {
447-
var fut = this.nextProcessedRun;
448-
this.nextProcessedRun = null;
449-
fut.complete(null);
450-
}
451449
}
452450

453451
private void triggerScheduledRun(int handle) {
@@ -470,13 +468,6 @@ public void proposeFailure(Throwable toWrite, @Nullable RetryPolicy retryPolicy)
470468
});
471469
}
472470

473-
private CompletableFuture<Void> waitNextProcessedRun() {
474-
if (this.nextProcessedRun == null) {
475-
this.nextProcessedRun = new CompletableFuture<>();
476-
}
477-
return this.nextProcessedRun;
478-
}
479-
480471
@Override
481472
public void close() {
482473
this.stateMachine.end();

sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ static StateMachine init(
4242

4343
CompletableFuture<Void> waitForReady();
4444

45-
// --- Await next input
45+
// --- Await next event
4646

47-
CompletableFuture<Void> waitNextInputSignal();
47+
void onNextEvent(Runnable runnable, boolean triggerNowIfInputClosed);
4848

4949
// --- Async results
5050

sdk-core/src/main/java/dev/restate/sdk/core/statemachine/StateMachineImpl.java

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.function.Consumer;
2929
import org.apache.logging.log4j.LogManager;
3030
import org.apache.logging.log4j.Logger;
31+
import org.jspecify.annotations.NonNull;
3132
import org.jspecify.annotations.Nullable;
3233

3334
class StateMachineImpl implements StateMachine {
@@ -38,7 +39,7 @@ class StateMachineImpl implements StateMachine {
3839

3940
// Callbacks
4041
private final CompletableFuture<Void> waitForReadyFuture = new CompletableFuture<>();
41-
private CompletableFuture<Void> waitNextProcessedInput;
42+
private @NonNull Runnable nextEventListener = () -> {};
4243

4344
// Java Flow and message handling
4445
private final MessageDecoder messageDecoder = new MessageDecoder();
@@ -72,22 +73,22 @@ public CompletableFuture<Void> waitForReady() {
7273
}
7374

7475
@Override
75-
public CompletableFuture<Void> waitNextInputSignal() {
76-
if (this.stateContext.isInputClosed()) {
77-
return CompletableFuture.completedFuture(null);
76+
public void onNextEvent(Runnable runnable, boolean triggerNowIfInputClosed) {
77+
this.nextEventListener =
78+
() -> {
79+
this.nextEventListener.run();
80+
runnable.run();
81+
};
82+
// Trigger this now
83+
if (triggerNowIfInputClosed && this.stateContext.isInputClosed()) {
84+
this.triggerNextEventSignal();
7885
}
79-
if (waitNextProcessedInput == null) {
80-
this.waitNextProcessedInput = new CompletableFuture<>();
81-
}
82-
return this.waitNextProcessedInput;
8386
}
8487

85-
private void triggerWaitNextInputSignal() {
86-
if (this.waitNextProcessedInput != null) {
87-
CompletableFuture<Void> fut = this.waitNextProcessedInput;
88-
this.waitNextProcessedInput = null;
89-
fut.complete(null);
90-
}
88+
private void triggerNextEventSignal() {
89+
Runnable listener = this.nextEventListener;
90+
this.nextEventListener = () -> {};
91+
listener.run();
9192
}
9293

9394
// -- IO
@@ -142,7 +143,7 @@ public void onNext(Slice slice) {
142143
}
143144

144145
if (shouldTriggerInputListener) {
145-
this.triggerWaitNextInputSignal();
146+
this.triggerNextEventSignal();
146147
}
147148

148149
} catch (Throwable e) {
@@ -152,8 +153,8 @@ public void onNext(Slice slice) {
152153

153154
@Override
154155
public void onError(Throwable throwable) {
155-
LOG.trace("Got failure", throwable);
156156
this.stateContext.getCurrentState().hitError(throwable, null, null, this.stateContext);
157+
this.triggerNextEventSignal();
157158
cancelInputSubscription();
158159
}
159160

@@ -164,8 +165,9 @@ public void onComplete() {
164165
this.stateContext.getCurrentState().onInputClosed(this.stateContext);
165166
} catch (Throwable e) {
166167
this.onError(e);
168+
return;
167169
}
168-
this.triggerWaitNextInputSignal();
170+
this.triggerNextEventSignal();
169171
this.cancelInputSubscription();
170172
}
171173

@@ -549,7 +551,13 @@ public int run(String name) {
549551
@Override
550552
public void proposeRunCompletion(int handle, Slice value) {
551553
LOG.debug("Executing 'Run completed with success'");
552-
this.stateContext.getCurrentState().proposeRunCompletion(handle, value, this.stateContext);
554+
try {
555+
this.stateContext.getCurrentState().proposeRunCompletion(handle, value, this.stateContext);
556+
} catch (Throwable e) {
557+
this.onError(e);
558+
return;
559+
}
560+
this.triggerNextEventSignal();
553561
}
554562

555563
@Override
@@ -559,9 +567,15 @@ public void proposeRunCompletion(
559567
Duration attemptDuration,
560568
@Nullable RetryPolicy retryPolicy) {
561569
LOG.debug("Executing 'Run completed with failure'");
562-
this.stateContext
563-
.getCurrentState()
564-
.proposeRunCompletion(handle, exception, attemptDuration, retryPolicy, this.stateContext);
570+
try {
571+
this.stateContext
572+
.getCurrentState()
573+
.proposeRunCompletion(handle, exception, attemptDuration, retryPolicy, this.stateContext);
574+
} catch (Throwable e) {
575+
this.onError(e);
576+
return;
577+
}
578+
this.triggerNextEventSignal();
565579
}
566580

567581
@Override

0 commit comments

Comments
 (0)