Skip to content

Commit f743ab3

Browse files
committed
Remove lease-based research task lifecycle
1 parent c0f7a75 commit f743ab3

9 files changed

Lines changed: 148 additions & 577 deletions

File tree

apps/orchestrator-worker/src/app.ts

Lines changed: 11 additions & 173 deletions
Original file line number | Diff line number | Diff line change
@@ -2495,9 +2495,6 @@ export type RunOrchestratorOptions = {
24952495

24962496
type AuditLogger = (event: string, payload: Record<string, unknown>) => void;
24972497

2498-
const RUN_LEASE_MS = 90_000;
2499-
const RUN_HEARTBEAT_INTERVAL_MS = 15_000;
2500-
25012498
function decorateWork(c: Context, work: WorkSummary): WorkSummary {
25022499
const metadata = "metadata" in work && work.metadata && typeof work.metadata === "object"
25032500
? (work.metadata as Record<string, unknown>)
@@ -4807,14 +4804,7 @@ export async function finalizeStaleRun(
48074804
}
48084805

48094806
const activeResearchTask = researchTasks.some((task) => {
4810-
if (task.status !== "queued" && task.status !== "starting" && task.status !== "running") {
4811-
return false;
4812-
}
4813-
const taskLeaseActive = Boolean(task.leaseExpiresAt && Date.parse(task.leaseExpiresAt) > Date.now());
4814-
const taskHeartbeatFresh = Boolean(
4815-
task.lastHeartbeatAt && (Date.now() - Date.parse(task.lastHeartbeatAt)) < RUN_LEASE_MS,
4816-
);
4817-
return taskLeaseActive || taskHeartbeatFresh;
4807+
return task.status === "queued" || task.status === "starting" || task.status === "running";
48184808
});
48194809
if (activeResearchTask) {
48204810
return run;
@@ -4931,50 +4921,6 @@ function terminalResearchTaskError(task: { checkpointJson?: unknown; errorJson?:
49314921
return null;
49324922
}
49334923

4934-
function semanticStepFailureLabel(step: string) {
4935-
switch (step) {
4936-
case "model_selected":
4937-
return "while selecting the semantic ranking model";
4938-
case "embed_query":
4939-
return "while embedding a semantic query";
4940-
case "vector_query":
4941-
return "while querying the vector index";
4942-
case "hydrate_chunks":
4943-
return "while loading matched passages";
4944-
case "alphaloop_stream_start":
4945-
return "while starting AlphaLoop";
4946-
case "write_answer":
4947-
return "while writing the final answer";
4948-
default:
4949-
return "during semantic retrieval";
4950-
}
4951-
}
4952-
4953-
function stalledResearchTaskError(task: {
4954-
kind?: string;
4955-
checkpointJson?: unknown;
4956-
}) {
4957-
const checkpoint = task.checkpointJson;
4958-
if (checkpoint && typeof checkpoint === "object" && !Array.isArray(checkpoint)) {
4959-
const record = checkpoint as Record<string, unknown>;
4960-
if (task.kind === "semantic_research") {
4961-
if (record.type === "semantic.step" && typeof record.step === "string") {
4962-
return `Semantic search stopped making progress ${semanticStepFailureLabel(record.step)}.`;
4963-
}
4964-
if (record.type === "semantic.alphaloop") {
4965-
return "Semantic search stopped making progress while AlphaLoop was reviewing the retrieved passages.";
4966-
}
4967-
if (record.type === "research.note" && typeof record.note === "string" && record.note.trim().length > 0) {
4968-
return `Semantic search stopped making progress after: ${record.note.trim()}`;
4969-
}
4970-
}
4971-
}
4972-
if (task.kind === "semantic_research") {
4973-
return "Semantic search stopped making progress and lost its worker lease.";
4974-
}
4975-
return "This long-running research task stopped making progress and lost its worker lease.";
4976-
}
4977-
49784924
function normalizedComparisonText(value: string) {
49794925
return value
49804926
.toLowerCase()
@@ -6337,11 +6283,8 @@ function terminalRunStateUpdate(status: "completed" | "failed" | "timed_out", co
63376283
return {
63386284
status,
63396285
completedAt,
6340-
ownerInstanceId: null,
6341-
heartbeatAt: completedAt,
6342-
leaseExpiresAt: null,
63436286
activeToolCallId: null,
6344-
} satisfies Partial<Pick<RunRecord, "status" | "completedAt" | "ownerInstanceId" | "heartbeatAt" | "leaseExpiresAt" | "activeToolCallId">>;
6287+
} satisfies Partial<Pick<RunRecord, "status" | "completedAt" | "activeToolCallId">>;
63456288
}
63466289

63476290
function backgroundJobIsTerminal(status: BackgroundJobRecord["status"]) {
@@ -6676,14 +6619,13 @@ export async function reapExpiredRuntimeInstances(
66766619

66776620
export async function reapStaleRuns(
66786621
deps: AppDeps,
6679-
context: { runId: string },
6622+
_context: { runId: string },
66806623
limit = 100,
66816624
) {
66826625
const allRuns = await deps.store.listAllRuns();
66836626
const staleRuns = allRuns
66846627
.filter((run) => run.status === "running" || run.status === "queued")
66856628
.slice(0, limit);
6686-
const janitorRequest = new Request(`${apiOrigin(deps)}/internal/janitor`);
66876629
for (const run of staleRuns) {
66886630
const runRecord = await deps.store.getRun(run.id);
66896631
if (!runRecord) {
@@ -6700,34 +6642,6 @@ export async function reapStaleRuns(
67006642
} catch {
67016643
// Best-effort sync for active delegated jobs.
67026644
}
6703-
continue;
6704-
}
6705-
const leaseExpired = !runRecord.leaseExpiresAt || Date.parse(runRecord.leaseExpiresAt) <= Date.now();
6706-
if (!leaseExpired) {
6707-
continue;
6708-
}
6709-
try {
6710-
const claimed = await deps.store.claimRunLease(run.id, {
6711-
ownerInstanceId: `janitor:${context.runId}:${run.id}`,
6712-
heartbeatAt: new Date().toISOString(),
6713-
leaseExpiresAt: new Date(Date.now() + RUN_LEASE_MS).toISOString(),
6714-
});
6715-
if (!claimed) {
6716-
continue;
6717-
}
6718-
const claimedRun = await deps.store.getRun(run.id);
6719-
if (!claimedRun) {
6720-
continue;
6721-
}
6722-
await appendRunLifecycleEvent(deps, claimedRun, "run.recovery.claimed", {
6723-
claimedBy: context.runId,
6724-
previousOwnerInstanceId: runRecord.ownerInstanceId,
6725-
previousHeartbeatAt: runRecord.heartbeatAt,
6726-
previousLeaseExpiresAt: runRecord.leaseExpiresAt,
6727-
});
6728-
await finalizeStaleRun(deps, janitorRequest, claimedRun);
6729-
} catch {
6730-
// Best-effort janitor pass; the next schedule can retry this run.
67316645
}
67326646
}
67336647
}
@@ -9470,9 +9384,6 @@ async function runHermesConversation(
94709384
...run,
94719385
status,
94729386
completedAt,
9473-
ownerInstanceId: null,
9474-
heartbeatAt: completedAt,
9475-
leaseExpiresAt: null,
94769387
activeToolCallId: null,
94779388
};
94789389
};
@@ -10232,9 +10143,6 @@ export async function runOrchestrator(
1023210143
} else if (event === "run.completed") {
1023310144
currentActiveToolCallId = null;
1023410145
}
10235-
if (persistableRunEventNames.has(event)) {
10236-
await renewRunLease();
10237-
}
1023810146
const nowMs = deps.now?.() ?? Date.now();
1023910147
if (event === "tool.started") {
1024010148
const toolName = typeof data.toolName === "string" ? data.toolName : "";
@@ -10805,14 +10713,7 @@ export async function runOrchestrator(
1080510713
}
1080610714
const sessionMessages = await deps.store.listMessages(activeSession.id);
1080710715
const conversationHistory = formatConversationHistory(sessionMessages);
10808-
const initialHeartbeatAt = new Date().toISOString();
10809-
const initialLeaseExpiresAt = new Date(Date.now() + RUN_LEASE_MS).toISOString();
10810-
const runOwnerInstanceId = `run:${activeSession.id}:${Date.now()}:${input.userId}`;
10811-
run = await deps.store.createRun(activeSession.id, {
10812-
ownerInstanceId: runOwnerInstanceId,
10813-
heartbeatAt: initialHeartbeatAt,
10814-
leaseExpiresAt: initialLeaseExpiresAt,
10815-
});
10716+
run = await deps.store.createRun(activeSession.id);
1081610717
runCreated = true;
1081710718
activeRuns.set(run.id, {
1081810719
sessionId: activeSession.id,
@@ -10823,27 +10724,7 @@ export async function runOrchestrator(
1082310724
subscribers: new Map(),
1082410725
});
1082510726
let currentActiveToolCallId: string | null = null;
10826-
let lastLeaseHeartbeatAtMs = Date.now();
10827-
const renewRunLease = async (force = false) => {
10828-
if (!run) {
10829-
return;
10830-
}
10831-
if (isTerminalRunStatus(run.status)) {
10832-
return;
10833-
}
10834-
const nowMs = Date.now();
10835-
if (!force && nowMs - lastLeaseHeartbeatAtMs < RUN_HEARTBEAT_INTERVAL_MS / 2) {
10836-
return;
10837-
}
10838-
lastLeaseHeartbeatAtMs = nowMs;
10839-
await deps.store.updateRun(run.id, {
10840-
ownerInstanceId: runOwnerInstanceId,
10841-
heartbeatAt: new Date(nowMs).toISOString(),
10842-
leaseExpiresAt: new Date(nowMs + RUN_LEASE_MS).toISOString(),
10843-
activeToolCallId: currentActiveToolCallId,
10844-
});
10845-
};
10846-
const clearRunLease = async (status: "completed" | "failed" | "timed_out") => {
10727+
const finalizeRunState = async (status: "completed" | "failed" | "timed_out") => {
1084710728
if (!run) {
1084810729
return;
1084910730
}
@@ -10853,15 +10734,9 @@ export async function runOrchestrator(
1085310734
...run,
1085410735
status,
1085510736
completedAt,
10856-
ownerInstanceId: null,
10857-
heartbeatAt: completedAt,
10858-
leaseExpiresAt: null,
1085910737
activeToolCallId: null,
1086010738
};
1086110739
};
10862-
const leaseHeartbeatTimer = setInterval(() => {
10863-
void renewRunLease(true).catch(() => {});
10864-
}, RUN_HEARTBEAT_INTERVAL_MS);
1086510740
scheduleRawLogPersist(true);
1086610741
await send("run.started", {
1086710742
runId: run.id,
@@ -10949,9 +10824,7 @@ export async function runOrchestrator(
1094910824
toolCallId: string,
1095010825
) => {
1095110826
let lastSequence = (await deps.store.listRunEvents(run.id)).at(-1)?.sequence ?? 0;
10952-
let lastRecoveryEnqueueAt = 0;
1095310827
while (true) {
10954-
await renewRunLease();
1095510828
lastSequence = await forwardPersistedToolEvents(lastSequence, toolCallId);
1095610829
const task = await deps.store.getResearchTask(taskId);
1095710830
if (!task) {
@@ -10965,51 +10838,19 @@ export async function runOrchestrator(
1096510838
status: toolCall.status === "completed" ? "succeeded" : "failed",
1096610839
errorJson: toolCall.status === "failed" ? toolCall.resultJson : null,
1096710840
completedAt,
10968-
lastHeartbeatAt: completedAt,
10969-
leaseExpiresAt: null,
1097010841
});
1097110842
}
1097210843
return {
1097310844
status: toolCall.status,
1097410845
result: toolCall.resultJson,
1097510846
};
1097610847
}
10977-
const leaseExpired = !task.leaseExpiresAt || Date.parse(task.leaseExpiresAt) <= Date.now();
10978-
if ((task.status === "starting" || task.status === "running") && leaseExpired) {
10979-
const staleError = stalledResearchTaskError(task);
10980-
await deps.store.updateResearchTask(task.id, {
10981-
status: "failed",
10982-
errorJson: { error: staleError },
10983-
completedAt: new Date().toISOString(),
10984-
lastHeartbeatAt: new Date().toISOString(),
10985-
leaseExpiresAt: null,
10986-
});
10987-
return {
10988-
status: "failed",
10989-
result: { ok: false, error: staleError },
10990-
} as const;
10991-
}
10992-
if (
10993-
deps.enqueueJob
10994-
&& task.status === "queued"
10995-
&& leaseExpired
10996-
&& Date.now() - lastRecoveryEnqueueAt >= 30_000
10997-
) {
10998-
lastRecoveryEnqueueAt = Date.now();
10999-
await deps.enqueueJob({
11000-
type: "research_task_requested",
11001-
taskId: task.id,
11002-
queuedAt: new Date().toISOString(),
11003-
});
11004-
}
1100510848
const terminalError = terminalResearchTaskError(task);
1100610849
if (terminalError && (task.status === "queued" || task.status === "starting" || task.status === "running")) {
1100710850
await deps.store.updateResearchTask(task.id, {
1100810851
status: "failed",
1100910852
errorJson: { error: terminalError },
1101010853
completedAt: new Date().toISOString(),
11011-
lastHeartbeatAt: new Date().toISOString(),
11012-
leaseExpiresAt: null,
1101310854
});
1101410855
return {
1101510856
status: "failed",
@@ -11217,7 +11058,7 @@ export async function runOrchestrator(
1121711058
send,
1121811059
);
1121911060
runFinalized = true;
11220-
await clearRunLease("completed");
11061+
await finalizeRunState("completed");
1122111062
await send("run.completed", {
1122211063
runId: run.id,
1122311064
sessionId: activeSession.id,
@@ -12105,7 +11946,7 @@ export async function runOrchestrator(
1210511946
...(routeDecision.workflowHint ? { workflowHint: routeDecision.workflowHint } : {}),
1210611947
...(experimentProposal ? { experimentProposal } : {}),
1210711948
});
12108-
await clearRunLease("completed");
11949+
await finalizeRunState("completed");
1210911950
await streamAssistantText(routeDecision.answer, send);
1211011951
await send("assistant.completed", {
1211111952
answer: routeDecision.answer,
@@ -12278,7 +12119,7 @@ export async function runOrchestrator(
1227812119
await deps.store.updateRun(run.id, {
1227912120
plannerTurns: turn,
1228012121
});
12281-
await clearRunLease("completed");
12122+
await finalizeRunState("completed");
1228212123
await synthesizeAnswer(
1228312124
deps,
1228412125
{
@@ -12498,7 +12339,6 @@ export async function runOrchestrator(
1249812339
status: existing?.status === "queued" ? "running" : existing?.status ?? "running",
1249912340
runtimeId: typeof (options.runtimeId ?? startedRuntimeId) === "string" ? options.runtimeId ?? startedRuntimeId : existing?.runtimeId ?? null,
1250012341
progressSeq: nextSeq,
12501-
lastHeartbeatAt: new Date().toISOString(),
1250212342
checkpointJson: checkpointFromToolProgressDetail(detail),
1250312343
...(existing?.startedAt ? {} : { startedAt: new Date().toISOString() }),
1250412344
});
@@ -12704,12 +12544,11 @@ export async function runOrchestrator(
1270412544
error: typeof result.error === "string" ? result.error : "Long-running research failed.",
1270512545
},
1270612546
completedAt: new Date().toISOString(),
12707-
lastHeartbeatAt: new Date().toISOString(),
1270812547
});
1270912548
}
1271012549

1271112550
if (activeRuns.get(run.id)?.cancelRequested) {
12712-
await clearRunLease("failed");
12551+
await finalizeRunState("failed");
1271312552
await send("run.completed", {
1271412553
runId: run.id,
1271512554
sessionId: session.id,
@@ -12751,7 +12590,7 @@ export async function runOrchestrator(
1275112590
if (completedBriefing) {
1275212591
await completeRunFromBriefing(completedBriefing, "standard");
1275312592
} else {
12754-
await clearRunLease("timed_out");
12593+
await finalizeRunState("timed_out");
1275512594
const timeoutMessage = "The run hit its hard limits before it produced a valid answer.";
1275612595
const timeoutResearchDocumentHtml = await appendFinalAnswerResearchDocumentHtml(
1275712596
deps,
@@ -12817,7 +12656,7 @@ export async function runOrchestrator(
1281712656
error: error instanceof Error ? error.message : "Unknown orchestrator error",
1281812657
})),
1281912658
);
12820-
await clearRunLease("failed");
12659+
await finalizeRunState("failed");
1282112660

1282212661
const failureMessage = userFacingRunFailureMessage(error);
1282312662
const failureResearchDocumentHtml = await appendFinalAnswerResearchDocumentHtml(
@@ -12858,7 +12697,6 @@ export async function runOrchestrator(
1285812697
});
1285912698
return;
1286012699
} finally {
12861-
clearInterval(leaseHeartbeatTimer);
1286212700
await harvestPendingWorkspace(true);
1286312701
if (runFinalized) {
1286412702
return;

0 commit comments

Comments (0)