Skip to content

Commit eb67d04

Browse files
authored
fix(workflow): Clearly differentiate WFT failures resulting from unhandle Promise rejection (#1606)
1 parent b0316da commit eb67d04

File tree

7 files changed

+313
-204
lines changed

7 files changed

+313
-204
lines changed

packages/test/src/test-integration-split-two.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -374,9 +374,9 @@ test('unhandledRejection causes WFT to fail', configMacro, async (t, config) =>
374374
t.fail();
375375
return;
376376
}
377-
t.is(failure.message, 'unhandled rejection');
377+
t.is(failure.message, 'Unhandled Promise rejection: Error: unhandled rejection');
378378
t.true(failure.stackTrace?.includes(`Error: unhandled rejection`));
379-
t.is(failure.cause?.message, 'root failure');
379+
t.is(failure.cause?.cause?.message, 'root failure');
380380
},
381381
{ minTimeout: 300, factor: 1, retries: 100 }
382382
)

packages/test/src/test-payload-converter.ts

+11-5
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,18 @@ if (RUN_INTEGRATION_TESTS) {
193193
const expectedErrorWasThrown = new Promise<void>((resolve) => {
194194
markErrorThrown = resolve;
195195
});
196-
const logger = new DefaultLogger('ERROR', (entry) => {
197-
if (entry.meta?.error.message === 'Unknown encoding: json/protobuf') {
198-
markErrorThrown();
199-
}
196+
197+
Runtime.install({
198+
logger: new DefaultLogger('WARN', (entry) => {
199+
if (
200+
entry.message.includes('Failing workflow task') &&
201+
entry.meta?.failure?.includes('Unknown encoding: json/protobuf')
202+
) {
203+
markErrorThrown();
204+
}
205+
}),
206+
telemetryOptions: { logging: { forward: {}, filter: 'WARN' } },
200207
});
201-
Runtime.install({ logger });
202208

203209
const taskQueue = `${__filename}/${t.title}`;
204210
const worker = await Worker.create({

packages/test/src/test-workflows.ts

+10-3
Original file line numberDiff line numberDiff line change
@@ -1645,9 +1645,16 @@ test('globalOverrides', async (t) => {
16451645

16461646
test('logAndTimeout', async (t) => {
16471647
const { workflowType, workflow } = t.context;
1648-
await t.throwsAsync(activate(t, makeStartWorkflow(workflowType)), {
1649-
code: 'ERR_SCRIPT_EXECUTION_TIMEOUT',
1650-
message: 'Script execution timed out after 400ms',
1648+
const completion = await activate(t, makeStartWorkflow(workflowType));
1649+
compareCompletion(t, completion, {
1650+
failed: {
1651+
failure: {
1652+
message: 'Script execution timed out after 400ms',
1653+
source: 'TypeScriptSDK',
1654+
stackTrace: 'Error: Script execution timed out after 400ms',
1655+
cause: undefined,
1656+
},
1657+
},
16511658
});
16521659
const calls = await workflow.getAndResetSinkCalls();
16531660
// Ignore LogTimestamp and workflowInfo for the purpose of this comparison

packages/worker/src/errors.ts

-5
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,6 @@ export class GracefulShutdownPeriodExpiredError extends Error {}
1616
* a Promise. To silent rejections on a specific Promise, use `promise.catch(funcThatCantThrow)`
1717
* (e.g. `promise.catch(() => void 0)` or `promise.catch((e) => logger.error(e))`).
1818
*/
19-
// FIXME: At this time, this wrapper is only used for errors that could not be associated with a
20-
// specific workflow run; it should also be used for unhandled rejections in workflow code,
21-
// but this is not possible at the moment as we intentionally "unhandle" non-TemporalFailure
22-
// errors happening in workflow code (i.e. ALL non-TemporalFailure errors thrown from
23-
// workflow code becomes Unhandled Rejection at some point in our own logic)
2419
@SymbolBasedInstanceOfError('UnhandledRejectionError')
2520
export class UnhandledRejectionError extends Error {
2621
constructor(

packages/worker/src/workflow/vm-shared.ts

+86-75
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export function setUnhandledRejectionHandler(getWorkflowByRunId: (runId: string)
2626
if (runId !== undefined) {
2727
const workflow = getWorkflowByRunId(runId);
2828
if (workflow !== undefined) {
29-
workflow.setUnhandledRejection(err);
29+
workflow.setUnhandledRejection(new UnhandledRejectionError(`Unhandled Promise rejection: ${err}`, err));
3030
return;
3131
}
3232
}
@@ -323,97 +323,100 @@ export abstract class BaseVMWorkflow implements Workflow {
323323
public async activate(
324324
activation: coresdk.workflow_activation.IWorkflowActivation
325325
): Promise<coresdk.workflow_completion.IWorkflowActivationCompletion> {
326-
if (this.context === undefined) throw new IllegalStateError('Workflow isolate context uninitialized');
327-
activation = coresdk.workflow_activation.WorkflowActivation.fromObject(activation);
328-
if (!activation.jobs) throw new TypeError('Expected workflow activation jobs to be defined');
329-
330-
// Queries are particular in many ways, and Core guarantees that a single activation will not
331-
// contain both queries and other jobs. So let's handle them separately.
332-
const [queries, nonQueries] = partition(activation.jobs, ({ queryWorkflow }) => queryWorkflow != null);
333-
if (queries.length > 0) {
334-
if (nonQueries.length > 0) throw new TypeError('Got both queries and other jobs in a single activation');
335-
return this.activateQueries(activation);
336-
}
326+
try {
327+
if (this.context === undefined) throw new IllegalStateError('Workflow isolate context uninitialized');
328+
activation = coresdk.workflow_activation.WorkflowActivation.fromObject(activation);
329+
if (!activation.jobs) throw new TypeError('Expected workflow activation jobs to be defined');
330+
331+
// Queries are particular in many ways, and Core guarantees that a single activation will not
332+
// contain both queries and other jobs. So let's handle them separately.
333+
const [queries, nonQueries] = partition(activation.jobs, ({ queryWorkflow }) => queryWorkflow != null);
334+
if (queries.length > 0) {
335+
if (nonQueries.length > 0) throw new TypeError('Got both queries and other jobs in a single activation');
336+
return this.activateQueries(activation);
337+
}
337338

338-
// Update the activator's state in preparation for a non-query activation.
339-
// This is done early, so that we can then rely on the activator while processing the activation.
340-
if (activation.timestamp == null)
341-
throw new TypeError('Expected activation.timestamp to be set for non-query activation');
342-
this.activator.now = tsToMs(activation.timestamp);
343-
this.activator.mutateWorkflowInfo((info) => ({
344-
...info,
345-
historyLength: activation.historyLength as number,
346-
// Exact truncation for multi-petabyte histories
347-
// historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown
348-
historySize: activation.historySizeBytes?.toNumber() ?? 0,
349-
continueAsNewSuggested: activation.continueAsNewSuggested ?? false,
350-
currentBuildId: activation.buildIdForCurrentTask ?? undefined,
351-
unsafe: {
352-
...info.unsafe,
353-
isReplaying: activation.isReplaying ?? false,
354-
},
355-
}));
356-
this.activator.addKnownFlags(activation.availableInternalFlags ?? []);
339+
// Update the activator's state in preparation for a non-query activation.
340+
// This is done early, so that we can then rely on the activator while processing the activation.
341+
if (activation.timestamp == null)
342+
throw new TypeError('Expected activation.timestamp to be set for non-query activation');
343+
this.activator.now = tsToMs(activation.timestamp);
344+
this.activator.mutateWorkflowInfo((info) => ({
345+
...info,
346+
historyLength: activation.historyLength as number,
347+
// Exact truncation for multi-petabyte histories
348+
// historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown
349+
historySize: activation.historySizeBytes?.toNumber() ?? 0,
350+
continueAsNewSuggested: activation.continueAsNewSuggested ?? false,
351+
currentBuildId: activation.buildIdForCurrentTask ?? undefined,
352+
unsafe: {
353+
...info.unsafe,
354+
isReplaying: activation.isReplaying ?? false,
355+
},
356+
}));
357+
this.activator.addKnownFlags(activation.availableInternalFlags ?? []);
357358

358-
// Initialization of the workflow must happen before anything else. Yet, keep the init job in
359-
// place in the list as we'll use it as a marker to know when to start the workflow function.
360-
const initWorkflowJob = activation.jobs.find((job) => job.initializeWorkflow != null)?.initializeWorkflow;
361-
if (initWorkflowJob) this.workflowModule.initialize(initWorkflowJob);
359+
// Initialization of the workflow must happen before anything else. Yet, keep the init job in
360+
// place in the list as we'll use it as a marker to know when to start the workflow function.
361+
const initWorkflowJob = activation.jobs.find((job) => job.initializeWorkflow != null)?.initializeWorkflow;
362+
if (initWorkflowJob) this.workflowModule.initialize(initWorkflowJob);
362363

363-
const hasSignals = activation.jobs.some(({ signalWorkflow }) => signalWorkflow != null);
364-
const doSingleBatch = !hasSignals || this.activator.hasFlag(SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch);
364+
const hasSignals = activation.jobs.some(({ signalWorkflow }) => signalWorkflow != null);
365+
const doSingleBatch = !hasSignals || this.activator.hasFlag(SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch);
365366

366-
const [patches, nonPatches] = partition(activation.jobs, ({ notifyHasPatch }) => notifyHasPatch != null);
367-
for (const { notifyHasPatch } of patches) {
368-
if (notifyHasPatch == null) throw new TypeError('Expected notifyHasPatch to be set');
369-
this.activator.notifyHasPatch(notifyHasPatch);
370-
}
367+
const [patches, nonPatches] = partition(activation.jobs, ({ notifyHasPatch }) => notifyHasPatch != null);
368+
for (const { notifyHasPatch } of patches) {
369+
if (notifyHasPatch == null) throw new TypeError('Expected notifyHasPatch to be set');
370+
this.activator.notifyHasPatch(notifyHasPatch);
371+
}
371372

372-
if (doSingleBatch) {
373-
// updateRandomSeed requires the same special handling as patches (before anything else, and don't
374-
// unblock conditions after each job). Unfortunately, prior to ProcessWorkflowActivationJobsAsSingleBatch,
375-
// they were handled as regular jobs, making it unsafe to properly handle that job above, with patches.
376-
const [updateRandomSeed, rest] = partition(nonPatches, ({ updateRandomSeed }) => updateRandomSeed != null);
377-
if (updateRandomSeed.length > 0)
378-
this.activator.updateRandomSeed(updateRandomSeed[updateRandomSeed.length - 1].updateRandomSeed!);
379-
this.workflowModule.activate(
380-
coresdk.workflow_activation.WorkflowActivation.fromObject({ ...activation, jobs: rest })
381-
);
382-
this.tryUnblockConditionsAndMicrotasks();
383-
} else {
384-
const [signals, nonSignals] = partition(
385-
nonPatches,
386-
// Move signals to a first batch; all the rest goes in a second batch.
387-
({ signalWorkflow }) => signalWorkflow != null
388-
);
389-
390-
// Loop and invoke each batch, waiting for microtasks to complete after each batch.
391-
let batchIndex = 0;
392-
for (const jobs of [signals, nonSignals]) {
393-
if (jobs.length === 0) continue;
373+
if (doSingleBatch) {
374+
// updateRandomSeed requires the same special handling as patches (before anything else, and don't
375+
// unblock conditions after each job). Unfortunately, prior to ProcessWorkflowActivationJobsAsSingleBatch,
376+
// they were handled as regular jobs, making it unsafe to properly handle that job above, with patches.
377+
const [updateRandomSeed, rest] = partition(nonPatches, ({ updateRandomSeed }) => updateRandomSeed != null);
378+
if (updateRandomSeed.length > 0)
379+
this.activator.updateRandomSeed(updateRandomSeed[updateRandomSeed.length - 1].updateRandomSeed!);
394380
this.workflowModule.activate(
395-
coresdk.workflow_activation.WorkflowActivation.fromObject({ ...activation, jobs }),
396-
batchIndex++
381+
coresdk.workflow_activation.WorkflowActivation.fromObject({ ...activation, jobs: rest })
397382
);
398383
this.tryUnblockConditionsAndMicrotasks();
384+
} else {
385+
const [signals, nonSignals] = partition(
386+
nonPatches,
387+
// Move signals to a first batch; all the rest goes in a second batch.
388+
({ signalWorkflow }) => signalWorkflow != null
389+
);
390+
391+
// Loop and invoke each batch, waiting for microtasks to complete after each batch.
392+
let batchIndex = 0;
393+
for (const jobs of [signals, nonSignals]) {
394+
if (jobs.length === 0) continue;
395+
this.workflowModule.activate(
396+
coresdk.workflow_activation.WorkflowActivation.fromObject({ ...activation, jobs }),
397+
batchIndex++
398+
);
399+
this.tryUnblockConditionsAndMicrotasks();
400+
}
399401
}
400-
}
401402

402-
const completion = this.workflowModule.concludeActivation();
403+
const completion = this.workflowModule.concludeActivation();
403404

404-
// Give unhandledRejection handler a chance to be triggered.
405-
await new Promise(setImmediate);
406-
if (this.unhandledRejection) {
405+
// Give unhandledRejection handler a chance to be triggered.
406+
await new Promise(setImmediate);
407+
if (this.unhandledRejection) throw this.unhandledRejection;
408+
409+
return completion;
410+
} catch (err) {
407411
return {
408412
runId: this.activator.info.runId,
409413
// FIXME: Calling `activator.errorToFailure()` directly from outside the VM is unsafe, as it
410414
// depends on the `failureConverter` and `payloadConverter`, which may be customized and
411415
// therefore aren't guaranteed not to access `global` or to cause scheduling microtasks.
412416
// Admitingly, the risk is very low, so we're leaving it as is for now.
413-
failed: { failure: this.activator.errorToFailure(this.unhandledRejection) },
417+
failed: { failure: this.activator.errorToFailure(err) },
414418
};
415419
}
416-
return completion;
417420
}
418421

419422
private activateQueries(
@@ -434,14 +437,22 @@ export abstract class BaseVMWorkflow implements Workflow {
434437
* If called (by an external unhandledRejection handler), activations will fail with provided error.
435438
*/
436439
public setUnhandledRejection(err: unknown): void {
440+
if (this.activator) {
441+
// This is very unlikely to make a difference, as unhandled rejections should be reported
442+
// on the next macro task of the outer execution context (i.e. not inside the VM), at which
443+
// point we are done handling the workflow activation anyway. But just in case, copying the
444+
// error to the activator will ensure that any attempt to make progress in the workflow
445+
// VM will immediately fail.
446+
this.activator.workflowTaskError = err;
447+
}
437448
this.unhandledRejection = err;
438449
}
439450

440451
/**
441452
* Call into the Workflow context to attempt to unblock any blocked conditions and microtasks.
442453
*
443-
* This is performed in a loop allowing microtasks to be processed between
444-
* each iteration until there are no more conditions to unblock.
454+
* This is performed in a loop, going in and out of the VM, allowing microtasks to be processed
455+
* between each iteration of the outer loop, until there are no more conditions to unblock.
445456
*/
446457
protected tryUnblockConditionsAndMicrotasks(): void {
447458
for (;;) {

packages/workflow/src/internals.ts

+57-8
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,38 @@ export class Activator implements ActivationHandler {
204204
childToParent: new Map(),
205205
};
206206

207+
/**
208+
* The error that caused the current Workflow Task to fail. Sets if a non-`TemporalFailure`
209+
* error bubbles up out of the Workflow function, or out of a Signal or Update handler. We
210+
* capture errors this way because those functions are not technically awaited when started,
211+
* but left to run asynchronously. There is therefore no real "parent" function that can
212+
* directly handle those errors, and not capturing it would result in an Unhandled Promise
213+
* Rejection. So instead, we buffer the error here, to then be processed in the context
214+
* of our own synchronous Activation handling event loop.
215+
*
216+
* Our code does a best effort to stop processing the current activation as soon as possible
217+
* after this field is set:
218+
* - If an error is thrown while executing code synchronously (e.g. anything before the
219+
* first `await` statement in a Workflow function or a signal/update handler), the error
220+
* will be _immediately_ rethrown, which will prevent execution of further jobs in the
221+
* current activation. We know we're currently running code synchronously thanks to the
222+
* `rethrowSynchronously` flag below.
223+
* - It an error is thrown while executing microtasks, then the error will be rethrown on
224+
* the next call to `tryUnblockConditions()`.
225+
*
226+
* Unfortunately, there's no way for us to prevent further execution of microtasks that have
227+
* already been scheduled, nor those that will be recursively scheduled from those microtasks.
228+
* Should more errors get thrown while settling microtasks, those will be ignored (i.e. only
229+
* the first captured error is preserved).
230+
*/
231+
public workflowTaskError: unknown;
232+
233+
/**
234+
* Set to true when running synchronous code (e.g. while processing activation jobs and when calling
235+
* `tryUnblockConditions()`). While this flag is set, it is safe to let errors bubble up.
236+
*/
237+
public rethrowSynchronously = false;
238+
207239
public readonly rootScope = new RootCancellationScope();
208240

209241
/**
@@ -703,7 +735,7 @@ export class Activator implements ActivationHandler {
703735
if (error instanceof TemporalFailure) {
704736
this.rejectUpdate(protocolInstanceId, error);
705737
} else {
706-
throw error;
738+
this.handleWorkflowFailure(error);
707739
}
708740
})
709741
.finally(() => this.inProgressUpdates.delete(updateId));
@@ -830,6 +862,8 @@ export class Activator implements ActivationHandler {
830862
}
831863

832864
public warnIfUnfinishedHandlers(): void {
865+
if (this.workflowTaskError) return;
866+
833867
const getWarnable = (handlerExecutions: Iterable<MessageHandlerExecution>): MessageHandlerExecution[] => {
834868
return Array.from(handlerExecutions).filter(
835869
(ex) => ex.unfinishedPolicy === HandlerUnfinishedPolicy.WARN_AND_ABANDON
@@ -941,17 +975,12 @@ export class Activator implements ActivationHandler {
941975
* Transforms failures into a command to be sent to the server.
942976
* Used to handle any failure emitted by the Workflow.
943977
*/
944-
async handleWorkflowFailure(error: unknown): Promise<void> {
978+
handleWorkflowFailure(error: unknown): void {
945979
if (this.cancelled && isCancellation(error)) {
946980
this.pushCommand({ cancelWorkflowExecution: {} }, true);
947981
} else if (error instanceof ContinueAsNew) {
948982
this.pushCommand({ continueAsNewWorkflowExecution: error.command }, true);
949-
} else {
950-
if (!(error instanceof TemporalFailure)) {
951-
// This results in an unhandled rejection which will fail the activation
952-
// preventing it from completing.
953-
throw error;
954-
}
983+
} else if (error instanceof TemporalFailure) {
955984
// Fail the workflow. We do not want to issue unfinishedHandlers warnings. To achieve that, we
956985
// mark all handlers as completed now.
957986
this.inProgressSignals.clear();
@@ -964,9 +993,29 @@ export class Activator implements ActivationHandler {
964993
},
965994
true
966995
);
996+
} else {
997+
this.recordWorkflowTaskError(error);
967998
}
968999
}
9691000

1001+
recordWorkflowTaskError(error: unknown): void {
1002+
// Only keep the first error that bubbles up; subsequent errors will be ignored.
1003+
if (this.workflowTaskError === undefined) this.workflowTaskError = error;
1004+
1005+
// Immediately rethrow the error if we know it is safe to do so (i.e. we are not running async
1006+
// microtasks). Otherwise, the error will be rethrown whenever we get an opportunity to do so,
1007+
// e.g. the next time `tryUnblockConditions()` is called.
1008+
if (this.rethrowSynchronously) this.maybeRethrowWorkflowTaskError();
1009+
}
1010+
1011+
/**
1012+
* If a Workflow Task error was captured, and we are running in synchronous mode,
1013+
* then bubble it up now. This is safe to call even if there is no error to rethrow.
1014+
*/
1015+
maybeRethrowWorkflowTaskError(): void {
1016+
if (this.workflowTaskError) throw this.workflowTaskError;
1017+
}
1018+
9701019
private completeQuery(queryId: string, result: unknown): void {
9711020
this.pushCommand({
9721021
respondToQuery: { queryId, succeeded: { response: this.payloadConverter.toPayload(result) } },

0 commit comments

Comments
 (0)