Skip to content

Commit f3d2921

Browse files
committed
Improve job cleanup and add force-cleanup API
1 parent d58e9a8 commit f3d2921

File tree

4 files changed

+94
-11
lines changed

4 files changed

+94
-11
lines changed

apps/media-server/src/app.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ app.get("/", (c) => {
2828
"/video/process",
2929
"/video/process/:jobId/status",
3030
"/video/process/:jobId/cancel",
31+
"/video/cleanup",
32+
"/video/force-cleanup",
3133
],
3234
});
3335
});

apps/media-server/src/lib/ffmpeg-video.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,13 @@ export async function downloadVideoToTemp(
189189
);
190190

191191
try {
192+
const timeoutSignal = AbortSignal.timeout(DOWNLOAD_TIMEOUT_MS);
193+
const combinedSignal = abortSignal
194+
? AbortSignal.any([abortSignal, timeoutSignal])
195+
: timeoutSignal;
196+
192197
const response = await fetch(videoUrl, {
193-
signal: abortSignal ?? AbortSignal.timeout(DOWNLOAD_TIMEOUT_MS),
198+
signal: combinedSignal,
194199
});
195200

196201
console.log(

apps/media-server/src/lib/job-manager.ts

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ export interface Job {
5858

5959
const jobs = new Map<string, Job>();
6060
const JOB_TTL_MS = 60 * 60 * 1000;
61+
const STALE_JOB_MS = 15 * 60 * 1000;
62+
const MAX_JOB_LIFETIME_MS = 45 * 60 * 1000;
6163

6264
// Dynamic concurrency control for video processing.
6365
//
@@ -268,15 +270,44 @@ export function cleanupExpiredJobs(): number {
268270
let cleaned = 0;
269271

270272
for (const [jobId, job] of jobs) {
271-
if (now - job.updatedAt > JOB_TTL_MS) {
273+
const age = now - job.createdAt;
274+
const staleness = now - job.updatedAt;
275+
276+
if (staleness > JOB_TTL_MS) {
272277
if (isActivePhase(job.phase)) {
273278
console.warn(
274-
`[job-manager] Cleaning up stuck job ${jobId} (phase=${job.phase}, age=${Math.round((now - job.createdAt) / 60000)}m)`,
279+
`[job-manager] Cleaning up expired job ${jobId} (phase=${job.phase}, age=${Math.round(age / 60000)}m)`,
275280
);
276281
job.abortController?.abort();
277282
}
278283
deleteJob(jobId);
279284
cleaned++;
285+
continue;
286+
}
287+
288+
if (isActivePhase(job.phase) && staleness > STALE_JOB_MS) {
289+
console.warn(
290+
`[job-manager] Marking stale job ${jobId} as error (phase=${job.phase}, no update for ${Math.round(staleness / 60000)}m)`,
291+
);
292+
job.abortController?.abort();
293+
job.phase = "error";
294+
job.error = `Job stale: no progress update for ${Math.round(staleness / 60000)} minutes`;
295+
job.message = "Processing failed (stale)";
296+
job.updatedAt = now;
297+
cleaned++;
298+
continue;
299+
}
300+
301+
if (isActivePhase(job.phase) && age > MAX_JOB_LIFETIME_MS) {
302+
console.warn(
303+
`[job-manager] Marking long-running job ${jobId} as error (phase=${job.phase}, age=${Math.round(age / 60000)}m)`,
304+
);
305+
job.abortController?.abort();
306+
job.phase = "error";
307+
job.error = `Job exceeded maximum lifetime of ${Math.round(MAX_JOB_LIFETIME_MS / 60000)} minutes`;
308+
job.message = "Processing failed (timeout)";
309+
job.updatedAt = now;
310+
cleaned++;
280311
}
281312
}
282313

@@ -315,14 +346,32 @@ export async function sendWebhook(job: Job): Promise<void> {
315346
}
316347
}
317348

318-
const cleanupInterval = setInterval(
319-
() => {
320-
const cleaned = cleanupExpiredJobs();
321-
if (cleaned > 0) {
322-
console.log(`[job-manager] Cleaned up ${cleaned} expired jobs`);
349+
export function forceCleanupActiveJobs(): number {
350+
let cleaned = 0;
351+
const now = Date.now();
352+
353+
for (const [jobId, job] of jobs) {
354+
if (isActivePhase(job.phase)) {
355+
console.warn(
356+
`[job-manager] Force-cleaning job ${jobId} (phase=${job.phase}, age=${Math.round((now - job.createdAt) / 60000)}m)`,
357+
);
358+
job.abortController?.abort();
359+
job.phase = "error";
360+
job.error = "Force-cleaned by admin";
361+
job.message = "Processing failed (force-cleaned)";
362+
job.updatedAt = now;
363+
cleaned++;
323364
}
324-
},
325-
5 * 60 * 1000,
326-
);
365+
}
366+
367+
return cleaned;
368+
}
369+
370+
const cleanupInterval = setInterval(() => {
371+
const cleaned = cleanupExpiredJobs();
372+
if (cleaned > 0) {
373+
console.log(`[job-manager] Cleaned up ${cleaned} expired/stale jobs`);
374+
}
375+
}, 60 * 1000);
327376

328377
cleanupInterval.unref?.();

apps/media-server/src/routes/video.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
canAcceptNewVideoProcess,
2121
createJob,
2222
deleteJob,
23+
forceCleanupActiveJobs,
2324
generateJobId,
2425
getActiveVideoProcessCount,
2526
getAllJobs,
@@ -77,6 +78,7 @@ function isTimeoutError(err: unknown): boolean {
7778
video.get("/status", (c) => {
7879
const jobs = getAllJobs();
7980
const resources = getSystemResources();
81+
const now = Date.now();
8082
return c.json({
8183
instanceId: getInstanceId(),
8284
pid: process.pid,
@@ -95,6 +97,9 @@ video.get("/status", (c) => {
9597
progress: j.progress,
9698
createdAt: j.createdAt,
9799
updatedAt: j.updatedAt,
100+
ageMinutes: Math.round((now - j.createdAt) / 60000),
101+
stalenessMinutes: Math.round((now - j.updatedAt) / 60000),
102+
error: j.error,
98103
})),
99104
});
100105
});
@@ -292,6 +297,19 @@ video.post("/process", async (c) => {
292297
`[video/process] Async processing error for job ${jobId}:`,
293298
err,
294299
);
300+
const currentJob = getJob(jobId);
301+
if (
302+
currentJob &&
303+
currentJob.phase !== "error" &&
304+
currentJob.phase !== "complete" &&
305+
currentJob.phase !== "cancelled"
306+
) {
307+
updateJob(jobId, {
308+
phase: "error",
309+
error: err instanceof Error ? err.message : String(err),
310+
message: "Processing failed (unhandled)",
311+
});
312+
}
295313
});
296314

297315
return c.json({
@@ -754,4 +772,13 @@ video.post("/cleanup", async (c) => {
754772
});
755773
});
756774

775+
video.post("/force-cleanup", (c) => {
776+
const cleaned = forceCleanupActiveJobs();
777+
return c.json({
778+
success: true,
779+
cleanedJobs: cleaned,
780+
message: `Force-cleaned ${cleaned} active jobs`,
781+
});
782+
});
783+
757784
export default video;

0 commit comments

Comments
 (0)