diff --git a/README.md b/README.md index 6a68686..ea0e9c6 100644 --- a/README.md +++ b/README.md @@ -113,8 +113,13 @@ Or drive it manually: /team shutdown alice # graceful shutdown (handshake) /team shutdown # stop all teammates (leader session remains active) /team cleanup # remove team artifacts when done +/team gc # remove stale teams from past sessions +/team gc --dry-run # preview what would be removed ``` +> **Automatic cleanup:** Stale team directories from previous sessions are automatically +> garbage-collected on startup. Empty team directories are also cleaned up on session exit. + Or let the model drive it with the delegate tool: ```json @@ -207,6 +212,7 @@ All management commands live under `/team`. | `/team prune [--all]` | Mark stale manual teammates offline (hides them in widget) | | `/team kill ` | Force-terminate | | `/team cleanup [--force]` | Delete team artifacts | +| `/team gc [--dry-run] [--force]` | Remove stale team directories from all past sessions | | `/team id` | Print team/task-list IDs and paths | | `/team env ` | Print env vars to start a manual teammate | diff --git a/extensions/teams/leader-inbox.ts b/extensions/teams/leader-inbox.ts index 2b271a1..20fa41b 100644 --- a/extensions/teams/leader-inbox.ts +++ b/extensions/teams/leader-inbox.ts @@ -1,4 +1,4 @@ -import type { ExtensionContext } from "@mariozechner/pi-coding-agent"; +import type { ExtensionAPI, ExtensionContext } from "@mariozechner/pi-coding-agent"; import { popUnreadMessages, writeToMailbox } from "./mailbox.js"; import { sanitizeName } from "./names.js"; import { @@ -16,8 +16,67 @@ import type { TeamsHookInvocation } from "./hooks.js"; import type { TeamsStyle } from "./teams-style.js"; import { formatMemberDisplayName, getTeamsStrings } from "./teams-style.js"; +/** + * Event-driven tracker for delegation batches. + * + * Tracks task IDs from delegate() calls. Tasks are only marked done + * when an idle_notification with completedTaskId is received — NOT + * by polling task file status. This avoids race conditions where + * listTasks() returns stale or premature data. + */ +export class DelegationTracker { + private batches: Array<{ + taskIds: Set; + completedIds: Set; + notified: boolean; + }> = []; + + /** Register a new batch of delegated task IDs. */ + addBatch(taskIds: string[]): void { + if (taskIds.length === 0) return; + this.batches.push({ + taskIds: new Set(taskIds), + completedIds: new Set(), + notified: false, + }); + } + + /** + * Mark a task as completed (called when idle_notification with + * completedTaskId is received). Returns any batches that became + * fully complete as a result. + */ + markCompleted(taskId: string): Array<{ taskIds: string[] }> { + const newlyComplete: Array<{ taskIds: string[] }> = []; + + for (const batch of this.batches) { + if (batch.notified) continue; + if (!batch.taskIds.has(taskId)) continue; + + batch.completedIds.add(taskId); + + // Check if ALL tasks in this batch are now done + const allDone = [...batch.taskIds].every((id) => batch.completedIds.has(id)); + if (allDone) { + batch.notified = true; + newlyComplete.push({ taskIds: [...batch.taskIds] }); + } + } + + // Prune notified batches + this.batches = this.batches.filter((b) => !b.notified); + return newlyComplete; + } + + /** Clear all tracked batches (e.g. on session switch). */ + clear(): void { + this.batches = []; + } +} + export async function pollLeaderInbox(opts: { ctx: ExtensionContext; + pi: ExtensionAPI; teamId: string; teamDir: string; taskListId: string; @@ -25,8 +84,14 @@ export async function pollLeaderInbox(opts: { style: TeamsStyle; pendingPlanApprovals: Map; enqueueHook?: (invocation: TeamsHookInvocation) => void; + /** PR #6 compat: callback for teammate DMs routed to leader LLM context. */ + onDm?: (from: string, text: string) => void; + /** Callback for per-task completion notifications routed to leader LLM context. */ + onTaskCompleted?: (memberName: string, taskId: string, taskSubject: string) => void; + /** Batch delegation tracker for all-tasks-complete auto-notify. */ + delegationTracker?: DelegationTracker; }): Promise { - const { ctx, teamId, teamDir, taskListId, leadName, style, pendingPlanApprovals, enqueueHook } = opts; + const { ctx, pi, teamId, teamDir, taskListId, leadName, style, pendingPlanApprovals, enqueueHook, onDm, onTaskCompleted, delegationTracker } = opts; const strings = getTeamsStrings(style); let msgs: Awaited>; @@ -38,6 +103,10 @@ export async function pollLeaderInbox(opts: { } if (!msgs.length) return; + // Collect batch completions across all messages in this poll cycle, + // then fire notifications once at the end (avoids duplicate triggers). + const batchCompletions: Array<{ taskIds: string[] }> = []; + for (const m of msgs) { const approved = isShutdownApproved(m.text); if (approved) { @@ -115,7 +184,7 @@ export async function pollLeaderInbox(opts: { // ignore hook enqueue errors } - // Hook: task completion / failure + // Hook + notifications for task completion / failure if (idle.completedTaskId) { const completedTask = await getTask(teamDir, taskListId, idle.completedTaskId); try { @@ -132,6 +201,24 @@ export async function pollLeaderInbox(opts: { } catch { // ignore hook enqueue errors } + + if (idle.completedStatus !== "failed") { + // Per-task completion notification → inject into leader LLM context + if (onTaskCompleted && completedTask) { + try { + onTaskCompleted(name, idle.completedTaskId, completedTask.subject ?? `task #${idle.completedTaskId}`); + } catch { + // ignore notification errors + } + } + + // Event-driven batch tracking: mark this task done and + // collect any batches that became fully complete. + if (delegationTracker) { + const completed = delegationTracker.markCompleted(idle.completedTaskId); + batchCompletions.push(...completed); + } + } } if (idle.failureReason) { @@ -209,6 +296,28 @@ export async function pollLeaderInbox(opts: { continue; } - ctx.ui.notify(`Message from ${m.from}: ${m.text}`, "info"); + // Unrecognized message = teammate DM to leader + if (onDm) { + onDm(m.from, m.text); + } else { + ctx.ui.notify(`Message from ${m.from}: ${m.text}`, "info"); + } + } + + // Fire batch-complete notifications (deduplicated across this poll cycle). + for (const batch of batchCompletions) { + const taskRefs = batch.taskIds.map((id) => `#${id}`).join(", "); + const summary = `All delegated tasks completed (${taskRefs}). Review the results and continue.`; + + try { + if (ctx.isIdle()) { + pi.sendUserMessage(`[team] ${summary}`); + } else { + pi.sendUserMessage(`[team] ${summary}`, { deliverAs: "followUp" }); + } + } catch { + // Fallback: at minimum show a notification so user knows to check. + ctx.ui.notify(`✅ ${summary}`, "info"); + } } } diff --git a/extensions/teams/leader-lifecycle-commands.ts b/extensions/teams/leader-lifecycle-commands.ts index 0c08db6..ee17d81 100644 --- a/extensions/teams/leader-lifecycle-commands.ts +++ b/extensions/teams/leader-lifecycle-commands.ts @@ -9,6 +9,7 @@ import { getTeamDir, getTeamsRootDir, getTeamsStylesDir } from "./paths.js"; import { TEAM_MAILBOX_NS } from "./protocol.js"; import { unassignTasksForAgent, type TeamTask } from "./task-store.js"; import { setMemberStatus, setTeamStyle, type TeamConfig } from "./team-config.js"; +import { findGcCandidates, gcTeamDirs } from "./team-gc.js"; import { type TeamsStyle, formatMemberDisplayName, @@ -231,6 +232,88 @@ export async function handleTeamCleanupCommand(opts: { renderWidget(); } +export async function handleTeamGcCommand(opts: { + ctx: ExtensionCommandContext; + rest: string[]; + teamId: string; +}): Promise { + const { ctx, rest, teamId } = opts; + + const flags = rest.filter((a) => a.startsWith("--")); + const argsOnly = rest.filter((a) => !a.startsWith("--")); + const force = flags.includes("--force"); + const dryRun = flags.includes("--dry-run"); + + const unknownFlags = flags.filter((f) => f !== "--force" && f !== "--dry-run"); + if (unknownFlags.length) { + ctx.ui.notify(`Unknown flag(s): ${unknownFlags.join(", ")}`, "error"); + return; + } + if (argsOnly.length) { + ctx.ui.notify("Usage: /team gc [--dry-run] [--force]", "error"); + return; + } + + const excludeTeamIds = new Set([teamId]); + const { candidates, skipped } = await findGcCandidates({ excludeTeamIds }); + + if (candidates.length === 0) { + const msg = skipped.length > 0 + ? `No stale teams to clean up (${skipped.length} active team(s) skipped)` + : "No stale teams to clean up"; + ctx.ui.notify(msg, "info"); + return; + } + + if (dryRun) { + const lines = [ + `Would remove ${candidates.length} stale team(s):`, + "", + ...candidates.map((c) => ` ${c.teamId} — ${c.reason}`), + ]; + if (skipped.length > 0) { + lines.push("", `Skipped ${skipped.length} active team(s):`); + for (const s of skipped) { + lines.push(` ${s.teamId} — ${s.reason}`); + } + } + ctx.ui.notify(lines.join("\n"), "info"); + return; + } + + if (!force) { + if (process.stdout.isTTY && process.stdin.isTTY) { + const ok = await ctx.ui.confirm( + "Garbage collect stale teams", + [ + `Remove ${candidates.length} stale team director${candidates.length === 1 ? "y" : "ies"}?`, + "", + ...candidates.map((c) => ` ${c.teamId} — ${c.reason}`), + ].join("\n"), + ); + if (!ok) return; + } else { + ctx.ui.notify("Refusing to GC in non-interactive mode without --force", "error"); + return; + } + } + + const result = await gcTeamDirs(candidates); + + const lines: string[] = []; + if (result.removed.length > 0) { + lines.push(`Removed ${result.removed.length} stale team(s)`); + } + if (result.errors.length > 0) { + lines.push(`Failed to remove ${result.errors.length} team(s):`); + for (const e of result.errors) { + lines.push(` ${e.teamId}: ${e.error}`); + } + } + + ctx.ui.notify(lines.join("\n"), result.errors.length > 0 ? "warning" : "info"); +} + export async function handleTeamShutdownCommand(opts: { ctx: ExtensionCommandContext; rest: string[]; diff --git a/extensions/teams/leader-team-command.ts b/extensions/teams/leader-team-command.ts index 267ac3e..217cbe1 100644 --- a/extensions/teams/leader-team-command.ts +++ b/extensions/teams/leader-team-command.ts @@ -9,6 +9,7 @@ import { handleTeamAttachCommand, handleTeamDetachCommand } from "./leader-attac import { handleTeamCleanupCommand, handleTeamDelegateCommand, + handleTeamGcCommand, handleTeamKillCommand, handleTeamPruneCommand, handleTeamShutdownCommand, @@ -55,6 +56,7 @@ const TEAM_HELP_TEXT = [ " /team plan approve ", " /team plan reject [feedback...]", " /team cleanup [--force]", + " /team gc [--dry-run] [--force] # remove stale team directories", " /team prune [--all] # hide stale manual teammates (mark offline)", " /team task add ", " /team task assign ", @@ -210,6 +212,14 @@ export async function handleTeamCommand(opts: { }); }, + gc: async () => { + await handleTeamGcCommand({ + ctx, + rest, + teamId: activeTeamId, + }); + }, + prune: async () => { await handleTeamPruneCommand({ ctx, diff --git a/extensions/teams/leader-teams-tool.ts b/extensions/teams/leader-teams-tool.ts index 1199864..099cffe 100644 --- a/extensions/teams/leader-teams-tool.ts +++ b/extensions/teams/leader-teams-tool.ts @@ -34,6 +34,7 @@ import { } from "./task-store.js"; import type { TeammateRpc } from "./teammate-rpc.js"; import type { ContextMode, WorkspaceMode, SpawnTeammateFn } from "./spawn-types.js"; +import type { DelegationTracker } from "./leader-inbox.js"; type TeamsToolDelegateTask = { text: string; assignee?: string }; @@ -158,8 +159,9 @@ export function registerTeamsTool(opts: { refreshTasks: () => Promise; renderWidget: () => void; pendingPlanApprovals: Map; + delegationTracker?: DelegationTracker; }): void { - const { pi, teammates, spawnTeammate, getTeamId, getTaskListId, refreshTasks, renderWidget, pendingPlanApprovals } = opts; + const { pi, teammates, spawnTeammate, getTeamId, getTaskListId, refreshTasks, renderWidget, pendingPlanApprovals, delegationTracker } = opts; pi.registerTool({ name: "teams", @@ -1040,6 +1042,11 @@ export function registerTeamsTool(opts: { assignments.push({ taskId: task.id, assignee, subject }); } + // Track delegated task IDs for auto-notification when all complete + if (delegationTracker && assignments.length > 0) { + delegationTracker.addBatch(assignments.map((a) => a.taskId)); + } + void refreshTasks().finally(renderWidget); const lines: string[] = []; diff --git a/extensions/teams/leader.ts b/extensions/teams/leader.ts index b810743..92640a2 100644 --- a/extensions/teams/leader.ts +++ b/extensions/teams/leader.ts @@ -17,7 +17,7 @@ import { openInteractiveWidget } from "./teams-panel.js"; import { createTeamsWidget } from "./teams-widget.js"; import { resolveTeammateModelSelection, formatProviderModel } from "./model-policy.js"; import { getTeamsStyleFromEnv, type TeamsStyle, formatMemberDisplayName, getTeamsStrings } from "./teams-style.js"; -import { pollLeaderInbox as pollLeaderInboxImpl } from "./leader-inbox.js"; +import { DelegationTracker, pollLeaderInbox as pollLeaderInboxImpl } from "./leader-inbox.js"; import { getHookBaseName, getTeamsHookFailureAction, @@ -31,6 +31,9 @@ import { } from "./hooks.js"; import { handleTeamCommand } from "./leader-team-command.js"; import { registerTeamsTool } from "./leader-teams-tool.js"; +import { findGcCandidates, gcTeamDirs } from "./team-gc.js"; +import { cleanupTeamDir } from "./cleanup.js"; +import { getTeamsRootDir } from "./paths.js"; import type { ContextMode, SpawnTeammateFn, SpawnTeammateResult, WorkspaceMode } from "./spawn-types.js"; function getTeamsExtensionEntryPath(): string | null { @@ -116,6 +119,7 @@ export function runLeader(pi: ExtensionAPI): void { let tasks: TeamTask[] = []; let teamConfig: TeamConfig | null = null; const pendingPlanApprovals = new Map(); + const delegationTracker = new DelegationTracker(); // Task list namespace. By default we keep it aligned with the current session id. // (Do NOT read PI_TEAMS_TASK_LIST_ID for the leader; that env var is intended for workers // and can easily be set globally, which makes the leader "lose" its tasks.) @@ -601,12 +605,65 @@ export function runLeader(pi: ExtensionAPI): void { return { ok: true, name, mode, workspaceMode, childCwd, note, warnings }; }; + // ---- DM batching (PR #6 compat) ---- + // Accumulate DMs received between LLM turns so they can be delivered in a single message. + const pendingLeaderDms: Array<{ from: string; text: string }> = []; + let leaderDmFlushScheduled = false; + + const flushLeaderDms = () => { + if (!pendingLeaderDms.length) return; + const batch = pendingLeaderDms.splice(0); + leaderDmFlushScheduled = false; + + const formatted = batch + .map((dm) => `**${dm.from}:**\n${dm.text}`) + .join("\n\n---\n\n"); + + pi.sendMessage( + { + customType: "teams-teammate-dm", + content: `You received message(s) from teammate(s):\n\n${formatted}`, + display: true, + }, + { triggerTurn: true, deliverAs: "followUp" }, + ); + }; + + // ---- Per-task completion batching ---- + // Accumulate task completion notifications and flush them together. + const pendingTaskCompletions: Array<{ memberName: string; taskId: string; taskSubject: string }> = []; + let taskCompletionFlushScheduled = false; + + const flushTaskCompletions = () => { + if (!pendingTaskCompletions.length) return; + const batch = pendingTaskCompletions.splice(0); + taskCompletionFlushScheduled = false; + + const formatted = batch + .map((tc) => `- **${tc.memberName}** completed task #${tc.taskId}: ${tc.taskSubject}`) + .join("\n"); + + try { + pi.sendMessage( + { + customType: "teams-task-completed", + content: `Teammate task completion(s):\n${formatted}`, + display: true, + }, + { triggerTurn: false, deliverAs: "followUp" }, + ); + } catch { + // ignore — batch-complete sendUserMessage will wake the leader anyway + } + }; + const pollLeaderInbox = async () => { if (!currentCtx || !currentTeamId) return; const teamDir = getTeamDir(currentTeamId); const effectiveTaskListId = taskListId ?? currentTeamId; await pollLeaderInboxImpl({ ctx: currentCtx, + pi, teamId: currentTeamId, teamDir, taskListId: effectiveTaskListId, @@ -614,6 +671,23 @@ export function runLeader(pi: ExtensionAPI): void { style, pendingPlanApprovals, enqueueHook, + delegationTracker, + onDm(from, text) { + pendingLeaderDms.push({ from, text }); + if (!leaderDmFlushScheduled) { + leaderDmFlushScheduled = true; + // Batch DMs that arrive in the same poll cycle before flushing. + setTimeout(flushLeaderDms, 50); + } + }, + onTaskCompleted(memberName, taskId, taskSubject) { + pendingTaskCompletions.push({ memberName, taskId, taskSubject }); + if (!taskCompletionFlushScheduled) { + taskCompletionFlushScheduled = true; + // Batch completions that arrive in the same poll cycle. + setTimeout(flushTaskCompletions, 50); + } + }, }); }; @@ -641,6 +715,16 @@ export function runLeader(pi: ExtensionAPI): void { style, }); + // Startup GC: silently remove stale team directories from previous sessions. + void findGcCandidates({ excludeTeamIds: new Set([currentTeamId]) }) + .then((scan) => { + if (scan.candidates.length === 0) return; + return gcTeamDirs(scan.candidates); + }) + .catch(() => { + // Startup GC is best-effort; never block the session. + }); + await refreshTasks(); renderWidget(); @@ -677,6 +761,7 @@ export function runLeader(pi: ExtensionAPI): void { await stopAllTeammates(currentCtx, `The ${strings.teamNoun} is dissolved — leader moved on`); } stopLoops(); + delegationTracker.clear(); currentCtx = ctx; currentTeamId = currentCtx.sessionManager.getSessionId(); @@ -727,6 +812,20 @@ export function runLeader(pi: ExtensionAPI): void { stopLoops(); const strings = getTeamsStrings(style); await stopAllTeammates(currentCtx, `The ${strings.teamNoun} is over`); + + // Exit cleanup: delete own team directory if it's empty (no tasks, no teammates). + if (currentTeamId) { + try { + const teamDir = getTeamDir(currentTeamId); + const effectiveTlId = taskListId ?? currentTeamId; + const remainingTasks = await listTasks(teamDir, effectiveTlId); + if (remainingTasks.length === 0 && teammates.size === 0) { + await cleanupTeamDir(getTeamsRootDir(), teamDir); + } + } catch { + // Exit cleanup is best-effort; never block shutdown. + } + } }); registerTeamsTool({ @@ -738,6 +837,7 @@ export function runLeader(pi: ExtensionAPI): void { refreshTasks, renderWidget, pendingPlanApprovals, + delegationTracker, }); const openWidget = async (ctx: ExtensionCommandContext) => { diff --git a/extensions/teams/team-gc.ts b/extensions/teams/team-gc.ts new file mode 100644 index 0000000..2425529 --- /dev/null +++ b/extensions/teams/team-gc.ts @@ -0,0 +1,128 @@ +import * as fs from "node:fs"; +import * as path from "node:path"; +import { getTeamsRootDir } from "./paths.js"; +import { assessAttachClaimFreshness, readTeamAttachClaim } from "./team-attach-claim.js"; +import { loadTeamConfig } from "./team-config.js"; +import { listTasks } from "./task-store.js"; +import { cleanupTeamDir } from "./cleanup.js"; + +export interface GcCandidate { + teamId: string; + teamDir: string; + reason: string; +} + +export interface GcResult { + removed: GcCandidate[]; + skipped: { teamId: string; reason: string }[]; + errors: { teamId: string; error: string }[]; +} + +/** + * Scan all team directories and identify those safe to garbage-collect. + * + * A team is considered dead (safe to remove) when ALL of the following are true: + * - It is not the current session's team (`excludeTeamIds`) + * - It has no active (non-stale) attach claim + * - It has no online workers + * - It has no in-progress tasks + */ +export async function findGcCandidates(opts: { + excludeTeamIds: Set; + teamsRoot?: string; +}): Promise<{ candidates: GcCandidate[]; skipped: { teamId: string; reason: string }[] }> { + const teamsRoot = opts.teamsRoot ?? getTeamsRootDir(); + const candidates: GcCandidate[] = []; + const skipped: { teamId: string; reason: string }[] = []; + + let entries: fs.Dirent[]; + try { + entries = await fs.promises.readdir(teamsRoot, { withFileTypes: true }); + } catch { + return { candidates, skipped }; + } + + for (const entry of entries) { + if (!entry.isDirectory()) continue; + // Skip special directories (e.g. _styles, _hooks) + if (entry.name.startsWith("_")) continue; + + const teamId = entry.name; + const teamDir = path.join(teamsRoot, teamId); + + // Never GC the current session's team + if (opts.excludeTeamIds.has(teamId)) { + skipped.push({ teamId, reason: "current session" }); + continue; + } + + // Load config — if missing, the directory is orphaned and safe to remove + const cfg = await loadTeamConfig(teamDir); + if (!cfg) { + candidates.push({ teamId, teamDir, reason: "no config (orphaned)" }); + continue; + } + + // Check attach claim + const claim = await readTeamAttachClaim(teamDir); + if (claim) { + const freshness = assessAttachClaimFreshness(claim); + if (!freshness.isStale) { + skipped.push({ teamId, reason: "active attach claim" }); + continue; + } + } + + // Check for online workers + const onlineWorkers = cfg.members.filter((m) => m.role === "worker" && m.status === "online"); + if (onlineWorkers.length > 0) { + skipped.push({ teamId, reason: `${onlineWorkers.length} online worker(s)` }); + continue; + } + + // Check for in-progress tasks + const tasks = await listTasks(teamDir, cfg.taskListId); + const inProgress = tasks.filter((t) => t.status === "in_progress"); + if (inProgress.length > 0) { + skipped.push({ teamId, reason: `${inProgress.length} in-progress task(s)` }); + continue; + } + + // Build reason summary + const parts: string[] = []; + if (!claim) parts.push("no attach claim"); + else parts.push("stale attach claim"); + const workerCount = cfg.members.filter((m) => m.role === "worker").length; + if (workerCount === 0) parts.push("no workers"); + else parts.push(`${workerCount} offline worker(s)`); + const taskCount = tasks.length; + if (taskCount === 0) parts.push("no tasks"); + else parts.push(`${taskCount} completed/pending task(s)`); + + candidates.push({ teamId, teamDir, reason: parts.join(", ") }); + } + + return { candidates, skipped }; +} + +/** + * Delete the given GC candidates. Returns a result with removed/error details. + */ +export async function gcTeamDirs(candidates: GcCandidate[], teamsRoot?: string): Promise { + const root = teamsRoot ?? getTeamsRootDir(); + const result: GcResult = { removed: [], skipped: [], errors: [] }; + + for (const candidate of candidates) { + try { + await cleanupTeamDir(root, candidate.teamDir); + result.removed.push(candidate); + } catch (err) { + result.errors.push({ + teamId: candidate.teamId, + error: err instanceof Error ? err.message : String(err), + }); + } + } + + return result; +} diff --git a/skills/agent-teams/SKILL.md b/skills/agent-teams/SKILL.md index 64d6082..8391721 100644 --- a/skills/agent-teams/SKILL.md +++ b/skills/agent-teams/SKILL.md @@ -140,8 +140,11 @@ Spawning with `plan` restricts the teammate to read-only tools. After producing /team prune [--all] # hide stale manual teammates (mark offline in config) /team kill # force-terminate one RPC teammate /team cleanup [--force] # delete team directory after all teammates stopped +/team gc [--dry-run] [--force] # remove stale team directories from all past sessions ``` +Stale team directories from previous sessions are also automatically cleaned up on startup. + Teammates reject shutdown requests when they have an active task. Use `/team kill ` to force. ## Other commands