Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,13 @@ Or drive it manually:
/team shutdown alice # graceful shutdown (handshake)
/team shutdown # stop all teammates (leader session remains active)
/team cleanup # remove team artifacts when done
/team gc # remove stale teams from past sessions
/team gc --dry-run # preview what would be removed
```

> **Automatic cleanup:** Stale team directories from previous sessions are automatically
> garbage-collected on startup. Empty team directories are also cleaned up on session exit.

Or let the model drive it with the delegate tool:

```json
Expand Down Expand Up @@ -207,6 +212,7 @@ All management commands live under `/team`.
| `/team prune [--all]` | Mark stale manual teammates offline (hides them in widget) |
| `/team kill <name>` | Force-terminate |
| `/team cleanup [--force]` | Delete team artifacts |
| `/team gc [--dry-run] [--force]` | Remove stale team directories from all past sessions |
| `/team id` | Print team/task-list IDs and paths |
| `/team env <name>` | Print env vars to start a manual teammate |

Expand Down
117 changes: 113 additions & 4 deletions extensions/teams/leader-inbox.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { ExtensionContext } from "@mariozechner/pi-coding-agent";
import type { ExtensionAPI, ExtensionContext } from "@mariozechner/pi-coding-agent";
import { popUnreadMessages, writeToMailbox } from "./mailbox.js";
import { sanitizeName } from "./names.js";
import {
Expand All @@ -16,17 +16,82 @@ import type { TeamsHookInvocation } from "./hooks.js";
import type { TeamsStyle } from "./teams-style.js";
import { formatMemberDisplayName, getTeamsStrings } from "./teams-style.js";

/**
* Event-driven tracker for delegation batches.
*
* Tracks task IDs from delegate() calls. Tasks are only marked done
* when an idle_notification with completedTaskId is received — NOT
* by polling task file status. This avoids race conditions where
* listTasks() returns stale or premature data.
*/
export class DelegationTracker {
private batches: Array<{
taskIds: Set<string>;
completedIds: Set<string>;
notified: boolean;
}> = [];

/** Register a new batch of delegated task IDs. */
addBatch(taskIds: string[]): void {
if (taskIds.length === 0) return;
this.batches.push({
taskIds: new Set(taskIds),
completedIds: new Set(),
notified: false,
});
}

/**
* Mark a task as completed (called when idle_notification with
* completedTaskId is received). Returns any batches that became
* fully complete as a result.
*/
markCompleted(taskId: string): Array<{ taskIds: string[] }> {
const newlyComplete: Array<{ taskIds: string[] }> = [];

for (const batch of this.batches) {
if (batch.notified) continue;
if (!batch.taskIds.has(taskId)) continue;

batch.completedIds.add(taskId);

// Check if ALL tasks in this batch are now done
const allDone = [...batch.taskIds].every((id) => batch.completedIds.has(id));
if (allDone) {
batch.notified = true;
newlyComplete.push({ taskIds: [...batch.taskIds] });
}
}

// Prune notified batches
this.batches = this.batches.filter((b) => !b.notified);
return newlyComplete;
}

/** Clear all tracked batches (e.g. on session switch). */
clear(): void {
this.batches = [];
}
}

export async function pollLeaderInbox(opts: {
ctx: ExtensionContext;
pi: ExtensionAPI;
teamId: string;
teamDir: string;
taskListId: string;
leadName: string;
style: TeamsStyle;
pendingPlanApprovals: Map<string, { requestId: string; name: string; taskId?: string }>;
enqueueHook?: (invocation: TeamsHookInvocation) => void;
/** PR #6 compat: callback for teammate DMs routed to leader LLM context. */
onDm?: (from: string, text: string) => void;
/** Callback for per-task completion notifications routed to leader LLM context. */
onTaskCompleted?: (memberName: string, taskId: string, taskSubject: string) => void;
/** Batch delegation tracker for all-tasks-complete auto-notify. */
delegationTracker?: DelegationTracker;
}): Promise<void> {
const { ctx, teamId, teamDir, taskListId, leadName, style, pendingPlanApprovals, enqueueHook } = opts;
const { ctx, pi, teamId, teamDir, taskListId, leadName, style, pendingPlanApprovals, enqueueHook, onDm, onTaskCompleted, delegationTracker } = opts;
const strings = getTeamsStrings(style);

let msgs: Awaited<ReturnType<typeof popUnreadMessages>>;
Expand All @@ -38,6 +103,10 @@ export async function pollLeaderInbox(opts: {
}
if (!msgs.length) return;

// Collect batch completions across all messages in this poll cycle,
// then fire notifications once at the end (avoids duplicate triggers).
const batchCompletions: Array<{ taskIds: string[] }> = [];

for (const m of msgs) {
const approved = isShutdownApproved(m.text);
if (approved) {
Expand Down Expand Up @@ -115,7 +184,7 @@ export async function pollLeaderInbox(opts: {
// ignore hook enqueue errors
}

// Hook: task completion / failure
// Hook + notifications for task completion / failure
if (idle.completedTaskId) {
const completedTask = await getTask(teamDir, taskListId, idle.completedTaskId);
try {
Expand All @@ -132,6 +201,24 @@ export async function pollLeaderInbox(opts: {
} catch {
// ignore hook enqueue errors
}

if (idle.completedStatus !== "failed") {
// Per-task completion notification → inject into leader LLM context
if (onTaskCompleted && completedTask) {
try {
onTaskCompleted(name, idle.completedTaskId, completedTask.subject ?? `task #${idle.completedTaskId}`);
} catch {
// ignore notification errors
}
}

// Event-driven batch tracking: mark this task done and
// collect any batches that became fully complete.
if (delegationTracker) {
const completed = delegationTracker.markCompleted(idle.completedTaskId);
batchCompletions.push(...completed);
}
}
}

if (idle.failureReason) {
Expand Down Expand Up @@ -209,6 +296,28 @@ export async function pollLeaderInbox(opts: {
continue;
}

ctx.ui.notify(`Message from ${m.from}: ${m.text}`, "info");
// Unrecognized message = teammate DM to leader
if (onDm) {
onDm(m.from, m.text);
} else {
ctx.ui.notify(`Message from ${m.from}: ${m.text}`, "info");
}
}

// Fire batch-complete notifications (deduplicated across this poll cycle).
for (const batch of batchCompletions) {
const taskRefs = batch.taskIds.map((id) => `#${id}`).join(", ");
const summary = `All delegated tasks completed (${taskRefs}). Review the results and continue.`;

try {
if (ctx.isIdle()) {
pi.sendUserMessage(`[team] ${summary}`);
} else {
pi.sendUserMessage(`[team] ${summary}`, { deliverAs: "followUp" });
}
} catch {
// Fallback: at minimum show a notification so user knows to check.
ctx.ui.notify(`✅ ${summary}`, "info");
}
}
}
83 changes: 83 additions & 0 deletions extensions/teams/leader-lifecycle-commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { getTeamDir, getTeamsRootDir, getTeamsStylesDir } from "./paths.js";
import { TEAM_MAILBOX_NS } from "./protocol.js";
import { unassignTasksForAgent, type TeamTask } from "./task-store.js";
import { setMemberStatus, setTeamStyle, type TeamConfig } from "./team-config.js";
import { findGcCandidates, gcTeamDirs } from "./team-gc.js";
import {
type TeamsStyle,
formatMemberDisplayName,
Expand Down Expand Up @@ -231,6 +232,88 @@ export async function handleTeamCleanupCommand(opts: {
renderWidget();
}

export async function handleTeamGcCommand(opts: {
ctx: ExtensionCommandContext;
rest: string[];
teamId: string;
}): Promise<void> {
const { ctx, rest, teamId } = opts;

const flags = rest.filter((a) => a.startsWith("--"));
const argsOnly = rest.filter((a) => !a.startsWith("--"));
const force = flags.includes("--force");
const dryRun = flags.includes("--dry-run");

const unknownFlags = flags.filter((f) => f !== "--force" && f !== "--dry-run");
if (unknownFlags.length) {
ctx.ui.notify(`Unknown flag(s): ${unknownFlags.join(", ")}`, "error");
return;
}
if (argsOnly.length) {
ctx.ui.notify("Usage: /team gc [--dry-run] [--force]", "error");
return;
}

const excludeTeamIds = new Set([teamId]);
const { candidates, skipped } = await findGcCandidates({ excludeTeamIds });

if (candidates.length === 0) {
const msg = skipped.length > 0
? `No stale teams to clean up (${skipped.length} active team(s) skipped)`
: "No stale teams to clean up";
ctx.ui.notify(msg, "info");
return;
}

if (dryRun) {
const lines = [
`Would remove ${candidates.length} stale team(s):`,
"",
...candidates.map((c) => ` ${c.teamId} — ${c.reason}`),
];
if (skipped.length > 0) {
lines.push("", `Skipped ${skipped.length} active team(s):`);
for (const s of skipped) {
lines.push(` ${s.teamId} — ${s.reason}`);
}
}
ctx.ui.notify(lines.join("\n"), "info");
return;
}

if (!force) {
if (process.stdout.isTTY && process.stdin.isTTY) {
const ok = await ctx.ui.confirm(
"Garbage collect stale teams",
[
`Remove ${candidates.length} stale team director${candidates.length === 1 ? "y" : "ies"}?`,
"",
...candidates.map((c) => ` ${c.teamId} — ${c.reason}`),
].join("\n"),
);
if (!ok) return;
} else {
ctx.ui.notify("Refusing to GC in non-interactive mode without --force", "error");
return;
}
}

const result = await gcTeamDirs(candidates);

const lines: string[] = [];
if (result.removed.length > 0) {
lines.push(`Removed ${result.removed.length} stale team(s)`);
}
if (result.errors.length > 0) {
lines.push(`Failed to remove ${result.errors.length} team(s):`);
for (const e of result.errors) {
lines.push(` ${e.teamId}: ${e.error}`);
}
}

ctx.ui.notify(lines.join("\n"), result.errors.length > 0 ? "warning" : "info");
}

export async function handleTeamShutdownCommand(opts: {
ctx: ExtensionCommandContext;
rest: string[];
Expand Down
10 changes: 10 additions & 0 deletions extensions/teams/leader-team-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { handleTeamAttachCommand, handleTeamDetachCommand } from "./leader-attac
import {
handleTeamCleanupCommand,
handleTeamDelegateCommand,
handleTeamGcCommand,
handleTeamKillCommand,
handleTeamPruneCommand,
handleTeamShutdownCommand,
Expand Down Expand Up @@ -55,6 +56,7 @@ const TEAM_HELP_TEXT = [
" /team plan approve <name>",
" /team plan reject <name> [feedback...]",
" /team cleanup [--force]",
" /team gc [--dry-run] [--force] # remove stale team directories",
" /team prune [--all] # hide stale manual teammates (mark offline)",
" /team task add <text...>",
" /team task assign <id> <agent>",
Expand Down Expand Up @@ -210,6 +212,14 @@ export async function handleTeamCommand(opts: {
});
},

gc: async () => {
await handleTeamGcCommand({
ctx,
rest,
teamId: activeTeamId,
});
},

prune: async () => {
await handleTeamPruneCommand({
ctx,
Expand Down
9 changes: 8 additions & 1 deletion extensions/teams/leader-teams-tool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
} from "./task-store.js";
import type { TeammateRpc } from "./teammate-rpc.js";
import type { ContextMode, WorkspaceMode, SpawnTeammateFn } from "./spawn-types.js";
import type { DelegationTracker } from "./leader-inbox.js";

type TeamsToolDelegateTask = { text: string; assignee?: string };

Expand Down Expand Up @@ -158,8 +159,9 @@ export function registerTeamsTool(opts: {
refreshTasks: () => Promise<void>;
renderWidget: () => void;
pendingPlanApprovals: Map<string, { requestId: string; name: string; taskId?: string }>;
delegationTracker?: DelegationTracker;
}): void {
const { pi, teammates, spawnTeammate, getTeamId, getTaskListId, refreshTasks, renderWidget, pendingPlanApprovals } = opts;
const { pi, teammates, spawnTeammate, getTeamId, getTaskListId, refreshTasks, renderWidget, pendingPlanApprovals, delegationTracker } = opts;

pi.registerTool({
name: "teams",
Expand Down Expand Up @@ -1040,6 +1042,11 @@ export function registerTeamsTool(opts: {
assignments.push({ taskId: task.id, assignee, subject });
}

// Track delegated task IDs for auto-notification when all complete
if (delegationTracker && assignments.length > 0) {
delegationTracker.addBatch(assignments.map((a) => a.taskId));
}

void refreshTasks().finally(renderWidget);

const lines: string[] = [];
Expand Down
Loading