diff --git a/cmd/entire/cli/checkpoint/shadow_ref.go b/cmd/entire/cli/checkpoint/shadow_ref.go new file mode 100644 index 0000000000..5724903410 --- /dev/null +++ b/cmd/entire/cli/checkpoint/shadow_ref.go @@ -0,0 +1,164 @@ +package checkpoint + +import ( + "context" + "errors" + "fmt" + "math/rand/v2" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "github.com/entireio/cli/cmd/entire/cli/internal/flock" + + "github.com/go-git/go-git/v6/plumbing" +) + +// ErrShadowRefBusy is returned by casUpdateShadowBranchRef when the ref has +// moved since the caller read it. Callers retry with a fresh parent. +var ErrShadowRefBusy = errors.New("shadow branch ref moved (CAS mismatch)") + +// shadowRefMaxRetries bounds the WriteTemporary retry loop. With the +// per-shadow-branch flock held, our own writers never collide; this budget +// is purely a safety net against an external `git update-ref` writer that +// repeatedly beats us to the ref. +const shadowRefMaxRetries = 16 + +// shadowRefMaxJitter is the upper bound for randomized backoff between CAS +// retries. Random jitter avoids thundering-herd retry patterns when many +// sessions hit the same shadow branch simultaneously. +const shadowRefMaxJitter = 8 * time.Millisecond + +// repoDirs returns the worktree root and git common dir for the store's +// repository. Callers use the worktree root as cmd.Dir for git invocations +// and the common dir to locate filesystem paths (lock files, loose objects) +// — both without depending on the process cwd. 
+func (s *GitStore) repoDirs(ctx context.Context) (worktreeRoot, commonDir string, err error) {
+	// The worktree filesystem root is what callers hand to git as cmd.Dir.
+	tree, wtErr := s.repo.Worktree()
+	if wtErr != nil {
+		return "", "", fmt.Errorf("open worktree: %w", wtErr)
+	}
+	root := tree.Filesystem().Root()
+	if root == "" {
+		return "", "", errors.New("repository worktree filesystem has no root path")
+	}
+
+	common, cdErr := resolveGitCommonDir(ctx, s.repo)
+	if cdErr != nil {
+		return "", "", cdErr
+	}
+	return root, common, nil
+}
+
+// casUpdateShadowBranchRef performs a compare-and-swap on a shadow branch ref
+// by running `git update-ref <ref> <new> <old>`: git moves the ref to newHash
+// only when its current value matches expectedHash. Passing
+// plumbing.ZeroHash as expectedHash demands that the ref does NOT exist yet
+// (first-checkpoint case).
+//
+// The command runs with cmd.Dir = repoRoot, so it targets the repository the
+// caller resolved (i.e. s.repo) rather than whatever the process cwd is.
+//
+// A nil return means the ref was updated. ErrShadowRefBusy means git reported
+// that the ref moved since expectedHash was observed; callers retry with a
+// fresh parent. Every other failure is returned wrapped, together with git's
+// trimmed output.
+//
+// Why shell out: git's ref-locking is the canonical cross-process atomic
+// CAS — go-git's CheckAndSetReference doesn't interoperate with native git's
+// .lock files, and shadow branches can be touched concurrently by separate
+// `entire` hook processes.
+func casUpdateShadowBranchRef(ctx context.Context, repoRoot, branchName string, newHash, expectedHash plumbing.Hash) error {
+	ref := "refs/heads/" + branchName
+
+	// "Must not exist" is spelled as an all-zeros OID of the repo's
+	// object-format width: 40 zeros for SHA-1, 64 for SHA-256. Borrowing the
+	// hex width from newHash picks the right one without an extra git call.
+	var expected string
+	if expectedHash == plumbing.ZeroHash {
+		expected = strings.Repeat("0", newHash.HexSize())
+	} else {
+		expected = expectedHash.String()
+	}
+
+	update := exec.CommandContext(ctx, "git", "update-ref", ref, newHash.String(), expected)
+	update.Dir = repoRoot
+	// Pin the locale to C so the stderr pattern match below still works when
+	// the user's environment would otherwise translate git's diagnostics.
+	update.Env = append(os.Environ(), "LC_ALL=C", "LANG=C")
+
+	raw, runErr := update.CombinedOutput()
+	if runErr == nil {
+		return nil
+	}
+
+	// Git reports a lost CAS as "cannot lock ref ..." (covering both
+	// "is at X but expected Y" and "reference already exists" for the
+	// zero-OID case). Anything else is a genuine failure and propagates.
+	msg := string(raw)
+	for _, marker := range []string{"cannot lock ref", "but expected"} {
+		if strings.Contains(msg, marker) {
+			return ErrShadowRefBusy
+		}
+	}
+	return fmt.Errorf("git update-ref %s: %s: %w", ref, strings.TrimSpace(msg), runErr)
+}
+
+// shadowRefBackoff pauses for a short random interval before the next CAS
+// retry: 1ms plus a random duration below shadowRefMaxJitter. Once attempt
+// exceeds 4 the random bound is doubled to spread retries out further.
+// Returns nil when the pause elapses, or ctx.Err() if the context is
+// cancelled first.
+func shadowRefBackoff(ctx context.Context, attempt int) error {
+	ceiling := shadowRefMaxJitter
+	if attempt > 4 {
+		ceiling = 2 * shadowRefMaxJitter
+	}
+	jitter := time.Duration(rand.Int64N(int64(ceiling))) //nolint:gosec // jitter, not security-sensitive
+	// The extra millisecond is a floor: the sleep stays non-trivial even when
+	// the random draw comes back as 0.
+	pause := jitter + time.Millisecond
+	select {
+	case <-ctx.Done():
+		return ctx.Err() //nolint:wrapcheck // canonical context cancellation
+	case <-time.After(pause):
+		return nil
+	}
+}
+
+// shadowBranchLockPath returns the per-shadow-branch flock file path. Lock
+// files live in <git-common-dir>/entire-shadow-locks/ so they don't pollute
+// the session-state directory. Branch names are slash-escaped because the
+// shadow-branch convention "entire/<name>" would otherwise nest directories.
+func shadowBranchLockPath(commonDir, branchName string) (string, error) {
+	// The lock directory is created on demand; MkdirAll is a no-op when it
+	// already exists.
+	dir := filepath.Join(commonDir, "entire-shadow-locks")
+	mkErr := os.MkdirAll(dir, 0o750)
+	if mkErr != nil {
+		return "", fmt.Errorf("create shadow lock directory: %w", mkErr)
+	}
+	// Flatten "/" to "_" so every branch maps to a single file in dir.
+	fileName := strings.ReplaceAll(branchName, "/", "_") + ".lock"
+	return filepath.Join(dir, fileName), nil
+}
+
+// withShadowBranchFlock runs fn while holding the exclusive flock that
+// belongs to branchName, releasing it when fn returns. Every WriteTemporary
+// caller targeting the same shadow branch is serialized through this lock —
+// across goroutines AND across processes — so the CAS in
+// casUpdateShadowBranchRef only sees external writers as contention.
+//
+// commonDir is the git common directory (from s.repoDirs); it locates the
+// lock file independently of the process cwd. The returned error is fn's
+// error, or the failure to resolve or acquire the lock (in which case fn is
+// never invoked).
+func withShadowBranchFlock(commonDir, branchName string, fn func() error) error {
+	lockFile, pathErr := shadowBranchLockPath(commonDir, branchName)
+	if pathErr != nil {
+		return pathErr
+	}
+	unlock, lockErr := flock.Acquire(lockFile)
+	if lockErr != nil {
+		return fmt.Errorf("acquire shadow flock %s: %w", branchName, lockErr)
+	}
+	defer unlock()
+	return fn()
+}
+
+// tryDeleteLooseObject best-effort removes a loose object file. Used to
+// clean up dangling commits created during a CAS-losing attempt. Failures
+// (e.g. object already packed by a concurrent gc, or never written as a
+// loose object) are ignored — the object will be picked up by the next gc
+// pass either way.
+func tryDeleteLooseObject(commonDir string, hash plumbing.Hash) {
+	hexOID := hash.String()
+	if len(hexOID) >= 3 {
+		// Loose objects are stored as objects/<first two hex chars>/<rest>.
+		fanout, rest := hexOID[:2], hexOID[2:]
+		// Best-effort cleanup: the error is dropped on purpose.
+		_ = os.Remove(filepath.Join(commonDir, "objects", fanout, rest))
+	}
+}
diff --git a/cmd/entire/cli/checkpoint/temporary.go b/cmd/entire/cli/checkpoint/temporary.go index d2784428f2..ec0cd6a8a3 100644 --- a/cmd/entire/cli/checkpoint/temporary.go +++ b/cmd/entire/cli/checkpoint/temporary.go @@ -68,21 +68,9 @@ func (s *GitStore) WriteTemporary(ctx context.Context, opts WriteTemporaryOption // Get shadow branch name shadowBranchName := ShadowBranchNameForCommit(opts.BaseCommit, opts.WorktreeID) - // Get or create shadow branch - parentHash, baseTreeHash, err := s.getOrCreateShadowBranch(shadowBranchName) - if err != nil { - return WriteTemporaryResult{}, fmt.Errorf("failed to get shadow branch: %w", err) - } - - // Get the last checkpoint's tree hash for deduplication - var lastTreeHash plumbing.Hash - if parentHash != plumbing.ZeroHash { - if lastCommit, err := s.repo.CommitObject(parentHash); err == nil { - lastTreeHash = lastCommit.TreeHash - } - } - - // Collect all files to include + // Collect file lists once — the worktree state is stable across retries, + // so this work doesn't repeat per CAS attempt. Tree-building (which + // depends on the parent tree) is what re-runs inside the retry loop. 
var allFiles []string var allDeletedFiles []string if opts.IsFirstCheckpoint { @@ -110,39 +98,86 @@ func (s *GitStore) WriteTemporary(ctx context.Context, opts WriteTemporaryOption allDeletedFiles = opts.DeletedFiles } - // Build tree with changes - treeHash, err := s.buildTreeWithChanges(ctx, baseTreeHash, allFiles, allDeletedFiles, opts.MetadataDir, opts.MetadataDirAbs) + commitMsg := trailers.FormatShadowCommit(opts.CommitMessage, opts.MetadataDir, opts.SessionID) + + repoRoot, commonDir, err := s.repoDirs(ctx) if err != nil { - return WriteTemporaryResult{}, fmt.Errorf("failed to build tree: %w", err) - } + return WriteTemporaryResult{}, fmt.Errorf("failed to resolve repo dirs: %w", err) + } + + var result WriteTemporaryResult + // withShadowBranchFlock serializes all writers targeting this shadow + // branch — across goroutines and across processes — so the inner CAS + // only sees contention from external `git update-ref` callers (rare). + err = withShadowBranchFlock(commonDir, shadowBranchName, func() error { + // Tiny CAS retry budget: with the flock held, races against our own + // code are impossible. Retries cover the pathological case of an + // external writer (a user invoking `git update-ref` manually, etc.). 
+ for attempt := range shadowRefMaxRetries { + parentHash, baseTreeHash, gErr := s.getOrCreateShadowBranch(shadowBranchName) + if gErr != nil { + return fmt.Errorf("failed to get shadow branch: %w", gErr) + } - // Deduplication: skip if tree hash matches the last checkpoint - if lastTreeHash != plumbing.ZeroHash && treeHash == lastTreeHash { - return WriteTemporaryResult{ - CommitHash: parentHash, - Skipped: true, - }, nil - } + // Get the last checkpoint's tree hash for deduplication + var lastTreeHash plumbing.Hash + if parentHash != plumbing.ZeroHash { + if lastCommit, lcErr := s.repo.CommitObject(parentHash); lcErr == nil { + lastTreeHash = lastCommit.TreeHash + } + } - // Create checkpoint commit with trailers - commitMsg := trailers.FormatShadowCommit(opts.CommitMessage, opts.MetadataDir, opts.SessionID) + treeHash, tErr := s.buildTreeWithChanges(ctx, baseTreeHash, allFiles, allDeletedFiles, opts.MetadataDir, opts.MetadataDirAbs) + if tErr != nil { + return fmt.Errorf("failed to build tree: %w", tErr) + } - commitHash, err := s.createCommit(ctx, treeHash, parentHash, commitMsg, opts.AuthorName, opts.AuthorEmail) - if err != nil { - return WriteTemporaryResult{}, fmt.Errorf("failed to create commit: %w", err) - } + // Deduplication: skip if tree hash matches the current shadow tip. 
+ if lastTreeHash != plumbing.ZeroHash && treeHash == lastTreeHash { + result = WriteTemporaryResult{ + CommitHash: parentHash, + Skipped: true, + } + return nil + } - // Update branch reference - refName := plumbing.NewBranchReferenceName(shadowBranchName) - newRef := plumbing.NewHashReference(refName, commitHash) - if err := s.repo.Storer.SetReference(newRef); err != nil { - return WriteTemporaryResult{}, fmt.Errorf("failed to update branch reference: %w", err) - } + commitHash, cErr := s.createCommit(ctx, treeHash, parentHash, commitMsg, opts.AuthorName, opts.AuthorEmail) + if cErr != nil { + return fmt.Errorf("failed to create commit: %w", cErr) + } - return WriteTemporaryResult{ - CommitHash: commitHash, - Skipped: false, - }, nil + refErr := casUpdateShadowBranchRef(ctx, repoRoot, shadowBranchName, commitHash, parentHash) + if refErr == nil { + result = WriteTemporaryResult{ + CommitHash: commitHash, + Skipped: false, + } + return nil + } + if !errors.Is(refErr, ErrShadowRefBusy) { + return fmt.Errorf("failed to update shadow branch reference: %w", refErr) + } + // Our commit is now dangling — best-effort remove it so we don't + // leak loose objects across many losing attempts. + tryDeleteLooseObject(commonDir, commitHash) + if bErr := shadowRefBackoff(ctx, attempt); bErr != nil { + return bErr + } + } + // Retry budget exhausted. With the flock held this means an external + // writer beat us shadowRefMaxRetries times in a row — surface it in + // .entire/logs/ so operators can see a stuck shadow branch. 
+ logging.Warn(logging.WithComponent(ctx, "checkpoint"), + "shadow branch CAS retry budget exhausted", + slog.String("shadow_branch", shadowBranchName), + slog.Int("retries", shadowRefMaxRetries), + ) + return fmt.Errorf("failed to update shadow branch reference after %d CAS retries: %w", shadowRefMaxRetries, ErrShadowRefBusy) + }) + if err != nil { + return WriteTemporaryResult{}, err + } + return result, nil } // ReadTemporary reads the latest checkpoint from a shadow branch. @@ -257,12 +292,6 @@ func (s *GitStore) WriteTemporaryTask(ctx context.Context, opts WriteTemporaryTa // Get shadow branch name shadowBranchName := ShadowBranchNameForCommit(opts.BaseCommit, opts.WorktreeID) - // Get or create shadow branch - parentHash, baseTreeHash, err := s.getOrCreateShadowBranch(shadowBranchName) - if err != nil { - return plumbing.ZeroHash, fmt.Errorf("failed to get shadow branch: %w", err) - } - // Collect all files to include in the commit. // Filter out gitignored files — subagent transcripts may report files like .env // that exist on disk but are gitignored. Without filtering, secrets would leak @@ -272,32 +301,58 @@ func (s *GitStore) WriteTemporaryTask(ctx context.Context, opts WriteTemporaryTa candidateFiles = append(candidateFiles, opts.NewFiles...) 
allFiles := filterGitIgnoredFiles(ctx, s.repo, candidateFiles) - // Build new tree with code changes (no metadata dir yet) - newTreeHash, err := s.buildTreeWithChanges(ctx, baseTreeHash, allFiles, opts.DeletedFiles, "", "") + repoRoot, commonDir, err := s.repoDirs(ctx) if err != nil { - return plumbing.ZeroHash, fmt.Errorf("failed to build tree: %w", err) + return plumbing.ZeroHash, fmt.Errorf("failed to resolve repo dirs: %w", err) } - // Add task metadata to tree - newTreeHash, err = s.addTaskMetadataToTree(ctx, newTreeHash, opts) - if err != nil { - return plumbing.ZeroHash, fmt.Errorf("failed to add task metadata: %w", err) - } + var resultHash plumbing.Hash + err = withShadowBranchFlock(commonDir, shadowBranchName, func() error { + for attempt := range shadowRefMaxRetries { + parentHash, baseTreeHash, gErr := s.getOrCreateShadowBranch(shadowBranchName) + if gErr != nil { + return fmt.Errorf("failed to get shadow branch: %w", gErr) + } - // Create the commit - commitHash, err := s.createCommit(ctx, newTreeHash, parentHash, opts.CommitMessage, opts.AuthorName, opts.AuthorEmail) - if err != nil { - return plumbing.ZeroHash, fmt.Errorf("failed to create commit: %w", err) - } + newTreeHash, tErr := s.buildTreeWithChanges(ctx, baseTreeHash, allFiles, opts.DeletedFiles, "", "") + if tErr != nil { + return fmt.Errorf("failed to build tree: %w", tErr) + } - // Update shadow branch reference - refName := plumbing.NewBranchReferenceName(shadowBranchName) - ref := plumbing.NewHashReference(refName, commitHash) - if err := s.repo.Storer.SetReference(ref); err != nil { - return plumbing.ZeroHash, fmt.Errorf("failed to update shadow branch reference: %w", err) - } + newTreeHash, tErr = s.addTaskMetadataToTree(ctx, newTreeHash, opts) + if tErr != nil { + return fmt.Errorf("failed to add task metadata: %w", tErr) + } - return commitHash, nil + commitHash, cErr := s.createCommit(ctx, newTreeHash, parentHash, opts.CommitMessage, opts.AuthorName, opts.AuthorEmail) + if cErr != 
nil { + return fmt.Errorf("failed to create commit: %w", cErr) + } + + refErr := casUpdateShadowBranchRef(ctx, repoRoot, shadowBranchName, commitHash, parentHash) + if refErr == nil { + resultHash = commitHash + return nil + } + if !errors.Is(refErr, ErrShadowRefBusy) { + return fmt.Errorf("failed to update shadow branch reference: %w", refErr) + } + tryDeleteLooseObject(commonDir, commitHash) + if bErr := shadowRefBackoff(ctx, attempt); bErr != nil { + return bErr + } + } + logging.Warn(logging.WithComponent(ctx, "checkpoint"), + "shadow branch CAS retry budget exhausted (task checkpoint)", + slog.String("shadow_branch", shadowBranchName), + slog.Int("retries", shadowRefMaxRetries), + ) + return fmt.Errorf("failed to update shadow branch reference after %d CAS retries: %w", shadowRefMaxRetries, ErrShadowRefBusy) + }) + if err != nil { + return plumbing.ZeroHash, err + } + return resultHash, nil } // addTaskMetadataToTree adds task checkpoint metadata to a git tree. diff --git a/cmd/entire/cli/internal/flock/flock_unix.go b/cmd/entire/cli/internal/flock/flock_unix.go new file mode 100644 index 0000000000..680153f21b --- /dev/null +++ b/cmd/entire/cli/internal/flock/flock_unix.go @@ -0,0 +1,30 @@ +//go:build unix + +// Package flock provides a small cross-process advisory-lock primitive built +// on POSIX flock (Unix) / LockFileEx (Windows). It exists so that checkpoint +// and strategy can both serialize on shared resources without one taking +// the other as an import dependency. +package flock + +import ( + "fmt" + "os" + "syscall" +) + +// Acquire takes an exclusive advisory lock on path, creating the file if +// needed. The returned release closes the file, which drops the flock. +// Callers must invoke release exactly once. The lock file persists between +// runs — flock state is held by the file descriptor, not by the inode on +// disk — so the lockfile contents are immaterial. 
+func Acquire(path string) (release func(), err error) {
+	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o600) //nolint:gosec // caller is responsible for path validation
+	if err != nil {
+		return nil, fmt.Errorf("open flock: %w", err)
+	}
+	// A blocking flock is a slow system call: a signal delivered while we are
+	// waiting for the lock can make it return EINTR without the lock having
+	// been taken. That is not a failure, so simply issue the call again.
+	// syscall.Flock returns a bare syscall.Errno, so a direct comparison is
+	// sufficient here.
+	for {
+		err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX) //nolint:gosec // file descriptors are non-negative; standard Go pattern for syscall.Flock
+		if err != syscall.EINTR {
+			break
+		}
+	}
+	if err != nil {
+		// Do not leak the descriptor when the lock could not be taken.
+		_ = f.Close()
+		return nil, fmt.Errorf("flock: %w", err)
+	}
+	// Closing the descriptor is what drops the flock.
+	return func() { _ = f.Close() }, nil
+}
diff --git a/cmd/entire/cli/strategy/state_lock_windows.go b/cmd/entire/cli/internal/flock/flock_windows.go similarity index 52% rename from cmd/entire/cli/strategy/state_lock_windows.go rename to cmd/entire/cli/internal/flock/flock_windows.go index e6cff2b385..93cc0c4c48 100644 --- a/cmd/entire/cli/strategy/state_lock_windows.go +++ b/cmd/entire/cli/internal/flock/flock_windows.go @@ -1,6 +1,6 @@ //go:build windows -package strategy +package flock import ( "fmt" @@ -9,18 +9,18 @@ import ( "golang.org/x/sys/windows" ) -// acquireStateFileLock takes an exclusive lock on path via Windows -// LockFileEx. The returned release unlocks and closes the file. Callers -// must call release exactly once. -func acquireStateFileLock(path string) (release func(), err error) { - f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o600) //nolint:gosec // path built from validated session ID +// Acquire takes an exclusive lock on path via Windows LockFileEx. The +// returned release unlocks and closes the file. Callers must invoke release +// exactly once. 
+func Acquire(path string) (release func(), err error) { + f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o600) //nolint:gosec // caller is responsible for path validation if err != nil { - return nil, fmt.Errorf("open state lock: %w", err) + return nil, fmt.Errorf("open flock: %w", err) } overlapped := new(windows.Overlapped) if err := windows.LockFileEx(windows.Handle(f.Fd()), windows.LOCKFILE_EXCLUSIVE_LOCK, 0, 1, 0, overlapped); err != nil { _ = f.Close() - return nil, fmt.Errorf("lock state lock: %w", err) + return nil, fmt.Errorf("lock flock: %w", err) } return func() { _ = windows.UnlockFileEx(windows.Handle(f.Fd()), 0, 1, 0, overlapped) diff --git a/cmd/entire/cli/strategy/manual_commit_concurrent_test.go b/cmd/entire/cli/strategy/manual_commit_concurrent_test.go new file mode 100644 index 0000000000..9dc2df16bf --- /dev/null +++ b/cmd/entire/cli/strategy/manual_commit_concurrent_test.go @@ -0,0 +1,286 @@ +package strategy + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "testing" + + "github.com/entireio/cli/cmd/entire/cli/checkpoint" + "github.com/entireio/cli/cmd/entire/cli/paths" + "github.com/entireio/cli/cmd/entire/cli/testutil" + + "github.com/go-git/go-git/v6" + "github.com/go-git/go-git/v6/plumbing" + "github.com/go-git/go-git/v6/plumbing/filemode" +) + +// TestSaveStep_ConcurrentSessionsSameShadowBranch reproduces the parallel-agents +// scenario behind the Stop-hook error +// +// failed to write temporary checkpoint: failed to build tree: +// failed to apply changes in .entire: failed to read tree: object not found +// +// Multiple sessions in the same worktree on the same base commit all hash to the +// same shadow branch name. SaveStep is serialized per-session-ID via +// acquireSessionGate but there is no shadow-branch-wide lock, so the ref update +// at the end of WriteTemporary races. 
+//
+// Each goroutine writes to a unique agent-XX.txt file with unique content per
+// step, so every checkpoint produces a distinct tree hash — i.e. the dedup
+// short-circuit in WriteTemporary never fires. That invariant is what lets us
+// assert per-session StepCount == checkpointsPerWorker below; if a future
+// change ever lands two identical checkpoints in a row, the StepCount
+// assertion (not just the commit-count check) will catch it.
+//
+// Assertions:
+//   - no SaveStep returns an error
+//   - every session's persisted StepCount equals checkpointsPerWorker (no
+//     checkpoint was skipped or lost)
+//   - the resulting shadow branch is internally consistent: every commit
+//     reachable from the ref has a tree where every directory entry resolves
+//     (no "object not found" anywhere in the chain)
+//   - the shadow branch commit count equals numSessions * checkpointsPerWorker
+func TestSaveStep_ConcurrentSessionsSameShadowBranch(t *testing.T) {
+	const (
+		numSessions          = 8
+		checkpointsPerWorker = 4
+	)
+
+	// Fresh repository with a single seed commit; all sessions share it as
+	// their base.
+	dir := t.TempDir()
+	testutil.InitRepo(t, dir)
+	testutil.WriteFile(t, dir, "seed.txt", "seed\n")
+	testutil.GitAdd(t, dir, "seed.txt")
+	testutil.GitCommit(t, dir, "initial commit")
+
+	t.Chdir(dir)
+	paths.ClearWorktreeRootCache()
+
+	// One entry per simulated agent session: its ID, its metadata directory
+	// (relative and absolute) and the worktree file it keeps rewriting.
+	type session struct {
+		id             string
+		metadataDir    string
+		metadataDirAbs string
+		file           string
+	}
+	sessions := make([]session, numSessions)
+	for i := range sessions {
+		id := fmt.Sprintf("2026-05-14-concurrent-%02d", i)
+		md := paths.EntireMetadataDir + "/" + id
+		sessions[i] = session{
+			id:             id,
+			metadataDir:    md,
+			metadataDirAbs: filepath.Join(dir, md),
+			file:           fmt.Sprintf("agent-%02d.txt", i),
+		}
+		testutil.WriteFile(t, dir, md+"/"+paths.TranscriptFileName,
+			"{\"type\":\"human\",\"message\":{\"content\":\"start\"}}\n")
+	}
+
+	type goroutineErr struct {
+		session string
+		step    int
+		err     error
+	}
+	// Buffered so no worker ever blocks on send: every send below is
+	// immediately followed by return, so each goroutine reports at most one
+	// error.
+	errCh := make(chan goroutineErr, numSessions*(checkpointsPerWorker+1))
+	// Closed once all workers have been launched; acts as the starting gun.
+	start := make(chan struct{})
+
+	var wg sync.WaitGroup
+	for i := range sessions {
+		sess := sessions[i]
+		wg.Go(func() {
+			ctx := context.Background()
+			// Each goroutine owns its own strategy + repo handle, mirroring the
+			// production case where every hook invocation is a fresh process.
+			s := NewManualCommitStrategy()
+
+			if err := s.InitializeSession(ctx, sess.id, "Claude Code", "", "", ""); err != nil {
+				errCh <- goroutineErr{session: sess.id, step: -1, err: fmt.Errorf("InitializeSession: %w", err)}
+				return
+			}
+
+			// Wait for all goroutines to be ready, then start together to widen
+			// the race window for ref-update contention on the shared shadow
+			// branch.
+			<-start
+
+			for step := range checkpointsPerWorker {
+				// Unique content per (session, step) keeps every tree hash
+				// distinct, so no checkpoint is deduplicated away.
+				content := fmt.Sprintf("session=%s step=%d\n", sess.id, step)
+				if err := writeFileForRaceTest(filepath.Join(dir, sess.file), content); err != nil {
+					errCh <- goroutineErr{session: sess.id, step: step, err: fmt.Errorf("write worker file: %w", err)}
+					return
+				}
+				transcriptLine := fmt.Sprintf("{\"type\":\"assistant\",\"step\":%d}\n", step)
+				transcriptPath := filepath.Join(sess.metadataDirAbs, paths.TranscriptFileName)
+				if err := writeFileForRaceTest(transcriptPath, transcriptLine); err != nil {
+					errCh <- goroutineErr{session: sess.id, step: step, err: fmt.Errorf("write transcript: %w", err)}
+					return
+				}
+
+				// The worker file is reported as new on the first step and as
+				// modified on every later one.
+				var modified, newFiles []string
+				if step == 0 {
+					newFiles = []string{sess.file}
+				} else {
+					modified = []string{sess.file}
+				}
+
+				err := s.SaveStep(ctx, StepContext{
+					SessionID:      sess.id,
+					ModifiedFiles:  modified,
+					NewFiles:       newFiles,
+					MetadataDir:    sess.metadataDir,
+					MetadataDirAbs: sess.metadataDirAbs,
+					CommitMessage:  fmt.Sprintf("Checkpoint %d for %s", step, sess.id),
+					AuthorName:     "Test",
+					AuthorEmail:    "test@example.com",
+				})
+				if err != nil {
+					errCh <- goroutineErr{session: sess.id, step: step, err: fmt.Errorf("SaveStep: %w", err)}
+					return
+				}
+			}
+		})
+	}
+
+	close(start)
+	wg.Wait()
+	// All senders are done, so closing lets the drain loop below terminate.
+	close(errCh)
+
+	for ge := range errCh {
+		t.Errorf("session %s step %d: %v", ge.session, ge.step, ge.err)
+	}
+	// If any worker failed, stop here rather than running the state and
+	// branch checks below.
+	if t.Failed() {
+		return
+	}
+
+	// Per-session invariant: every SaveStep call should have landed a
+	// checkpoint (no skips from the dedup short-circuit). StepCount is
+	// incremented in SaveStep only when WriteTemporary returns Skipped=false,
+	// so this catches a future test change that accidentally writes
+	// duplicate-content checkpoints — which would surface as a misleading
+	// "commits were lost" message in the commit-count check below.
+	stateStrategy := NewManualCommitStrategy()
+	for _, sess := range sessions {
+		state, err := stateStrategy.loadSessionState(context.Background(), sess.id)
+		if err != nil {
+			t.Errorf("load state for %s: %v", sess.id, err)
+			continue
+		}
+		if state == nil {
+			t.Errorf("missing state for %s", sess.id)
+			continue
+		}
+		if state.StepCount != checkpointsPerWorker {
+			t.Errorf("session %s StepCount = %d, want %d", sess.id, state.StepCount, checkpointsPerWorker)
+		}
+	}
+
+	// Verify the shadow branch is internally consistent.
+	repo, err := git.PlainOpen(dir)
+	if err != nil {
+		t.Fatalf("open repo: %v", err)
+	}
+
+	// All sessions share one worktree and one base commit, so exactly one
+	// shadow branch is expected.
+	shadowBranches := listShadowBranches(t, repo)
+	if len(shadowBranches) == 0 {
+		t.Fatal("expected at least one shadow branch after SaveStep, found none")
+	}
+	if len(shadowBranches) > 1 {
+		names := make([]string, 0, len(shadowBranches))
+		for _, ref := range shadowBranches {
+			names = append(names, ref.Name().Short())
+		}
+		t.Fatalf("expected sessions to share a single shadow branch, got %d: %v", len(shadowBranches), names)
+	}
+
+	commits := walkShadowBranchAssertConsistent(t, repo, shadowBranches[0])
+
+	// Commit-count check: every distinct checkpoint we issued should have
+	// landed on the shadow branch. See the test-level comment for why dedup
+	// can't quietly defeat this assertion.
+	expected := numSessions * checkpointsPerWorker
+	switch {
+	case commits > expected:
+		t.Errorf("walked %d commits, more than the %d SaveStep calls — accounting bug", commits, expected)
+	case commits < expected:
+		t.Errorf("walked %d commits but issued %d SaveStep calls — %d commits were lost",
+			commits, expected, expected-commits)
+	default:
+		t.Logf("walked %d commits matching %d SaveStep calls — no checkpoints lost", commits, expected)
+	}
+}
+
+// listShadowBranches returns every branch reference whose short name starts
+// with checkpoint.ShadowBranchPrefix, excluding paths.MetadataBranchName.
+// It fails the test immediately if the references cannot be listed or
+// iterated.
+func listShadowBranches(t *testing.T, repo *git.Repository) []plumbing.Reference {
+	t.Helper()
+	refs, err := repo.References()
+	if err != nil {
+		t.Fatalf("list refs: %v", err)
+	}
+	var out []plumbing.Reference
+	err = refs.ForEach(func(ref *plumbing.Reference) error {
+		if ref.Name().IsBranch() && strings.HasPrefix(ref.Name().Short(), checkpoint.ShadowBranchPrefix) &&
+			ref.Name().Short() != paths.MetadataBranchName {
+			// Store a copy of the reference value rather than the iterator's pointer.
+			out = append(out, *ref)
+		}
+		return nil
+	})
+	if err != nil {
+		t.Fatalf("iterate refs: %v", err)
+	}
+	return out
+}
+
+// walkShadowBranchAssertConsistent walks every commit reachable from the shadow
+// branch ref and asserts every tree (and recursively every subtree) is in the
+// object database. Returns the number of commits visited.
+func walkShadowBranchAssertConsistent(t *testing.T, repo *git.Repository, ref plumbing.Reference) int {
+	t.Helper()
+	// Every commit already walked; its size is the visit count we return.
+	seen := map[plumbing.Hash]struct{}{}
+	for cur := ref.Hash(); cur != plumbing.ZeroHash; {
+		if _, dup := seen[cur]; dup {
+			t.Fatalf("shadow branch %s: cycle at commit %s", ref.Name().Short(), cur)
+		}
+		seen[cur] = struct{}{}
+
+		c, err := repo.CommitObject(cur)
+		if err != nil {
+			t.Fatalf("shadow branch %s: commit %s unreadable: %v", ref.Name().Short(), cur, err)
+		}
+		walkTreeAssertConsistent(t, repo, c.TreeHash, "/")
+
+		// Follow the first parent only; a parentless commit ends the walk.
+		if len(c.ParentHashes) == 0 {
+			break
+		}
+		cur = c.ParentHashes[0]
+	}
+	return len(seen)
+}
+
+// walkTreeAssertConsistent loads the tree identified by hash and recurses
+// into each of its directory entries, failing the test as soon as a tree
+// cannot be read. path is the slash-terminated location of the tree and is
+// used only in the failure message.
+func walkTreeAssertConsistent(t *testing.T, repo *git.Repository, hash plumbing.Hash, path string) {
+	t.Helper()
+	tr, err := repo.TreeObject(hash)
+	if err != nil {
+		t.Fatalf("tree %s at %s unreadable: %v", hash, path, err)
+	}
+	for i := range tr.Entries {
+		e := &tr.Entries[i]
+		if e.Mode != filemode.Dir {
+			continue
+		}
+		walkTreeAssertConsistent(t, repo, e.Hash, path+e.Name+"/")
+	}
+}
+
+// writeFileForRaceTest is a goroutine-safe alternative to testutil.WriteFile,
+// scoped to this test file. testutil.WriteFile calls t.Fatalf, which doesn't
+// fail the test cleanly from a sub-goroutine. Name kept long and specific so
+// it can't accidentally shadow a more general helper added to the package
+// later.
+func writeFileForRaceTest(absPath, content string) error {
+	// Make sure the parent directory exists before writing the file itself.
+	parent := filepath.Dir(absPath)
+	mkErr := os.MkdirAll(parent, 0o755)
+	if mkErr != nil {
+		return fmt.Errorf("mkdir: %w", mkErr)
+	}
+	writeErr := os.WriteFile(absPath, []byte(content), 0o644)
+	return writeErr
+}
diff --git a/cmd/entire/cli/strategy/session_state.go b/cmd/entire/cli/strategy/session_state.go index 9afd40a5a8..1d40313b1e 100644 --- a/cmd/entire/cli/strategy/session_state.go +++ b/cmd/entire/cli/strategy/session_state.go @@ -15,6 +15,7 @@ import ( "github.com/entireio/cli/cmd/entire/cli/agent" "github.com/entireio/cli/cmd/entire/cli/agent/types" + "github.com/entireio/cli/cmd/entire/cli/internal/flock" "github.com/entireio/cli/cmd/entire/cli/logging" "github.com/entireio/cli/cmd/entire/cli/paths" "github.com/entireio/cli/cmd/entire/cli/session" @@ -509,7 +510,7 @@ func acquireSessionGate(ctx context.Context, sessionID string) (gate *sessionGat if err != nil { return nil, false, nil, fmt.Errorf("resolve state lock path: %w", err) } - flockRel, err := acquireStateFileLock(lockPath) + flockRel, err := flock.Acquire(lockPath) if err != nil { return nil, false, nil, fmt.Errorf("acquire state lock: %w", err) } diff --git a/cmd/entire/cli/strategy/state_lock_unix.go b/cmd/entire/cli/strategy/state_lock_unix.go deleted file mode 100644 index 0b88715442..0000000000 --- a/cmd/entire/cli/strategy/state_lock_unix.go +++ /dev/null @@ -1,25 +0,0 @@ -//go:build unix - -package strategy - -import ( - "fmt" - "os" - "syscall" -) - -// acquireStateFileLock takes an exclusive POSIX advisory lock on path. The -// returned release closes the file (which drops the flock). Callers must call -// release exactly once. The lock file persists between runs — that's fine, -// flock state is held by the file descriptor, not the inode on disk. 
-func acquireStateFileLock(path string) (release func(), err error) { - f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o600) //nolint:gosec // path built from validated session ID - if err != nil { - return nil, fmt.Errorf("open state lock: %w", err) - } - if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil { //nolint:gosec // file descriptors are non-negative; standard Go pattern for syscall.Flock - _ = f.Close() - return nil, fmt.Errorf("flock state lock: %w", err) - } - return func() { _ = f.Close() }, nil -}