Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/strange-moles-provide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Auto-cancel in-flight dev runs when the CLI exits, using a detached watchdog process that survives pnpm SIGKILL
6 changes: 6 additions & 0 deletions .server-changes/dev-cli-disconnect-md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Added `/engine/v1/dev/disconnect` endpoint to auto-cancel runs when the CLI disconnects. Maximum of 500 runs can be cancelled. Uses the bulk action system when there are more than 25 runs to cancel.
174 changes: 174 additions & 0 deletions apps/webapp/app/routes/engine.v1.dev.disconnect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import { json } from "@remix-run/server-runtime";
import { Ratelimit } from "@upstash/ratelimit";
import { tryCatch } from "@trigger.dev/core";
import { DevDisconnectRequestBody } from "@trigger.dev/core/v3";
import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic";
import { BulkActionNotificationType, BulkActionType } from "@trigger.dev/database";
import { prisma } from "~/db.server";
import { logger } from "~/services/logger.server";
import { RateLimiter } from "~/services/rateLimiter.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server";
import { commonWorker } from "~/v3/commonWorker.server";
import pMap from "p-map";

const CANCEL_REASON = "Dev session ended (CLI exited)";

// Below this threshold, cancel runs inline with pMap.
// Above it, create a bulk action and process asynchronously.
const BULK_ACTION_THRESHOLD = 25;

// Maximum number of runs that can be cancelled in a single disconnect call.
const MAX_RUNS = 500;

// Rate limit: 5 calls per minute per environment
const disconnectRateLimiter = new RateLimiter({
keyPrefix: "dev-disconnect",
limiter: Ratelimit.fixedWindow(5, "1 m"),
logFailure: true,
});

const { action } = createActionApiRoute(
{
body: DevDisconnectRequestBody,
maxContentLength: 1024 * 256, // 256KB
method: "POST",
},
async ({ authentication, body }) => {
const environmentId = authentication.environment.id;

// Rate limit per environment
const rateLimitResult = await disconnectRateLimiter.limit(environmentId);
if (!rateLimitResult.success) {
return json(
{ error: "Rate limit exceeded", retryAfter: Math.ceil((rateLimitResult.reset - Date.now()) / 1000) },
{ status: 429 }
);
}

if (body.runFriendlyIds.length > MAX_RUNS) {
return json(
{ error: `A maximum of ${MAX_RUNS} runs can be cancelled per request` },
{ status: 400 }
);
}

const { runFriendlyIds } = body;

if (runFriendlyIds.length === 0) {
return json({ cancelled: 0 }, { status: 200 });
}

logger.info("Dev disconnect: cancelling runs", {
environmentId,
runCount: runFriendlyIds.length,
});

// For small numbers of runs, cancel inline
if (runFriendlyIds.length <= BULK_ACTION_THRESHOLD) {
const cancelled = await cancelRunsInline(runFriendlyIds, environmentId);
return json({ cancelled }, { status: 200 });
}

// For large numbers, create a bulk action to process asynchronously
const bulkActionId = await createBulkCancelAction(
runFriendlyIds,
authentication.environment.project.id,
environmentId
);

logger.info("Dev disconnect: created bulk action for large run set", {
environmentId,
bulkActionId,
runCount: runFriendlyIds.length,
});

return json({ cancelled: 0, bulkActionId }, { status: 200 });
}
);

async function cancelRunsInline(
runFriendlyIds: string[],
environmentId: string
): Promise<number> {
const runIds = runFriendlyIds.map((fid) => RunId.toId(fid));

const runs = await prisma.taskRun.findMany({
where: {
id: { in: runIds },
runtimeEnvironmentId: environmentId,
},
select: {
id: true,
engine: true,
friendlyId: true,
status: true,
createdAt: true,
completedAt: true,
taskEventStore: true,
},
});

let cancelled = 0;
const cancelService = new CancelTaskRunService(prisma);

await pMap(
runs,
async (run) => {
const [error, result] = await tryCatch(
cancelService.call(run, { reason: CANCEL_REASON, finalizeRun: true })
);

if (error) {
logger.error("Dev disconnect: failed to cancel run", {
runId: run.id,
error,
});
} else if (result && !result.alreadyFinished) {
cancelled++;
}
},
{ concurrency: 10 }
);

logger.info("Dev disconnect: completed inline cancellation", {
environmentId,
cancelled,
total: runFriendlyIds.length,
});

return cancelled;
}

async function createBulkCancelAction(
runFriendlyIds: string[],
projectId: string,
environmentId: string
): Promise<string> {
const { id, friendlyId } = BulkActionId.generate();

await prisma.bulkActionGroup.create({
data: {
id,
friendlyId,
projectId,
environmentId,
name: "Dev session disconnect",
type: BulkActionType.CANCEL,
params: { runId: runFriendlyIds, finalizeRun: true },
queryName: "bulk_action_v1",
totalCount: runFriendlyIds.length,
completionNotification: BulkActionNotificationType.NONE,
},
});

await commonWorker.enqueue({
id: `processBulkAction-${id}`,
job: "processBulkAction",
payload: { bulkActionId: id },
});

return friendlyId;
}

export { action };
5 changes: 4 additions & 1 deletion apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,13 @@ export class BulkActionService extends BaseService {
}

// 2. Parse the params
const rawParams = group.params && typeof group.params === "object" ? group.params : {};
const finalizeRun = "finalizeRun" in rawParams && (rawParams as any).finalizeRun === true;
const filters = parseRunListInputOptions({
organizationId: group.project.organizationId,
projectId: group.projectId,
environmentId: group.environmentId,
...(group.params && typeof group.params === "object" ? group.params : {}),
...rawParams,
});

const runsRepository = new RunsRepository({
Expand Down Expand Up @@ -199,6 +201,7 @@ export class BulkActionService extends BaseService {
cancelService.call(run, {
reason: `Bulk action ${group.friendlyId} cancelled run`,
bulkActionId: bulkActionId,
finalizeRun,
})
);
if (error) {
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/v3/services/cancelTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export type CancelTaskRunServiceOptions = {
cancelAttempts?: boolean;
cancelledAt?: Date;
bulkActionId?: string;
/** Skip PENDING_CANCEL and finalize immediately (use when the worker is known to be dead). */
finalizeRun?: boolean;
};

type CancelTaskRunServiceResult = {
Expand Down Expand Up @@ -57,6 +59,7 @@ export class CancelTaskRunService extends BaseService {
runId: taskRun.id,
completedAt: options?.cancelledAt,
reason: options?.reason,
finalizeRun: options?.finalizeRun,
bulkActionId: options?.bulkActionId,
tx: this._prisma,
});
Expand Down
52 changes: 28 additions & 24 deletions internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1436,35 +1436,39 @@ export class RunAttemptSystem {
});

//if executing, we need to message the worker to cancel the run and put it into `PENDING_CANCEL` status
//unless finalizeRun is true (worker is known to be dead), in which case skip straight to FINISHED
if (
isExecuting(latestSnapshot.executionStatus) ||
isPendingExecuting(latestSnapshot.executionStatus)
) {
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
run,
snapshot: {
executionStatus: "PENDING_CANCEL",
description: "Run was cancelled",
},
previousSnapshotId: latestSnapshot.id,
environmentId: latestSnapshot.environmentId,
environmentType: latestSnapshot.environmentType,
projectId: latestSnapshot.projectId,
organizationId: latestSnapshot.organizationId,
workerId,
runnerId,
});
if (!finalizeRun) {
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
run,
snapshot: {
executionStatus: "PENDING_CANCEL",
description: "Run was cancelled",
},
previousSnapshotId: latestSnapshot.id,
environmentId: latestSnapshot.environmentId,
environmentType: latestSnapshot.environmentType,
projectId: latestSnapshot.projectId,
organizationId: latestSnapshot.organizationId,
workerId,
runnerId,
});

//the worker needs to be notified so it can kill the run and complete the attempt
await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
eventBus: this.$.eventBus,
});
return {
alreadyFinished: false,
...executionResultFromSnapshot(newSnapshot),
};
//the worker needs to be notified so it can kill the run and complete the attempt
await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
eventBus: this.$.eventBus,
});
return {
alreadyFinished: false,
...executionResultFromSnapshot(newSnapshot),
};
}
// finalizeRun is true — fall through to finish the run immediately
}

//not executing, so we will actually finish the run
Expand Down
20 changes: 20 additions & 0 deletions packages/cli-v3/src/apiClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
DevConfigResponseBody,
DevDequeueRequestBody,
DevDequeueResponseBody,
DevDisconnectRequestBody,
DevDisconnectResponseBody,
EnvironmentVariableResponseBody,
FailDeploymentRequestBody,
FailDeploymentResponseBody,
Expand Down Expand Up @@ -557,6 +559,7 @@ export class CliApiClient {
heartbeatRun: this.devHeartbeatRun.bind(this),
startRunAttempt: this.devStartRunAttempt.bind(this),
completeRunAttempt: this.devCompleteRunAttempt.bind(this),
disconnect: this.devDisconnect.bind(this),
setEngineURL: this.setEngineURL.bind(this),
} as const;
}
Expand Down Expand Up @@ -681,6 +684,23 @@ export class CliApiClient {
return eventSource;
}

private async devDisconnect(
body: DevDisconnectRequestBody
): Promise<ApiResult<DevDisconnectResponseBody>> {
if (!this.accessToken) {
throw new Error("devDisconnect: No access token");
}

return wrapZodFetch(DevDisconnectResponseBody, `${this.engineURL}/engine/v1/dev/disconnect`, {
method: "POST",
headers: {
Authorization: `Bearer ${this.accessToken}`,
Accept: "application/json",
},
body: JSON.stringify(body),
});
}

private async devDequeue(
body: DevDequeueRequestBody
): Promise<ApiResult<DevDequeueResponseBody>> {
Expand Down
Loading