Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 162 additions & 33 deletions src/orchestration/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ const ORCHESTRATOR_TODO_KEY = "orchestrator.todo_md";
const DEFAULT_TASK_FUNDING_CENTS = 25;
const DEFAULT_MAX_REPLANS = 3;

// Deadlock prevention constants

// Minimum age (10 min, measured from the task's started_at) before an
// 'assigned' task is reset to 'pending'. The recovery code uses the larger of
// this value and the task's own timeoutMs, so this acts as a floor.
const ASSIGNED_TASK_TIMEOUT_MS = 10 * 60 * 1000;
// Circuit breaker: once this many of a goal's tasks are assigned to the
// orchestrator's own address, further self-assignments are skipped and the
// task stays pending until a later tick.
const MAX_SELF_ASSIGNED_PER_GOAL = 3;
// Maximum time (30 min) a goal may sit in the executing phase with tasks but
// no completed and no failed task before it is force-failed.
const EXECUTING_PHASE_TIMEOUT_MS = 30 * 60 * 1000;

type ExecutionPhase =
| "idle"
| "classifying"
Expand Down Expand Up @@ -516,60 +521,154 @@ export class Orchestrator {
};
}

// Recover stale tasks: workers that died (process restart, sandbox crash)
// leave tasks stuck in 'assigned' forever. Detect and reset them.
if (this.params.isWorkerAlive) {
const assignedTasks = getTasksByGoal(this.params.db, goal.id)
.filter((t) => t.status === "assigned" && t.assignedTo);
for (const task of assignedTasks) {
// ── Deadlock Recovery ──────────────────────────────────────────
// Recover stale tasks stuck in 'assigned' status. Two mechanisms:
//
// 1. Worker liveness check: if isWorkerAlive is available and reports
// the worker dead, reset immediately.
// 2. Timeout fallback: if a task has been in 'assigned' longer than
// ASSIGNED_TASK_TIMEOUT_MS, reset unconditionally. This catches
// cases where isWorkerAlive is unavailable, returns stale data,
// or self-assigned tasks never complete.
//
// This is the core fix for #266, #259, #262.
const assignedTasks = getTasksByGoal(this.params.db, goal.id)
.filter((t) => t.status === "assigned" && t.assignedTo);
const now = Date.now();

for (const task of assignedTasks) {
let shouldRecover = false;
let reason = "";

// Check 1: worker liveness (if available)
if (this.params.isWorkerAlive) {
const alive = this.params.isWorkerAlive(task.assignedTo!);
if (!alive) {
logger.warn("Recovering stale task from dead worker", {
taskId: task.id,
worker: task.assignedTo,
});
this.params.db.prepare(
"UPDATE task_graph SET status = 'pending', assigned_to = NULL, started_at = NULL WHERE id = ?",
).run(task.id);
shouldRecover = true;
reason = "dead worker";
}
}

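// Check 2: timeout fallback — runs when check 1 did not already flag the task.
// Only applies to tasks that have a started_at timestamp; others are never timed out here.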
if (!shouldRecover && task.startedAt) {
const assignedAt = new Date(task.startedAt).getTime();
const taskTimeout = Math.max(task.timeoutMs, ASSIGNED_TASK_TIMEOUT_MS);
if (!Number.isNaN(assignedAt) && now - assignedAt > taskTimeout) {
shouldRecover = true;
reason = `timeout (${Math.round((now - assignedAt) / 60_000)}min)`;
}
}

if (shouldRecover) {
logger.warn("Recovering stale task", {
taskId: task.id,
worker: task.assignedTo,
reason,
});
this.params.db.prepare(
"UPDATE task_graph SET status = 'pending', assigned_to = NULL, started_at = NULL WHERE id = ?",
).run(task.id);
}
}

// ── Phase Escape Hatch ──────────────────────────────────────
// If the goal has been in 'executing' phase for too long with zero
// progress (all tasks stuck in assigned/pending), force-fail to
// unblock new goals.
const phaseEnteredAt = this.getPhaseEnteredAt(goal.id);
if (phaseEnteredAt) {
const phaseAge = now - phaseEnteredAt;
const progress = getGoalProgress(this.params.db, goal.id);
const stuck = progress.total > 0
&& progress.completed === 0
&& progress.failed === 0
&& phaseAge > EXECUTING_PHASE_TIMEOUT_MS;

if (stuck) {
logger.warn("Goal stuck in executing phase, force-failing", {
goalId: goal.id,
phaseAgeMin: Math.round(phaseAge / 60_000),
progress,
});
updateGoalStatus(this.params.db, goal.id, "failed");
return { ...state, phase: "failed", failedError: "Stuck: no progress in executing phase" };
}
}

const ready = getReadyTasks(this.params.db)
.filter((task) => task.goalId === goal.id);

// Count existing self-assigned tasks for circuit breaker
const selfAddress = this.params.identity?.address;
const selfAssignedCount = selfAddress
? assignedTasks.filter((t) => t.assignedTo === selfAddress).length
: 0;
let selfAssignedThisTick = 0;

for (const task of ready) {
try {
const assignment = await this.matchTaskToAgent(task);
assignTask(this.params.db, task.id, assignment.agentAddress);

const isLocalWorker = assignment.agentAddress.startsWith("local://");
const isSelfAssigned = assignment.agentAddress === this.params.identity?.address;
const isSelfAssigned = assignment.agentAddress === selfAddress;

// Self-assignment circuit breaker: prevent parent from being
// overwhelmed with too many self-assigned tasks, which starves
// the orchestrator tick and causes cascading deadlocks.
if (isSelfAssigned) {
if (selfAssignedCount + selfAssignedThisTick >= MAX_SELF_ASSIGNED_PER_GOAL) {
logger.warn("Self-assignment circuit breaker: skipping task", {
taskId: task.id,
selfAssigned: selfAssignedCount + selfAssignedThisTick,
limit: MAX_SELF_ASSIGNED_PER_GOAL,
});
continue; // task stays pending, retried next tick
}
selfAssignedThisTick += 1;
}

assignTask(this.params.db, task.id, assignment.agentAddress);

// Local workers receive their task directly at spawn time and run
// their own inference loop. Self-assigned tasks are handled by the
// parent agent via its normal turn. Neither needs funding or messaging.
if (!isLocalWorker && !isSelfAssigned) {
await this.fundAgentForTask(assignment.agentAddress, task);
try {
await this.fundAgentForTask(assignment.agentAddress, task);

const message = this.params.messaging.createMessage({
type: "task_assignment",
to: assignment.agentAddress,
goalId: task.goalId,
taskId: task.id,
priority: "high",
requiresResponse: true,
content: JSON.stringify({
const message = this.params.messaging.createMessage({
type: "task_assignment",
to: assignment.agentAddress,
goalId: task.goalId,
taskId: task.id,
title: task.title,
description: task.description,
agentRole: task.agentRole,
dependencies: task.dependencies,
timeoutMs: task.metadata.timeoutMs,
}),
});

await this.params.messaging.send(message);
priority: "high",
requiresResponse: true,
content: JSON.stringify({
taskId: task.id,
title: task.title,
description: task.description,
agentRole: task.agentRole,
dependencies: task.dependencies,
timeoutMs: task.metadata.timeoutMs,
}),
});

await this.params.messaging.send(message);
} catch (postAssignError) {
// Funding or messaging failed AFTER assignment was committed.
// Roll back the assignment so the task doesn't get stuck forever
// in 'assigned' with a worker that was never notified.
const postErr = normalizeError(postAssignError);
logger.warn("Rolling back assignment after post-assign failure", {
taskId: task.id,
worker: assignment.agentAddress,
error: postErr.message,
});
this.params.db.prepare(
"UPDATE task_graph SET status = 'pending', assigned_to = NULL, started_at = NULL WHERE id = ?",
).run(task.id);
continue;
}
}

this.params.agentTracker.updateStatus(assignment.agentAddress, "running");
Expand Down Expand Up @@ -760,6 +859,7 @@ export class Orchestrator {

private async handleCompletePhase(state: OrchestratorState): Promise<OrchestratorState> {
await this.recallAgentCredits();
this.clearExecutingSince(state.goalId);

return {
...DEFAULT_STATE,
Expand All @@ -773,6 +873,7 @@ export class Orchestrator {
error: state.failedError,
replanCount: state.replanCount,
});
this.clearExecutingSince(state.goalId);

if (!state.goalId) {
return { ...DEFAULT_STATE };
Expand Down Expand Up @@ -995,6 +1096,34 @@ export class Orchestrator {
return row?.count ?? 0;
}

/**
 * Delete the persisted "executing since" marker for a goal from the kv table.
 *
 * @param goalId - Goal whose marker should be removed; a null or empty id is
 *   a no-op.
 */
private clearExecutingSince(goalId: string | null): void {
  if (goalId) {
    const markerKey = `orchestrator.executing_since.${goalId}`;
    const deleteStmt = this.params.db.prepare("DELETE FROM kv WHERE key = ?");
    deleteStmt.run(markerKey);
  }
}

/**
 * Look up when the goal was first seen in the 'executing' phase, used by the
 * phase escape hatch to detect permanently-stuck goals.
 *
 * Side effect: despite the "get" name, this method writes. When no usable
 * timestamp is stored for the goal (row missing, empty value, or a value that
 * `Date.parse` cannot read), the current time is recorded under
 * `orchestrator.executing_since.<goalId>` and `null` is returned so the
 * escape hatch cannot fire on that tick.
 *
 * @param goalId - Goal whose executing-phase start time is requested.
 * @returns Epoch milliseconds of the recorded start, or `null` when the
 *   timestamp was (re)recorded during this call.
 */
private getPhaseEnteredAt(goalId: string): number | null {
  const key = `orchestrator.executing_since.${goalId}`;
  const row = this.params.db
    .prepare("SELECT value FROM kv WHERE key = ?")
    .get(key) as { value: string } | undefined;

  const recordedAt = row?.value ? Date.parse(row.value) : Number.NaN;
  if (!Number.isNaN(recordedAt)) {
    return recordedAt;
  }

  // Missing or corrupt marker: start the clock now. Previously a corrupt
  // value was left in place and returned null on every tick, which silently
  // disabled the escape hatch for this goal forever.
  this.params.db.prepare(
    "INSERT OR REPLACE INTO kv (key, value, updated_at) VALUES (?, ?, datetime('now'))",
  ).run(key, new Date().toISOString());
  return null; // don't trigger on the tick that records the timestamp
}

private getMaxReplans(): number {
const configured = Number(this.params.config?.maxReplans ?? DEFAULT_MAX_REPLANS);
if (!Number.isFinite(configured)) {
Expand Down
4 changes: 3 additions & 1 deletion src/orchestration/task-graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ export function assignTask(db: Database, taskId: string, agentAddress: string):
}

updateTaskStatus(db, taskId, "assigned");
db.prepare("UPDATE task_graph SET assigned_to = ? WHERE id = ?").run(normalizedAddress, taskId);
db.prepare(
"UPDATE task_graph SET assigned_to = ?, started_at = ? WHERE id = ?",
).run(normalizedAddress, new Date().toISOString(), taskId);
});
}

Expand Down
Loading