Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/big-ants-act.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Move max duration handling into the parent process
11 changes: 11 additions & 0 deletions packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,17 @@ usage.setGlobalUsageManager(devUsageManager);
const usageTimeoutManager = new UsageTimeoutManager(devUsageManager);
timeout.setGlobalManager(usageTimeoutManager);

// Register listener to send IPC message when max duration is exceeded
timeout.registerListener(async (maxDurationInSeconds, elapsedTimeInSeconds) => {
log(
`[${new Date().toISOString()}] Max duration exceeded: ${maxDurationInSeconds}s, elapsed: ${elapsedTimeInSeconds}s`
);
await zodIpc.send("MAX_DURATION_EXCEEDED", {
maxDurationInSeconds,
elapsedTimeInSeconds,
});
});

const standardResourceCatalog = new StandardResourceCatalog();
resourceCatalog.setGlobalResourceCatalog(standardResourceCatalog);

Expand Down
11 changes: 11 additions & 0 deletions packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,17 @@ function initializeUsageManager({
usage.setGlobalUsageManager(prodUsageManager);
timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));

// Register listener to send IPC message when max duration is exceeded
timeout.registerListener(async (maxDurationInSeconds, elapsedTimeInSeconds) => {
console.log(
`[${new Date().toISOString()}] Max duration exceeded: ${maxDurationInSeconds}s, elapsed: ${elapsedTimeInSeconds}s`
);
await zodIpc.send("MAX_DURATION_EXCEEDED", {
maxDurationInSeconds,
elapsedTimeInSeconds,
});
});

return prodUsageManager;
}

Expand Down
48 changes: 47 additions & 1 deletion packages/cli-v3/src/executions/taskRunProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
CleanupProcessError,
internalErrorFromUnexpectedExit,
GracefulExitTimeoutError,
MaxDurationExceededError,
UnexpectedExitError,
SuspendedProcessError,
} from "@trigger.dev/core/v3/errors";
Expand Down Expand Up @@ -74,6 +75,8 @@ export class TaskRunProcess {
private _isBeingKilled: boolean = false;
private _isBeingCancelled: boolean = false;
private _isBeingSuspended: boolean = false;
private _isMaxDurationExceeded: boolean = false;
private _maxDurationInfo?: { maxDurationInSeconds: number; elapsedTimeInSeconds: number };
private _stderr: Array<string> = [];

public onTaskRunHeartbeat: Evt<string> = new Evt();
Expand Down Expand Up @@ -209,6 +212,23 @@ export class TaskRunProcess {
SET_SUSPENDABLE: async (message) => {
this.onSetSuspendable.post(message);
},
MAX_DURATION_EXCEEDED: async (message) => {
logger.debug("max duration exceeded, gracefully terminating child process", {
maxDurationInSeconds: message.maxDurationInSeconds,
elapsedTimeInSeconds: message.elapsedTimeInSeconds,
pid: this.pid,
});

// Set flag and store duration info for error reporting in #handleExit
this._isMaxDurationExceeded = true;
this._maxDurationInfo = {
maxDurationInSeconds: message.maxDurationInSeconds,
elapsedTimeInSeconds: message.elapsedTimeInSeconds,
};

// Use the same graceful termination approach as cancel
await this.#gracefullyTerminate(this.options.gracefulTerminationTimeoutInMs);
},
},
});

Expand Down Expand Up @@ -319,7 +339,25 @@ export class TaskRunProcess {

const { rejecter } = attemptPromise;

if (this._isBeingCancelled) {
if (this._isMaxDurationExceeded) {
if (!this._maxDurationInfo) {
rejecter(
new UnexpectedExitError(
code ?? -1,
signal,
"MaxDuration flag set but duration info missing"
)
);
continue;
}

rejecter(
new MaxDurationExceededError(
this._maxDurationInfo.maxDurationInSeconds,
this._maxDurationInfo.elapsedTimeInSeconds
)
);
} else if (this._isBeingCancelled) {
rejecter(new CancelledProcessError());
} else if (this._gracefulExitTimeoutElapsed) {
// Order matters, this has to be before the graceful exit timeout
Expand Down Expand Up @@ -477,6 +515,14 @@ export class TaskRunProcess {
};
}

if (error instanceof MaxDurationExceededError) {
return {
type: "INTERNAL_ERROR",
code: TaskRunErrorCodes.MAX_DURATION_EXCEEDED,
message: error.message,
};
}

if (error instanceof CleanupProcessError) {
return {
type: "INTERNAL_ERROR",
Expand Down
11 changes: 11 additions & 0 deletions packages/core/src/v3/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,17 @@ export class GracefulExitTimeoutError extends Error {
}
}

export class MaxDurationExceededError extends Error {
constructor(
public readonly maxDurationInSeconds: number,
public readonly elapsedTimeInSeconds: number
) {
super(`Run exceeded maximum compute time (maxDuration) of ${maxDurationInSeconds} seconds`);

this.name = "MaxDurationExceededError";
}
}

type ErrorLink = {
name: string;
href: string;
Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/v3/schemas/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ export const ExecutorToWorkerMessageCatalog = {
suspendable: z.boolean(),
}),
},
MAX_DURATION_EXCEEDED: {
message: z.object({
version: z.literal("v1").default("v1"),
maxDurationInSeconds: z.number(),
elapsedTimeInSeconds: z.number(),
}),
},
};

export const WorkerToExecutorMessageCatalog = {
Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/v3/timeout/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ export class TimeoutAPI implements TimeoutManager {
this.disable();
}

public registerListener(listener: (timeoutInSeconds: number, elapsedTimeInSeconds: number) => void | Promise<void>) {
const manager = this.#getManager();
if (manager.registerListener) {
manager.registerListener(listener);
}
}

#getManager(): TimeoutManager {
return getGlobal(API_NAME) ?? NOOP_TIMEOUT_MANAGER;
}
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/timeout/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export interface TimeoutManager {
abortAfterTimeout: (timeoutInSeconds?: number) => AbortController;
signal?: AbortSignal;
reset: () => void;
registerListener?: (listener: (timeoutInSeconds: number, elapsedTimeInSeconds: number) => void | Promise<void>) => void;
}

export class TaskRunExceededMaxDuration extends Error {
Expand Down
19 changes: 18 additions & 1 deletion packages/core/src/v3/timeout/usageTimeoutManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,21 @@ export class UsageTimeoutManager implements TimeoutManager {
private _abortController: AbortController;
private _abortSignal: AbortSignal | undefined;
private _intervalId: NodeJS.Timeout | undefined;
private _listener?: (
timeoutInSeconds: number,
elapsedTimeInSeconds: number
) => void | Promise<void>;

constructor(private readonly usageManager: UsageManager) {
this._abortController = new AbortController();
}

registerListener(
listener: (timeoutInSeconds: number, elapsedTimeInSeconds: number) => void | Promise<void>
): void {
this._listener = listener;
}

get signal(): AbortSignal | undefined {
return this._abortSignal;
}
Expand Down Expand Up @@ -42,8 +52,15 @@ export class UsageTimeoutManager implements TimeoutManager {
if (sample.cpuTime > timeoutInSeconds * 1000) {
clearInterval(this._intervalId);

const elapsedTimeInSeconds = sample.cpuTime / 1000;

// Call the listener if registered
if (this._listener) {
this._listener(timeoutInSeconds, elapsedTimeInSeconds);
}

this._abortController.abort(
new TaskRunExceededMaxDuration(timeoutInSeconds, sample.cpuTime / 1000)
new TaskRunExceededMaxDuration(timeoutInSeconds, elapsedTimeInSeconds)
);
}
}
Expand Down
23 changes: 3 additions & 20 deletions packages/core/src/v3/workers/taskExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { promiseWithResolvers } from "../../utils.js";
import { ApiError, RateLimitError } from "../apiClient/errors.js";
import { ConsoleInterceptor } from "../consoleInterceptor.js";
import {
InternalError,
isCompleteTaskWithOutput,
isInternalError,
parseError,
Expand Down Expand Up @@ -419,29 +418,13 @@ export class TaskExecutor {
throw new Error("Task does not have a run function");
}

// Create a promise that rejects when the signal aborts
const abortPromise = new Promise((_, reject) => {
signal.addEventListener("abort", () => {
if (typeof signal.reason === "string" && signal.reason.includes("cancel")) {
return;
}

const maxDuration = ctx.run.maxDuration;
reject(
new InternalError({
code: TaskRunErrorCodes.MAX_DURATION_EXCEEDED,
message: `Run exceeded maximum compute time (maxDuration) of ${maxDuration} seconds`,
})
);
});
});

return runTimelineMetrics.measureMetric("trigger.dev/execution", "run", async () => {
return await this._tracer.startActiveSpan(
"run()",
async (span) => {
// Race between the run function and the abort promise
return await Promise.race([runFn(payload, { ctx, init, signal }), abortPromise]);
// maxDuration is now enforced by killing the process, not by Promise.race
// The signal is still passed to runFn for cancellation and other abort conditions
return await runFn(payload, { ctx, init, signal });
},
{
attributes: { [SemanticInternalAttributes.STYLE_ICON]: "task-fn-run" },
Expand Down
3 changes: 0 additions & 3 deletions packages/rsc/src/package.json

This file was deleted.

Loading