diff --git a/CLAUDE.md b/CLAUDE.md index 93c02c27d3..d03f004809 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -13,7 +13,7 @@ This repo contains the CLI for Entire. - `entire/`: Main CLI entry point. Also home to kubectl-style external-command resolution (`entire ` → `entire-` on PATH) — see [External Commands](docs/architecture/external-commands.md). - `entire/cli`: CLI utilities and helpers (Cobra commands, helpers, group roots) - `entire/cli/commands`: actual command implementations -- `entire/cli/agent`: agent implementations (Claude Code, Gemini CLI, OpenCode, Cursor, Factory AI Droid, Copilot CLI) - see [Agent Integration Checklist](docs/architecture/agent-integration-checklist.md) and [Agent Implementation Guide](docs/architecture/agent-guide.md) +- `entire/cli/agent`: agent implementations (Claude Code, Gemini CLI, OpenCode, Cursor, Factory AI Droid, Copilot CLI, Pi) - see [Agent Integration Checklist](docs/architecture/agent-integration-checklist.md) and [Agent Implementation Guide](docs/architecture/agent-guide.md) - `entire/cli/strategy`: strategy implementation (manual-commit) - see section below - `entire/cli/checkpoint`: checkpoint storage abstractions (temporary and committed) - `entire/cli/session`: session state management @@ -115,6 +115,7 @@ mise run test:e2e --agent opencode [filter] # OpenCode only mise run test:e2e --agent cursor [filter] # Cursor only mise run test:e2e --agent factoryai-droid [filter] # Factory AI Droid only mise run test:e2e --agent copilot-cli [filter] # Copilot CLI only +mise run test:e2e --agent pi [filter] # Pi only ``` E2E tests: @@ -122,9 +123,9 @@ E2E tests: - Use the `//go:build e2e` build tag - Located in `e2e/tests/` - See [`e2e/README.md`](e2e/README.md) for full documentation (structure, debugging, adding agents) -- Test real agent interactions (Claude Code, Gemini CLI, OpenCode, Cursor, Factory AI Droid, Copilot CLI, or Vogon creating files, committing, etc.) 
+- Test real agent interactions (Claude Code, Gemini CLI, OpenCode, Cursor, Factory AI Droid, Copilot CLI, Pi, or Vogon creating files, committing, etc.)
 - Validate checkpoint scenarios documented in `docs/architecture/checkpoint-scenarios.md`
-- Support multiple agents via `E2E_AGENT` env var (`claude-code`, `gemini`, `opencode`, `cursor`, `factoryai-droid`, `copilot-cli`, `vogon`)
+- Support multiple agents via `E2E_AGENT` env var (`claude-code`, `gemini`, `opencode`, `cursor`, `factoryai-droid`, `copilot-cli`, `pi`, `vogon`)
 
 **Environment variables:**
 
diff --git a/cmd/entire/cli/agent/pi/entire_extension.ts b/cmd/entire/cli/agent/pi/entire_extension.ts
new file mode 100644
index 0000000000..d14bdb2d82
--- /dev/null
+++ b/cmd/entire/cli/agent/pi/entire_extension.ts
@@ -0,0 +1,73 @@
+// Entire CLI extension for Pi
+// Auto-generated by `entire enable --agent pi`
+// Do not edit manually — changes will be overwritten on next install.
+//
+// Forwards Pi lifecycle events to `entire hooks pi <hook>` so Entire can
+// create checkpoints, capture transcripts, and offer rewind/resume.
+//
+// ENTIRE_CMD is replaced at install time by Entire's installer.
+
+import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";
+import { execFile } from "node:child_process";
+
+export default function (pi: ExtensionAPI) {
+  const ENTIRE_CMD = "__ENTIRE_CMD__";
+
+  function fireHook(hookName: string, data: Record<string, unknown>): Promise<void> {
+    return new Promise<void>((resolve) => {
+      try {
+        const child = execFile(
+          "sh",
+          ["-c", `${ENTIRE_CMD} hooks pi ${hookName}`],
+          { timeout: 10000, windowsHide: true },
+          () => resolve(),
+        );
+        child.stdin?.end(JSON.stringify(data));
+      } catch {
+        // best effort — never block the agent on a hook failure
+        resolve();
+      }
+    });
+  }
+
+  // Agent-driven bash subprocesses inherit a real TTY but cannot answer
+  // hook prompts. Disable git/Entire terminal prompts for bash calls so
+  // Entire treats agent-driven commits as non-interactive.
+ pi.on("tool_call", async (event) => { + if (event.toolName !== "bash") return; + const input = event.input as { command?: string }; + if (typeof input.command !== "string" || input.command.includes("GIT_TERMINAL_PROMPT=")) { + return; + } + input.command = "export GIT_TERMINAL_PROMPT=0\n" + input.command; + }); + + pi.on("session_start", async (_event, ctx) => { + await fireHook("session_start", { + type: "session_start", + cwd: ctx.cwd, + session_file: ctx.sessionManager.getSessionFile(), + }); + }); + + pi.on("before_agent_start", async (event, ctx) => { + await fireHook("before_agent_start", { + type: "before_agent_start", + cwd: ctx.cwd, + session_file: ctx.sessionManager.getSessionFile(), + prompt: event.prompt, + }); + }); + + pi.on("agent_end", async (_event, ctx) => { + await fireHook("agent_end", { + type: "agent_end", + cwd: ctx.cwd, + session_file: ctx.sessionManager.getSessionFile(), + }); + }); + + pi.on("session_shutdown", async () => { + await fireHook("session_shutdown", { type: "session_shutdown" }); + }); +} diff --git a/cmd/entire/cli/agent/pi/hooks.go b/cmd/entire/cli/agent/pi/hooks.go new file mode 100644 index 0000000000..f238b16c12 --- /dev/null +++ b/cmd/entire/cli/agent/pi/hooks.go @@ -0,0 +1,124 @@ +package pi + +import ( + "context" + _ "embed" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/entireio/cli/cmd/entire/cli/agent" + "github.com/entireio/cli/cmd/entire/cli/paths" +) + +// Compile-time interface assertion +var _ agent.HookSupport = (*PiAgent)(nil) + +//go:embed entire_extension.ts +var extensionTemplate string + +const ( + // extensionDirName is the directory pi auto-discovers project-local + // extensions from. + extensionDirName = ".pi/extensions/entire" + + // extensionFileName is the file pi loads from extensionDirName. + extensionFileName = "index.ts" + + // entireMarker identifies the file as Entire-owned. 
Substring of the + // auto-generated header so AreHooksInstalled can verify ownership by + // content (and so it survives the ENTIRE_CMD placeholder substitution). + entireMarker = "Auto-generated by `entire enable --agent pi`" + + // entireCmdPlaceholder is replaced at install time with either `entire` + // (production) or a `go run …` path (local-dev). + entireCmdPlaceholder = "__ENTIRE_CMD__" +) + +func extensionPath(ctx context.Context) (string, error) { + root, err := paths.WorktreeRoot(ctx) + if err != nil { + // Fall back to CWD for tests run outside a git repo. + //nolint:forbidigo // explicit fallback when WorktreeRoot fails + root, err = os.Getwd() + if err != nil { + return "", fmt.Errorf("resolve repo root: %w", err) + } + } + return filepath.Join(root, extensionDirName, extensionFileName), nil +} + +func renderExtension(localDev bool) string { + var cmd string + if localDev { + cmd = `go run "$(git rev-parse --show-toplevel)"/cmd/entire/main.go` + } else { + cmd = "entire" + } + return strings.ReplaceAll(extensionTemplate, entireCmdPlaceholder, cmd) +} + +// InstallHooks writes the Entire pi extension to .pi/extensions/entire/index.ts. +// Returns 1 if the extension was written, 0 if already up-to-date (idempotent). +// If the file exists but content differs (e.g., localDev vs production), it is +// rewritten as long as it is recognisable as Entire-owned (contains the +// marker). A foreign file at the same path is left untouched unless force is +// true — this protects user-authored extensions that happen to live at +// .pi/extensions/entire/index.ts. 
+func (a *PiAgent) InstallHooks(ctx context.Context, localDev bool, force bool) (int, error) { + path, err := extensionPath(ctx) + if err != nil { + return 0, err + } + content := renderExtension(localDev) + + if !force { + //nolint:gosec // path constructed from validated repo root + existing, readErr := os.ReadFile(path) + switch { + case readErr == nil && string(existing) == content: + return 0, nil // already up-to-date + case readErr == nil && !strings.Contains(string(existing), entireMarker): + return 0, fmt.Errorf("refusing to overwrite foreign file at %s; remove it or pass --force", path) + } + } + + //nolint:gosec // G301: pi reads the directory; standard 0755 permissions + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return 0, fmt.Errorf("create extension dir: %w", err) + } + //nolint:gosec // G306: pi reads the file; standard 0644 permissions + if err := os.WriteFile(path, []byte(content), 0o644); err != nil { + return 0, fmt.Errorf("write extension: %w", err) + } + return 1, nil +} + +// UninstallHooks removes the entire pi extension directory (if present). +func (a *PiAgent) UninstallHooks(ctx context.Context) error { + path, err := extensionPath(ctx) + if err != nil { + return err + } + dir := filepath.Dir(path) + if err := os.RemoveAll(dir); err != nil { + return fmt.Errorf("remove pi extension dir: %w", err) + } + return nil +} + +// AreHooksInstalled returns true when the extension file exists and is +// recognisable as Entire-owned (contains the marker string). 
+func (a *PiAgent) AreHooksInstalled(ctx context.Context) bool { + path, err := extensionPath(ctx) + if err != nil { + return false + } + //nolint:gosec // path from validated repo root + data, err := os.ReadFile(path) + if err != nil { + return false + } + return strings.Contains(string(data), entireMarker) +} diff --git a/cmd/entire/cli/agent/pi/hooks_test.go b/cmd/entire/cli/agent/pi/hooks_test.go new file mode 100644 index 0000000000..92ca9ea30c --- /dev/null +++ b/cmd/entire/cli/agent/pi/hooks_test.go @@ -0,0 +1,173 @@ +package pi + +import ( + "context" + "os" + "path/filepath" + "strings" + "testing" +) + +// Note: t.Parallel is incompatible with t.Chdir. + +func TestInstallHooks_FreshInstall(t *testing.T) { + dir := t.TempDir() + t.Chdir(dir) + + count, err := (&PiAgent{}).InstallHooks(context.Background(), false, false) + if err != nil { + t.Fatalf("InstallHooks: %v", err) + } + if count != 1 { + t.Errorf("count = %d, want 1", count) + } + + path := filepath.Join(dir, ".pi", "extensions", "entire", "index.ts") + data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("extension not written: %v", err) + } + body := string(data) + + if !strings.Contains(body, `const ENTIRE_CMD = "entire"`) { + t.Error("production ENTIRE_CMD missing") + } + if !strings.Contains(body, "hooks pi ") { + t.Error("missing call to `entire hooks pi`") + } + if !strings.Contains(body, entireMarker) { + t.Error("entireMarker missing") + } + if strings.Contains(body, "go run") { + t.Error("production extension should not contain 'go run'") + } +} + +func TestInstallHooks_LocalDev(t *testing.T) { + dir := t.TempDir() + t.Chdir(dir) + if _, err := (&PiAgent{}).InstallHooks(context.Background(), true, false); err != nil { + t.Fatalf("InstallHooks: %v", err) + } + data, err := os.ReadFile(filepath.Join(dir, ".pi", "extensions", "entire", "index.ts")) + if err != nil { + t.Fatal(err) + } + if !strings.Contains(string(data), `go run "$(git rev-parse --show-toplevel)"/cmd/entire/main.go`) 
{ + t.Error("local-dev extension should reference git rev-parse path") + } +} + +func TestInstallHooks_Idempotent(t *testing.T) { + dir := t.TempDir() + t.Chdir(dir) + a := &PiAgent{} + + c1, err := a.InstallHooks(context.Background(), false, false) + if err != nil { + t.Fatal(err) + } + if c1 != 1 { + t.Errorf("first install count = %d", c1) + } + c2, err := a.InstallHooks(context.Background(), false, false) + if err != nil { + t.Fatal(err) + } + if c2 != 0 { + t.Errorf("second install (idempotent) count = %d", c2) + } +} + +func TestInstallHooks_RewritesOnModeChange(t *testing.T) { + dir := t.TempDir() + t.Chdir(dir) + a := &PiAgent{} + if _, err := a.InstallHooks(context.Background(), false, false); err != nil { + t.Fatal(err) + } + c, err := a.InstallHooks(context.Background(), true, false) + if err != nil { + t.Fatal(err) + } + if c != 1 { + t.Errorf("expected rewrite on mode change, got %d", c) + } +} + +func TestUninstallHooks(t *testing.T) { + dir := t.TempDir() + t.Chdir(dir) + a := &PiAgent{} + if _, err := a.InstallHooks(context.Background(), false, false); err != nil { + t.Fatal(err) + } + if !a.AreHooksInstalled(context.Background()) { + t.Fatal("AreHooksInstalled should be true after install") + } + if err := a.UninstallHooks(context.Background()); err != nil { + t.Fatalf("UninstallHooks: %v", err) + } + if a.AreHooksInstalled(context.Background()) { + t.Error("AreHooksInstalled should be false after uninstall") + } + // Idempotent uninstall. 
+ if err := a.UninstallHooks(context.Background()); err != nil { + t.Errorf("second uninstall: %v", err) + } +} + +func TestAreHooksInstalled_RejectsForeignFile(t *testing.T) { + dir := t.TempDir() + t.Chdir(dir) + path := filepath.Join(dir, ".pi", "extensions", "entire", "index.ts") + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(path, []byte("// user's own extension\n"), 0o644); err != nil { + t.Fatal(err) + } + if (&PiAgent{}).AreHooksInstalled(context.Background()) { + t.Error("should not claim a non-Entire file") + } +} + +func TestInstallHooks_RefusesForeignFileWithoutForce(t *testing.T) { + // User has their own extension at the same path. Without --force we must + // not clobber it. With --force we replace it. + dir := t.TempDir() + t.Chdir(dir) + path := filepath.Join(dir, ".pi", "extensions", "entire", "index.ts") + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + t.Fatal(err) + } + userContent := []byte("// user's own extension\nconsole.log('mine');\n") + if err := os.WriteFile(path, userContent, 0o644); err != nil { + t.Fatal(err) + } + + // Without force: should refuse, leave file untouched. + _, err := (&PiAgent{}).InstallHooks(context.Background(), false, false) + if err == nil { + t.Fatal("expected error when foreign file exists and force=false") + } + got, err := os.ReadFile(path) + if err != nil { + t.Fatal(err) + } + if string(got) != string(userContent) { + t.Errorf("foreign file was modified: %q", got) + } + + // With force: should overwrite. 
+ if _, err := (&PiAgent{}).InstallHooks(context.Background(), false, true); err != nil { + t.Fatalf("force install failed: %v", err) + } + got, err = os.ReadFile(path) + if err != nil { + t.Fatal(err) + } + if !strings.Contains(string(got), entireMarker) { + t.Error("force install should write Entire-owned file") + } +} diff --git a/cmd/entire/cli/agent/pi/lifecycle.go b/cmd/entire/cli/agent/pi/lifecycle.go new file mode 100644 index 0000000000..aa6cb9633a --- /dev/null +++ b/cmd/entire/cli/agent/pi/lifecycle.go @@ -0,0 +1,274 @@ +package pi + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "os" + "path/filepath" + "strings" + "time" + + "github.com/entireio/cli/cmd/entire/cli/agent" + "github.com/entireio/cli/cmd/entire/cli/logging" + "github.com/entireio/cli/cmd/entire/cli/paths" +) + +// Hook names — these match Pi's native event names exactly (snake_case), +// because the embedded TypeScript extension forwards `pi.on()` events +// directly. Keeping the names identical avoids a translation layer in the +// extension. +const ( + HookNameSessionStart = "session_start" + HookNameBeforeAgentStart = "before_agent_start" + HookNameAgentEnd = "agent_end" + HookNameSessionShutdown = "session_shutdown" +) + +// HookNames returns the verbs registered as `entire hooks pi `. +func (a *PiAgent) HookNames() []string { + return []string{ + HookNameSessionStart, + HookNameBeforeAgentStart, + HookNameAgentEnd, + HookNameSessionShutdown, + } +} + +// GetSupportedHooks maps Pi's native events to normalised lifecycle types. 
+// +// - session_start → SessionStart +// - before_agent_start → TurnStart +// - agent_end → TurnEnd +// - session_shutdown → (cleanup-only, no lifecycle event — see ParseHookEvent) +func (a *PiAgent) GetSupportedHooks() []agent.HookType { + return []agent.HookType{ + agent.HookSessionStart, + agent.HookUserPromptSubmit, + agent.HookStop, + } +} + +// piHookPayload is the JSON the embedded TypeScript extension pipes to +// `entire hooks pi ` on stdin. +type piHookPayload struct { + Type string `json:"type"` + Cwd string `json:"cwd,omitempty"` + SessionFile string `json:"session_file,omitempty"` + SessionID string `json:"session_id,omitempty"` + Prompt string `json:"prompt,omitempty"` +} + +// ParseHookEvent translates a Pi hook invocation into a normalised lifecycle +// event. Implements agent.HookSupport. +func (a *PiAgent) ParseHookEvent(ctx context.Context, hookName string, stdin io.Reader) (*agent.Event, error) { + data, err := io.ReadAll(stdin) + if err != nil { + return nil, fmt.Errorf("read pi hook input: %w", err) + } + if len(data) == 0 { + return nil, errors.New("empty pi hook input") + } + + var payload piHookPayload + if err := json.Unmarshal(data, &payload); err != nil { + return nil, fmt.Errorf("parse pi hook payload: %w", err) + } + + sessionID := payload.SessionID + if sessionID == "" { + sessionID = extractSessionIDFromPath(payload.SessionFile) + } + + now := time.Now() + + switch hookName { + case HookNameSessionStart: + cacheSessionID(ctx, sessionID) + return &agent.Event{ + Type: agent.SessionStart, + SessionID: sessionID, + Timestamp: now, + }, nil + + case HookNameBeforeAgentStart: + // Pi emits before_agent_start with a fully-populated session ID, but + // we cache it anyway to support the agent_end fallback below. 
+		if sessionID == "" {
+			sessionID = readCachedSessionID(ctx)
+		} else {
+			cacheSessionID(ctx, sessionID)
+		}
+		// Provide the live Pi session file as SessionRef so state.TranscriptPath
+		// is populated before any mid-turn commits. Without this, the
+		// post-commit hook cannot condense when no shadow branch exists yet.
+		return &agent.Event{
+			Type:       agent.TurnStart,
+			SessionID:  sessionID,
+			SessionRef: payload.SessionFile,
+			Prompt:     payload.Prompt,
+			Timestamp:  now,
+		}, nil
+
+	case HookNameAgentEnd:
+		if sessionID == "" {
+			sessionID = readCachedSessionID(ctx)
+		}
+		// Capture the Pi JSONL into <repo>/.entire/tmp/pi/<session-id>.json so the
+		// strategy has a stable transcript reference even if the user later
+		// deletes Pi sessions. The pi/ subdir avoids colliding with paths
+		// other agents (or test harnesses) stage under .entire/tmp/.
+		sessionRef := captureTranscript(ctx, sessionID, payload.SessionFile)
+		return &agent.Event{
+			Type:       agent.TurnEnd,
+			SessionID:  sessionID,
+			SessionRef: sessionRef,
+			Timestamp:  now,
+		}, nil
+
+	case HookNameSessionShutdown:
+		// Cleanup-only: clear the cached session ID. We intentionally do NOT
+		// emit SessionEnd here.
+		//
+		// Pi fires session_shutdown and agent_end on session teardown, and the
+		// TypeScript extension dispatches both via separate `entire hooks pi …`
+		// child processes (execFile is non-blocking). Child-process startup
+		// ordering then decides which event reaches the lifecycle dispatcher
+		// first; if session_shutdown wins, an emitted SessionEnd transitions
+		// the session to "ended" before agent_end can save the linkable
+		// checkpoint, leaving prepare-commit-msg with no session to attach a
+		// trailer to and the user's commit unlinked.
+		//
+		// agent_end is the source of truth for "turn complete" (and, for Pi,
+		// effectively "session over" for any single-turn `pi -p` invocation).
+		// SessionEnd is left for the framework to derive from idle timeout or
+		// the next SessionStart's stale-state cleanup.
+		clearCachedSessionID(ctx)
+		return nil, nil //nolint:nilnil // intentional: cleanup-only, no lifecycle event
+
+	default:
+		// Unknown / future hooks have no lifecycle significance.
+		return nil, nil //nolint:nilnil // unknown hook = no lifecycle event (acceptable)
+	}
+}
+
+// --- session ID cache ---
+//
+// Pi's `before_agent_start` event sometimes fires before `session_start` has
+// completed caching the session ID (race during early extension load), and
+// `agent_end` may fire after Pi has torn down its session manager. We cache
+// the active session ID at session_start time so subsequent hooks can recover
+// it.
+
+const activeSessionFile = "pi-active-session"
+
+// piHookCacheSubdir is the subdirectory under .entire/tmp/ where hook
+// flow caches the active-session ID file and the agent_end transcript
+// snapshot. Agent-specific (not just .entire/tmp/) so other agents'
+// integration tests and tooling don't shadow each other under the cache
+// root.
+const piHookCacheSubdir = "pi"
+
+// resolveSessionDir returns the per-repo hook cache directory used by
+// cacheSessionID / readCachedSessionID / clearCachedSessionID and
+// captureTranscript.
+//
+// This is intentionally distinct from PiAgent.GetSessionDir, which
+// points at Pi's native session store (~/.pi/agent/sessions/...) so
+// cold attach can resolve transcripts that were never hook-captured.
+// The cache here is hook-internal and only reachable via Pi hooks
+// firing; the framework records the cached path as SessionRef in
+// checkpoint metadata, so subsequent operations on hooked sessions go
+// through the recorded path rather than re-resolving via GetSessionDir.
+func resolveSessionDir(ctx context.Context) string {
+	root, err := paths.WorktreeRoot(ctx)
+	if err != nil {
+		//nolint:forbidigo // fallback when no git repo (tests run outside repos)
+		wd, wdErr := os.Getwd()
+		if wdErr != nil {
+			return filepath.Join(paths.EntireTmpDir, piHookCacheSubdir)
+		}
+		root = wd
+	}
+	return filepath.Join(root, paths.EntireTmpDir, piHookCacheSubdir)
+}
+
+func cacheSessionID(ctx context.Context, id string) {
+	if id == "" {
+		return
+	}
+	dir := resolveSessionDir(ctx)
+	if err := os.MkdirAll(dir, 0o750); err != nil {
+		logging.Debug(ctx, "pi: cache session id mkdir", slog.String("err", err.Error()))
+		return
+	}
+
+	if err := os.WriteFile(filepath.Join(dir, activeSessionFile), []byte(id), 0o600); err != nil {
+		logging.Debug(ctx, "pi: cache session id write", slog.String("err", err.Error()))
+	}
+}
+
+func readCachedSessionID(ctx context.Context) string {
+	dir := resolveSessionDir(ctx)
+	//nolint:gosec // path constructed from validated repo root
+	data, err := os.ReadFile(filepath.Join(dir, activeSessionFile))
+	if err != nil {
+		return ""
+	}
+	return strings.TrimSpace(string(data))
+}
+
+func clearCachedSessionID(ctx context.Context) {
+	dir := resolveSessionDir(ctx)
+	_ = os.Remove(filepath.Join(dir, activeSessionFile))
+}
+
+// captureTranscript copies the Pi JSONL session file to
+// <repo>/.entire/tmp/pi/<session-id>.json so Entire has a stable transcript
+// reference. Returns the path to the cached file, or "" if either input is
+// missing. The pi/ namespace under .entire/tmp/ is intentional — see
+// GetSessionDir / piHookCacheSubdir for the rationale.
+func captureTranscript(ctx context.Context, sessionID, piSessionFile string) string {
+	if sessionID == "" || piSessionFile == "" {
+		return ""
+	}
+	dir := resolveSessionDir(ctx)
+	if err := os.MkdirAll(dir, 0o750); err != nil {
+		logging.Warn(ctx, "pi: capture transcript mkdir failed",
+			slog.String("dir", dir), slog.String("err", err.Error()))
+		return ""
+	}
+	dst := filepath.Join(dir, sessionID+".json")
+	//nolint:gosec // G703: piSessionFile from trusted Pi extension stdin payload
+	data, err := os.ReadFile(piSessionFile)
+	if err != nil {
+		logging.Warn(ctx, "pi: capture transcript read failed",
+			slog.String("src", piSessionFile), slog.String("err", err.Error()))
+		return ""
+	}
+	//nolint:gosec // G703: dst constructed from validated session ID inside .entire/tmp
+	if err := os.WriteFile(dst, data, 0o600); err != nil {
+		logging.Warn(ctx, "pi: capture transcript write failed",
+			slog.String("dst", dst), slog.String("err", err.Error()))
+		return ""
+	}
+	return dst
+}
+
+// extractSessionIDFromPath extracts the UUID from a Pi session filename.
+// Pattern: <timestamp>_<uuid>.jsonl → returns <uuid>
+// Falls back to the basename without extension if the pattern doesn't match.
+func extractSessionIDFromPath(p string) string { + if p == "" { + return "" + } + base := filepath.Base(p) + base = strings.TrimSuffix(base, ".jsonl") + if i := strings.LastIndex(base, "_"); i >= 0 { + return base[i+1:] + } + return base +} diff --git a/cmd/entire/cli/agent/pi/lifecycle_test.go b/cmd/entire/cli/agent/pi/lifecycle_test.go new file mode 100644 index 0000000000..c5ba36d4b2 --- /dev/null +++ b/cmd/entire/cli/agent/pi/lifecycle_test.go @@ -0,0 +1,218 @@ +package pi + +import ( + "context" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +func TestParseHookEvent_SessionStart(t *testing.T) { + t.Parallel() + a := &PiAgent{} + stdin := strings.NewReader(`{"type":"session_start","cwd":"/repo","session_file":"/tmp/2026-05-09T12-00-00-000Z_abc-123.jsonl"}`) + ev, err := a.ParseHookEvent(context.Background(), HookNameSessionStart, stdin) + if err != nil { + t.Fatalf("ParseHookEvent: %v", err) + } + if ev.Type != agent.SessionStart { + t.Errorf("Type = %v", ev.Type) + } + if ev.SessionID != "abc-123" { + t.Errorf("SessionID = %q (want abc-123 extracted from filename)", ev.SessionID) + } +} + +func TestParseHookEvent_BeforeAgentStart(t *testing.T) { + t.Parallel() + a := &PiAgent{} + stdin := strings.NewReader(`{"type":"before_agent_start","session_file":"/tmp/2026-05-09T12-00-00-000Z_abc-123.jsonl","prompt":"do thing"}`) + ev, err := a.ParseHookEvent(context.Background(), HookNameBeforeAgentStart, stdin) + if err != nil { + t.Fatalf("ParseHookEvent: %v", err) + } + if ev.Type != agent.TurnStart { + t.Errorf("Type = %v, want TurnStart", ev.Type) + } + if ev.Prompt != "do thing" { + t.Errorf("Prompt = %q", ev.Prompt) + } + if ev.SessionRef != "/tmp/2026-05-09T12-00-00-000Z_abc-123.jsonl" { + t.Errorf("SessionRef = %q", ev.SessionRef) + } +} + +func TestParseHookEvent_SessionShutdown_NoLifecycleEvent(t *testing.T) { + t.Parallel() + a := &PiAgent{} + stdin := 
strings.NewReader(`{"type":"session_shutdown","session_id":"explicit-id"}`) + ev, err := a.ParseHookEvent(context.Background(), HookNameSessionShutdown, stdin) + if err != nil { + t.Fatalf("ParseHookEvent: %v", err) + } + // session_shutdown is cleanup-only — see ParseHookEvent for the rationale. + if ev != nil { + t.Fatalf("expected nil event from session_shutdown, got %+v", ev) + } +} + +func TestParseHookEvent_SessionShutdown_ClearsCache(t *testing.T) { + // session_shutdown's only side effect is clearing the cached session ID. + // Cannot use t.Parallel — t.Chdir. + dir := t.TempDir() + t.Chdir(dir) + + ctx := context.Background() + a := &PiAgent{} + + // Populate the cache via session_start. + if _, err := a.ParseHookEvent(ctx, HookNameSessionStart, strings.NewReader( + `{"type":"session_start","session_file":"/tmp/2026-05-09T12-00-00-000Z_cached-id.jsonl"}`)); err != nil { + t.Fatalf("session_start setup: %v", err) + } + if got := readCachedSessionID(ctx); got != "cached-id" { + t.Fatalf("cache pre-shutdown = %q, want cached-id", got) + } + + // session_shutdown clears the cache and emits no event. + ev, err := a.ParseHookEvent(ctx, HookNameSessionShutdown, strings.NewReader(`{"type":"session_shutdown"}`)) + if err != nil { + t.Fatalf("session_shutdown: %v", err) + } + if ev != nil { + t.Errorf("expected nil event, got %+v", ev) + } + if got := readCachedSessionID(ctx); got != "" { + t.Errorf("cache should be cleared after session_shutdown, got %q", got) + } +} + +func TestParseHookEvent_EmptyStdin(t *testing.T) { + t.Parallel() + a := &PiAgent{} + _, err := a.ParseHookEvent(context.Background(), HookNameSessionStart, strings.NewReader("")) + if err == nil { + t.Error("expected error on empty stdin") + } +} + +func TestParseHookEvent_UnknownHookYieldsNoEvent(t *testing.T) { + t.Parallel() + a := &PiAgent{} + // Anything not in HookNames() is treated as a no-op. 
+ ev, err := a.ParseHookEvent(context.Background(), "some_unknown_event", strings.NewReader(`{"type":"unknown"}`)) + if err != nil { + t.Fatalf("err: %v", err) + } + if ev != nil { + t.Errorf("expected nil event for unknown hook, got %+v", ev) + } +} + +func TestExtractSessionIDFromPath(t *testing.T) { + t.Parallel() + cases := map[string]string{ + "": "", + "/tmp/2026-05-09T12-00-00-000Z_abc-123.jsonl": "abc-123", + "abc-123.jsonl": "abc-123", + "/tmp/no-underscore-here.jsonl": "no-underscore-here", + "/path/with/multiple_under_scores_id.jsonl": "id", + } + for in, want := range cases { + if got := extractSessionIDFromPath(in); got != want { + t.Errorf("extractSessionIDFromPath(%q) = %q, want %q", in, got, want) + } + } +} + +func TestSessionIDCacheRoundtrip(t *testing.T) { + // Cannot use t.Parallel — t.Chdir mutates process state. + dir := t.TempDir() + t.Chdir(dir) + + ctx := context.Background() + if got := readCachedSessionID(ctx); got != "" { + t.Errorf("expected empty cache initially, got %q", got) + } + cacheSessionID(ctx, "abc-123") + if got := readCachedSessionID(ctx); got != "abc-123" { + t.Errorf("readCachedSessionID = %q, want abc-123", got) + } + clearCachedSessionID(ctx) + if got := readCachedSessionID(ctx); got != "" { + t.Errorf("after clear, got %q", got) + } +} + +func TestCaptureTranscript(t *testing.T) { + // Cannot use t.Parallel — t.Chdir. 
+ dir := t.TempDir() + t.Chdir(dir) + + src := filepath.Join(dir, "src.jsonl") + body := []byte(`{"type":"session","version":3}` + "\n") + if err := os.WriteFile(src, body, 0o600); err != nil { + t.Fatal(err) + } + + dst := captureTranscript(context.Background(), "abc-123", src) + if dst == "" { + t.Fatal("captureTranscript returned empty path") + } + got, err := os.ReadFile(dst) + if err != nil { + t.Fatal(err) + } + if string(got) != string(body) { + t.Errorf("captured content mismatch") + } + if !strings.HasSuffix(dst, "abc-123.json") { + t.Errorf("captured path = %q, expected .../abc-123.json", dst) + } +} + +func TestCaptureTranscript_MissingInputs(t *testing.T) { + // Cannot use t.Parallel — t.Chdir. + t.Chdir(t.TempDir()) + if got := captureTranscript(context.Background(), "", "/some/path"); got != "" { + t.Errorf("empty session id should return empty, got %q", got) + } + if got := captureTranscript(context.Background(), "abc", ""); got != "" { + t.Errorf("empty session file should return empty, got %q", got) + } +} + +func TestGetSupportedHooks(t *testing.T) { + t.Parallel() + got := (&PiAgent{}).GetSupportedHooks() + // Note: session_shutdown is cleanup-only, not a HookSessionEnd source — + // see ParseHookEvent's session_shutdown case for why. + want := []agent.HookType{ + agent.HookSessionStart, + agent.HookUserPromptSubmit, + agent.HookStop, + } + if len(got) != len(want) { + t.Fatalf("got %d hooks, want %d", len(got), len(want)) + } + for i := range want { + if got[i] != want[i] { + t.Errorf("hook[%d] = %q, want %q", i, got[i], want[i]) + } + } +} + +func TestHookNamesMatchesParser(t *testing.T) { + t.Parallel() + a := &PiAgent{} + for _, name := range a.HookNames() { + // All named hooks must accept a minimal payload without erroring. 
+		_, err := a.ParseHookEvent(context.Background(), name, strings.NewReader(`{}`))
+		if err != nil {
+			t.Errorf("ParseHookEvent(%q): %v", name, err)
+		}
+	}
+}
diff --git a/cmd/entire/cli/agent/pi/pi.go b/cmd/entire/cli/agent/pi/pi.go
new file mode 100644
index 0000000000..604f17eef2
--- /dev/null
+++ b/cmd/entire/cli/agent/pi/pi.go
@@ -0,0 +1,285 @@
+// Package pi implements the Agent interface for the pi coding agent
+// (https://github.com/earendil-works/pi-mono).
+// The npm package the embedded extension imports a type from is
+// `@earendil-works/pi-coding-agent`.
+//
+// This is an in-tree port of the previously-external entire-agent-pi plugin
+// (github.com/entireio/external-agents/agents/entire-agent-pi). The behaviour
+// matches the external version — most notably the active-branch resolution
+// for Pi's tree-shaped sessions — but the integration is plumbed directly
+// through the in-tree Agent / HookSupport / TokenCalculator / TranscriptAnalyzer
+// interfaces rather than the external JSON-over-stdio protocol.
+package pi
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"path/filepath"
+	"sort"
+	"strings"
+
+	"github.com/entireio/cli/cmd/entire/cli/agent"
+	"github.com/entireio/cli/cmd/entire/cli/agent/types"
+	"github.com/entireio/cli/cmd/entire/cli/paths"
+)
+
+// piHomeEnvVar overrides the default Pi home directory (~/.pi/agent).
+// Pi itself reads this variable, so honoring it keeps Entire and Pi in
+// agreement when a developer points Pi at a non-default home.
+const piHomeEnvVar = "PI_CODING_AGENT_DIR"
+
+// piSessionDirEnvVar lets tests redirect Pi's session lookup without
+// touching the real ~/.pi/agent. Mirrors ENTIRE_TEST_<AGENT>_SESSION_DIR
+// used by Codex.
+const piSessionDirEnvVar = "ENTIRE_TEST_PI_SESSION_DIR"
+
+//nolint:gochecknoinits // Agent self-registration is the intended pattern
+func init() {
+	agent.Register(agent.AgentNamePi, NewPiAgent)
+}
+
+// PiAgent implements agent.Agent for the pi coding agent.
+// +//nolint:revive // PiAgent is clearer than Agent in this context +type PiAgent struct{} + +// NewPiAgent returns a new Pi agent instance. +func NewPiAgent() agent.Agent { + return &PiAgent{} +} + +// --- Identity --- + +func (a *PiAgent) Name() types.AgentName { return agent.AgentNamePi } +func (a *PiAgent) Type() types.AgentType { return agent.AgentTypePi } +func (a *PiAgent) Description() string { return "Pi coding agent integration for Entire" } +func (a *PiAgent) IsPreview() bool { return true } +func (a *PiAgent) ProtectedDirs() []string { return []string{".pi"} } +func (a *PiAgent) ProtectedFiles() []string { return nil } + +// DetectPresence reports whether pi is configured for *this repo*. We only +// check repo-local config (.pi/) and intentionally ignore $PATH — in-tree +// agents follow the convention used by Claude/Gemini/OpenCode where +// detection means "this repo is set up for this agent", not "this agent is +// installed somewhere on this machine". The external plugin uses the broader +// $PATH check because it can't see repo state; we don't have that limitation. +func (a *PiAgent) DetectPresence(ctx context.Context) (bool, error) { + repoRoot, err := paths.WorktreeRoot(ctx) + if err != nil { + repoRoot = "." + } + if _, err := os.Stat(filepath.Join(repoRoot, ".pi")); err == nil { + return true, nil + } + return false, nil +} + +// --- Transcript Storage (chunking) --- + +// ReadTranscript reads a captured Pi JSONL session transcript from disk. +// SessionRef is the absolute path returned by captureTranscript(). +func (a *PiAgent) ReadTranscript(sessionRef string) ([]byte, error) { + if sessionRef == "" { + return nil, errors.New("empty session ref") + } + //nolint:gosec // SessionRef from validated lifecycle hook input + data, err := os.ReadFile(sessionRef) + if err != nil { + return nil, fmt.Errorf("read pi transcript %s: %w", sessionRef, err) + } + return data, nil +} + +// ChunkTranscript splits a Pi JSONL transcript at line boundaries. 
+func (a *PiAgent) ChunkTranscript(_ context.Context, content []byte, maxSize int) ([][]byte, error) { + chunks, err := agent.ChunkJSONL(content, maxSize) + if err != nil { + return nil, fmt.Errorf("chunk pi transcript: %w", err) + } + return chunks, nil +} + +// ReassembleTranscript concatenates JSONL chunks with newlines. +func (a *PiAgent) ReassembleTranscript(chunks [][]byte) ([]byte, error) { + return agent.ReassembleJSONL(chunks), nil +} + +// --- Legacy methods --- + +// GetSessionID extracts the session ID from a hook input. +func (a *PiAgent) GetSessionID(input *agent.HookInput) string { + if input == nil { + return "" + } + return input.SessionID +} + +// GetSessionDir returns the directory where Pi natively stores session +// transcripts for repoPath: /sessions//. +// +// Pointing this at the native store (rather than the per-repo +// .entire/tmp/pi/ cache populated by the agent_end hook) is what lets +// `entire session attach ` resolve cold sessions — sessions that +// were never hooked, or whose hook capture failed. attach falls through +// to GetSessionDir + ResolveSessionFile when no SessionRef is recorded +// in metadata, and the live Pi store is the only place that always has +// the transcript on disk. +// +// Resolution order: +// 1. ENTIRE_TEST_PI_SESSION_DIR (test override; no encoding applied) +// 2. PI_CODING_AGENT_DIR (Pi's own override; encoding still applies) +// 3. ~/.pi/agent (default) +// +// The .entire/tmp/pi/ cache stays as a hook-internal detail — +// captureTranscript writes there and the TurnEnd event records that +// path as SessionRef in checkpoint metadata, so subsequent operations +// on hooked sessions go through the recorded SessionRef and never call +// GetSessionDir. 
+func (a *PiAgent) GetSessionDir(repoPath string) (string, error) { + if override := os.Getenv(piSessionDirEnvVar); override != "" { + return override, nil + } + home, err := resolvePiHome() + if err != nil { + return "", err + } + return filepath.Join(home, "sessions", encodeRepoPathForPi(repoPath)), nil +} + +// GetSessionBaseDir returns the base directory containing per-project +// session subdirectories. Used by attach's cross-project fallback +// (searchTranscriptInProjectDirs) when a session was started from a +// different cwd than the current worktree root. +func (a *PiAgent) GetSessionBaseDir() (string, error) { + home, err := resolvePiHome() + if err != nil { + return "", err + } + return filepath.Join(home, "sessions"), nil +} + +// ResolveSessionFile returns the path to the Pi session file for +// agentSessionID in sessionDir. Pi names files _.jsonl, +// so glob for the matching ID; on multiple matches the lexicographically +// latest (most recent timestamp) wins. +// +// Absolute paths pass through unchanged so hook payloads carrying live +// pi paths work without re-resolution. When no match exists, fall back +// to a deterministic non-existent path so downstream stat checks fail +// cleanly rather than panicking on an empty path. +func (a *PiAgent) ResolveSessionFile(sessionDir, agentSessionID string) string { + if filepath.IsAbs(agentSessionID) { + return agentSessionID + } + if path := findPiSessionByID(sessionDir, agentSessionID); path != "" { + return path + } + if sessionDir == "" { + return agentSessionID + } + return filepath.Join(sessionDir, agentSessionID+".jsonl") +} + +// resolvePiHome returns Pi's home directory: $PI_CODING_AGENT_DIR or +// ~/.pi/agent. 
+func resolvePiHome() (string, error) { + if dir := os.Getenv(piHomeEnvVar); dir != "" { + return dir, nil + } + home, err := os.UserHomeDir() + if err != nil { + return "", fmt.Errorf("resolve user home: %w", err) + } + return filepath.Join(home, ".pi", "agent"), nil +} + +// encodeRepoPathForPi encodes an absolute repo path into Pi's +// session-directory naming scheme: every path separator becomes '-', +// wrapped with '--' delimiters. /Users/foo/repo encodes as +// --Users-foo-repo--. Leading and trailing separators are absorbed so +// /a/b/ and /a/b encode the same way. Empty input returns "". +// +// Both '/' and '\\' are replaced regardless of host OS: on Windows, +// git rev-parse --show-toplevel returns forward slashes, but native +// Windows APIs use backslashes — a host-only replacement would leak +// the other form through and produce nested directories instead of a +// single name, breaking session lookup. filepath.ToSlash isn't enough +// because it only normalises the host's separator. +func encodeRepoPathForPi(repoPath string) string { + if repoPath == "" { + return "" + } + body := strings.NewReplacer("/", "-", `\`, "-").Replace(repoPath) + body = strings.Trim(body, "-") + return "--" + body + "--" +} + +// findPiSessionByID globs sessionDir for *_.jsonl. Returns +// the lexicographically latest match (most recent timestamp) or "" when +// no match exists or sessionDir/sessionID is empty. +func findPiSessionByID(sessionDir, sessionID string) string { + if sessionDir == "" || sessionID == "" { + return "" + } + matches, err := filepath.Glob(filepath.Join(sessionDir, "*_"+sessionID+".jsonl")) + if err != nil || len(matches) == 0 { + return "" + } + sort.Strings(matches) + return matches[len(matches)-1] +} + +// ReadSession loads a captured Pi transcript and returns it as an AgentSession. 
+func (a *PiAgent) ReadSession(input *agent.HookInput) (*agent.AgentSession, error) { + if input == nil || input.SessionRef == "" { + return nil, errors.New("no session ref provided") + } + + data, err := os.ReadFile(input.SessionRef) + if err != nil { + return nil, fmt.Errorf("read pi session: %w", err) + } + return &agent.AgentSession{ + AgentName: a.Name(), + SessionID: input.SessionID, + SessionRef: input.SessionRef, + NativeData: data, + }, nil +} + +// WriteSession writes a captured Pi transcript back to disk so Pi can resume +// from it. Pi loads sessions from arbitrary paths via `pi --session `, +// so a plain write is sufficient. +func (a *PiAgent) WriteSession(_ context.Context, session *agent.AgentSession) error { + if session == nil { + return errors.New("nil session") + } + if session.SessionRef == "" { + return errors.New("session has empty SessionRef") + } + if len(session.NativeData) == 0 { + return errors.New("session has empty NativeData") + } + if err := os.MkdirAll(filepath.Dir(session.SessionRef), 0o750); err != nil { + return fmt.Errorf("create pi session dir: %w", err) + } + + if err := os.WriteFile(session.SessionRef, session.NativeData, 0o600); err != nil { + return fmt.Errorf("write pi session file: %w", err) + } + return nil +} + +// FormatResumeCommand returns the shell command to resume a specific Pi +// session by ID. Pi accepts a partial UUID via `pi --session `. When no +// session is specified, fall back to `pi --continue` which reopens the most +// recent session. 
+func (a *PiAgent) FormatResumeCommand(sessionID string) string { + id := strings.TrimSpace(sessionID) + if id == "" { + return "pi --continue" + } + return "pi --session " + id +} diff --git a/cmd/entire/cli/agent/pi/pijsonl/pijsonl.go b/cmd/entire/cli/agent/pi/pijsonl/pijsonl.go new file mode 100644 index 0000000000..2600f3a1db --- /dev/null +++ b/cmd/entire/cli/agent/pi/pijsonl/pijsonl.go @@ -0,0 +1,181 @@ +// Package pijsonl provides shared parsing primitives for Pi's session JSONL +// format. It is consumed both by the in-tree pi agent (cmd/entire/cli/agent/pi) +// and by the v2 compact-transcript dispatcher +// (cmd/entire/cli/transcript/compact). Keeping these in one place ensures +// active-branch resolution, line counting, and offset slicing stay byte- +// compatible across both call sites. +package pijsonl + +import ( + "bufio" + "bytes" + "encoding/json" +) + +// EntryTypeMessage is the JSONL `type` value for conversational entries. +const EntryTypeMessage = "message" + +// Role values present on Message.Role. +const ( + RoleUser = "user" + RoleAssistant = "assistant" + RoleToolResult = "toolResult" +) + +// ContentTypeText is the content-block `type` value for text blocks. +const ContentTypeText = "text" + +// MaxScannerLine is the maximum size of a single JSONL line we will parse. +// Pi tool calls can embed full file contents in arguments — 10 MB matches +// what other in-tree transcript scanners use (codex, copilot, droid). +const MaxScannerLine = 10 * 1024 * 1024 + +// NewScanner returns a bufio.Scanner pre-configured with the Pi line-size limit. +func NewScanner(data []byte) *bufio.Scanner { + s := bufio.NewScanner(bytes.NewReader(data)) + s.Buffer(make([]byte, 0, MaxScannerLine), MaxScannerLine) + return s +} + +// CountLines returns the number of newline-terminated (or final unterminated) +// lines in data, including blank lines. Matches the offset semantics used by +// SkipLines and the compact-format StartLine. 
+func CountLines(data []byte) int { + if len(data) == 0 { + return 0 + } + n := bytes.Count(data, []byte{'\n'}) + if data[len(data)-1] != '\n' { + n++ + } + return n +} + +// SkipLines returns data with the first n newline-terminated lines removed. +// Returns nil if data has fewer than n lines. +func SkipLines(data []byte, n int) []byte { + if n <= 0 { + return data + } + off := 0 + for i := 0; i < n && off < len(data); i++ { + idx := bytes.IndexByte(data[off:], '\n') + if idx < 0 { + return nil + } + off += idx + 1 + } + return data[off:] +} + +// Entry is the outer shell of one Pi JSONL line. +type Entry struct { + Type string `json:"type"` + ID string `json:"id"` + Timestamp string `json:"timestamp"` + Message Message `json:"message"` +} + +// Message is the inner Pi `message` object on entries with type=="message". +type Message struct { + Role string `json:"role"` + Content json.RawMessage `json:"content"` + Usage *Usage `json:"usage,omitempty"` + StopReason string `json:"stopReason,omitempty"` + ToolCallID string `json:"toolCallId,omitempty"` + ToolName string `json:"toolName,omitempty"` + IsError bool `json:"isError,omitempty"` +} + +// Usage mirrors pi-ai's Usage struct (token-count fields only). +type Usage struct { + Input int `json:"input"` + Output int `json:"output"` + CacheRead int `json:"cacheRead"` + CacheWrite int `json:"cacheWrite"` +} + +// ContentItem is one element of a Pi message's content array. +type ContentItem struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + Name string `json:"name,omitempty"` + ID string `json:"id,omitempty"` + Arguments json.RawMessage `json:"arguments,omitempty"` +} + +// ResolveActiveBranch walks a Pi transcript tree and returns the set of entry +// IDs on the active conversation branch (root → most-recent message). +// +// Pi sessions form a tree: every entry has id and parentId. 
When the user
+// /forks or /clones mid-conversation, the JSONL file accumulates entries from
+// BOTH branches. Without filtering, downstream analysis double-counts tokens,
+// files, and prompts.
+//
+// IMPORTANT: callers must pass FULL transcript bytes, not bytes already
+// truncated by SkipLines. Resolving over a truncated buffer yields a
+// disconnected tree where parentId pointers no longer reach the root, causing
+// abandoned-branch entries to leak in.
+//
+// Returns nil when the transcript has no tree structure (no entry carries a
+// non-empty parentId) — callers should treat nil as "all entries
+// are on the active branch".
+func ResolveActiveBranch(data []byte) map[string]bool {
+	type node struct {
+		Type     string  `json:"type"`
+		ID       string  `json:"id"`
+		ParentID *string `json:"parentId"`
+	}
+
+	var lastMessageID string
+	hasTree := false
+	parentOf := make(map[string]string)
+
+	scanner := NewScanner(data)
+	for scanner.Scan() {
+		var n node
+		if err := json.Unmarshal(scanner.Bytes(), &n); err != nil || n.ID == "" {
+			continue
+		}
+		if n.ParentID != nil {
+			parentOf[n.ID] = *n.ParentID
+			if *n.ParentID != "" {
+				hasTree = true
+			}
+		}
+		if n.Type == EntryTypeMessage {
+			lastMessageID = n.ID
+		}
+	}
+
+	if !hasTree || lastMessageID == "" {
+		return nil
+	}
+
+	active := make(map[string]bool)
+	for cur := lastMessageID; cur != ""; {
+		if active[cur] {
+			break // cycle protection
+		}
+		active[cur] = true
+		parent, ok := parentOf[cur]
+		if !ok {
+			break
+		}
+		cur = parent
+	}
+	return active
+}
+
+// DecodeStringContent returns the raw string when content is a plain string,
+// or "" when it's a JSON array (caller should decode as []ContentItem).
+func DecodeStringContent(raw json.RawMessage) string { + if len(raw) == 0 { + return "" + } + var s string + if err := json.Unmarshal(raw, &s); err == nil { + return s + } + return "" +} diff --git a/cmd/entire/cli/agent/pi/pijsonl/pijsonl_test.go b/cmd/entire/cli/agent/pi/pijsonl/pijsonl_test.go new file mode 100644 index 0000000000..3ecccb0ccd --- /dev/null +++ b/cmd/entire/cli/agent/pi/pijsonl/pijsonl_test.go @@ -0,0 +1,128 @@ +package pijsonl + +import ( + "strings" + "testing" +) + +func TestResolveActiveBranch_LinearChain(t *testing.T) { + t.Parallel() + data := []byte(`{"type":"session","id":"s1"} +{"type":"model_change","id":"mc1","parentId":null} +{"type":"message","id":"m1","parentId":"mc1"} +{"type":"message","id":"m2","parentId":"m1"} +{"type":"message","id":"m3","parentId":"m2"} +`) + active := ResolveActiveBranch(data) + for _, id := range []string{"m3", "m2", "m1", "mc1"} { + if !active[id] { + t.Errorf("expected %q in active set", id) + } + } +} + +func TestResolveActiveBranch_FlatReturnsNil(t *testing.T) { + t.Parallel() + data := []byte(`{"type":"session","id":"s1"} +{"type":"message","id":"m1"} +{"type":"message","id":"m2"} +`) + if ResolveActiveBranch(data) != nil { + t.Error("expected nil for flat transcript (no parentId references)") + } +} + +func TestResolveActiveBranch_TwoBranchesPicksLast(t *testing.T) { + t.Parallel() + data := []byte(`{"type":"message","id":"a","parentId":"root"} +{"type":"message","id":"root","parentId":null} +{"type":"message","id":"b","parentId":"a"} +{"type":"message","id":"c","parentId":"a"} +`) + active := ResolveActiveBranch(data) + if !active["c"] || !active["a"] { + t.Errorf("expected c+a in active, got %v", active) + } + if active["b"] { + t.Error("b (abandoned) should not be in active set") + } +} + +func TestResolveActiveBranch_CycleProtection(t *testing.T) { + t.Parallel() + data := []byte(`{"type":"message","id":"a","parentId":"b"} +{"type":"message","id":"b","parentId":"a"} +`) + active := 
ResolveActiveBranch(data) + if !active["a"] || !active["b"] { + t.Errorf("active = %v, want both a and b (cycle terminates)", active) + } +} + +func TestSkipLines(t *testing.T) { + t.Parallel() + data := []byte("a\nb\nc\nd\n") + if got := string(SkipLines(data, 0)); got != "a\nb\nc\nd\n" { + t.Errorf("0: got %q", got) + } + if got := string(SkipLines(data, 2)); got != "c\nd\n" { + t.Errorf("2: got %q", got) + } + // At end of fully-terminated data, SkipLines returns the empty tail + // (not nil). nil is reserved for "data ran out mid-line" — see below. + if got := SkipLines(data, 4); len(got) != 0 { + t.Errorf("4 (exhaust): got %q, expected empty tail", got) + } + // With unterminated final line, asking for more lines than exist must + // return nil so callers can detect the underflow. + unterminated := []byte("a\nb") + if got := SkipLines(unterminated, 5); got != nil { + t.Errorf("unterminated past end: expected nil, got %q", got) + } +} + +func TestCountLines(t *testing.T) { + t.Parallel() + cases := map[string]int{ + "": 0, + "a\n": 1, + "a\nb\n": 2, + "a\nb": 2, // unterminated final line counted + "a\n\nb\n": 3, // blank line counted (offset semantics) + "\n": 1, + } + for in, want := range cases { + if got := CountLines([]byte(in)); got != want { + t.Errorf("CountLines(%q) = %d, want %d", in, got, want) + } + } +} + +func TestNewScanner_HandlesLargeLines(t *testing.T) { + t.Parallel() + // A 5MB JSONL line — well over the legacy 1MB cap, well under the new + // 10MB cap. Verifies the scanner doesn't choke on file-content-bearing + // toolCall arguments. 
+ big := `{"type":"message","id":"x","message":{"role":"user","content":"` + + strings.Repeat("a", 5*1024*1024) + `"}}` + "\n" + scanner := NewScanner([]byte(big)) + if !scanner.Scan() { + t.Fatalf("scanner failed on large line: %v", scanner.Err()) + } + if len(scanner.Bytes()) < 5*1024*1024 { + t.Errorf("got truncated line: %d bytes", len(scanner.Bytes())) + } +} + +func TestDecodeStringContent(t *testing.T) { + t.Parallel() + if got := DecodeStringContent([]byte(`"hello"`)); got != "hello" { + t.Errorf("string content: got %q", got) + } + if got := DecodeStringContent([]byte(`[{"type":"text","text":"hi"}]`)); got != "" { + t.Errorf("array content should return empty: got %q", got) + } + if got := DecodeStringContent(nil); got != "" { + t.Errorf("nil: got %q", got) + } +} diff --git a/cmd/entire/cli/agent/pi/session_resolve_test.go b/cmd/entire/cli/agent/pi/session_resolve_test.go new file mode 100644 index 0000000000..8347eadd95 --- /dev/null +++ b/cmd/entire/cli/agent/pi/session_resolve_test.go @@ -0,0 +1,156 @@ +package pi + +import ( + "os" + "path/filepath" + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +func TestEncodeRepoPathForPi(t *testing.T) { + t.Parallel() + tests := []struct { + in string + want string + }{ + {"/Users/foo/repo", "--Users-foo-repo--"}, + {"/Users/foo/repo/", "--Users-foo-repo--"}, // trailing separator stripped + {"/private/var/folders/2y/T/e2e-repo-1", "--private-var-folders-2y-T-e2e-repo-1--"}, + // Windows: git rev-parse --show-toplevel returns forward slashes + // regardless of platform — must encode the same way on every OS. + {`C:/Users/foo/repo`, `--C:-Users-foo-repo--`}, + {`C:/Users/foo/repo/`, `--C:-Users-foo-repo--`}, + // Native Windows separators (in case a caller passes them through). 
+ {`C:\Users\foo\repo`, `--C:-Users-foo-repo--`}, + {`C:\Users\foo\repo\`, `--C:-Users-foo-repo--`}, + {"", ""}, + } + for _, tt := range tests { + got := encodeRepoPathForPi(tt.in) + if got != tt.want { + t.Errorf("encodeRepoPathForPi(%q) = %q, want %q", tt.in, got, tt.want) + } + } +} + +func TestGetSessionDir_HonorsTestOverride(t *testing.T) { + override := t.TempDir() + t.Setenv(piSessionDirEnvVar, override) + + dir, err := (&PiAgent{}).GetSessionDir("/Users/foo/repo") + if err != nil { + t.Fatal(err) + } + if dir != override { + t.Errorf("GetSessionDir = %q, want override %q", dir, override) + } +} + +func TestGetSessionDir_HonorsPiHomeOverride(t *testing.T) { + piHome := t.TempDir() + t.Setenv(piHomeEnvVar, piHome) + t.Setenv(piSessionDirEnvVar, "") + + dir, err := (&PiAgent{}).GetSessionDir("/Users/foo/repo") + if err != nil { + t.Fatal(err) + } + want := filepath.Join(piHome, "sessions", "--Users-foo-repo--") + if dir != want { + t.Errorf("GetSessionDir = %q, want %q", dir, want) + } +} + +func TestGetSessionBaseDir_HonorsPiHomeOverride(t *testing.T) { + piHome := t.TempDir() + t.Setenv(piHomeEnvVar, piHome) + + base, err := (&PiAgent{}).GetSessionBaseDir() + if err != nil { + t.Fatal(err) + } + want := filepath.Join(piHome, "sessions") + if base != want { + t.Errorf("GetSessionBaseDir = %q, want %q", base, want) + } +} + +// PiAgent must implement SessionBaseDirProvider so attach's +// cross-project fallback (searchTranscriptInProjectDirs) can scan +// sibling project subdirs of ~/.pi/agent/sessions/ when the session +// was started from a different cwd than the current worktree root. 
+func TestPiAgent_ImplementsSessionBaseDirProvider(t *testing.T) { + t.Parallel() + if _, ok := agent.AsSessionBaseDirProvider(NewPiAgent()); !ok { + t.Fatal("expected pi to implement SessionBaseDirProvider") + } +} + +func TestResolveSessionFile_AbsolutePathPassthrough(t *testing.T) { + t.Parallel() + abs := "/tmp/2026-01-01T00-00-00-000Z_abc123.jsonl" + got := (&PiAgent{}).ResolveSessionFile("/ignored", abs) + if got != abs { + t.Errorf("absolute path: got %q, want %q", got, abs) + } +} + +func TestResolveSessionFile_GlobsBySessionID(t *testing.T) { + t.Parallel() + dir := t.TempDir() + id := "sess-xyz" + + older := filepath.Join(dir, "2026-01-01T00-00-00-000Z_"+id+".jsonl") + newer := filepath.Join(dir, "2026-06-15T12-00-00-000Z_"+id+".jsonl") + for _, p := range []string{older, newer} { + if err := os.WriteFile(p, []byte("{}"), 0o600); err != nil { + t.Fatal(err) + } + } + + // Unrelated file with a different session ID — must not match. + unrelated := filepath.Join(dir, "2026-06-15T12-00-00-000Z_other.jsonl") + if err := os.WriteFile(unrelated, []byte("{}"), 0o600); err != nil { + t.Fatal(err) + } + + got := (&PiAgent{}).ResolveSessionFile(dir, id) + if got != newer { + t.Errorf("ResolveSessionFile picked %q, want most recent %q", got, newer) + } +} + +func TestResolveSessionFile_NoMatchFallback(t *testing.T) { + t.Parallel() + dir := t.TempDir() + id := "missing-id" + + got := (&PiAgent{}).ResolveSessionFile(dir, id) + want := filepath.Join(dir, id+".jsonl") + if got != want { + t.Errorf("ResolveSessionFile fallback = %q, want %q", got, want) + } + // The fallback must be a non-existent path so callers' stat checks fail cleanly. 
+ if _, err := os.Stat(got); !os.IsNotExist(err) { + t.Errorf("expected stat to report not-exist for fallback path, got err=%v", err) + } +} + +func TestResolveSessionFile_NoMatch_NoSessionDir(t *testing.T) { + t.Parallel() + got := (&PiAgent{}).ResolveSessionFile("", "sess-xyz") + if got != "sess-xyz" { + t.Errorf("ResolveSessionFile with empty dir = %q, want %q", got, "sess-xyz") + } +} + +func TestFindPiSessionByID_EmptyInputs(t *testing.T) { + t.Parallel() + if got := findPiSessionByID("", "id"); got != "" { + t.Errorf("empty dir: got %q, want \"\"", got) + } + if got := findPiSessionByID(t.TempDir(), ""); got != "" { + t.Errorf("empty id: got %q, want \"\"", got) + } +} diff --git a/cmd/entire/cli/agent/pi/transcript.go b/cmd/entire/cli/agent/pi/transcript.go new file mode 100644 index 0000000000..4fd6857bc4 --- /dev/null +++ b/cmd/entire/cli/agent/pi/transcript.go @@ -0,0 +1,185 @@ +package pi + +import ( + "encoding/json" + "fmt" + "os" + + "github.com/entireio/cli/cmd/entire/cli/agent" + "github.com/entireio/cli/cmd/entire/cli/agent/pi/pijsonl" +) + +// Compile-time interface assertions +var ( + _ agent.TokenCalculator = (*PiAgent)(nil) + _ agent.TranscriptAnalyzer = (*PiAgent)(nil) + _ agent.PromptExtractor = (*PiAgent)(nil) +) + +// CalculateTokenUsage sums per-assistant-message token usage from a Pi JSONL +// transcript starting at the given line offset. Only assistant messages on +// the active conversation branch contribute to the totals — see +// pijsonl.ResolveActiveBranch for the rationale. +func (a *PiAgent) CalculateTokenUsage(transcriptData []byte, fromOffset int) (*agent.TokenUsage, error) { + usage := &agent.TokenUsage{} + if len(transcriptData) == 0 { + return usage, nil + } + + // IMPORTANT: resolve active branch on FULL data before slicing — a + // truncated buffer breaks parentId chains and leaks abandoned branches in. 
+ active := pijsonl.ResolveActiveBranch(transcriptData) + content := pijsonl.SkipLines(transcriptData, fromOffset) + + scanner := pijsonl.NewScanner(content) + for scanner.Scan() { + var entry pijsonl.Entry + if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil { + continue + } + if entry.Type != pijsonl.EntryTypeMessage || entry.Message.Role != pijsonl.RoleAssistant || entry.Message.Usage == nil { + continue + } + if active != nil && !active[entry.ID] { + continue + } + usage.InputTokens += entry.Message.Usage.Input + usage.OutputTokens += entry.Message.Usage.Output + usage.CacheReadTokens += entry.Message.Usage.CacheRead + usage.CacheCreationTokens += entry.Message.Usage.CacheWrite + usage.APICallCount++ + } + if err := scanner.Err(); err != nil { + return usage, fmt.Errorf("pi transcript scanner: %w", err) + } + return usage, nil +} + +// GetTranscriptPosition returns the JSONL line count of the file at path. +// Used by the strategy as the offset for incremental ExtractModifiedFiles +// calls. Missing files report 0 (consistent with Claude Code). +func (a *PiAgent) GetTranscriptPosition(path string) (int, error) { + if path == "" { + return 0, nil + } + //nolint:gosec // path from validated SessionRef set by lifecycle hooks + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return 0, nil + } + return 0, fmt.Errorf("read pi transcript: %w", err) + } + return pijsonl.CountLines(data), nil +} + +// ExtractModifiedFilesFromOffset scans Pi assistant tool calls from startOffset +// onward and returns file paths touched by file-modifying tools (`write`, +// `edit`). Branch-aware: only counts entries on the active conversation +// branch. 
+func (a *PiAgent) ExtractModifiedFilesFromOffset(path string, startOffset int) ([]string, int, error) { + if path == "" { + return nil, 0, nil + } + //nolint:gosec // path from validated SessionRef + data, err := os.ReadFile(path) + if err != nil { + return nil, 0, fmt.Errorf("read pi transcript: %w", err) + } + + totalLines := pijsonl.CountLines(data) + active := pijsonl.ResolveActiveBranch(data) + content := pijsonl.SkipLines(data, startOffset) + + seen := make(map[string]bool) + var files []string + + scanner := pijsonl.NewScanner(content) + for scanner.Scan() { + var entry pijsonl.Entry + if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil { + continue + } + if entry.Type != pijsonl.EntryTypeMessage || entry.Message.Role != pijsonl.RoleAssistant { + continue + } + if active != nil && !active[entry.ID] { + continue + } + var items []pijsonl.ContentItem + if err := json.Unmarshal(entry.Message.Content, &items); err != nil { + continue + } + for _, item := range items { + if item.Type != "toolCall" { + continue + } + if item.Name != "write" && item.Name != "edit" { + continue + } + var args struct { + Path string `json:"path"` + } + if err := json.Unmarshal(item.Arguments, &args); err != nil { + continue + } + if args.Path != "" && !seen[args.Path] { + seen[args.Path] = true + files = append(files, args.Path) + } + } + } + if err := scanner.Err(); err != nil { + return files, totalLines, fmt.Errorf("pi transcript scanner: %w", err) + } + return files, totalLines, nil +} + +// ExtractPrompts returns user-message text from the transcript starting at +// the given line offset. Branch-aware (drops abandoned-branch prompts). 
+func (a *PiAgent) ExtractPrompts(sessionRef string, fromOffset int) ([]string, error) { + if sessionRef == "" { + return nil, nil + } + //nolint:gosec // sessionRef from validated SessionRef + data, err := os.ReadFile(sessionRef) + if err != nil { + return nil, fmt.Errorf("read pi transcript: %w", err) + } + + active := pijsonl.ResolveActiveBranch(data) + content := pijsonl.SkipLines(data, fromOffset) + + var prompts []string + scanner := pijsonl.NewScanner(content) + for scanner.Scan() { + var entry pijsonl.Entry + if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil { + continue + } + if entry.Type != pijsonl.EntryTypeMessage || entry.Message.Role != pijsonl.RoleUser { + continue + } + if active != nil && !active[entry.ID] { + continue + } + // User content can be either a plain string or an array of typed blocks. + if text := pijsonl.DecodeStringContent(entry.Message.Content); text != "" { + prompts = append(prompts, text) + continue + } + var items []pijsonl.ContentItem + if err := json.Unmarshal(entry.Message.Content, &items); err != nil { + continue + } + for _, item := range items { + if item.Type == pijsonl.ContentTypeText && item.Text != "" { + prompts = append(prompts, item.Text) + } + } + } + if err := scanner.Err(); err != nil { + return prompts, fmt.Errorf("pi transcript scanner: %w", err) + } + return prompts, nil +} diff --git a/cmd/entire/cli/agent/pi/transcript_test.go b/cmd/entire/cli/agent/pi/transcript_test.go new file mode 100644 index 0000000000..f51024c52f --- /dev/null +++ b/cmd/entire/cli/agent/pi/transcript_test.go @@ -0,0 +1,368 @@ +package pi + +import ( + "context" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/entireio/cli/cmd/entire/cli/agent" +) + +// Compile-time interface assertions +var ( + _ agent.Agent = (*PiAgent)(nil) + _ agent.HookSupport = (*PiAgent)(nil) + _ agent.TokenCalculator = (*PiAgent)(nil) + _ agent.TranscriptAnalyzer = (*PiAgent)(nil) + _ agent.PromptExtractor = (*PiAgent)(nil) +) + +// 
testSessionJSONL — linear session: header + model_change + 4 messages.
+const testSessionJSONL = `{"type":"session","version":3,"id":"test-uuid-123","timestamp":"2026-03-27T21:00:00.000Z","cwd":"/tmp/test"}
+{"type":"model_change","id":"mc1","parentId":null,"timestamp":"2026-03-27T21:00:00.001Z","provider":"anthropic","modelId":"claude-sonnet-4-6"}
+{"type":"message","id":"m1","parentId":"mc1","timestamp":"2026-03-27T21:00:01.000Z","message":{"role":"user","content":[{"type":"text","text":"Create hello.txt"}],"timestamp":1774646400000}}
+{"type":"message","id":"m2","parentId":"m1","timestamp":"2026-03-27T21:00:02.000Z","message":{"role":"assistant","content":[{"type":"toolCall","id":"tc1","name":"write","arguments":{"path":"hello.txt","content":"hello world\n"}}],"usage":{"input":100,"output":50,"cacheRead":10,"cacheWrite":5},"stopReason":"toolUse","timestamp":1774646401000}}
+{"type":"message","id":"m3","parentId":"m2","timestamp":"2026-03-27T21:00:03.000Z","message":{"role":"toolResult","toolCallId":"tc1","toolName":"write","content":[{"type":"text","text":"Written 12 bytes"}],"isError":false,"timestamp":1774646402000}}
+{"type":"message","id":"m4","parentId":"m3","timestamp":"2026-03-27T21:00:04.000Z","message":{"role":"assistant","content":[{"type":"text","text":"Created hello.txt with the content hello world."}],"usage":{"input":200,"output":30,"cacheRead":0,"cacheWrite":0},"stopReason":"stop","timestamp":1774646403000}}
+`
+
+// testBranchingSessionJSONL — two branches fork from m1; the m5 branch is active because its tip m7 is the last message.
+const testBranchingSessionJSONL = `{"type":"session","version":3,"id":"test-branch-123","timestamp":"2026-03-27T22:00:00.000Z","cwd":"/tmp/test"} +{"type":"model_change","id":"mc1","parentId":null,"timestamp":"2026-03-27T22:00:00.001Z","provider":"anthropic","modelId":"claude-sonnet-4-6"} +{"type":"message","id":"m1","parentId":"mc1","timestamp":"2026-03-27T22:00:01.000Z","message":{"role":"user","content":[{"type":"text","text":"Create a file"}],"timestamp":1774650000000}} +{"type":"message","id":"m2","parentId":"m1","timestamp":"2026-03-27T22:00:02.000Z","message":{"role":"assistant","content":[{"type":"toolCall","id":"tc1","name":"write","arguments":{"path":"old.txt","content":"old\n"}}],"usage":{"input":100,"output":50,"cacheRead":0,"cacheWrite":0},"stopReason":"toolUse","timestamp":1774650001000}} +{"type":"message","id":"m3","parentId":"m2","timestamp":"2026-03-27T22:00:03.000Z","message":{"role":"toolResult","toolCallId":"tc1","toolName":"write","content":[{"type":"text","text":"Written 4 bytes"}],"isError":false,"timestamp":1774650002000}} +{"type":"message","id":"m4","parentId":"m3","timestamp":"2026-03-27T22:00:04.000Z","message":{"role":"assistant","content":[{"type":"text","text":"Created old.txt"}],"usage":{"input":200,"output":30,"cacheRead":0,"cacheWrite":0},"stopReason":"stop","timestamp":1774650003000}} +{"type":"message","id":"m5","parentId":"m1","timestamp":"2026-03-27T22:00:05.000Z","message":{"role":"assistant","content":[{"type":"toolCall","id":"tc2","name":"write","arguments":{"path":"new.txt","content":"new\n"}}],"usage":{"input":150,"output":60,"cacheRead":5,"cacheWrite":3},"stopReason":"toolUse","timestamp":1774650004000}} +{"type":"message","id":"m6","parentId":"m5","timestamp":"2026-03-27T22:00:06.000Z","message":{"role":"toolResult","toolCallId":"tc2","toolName":"write","content":[{"type":"text","text":"Written 4 bytes"}],"isError":false,"timestamp":1774650005000}} 
+{"type":"message","id":"m7","parentId":"m6","timestamp":"2026-03-27T22:00:07.000Z","message":{"role":"assistant","content":[{"type":"text","text":"Created new.txt"}],"usage":{"input":250,"output":40,"cacheRead":0,"cacheWrite":0},"stopReason":"stop","timestamp":1774650006000}} +` + +// testFlatSessionJSONL — no parentId references, flat structure. +const testFlatSessionJSONL = `{"type":"session","id":"flat-123"} +{"type":"message","id":"m1","message":{"role":"user","content":[{"type":"text","text":"hello"}]}} +{"type":"message","id":"m2","message":{"role":"assistant","content":[{"type":"text","text":"hi"}],"usage":{"input":10,"output":5,"cacheRead":0,"cacheWrite":0}}} +` + +func writeJSONL(t *testing.T, name, content string) string { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, name) + if err := os.WriteFile(path, []byte(content), 0o644); err != nil { + t.Fatal(err) + } + return path +} + +func writeTestSession(t *testing.T) string { + t.Helper() + return writeJSONL(t, "2026-03-27T21-00-00-000Z_test-uuid-123.jsonl", testSessionJSONL) +} + +func writeBranchingSession(t *testing.T) string { + t.Helper() + return writeJSONL(t, "2026-03-27T22-00-00-000Z_test-branch-123.jsonl", testBranchingSessionJSONL) +} + +// --- ExtractModifiedFilesFromOffset --- + +func TestExtractModifiedFiles(t *testing.T) { + t.Parallel() + path := writeTestSession(t) + files, pos, err := (&PiAgent{}).ExtractModifiedFilesFromOffset(path, 0) + if err != nil { + t.Fatal(err) + } + if len(files) != 1 || files[0] != "hello.txt" { + t.Errorf("files = %v, want [hello.txt]", files) + } + if pos == 0 { + t.Error("position should be > 0") + } +} + +func TestExtractModifiedFiles_OffsetPastEnd(t *testing.T) { + t.Parallel() + path := writeTestSession(t) + files, _, err := (&PiAgent{}).ExtractModifiedFilesFromOffset(path, 100) + if err != nil { + t.Fatal(err) + } + if len(files) != 0 { + t.Errorf("files = %v, want empty", files) + } +} + +func TestExtractModifiedFiles_Branching(t 
*testing.T) { + t.Parallel() + path := writeBranchingSession(t) + files, _, err := (&PiAgent{}).ExtractModifiedFilesFromOffset(path, 0) + if err != nil { + t.Fatal(err) + } + if len(files) != 1 || files[0] != "new.txt" { + t.Errorf("files = %v, want [new.txt] (only active branch)", files) + } +} + +// --- ExtractPrompts --- + +func TestExtractPrompts(t *testing.T) { + t.Parallel() + path := writeTestSession(t) + prompts, err := (&PiAgent{}).ExtractPrompts(path, 0) + if err != nil { + t.Fatal(err) + } + if len(prompts) != 1 || prompts[0] != "Create hello.txt" { + t.Errorf("prompts = %v, want [Create hello.txt]", prompts) + } +} + +func TestExtractPrompts_Branching(t *testing.T) { + t.Parallel() + path := writeBranchingSession(t) + prompts, err := (&PiAgent{}).ExtractPrompts(path, 0) + if err != nil { + t.Fatal(err) + } + if len(prompts) != 1 || prompts[0] != "Create a file" { + t.Errorf("prompts = %v, want [Create a file]", prompts) + } +} + +// --- GetTranscriptPosition --- + +func TestGetTranscriptPosition(t *testing.T) { + t.Parallel() + path := writeTestSession(t) + pos, err := (&PiAgent{}).GetTranscriptPosition(path) + if err != nil { + t.Fatal(err) + } + if pos != 6 { + t.Errorf("position = %d, want 6", pos) + } +} + +func TestGetTranscriptPosition_Missing(t *testing.T) { + t.Parallel() + pos, err := (&PiAgent{}).GetTranscriptPosition("/nonexistent/file.jsonl") + if err != nil { + t.Fatalf("expected no error for missing file, got: %v", err) + } + if pos != 0 { + t.Errorf("position = %d, want 0", pos) + } +} + +// --- CalculateTokenUsage --- + +func TestCalculateTokenUsage(t *testing.T) { + t.Parallel() + usage, err := (&PiAgent{}).CalculateTokenUsage([]byte(testSessionJSONL), 0) + if err != nil { + t.Fatal(err) + } + if usage.InputTokens != 300 { + t.Errorf("InputTokens = %d, want 300", usage.InputTokens) + } + if usage.OutputTokens != 80 { + t.Errorf("OutputTokens = %d, want 80", usage.OutputTokens) + } + if usage.CacheReadTokens != 10 { + 
t.Errorf("CacheReadTokens = %d, want 10", usage.CacheReadTokens) + } + if usage.CacheCreationTokens != 5 { + t.Errorf("CacheCreationTokens = %d, want 5", usage.CacheCreationTokens) + } + if usage.APICallCount != 2 { + t.Errorf("APICallCount = %d, want 2", usage.APICallCount) + } +} + +func TestCalculateTokenUsage_Branching(t *testing.T) { + t.Parallel() + usage, err := (&PiAgent{}).CalculateTokenUsage([]byte(testBranchingSessionJSONL), 0) + if err != nil { + t.Fatal(err) + } + // Active branch: m5(input=150,output=60,cacheRead=5,cacheWrite=3) + // m7(input=250,output=40,cacheRead=0,cacheWrite=0) + if usage.InputTokens != 400 { + t.Errorf("InputTokens = %d, want 400", usage.InputTokens) + } + if usage.OutputTokens != 100 { + t.Errorf("OutputTokens = %d, want 100", usage.OutputTokens) + } + if usage.APICallCount != 2 { + t.Errorf("APICallCount = %d, want 2", usage.APICallCount) + } +} + +func TestCalculateTokenUsage_OffsetPastEnd(t *testing.T) { + t.Parallel() + usage, err := (&PiAgent{}).CalculateTokenUsage([]byte(testSessionJSONL), 100) + if err != nil { + t.Fatal(err) + } + if usage.APICallCount != 0 { + t.Errorf("expected 0 API calls past end, got %+v", usage) + } +} + +func TestCalculateTokenUsage_FlatTranscript(t *testing.T) { + t.Parallel() + usage, err := (&PiAgent{}).CalculateTokenUsage([]byte(testFlatSessionJSONL), 0) + if err != nil { + t.Fatal(err) + } + if usage.InputTokens != 10 || usage.OutputTokens != 5 || usage.APICallCount != 1 { + t.Errorf("flat: got %+v, want input=10 output=5 calls=1", usage) + } +} + +// Note: pijsonl.ResolveActiveBranch unit tests live in the pijsonl package +// itself; the in-tree tests here verify the agent surface (CalculateTokenUsage, +// ExtractModifiedFilesFromOffset, ExtractPrompts) honours active-branch +// filtering end-to-end. 
+ +// --- ReadSession / WriteSession --- + +func TestReadSession(t *testing.T) { + t.Parallel() + path := writeTestSession(t) + s, err := (&PiAgent{}).ReadSession(&agent.HookInput{ + SessionID: "test-uuid-123", + SessionRef: path, + }) + if err != nil { + t.Fatal(err) + } + if s.SessionID != "test-uuid-123" { + t.Errorf("SessionID = %q", s.SessionID) + } + if s.AgentName != agent.AgentNamePi { + t.Errorf("AgentName = %q", s.AgentName) + } + if len(s.NativeData) == 0 { + t.Error("NativeData should not be empty") + } +} + +func TestWriteSession_RoundTrip(t *testing.T) { + t.Parallel() + dir := t.TempDir() + path := filepath.Join(dir, "subdir", "session.json") + body := []byte(`{"type":"session","version":3}` + "\n") + err := (&PiAgent{}).WriteSession(context.Background(), &agent.AgentSession{ + SessionID: "abc", + SessionRef: path, + NativeData: body, + }) + if err != nil { + t.Fatal(err) + } + got, err := os.ReadFile(path) + if err != nil { + t.Fatal(err) + } + if string(got) != string(body) { + t.Errorf("written body mismatch") + } +} + +// --- ChunkTranscript / ReassembleTranscript --- + +func TestChunkAndReassemble(t *testing.T) { + t.Parallel() + body := []byte(strings.Repeat(`{"type":"message"}`+"\n", 50)) + chunks, err := (&PiAgent{}).ChunkTranscript(context.Background(), body, 200) + if err != nil { + t.Fatal(err) + } + if len(chunks) < 2 { + t.Errorf("expected multiple chunks, got %d", len(chunks)) + } + reassembled, err := (&PiAgent{}).ReassembleTranscript(chunks) + if err != nil { + t.Fatal(err) + } + if string(reassembled) != string(body) { + t.Error("reassembled bytes differ from original") + } +} + +// --- format/identity --- + +func TestSelfRegistered(t *testing.T) { + t.Parallel() + a, err := agent.Get(agent.AgentNamePi) + if err != nil { + t.Fatalf("agent.Get(pi): %v", err) + } + if a.Name() != agent.AgentNamePi { + t.Errorf("Name() = %q", a.Name()) + } + if a.Type() != agent.AgentTypePi { + t.Errorf("Type() = %q", a.Type()) + } +} + +func 
TestProtectedDirsContainsDotPi(t *testing.T) { + t.Parallel() + dirs := (&PiAgent{}).ProtectedDirs() + for _, d := range dirs { + if d == ".pi" { + return + } + } + t.Errorf(".pi missing from ProtectedDirs: %v", dirs) +} + +func TestFormatResumeCommand(t *testing.T) { + t.Parallel() + a := &PiAgent{} + const piContinue = "pi --continue" + if got := a.FormatResumeCommand(""); got != piContinue { + t.Errorf("FormatResumeCommand(empty) = %q, want %q", got, piContinue) + } + if got := a.FormatResumeCommand(" "); got != piContinue { + t.Errorf("FormatResumeCommand(whitespace) = %q, want %q", got, piContinue) + } + if got, want := a.FormatResumeCommand("abc-123"), "pi --session abc-123"; got != want { + t.Errorf("FormatResumeCommand(id) = %q, want %q", got, want) + } +} + +// --- DetectPresence --- + +func TestDetectPresence_NoPiDir(t *testing.T) { + // Cannot use t.Parallel — t.Chdir mutates process state. + t.Chdir(t.TempDir()) + present, err := (&PiAgent{}).DetectPresence(context.Background()) + if err != nil { + t.Fatal(err) + } + if present { + t.Error("DetectPresence should be false when no .pi/ directory exists") + } +} + +func TestDetectPresence_WithPiDir(t *testing.T) { + // Cannot use t.Parallel — t.Chdir mutates process state. 
+ dir := t.TempDir() + if err := os.MkdirAll(filepath.Join(dir, ".pi"), 0o755); err != nil { + t.Fatal(err) + } + t.Chdir(dir) + present, err := (&PiAgent{}).DetectPresence(context.Background()) + if err != nil { + t.Fatal(err) + } + if !present { + t.Error("DetectPresence should be true when .pi/ exists in repo") + } +} diff --git a/cmd/entire/cli/agent/registry.go b/cmd/entire/cli/agent/registry.go index 425cbc1a8e..ae417665d7 100644 --- a/cmd/entire/cli/agent/registry.go +++ b/cmd/entire/cli/agent/registry.go @@ -172,6 +172,7 @@ const ( AgentNameFactoryAIDroid types.AgentName = "factoryai-droid" AgentNameGemini types.AgentName = "gemini" AgentNameOpenCode types.AgentName = "opencode" + AgentNamePi types.AgentName = "pi" ) // Agent type constants (type identifiers stored in metadata/trailers) @@ -183,6 +184,7 @@ const ( AgentTypeFactoryAIDroid types.AgentType = "Factory AI Droid" AgentTypeGemini types.AgentType = "Gemini CLI" AgentTypeOpenCode types.AgentType = "OpenCode" + AgentTypePi types.AgentType = "Pi" AgentTypeUnknown types.AgentType = "Unknown" ) diff --git a/cmd/entire/cli/api/client.go b/cmd/entire/cli/api/client.go index 5fe7a6b012..9be47f45ed 100644 --- a/cmd/entire/cli/api/client.go +++ b/cmd/entire/cli/api/client.go @@ -59,7 +59,16 @@ func (t *bearerTransport) RoundTrip(req *http.Request) (*http.Response, error) { // Get sends an authenticated GET request to the given API-relative path. func (c *Client) Get(ctx context.Context, path string) (*http.Response, error) { - return c.do(ctx, http.MethodGet, path, nil) + return c.do(ctx, http.MethodGet, path, nil, nil) +} + +// GetStream sends an authenticated GET request with optional extra request +// headers (e.g. Accept: text/event-stream, Last-Event-ID) and returns the +// response with the body still open. Callers are responsible for reading and +// closing resp.Body. Intended for streaming endpoints such as Server-Sent +// Events; for normal JSON requests use Get. 
+func (c *Client) GetStream(ctx context.Context, path string, headers http.Header) (*http.Response, error) { + return c.do(ctx, http.MethodGet, path, nil, headers) } // Post sends an authenticated POST request with a JSON body to the given API-relative path. @@ -72,7 +81,7 @@ func (c *Client) Post(ctx context.Context, path string, body any) (*http.Respons } reader = bytes.NewReader(data) } - return c.do(ctx, http.MethodPost, path, reader) + return c.do(ctx, http.MethodPost, path, reader, nil) } // Put sends an authenticated PUT request with a JSON body to the given API-relative path. @@ -85,7 +94,7 @@ func (c *Client) Put(ctx context.Context, path string, body any) (*http.Response } reader = bytes.NewReader(data) } - return c.do(ctx, http.MethodPut, path, reader) + return c.do(ctx, http.MethodPut, path, reader, nil) } // Patch sends an authenticated PATCH request with a JSON body to the given API-relative path. @@ -98,15 +107,15 @@ func (c *Client) Patch(ctx context.Context, path string, body any) (*http.Respon } reader = bytes.NewReader(data) } - return c.do(ctx, http.MethodPatch, path, reader) + return c.do(ctx, http.MethodPatch, path, reader, nil) } // Delete sends an authenticated DELETE request to the given API-relative path. 
func (c *Client) Delete(ctx context.Context, path string) (*http.Response, error) { - return c.do(ctx, http.MethodDelete, path, nil) + return c.do(ctx, http.MethodDelete, path, nil, nil) } -func (c *Client) do(ctx context.Context, method, path string, body io.Reader) (*http.Response, error) { +func (c *Client) do(ctx context.Context, method, path string, body io.Reader, headers http.Header) (*http.Response, error) { endpoint, err := ResolveURLFromBase(c.baseURL, path) if err != nil { return nil, fmt.Errorf("resolve URL %s: %w", path, err) @@ -117,6 +126,12 @@ func (c *Client) do(ctx context.Context, method, path string, body io.Reader) (* return nil, fmt.Errorf("create request: %w", err) } + for k, vs := range headers { + for _, v := range vs { + req.Header.Add(k, v) + } + } + if body != nil { req.Header.Set("Content-Type", "application/json") } diff --git a/cmd/entire/cli/hooks_cmd.go b/cmd/entire/cli/hooks_cmd.go index 229dfaa604..7f58d67f56 100644 --- a/cmd/entire/cli/hooks_cmd.go +++ b/cmd/entire/cli/hooks_cmd.go @@ -16,6 +16,7 @@ import ( _ "github.com/entireio/cli/cmd/entire/cli/agent/factoryaidroid" _ "github.com/entireio/cli/cmd/entire/cli/agent/geminicli" _ "github.com/entireio/cli/cmd/entire/cli/agent/opencode" + _ "github.com/entireio/cli/cmd/entire/cli/agent/pi" _ "github.com/entireio/cli/cmd/entire/cli/agent/vogon" // support external agents diff --git a/cmd/entire/cli/trail_cmd.go b/cmd/entire/cli/trail_cmd.go index fb64f4f744..4b71f33746 100644 --- a/cmd/entire/cli/trail_cmd.go +++ b/cmd/entire/cli/trail_cmd.go @@ -50,6 +50,7 @@ branch, or lists all trails if no trail exists for the current branch.`, cmd.AddCommand(newTrailListCmd()) cmd.AddCommand(newTrailCreateCmd()) cmd.AddCommand(newTrailUpdateCmd()) + cmd.AddCommand(newTrailWatchCmd()) return cmd } diff --git a/cmd/entire/cli/trail_watch_cmd.go b/cmd/entire/cli/trail_watch_cmd.go new file mode 100644 index 0000000000..4732487d8a --- /dev/null +++ b/cmd/entire/cli/trail_watch_cmd.go @@ -0,0 
+1,468 @@ +package cli + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "time" + + "github.com/entireio/cli/cmd/entire/cli/api" + "github.com/entireio/cli/cmd/entire/cli/gitremote" + "github.com/entireio/cli/cmd/entire/cli/trail" + + "github.com/spf13/cobra" +) + +// SSE constants for the trail code-review stream. Field names mirror the +// client spec in entirehq/entire.io docs/trail-code-review-stream.md. +const ( + sseEventReady = "ready" + sseEventComment = "comment" + sseEventCommentDeleted = "comment_deleted" + sseEventReconnect = "reconnect" + sseEventDeleted = "deleted" + sseEventError = "error" +) + +// reconnectBackoffInitial / reconnectBackoffCap bound the exponential backoff +// used after network errors. The server's max stream duration (~50s) closes +// cleanly with an `event: reconnect`; in that case we reconnect immediately +// with no backoff. +const ( + reconnectBackoffInitial = 500 * time.Millisecond + reconnectBackoffCap = 30 * time.Second +) + +func newTrailWatchCmd() *cobra.Command { + var ( + jsonOutput bool + showPings bool + once bool + number int + ) + + cmd := &cobra.Command{ + Use: "watch []", + Short: "Tail a trail's code review (discussion) live", + Long: `Subscribe to the SSE stream of a trail's code-review discussion and +print events as they arrive. Reconnects automatically when the server +caps the connection (~50s) and on transient network errors. + +If is omitted, the trail for the current branch is used. 
+ +Events emitted by the server: + ready initial frame, includes existing comment count + comment comment added or edited (with full payload) + comment_deleted comment removed + reconnect server cap reached; re-establishing + deleted trail row deleted; stream ends + error server-side error; treated as reconnect`, + Args: cobra.MaximumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + if len(args) == 1 { + n, err := strconv.Atoi(args[0]) + if err != nil || n <= 0 { + return fmt.Errorf("invalid trail number %q", args[0]) + } + number = n + } + return runTrailWatch(cmd, number, jsonOutput, showPings, once) + }, + } + + cmd.Flags().BoolVar(&jsonOutput, "json", false, "Print each event as a single JSON line") + cmd.Flags().BoolVar(&showPings, "show-pings", false, "Print SSE keepalive pings (otherwise suppressed)") + cmd.Flags().BoolVar(&once, "once", false, "Drain the initial replay then exit (no reconnect, no live tail)") + + return cmd +} + +func runTrailWatch(cmd *cobra.Command, number int, jsonOutput, showPings, once bool) error { + ctx := cmd.Context() + w := cmd.OutOrStdout() + errW := cmd.ErrOrStderr() + + client, err := NewAuthenticatedAPIClient(trailInsecureHTTP(cmd)) + if err != nil { + return fmt.Errorf("authentication required: %w", err) + } + + host, owner, repo, err := gitremote.ResolveRemoteRepo(ctx, "origin") + if err != nil { + return fmt.Errorf("failed to resolve repository: %w", err) + } + + // Resolve trail number if not provided: look it up by current branch. 
+ if number == 0 { + branch, err := GetCurrentBranch(ctx) + if err != nil { + return fmt.Errorf("no trail number given and current branch is unknown: %w", err) + } + found, err := findTrailByBranch(ctx, client, host, owner, repo, branch) + if err != nil { + return err + } + if found == nil { + return fmt.Errorf("no trail found for branch %q (pass an explicit trail number)", branch) + } + if found.Number <= 0 { + return fmt.Errorf("trail for branch %q has no numeric identifier yet", branch) + } + number = found.Number + } + + streamPath := fmt.Sprintf("%s/%d/code-review/stream", trailsBasePath(host, owner, repo), number) + + fmt.Fprintf(errW, "Watching trail #%d on %s/%s/%s — Ctrl+C to stop\n", number, host, owner, repo) + + backoff := reconnectBackoffInitial + lastEventID := "" + resumed := false + + for { + closeReason, lastSeenID, err := streamOnce(ctx, client, streamPath, lastEventID, resumed, jsonOutput, showPings, once, w, errW) + if lastSeenID != "" { + lastEventID = lastSeenID + } + + // Context cancelled (Ctrl+C) — exit cleanly. + if ctx.Err() != nil { + return nil //nolint:nilerr // ctx.Err() is the expected cancellation path; surface as clean exit + } + + switch closeReason { + case streamCloseTerminal: + return err + case streamCloseDeleted, streamCloseDone: + return nil + case streamCloseReconnect: + // Clean server-initiated reconnect (max_duration). No backoff. + resumed = true + backoff = reconnectBackoffInitial + continue + case streamCloseError: + // Server emitted `event: error` — reconnect with backoff. + fmt.Fprintf(errW, "stream error reported by server, reconnecting in %s\n", backoff) + case streamCloseTransport: + // Network/transport error. + if err != nil { + fmt.Fprintf(errW, "stream disconnected (%v), reconnecting in %s\n", err, backoff) + } + } + + // Sleep with cancellation. 
+ select { + case <-ctx.Done(): + return nil + case <-time.After(backoff): + } + backoff *= 2 + if backoff > reconnectBackoffCap { + backoff = reconnectBackoffCap + } + resumed = true + } +} + +type streamCloseReason int + +const ( + streamCloseTransport streamCloseReason = iota // network/transport error + streamCloseReconnect // server `event: reconnect` + streamCloseDeleted // server `event: deleted` + streamCloseError // server `event: error` + streamCloseDone // local --once / EOF after replay + streamCloseTerminal // non-recoverable HTTP status (401/403/404/410) +) + +// terminalHTTPStatuses are HTTP response codes for which retrying the SSE +// stream cannot succeed: auth failures (401/403) and resource-not-found-style +// errors (404/410). 429 is intentionally *not* terminal — it's a transient +// rate-limit signal that should back off and retry. +var terminalHTTPStatuses = [...]int{ + http.StatusUnauthorized, // 401 + http.StatusForbidden, // 403 + http.StatusNotFound, // 404 + http.StatusGone, // 410 +} + +func isTerminalHTTPError(err error) bool { + for _, code := range terminalHTTPStatuses { + if api.IsHTTPErrorStatus(err, code) { + return true + } + } + return false +} + +// streamOnce opens a single SSE connection, prints events until it closes for +// any reason, and returns the close reason plus the last `id:` observed (so +// the caller can pass it as `Last-Event-ID` on reconnect). 
+// +//nolint:cyclop,gocognit // SSE framing parser is naturally branchy; splitting hurts readability +func streamOnce( + ctx context.Context, + client *api.Client, + path string, + lastEventID string, + resumed bool, + jsonOutput, showPings, once bool, + w, errW io.Writer, +) (streamCloseReason, string, error) { + headers := http.Header{} + headers.Set("Accept", "text/event-stream") + headers.Set("Cache-Control", "no-cache") + if lastEventID != "" { + headers.Set("Last-Event-ID", lastEventID) + } else if resumed { + // First reconnect after a transport error with no id seen: use the + // query-param form to suppress replay anyway. + path += "?replay=false" + } + + resp, err := client.GetStream(ctx, path, headers) + if err != nil { + return streamCloseTransport, "", fmt.Errorf("open SSE stream: %w", err) + } + defer resp.Body.Close() + + if err := checkTrailResponse(resp); err != nil { + // Terminal: surface the error and don't reconnect. 429 deliberately + // falls through to streamCloseTransport so the caller backs off. + if isTerminalHTTPError(err) { + return streamCloseTerminal, "", err + } + return streamCloseTransport, "", err + } + + scanner := bufio.NewScanner(resp.Body) + // SSE frames can be larger than the default 64KiB scanner buffer when a + // trail has long comment bodies; bump to 1 MiB to match the API's + // per-comment limits. 
+ scanner.Buffer(make([]byte, 0, 64*1024), 1<<20) + + var ( + eventName string + dataLines []string + eventID string // id of the in-progress frame (reset on flush) + lastSeenID string // most recent SSE id from any frame that includes one + seenReady bool + remainReplay int // --once: comment events still to drain after ready + onceExitNext bool // --once: exit after this flush + ) + + flush := func() (streamCloseReason, bool) { + defer func() { + eventName = "" + dataLines = nil + eventID = "" + }() + if eventName == "" && len(dataLines) == 0 { + return streamCloseTransport, false + } + data := strings.Join(dataLines, "\n") + printSSEEvent(w, errW, eventName, data, jsonOutput) + + if eventID != "" { + lastSeenID = eventID + } + + switch eventName { + case sseEventReady: + seenReady = true + if once { + var p struct { + CommentCount int `json:"commentCount"` + Resumed bool `json:"resumed"` + } + if jerr := json.Unmarshal([]byte(data), &p); jerr != nil { + fmt.Fprintf(errW, "Warning: malformed ready payload: %v\n", jerr) + } + if p.Resumed || p.CommentCount == 0 { + onceExitNext = true + } else { + remainReplay = p.CommentCount + } + } + case sseEventComment: + if once && seenReady && remainReplay > 0 { + remainReplay-- + if remainReplay == 0 { + onceExitNext = true + } + } + case sseEventReconnect: + return streamCloseReconnect, true + case sseEventDeleted: + return streamCloseDeleted, true + case sseEventError: + return streamCloseError, true + } + if onceExitNext { + return streamCloseDone, true + } + return streamCloseTransport, false + } + + for scanner.Scan() { + line := scanner.Text() + + // Blank line dispatches the event. + if line == "" { + if reason, done := flush(); done { + return reason, lastSeenID, nil + } + continue + } + + // Comment / keepalive line. 
+ if strings.HasPrefix(line, ":") { + if showPings { + fmt.Fprintln(errW, "ping:", strings.TrimSpace(strings.TrimPrefix(line, ":"))) + } + continue + } + + field, value, ok := strings.Cut(line, ":") + if !ok { + // Field-only line (no colon) — per spec the value is empty. + field = line + value = "" + } + // Per SSE spec: a single leading space after the colon is ignored. + value = strings.TrimPrefix(value, " ") + + switch field { + case "event": + eventName = value + case "data": + dataLines = append(dataLines, value) + case "id": + eventID = value + case "retry": + // Ignored — we manage backoff client-side. + } + } + + if err := scanner.Err(); err != nil { + // Context cancellation surfaces here as a wrapped "context canceled". + if errors.Is(err, context.Canceled) || ctx.Err() != nil { + return streamCloseDone, lastSeenID, nil + } + return streamCloseTransport, lastSeenID, fmt.Errorf("read SSE stream: %w", err) + } + return streamCloseTransport, lastSeenID, io.ErrUnexpectedEOF +} + +// printSSEEvent renders a single SSE event in either human-readable or +// json-line form. `data` may be empty (e.g. for the rare frame with only an +// `event:` line) — we still print something so the caller sees it. +func printSSEEvent(w, errW io.Writer, eventName, data string, jsonOutput bool) { + if jsonOutput { + // Emit a JSON envelope so consumers can reliably parse with + // `jq`/scripts. The data field is preserved verbatim if it isn't + // valid JSON; otherwise it's inlined as a sub-object. 
+ envelope := map[string]any{"event": eventName} + if data != "" { + var sub any + if err := json.Unmarshal([]byte(data), &sub); err == nil { + envelope["data"] = sub + } else { + envelope["data"] = data + } + } + out, err := json.Marshal(envelope) + if err != nil { + fmt.Fprintf(errW, "failed to marshal event: %v\n", err) + return + } + fmt.Fprintln(w, string(out)) + return + } + + switch eventName { + case sseEventReady: + var p struct { + Repo string `json:"repo"` + TrailNumber int `json:"trailNumber"` + CommentCount int `json:"commentCount"` + Resumed bool `json:"resumed"` + } + if err := json.Unmarshal([]byte(data), &p); err == nil { + if p.Resumed { + fmt.Fprintf(w, "● connected to %s trail #%d (resumed; %d comment(s))\n", + p.Repo, p.TrailNumber, p.CommentCount) + } else { + fmt.Fprintf(w, "● connected to %s trail #%d (%d comment(s))\n", + p.Repo, p.TrailNumber, p.CommentCount) + } + return + } + case sseEventComment: + var p struct { + UpdatedAt time.Time `json:"updatedAt"` + Comment trail.Comment `json:"comment"` + } + if err := json.Unmarshal([]byte(data), &p); err == nil { + ts := p.UpdatedAt.Local().Format("15:04:05") + body := truncateForLog(p.Comment.Body, 200) + fmt.Fprintf(w, "[%s] %s: %s\n", ts, p.Comment.Author, body) + for _, r := range p.Comment.Replies { + fmt.Fprintf(w, " └─ %s: %s\n", r.Author, truncateForLog(r.Body, 200)) + } + return + } + case sseEventCommentDeleted: + var p struct { + UpdatedAt time.Time `json:"updatedAt"` + CommentID string `json:"commentId"` + } + if err := json.Unmarshal([]byte(data), &p); err == nil { + ts := p.UpdatedAt.Local().Format("15:04:05") + fmt.Fprintf(w, "[%s] (deleted comment %s)\n", ts, p.CommentID) + return + } + case sseEventReconnect: + fmt.Fprintln(errW, "↻ server requested reconnect") + return + case sseEventDeleted: + fmt.Fprintln(errW, "✖ trail was deleted") + return + case sseEventError: + var p struct { + Message string `json:"message"` + } + if jerr := json.Unmarshal([]byte(data), &p); jerr != 
nil { + // Best-effort: server payload may be missing or malformed; we still + // want to surface that an error event arrived. + fmt.Fprintf(errW, "Warning: malformed error payload: %v\n", jerr) + } + if p.Message != "" { + fmt.Fprintf(errW, "✖ stream error: %s\n", p.Message) + } else { + fmt.Fprintln(errW, "✖ stream error") + } + return + } + + // Fallback for unknown events or unparseable payloads: print raw. + fmt.Fprintf(w, "%s: %s\n", eventName, data) +} + +// truncateForLog clips body text on a rune boundary so a single multi-line +// comment doesn't blow up the watch view. Newlines collapse to spaces. +func truncateForLog(s string, maxRunes int) string { + s = strings.ReplaceAll(s, "\r\n", " ") + s = strings.ReplaceAll(s, "\n", " ") + rs := []rune(s) + if len(rs) <= maxRunes { + return s + } + return string(rs[:maxRunes]) + "…" +} diff --git a/cmd/entire/cli/trail_watch_cmd_test.go b/cmd/entire/cli/trail_watch_cmd_test.go new file mode 100644 index 0000000000..30ab7858bc --- /dev/null +++ b/cmd/entire/cli/trail_watch_cmd_test.go @@ -0,0 +1,233 @@ +package cli + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/entireio/cli/cmd/entire/cli/api" +) + +// fakeSSEServer streams a fixed sequence of frames with optional terminal +// behavior and records the Last-Event-ID header sent by the client. 
+func fakeSSEServer(t *testing.T, frames []string) (*httptest.Server, *string) { + t.Helper() + var lastEventID string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + lastEventID = r.Header.Get("Last-Event-ID") + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + flusher, ok := w.(http.Flusher) + if !ok { + t.Errorf("ResponseWriter is not a Flusher") + return + } + for _, f := range frames { + if _, err := fmt.Fprint(w, f); err != nil { + return + } + flusher.Flush() + } + })) + return srv, &lastEventID +} + +func TestStreamOnce_PrintsReadyAndComment(t *testing.T) { + frames := []string{ + "event: ready\ndata: {\"repo\":\"acme/web\",\"trailNumber\":42,\"commentCount\":1,\"resumed\":false}\nid: 1700000000000:ready\n\n", + "event: comment\ndata: {\"repo\":\"acme/web\",\"trailNumber\":42,\"updatedAt\":\"2026-01-01T00:00:00Z\",\"comment\":{\"id\":\"c1\",\"author\":\"alice\",\"body\":\"hello world\",\"created_at\":\"2026-01-01T00:00:00Z\",\"resolved\":false,\"resolved_by\":null,\"resolved_at\":null}}\nid: 1700000000000:c1\n\n", + } + srv, _ := fakeSSEServer(t, frames) + defer srv.Close() + + t.Setenv(api.BaseURLEnvVar, srv.URL) + client := api.NewClient("tok") + + var stdout, stderr bytes.Buffer + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + reason, lastID, err := streamOnce(ctx, client, "/stream", "", false, false, false, true, &stdout, &stderr) + // `--once` with commentCount=1 should exit cleanly after seeing ready+comment. 
+ if err != nil { + t.Fatalf("streamOnce error: %v", err) + } + if reason != streamCloseDone { + t.Errorf("close reason = %d, want streamCloseDone", reason) + } + if lastID != "1700000000000:c1" { + t.Errorf("lastID = %q, want %q", lastID, "1700000000000:c1") + } + out := stdout.String() + if !strings.Contains(out, "trail #42") { + t.Errorf("expected ready summary in output, got: %q", out) + } + if !strings.Contains(out, "alice") || !strings.Contains(out, "hello world") { + t.Errorf("expected comment line in output, got: %q", out) + } +} + +func TestStreamOnce_JSONOutputEnvelope(t *testing.T) { + frames := []string{ + "event: ready\ndata: {\"commentCount\":0}\nid: x\n\n", + } + srv, _ := fakeSSEServer(t, frames) + defer srv.Close() + + t.Setenv(api.BaseURLEnvVar, srv.URL) + client := api.NewClient("tok") + + var stdout, stderr bytes.Buffer + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + if _, _, err := streamOnce(ctx, client, "/stream", "", false, true, false, true, &stdout, &stderr); err != nil { + t.Fatalf("streamOnce error: %v", err) + } + + line := strings.TrimSpace(stdout.String()) + var env map[string]any + if err := json.Unmarshal([]byte(line), &env); err != nil { + t.Fatalf("output is not JSON: %v\nline=%q", err, line) + } + if env["event"] != "ready" { + t.Errorf("event = %v, want ready", env["event"]) + } +} + +func TestStreamOnce_ShowPingsTrimsSSECommentWhitespace(t *testing.T) { + frames := []string{ + ": ping 123\n", + } + srv, _ := fakeSSEServer(t, frames) + defer srv.Close() + + t.Setenv(api.BaseURLEnvVar, srv.URL) + client := api.NewClient("tok") + + var stdout, stderr bytes.Buffer + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + _, _, err := streamOnce(ctx, client, "/stream", "", false, false, true, false, &stdout, &stderr) + if err == nil { + t.Errorf("expected EOF after fixed test stream") + } + if got := stderr.String(); !strings.Contains(got, "ping: 
ping 123\n") { + t.Errorf("stderr = %q, want trimmed ping output", got) + } +} + +func TestStreamOnce_ReconnectEvent(t *testing.T) { + frames := []string{ + "event: ready\ndata: {\"commentCount\":0}\nid: r1\n\n", + "event: reconnect\ndata: {\"reason\":\"max_duration\"}\n\n", + } + srv, _ := fakeSSEServer(t, frames) + defer srv.Close() + + t.Setenv(api.BaseURLEnvVar, srv.URL) + client := api.NewClient("tok") + + var stdout, stderr bytes.Buffer + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + reason, lastID, err := streamOnce(ctx, client, "/stream", "", false, false, false, false, &stdout, &stderr) + if err != nil { + t.Fatalf("streamOnce error: %v", err) + } + if reason != streamCloseReconnect { + t.Errorf("reason = %d, want streamCloseReconnect", reason) + } + if lastID != "r1" { + t.Errorf("lastID = %q, want r1 (reconnect frame has no id; should preserve last ready id)", lastID) + } +} + +func TestStreamOnce_TerminalHTTPStatusesDoNotReconnect(t *testing.T) { + cases := []struct { + name string + code int + }{ + {"unauthorized", http.StatusUnauthorized}, + {"forbidden", http.StatusForbidden}, + {"not_found", http.StatusNotFound}, + {"gone", http.StatusGone}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(tc.code) + _, _ = fmt.Fprintf(w, "{\"error\":\"%s\"}", http.StatusText(tc.code)) + })) + defer srv.Close() + + t.Setenv(api.BaseURLEnvVar, srv.URL) + client := api.NewClient("tok") + + var stdout, stderr bytes.Buffer + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + reason, _, err := streamOnce(ctx, client, "/stream", "", false, false, false, false, &stdout, &stderr) + if reason != streamCloseTerminal { + t.Errorf("reason = %d, want streamCloseTerminal for %d", reason, tc.code) + } + if err == nil { + t.Errorf("want non-nil error for HTTP %d", 
tc.code) + } + }) + } +} + +func TestStreamOnce_TooManyRequestsIsRecoverable(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusTooManyRequests) + _, _ = fmt.Fprint(w, `{"error":"rate limited"}`) + })) + defer srv.Close() + + t.Setenv(api.BaseURLEnvVar, srv.URL) + client := api.NewClient("tok") + + var stdout, stderr bytes.Buffer + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + reason, _, err := streamOnce(ctx, client, "/stream", "", false, false, false, false, &stdout, &stderr) + if reason != streamCloseTransport { + t.Errorf("reason = %d, want streamCloseTransport (429 should be retryable)", reason) + } + if err == nil { + t.Errorf("want non-nil error so caller can log the backoff reason") + } +} + +func TestStreamOnce_SendsLastEventIDHeader(t *testing.T) { + frames := []string{ + "event: deleted\ndata: {}\n\n", + } + srv, gotLastID := fakeSSEServer(t, frames) + defer srv.Close() + + t.Setenv(api.BaseURLEnvVar, srv.URL) + client := api.NewClient("tok") + + var stdout, stderr bytes.Buffer + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + if _, _, err := streamOnce(ctx, client, "/stream", "abc:c1", true, false, false, false, &stdout, &stderr); err != nil { + t.Fatalf("streamOnce error: %v", err) + } + if *gotLastID != "abc:c1" { + t.Errorf("server saw Last-Event-ID = %q, want %q", *gotLastID, "abc:c1") + } +} diff --git a/cmd/entire/cli/transcript/compact/compact.go b/cmd/entire/cli/transcript/compact/compact.go index 5d07a1c6a3..9f1e601746 100644 --- a/cmd/entire/cli/transcript/compact/compact.go +++ b/cmd/entire/cli/transcript/compact/compact.go @@ -94,6 +94,11 @@ func Compact(redacted redact.RedactedBytes, opts MetadataFields) ([]byte, error) return compactGemini(content, opts) } + // pi: detect on the raw header line. compactPi handles StartLine itself. 
+ if isPiFormat(content) { + return compactPi(content, opts) + } + if isCodexFormat(content) { return compactCodex(content, opts) } diff --git a/cmd/entire/cli/transcript/compact/pi.go b/cmd/entire/cli/transcript/compact/pi.go new file mode 100644 index 0000000000..c64e53d27d --- /dev/null +++ b/cmd/entire/cli/transcript/compact/pi.go @@ -0,0 +1,316 @@ +package compact + +import ( + "bytes" + "encoding/json" + "fmt" + "strings" + + "github.com/entireio/cli/cmd/entire/cli/agent/pi/pijsonl" +) + +// --- pi format support --- +// +// Pi sessions are JSONL with a tree-shaped entry layout. Each entry has a +// top-level type ("session", "message", "model_change", "thinking_level_change", +// "compaction", "branch_summary", "label", "custom", "custom_message", +// "session_info") and a parentId pointer. +// +// Branching: when the user forks/branches mid-conversation the JSONL +// accumulates entries from BOTH branches. Compaction must walk only the +// active branch (root → most-recent message) so abandoned tool calls don't +// pollute the compact transcript. +// +// Parsing primitives (Entry/Message/ContentItem types, ResolveActiveBranch, +// SkipLines, NewScanner) are shared with the pi agent package via +// cmd/entire/cli/agent/pi/pijsonl so a fix applied here also lands there. + +const ( + piToolResultStatusOK = "success" + piToolResultStatusErr = "error" +) + +// piToolNameMap normalises Pi's lowercase tool names to the title-cased names +// used elsewhere in Entire's compact format (matching Claude's "Read"/"Write"/"Edit"). +var piToolNameMap = map[string]string{ + "edit": "Edit", + "read": "Read", + "write": "Write", +} + +// isPiFormat reports whether content looks like a Pi session JSONL file. +// Anchored on the persisted session header that pi writes as the first line. 
+func isPiFormat(content []byte) bool { + scanner := pijsonl.NewScanner(content) + for scanner.Scan() { + line := bytes.TrimSpace(scanner.Bytes()) + if len(line) == 0 { + continue + } + var probe struct { + Type string `json:"type"` + Version int `json:"version"` + } + if json.Unmarshal(line, &probe) != nil { + return false + } + // Pi auto-migrates v1/v2 to v3 on load; accept any positive version. + return probe.Type == "session" && probe.Version > 0 + } + return false +} + +// --- output structures (compact format) --- + +type piCompactUserBlock struct { + Text string `json:"text"` +} + +type piCompactAssistantTextBlock struct { + Type string `json:"type"` + Text string `json:"text"` +} + +type piCompactToolUseBlock struct { + Type string `json:"type"` + ID string `json:"id,omitempty"` + Name string `json:"name"` + Input any `json:"input"` + Result *piCompactToolResult `json:"result,omitempty"` +} + +type piCompactToolResult struct { + Output string `json:"output"` + Status string `json:"status"` +} + +// compactPi converts a Pi JSONL transcript into the Entire compact format. +// +// opts.StartLine is treated as a JSONL line offset. +// +// IMPORTANT: active-branch resolution and tool-result collection BOTH run on +// the original (untruncated) content. A truncated buffer breaks parentId +// chains and toolCallId references, which would let abandoned-branch entries +// and orphaned tool results leak into the compact output. 
+func compactPi(content []byte, opts MetadataFields) ([]byte, error) { + active := pijsonl.ResolveActiveBranch(content) + results, err := piCollectToolResults(content, active) + if err != nil { + return nil, err + } + + emit := content + if opts.StartLine > 0 { + emit = pijsonl.SkipLines(content, opts.StartLine) + if emit == nil { + return []byte{}, nil + } + } + + base := newTranscriptLine(opts) + var out []byte + + scanner := pijsonl.NewScanner(emit) + for scanner.Scan() { + var entry pijsonl.Entry + if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil { + continue + } + if entry.Type != pijsonl.EntryTypeMessage { + continue + } + if active != nil && !active[entry.ID] { + continue + } + + switch entry.Message.Role { + case pijsonl.RoleUser: + blocks := piEmitUserContent(entry.Message.Content) + if len(blocks) == 0 { + continue + } + contentJSON, err := json.Marshal(blocks) + if err != nil { + return nil, fmt.Errorf("marshal pi user content: %w", err) + } + line := base + line.Type = pijsonl.RoleUser + line.TS = piTimestampJSON(entry.Timestamp) + line.Content = contentJSON + appendLine(&out, line) + + case pijsonl.RoleAssistant: + blocks := piEmitAssistantContent(entry.Message.Content, results) + if len(blocks) == 0 { + continue + } + contentJSON, err := json.Marshal(blocks) + if err != nil { + return nil, fmt.Errorf("marshal pi assistant content: %w", err) + } + line := base + line.Type = pijsonl.RoleAssistant + line.TS = piTimestampJSON(entry.Timestamp) + line.ID = entry.ID + line.Content = contentJSON + if entry.Message.Usage != nil { + line.InputTokens = entry.Message.Usage.Input + line.OutputTokens = entry.Message.Usage.Output + } + appendLine(&out, line) + } + } + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("scan pi transcript: %w", err) + } + return out, nil +} + +// piEmitUserContent decodes a Pi user message's content (string or block array) +// into compact user blocks. 
+func piEmitUserContent(raw json.RawMessage) []piCompactUserBlock { + if text := pijsonl.DecodeStringContent(raw); text != "" { + return []piCompactUserBlock{{Text: text}} + } + var items []pijsonl.ContentItem + if err := json.Unmarshal(raw, &items); err != nil { + return nil + } + blocks := make([]piCompactUserBlock, 0, len(items)) + for _, item := range items { + if item.Type == pijsonl.ContentTypeText && item.Text != "" { + blocks = append(blocks, piCompactUserBlock{Text: item.Text}) + } + } + return blocks +} + +// piEmitAssistantContent decodes a Pi assistant message's content into +// compact assistant blocks (text + tool_use). Tool results are spliced in +// from `results` keyed by toolCallID. +func piEmitAssistantContent(raw json.RawMessage, results map[string]piCompactToolResult) []any { + if text := pijsonl.DecodeStringContent(raw); text != "" { + return []any{piCompactAssistantTextBlock{Type: pijsonl.ContentTypeText, Text: text}} + } + var items []pijsonl.ContentItem + if err := json.Unmarshal(raw, &items); err != nil { + return nil + } + blocks := make([]any, 0, len(items)) + for _, item := range items { + switch item.Type { + case pijsonl.ContentTypeText: + if item.Text != "" { + blocks = append(blocks, piCompactAssistantTextBlock{ + Type: pijsonl.ContentTypeText, + Text: item.Text, + }) + } + case "toolCall": + block := piCompactToolUseBlock{ + Type: "tool_use", + ID: item.ID, + Name: piNormalizeToolName(item.Name), + Input: piDecodeArguments(item.Arguments), + } + if r, ok := results[item.ID]; ok { + block.Result = &r + } + blocks = append(blocks, block) + } + } + return blocks +} + +// piCollectToolResults walks the transcript and returns a map of tool-call id +// to spliceable result. Branch-aware. Pass FULL transcript bytes — splicing +// requires resolving toolCallIds across the whole tree. 
+func piCollectToolResults(data []byte, active map[string]bool) (map[string]piCompactToolResult, error) { + results := map[string]piCompactToolResult{} + scanner := pijsonl.NewScanner(data) + for scanner.Scan() { + var entry pijsonl.Entry + if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil { + continue + } + if entry.Type != pijsonl.EntryTypeMessage || entry.Message.Role != pijsonl.RoleToolResult { + continue + } + if active != nil && !active[entry.ID] { + continue + } + if entry.Message.ToolCallID == "" { + continue + } + results[entry.Message.ToolCallID] = piCompactToolResult{ + Output: piDecodeResultOutput(entry.Message.Content), + Status: piResultStatus(entry.Message.IsError), + } + } + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("scan pi tool results: %w", err) + } + return results, nil +} + +func piNormalizeToolName(name string) string { + if normalized, ok := piToolNameMap[name]; ok { + return normalized + } + return name +} + +func piDecodeArguments(raw json.RawMessage) any { + if len(raw) == 0 { + return map[string]any{} + } + var decoded any + if err := json.Unmarshal(raw, &decoded); err != nil { + return map[string]any{} + } + return decoded +} + +func piDecodeResultOutput(raw json.RawMessage) string { + if text := pijsonl.DecodeStringContent(raw); text != "" { + return text + } + var items []pijsonl.ContentItem + if err := json.Unmarshal(raw, &items); err == nil { + texts := make([]string, 0, len(items)) + for _, item := range items { + if item.Type == pijsonl.ContentTypeText && item.Text != "" { + texts = append(texts, item.Text) + } + } + if len(texts) > 0 { + return strings.Join(texts, "\n") + } + } + // Fall through: serialize unknown structure as JSON. 
+ var decoded any + if err := json.Unmarshal(raw, &decoded); err == nil { + if encoded, err := json.Marshal(decoded); err == nil { + return string(encoded) + } + } + return string(raw) +} + +func piResultStatus(isError bool) string { + if isError { + return piToolResultStatusErr + } + return piToolResultStatusOK +} + +func piTimestampJSON(ts string) json.RawMessage { + if ts == "" { + return nil + } + b, err := json.Marshal(ts) + if err != nil { + return nil + } + return b +} diff --git a/cmd/entire/cli/transcript/compact/pi_test.go b/cmd/entire/cli/transcript/compact/pi_test.go new file mode 100644 index 0000000000..3409d93d9b --- /dev/null +++ b/cmd/entire/cli/transcript/compact/pi_test.go @@ -0,0 +1,162 @@ +package compact + +import ( + "strings" + "testing" + + "github.com/entireio/cli/redact" +) + +const piTestSessionJSONL = `{"type":"session","version":3,"id":"test-uuid-123","timestamp":"2026-03-27T21:00:00.000Z","cwd":"/tmp/test"} +{"type":"model_change","id":"mc1","parentId":null,"timestamp":"2026-03-27T21:00:00.001Z","provider":"anthropic","modelId":"claude-sonnet-4-6"} +{"type":"message","id":"m1","parentId":"mc1","timestamp":"2026-03-27T21:00:01.000Z","message":{"role":"user","content":[{"type":"text","text":"Create hello.txt"}],"timestamp":1774646400000}} +{"type":"message","id":"m2","parentId":"m1","timestamp":"2026-03-27T21:00:02.000Z","message":{"role":"assistant","content":[{"type":"toolCall","id":"tc1","name":"write","arguments":{"path":"hello.txt","content":"hello world\n"}}],"usage":{"input":100,"output":50,"cacheRead":10,"cacheWrite":5},"stopReason":"toolUse","timestamp":1774646401000}} +{"type":"message","id":"m3","parentId":"m2","timestamp":"2026-03-27T21:00:03.000Z","message":{"role":"toolResult","toolCallId":"tc1","toolName":"write","content":[{"type":"text","text":"Written 12 bytes"}],"isError":false,"timestamp":1774646402000}} 
+{"type":"message","id":"m4","parentId":"m3","timestamp":"2026-03-27T21:00:04.000Z","message":{"role":"assistant","content":[{"type":"text","text":"Created hello.txt with the content hello world."}],"usage":{"input":200,"output":30,"cacheRead":0,"cacheWrite":0},"stopReason":"stop","timestamp":1774646403000}} +` + +const piTestBranchingJSONL = `{"type":"session","version":3,"id":"test-branch-123","timestamp":"2026-03-27T22:00:00.000Z","cwd":"/tmp/test"} +{"type":"model_change","id":"mc1","parentId":null,"timestamp":"2026-03-27T22:00:00.001Z","provider":"anthropic","modelId":"claude-sonnet-4-6"} +{"type":"message","id":"m1","parentId":"mc1","timestamp":"2026-03-27T22:00:01.000Z","message":{"role":"user","content":[{"type":"text","text":"Create a file"}],"timestamp":1774650000000}} +{"type":"message","id":"m2","parentId":"m1","timestamp":"2026-03-27T22:00:02.000Z","message":{"role":"assistant","content":[{"type":"toolCall","id":"tc1","name":"write","arguments":{"path":"old.txt","content":"old\n"}}],"usage":{"input":100,"output":50,"cacheRead":0,"cacheWrite":0},"stopReason":"toolUse","timestamp":1774650001000}} +{"type":"message","id":"m3","parentId":"m2","timestamp":"2026-03-27T22:00:03.000Z","message":{"role":"toolResult","toolCallId":"tc1","toolName":"write","content":[{"type":"text","text":"Written 4 bytes"}],"isError":false,"timestamp":1774650002000}} +{"type":"message","id":"m4","parentId":"m3","timestamp":"2026-03-27T22:00:04.000Z","message":{"role":"assistant","content":[{"type":"text","text":"Created old.txt"}],"usage":{"input":200,"output":30,"cacheRead":0,"cacheWrite":0},"stopReason":"stop","timestamp":1774650003000}} +{"type":"message","id":"m5","parentId":"m1","timestamp":"2026-03-27T22:00:05.000Z","message":{"role":"assistant","content":[{"type":"toolCall","id":"tc2","name":"write","arguments":{"path":"new.txt","content":"new\n"}}],"usage":{"input":150,"output":60,"cacheRead":5,"cacheWrite":3},"stopReason":"toolUse","timestamp":1774650004000}} 
+{"type":"message","id":"m6","parentId":"m5","timestamp":"2026-03-27T22:00:06.000Z","message":{"role":"toolResult","toolCallId":"tc2","toolName":"write","content":[{"type":"text","text":"Written 4 bytes"}],"isError":false,"timestamp":1774650005000}} +{"type":"message","id":"m7","parentId":"m6","timestamp":"2026-03-27T22:00:07.000Z","message":{"role":"assistant","content":[{"type":"text","text":"Created new.txt"}],"usage":{"input":250,"output":40,"cacheRead":0,"cacheWrite":0},"stopReason":"stop","timestamp":1774650006000}} +` + +func TestCompact_Pi_LinearTranscript(t *testing.T) { + t.Parallel() + + expected := []string{ + `{"v":1,"agent":"pi","cli_version":"0.5.1","type":"user","ts":"2026-03-27T21:00:01.000Z","content":[{"text":"Create hello.txt"}]}`, + `{"v":1,"agent":"pi","cli_version":"0.5.1","type":"assistant","ts":"2026-03-27T21:00:02.000Z","id":"m2","input_tokens":100,"output_tokens":50,"content":[{"type":"tool_use","id":"tc1","name":"Write","input":{"content":"hello world\n","path":"hello.txt"},"result":{"output":"Written 12 bytes","status":"success"}}]}`, + `{"v":1,"agent":"pi","cli_version":"0.5.1","type":"assistant","ts":"2026-03-27T21:00:04.000Z","id":"m4","input_tokens":200,"output_tokens":30,"content":[{"type":"text","text":"Created hello.txt with the content hello world."}]}`, + } + + result, err := Compact(redact.AlreadyRedacted([]byte(piTestSessionJSONL)), agentOpts("pi")) + if err != nil { + t.Fatalf("Compact: %v", err) + } + assertJSONLines(t, result, expected) +} + +func TestCompact_Pi_FiltersAbandonedBranches(t *testing.T) { + t.Parallel() + + result, err := Compact(redact.AlreadyRedacted([]byte(piTestBranchingJSONL)), agentOpts("pi")) + if err != nil { + t.Fatalf("Compact: %v", err) + } + got := string(result) + if strings.Contains(got, "old.txt") || strings.Contains(got, "Created old.txt") { + t.Errorf("compact should exclude abandoned branch (old.txt), got:\n%s", got) + } + if !strings.Contains(got, "new.txt") || !strings.Contains(got, 
"Created new.txt") { + t.Errorf("compact should include active branch (new.txt), got:\n%s", got) + } +} + +func TestCompact_Pi_NormalizesToolNamesAndErrors(t *testing.T) { + t.Parallel() + body := []byte(`{"type":"session","version":3,"id":"s1","timestamp":"2026-03-27T21:00:00.000Z","cwd":"/r"} +{"type":"message","id":"m1","parentId":null,"timestamp":"2026-03-27T21:00:01.000Z","message":{"role":"user","content":[{"type":"text","text":"Edit app.go"}]}} +{"type":"message","id":"m2","parentId":"m1","timestamp":"2026-03-27T21:00:02.000Z","message":{"role":"assistant","content":[{"type":"toolCall","id":"tc1","name":"edit","arguments":{"path":"app.go"}}]}} +{"type":"message","id":"m3","parentId":"m2","timestamp":"2026-03-27T21:00:03.000Z","message":{"role":"toolResult","toolCallId":"tc1","toolName":"edit","content":[{"type":"text","text":"file not found"}],"isError":true}} +`) + result, err := Compact(redact.AlreadyRedacted(body), agentOpts("pi")) + if err != nil { + t.Fatalf("Compact: %v", err) + } + got := string(result) + if !strings.Contains(got, `"name":"Edit"`) { + t.Errorf(`expected "name":"Edit" (normalized from "edit") in:\n%s`, got) + } + if !strings.Contains(got, `"status":"error"`) { + t.Errorf(`expected "status":"error" (mapped from isError:true) in:\n%s`, got) + } +} + +func TestCompact_Pi_FlatTranscriptNoTreeFiltering(t *testing.T) { + t.Parallel() + body := []byte(`{"type":"session","version":3,"id":"flat"} +{"type":"message","id":"m1","timestamp":"2026-03-27T21:00:01.000Z","message":{"role":"user","content":"hello"}} +{"type":"message","id":"m2","timestamp":"2026-03-27T21:00:02.000Z","message":{"role":"assistant","content":[{"type":"text","text":"hi"}],"usage":{"input":10,"output":5}}} +`) + expected := []string{ + `{"v":1,"agent":"pi","cli_version":"0.5.1","type":"user","ts":"2026-03-27T21:00:01.000Z","content":[{"text":"hello"}]}`, + 
`{"v":1,"agent":"pi","cli_version":"0.5.1","type":"assistant","ts":"2026-03-27T21:00:02.000Z","id":"m2","input_tokens":10,"output_tokens":5,"content":[{"type":"text","text":"hi"}]}`, + } + result, err := Compact(redact.AlreadyRedacted(body), agentOpts("pi")) + if err != nil { + t.Fatalf("Compact: %v", err) + } + assertJSONLines(t, result, expected) +} + +// TestCompact_Pi_StartLineDoesNotLeakAbandonedBranches is a regression test +// for a bug found in review: an earlier version of compactPi truncated the +// content via SkipLines BEFORE resolving the active branch. When StartLine +// skipped past fork-point entries, ResolveActiveBranch ran on a disconnected +// tree, returned nil, and the loop fell back to "all entries are active" — +// letting abandoned-branch tool calls leak into the compact transcript. +func TestCompact_Pi_StartLineDoesNotLeakAbandonedBranches(t *testing.T) { + t.Parallel() + // Same shape as piTestBranchingJSONL, but bump StartLine past the + // session header + model_change + m1 (fork point) — so that, on the + // truncated buffer, ResolveActiveBranch cannot anchor to a root. + opts := MetadataFields{Agent: "pi", CLIVersion: "0.5.1", StartLine: 3} + result, err := Compact(redact.AlreadyRedacted([]byte(piTestBranchingJSONL)), opts) + if err != nil { + t.Fatalf("Compact: %v", err) + } + got := string(result) + if strings.Contains(got, "old.txt") || strings.Contains(got, "Created old.txt") { + t.Errorf("abandoned-branch leak: compact contains old.txt entries:\n%s", got) + } + if !strings.Contains(got, "new.txt") { + t.Errorf("active-branch dropped: compact missing new.txt entries:\n%s", got) + } +} + +func TestCompact_Pi_StartLine(t *testing.T) { + t.Parallel() + // Skip header + model_change + first user → start scanning from m2. 
+ opts := MetadataFields{Agent: "pi", CLIVersion: "0.5.1", StartLine: 3} + result, err := Compact(redact.AlreadyRedacted([]byte(piTestSessionJSONL)), opts) + if err != nil { + t.Fatalf("Compact: %v", err) + } + got := string(result) + if strings.Contains(got, "Create hello.txt") { + t.Errorf("StartLine=3 should skip user message, got:\n%s", got) + } + if !strings.Contains(got, `"id":"m2"`) || !strings.Contains(got, `"id":"m4"`) { + t.Errorf("StartLine=3 should keep m2 and m4, got:\n%s", got) + } +} + +func TestIsPiFormat(t *testing.T) { + t.Parallel() + cases := []struct { + name string + input string + want bool + }{ + {"v3 header", `{"type":"session","version":3,"id":"x","cwd":"/"}`, true}, + {"v1 legacy", `{"type":"session","version":1,"id":"x"}`, true}, + {"opencode", `{"info":{"id":"x"},"messages":[]}`, false}, + {"claude jsonl", `{"type":"user","uuid":"u1","message":{"content":"hi"}}`, false}, + {"empty", "", false}, + {"garbage", "not-json", false}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + t.Parallel() + if got := isPiFormat([]byte(c.input)); got != c.want { + t.Errorf("isPiFormat(%q) = %v, want %v", c.name, got, c.want) + } + }) + } +} diff --git a/e2e/agents/pi.go b/e2e/agents/pi.go new file mode 100644 index 0000000000..80c9606010 --- /dev/null +++ b/e2e/agents/pi.go @@ -0,0 +1,200 @@ +package agents + +import ( + "context" + "errors" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "time" +) + +func init() { + if env := os.Getenv("E2E_AGENT"); env != "" && env != "pi" { + return + } + Register(&Pi{}) + RegisterGate("pi", 2) +} + +// Pi implements the E2E Agent interface for the Pi coding agent. +type Pi struct{} + +// PiSession exposes the per-test isolated Pi home so test fixtures can +// inspect or clean it up. 
+type PiSession struct { + *TmuxSession + + home string +} + +func (s *PiSession) Home() string { return s.home } + +func (p *Pi) Name() string { return "pi" } +func (p *Pi) Binary() string { return "pi" } +func (p *Pi) EntireAgent() string { return "pi" } +func (p *Pi) PromptPattern() string { return `\$\d` } +func (p *Pi) TimeoutMultiplier() float64 { return 1.5 } + +func (p *Pi) Bootstrap() error { + return nil +} + +func (p *Pi) IsTransientError(out Output, _ error) bool { + combined := out.Stdout + out.Stderr + for _, pat := range []string{ + "overloaded", + "rate limit", + "429", + "503", + "ECONNRESET", + "ETIMEDOUT", + "timeout", + } { + if strings.Contains(combined, pat) { + return true + } + } + return false +} + +// piHome creates an isolated PI_CODING_AGENT_DIR for a test run so +// parallel tests don't share session state with the real ~/.pi/agent. +// Auth (auth.json) and user-tunable defaults (settings.json) are +// symlinked from the real home if present so OAuth tokens flow through +// without re-prompting; tests that run with an explicit +// ANTHROPIC_API_KEY / OPENAI_API_KEY env will pick those up via the +// inherited environment regardless. +func piHome() (string, func(), error) { + cache, err := os.UserCacheDir() + if err != nil { + return "", nil, fmt.Errorf("resolve user cache dir: %w", err) + } + base := filepath.Join(cache, "entire-e2e") + if err := os.MkdirAll(base, 0o755); err != nil { + return "", nil, fmt.Errorf("create pi home base %q: %w", base, err) + } + dir, err := os.MkdirTemp(base, "pi-home-*") + if err != nil { + return "", nil, fmt.Errorf("create temporary pi home under %q: %w", base, err) + } + if err := seedPiHome(dir); err != nil { + _ = os.RemoveAll(dir) + return "", nil, fmt.Errorf("seed pi home: %w", err) + } + return dir, func() { _ = os.RemoveAll(dir) }, nil +} + +// seedPiHome links the user's auth.json + settings.json into the +// isolated home (best-effort) so Pi can authenticate without +// re-prompting. 
Sessions still write into the isolated home's +// sessions/ subdir, keeping per-test session state hermetic. +func seedPiHome(home string) error { + realHome, err := os.UserHomeDir() + if err != nil { + return nil //nolint:nilerr // no user home → can't seed; tests with API-key env still work + } + src := filepath.Join(realHome, ".pi", "agent") + for _, name := range []string{"auth.json", "settings.json"} { + from := filepath.Join(src, name) + if _, statErr := os.Stat(from); statErr != nil { + continue // file not present — skip + } + to := filepath.Join(home, name) + if linkErr := os.Symlink(from, to); linkErr != nil { + return fmt.Errorf("symlink %s: %w", name, linkErr) + } + } + return nil +} + +func (p *Pi) RunPrompt(ctx context.Context, dir string, prompt string, opts ...Option) (Output, error) { + cfg := &runConfig{} + for _, o := range opts { + o(cfg) + } + + bin, err := exec.LookPath(p.Binary()) + if err != nil { + return Output{}, fmt.Errorf("%s not in PATH: %w", p.Binary(), err) + } + + home, cleanup, err := piHome() + if err != nil { + return Output{}, fmt.Errorf("create pi home: %w", err) + } + defer cleanup() + + args := []string{"-p", prompt, "--no-skills", "--no-prompt-templates", "--no-themes"} + displayArgs := []string{"-p", fmt.Sprintf("%q", prompt), "--no-skills", "--no-prompt-templates", "--no-themes"} + + env := append(filterEnv(os.Environ(), "ENTIRE_TEST_TTY", "PI_CODING_AGENT_DIR"), + "PI_CODING_AGENT_DIR="+home, + ) + + cmd := exec.CommandContext(ctx, bin, args...) 
+ cmd.Dir = dir + cmd.Env = env + setupProcessGroup(cmd) + cmd.WaitDelay = 5 * time.Second + + var stdout, stderr strings.Builder + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + err = cmd.Run() + exitCode := 0 + if err != nil { + exitErr := &exec.ExitError{} + if errors.As(err, &exitErr) { + exitCode = exitErr.ExitCode() + } else { + exitCode = -1 + } + } + + return Output{ + Command: p.Binary() + " " + strings.Join(displayArgs, " "), + Stdout: stdout.String(), + Stderr: stderr.String(), + ExitCode: exitCode, + }, err +} + +func (p *Pi) StartSession(_ context.Context, dir string) (Session, error) { + name := fmt.Sprintf("pi-test-%d", time.Now().UnixNano()) + + home, cleanup, err := piHome() + if err != nil { + return nil, fmt.Errorf("create pi home: %w", err) + } + + s, err := p.startTmuxSession(name, dir, home, p.Binary()) + if err != nil { + cleanup() + return nil, err + } + s.OnClose(cleanup) + + if _, err := s.WaitFor(p.PromptPattern(), 30*time.Second); err != nil { + _ = s.Close() + return nil, fmt.Errorf("waiting for initial prompt: %w", err) + } + s.stableAtSend = "" + + return &PiSession{TmuxSession: s, home: home}, nil +} + +// startTmuxSession spawns Pi inside tmux with PI_CODING_AGENT_DIR set +// via the `env` command. Mirrors Codex's startTmuxSession pattern so +// the per-test home propagates into the agent process. +func (p *Pi) startTmuxSession(name, dir, home string, args ...string) (*TmuxSession, error) { + tmuxArgs := append([]string{ + "PI_CODING_AGENT_DIR=" + home, + "HOME=" + os.Getenv("HOME"), + "TERM=" + os.Getenv("TERM"), + }, args...) + return NewTmuxSession(name, dir, []string{"PI_CODING_AGENT_DIR", "ENTIRE_TEST_TTY"}, "env", tmuxArgs...) +}