Skip to content

Commit 562e88a

Browse files
committed
feat(cli): auto-cancel dev runs on CLI exit via detached watchdog
When the dev CLI exits (e.g. ctrl+c via pnpm), runs that were mid-execution previously stayed stuck in EXECUTING status for up to 5 minutes until the heartbeat timeout fired. Now they are cancelled within seconds.

The dev CLI spawns a lightweight detached watchdog process at startup. The watchdog monitors the CLI process ID and, when it detects the CLI has exited, calls a new POST /engine/v1/dev/disconnect endpoint to cancel all in-flight runs immediately (skipping PENDING_CANCEL since the worker is known to be dead).

Watchdog design:
- Fully detached (detached: true, stdio: ignore, unref()) so it survives even when pnpm sends SIGKILL to the process tree
- Active run IDs maintained via atomic file write (.trigger/active-runs.json)
- Single-instance guarantee via PID file (.trigger/watchdog.pid)
- Safety timeout: exits after 24 hours to prevent zombie processes
- On clean shutdown, the watchdog is killed (no disconnect needed)

Disconnect endpoint:
- Rate-limited: 5 calls/min per environment
- Capped at 500 runs per call
- Small counts (<= 25): cancelled inline with pMap concurrency 10
- Large counts (> 25): delegated to the bulk action system
- Uses finalizeRun: true to skip PENDING_CANCEL and go straight to FINISHED

Run engine change:
- cancelRun() now respects finalizeRun when the run is in EXECUTING status, skipping the PENDING_CANCEL waiting state and going directly to FINISHED
1 parent c013322 commit 562e88a

File tree

9 files changed

+512
-26
lines changed

9 files changed

+512
-26
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"trigger.dev": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Auto-cancel in-flight dev runs when the CLI exits, using a detached watchdog process that survives pnpm SIGKILL
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Added the `/engine/v1/dev/disconnect` endpoint to auto-cancel runs when the CLI disconnects. A maximum of 500 runs can be cancelled per call. Runs are cancelled inline when there are 25 or fewer, and through the bulk action system when there are more than 25.
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import { Ratelimit } from "@upstash/ratelimit";
3+
import { tryCatch } from "@trigger.dev/core";
4+
import { DevDisconnectRequestBody } from "@trigger.dev/core/v3";
5+
import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic";
6+
import { BulkActionNotificationType, BulkActionType } from "@trigger.dev/database";
7+
import { prisma } from "~/db.server";
8+
import { logger } from "~/services/logger.server";
9+
import { RateLimiter } from "~/services/rateLimiter.server";
10+
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
11+
import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server";
12+
import { commonWorker } from "~/v3/commonWorker.server";
13+
import pMap from "p-map";
14+
15+
/** Reason passed to the cancel service for every run cancelled inline by this route. */
const CANCEL_REASON = "Dev session ended (CLI exited)";

/**
 * Largest batch that is cancelled inline (with pMap) inside the request.
 * Batches with more runs than this are handed to a bulk action and processed asynchronously.
 */
const BULK_ACTION_THRESHOLD = 25;

/** Upper bound on the number of runs a single disconnect call will act on. */
const MAX_RUNS = 500;

/** Fixed-window limiter allowing 5 calls per minute; the route handler keys it by environment ID. */
const disconnectRateLimiter = new RateLimiter({
  limiter: Ratelimit.fixedWindow(5, "1 m"),
  keyPrefix: "dev-disconnect",
  logFailure: true,
});
30+
31+
/**
 * POST handler for the dev disconnect route.
 *
 * Flow:
 *  1. Rate limit the call, keyed by the authenticated environment ID. A rejected call gets a
 *     429 with the wait time in both the JSON body (`retryAfter`, seconds) and the standard
 *     `Retry-After` header.
 *  2. Truncate the requested run IDs to `MAX_RUNS`.
 *  3. With `BULK_ACTION_THRESHOLD` runs or fewer, cancel them inline and return the number
 *     actually cancelled.
 *  4. Otherwise create a bulk action and return its friendly ID; `cancelled` is 0 in that
 *     response because the work happens asynchronously.
 */
const { action } = createActionApiRoute(
  {
    body: DevDisconnectRequestBody,
    maxContentLength: 1024 * 256, // 256KB
    method: "POST",
  },
  async ({ authentication, body }) => {
    const environmentId = authentication.environment.id;

    // Rate limit per environment
    const rateLimitResult = await disconnectRateLimiter.limit(environmentId);
    if (!rateLimitResult.success) {
      // Clamp to at least one second so a window that rolls over between the limiter call
      // and this computation never yields a zero or negative delay.
      const retryAfter = Math.max(1, Math.ceil((rateLimitResult.reset - Date.now()) / 1000));

      return json(
        { error: "Rate limit exceeded", retryAfter },
        // Standard header so generic HTTP clients can back off without parsing the body.
        { status: 429, headers: { "Retry-After": String(retryAfter) } }
      );
    }

    // Cap the number of runs
    const runFriendlyIds = body.runFriendlyIds.slice(0, MAX_RUNS);

    if (runFriendlyIds.length === 0) {
      return json({ cancelled: 0 }, { status: 200 });
    }

    logger.info("Dev disconnect: cancelling runs", {
      environmentId,
      runCount: runFriendlyIds.length,
      capped: body.runFriendlyIds.length > MAX_RUNS,
    });

    // For small numbers of runs, cancel inline
    if (runFriendlyIds.length <= BULK_ACTION_THRESHOLD) {
      const cancelled = await cancelRunsInline(runFriendlyIds, environmentId);
      return json({ cancelled }, { status: 200 });
    }

    // For large numbers, create a bulk action to process asynchronously
    const bulkActionId = await createBulkCancelAction(
      runFriendlyIds,
      authentication.environment.project.id,
      environmentId
    );

    logger.info("Dev disconnect: created bulk action for large run set", {
      environmentId,
      bulkActionId,
      runCount: runFriendlyIds.length,
    });

    return json({ cancelled: 0, bulkActionId }, { status: 200 });
  }
);
84+
85+
/**
 * Cancels the given runs within the current request, at most 10 at a time.
 *
 * Only runs whose ID matches one of `runFriendlyIds` AND that belong to `environmentId` are
 * loaded; any other requested ID is ignored. A failure to cancel one run is logged and does
 * not stop the remaining runs from being processed.
 *
 * @param runFriendlyIds Friendly IDs of the runs to cancel.
 * @param environmentId Runtime environment the runs must belong to.
 * @returns The number of runs for which the cancel service returned a result that was not
 *   flagged `alreadyFinished`.
 */
async function cancelRunsInline(
  runFriendlyIds: string[],
  environmentId: string
): Promise<number> {
  const candidateRuns = await prisma.taskRun.findMany({
    where: {
      id: { in: runFriendlyIds.map((friendlyId) => RunId.toId(friendlyId)) },
      runtimeEnvironmentId: environmentId,
    },
    select: {
      id: true,
      engine: true,
      friendlyId: true,
      status: true,
      createdAt: true,
      completedAt: true,
      taskEventStore: true,
    },
  });

  const service = new CancelTaskRunService(prisma);

  // Each mapper resolves to true when its run was newly cancelled, false otherwise.
  const outcomes = await pMap(
    candidateRuns,
    async (run): Promise<boolean> => {
      const [error, result] = await tryCatch(
        // finalizeRun: true is forwarded so the cancel is finalized immediately.
        service.call(run, { reason: CANCEL_REASON, finalizeRun: true })
      );

      if (error) {
        logger.error("Dev disconnect: failed to cancel run", {
          runId: run.id,
          error,
        });
        return false;
      }

      return result ? !result.alreadyFinished : false;
    },
    { concurrency: 10 }
  );

  const cancelled = outcomes.filter(Boolean).length;

  logger.info("Dev disconnect: completed inline cancellation", {
    environmentId,
    cancelled,
    total: runFriendlyIds.length,
  });

  return cancelled;
}
137+
138+
/**
 * Records a CANCEL bulk action group covering `runFriendlyIds` and enqueues a
 * `processBulkAction` job on the common worker to process it.
 *
 * @param runFriendlyIds Friendly IDs of the runs to cancel; stored under `params.runId`.
 * @param projectId Project that owns the bulk action.
 * @param environmentId Environment that owns the bulk action.
 * @returns The friendly ID of the newly created bulk action group.
 */
async function createBulkCancelAction(
  runFriendlyIds: string[],
  projectId: string,
  environmentId: string
): Promise<string> {
  const bulkAction = BulkActionId.generate();

  await prisma.bulkActionGroup.create({
    data: {
      id: bulkAction.id,
      friendlyId: bulkAction.friendlyId,
      projectId,
      environmentId,
      name: "Dev session disconnect",
      type: BulkActionType.CANCEL,
      params: { runId: runFriendlyIds },
      queryName: "bulk_action_v1",
      totalCount: runFriendlyIds.length,
      // No completion notification is requested for this bulk action.
      completionNotification: BulkActionNotificationType.NONE,
    },
  });

  // The job ID is derived from the bulk action ID.
  await commonWorker.enqueue({
    id: `processBulkAction-${bulkAction.id}`,
    job: "processBulkAction",
    payload: { bulkActionId: bulkAction.id },
  });

  return bulkAction.friendlyId;
}
168+
169+
export { action };

apps/webapp/app/v3/services/cancelTaskRun.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ export type CancelTaskRunServiceOptions = {
88
cancelAttempts?: boolean;
99
cancelledAt?: Date;
1010
bulkActionId?: string;
11+
/** Skip PENDING_CANCEL and finalize immediately (use when the worker is known to be dead). */
12+
finalizeRun?: boolean;
1113
};
1214

1315
type CancelTaskRunServiceResult = {
@@ -57,6 +59,7 @@ export class CancelTaskRunService extends BaseService {
5759
runId: taskRun.id,
5860
completedAt: options?.cancelledAt,
5961
reason: options?.reason,
62+
finalizeRun: options?.finalizeRun,
6063
bulkActionId: options?.bulkActionId,
6164
tx: this._prisma,
6265
});

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1436,35 +1436,39 @@ export class RunAttemptSystem {
14361436
});
14371437

14381438
//if executing, we need to message the worker to cancel the run and put it into `PENDING_CANCEL` status
1439+
//unless finalizeRun is true (worker is known to be dead), in which case skip straight to FINISHED
14391440
if (
14401441
isExecuting(latestSnapshot.executionStatus) ||
14411442
isPendingExecuting(latestSnapshot.executionStatus)
14421443
) {
1443-
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
1444-
run,
1445-
snapshot: {
1446-
executionStatus: "PENDING_CANCEL",
1447-
description: "Run was cancelled",
1448-
},
1449-
previousSnapshotId: latestSnapshot.id,
1450-
environmentId: latestSnapshot.environmentId,
1451-
environmentType: latestSnapshot.environmentType,
1452-
projectId: latestSnapshot.projectId,
1453-
organizationId: latestSnapshot.organizationId,
1454-
workerId,
1455-
runnerId,
1456-
});
1444+
if (!finalizeRun) {
1445+
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
1446+
run,
1447+
snapshot: {
1448+
executionStatus: "PENDING_CANCEL",
1449+
description: "Run was cancelled",
1450+
},
1451+
previousSnapshotId: latestSnapshot.id,
1452+
environmentId: latestSnapshot.environmentId,
1453+
environmentType: latestSnapshot.environmentType,
1454+
projectId: latestSnapshot.projectId,
1455+
organizationId: latestSnapshot.organizationId,
1456+
workerId,
1457+
runnerId,
1458+
});
14571459

1458-
//the worker needs to be notified so it can kill the run and complete the attempt
1459-
await sendNotificationToWorker({
1460-
runId,
1461-
snapshot: newSnapshot,
1462-
eventBus: this.$.eventBus,
1463-
});
1464-
return {
1465-
alreadyFinished: false,
1466-
...executionResultFromSnapshot(newSnapshot),
1467-
};
1460+
//the worker needs to be notified so it can kill the run and complete the attempt
1461+
await sendNotificationToWorker({
1462+
runId,
1463+
snapshot: newSnapshot,
1464+
eventBus: this.$.eventBus,
1465+
});
1466+
return {
1467+
alreadyFinished: false,
1468+
...executionResultFromSnapshot(newSnapshot),
1469+
};
1470+
}
1471+
// finalizeRun is true — fall through to finish the run immediately
14681472
}
14691473

14701474
//not executing, so we will actually finish the run

packages/cli-v3/src/apiClient.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import {
77
DevConfigResponseBody,
88
DevDequeueRequestBody,
99
DevDequeueResponseBody,
10+
DevDisconnectRequestBody,
11+
DevDisconnectResponseBody,
1012
EnvironmentVariableResponseBody,
1113
FailDeploymentRequestBody,
1214
FailDeploymentResponseBody,
@@ -557,6 +559,7 @@ export class CliApiClient {
557559
heartbeatRun: this.devHeartbeatRun.bind(this),
558560
startRunAttempt: this.devStartRunAttempt.bind(this),
559561
completeRunAttempt: this.devCompleteRunAttempt.bind(this),
562+
disconnect: this.devDisconnect.bind(this),
560563
setEngineURL: this.setEngineURL.bind(this),
561564
} as const;
562565
}
@@ -681,6 +684,23 @@ export class CliApiClient {
681684
return eventSource;
682685
}
683686

687+
/**
 * POSTs `body` as JSON to the engine's `/engine/v1/dev/disconnect` endpoint, authenticated
 * with the client's access token. The call goes through `wrapZodFetch` with the
 * `DevDisconnectResponseBody` schema.
 *
 * @param body Disconnect request payload.
 * @returns The `ApiResult` produced by `wrapZodFetch`.
 * @throws Error if the client has no access token.
 */
private async devDisconnect(
  body: DevDisconnectRequestBody
): Promise<ApiResult<DevDisconnectResponseBody>> {
  const token = this.accessToken;
  if (!token) {
    throw new Error("devDisconnect: No access token");
  }

  const url = `${this.engineURL}/engine/v1/dev/disconnect`;
  const headers = {
    Authorization: `Bearer ${token}`,
    Accept: "application/json",
  };

  return wrapZodFetch(DevDisconnectResponseBody, url, {
    method: "POST",
    headers,
    body: JSON.stringify(body),
  });
}
703+
684704
private async devDequeue(
685705
body: DevDequeueRequestBody
686706
): Promise<ApiResult<DevDequeueResponseBody>> {

0 commit comments

Comments
 (0)