Skip to content

Commit ae8e83b

Browse files
authored
chore(runner): move max duration logic into parent process (#2637)
* chore(runner): move max duration logic into parent process * chore(rsc): remove type-marker package.json * add changeset * chore(core): remove irrelevant test after our changes * chore(core): clarify we don't care about the timeout promise
1 parent 2283ca6 commit ae8e83b

File tree

12 files changed

+122
-112
lines changed

12 files changed

+122
-112
lines changed

.changeset/big-ants-act.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"trigger.dev": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Move max duration handling into the parent process

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,17 @@ usage.setGlobalUsageManager(devUsageManager);
128128
const usageTimeoutManager = new UsageTimeoutManager(devUsageManager);
129129
timeout.setGlobalManager(usageTimeoutManager);
130130

131+
// Register listener to send IPC message when max duration is exceeded
132+
timeout.registerListener(async (maxDurationInSeconds, elapsedTimeInSeconds) => {
133+
log(
134+
`[${new Date().toISOString()}] Max duration exceeded: ${maxDurationInSeconds}s, elapsed: ${elapsedTimeInSeconds}s`
135+
);
136+
await zodIpc.send("MAX_DURATION_EXCEEDED", {
137+
maxDurationInSeconds,
138+
elapsedTimeInSeconds,
139+
});
140+
});
141+
131142
const standardResourceCatalog = new StandardResourceCatalog();
132143
resourceCatalog.setGlobalResourceCatalog(standardResourceCatalog);
133144

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,17 @@ function initializeUsageManager({
726726
usage.setGlobalUsageManager(prodUsageManager);
727727
timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));
728728

729+
// Register listener to send IPC message when max duration is exceeded
730+
timeout.registerListener(async (maxDurationInSeconds, elapsedTimeInSeconds) => {
731+
console.log(
732+
`[${new Date().toISOString()}] Max duration exceeded: ${maxDurationInSeconds}s, elapsed: ${elapsedTimeInSeconds}s`
733+
);
734+
await zodIpc.send("MAX_DURATION_EXCEEDED", {
735+
maxDurationInSeconds,
736+
elapsedTimeInSeconds,
737+
});
738+
});
739+
729740
return prodUsageManager;
730741
}
731742

packages/cli-v3/src/executions/taskRunProcess.ts

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import {
3030
CleanupProcessError,
3131
internalErrorFromUnexpectedExit,
3232
GracefulExitTimeoutError,
33+
MaxDurationExceededError,
3334
UnexpectedExitError,
3435
SuspendedProcessError,
3536
} from "@trigger.dev/core/v3/errors";
@@ -74,6 +75,8 @@ export class TaskRunProcess {
7475
private _isBeingKilled: boolean = false;
7576
private _isBeingCancelled: boolean = false;
7677
private _isBeingSuspended: boolean = false;
78+
private _isMaxDurationExceeded: boolean = false;
79+
private _maxDurationInfo?: { maxDurationInSeconds: number; elapsedTimeInSeconds: number };
7780
private _stderr: Array<string> = [];
7881

7982
public onTaskRunHeartbeat: Evt<string> = new Evt();
@@ -209,6 +212,23 @@ export class TaskRunProcess {
209212
SET_SUSPENDABLE: async (message) => {
210213
this.onSetSuspendable.post(message);
211214
},
215+
MAX_DURATION_EXCEEDED: async (message) => {
216+
logger.debug("max duration exceeded, gracefully terminating child process", {
217+
maxDurationInSeconds: message.maxDurationInSeconds,
218+
elapsedTimeInSeconds: message.elapsedTimeInSeconds,
219+
pid: this.pid,
220+
});
221+
222+
// Set flag and store duration info for error reporting in #handleExit
223+
this._isMaxDurationExceeded = true;
224+
this._maxDurationInfo = {
225+
maxDurationInSeconds: message.maxDurationInSeconds,
226+
elapsedTimeInSeconds: message.elapsedTimeInSeconds,
227+
};
228+
229+
// Use the same graceful termination approach as cancel
230+
await this.#gracefullyTerminate(this.options.gracefulTerminationTimeoutInMs);
231+
},
212232
},
213233
});
214234

@@ -319,7 +339,25 @@ export class TaskRunProcess {
319339

320340
const { rejecter } = attemptPromise;
321341

322-
if (this._isBeingCancelled) {
342+
if (this._isMaxDurationExceeded) {
343+
if (!this._maxDurationInfo) {
344+
rejecter(
345+
new UnexpectedExitError(
346+
code ?? -1,
347+
signal,
348+
"MaxDuration flag set but duration info missing"
349+
)
350+
);
351+
continue;
352+
}
353+
354+
rejecter(
355+
new MaxDurationExceededError(
356+
this._maxDurationInfo.maxDurationInSeconds,
357+
this._maxDurationInfo.elapsedTimeInSeconds
358+
)
359+
);
360+
} else if (this._isBeingCancelled) {
323361
rejecter(new CancelledProcessError());
324362
} else if (this._gracefulExitTimeoutElapsed) {
325363
// Order matters, this has to be before the graceful exit timeout
@@ -477,6 +515,14 @@ export class TaskRunProcess {
477515
};
478516
}
479517

518+
if (error instanceof MaxDurationExceededError) {
519+
return {
520+
type: "INTERNAL_ERROR",
521+
code: TaskRunErrorCodes.MAX_DURATION_EXCEEDED,
522+
message: error.message,
523+
};
524+
}
525+
480526
if (error instanceof CleanupProcessError) {
481527
return {
482528
type: "INTERNAL_ERROR",

packages/core/src/v3/errors.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,17 @@ export class GracefulExitTimeoutError extends Error {
557557
}
558558
}
559559

560+
export class MaxDurationExceededError extends Error {
561+
constructor(
562+
public readonly maxDurationInSeconds: number,
563+
public readonly elapsedTimeInSeconds: number
564+
) {
565+
super(`Run exceeded maximum compute time (maxDuration) of ${maxDurationInSeconds} seconds`);
566+
567+
this.name = "MaxDurationExceededError";
568+
}
569+
}
570+
560571
type ErrorLink = {
561572
name: string;
562573
href: string;

packages/core/src/v3/schemas/messages.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,13 @@ export const ExecutorToWorkerMessageCatalog = {
189189
suspendable: z.boolean(),
190190
}),
191191
},
192+
MAX_DURATION_EXCEEDED: {
193+
message: z.object({
194+
version: z.literal("v1").default("v1"),
195+
maxDurationInSeconds: z.number(),
196+
elapsedTimeInSeconds: z.number(),
197+
}),
198+
},
192199
};
193200

194201
export const WorkerToExecutorMessageCatalog = {

packages/core/src/v3/timeout/api.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@ export class TimeoutAPI implements TimeoutManager {
4747
this.disable();
4848
}
4949

50+
public registerListener(listener: (timeoutInSeconds: number, elapsedTimeInSeconds: number) => void | Promise<void>) {
51+
const manager = this.#getManager();
52+
if (manager.registerListener) {
53+
manager.registerListener(listener);
54+
}
55+
}
56+
5057
#getManager(): TimeoutManager {
5158
return getGlobal(API_NAME) ?? NOOP_TIMEOUT_MANAGER;
5259
}

packages/core/src/v3/timeout/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ export interface TimeoutManager {
22
abortAfterTimeout: (timeoutInSeconds?: number) => AbortController;
33
signal?: AbortSignal;
44
reset: () => void;
5+
registerListener?: (listener: (timeoutInSeconds: number, elapsedTimeInSeconds: number) => void | Promise<void>) => void;
56
}
67

78
export class TaskRunExceededMaxDuration extends Error {

packages/core/src/v3/timeout/usageTimeoutManager.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,21 @@ export class UsageTimeoutManager implements TimeoutManager {
55
private _abortController: AbortController;
66
private _abortSignal: AbortSignal | undefined;
77
private _intervalId: NodeJS.Timeout | undefined;
8+
private _listener?: (
9+
timeoutInSeconds: number,
10+
elapsedTimeInSeconds: number
11+
) => void | Promise<void>;
812

913
constructor(private readonly usageManager: UsageManager) {
1014
this._abortController = new AbortController();
1115
}
1216

17+
registerListener(
18+
listener: (timeoutInSeconds: number, elapsedTimeInSeconds: number) => void | Promise<void>
19+
): void {
20+
this._listener = listener;
21+
}
22+
1323
get signal(): AbortSignal | undefined {
1424
return this._abortSignal;
1525
}
@@ -42,8 +52,15 @@ export class UsageTimeoutManager implements TimeoutManager {
4252
if (sample.cpuTime > timeoutInSeconds * 1000) {
4353
clearInterval(this._intervalId);
4454

55+
const elapsedTimeInSeconds = sample.cpuTime / 1000;
56+
57+
// Call the listener if registered
58+
if (this._listener) {
59+
void this._listener(timeoutInSeconds, elapsedTimeInSeconds);
60+
}
61+
4562
this._abortController.abort(
46-
new TaskRunExceededMaxDuration(timeoutInSeconds, sample.cpuTime / 1000)
63+
new TaskRunExceededMaxDuration(timeoutInSeconds, elapsedTimeInSeconds)
4764
);
4865
}
4966
}

packages/core/src/v3/workers/taskExecutor.ts

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { promiseWithResolvers } from "../../utils.js";
33
import { ApiError, RateLimitError } from "../apiClient/errors.js";
44
import { ConsoleInterceptor } from "../consoleInterceptor.js";
55
import {
6-
InternalError,
76
isCompleteTaskWithOutput,
87
isInternalError,
98
parseError,
@@ -419,29 +418,13 @@ export class TaskExecutor {
419418
throw new Error("Task does not have a run function");
420419
}
421420

422-
// Create a promise that rejects when the signal aborts
423-
const abortPromise = new Promise((_, reject) => {
424-
signal.addEventListener("abort", () => {
425-
if (typeof signal.reason === "string" && signal.reason.includes("cancel")) {
426-
return;
427-
}
428-
429-
const maxDuration = ctx.run.maxDuration;
430-
reject(
431-
new InternalError({
432-
code: TaskRunErrorCodes.MAX_DURATION_EXCEEDED,
433-
message: `Run exceeded maximum compute time (maxDuration) of ${maxDuration} seconds`,
434-
})
435-
);
436-
});
437-
});
438-
439421
return runTimelineMetrics.measureMetric("trigger.dev/execution", "run", async () => {
440422
return await this._tracer.startActiveSpan(
441423
"run()",
442424
async (span) => {
443-
// Race between the run function and the abort promise
444-
return await Promise.race([runFn(payload, { ctx, init, signal }), abortPromise]);
425+
// maxDuration is now enforced by killing the process, not by Promise.race
426+
// The signal is still passed to runFn for cancellation and other abort conditions
427+
return await runFn(payload, { ctx, init, signal });
445428
},
446429
{
447430
attributes: { [SemanticInternalAttributes.STYLE_ICON]: "task-fn-run" },

0 commit comments

Comments
 (0)