diff --git a/cmd/entire/cli/checkpoint/v2_generation.go b/cmd/entire/cli/checkpoint/v2_generation.go index 0363609ddc..c161d6a56c 100644 --- a/cmd/entire/cli/checkpoint/v2_generation.go +++ b/cmd/entire/cli/checkpoint/v2_generation.go @@ -506,7 +506,7 @@ func (s *V2GitStore) RotateCurrentGenerationIfNeeded(ctx context.Context, maxChe return "", false, fmt.Errorf("rotation: failed to determine next generation number: %w", err) } - // Phase 1: Archive — create ref pointing to the current commit. + // Phase 1: Prepare archive and reset commits without changing refs yet. // If the archive ref already exists, another instance already rotated — skip. archiveRefName := ArchivedGenerationRefName(archiveNumber) if _, refErr := s.repo.Reference(archiveRefName, true); refErr == nil { @@ -515,22 +515,6 @@ func (s *V2GitStore) RotateCurrentGenerationIfNeeded(ctx context.Context, maxChe ) return archiveRefName, false, nil } - archiveRef := plumbing.NewHashReference(archiveRefName, currentRef.Hash()) - if err := s.repo.Storer.SetReference(archiveRef); err != nil { - return "", false, fmt.Errorf("rotation: failed to create archived ref %s: %w", archiveRefName, err) - } - - // Verify /full/current hasn't been advanced by another writer since we read it. - // If it changed, abort — the archive ref is harmless (points to a valid commit) - // and the next writer will trigger rotation again. - postArchiveRef, err := s.repo.Reference(refName, true) - if err != nil { - return "", false, fmt.Errorf("rotation: failed to re-read /full/current: %w", err) - } - if postArchiveRef.Hash() != currentRef.Hash() { - logging.Info(ctx, "rotation: /full/current changed during rotation, aborting reset") - return archiveRefName, false, nil - } // Write generation.json to the current tree before archiving. 
gen := s.computeGenerationTimestamps(currentTreeHash) @@ -545,13 +529,7 @@ func (s *V2GitStore) RotateCurrentGenerationIfNeeded(ctx context.Context, maxChe return "", false, fmt.Errorf("rotation: failed to create archive commit: %w", err) } - // Update the archive ref to point to the commit with generation.json - archiveRef = plumbing.NewHashReference(archiveRefName, archiveCommitHash) - if err := s.repo.Storer.SetReference(archiveRef); err != nil { - return "", false, fmt.Errorf("rotation: failed to update archived ref %s: %w", archiveRefName, err) - } - - // Phase 2: Create fresh orphan /full/current (empty tree, no generation.json) + // Create fresh orphan /full/current (empty tree, no generation.json). emptyTreeHash, err := BuildTreeFromEntries(ctx, s.repo, make(map[string]object.TreeEntry)) if err != nil { return "", false, fmt.Errorf("rotation: failed to build empty tree: %w", err) @@ -562,6 +540,59 @@ func (s *V2GitStore) RotateCurrentGenerationIfNeeded(ctx context.Context, maxChe return "", false, fmt.Errorf("rotation: failed to create orphan commit: %w", err) } + // Verify /full/current hasn't been advanced by another writer since we read it. + // If it changed, abort before recording a publication marker. + postArchiveRef, err := s.repo.Reference(refName, true) + if err != nil { + return "", false, fmt.Errorf("rotation: failed to re-read /full/current: %w", err) + } + if postArchiveRef.Hash() != currentRef.Hash() { + logging.Info(ctx, "rotation: /full/current changed during rotation, aborting reset") + return archiveRefName, false, nil + } + + publication := PendingV2FullGenerationPublication{ + ArchiveRefName: archiveRefName.String(), + ArchiveCommitHash: archiveCommitHash.String(), + PreviousFullCurrentHash: currentRef.Hash().String(), + ResetFullCurrentRootHash: orphanCommitHash.String(), + QueuedAt: time.Now().UTC(), + } + + // The commit objects above are not reachable from the v2 refs yet. 
Record + // the pending publication before moving refs so pre-push can recover the + // intended rotation if the process stops after either ref update below. + if err := s.AppendPendingFullGenerationPublication(ctx, publication); err != nil { + return "", false, fmt.Errorf("rotation: failed to record pending full rotation: %w", err) + } + keepPendingPublication := false + defer func() { + if keepPendingPublication { + return + } + if removeErr := s.RemovePendingFullGenerationPublications(ctx, []PendingV2FullGenerationPublication{publication}); removeErr != nil { + logging.Warn(ctx, "rotation: failed to remove pending full rotation after failed rotation", + slog.String("error", removeErr.Error()), + slog.String("archive_ref", string(archiveRefName)), + slog.String("previous_full_current_hash", currentRef.Hash().String()), + slog.String("archive_commit_hash", archiveCommitHash.String()), + slog.String("reset_full_current_root_hash", orphanCommitHash.String()), + ) + } + }() + + // Phase 2: publish local refs after the pending publication marker exists. 
+ if _, refErr := s.repo.Reference(archiveRefName, true); refErr == nil { + logging.Info(ctx, "rotation: archive ref already exists, skipping", + slog.String("archive_ref", string(archiveRefName)), + ) + return archiveRefName, false, nil + } + archiveRef := plumbing.NewHashReference(archiveRefName, archiveCommitHash) + if err := s.repo.Storer.SetReference(archiveRef); err != nil { + return "", false, fmt.Errorf("rotation: failed to update archived ref %s: %w", archiveRefName, err) + } + reset, err := s.resetFullCurrentRefIfUnchanged(ctx, refName, postArchiveRef, orphanCommitHash) if err != nil { return "", false, err @@ -570,6 +601,7 @@ func (s *V2GitStore) RotateCurrentGenerationIfNeeded(ctx context.Context, maxChe return archiveRefName, false, nil } + keepPendingPublication = true logging.Info(ctx, "generation rotation complete", slog.Int("archived_generation", archiveNumber), slog.String("archive_ref", string(archiveRefName)), diff --git a/cmd/entire/cli/checkpoint/v2_generation_test.go b/cmd/entire/cli/checkpoint/v2_generation_test.go index 319247cf8d..cd232bf1c6 100644 --- a/cmd/entire/cli/checkpoint/v2_generation_test.go +++ b/cmd/entire/cli/checkpoint/v2_generation_test.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "fmt" + "os" + "path/filepath" "testing" "time" @@ -447,6 +449,65 @@ func TestRotateGeneration_ArchivesCurrentAndCreatesNewOrphan(t *testing.T) { assert.Empty(t, freshTree.Entries, "fresh tree should be empty (no generation.json)") } +func TestRotateGeneration_FailsBeforeResetWhenPendingMarkerCannotBeRecorded(t *testing.T) { + t.Parallel() + repo := initTestRepo(t) + store := NewV2GitStore(repo, "origin") + ctx := context.Background() + + populateFullCurrent(t, store, 3, 0) + + worktree, err := repo.Worktree() + require.NoError(t, err) + blockingPath := filepath.Join(worktree.Filesystem().Root(), ".git", pendingV2FullGenerationPublicationDirName) + require.NoError(t, os.WriteFile(blockingPath, []byte("not a directory"), 0o600)) + + 
refName, rotated, err := store.RotateCurrentGenerationIfNeeded(ctx, 3) + require.Error(t, err) + require.False(t, rotated) + require.Empty(t, refName) + assert.Contains(t, err.Error(), "failed to record pending full rotation") + + _, currentTreeHash, err := store.GetRefState(plumbing.ReferenceName(paths.V2FullCurrentRefName)) + require.NoError(t, err) + currentCount, err := store.CountCheckpointsInTree(currentTreeHash) + require.NoError(t, err) + assert.Equal(t, 3, currentCount) + + _, _, err = store.GetRefState(ArchivedGenerationRefName(1)) + require.Error(t, err) +} + +func TestRemovePendingFullGenerationPublications_PreservesLaterQueuedEntries(t *testing.T) { + t.Parallel() + repo := initTestRepo(t) + store := NewV2GitStore(repo, "origin") + ctx := context.Background() + + first := PendingV2FullGenerationPublication{ + ArchiveRefName: paths.V2FullRefPrefix + "0000000000001", + ArchiveCommitHash: "1111111111111111111111111111111111111111", + QueuedAt: time.Date(2026, 3, 19, 1, 2, 3, 0, time.UTC), + } + later := PendingV2FullGenerationPublication{ + ArchiveRefName: paths.V2FullRefPrefix + "0000000000002", + ArchiveCommitHash: "2222222222222222222222222222222222222222", + QueuedAt: time.Date(2026, 3, 19, 4, 5, 6, 0, time.UTC), + } + + require.NoError(t, store.AppendPendingFullGenerationPublication(ctx, first)) + snapshot, err := store.ReadPendingFullGenerationPublications(ctx) + require.NoError(t, err) + require.Equal(t, []PendingV2FullGenerationPublication{first}, snapshot) + + require.NoError(t, store.AppendPendingFullGenerationPublication(ctx, later)) + require.NoError(t, store.RemovePendingFullGenerationPublications(ctx, snapshot)) + + remaining, err := store.ReadPendingFullGenerationPublications(ctx) + require.NoError(t, err) + assert.Equal(t, []PendingV2FullGenerationPublication{later}, remaining) +} + func TestResetFullCurrentRefIfUnchangedRejectsConcurrentChange(t *testing.T) { t.Parallel() repo := initTestRepo(t) diff --git 
a/cmd/entire/cli/checkpoint/v2_pending_rotation.go b/cmd/entire/cli/checkpoint/v2_pending_rotation.go new file mode 100644 index 0000000000..13ef8fc50a --- /dev/null +++ b/cmd/entire/cli/checkpoint/v2_pending_rotation.go @@ -0,0 +1,248 @@ +package checkpoint + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "github.com/entireio/cli/cmd/entire/cli/jsonutil" + "github.com/entireio/cli/cmd/entire/cli/lockfile" + "github.com/go-git/go-git/v6" +) + +const ( + pendingV2FullGenerationPublicationVersion = 1 + pendingV2FullGenerationPublicationDirName = "entire-v2-rotations" + pendingV2FullGenerationPublicationFile = "pending.json" + pendingV2FullGenerationPublicationLock = "pending.lock" + pendingV2FullGenerationPublicationLockTTL = 5 * time.Second +) + +type PendingV2FullGenerationPublication struct { + ArchiveRefName string `json:"archive_ref_name"` + ArchiveCommitHash string `json:"archive_commit_hash"` + // PreviousFullCurrentHash and ResetFullCurrentRootHash are set when the + // archive publication came from a local /full/current rotation. 
+ PreviousFullCurrentHash string `json:"previous_full_current_hash,omitempty"` + ResetFullCurrentRootHash string `json:"reset_full_current_root_hash,omitempty"` + QueuedAt time.Time `json:"queued_at"` +} + +type pendingV2FullGenerationPublicationState struct { + Version int `json:"version"` + Publications []PendingV2FullGenerationPublication `json:"publications"` +} + +func (s *V2GitStore) AppendPendingFullGenerationPublication(ctx context.Context, publication PendingV2FullGenerationPublication) error { + return s.AppendPendingFullGenerationPublications(ctx, []PendingV2FullGenerationPublication{publication}) +} + +func (s *V2GitStore) AppendPendingFullGenerationPublications(ctx context.Context, publications []PendingV2FullGenerationPublication) error { + if len(publications) == 0 { + return nil + } + return s.withPendingFullGenerationPublicationLock(ctx, func() error { + state, err := s.readPendingFullGenerationPublicationState(ctx) + if err != nil { + return err + } + state.Version = pendingV2FullGenerationPublicationVersion + state.Publications = append(state.Publications, publications...) 
+ return s.writePendingFullGenerationPublicationState(ctx, state) + }) +} + +func (s *V2GitStore) ReadPendingFullGenerationPublications(ctx context.Context) ([]PendingV2FullGenerationPublication, error) { + state, err := s.readPendingFullGenerationPublicationState(ctx) + if err != nil { + return nil, err + } + return state.Publications, nil +} + +func (s *V2GitStore) RemovePendingFullGenerationPublications(ctx context.Context, publications []PendingV2FullGenerationPublication) error { + if len(publications) == 0 { + return nil + } + return s.withPendingFullGenerationPublicationLock(ctx, func() error { + state, err := s.readPendingFullGenerationPublicationState(ctx) + if err != nil { + return err + } + previousCount := len(state.Publications) + state.Publications = removePendingFullGenerationPublications(state.Publications, publications) + if len(state.Publications) == previousCount { + return nil + } + if len(state.Publications) == 0 { + return s.removePendingFullGenerationPublicationFile(ctx) + } + state.Version = pendingV2FullGenerationPublicationVersion + return s.writePendingFullGenerationPublicationState(ctx, state) + }) +} + +func removePendingFullGenerationPublications(current, remove []PendingV2FullGenerationPublication) []PendingV2FullGenerationPublication { + removeCounts := make(map[PendingV2FullGenerationPublication]int, len(remove)) + for _, publication := range remove { + removeCounts[comparablePendingFullGenerationPublication(publication)]++ + } + + remaining := make([]PendingV2FullGenerationPublication, 0, len(current)) + for _, publication := range current { + key := comparablePendingFullGenerationPublication(publication) + if removeCounts[key] > 0 { + removeCounts[key]-- + continue + } + remaining = append(remaining, publication) + } + return remaining +} + +func comparablePendingFullGenerationPublication(publication PendingV2FullGenerationPublication) PendingV2FullGenerationPublication { + publication.QueuedAt = 
publication.QueuedAt.Round(0).UTC() + return publication +} + +func (s *V2GitStore) removePendingFullGenerationPublicationFile(ctx context.Context) error { + path, err := s.pendingFullGenerationPublicationFilePath(ctx) + if err != nil { + return err + } + if err := os.Remove(path); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("remove pending v2 full generation publications: %w", err) + } + return nil +} + +func (s *V2GitStore) withPendingFullGenerationPublicationLock(ctx context.Context, fn func() error) (err error) { + lockPath, err := s.pendingFullGenerationPublicationLockPath(ctx) + if err != nil { + return err + } + if err := os.MkdirAll(filepath.Dir(lockPath), 0o750); err != nil { + return fmt.Errorf("create pending v2 full generation publication lock dir: %w", err) + } + if err := lockfile.WithTimeout(ctx, lockPath, pendingV2FullGenerationPublicationLockTTL, fn); err != nil { + return fmt.Errorf("pending v2 full generation publication lock: %w", err) + } + return nil +} + +func (s *V2GitStore) readPendingFullGenerationPublicationState(ctx context.Context) (pendingV2FullGenerationPublicationState, error) { + path, err := s.pendingFullGenerationPublicationFilePath(ctx) + if err != nil { + return pendingV2FullGenerationPublicationState{}, err + } + data, err := os.ReadFile(path) //nolint:gosec // path is under git common dir + if os.IsNotExist(err) { + return pendingV2FullGenerationPublicationState{Version: pendingV2FullGenerationPublicationVersion}, nil + } + if err != nil { + return pendingV2FullGenerationPublicationState{}, fmt.Errorf("read pending v2 full generation publications: %w", err) + } + + var state pendingV2FullGenerationPublicationState + if err := json.Unmarshal(data, &state); err != nil { + return pendingV2FullGenerationPublicationState{}, fmt.Errorf("parse pending v2 full generation publications: %w", err) + } + if state.Version != pendingV2FullGenerationPublicationVersion { + return pendingV2FullGenerationPublicationState{}, 
fmt.Errorf("unsupported pending v2 full generation publication version %d", state.Version) + } + return state, nil +} + +func (s *V2GitStore) writePendingFullGenerationPublicationState(ctx context.Context, state pendingV2FullGenerationPublicationState) error { + path, err := s.pendingFullGenerationPublicationFilePath(ctx) + if err != nil { + return err + } + if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil { + return fmt.Errorf("create pending v2 full generation publication dir: %w", err) + } + + data, err := jsonutil.MarshalIndentWithNewline(state, "", " ") + if err != nil { + return fmt.Errorf("marshal pending v2 full generation publications: %w", err) + } + + tmpFile, err := os.CreateTemp(filepath.Dir(path), pendingV2FullGenerationPublicationFile+".*.tmp") + if err != nil { + return fmt.Errorf("create pending v2 full generation publication temp file: %w", err) + } + tmpName := tmpFile.Name() + removeTmp := true + defer func() { + if removeTmp { + _ = os.Remove(tmpName) + } + }() + + if _, err := tmpFile.Write(data); err != nil { + _ = tmpFile.Close() + return fmt.Errorf("write pending v2 full generation publications: %w", err) + } + if err := tmpFile.Close(); err != nil { + return fmt.Errorf("close pending v2 full generation publications: %w", err) + } + if err := os.Rename(tmpName, path); err != nil { + return fmt.Errorf("replace pending v2 full generation publications: %w", err) + } + removeTmp = false + return nil +} + +func (s *V2GitStore) pendingFullGenerationPublicationFilePath(ctx context.Context) (string, error) { + commonDir, err := s.gitCommonDir(ctx) + if err != nil { + return "", err + } + return filepath.Join(commonDir, pendingV2FullGenerationPublicationDirName, pendingV2FullGenerationPublicationFile), nil +} + +func (s *V2GitStore) pendingFullGenerationPublicationLockPath(ctx context.Context) (string, error) { + commonDir, err := s.gitCommonDir(ctx) + if err != nil { + return "", err + } + return filepath.Join(commonDir, 
pendingV2FullGenerationPublicationDirName, pendingV2FullGenerationPublicationLock), nil +} + +func (s *V2GitStore) gitCommonDir(ctx context.Context) (string, error) { + s.commonDirOnce.Do(func() { + s.commonDir, s.commonDirErr = resolveGitCommonDir(ctx, s.repo) + }) + return s.commonDir, s.commonDirErr +} + +func resolveGitCommonDir(ctx context.Context, repo *git.Repository) (string, error) { + worktree, err := repo.Worktree() + if err != nil { + return "", fmt.Errorf("open worktree for pending v2 full generation publications: %w", err) + } + root := worktree.Filesystem().Root() + if root == "" { + return "", errors.New("resolve worktree root for pending v2 full generation publications") + } + + cmd := exec.CommandContext(ctx, "git", "-C", root, "rev-parse", "--git-common-dir") + output, err := cmd.Output() + if err != nil { + return "", fmt.Errorf("resolve git common dir for pending v2 full generation publications: %w", err) + } + commonDir := strings.TrimSpace(string(output)) + if commonDir == "" { + return "", errors.New("resolve git common dir for pending v2 full generation publications: empty output") + } + if !filepath.IsAbs(commonDir) { + commonDir = filepath.Join(root, commonDir) + } + return filepath.Clean(commonDir), nil +} diff --git a/cmd/entire/cli/checkpoint/v2_read.go b/cmd/entire/cli/checkpoint/v2_read.go index 1ee9d13a4b..33a6a1ff17 100644 --- a/cmd/entire/cli/checkpoint/v2_read.go +++ b/cmd/entire/cli/checkpoint/v2_read.go @@ -421,7 +421,7 @@ func (s *V2GitStore) fetchRemoteFullRefs(ctx context.Context) error { } var refSpecs []string - for _, line := range strings.Split(strings.TrimSpace(string(output)), "\n") { + for line := range strings.SplitSeq(strings.TrimSpace(string(output)), "\n") { if line == "" { continue } diff --git a/cmd/entire/cli/checkpoint/v2_store.go b/cmd/entire/cli/checkpoint/v2_store.go index beec88b561..1cc3ef0ea2 100644 --- a/cmd/entire/cli/checkpoint/v2_store.go +++ b/cmd/entire/cli/checkpoint/v2_store.go @@ -3,6 +3,7 @@ 
package checkpoint import ( "context" "fmt" + "sync" "github.com/go-git/go-git/v6" "github.com/go-git/go-git/v6/plumbing" @@ -36,6 +37,10 @@ type V2GitStore struct { // cat-file fallback covers partial-clone-filtered blobs that go-git's // storer can't see). blobFetcher BlobFetchFunc + + commonDirOnce sync.Once + commonDir string + commonDirErr error } // maxCheckpoints returns the effective rotation threshold. diff --git a/cmd/entire/cli/lockfile/lockfile.go b/cmd/entire/cli/lockfile/lockfile.go index 71ad3a13a3..86da0f326a 100644 --- a/cmd/entire/cli/lockfile/lockfile.go +++ b/cmd/entire/cli/lockfile/lockfile.go @@ -3,11 +3,13 @@ package lockfile import ( + "context" "errors" "fmt" "os" "strconv" "strings" + "time" ) // ErrLocked is returned by Acquire when another process holds the lock. @@ -41,6 +43,45 @@ func Acquire(path string) (*Lock, error) { return &Lock{f: f}, nil } +// WithTimeout acquires path, runs fn while holding the lock, and releases it. +// If another process holds the lock, it retries until ctx is canceled or timeout +// elapses. The returned lock error wraps ErrLocked on timeout. 
+func WithTimeout(ctx context.Context, path string, timeout time.Duration, fn func() error) (err error) { + deadline := time.NewTimer(timeout) + defer deadline.Stop() + ticker := time.NewTicker(25 * time.Millisecond) + defer ticker.Stop() + + var lk *Lock + for { + lk, err = Acquire(path) + if err == nil { + break + } + if !errors.Is(err, ErrLocked) { + return err + } + + select { + case <-ctx.Done(): + return fmt.Errorf("acquire lock %s: %w", path, ctx.Err()) + case <-deadline.C: + if holder := ReadHolderPID(path); holder > 0 { + return fmt.Errorf("lock %s held by PID %d: %w", path, holder, ErrLocked) + } + return fmt.Errorf("lock %s held: %w", path, ErrLocked) + case <-ticker.C: + } + } + defer func() { + if releaseErr := lk.Release(); err == nil && releaseErr != nil { + err = releaseErr + } + }() + + return fn() +} + // Release releases the OS lock and closes the file. Idempotent. func (l *Lock) Release() error { if l == nil || l.f == nil { diff --git a/cmd/entire/cli/lockfile/lockfile_test.go b/cmd/entire/cli/lockfile/lockfile_test.go index 8509a83bee..3464316e6b 100644 --- a/cmd/entire/cli/lockfile/lockfile_test.go +++ b/cmd/entire/cli/lockfile/lockfile_test.go @@ -2,11 +2,13 @@ package lockfile_test import ( "bytes" + "context" "os" "path/filepath" "runtime" "strconv" "testing" + "time" "github.com/entireio/cli/cmd/entire/cli/lockfile" "github.com/stretchr/testify/assert" @@ -65,6 +67,36 @@ func TestRelease_PermitsReacquire(t *testing.T) { t.Cleanup(func() { _ = lk2.Release() }) //nolint:errcheck // test cleanup } +func TestWithTimeout_RunsCallback(t *testing.T) { + t.Parallel() + path := filepath.Join(t.TempDir(), "test.lock") + called := false + + err := lockfile.WithTimeout(context.Background(), path, time.Second, func() error { + called = true + assert.Equal(t, os.Getpid(), lockfile.ReadHolderPID(path)) + return nil + }) + + require.NoError(t, err) + assert.True(t, called) +} + +func TestWithTimeout_ReturnsErrLockedOnTimeout(t *testing.T) { + t.Parallel() 
+ path := filepath.Join(t.TempDir(), "test.lock") + lk, err := lockfile.Acquire(path) + require.NoError(t, err) + t.Cleanup(func() { _ = lk.Release() }) //nolint:errcheck // test cleanup + + err = lockfile.WithTimeout(context.Background(), path, time.Nanosecond, func() error { + t.Fatal("callback should not run") + return nil + }) + + require.ErrorIs(t, err, lockfile.ErrLocked) +} + func TestReadHolderPID_EmptyFile(t *testing.T) { t.Parallel() path := filepath.Join(t.TempDir(), "test.lock") diff --git a/cmd/entire/cli/migrate.go b/cmd/entire/cli/migrate.go index 1cdb188425..bb9e1a9acf 100644 --- a/cmd/entire/cli/migrate.go +++ b/cmd/entire/cli/migrate.go @@ -14,6 +14,7 @@ import ( "sort" "strconv" "strings" + "time" "github.com/entireio/cli/cmd/entire/cli/agent" agenttypes "github.com/entireio/cli/cmd/entire/cli/agent/types" @@ -443,9 +444,19 @@ func (s *migrateLoopState) packCurrentBatch(ctx context.Context, repo *git.Repos s.nextGeneration = next } refName := checkpoint.ArchivedGenerationRefName(s.nextGeneration) - if err := writeMigratedFullGeneration(ctx, repo, refName, s.pendingFull); err != nil { + archiveCommitHash, err := writeMigratedFullGeneration(ctx, repo, v2Store, refName, s.pendingFull) + if err != nil { return fmt.Errorf("failed to pack migrated raw transcripts: %w", err) } + if queueErr := queueMigratedFullGenerationPublication(ctx, v2Store, refName, archiveCommitHash); queueErr != nil { + // The archive ref and pending publication record must move together. + // If queueing fails, leave no archive ref behind so a retry can + // repack the generation and queue it again. 
+ if removeErr := repo.Storer.RemoveReference(refName); removeErr != nil { + queueErr = fmt.Errorf("%w; failed to remove unqueued generation ref %s: %w", queueErr, refName, removeErr) + } + return fmt.Errorf("failed to queue migrated raw transcript generation for push: %w", queueErr) + } s.writtenRefs = append(s.writtenRefs, refName) s.nextGeneration++ s.pendingFull = make([]migratedFullCheckpoint, 0, batchSize) @@ -791,17 +802,17 @@ func repoWorktreeRoot(repo *git.Repository) (string, error) { return root, nil } -func writeMigratedFullGeneration(ctx context.Context, repo *git.Repository, refName plumbing.ReferenceName, checkpoints []migratedFullCheckpoint) error { +func writeMigratedFullGeneration(ctx context.Context, repo *git.Repository, v2Store *checkpoint.V2GitStore, refName plumbing.ReferenceName, checkpoints []migratedFullCheckpoint) (plumbing.Hash, error) { fullEntries, err := buildMigratedFullEntrySet(ctx, repo, checkpoints) if err != nil { - return fmt.Errorf("write migrated generation entries: %w", err) + return plumbing.ZeroHash, fmt.Errorf("write migrated generation entries: %w", err) } entries := make(map[string]object.TreeEntry, len(fullEntries.rawEntries)+len(fullEntries.taskEntries)) fullEntries.mergeInto(entries) treeHash, err := checkpoint.BuildTreeFromEntries(ctx, repo, entries) if err != nil { - return fmt.Errorf("build migrated generation tree: %w", err) + return plumbing.ZeroHash, fmt.Errorf("build migrated generation tree: %w", err) } gen, found := generationMetadataFromMigratedSessions(checkpoints) @@ -809,32 +820,41 @@ func writeMigratedFullGeneration(ctx context.Context, repo *git.Repository, refN gen, found = checkpoint.AggregateTranscriptTimestamps(migratedTranscripts(checkpoints)) } if !found { - v2Store := checkpoint.NewV2GitStore(repo, migrateRemoteName) var err error gen, found, err = v2Store.ComputeGenerationCheckpointTimestamps(treeHash) if err != nil { - return fmt.Errorf("compute checkpoint timestamps: %w", err) + return 
plumbing.ZeroHash, fmt.Errorf("compute checkpoint timestamps: %w", err) } } if !found { - return fmt.Errorf("no timestamps found for migrated generation %s", refName) + return plumbing.ZeroHash, fmt.Errorf("no timestamps found for migrated generation %s", refName) } - v2Store := checkpoint.NewV2GitStore(repo, migrateRemoteName) treeHash, err = v2Store.AddGenerationJSONToTree(treeHash, gen) if err != nil { - return fmt.Errorf("add generation metadata: %w", err) + return plumbing.ZeroHash, fmt.Errorf("add generation metadata: %w", err) } commitHash, err := checkpoint.CreateCommit(ctx, repo, treeHash, plumbing.ZeroHash, fmt.Sprintf("Archive migrated generation: %s\n", refName), migrateAuthorName, migrateAuthorEmail) if err != nil { - return fmt.Errorf("create migrated generation commit: %w", err) + return plumbing.ZeroHash, fmt.Errorf("create migrated generation commit: %w", err) } if err := repo.Storer.SetReference(plumbing.NewHashReference(refName, commitHash)); err != nil { - return fmt.Errorf("update migrated generation ref %s: %w", refName, err) + return plumbing.ZeroHash, fmt.Errorf("update migrated generation ref %s: %w", refName, err) + } + return commitHash, nil +} + +func queueMigratedFullGenerationPublication(ctx context.Context, v2Store *checkpoint.V2GitStore, refName plumbing.ReferenceName, archiveCommitHash plumbing.Hash) error { + if err := v2Store.AppendPendingFullGenerationPublication(ctx, checkpoint.PendingV2FullGenerationPublication{ + ArchiveRefName: refName.String(), + ArchiveCommitHash: archiveCommitHash.String(), + QueuedAt: time.Now().UTC(), + }); err != nil { + return fmt.Errorf("append pending archive publication for %s: %w", refName, err) } return nil } diff --git a/cmd/entire/cli/migrate_test.go b/cmd/entire/cli/migrate_test.go index 5d166c6ae9..eaa56e74dc 100644 --- a/cmd/entire/cli/migrate_test.go +++ b/cmd/entire/cli/migrate_test.go @@ -401,6 +401,12 @@ func TestMigrateCheckpointsV2_PacksFullGenerationsOldestFirst(t *testing.T) { 
require.NoError(t, err) require.Equal(t, []string{"0000000000001", "0000000000002"}, archived) + pendingPublications, err := v2Store.ReadPendingFullGenerationPublications(ctx) + require.NoError(t, err) + require.Len(t, pendingPublications, 2) + assert.Equal(t, paths.V2FullRefPrefix+"0000000000001", pendingPublications[0].ArchiveRefName) + assert.Equal(t, paths.V2FullRefPrefix+"0000000000002", pendingPublications[1].ArchiveRefName) + expectedBatches := [][]int{ {0, 1}, {2, 3}, @@ -438,6 +444,43 @@ func TestMigrateCheckpointsV2_PacksFullGenerationsOldestFirst(t *testing.T) { require.NoError(t, err, "/full/current should contain final partial checkpoint") } +func TestMigrateCheckpointsV2_RollsBackArchiveWhenPublicationQueueFails(t *testing.T) { + oldMax := migrateMaxCheckpointsPerGeneration + migrateMaxCheckpointsPerGeneration = 2 + t.Cleanup(func() { + migrateMaxCheckpointsPerGeneration = oldMax + }) + + repo := initMigrateTestRepo(t) + v1Store, v2Store := newMigrateStores(repo) + ctx := context.Background() + + for i, cpID := range []id.CheckpointID{ + id.MustCheckpointID("000000000301"), + id.MustCheckpointID("000000000302"), + } { + writeV1Checkpoint(t, v1Store, cpID, "session-queue-fail-"+strconv.Itoa(i), + []byte(`{"type":"assistant","message":"checkpoint `+strconv.Itoa(i)+`"}`+"\n"), + []string{"prompt " + strconv.Itoa(i)}) + } + + worktree, err := repo.Worktree() + require.NoError(t, err) + blockingPath := filepath.Join(worktree.Filesystem().Root(), ".git", "entire-v2-rotations") + require.NoError(t, os.WriteFile(blockingPath, []byte("not a directory"), 0o600)) + + var stdout bytes.Buffer + result, writtenRefs, err := migrateCheckpointsV2(ctx, repo, v1Store, v2Store, &stdout, false) + require.Error(t, err) + require.ErrorContains(t, err, "failed to queue migrated raw transcript generation for push") + assert.Empty(t, writtenRefs) + assert.NotNil(t, result) + + archived, listErr := v2Store.ListArchivedGenerations() + require.NoError(t, listErr) + 
assert.Empty(t, archived, "failed queueing must not leave an unqueued archive ref behind") +} + func TestUpdateV2FullCurrentRefRejectsConcurrentChange(t *testing.T) { t.Parallel() repo := initMigrateTestRepo(t) diff --git a/cmd/entire/cli/strategy/cleanup.go b/cmd/entire/cli/strategy/cleanup.go index 6ecfc853ea..c63215c319 100644 --- a/cmd/entire/cli/strategy/cleanup.go +++ b/cmd/entire/cli/strategy/cleanup.go @@ -507,7 +507,7 @@ func listRemoteArchivedV2GenerationRefs(ctx context.Context, target string) (map } refs := make(map[string]string) - for _, line := range strings.Split(strings.TrimSpace(string(output)), "\n") { + for line := range strings.SplitSeq(strings.TrimSpace(string(output)), "\n") { if line == "" { continue } diff --git a/cmd/entire/cli/strategy/push_v2.go b/cmd/entire/cli/strategy/push_v2.go index 88daaff0b1..9e7f53c90b 100644 --- a/cmd/entire/cli/strategy/push_v2.go +++ b/cmd/entire/cli/strategy/push_v2.go @@ -8,6 +8,7 @@ import ( "io" "log/slog" "os" + "slices" "sort" "strings" "time" @@ -25,7 +26,7 @@ import ( ) // tryPushRef attempts to push a custom ref using an explicit refspec. 
-func tryPushRef(ctx context.Context, target string, refName plumbing.ReferenceName) (pushResult, error) { +func tryPushRef(ctx context.Context, target string, refName plumbing.ReferenceName) error { ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) defer cancel() @@ -33,10 +34,10 @@ func tryPushRef(ctx context.Context, target string, refName plumbing.ReferenceNa result, err := remote.Push(ctx, target, refSpec) outputStr := result.Output if err != nil { - return pushResult{}, classifyPushFailure(ctx, outputStr, err) + return classifyPushFailure(ctx, outputStr, err) } - return parsePushResult(outputStr), nil + return nil } type v2RefPushResult struct { @@ -45,7 +46,16 @@ type v2RefPushResult struct { err error } +type pendingV2FullGenerationPublicationResult struct { + successfulRefs []plumbing.ReferenceName + fullCurrentResetHandled bool +} + func tryPushV2Refs(ctx context.Context, target string, refs []plumbing.ReferenceName) []v2RefPushResult { + if len(refs) == 0 { + return nil + } + ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) defer cancel() @@ -104,6 +114,180 @@ func pushV2RefsWithRecovery(ctx context.Context, target string, refs []plumbing. 
return results } +func publishPendingV2FullGenerationPublications( + ctx context.Context, + repo *git.Repository, + store *checkpoint.V2GitStore, + target string, + publications []checkpoint.PendingV2FullGenerationPublication, +) (pendingV2FullGenerationPublicationResult, error) { + var result pendingV2FullGenerationPublicationResult + if len(publications) == 0 { + return result, nil + } + + currentRefName := plumbing.ReferenceName(paths.V2FullCurrentRefName) + var localCurrentRef *plumbing.Reference + for { + latestResetPublicationIndex := latestPendingFullCurrentResetPublicationIndex(publications) + if latestResetPublicationIndex == -1 { + break + } + var err error + localCurrentRef, err = repo.Reference(currentRefName, true) + if err != nil { + return result, fmt.Errorf("read local %s: %w", shortRefName(currentRefName), err) + } + latestResetPublication := publications[latestResetPublicationIndex] + if pendingResetPublicationMatchesLocalCurrent(ctx, repo, latestResetPublication, localCurrentRef.Hash()) { + break + } + if err := store.RemovePendingFullGenerationPublications(ctx, []checkpoint.PendingV2FullGenerationPublication{latestResetPublication}); err != nil { + return result, fmt.Errorf("clear stale pending v2 full generation publications: %w", err) + } + publications = slices.Delete(publications, latestResetPublicationIndex, latestResetPublicationIndex+1) + } + + archiveRefs := pendingFullArchiveRefs(publications) + if len(archiveRefs) > 0 { + var archivePushErr error + for _, pushResult := range tryPushV2Refs(ctx, target, archiveRefs) { + if pushResult.err != nil { + if archivePushErr == nil { + archivePushErr = fmt.Errorf("push pending archive %s: %w", shortRefName(pushResult.refName), pushResult.err) + } + continue + } + result.successfulRefs = append(result.successfulRefs, pushResult.refName) + } + if archivePushErr != nil { + return result, archivePushErr + } + } + + resetPublications := pendingFullCurrentResetPublications(publications) + if 
len(resetPublications) == 0 { + if err := store.RemovePendingFullGenerationPublications(ctx, publications); err != nil { + return result, fmt.Errorf("clear pending v2 full generation publications: %w", err) + } + return result, nil + } + + remoteCurrentHash, remoteCurrentFound, err := lsRemoteRefHash(ctx, target, currentRefName) + if err != nil { + return result, fmt.Errorf("read remote %s: %w", shortRefName(currentRefName), err) + } + if remoteCurrentFound && remoteCurrentHash == localCurrentRef.Hash() { + if err := store.RemovePendingFullGenerationPublications(ctx, publications); err != nil { + return result, fmt.Errorf("clear pending v2 full generation publications: %w", err) + } + result.fullCurrentResetHandled = true + return result, nil + } + + if remoteCurrentFound && !pendingResetPublicationsContainAncestor(ctx, repo, resetPublications, remoteCurrentHash) { + return result, fmt.Errorf("remote %s at %s is not covered by pending local archives", shortRefName(currentRefName), remoteCurrentHash) + } + + expectedRemoteHash := "" + if remoteCurrentFound { + expectedRemoteHash = remoteCurrentHash.String() + } + currentRefSpec := fmt.Sprintf("%s:%s", currentRefName, currentRefName) + if err := pushWithLease(ctx, target, currentRefSpec, currentRefName.String(), expectedRemoteHash, "push rotated "+shortRefName(currentRefName)); err != nil { + return result, fmt.Errorf("push rotated %s: %w", shortRefName(currentRefName), err) + } + result.successfulRefs = append(result.successfulRefs, currentRefName) + result.fullCurrentResetHandled = true + + if err := store.RemovePendingFullGenerationPublications(ctx, publications); err != nil { + return result, fmt.Errorf("clear pending v2 full generation publications: %w", err) + } + return result, nil +} + +func pendingFullArchiveRefs(publications []checkpoint.PendingV2FullGenerationPublication) []plumbing.ReferenceName { + seen := make(map[plumbing.ReferenceName]struct{}, len(publications)) + refs := 
make([]plumbing.ReferenceName, 0, len(publications)) + for _, publication := range publications { + suffix, ok := strings.CutPrefix(publication.ArchiveRefName, paths.V2FullRefPrefix) + if !ok || !checkpoint.GenerationRefPattern.MatchString(suffix) { + continue + } + refName := plumbing.ReferenceName(paths.V2FullRefPrefix + suffix) + if _, ok := seen[refName]; ok { + continue + } + seen[refName] = struct{}{} + refs = append(refs, refName) + } + return refs +} + +func pendingFullCurrentResetPublications(publications []checkpoint.PendingV2FullGenerationPublication) []checkpoint.PendingV2FullGenerationPublication { + resetPublications := make([]checkpoint.PendingV2FullGenerationPublication, 0, len(publications)) + for _, publication := range publications { + if publication.PreviousFullCurrentHash == "" && publication.ResetFullCurrentRootHash == "" { + continue + } + resetPublications = append(resetPublications, publication) + } + return resetPublications +} + +func pendingResetPublicationsContainAncestor(ctx context.Context, repo *git.Repository, publications []checkpoint.PendingV2FullGenerationPublication, hash plumbing.Hash) bool { + for _, publication := range publications { + if publication.ArchiveCommitHash == "" { + continue + } + if IsAncestorOf(ctx, repo, hash, plumbing.NewHash(publication.ArchiveCommitHash)) { + return true + } + } + return false +} + +func latestPendingFullCurrentResetPublicationIndex(publications []checkpoint.PendingV2FullGenerationPublication) int { + for i := len(publications) - 1; i >= 0; i-- { + if publications[i].PreviousFullCurrentHash == "" && publications[i].ResetFullCurrentRootHash == "" { + continue + } + return i + } + return -1 +} + +func pendingResetPublicationMatchesLocalCurrent(ctx context.Context, repo *git.Repository, publication checkpoint.PendingV2FullGenerationPublication, localCurrentHash plumbing.Hash) bool { + if len(publication.ResetFullCurrentRootHash) != 40 { + return false + } + return IsAncestorOf(ctx, repo, 
plumbing.NewHash(publication.ResetFullCurrentRootHash), localCurrentHash) +} + +func lsRemoteRefHash(ctx context.Context, target string, refName plumbing.ReferenceName) (plumbing.Hash, bool, error) { + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + output, err := remote.LsRemote(ctx, target, refName.String()) + if err != nil { + return plumbing.ZeroHash, false, fmt.Errorf("ls-remote %s: %w", refName, err) + } + for line := range strings.SplitSeq(strings.TrimSpace(string(output)), "\n") { + if line == "" { + continue + } + parts := strings.Fields(line) + if len(parts) < 2 || parts[1] != refName.String() { + continue + } + if len(parts[0]) != 40 { + return plumbing.ZeroHash, false, fmt.Errorf("invalid remote hash %q for %s", parts[0], refName) + } + return plumbing.NewHash(parts[0]), true, nil + } + return plumbing.ZeroHash, false, nil +} + func refSpecsForRefs(refs []plumbing.ReferenceName) []string { refSpecs := make([]string, 0, len(refs)) for _, refName := range refs { @@ -114,7 +298,7 @@ func refSpecsForRefs(refs []plumbing.ReferenceName) []string { func parsePushRefResults(ctx context.Context, output string, refs []plumbing.ReferenceName, pushErr error) []v2RefPushResult { parsed := make(map[plumbing.ReferenceName]v2RefPushResult, len(refs)) - for _, line := range strings.Split(output, "\n") { + for line := range strings.SplitSeq(output, "\n") { result, ok := parsePushRefStatusLine(line) if ok { parsed[result.refName] = result @@ -215,10 +399,13 @@ func fetchAndMergeRef(ctx context.Context, target string, refName plumbing.Refer tmpRefName := plumbing.ReferenceName("refs/entire-fetch-tmp/" + tmpRefSuffix) refSpec := fmt.Sprintf("+%s:%s", refName, tmpRefName) + // Recovery flattens fetched trees recursively, so it needs a complete object + // graph instead of the normal blobless sync fetch. 
if output, err := remote.Fetch(ctx, remote.FetchOptions{ Remote: fetchTarget, RefSpecs: []string{refSpec}, NoTags: true, + NoFilter: true, ExtraArgs: []string{"--no-write-fetch-head"}, }); err != nil { return fmt.Errorf("fetch failed: %s", output) @@ -234,9 +421,9 @@ func fetchAndMergeRef(ctx context.Context, target string, refName plumbing.Refer // Check for rotation conflict on /full/current if refName == plumbing.ReferenceName(paths.V2FullCurrentRefName) { - remoteOnlyArchives, detectErr := detectRemoteOnlyArchives(ctx, target, repo) - if detectErr == nil && len(remoteOnlyArchives) > 0 { - return handleRotationConflict(ctx, target, fetchTarget, repo, refName, tmpRefName, remoteOnlyArchives) + remoteRotationArchives, detectErr := detectRemoteRotationArchives(ctx, target, repo) + if detectErr == nil && len(remoteRotationArchives) > 0 { + return handleRotationConflict(ctx, target, fetchTarget, repo, refName, tmpRefName, remoteRotationArchives) } } @@ -295,9 +482,10 @@ func fetchAndMergeRef(ctx context.Context, target string, refName plumbing.Refer return nil } -// detectRemoteOnlyArchives discovers archived generation refs on the remote -// that don't exist locally. Returns them sorted ascending (oldest first). -func detectRemoteOnlyArchives(ctx context.Context, target string, repo *git.Repository) ([]string, error) { +// detectRemoteRotationArchives discovers archived generation refs on the remote +// that are missing locally or whose local ref hash differs from the remote ref +// hash. Returns them sorted ascending (oldest first). 
+func detectRemoteRotationArchives(ctx context.Context, target string, repo *git.Repository) ([]string, error) { ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() @@ -306,8 +494,8 @@ func detectRemoteOnlyArchives(ctx context.Context, target string, repo *git.Repo return nil, fmt.Errorf("ls-remote failed: %w", err) } - var remoteOnly []string - for _, line := range strings.Split(strings.TrimSpace(string(output)), "\n") { + var remoteRotationArchives []string + for line := range strings.SplitSeq(strings.TrimSpace(string(output)), "\n") { if line == "" { continue } @@ -320,60 +508,192 @@ func detectRemoteOnlyArchives(ctx context.Context, target string, repo *git.Repo if suffix == "current" || !checkpoint.GenerationRefPattern.MatchString(suffix) { continue } - // Only check for existence, not hash equality. A locally-present archive - // could be stale if another machine updated it via rotation conflict recovery, - // but that's unlikely and the checkpoints are still on /main regardless. - if _, err := repo.Reference(plumbing.ReferenceName(refName), true); err != nil { - remoteOnly = append(remoteOnly, suffix) + if len(parts[0]) != 40 { + return nil, fmt.Errorf("invalid remote archive hash %q for %s", parts[0], refName) + } + remoteHash := plumbing.NewHash(parts[0]) + localRef, err := repo.Reference(plumbing.ReferenceName(refName), true) + if err != nil || localRef.Hash() != remoteHash { + remoteRotationArchives = append(remoteRotationArchives, suffix) } } - sort.Strings(remoteOnly) - return remoteOnly, nil + sort.Strings(remoteRotationArchives) + return remoteRotationArchives, nil } -// handleRotationConflict handles the case where remote /full/current was rotated. -// Merges local /full/current into the latest remote archived generation to avoid -// duplicating checkpoint data, then adopts remote's /full/current as local. 
-func handleRotationConflict(ctx context.Context, target, fetchTarget string, repo *git.Repository, refName, tmpRefName plumbing.ReferenceName, remoteOnlyArchives []string) error { - // Use the latest remote-only archive - latestArchive := remoteOnlyArchives[len(remoteOnlyArchives)-1] - archiveRefName := plumbing.ReferenceName(paths.V2FullRefPrefix + latestArchive) - - // Fetch the latest archived generation - archiveTmpRef := plumbing.ReferenceName("refs/entire-fetch-tmp/archive-" + latestArchive) - archiveRefSpec := fmt.Sprintf("+%s:%s", archiveRefName, archiveTmpRef) +type fetchedRemoteRotationArchive struct { + repo *git.Repository + refName plumbing.ReferenceName + tmpRefName plumbing.ReferenceName + ref *plumbing.Reference + tree *object.Tree +} + +func readFetchedRemoteRotationArchive(repo *git.Repository, archive string) (fetchedRemoteRotationArchive, error) { + archiveRefName := plumbing.ReferenceName(paths.V2FullRefPrefix + archive) + archiveTmpRef := archiveTmpRefName(archive) + archiveRef, err := repo.Reference(archiveTmpRef, true) + if err != nil { + return fetchedRemoteRotationArchive{}, fmt.Errorf("failed to get archived ref: %w", err) + } + archiveCommit, err := repo.CommitObject(archiveRef.Hash()) + if err != nil { + return fetchedRemoteRotationArchive{}, fmt.Errorf("failed to get archive commit: %w", err) + } + archiveTree, err := archiveCommit.Tree() + if err != nil { + return fetchedRemoteRotationArchive{}, fmt.Errorf("failed to get archive tree: %w", err) + } + + return fetchedRemoteRotationArchive{ + repo: repo, + refName: archiveRefName, + tmpRefName: archiveTmpRef, + ref: archiveRef, + tree: archiveTree, + }, nil +} + +func fetchRelatedRemoteRotationArchive(ctx context.Context, fetchTarget string, archives []string, localCurrentHash plumbing.Hash) (fetchedRemoteRotationArchive, error) { + refSpecs := make([]string, 0, len(archives)) + archiveTmpRefs := make([]plumbing.ReferenceName, 0, len(archives)) + + for _, archive := range archives { + 
archiveRefName := plumbing.ReferenceName(paths.V2FullRefPrefix + archive) + archiveTmpRef := archiveTmpRefName(archive) + refSpecs = append(refSpecs, fmt.Sprintf("+%s:%s", archiveRefName, archiveTmpRef)) + archiveTmpRefs = append(archiveTmpRefs, archiveTmpRef) + } + + // These archive commits are read immediately through go-git for tree + // flattening, so fetch the complete refs rather than blobless packfiles. if output, fetchErr := remote.Fetch(ctx, remote.FetchOptions{ Remote: fetchTarget, - RefSpecs: []string{archiveRefSpec}, + RefSpecs: refSpecs, NoTags: true, + NoFilter: true, ExtraArgs: []string{"--no-write-fetch-head"}, }); fetchErr != nil { - return fmt.Errorf("fetch archived generation failed: %s", output) + if repo, openErr := OpenRepository(ctx); openErr == nil { + cleanupFetchedArchiveTmpRefs(repo, archiveTmpRefs) + } + return fetchedRemoteRotationArchive{}, fmt.Errorf("fetch archived generations failed: %s", output) + } + + repo, err := OpenRepository(ctx) + if err != nil { + return fetchedRemoteRotationArchive{}, fmt.Errorf("reopen repository after fetching archived generations: %w", err) } + tmpRefsToCleanup := archiveTmpRefs defer func() { - _ = repo.Storer.RemoveReference(archiveTmpRef) //nolint:errcheck // cleanup is best-effort + cleanupFetchedArchiveTmpRefs(repo, tmpRefsToCleanup) }() - // Get archived generation state - archiveRef, err := repo.Reference(archiveTmpRef, true) - if err != nil { - return fmt.Errorf("failed to get archived ref: %w", err) + localCurrentAncestors, ok := currentGenerationAncestors(ctx, repo, localCurrentHash) + if !ok { + return fetchedRemoteRotationArchive{}, errors.New("failed to read local /full/current history") } - archiveCommit, err := repo.CommitObject(archiveRef.Hash()) + for _, archive := range archives { + fetched, err := readFetchedRemoteRotationArchive(repo, archive) + if err != nil { + return fetchedRemoteRotationArchive{}, err + } + if archiveSharesHistoryWithCurrentGeneration(ctx, repo, 
localCurrentAncestors, fetched.ref.Hash()) { + tmpRefsToCleanup = removeRef(tmpRefsToCleanup, fetched.tmpRefName) + return fetched, nil + } + } + return fetchedRemoteRotationArchive{}, errors.New("no remote archive shares history with local /full/current") +} + +func archiveTmpRefName(archive string) plumbing.ReferenceName { + return plumbing.ReferenceName("refs/entire-fetch-tmp/archive-" + archive) +} + +func cleanupFetchedArchiveTmpRefs(repo *git.Repository, tmpRefs []plumbing.ReferenceName) { + for _, tmpRef := range tmpRefs { + _ = repo.Storer.RemoveReference(tmpRef) //nolint:errcheck // cleanup is best-effort + } +} + +func currentGenerationAncestors(ctx context.Context, repo *git.Repository, currentHash plumbing.Hash) (map[plumbing.Hash]struct{}, bool) { + ancestors := make(map[plumbing.Hash]struct{}) + iter, err := repo.Log(&git.LogOptions{From: currentHash}) if err != nil { - return fmt.Errorf("failed to get archive commit: %w", err) + return nil, false } - archiveTree, err := archiveCommit.Tree() + defer iter.Close() + + count := 0 + _ = iter.ForEach(func(c *object.Commit) error { //nolint:errcheck // Best-effort search, errors are non-fatal + if err := ctx.Err(); err != nil { + return err //nolint:wrapcheck // Propagating context cancellation + } + count++ + if count > MaxCommitTraversalDepth { + return errStop + } + ancestors[c.Hash] = struct{}{} + return nil + }) + return ancestors, true +} + +func archiveSharesHistoryWithCurrentGeneration(ctx context.Context, repo *git.Repository, currentAncestors map[plumbing.Hash]struct{}, archiveHash plumbing.Hash) bool { + if _, ok := currentAncestors[archiveHash]; ok { + return true + } + + iter, err := repo.Log(&git.LogOptions{From: archiveHash}) if err != nil { - return fmt.Errorf("failed to get archive tree: %w", err) + return false } + defer iter.Close() - // Get local /full/current state + found := false + count := 0 + _ = iter.ForEach(func(c *object.Commit) error { //nolint:errcheck // Best-effort search, 
errors are non-fatal + if err := ctx.Err(); err != nil { + return err //nolint:wrapcheck // Propagating context cancellation + } + count++ + if count > MaxCommitTraversalDepth { + return errStop + } + if _, ok := currentAncestors[c.Hash]; ok { + found = true + return errStop + } + return nil + }) + return found +} + +// handleRotationConflict handles the case where remote /full/current was rotated. +// Merges local /full/current into the related remote archived generation to avoid +// duplicating checkpoint data, then adopts remote's /full/current as local. +func handleRotationConflict(ctx context.Context, target, fetchTarget string, repo *git.Repository, refName, tmpRefName plumbing.ReferenceName, remoteRotationArchives []string) error { localRef, err := repo.Reference(refName, true) if err != nil { return fmt.Errorf("failed to get local ref: %w", err) } + + archive, err := fetchRelatedRemoteRotationArchive(ctx, fetchTarget, remoteRotationArchives, localRef.Hash()) + if err != nil { + return fmt.Errorf("failed to find related archived generation: %w", err) + } + // fetchRelatedRemoteRotationArchive fetches via git CLI, so continue with + // the fresh go-git handle it used to avoid stale pack indexes. + repo = archive.repo + defer func() { + _ = repo.Storer.RemoveReference(archive.tmpRefName) //nolint:errcheck // cleanup is best-effort + }() + + localRef, err = repo.Reference(refName, true) + if err != nil { + return fmt.Errorf("failed to get local ref: %w", err) + } localCommit, err := repo.CommitObject(localRef.Hash()) if err != nil { return fmt.Errorf("failed to get local commit: %w", err) @@ -386,7 +706,7 @@ func handleRotationConflict(ctx context.Context, target, fetchTarget string, rep // Tree-merge local /full/current into archived generation. // Git content-addressing deduplicates shared shard paths automatically. 
entries := make(map[string]object.TreeEntry) - if err := checkpoint.FlattenTree(repo, archiveTree, "", entries); err != nil { + if err := checkpoint.FlattenTree(repo, archive.tree, "", entries); err != nil { return fmt.Errorf("failed to flatten archive tree: %w", err) } if err := checkpoint.FlattenTree(repo, localTree, "", entries); err != nil { @@ -413,19 +733,19 @@ func handleRotationConflict(ctx context.Context, target, fetchTarget string, rep // Create commit parented on archive's commit (fast-forward) mergeCommitHash, err := createMergeCommitCommon(ctx, repo, mergedTreeHash, - []plumbing.Hash{archiveRef.Hash()}, + []plumbing.Hash{archive.ref.Hash()}, "Merge local checkpoints into archived generation") if err != nil { return fmt.Errorf("failed to create merge commit: %w", err) } // Update local archived ref and push it - newArchiveRef := plumbing.NewHashReference(archiveRefName, mergeCommitHash) + newArchiveRef := plumbing.NewHashReference(archive.refName, mergeCommitHash) if err := repo.Storer.SetReference(newArchiveRef); err != nil { return fmt.Errorf("failed to update archive ref: %w", err) } - if _, pushErr := tryPushRef(ctx, target, archiveRefName); pushErr != nil { + if pushErr := tryPushRef(ctx, target, archive.refName); pushErr != nil { return fmt.Errorf("failed to push updated archive: %w", pushErr) } @@ -489,39 +809,66 @@ func updateGenerationTimestamps(repo *git.Repository, genBlobHash plumbing.Hash, } // pushV2Refs pushes v2 checkpoint refs to the target. -// Pushes /main, /full/current, and the latest archived generation (if any) in -// one git push. Older archived generations are immutable and were pushed when created. +// Pushes active refs in one git push. Pending full-generation publications are +// handled separately before /full/current recovery. 
func pushV2Refs(ctx context.Context, target string) { - refs := v2RefsToPush(ctx) - if len(refs) == 0 { + repo, err := OpenRepository(ctx) + if err != nil { + printV2PushFailures(ctx, target, nil, []error{fmt.Errorf("open repository: %w", err)}, false) + return + } + store := checkpoint.NewV2GitStore(repo, target) + + refs := v2RefsToPush(repo) + pendingPublications, pendingReadErr := readPendingV2FullGenerationPublications(ctx, store) + if pendingReadErr != nil { + printV2PushFailures(ctx, target, nil, []error{pendingReadErr}, false) + return + } + + if len(refs) == 0 && len(pendingPublications) == 0 { return } fmt.Fprintln(os.Stderr, "[entire] Syncing and pushing v2 checkpoints...") - fmt.Fprintf(os.Stderr, "[entire] Pushing %s...\n", strings.Join(shortRefNames(refs), ", ")) + pushNames := shortRefNames(refs) + if len(pushNames) == 0 { + pushNames = []string{"pending v2/full generations"} + } + fmt.Fprintf(os.Stderr, "[entire] Pushing %s...\n", strings.Join(pushNames, ", ")) - results := pushV2RefsWithRecovery(ctx, target, refs) var failures []error var successfulRefs []plumbing.ReferenceName pushedContent := false + + pendingPublicationResult, pendingPublishErr := publishPendingV2FullGenerationPublications(ctx, repo, store, target, pendingPublications) + successfulRefs = append(successfulRefs, pendingPublicationResult.successfulRefs...) 
+ if len(pendingPublicationResult.successfulRefs) > 0 { + pushedContent = true + } + if pendingPublishErr != nil { + failures = append(failures, fmt.Errorf("couldn't publish pending v2 full generation refs: %w", pendingPublishErr)) + printV2PushFailures(ctx, target, successfulRefs, failures, pushedContent) + return + } + if pendingPublicationResult.fullCurrentResetHandled { + refs = removeRef(refs, plumbing.ReferenceName(paths.V2FullCurrentRefName)) + } + + results := pushV2RefsWithRecovery(ctx, target, refs) for _, result := range results { if result.err != nil { failures = append(failures, result.err) continue } - successfulRefs = append(successfulRefs, result.refName) + successfulRefs = appendUniqueRef(successfulRefs, result.refName) if !result.result.upToDate { pushedContent = true } } if len(failures) > 0 { - printV2PartialPushResult(os.Stderr, successfulRefs, failures) - printCheckpointRemoteHint(target) - if pushedContent { - printSettingsCommitHint(ctx, target) - } - printCheckpointsV2MigrationHint(ctx) + printV2PushFailures(ctx, target, successfulRefs, failures, pushedContent) return } @@ -532,6 +879,14 @@ func pushV2Refs(ctx context.Context, target string) { printCheckpointsV2MigrationHint(ctx) } +func readPendingV2FullGenerationPublications(ctx context.Context, store *checkpoint.V2GitStore) ([]checkpoint.PendingV2FullGenerationPublication, error) { + publications, err := store.ReadPendingFullGenerationPublications(ctx) + if err != nil { + return nil, fmt.Errorf("read pending v2 full generation publications: %w", err) + } + return publications, nil +} + func printV2PartialPushResult(w io.Writer, successfulRefs []plumbing.ReferenceName, failures []error) { if len(successfulRefs) > 0 { fmt.Fprintf(w, "[entire] Successfully pushed %s\n", strings.Join(shortRefNames(successfulRefs), ", ")) @@ -541,12 +896,16 @@ func printV2PartialPushResult(w io.Writer, successfulRefs []plumbing.ReferenceNa } } -func v2RefsToPush(ctx context.Context) []plumbing.ReferenceName { 
- repo, err := OpenRepository(ctx) - if err != nil { - return nil +func printV2PushFailures(ctx context.Context, target string, successfulRefs []plumbing.ReferenceName, failures []error, pushedContent bool) { + printV2PartialPushResult(os.Stderr, successfulRefs, failures) + printCheckpointRemoteHint(target) + if pushedContent { + printSettingsCommitHint(ctx, target) } + printCheckpointsV2MigrationHint(ctx) +} +func v2RefsToPush(repo *git.Repository) []plumbing.ReferenceName { var refs []plumbing.ReferenceName for _, refName := range []plumbing.ReferenceName{ plumbing.ReferenceName(paths.V2MainRefName), @@ -557,15 +916,6 @@ func v2RefsToPush(ctx context.Context) []plumbing.ReferenceName { } } - // Push only the latest archived generation (most likely to be newly created). - store := checkpoint.NewV2GitStore(repo, "") - archived, err := store.ListArchivedGenerations() - if err != nil || len(archived) == 0 { - return refs - } - latest := archived[len(archived)-1] - refs = append(refs, plumbing.ReferenceName(paths.V2FullRefPrefix+latest)) - return refs } @@ -577,6 +927,19 @@ func shortRefNames(refs []plumbing.ReferenceName) []string { return names } +func appendUniqueRef(refs []plumbing.ReferenceName, refName plumbing.ReferenceName) []plumbing.ReferenceName { + if slices.Contains(refs, refName) { + return refs + } + return append(refs, refName) +} + +func removeRef(refs []plumbing.ReferenceName, refToRemove plumbing.ReferenceName) []plumbing.ReferenceName { + return slices.DeleteFunc(refs, func(refName plumbing.ReferenceName) bool { + return refName == refToRemove + }) +} + // shortRefName returns a human-readable short form of a ref name for log output. 
// e.g., "refs/entire/checkpoints/v2/main" -> "v2/main" func shortRefName(refName plumbing.ReferenceName) string { diff --git a/cmd/entire/cli/strategy/push_v2_test.go b/cmd/entire/cli/strategy/push_v2_test.go index 7340f03fe8..b603c771a3 100644 --- a/cmd/entire/cli/strategy/push_v2_test.go +++ b/cmd/entire/cli/strategy/push_v2_test.go @@ -3,6 +3,7 @@ package strategy import ( "context" "errors" + "os" "os/exec" "path/filepath" "strings" @@ -98,6 +99,103 @@ func writeV2Checkpoint(t *testing.T, repo *git.Repository, cpID id.CheckpointID, require.NoError(t, err) } +func v2CheckpointCountInRef(t *testing.T, repo *git.Repository, refName plumbing.ReferenceName) int { + t.Helper() + + store := checkpoint.NewV2GitStore(repo, "origin") + _, treeHash, err := store.GetRefState(refName) + require.NoError(t, err) + count, err := store.CountCheckpointsInTree(treeHash) + require.NoError(t, err) + return count +} + +func writeV2ArchiveRef(t *testing.T, repo *git.Repository, refName plumbing.ReferenceName, marker string) plumbing.Hash { + t.Helper() + + blobHash, err := checkpoint.CreateBlobFromContent(repo, []byte(marker)) + require.NoError(t, err) + treeHash, err := checkpoint.BuildTreeFromEntries(context.Background(), repo, map[string]object.TreeEntry{ + paths.GenerationFileName: { + Name: paths.GenerationFileName, + Mode: 0o100644, + Hash: blobHash, + }, + }) + require.NoError(t, err) + commitHash, err := checkpoint.CreateCommit(context.Background(), repo, treeHash, plumbing.ZeroHash, + "Archive generation", "Test", "test@test.com") + require.NoError(t, err) + require.NoError(t, repo.Storer.SetReference(plumbing.NewHashReference(refName, commitHash))) + return commitHash +} + +func enableFilteredFetchesForTest(t *testing.T, dir string) { + t.Helper() + + entireDir := filepath.Join(dir, ".entire") + require.NoError(t, os.MkdirAll(entireDir, 0o755)) + require.NoError(t, os.WriteFile( + filepath.Join(entireDir, paths.SettingsFileName), + []byte(`{"enabled": true, 
"strategy_options": {"filtered_fetches": true}}`), + 0o644, + )) +} + +func enableFilteredFetchServingForTest(t *testing.T, bareDir string) { + t.Helper() + + cmd := exec.CommandContext(t.Context(), "git", "-C", bareDir, "config", "uploadpack.allowFilter", "true") + cmd.Env = testutil.GitIsolatedEnv() + out, err := cmd.CombinedOutput() + require.NoError(t, err, "enable uploadpack.allowFilter failed: %s", out) +} + +func rotateV2CurrentForTest(t *testing.T, repo *git.Repository, archiveRefName plumbing.ReferenceName) plumbing.Hash { + t.Helper() + + ctx := context.Background() + fullCurrentRef := plumbing.ReferenceName(paths.V2FullCurrentRefName) + store := checkpoint.NewV2GitStore(repo, "origin") + + currentRef, err := repo.Reference(fullCurrentRef, true) + require.NoError(t, err) + _, currentTreeHash, err := store.GetRefState(fullCurrentRef) + require.NoError(t, err) + + gen := checkpoint.GenerationMetadata{ + OldestCheckpointAt: time.Now().UTC().Add(-time.Hour), + NewestCheckpointAt: time.Now().UTC(), + } + archiveTreeHash, err := store.AddGenerationJSONToTree(currentTreeHash, gen) + require.NoError(t, err) + archiveCommitHash, err := checkpoint.CreateCommit(ctx, repo, archiveTreeHash, + currentRef.Hash(), "Archive", "Test", "test@test.com") + require.NoError(t, err) + require.NoError(t, repo.Storer.SetReference(plumbing.NewHashReference(archiveRefName, archiveCommitHash))) + + emptyTree, err := checkpoint.BuildTreeFromEntries(ctx, repo, map[string]object.TreeEntry{}) + require.NoError(t, err) + orphanHash, err := checkpoint.CreateCommit(ctx, repo, emptyTree, plumbing.ZeroHash, + "Start generation", "Test", "test@test.com") + require.NoError(t, err) + require.NoError(t, repo.Storer.SetReference(plumbing.NewHashReference(fullCurrentRef, orphanHash))) + return archiveCommitHash +} + +func refContainsV2Checkpoint(t *testing.T, repo *git.Repository, refName plumbing.ReferenceName, cpID id.CheckpointID) bool { + t.Helper() + + ref, err := repo.Reference(refName, true) 
+ require.NoError(t, err) + commit, err := repo.CommitObject(ref.Hash()) + require.NoError(t, err) + tree, err := commit.Tree() + require.NoError(t, err) + _, err = tree.Tree(cpID.Path()) + return err == nil +} + // TestFetchAndMergeRef_MergesTrees verifies that fetchAndMergeRef correctly // merges divergent trees from two repos sharing a common ref. // Not parallel: uses t.Chdir() @@ -163,10 +261,11 @@ func TestFetchAndMergeRef_MergesTrees(t *testing.T) { assert.True(t, has11, "merged tree should contain checkpoint 112233445566") } -// TestPushV2Refs_PushesAllRefs verifies that pushV2Refs pushes /main, -// /full/current, and any archived generations to a bare repo. +// TestPushV2Refs_SkipsUnrecordedArchiveRefs verifies that pushV2Refs pushes +// /main and /full/current, but does not push archived generations unless a +// local handoff records them as needing publication. // Not parallel: uses t.Chdir() -func TestPushV2Refs_PushesAllRefs(t *testing.T) { +func TestPushV2Refs_SkipsUnrecordedArchiveRefs(t *testing.T) { ctx := context.Background() tmpDir := setupRepoWithV2Ref(t) @@ -176,7 +275,8 @@ func TestPushV2Refs_PushesAllRefs(t *testing.T) { // Write a checkpoint (creates both /main and /full/current) writeV2Checkpoint(t, repo, id.MustCheckpointID("aabbccddeeff"), "test-session") - // Create two fake archived generation refs — only the latest should be pushed + // Create two fake archived generation refs without recording a pending + // publication handoff. fullRef, err := repo.Reference(plumbing.ReferenceName(paths.V2FullCurrentRefName), true) require.NoError(t, err) for _, num := range []string{"0000000000001", "0000000000002"} { @@ -199,7 +299,7 @@ func TestPushV2Refs_PushesAllRefs(t *testing.T) { pushV2Refs(ctx, bareDir) output := restore() - // Verify all three refs exist in bare repo + // Verify only active refs exist in bare repo. 
bareRepo, err := git.PlainOpen(bareDir) require.NoError(t, err) @@ -210,19 +310,489 @@ func TestPushV2Refs_PushesAllRefs(t *testing.T) { require.NoError(t, err, "/full/current ref should exist in bare repo") _, err = bareRepo.Reference(plumbing.ReferenceName(paths.V2FullRefPrefix+"0000000000002"), true) - require.NoError(t, err, "latest archived generation should exist in bare repo") + require.Error(t, err, "unrecorded latest archived generation should not be pushed") _, err = bareRepo.Reference(plumbing.ReferenceName(paths.V2FullRefPrefix+"0000000000001"), true) - require.Error(t, err, "older archived generation should NOT be pushed") + require.Error(t, err, "unrecorded older archived generation should not be pushed") assert.Contains(t, output, "[entire] Syncing and pushing v2 checkpoints...") - assert.Contains(t, output, "[entire] Pushing v2/main, v2/full/current, v2/full/0000000000002...") + assert.Contains(t, output, "[entire] Pushing v2/main, v2/full/current...") assert.Contains(t, output, "[entire] All v2 checkpoints pushed") assert.NotContains(t, output, "[entire] Successfully pushed", "successful refs should only be listed on partial failure") assert.NotContains(t, output, "Pushing v2/main to", "per-ref progress should stay quiet") assert.NotContains(t, output, "Syncing v2/main with remote", "per-ref sync progress should stay quiet") } +// TestPushV2Refs_PushesPendingArchivePublications verifies migration-created +// archived generations can be queued for the next pre-push without making every +// local archived generation part of the default push set. 
+//
+// Not parallel: uses t.Chdir()
+func TestPushV2Refs_PushesPendingArchivePublications(t *testing.T) {
+	ctx := context.Background()
+
+	tmpDir := setupRepoWithV2Ref(t)
+	repo, err := git.PlainOpen(tmpDir)
+	require.NoError(t, err)
+	store := checkpoint.NewV2GitStore(repo, "origin")
+
+	writeV2Checkpoint(t, repo, id.MustCheckpointID("aabbccddeeff"), "test-session")
+
+	fullRef, err := repo.Reference(plumbing.ReferenceName(paths.V2FullCurrentRefName), true)
+	require.NoError(t, err)
+
+	// Queue two pending archive publications, each backed by a local archive ref
+	// pointing at the current /full/current commit.
+	var publications []checkpoint.PendingV2FullGenerationPublication
+	for _, num := range []string{"0000000000001", "0000000000002"} {
+		refName := plumbing.ReferenceName(paths.V2FullRefPrefix + num)
+		ref := plumbing.NewHashReference(refName, fullRef.Hash())
+		require.NoError(t, repo.Storer.SetReference(ref))
+		publications = append(publications, checkpoint.PendingV2FullGenerationPublication{
+			ArchiveRefName:    refName.String(),
+			ArchiveCommitHash: fullRef.Hash().String(),
+		})
+	}
+	require.NoError(t, store.AppendPendingFullGenerationPublications(ctx, publications))
+
+	t.Chdir(tmpDir)
+
+	bareDir := t.TempDir()
+	initCmd := exec.CommandContext(ctx, "git", "init", "--bare")
+	initCmd.Dir = bareDir
+	initCmd.Env = testutil.GitIsolatedEnv()
+	require.NoError(t, initCmd.Run())
+
+	restore := captureStderr(t)
+	pushV2Refs(ctx, bareDir)
+	_ = restore() // stderr output is not asserted in this test
+
+	// Expected: both queued archive refs exist on the remote and the local
+	// pending-publication queue is empty afterwards.
+	bareRepo, err := git.PlainOpen(bareDir)
+	require.NoError(t, err)
+	for _, num := range []string{"0000000000001", "0000000000002"} {
+		_, err = bareRepo.Reference(plumbing.ReferenceName(paths.V2FullRefPrefix+num), true)
+		require.NoError(t, err, "pending archived generation should exist in bare repo")
+	}
+
+	remaining, err := store.ReadPendingFullGenerationPublications(ctx)
+	require.NoError(t, err)
+	assert.Empty(t, remaining, "pending archive publications should be cleared after push")
+}
+
+// TestPushV2Refs_PushesPendingArchivePublicationsWithoutActiveRefs verifies a
+// pending archive publication is still honored when there is no /main or
+// /full/current ref to include in the normal active-ref push set.
+//
+// Not parallel: uses t.Chdir()
+func TestPushV2Refs_PushesPendingArchivePublicationsWithoutActiveRefs(t *testing.T) {
+	ctx := context.Background()
+
+	tmpDir := t.TempDir()
+	testutil.InitRepo(t, tmpDir)
+	testutil.WriteFile(t, tmpDir, "f.txt", "init")
+	testutil.GitAdd(t, tmpDir, "f.txt")
+	testutil.GitCommit(t, tmpDir, "init")
+	// push.default=current would push the checked-out branch on a bare
+	// "git push"; the assertion on headRef below proves pushV2Refs never
+	// triggers such a push.
+	configCmd := exec.CommandContext(ctx, "git", "config", "push.default", "current")
+	configCmd.Dir = tmpDir
+	require.NoError(t, configCmd.Run())
+
+	repo, err := git.PlainOpen(tmpDir)
+	require.NoError(t, err)
+	headRef, err := repo.Head() // captured to prove the branch itself is never pushed
+	require.NoError(t, err)
+	store := checkpoint.NewV2GitStore(repo, "origin")
+
+	archiveRef := plumbing.ReferenceName(paths.V2FullRefPrefix + "0000000000001")
+	archiveCommitHash := writeV2ArchiveRef(t, repo, archiveRef, "pending archive")
+	require.NoError(t, store.AppendPendingFullGenerationPublication(ctx, checkpoint.PendingV2FullGenerationPublication{
+		ArchiveRefName:    archiveRef.String(),
+		ArchiveCommitHash: archiveCommitHash.String(),
+	}))
+
+	t.Chdir(tmpDir)
+
+	bareDir := t.TempDir()
+	initCmd := exec.CommandContext(ctx, "git", "init", "--bare")
+	initCmd.Dir = bareDir
+	initCmd.Env = testutil.GitIsolatedEnv()
+	require.NoError(t, initCmd.Run())
+
+	restore := captureStderr(t)
+	pushV2Refs(ctx, bareDir)
+	_ = restore() // stderr output is not asserted in this test
+
+	bareRepo, err := git.PlainOpen(bareDir)
+	require.NoError(t, err)
+	_, err = bareRepo.Reference(archiveRef, true)
+	require.NoError(t, err, "pending archived generation should be pushed even without active refs")
+	_, err = bareRepo.Reference(headRef.Name(), true)
+	require.Error(t, err, "empty active-ref push must not push the current branch")
+
+	remaining, err := store.ReadPendingFullGenerationPublications(ctx)
+	require.NoError(t, err)
+	assert.Empty(t, remaining, "pending archive publication should be cleared after push")
+}
+
+// TestPushV2Refs_PendingPublicationReadErrorDoesNotReportActiveRefFailures
+// verifies that a pending-publication failure is reported as such. Active refs
+// have not been pushed at that point, so they must not be reported as failed
+// ref pushes.
+//
+// Not parallel: uses t.Chdir() and os.Stderr redirection.
+func TestPushV2Refs_PendingPublicationReadErrorDoesNotReportActiveRefFailures(t *testing.T) {
+	ctx := context.Background()
+
+	tmpDir := setupRepoWithV2Ref(t)
+	// Truncated JSON makes reading the pending-publications file fail.
+	pendingDir := filepath.Join(tmpDir, ".git", "entire-v2-rotations")
+	require.NoError(t, os.MkdirAll(pendingDir, 0o750))
+	require.NoError(t, os.WriteFile(filepath.Join(pendingDir, "pending.json"), []byte("{"), 0o600))
+
+	t.Chdir(tmpDir)
+
+	bareDir := t.TempDir()
+	initCmd := exec.CommandContext(ctx, "git", "init", "--bare")
+	initCmd.Dir = bareDir
+	initCmd.Env = testutil.GitIsolatedEnv()
+	require.NoError(t, initCmd.Run())
+
+	restore := captureStderr(t)
+	pushV2Refs(ctx, bareDir)
+	output := restore()
+
+	// The failure must surface as a pending-publication warning only, not as
+	// failed pushes of refs that were never attempted.
+	assert.Contains(t, output, "[entire] Warning: read pending v2 full generation publications:")
+	assert.NotContains(t, output, "failed to push v2/main")
+	assert.NotContains(t, output, "failed to push v2/full/current")
+
+	bareRepo, err := git.PlainOpen(bareDir)
+	require.NoError(t, err)
+	_, err = bareRepo.Reference(plumbing.ReferenceName(paths.V2MainRefName), true)
+	require.Error(t, err, "/main should not be pushed after pending-publication read failure")
+	_, err = bareRepo.Reference(plumbing.ReferenceName(paths.V2FullCurrentRefName), true)
+	require.Error(t, err, "/full/current should not be pushed after pending-publication read failure")
+}
+
+// TestPushV2Refs_DropsPendingResetPublicationWhenCurrentWasNotReset verifies
+// that a marker recorded before a failed/crashed reset does not publish the
+// would-be archive while /full/current still points at the old generation.
+//
+// Not parallel: uses t.Chdir() and os.Stderr redirection.
+func TestPushV2Refs_DropsPendingResetPublicationWhenCurrentWasNotReset(t *testing.T) {
+	ctx := context.Background()
+
+	tmpDir := setupRepoWithV2Ref(t)
+	repo, err := git.PlainOpen(tmpDir)
+	require.NoError(t, err)
+	store := checkpoint.NewV2GitStore(repo, "origin")
+
+	writeV2Checkpoint(t, repo, id.MustCheckpointID("aabbccddeeff"), "test-session")
+	currentRef, err := repo.Reference(plumbing.ReferenceName(paths.V2FullCurrentRefName), true)
+	require.NoError(t, err)
+
+	// Build the orphan root a completed reset would have installed — but leave
+	// /full/current untouched, so the marker below describes a reset that never
+	// actually happened.
+	emptyTree, err := checkpoint.BuildTreeFromEntries(ctx, repo, map[string]object.TreeEntry{})
+	require.NoError(t, err)
+	resetRoot, err := checkpoint.CreateCommit(ctx, repo, emptyTree, plumbing.ZeroHash,
+		"Start generation", "Test", "test@test.com")
+	require.NoError(t, err)
+
+	archiveRef := plumbing.ReferenceName(paths.V2FullRefPrefix + "0000000000001")
+	require.NoError(t, store.AppendPendingFullGenerationPublication(ctx, checkpoint.PendingV2FullGenerationPublication{
+		ArchiveRefName:           archiveRef.String(),
+		ArchiveCommitHash:        currentRef.Hash().String(),
+		PreviousFullCurrentHash:  currentRef.Hash().String(), // equals live /full/current → reset never applied
+		ResetFullCurrentRootHash: resetRoot.String(),
+		QueuedAt:                 time.Now().UTC(),
+	}))
+
+	t.Chdir(tmpDir)
+
+	bareDir := t.TempDir()
+	initCmd := exec.CommandContext(ctx, "git", "init", "--bare")
+	initCmd.Dir = bareDir
+	initCmd.Env = testutil.GitIsolatedEnv()
+	require.NoError(t, initCmd.Run())
+
+	restore := captureStderr(t)
+	pushV2Refs(ctx, bareDir)
+	output := restore()
+
+	// Expected: the stale marker is dropped silently — no archive ref on the
+	// remote, no pending entries left, and no "push pending archive" activity.
+	bareRepo, err := git.PlainOpen(bareDir)
+	require.NoError(t, err)
+	_, err = bareRepo.Reference(archiveRef, true)
+	require.Error(t, err, "stale pending archive should not be pushed")
+
+	remaining, err := store.ReadPendingFullGenerationPublications(ctx)
+	require.NoError(t, err)
+	assert.Empty(t, remaining, "stale pending reset publication should be cleared")
+	assert.Contains(t, output, "[entire] All v2 checkpoints pushed")
+	assert.NotContains(t, output, "push pending archive")
+}
+
+// TestPendingFullArchiveRefs_OnlyReturnsV2ArchiveGenerationRefs verifies that
+// only well-formed archive generation refs are returned (deduplicated), while
+// /full/current, /main, branch refs, empty and malformed names are ignored.
+func TestPendingFullArchiveRefs_OnlyReturnsV2ArchiveGenerationRefs(t *testing.T) {
+	t.Parallel()
+
+	refs := pendingFullArchiveRefs([]checkpoint.PendingV2FullGenerationPublication{
+		{ArchiveRefName: paths.V2FullRefPrefix + "0000000000001"},
+		{ArchiveRefName: paths.V2FullRefPrefix + "0000000000001"}, // duplicate of the entry above
+		{ArchiveRefName: paths.V2FullRefPrefix + "0000000000002"},
+		{ArchiveRefName: paths.V2FullCurrentRefName},
+		{ArchiveRefName: paths.V2FullRefPrefix + "not-a-generation"},
+		{ArchiveRefName: paths.V2FullRefPrefix + "0000000000003/extra"},
+		{ArchiveRefName: paths.V2MainRefName},
+		{ArchiveRefName: "refs/heads/main"},
+		{ArchiveRefName: ""},
+	})
+
+	assert.Equal(t, []plumbing.ReferenceName{
+		plumbing.ReferenceName(paths.V2FullRefPrefix + "0000000000001"),
+		plumbing.ReferenceName(paths.V2FullRefPrefix + "0000000000002"),
+	}, refs)
+}
+
+// TestPushV2Refs_LocalRotationDoesNotRehydrateArchivedCurrent verifies that
+// publishing a locally rotated generation does not merge the remote old
+// /full/current tree back into the fresh local /full/current.
+//
+// Not parallel: uses t.Chdir() and os.Stderr redirection.
+func TestPushV2Refs_LocalRotationDoesNotRehydrateArchivedCurrent(t *testing.T) {
+	ctx := context.Background()
+	fullCurrentRef := plumbing.ReferenceName(paths.V2FullCurrentRefName)
+	archiveRef := plumbing.ReferenceName(paths.V2FullRefPrefix + "0000000000001")
+
+	localDir := setupRepoWithV2Ref(t)
+	localRepo, err := git.PlainOpen(localDir)
+	require.NoError(t, err)
+	localStore := checkpoint.NewV2GitStore(localRepo, "origin")
+
+	for i, cpID := range []id.CheckpointID{
+		id.MustCheckpointID("000000000001"),
+		id.MustCheckpointID("000000000002"),
+		id.MustCheckpointID("000000000003"),
+	} {
+		writeV2Checkpoint(t, localRepo, cpID, "session-before-rotation-"+string(rune('a'+i)))
+	}
+
+	bareDir := t.TempDir()
+	initCmd := exec.CommandContext(ctx, "git", "init", "--bare")
+	initCmd.Dir = bareDir
+	initCmd.Env = testutil.GitIsolatedEnv()
+	out, err := initCmd.CombinedOutput()
+	require.NoError(t, err, "git init --bare failed: %s", out)
+
+	// Publish the pre-rotation /full/current so the remote holds the old tree.
+	pushCurrent := exec.CommandContext(ctx, "git", "push", bareDir,
+		string(fullCurrentRef)+":"+string(fullCurrentRef))
+	pushCurrent.Dir = localDir
+	out, err = pushCurrent.CombinedOutput()
+	require.NoError(t, err, "initial full/current push failed: %s", out)
+
+	// Rotation archives the three checkpoints and empties local /full/current.
+	refName, rotated, err := localStore.RotateCurrentGenerationIfNeeded(ctx, 3)
+	require.NoError(t, err)
+	require.True(t, rotated)
+	require.Equal(t, archiveRef, refName)
+	assert.Equal(t, 0, v2CheckpointCountInRef(t, localRepo, fullCurrentRef))
+	assert.Equal(t, 3, v2CheckpointCountInRef(t, localRepo, archiveRef))
+
+	t.Chdir(localDir)
+	restore := captureStderr(t)
+	pushV2Refs(ctx, bareDir)
+	_ = restore() // stderr output is not asserted in this test
+
+	bareRepo, err := git.PlainOpen(bareDir)
+	require.NoError(t, err)
+	assert.Equal(t, 3, v2CheckpointCountInRef(t, bareRepo, archiveRef))
+	assert.Equal(t, 0, v2CheckpointCountInRef(t, bareRepo, fullCurrentRef),
+		"remote /full/current must stay fresh after publishing a local rotation")
+}
+
+// TestPushV2Refs_LocalRotationPublishesCurrentWorkAddedBeforePush verifies that
+// publishing a pending local rotation pushes the live /full/current head, not
+// just the fresh orphan root created when the rotation happened.
+//
+// Not parallel: uses t.Chdir() and os.Stderr redirection.
+func TestPushV2Refs_LocalRotationPublishesCurrentWorkAddedBeforePush(t *testing.T) {
+	ctx := context.Background()
+	fullCurrentRef := plumbing.ReferenceName(paths.V2FullCurrentRefName)
+	archiveRef := plumbing.ReferenceName(paths.V2FullRefPrefix + "0000000000001")
+	oldCPs := []id.CheckpointID{
+		id.MustCheckpointID("000000000001"),
+		id.MustCheckpointID("000000000002"),
+		id.MustCheckpointID("000000000003"),
+	}
+	newCPs := []id.CheckpointID{
+		id.MustCheckpointID("000000000004"),
+		id.MustCheckpointID("000000000005"),
+	}
+
+	localDir := setupRepoWithV2Ref(t)
+	localRepo, err := git.PlainOpen(localDir)
+	require.NoError(t, err)
+	localStore := checkpoint.NewV2GitStore(localRepo, "origin")
+
+	for i, cpID := range oldCPs {
+		writeV2Checkpoint(t, localRepo, cpID, "session-before-rotation-"+string(rune('a'+i)))
+	}
+
+	bareDir := t.TempDir()
+	initCmd := exec.CommandContext(ctx, "git", "init", "--bare")
+	initCmd.Dir = bareDir
+	initCmd.Env = testutil.GitIsolatedEnv()
+	out, err := initCmd.CombinedOutput()
+	require.NoError(t, err, "git init --bare failed: %s", out)
+
+	pushCurrent := exec.CommandContext(ctx, "git", "push", bareDir,
+		string(fullCurrentRef)+":"+string(fullCurrentRef))
+	pushCurrent.Dir = localDir
+	out, err = pushCurrent.CombinedOutput()
+	require.NoError(t, err, "initial full/current push failed: %s", out)
+
+	refName, rotated, err := localStore.RotateCurrentGenerationIfNeeded(ctx, 3)
+	require.NoError(t, err)
+	require.True(t, rotated)
+	require.Equal(t, archiveRef, refName)
+
+	// New work lands in the fresh current generation before the push runs.
+	for i, cpID := range newCPs {
+		writeV2Checkpoint(t, localRepo, cpID, "session-after-rotation-"+string(rune('a'+i)))
+	}
+
+	t.Chdir(localDir)
+	restore := captureStderr(t)
+	pushV2Refs(ctx, bareDir)
+	_ = restore() // stderr output is not asserted in this test
+
+	bareRepo, err := git.PlainOpen(bareDir)
+	require.NoError(t, err)
+	for _, cpID := range oldCPs {
+		assert.True(t, refContainsV2Checkpoint(t, bareRepo, archiveRef, cpID),
+			"archive should contain old checkpoint %s", cpID)
+		assert.False(t, refContainsV2Checkpoint(t, bareRepo, fullCurrentRef, cpID),
+			"current should not contain archived checkpoint %s", cpID)
+	}
+	for _, cpID := range newCPs {
+		assert.False(t, refContainsV2Checkpoint(t, bareRepo, archiveRef, cpID),
+			"archive should not contain new checkpoint %s", cpID)
+		assert.True(t, refContainsV2Checkpoint(t, bareRepo, fullCurrentRef, cpID),
+			"current should contain new checkpoint %s", cpID)
+	}
+}
+
+// TestPushV2Refs_RepeatedLocalRotationsBeforePushPublishesAllArchives verifies
+// that more than one local rotation can be queued and published in one pre-push.
+//
+// Not parallel: uses t.Chdir() and os.Stderr redirection.
+func TestPushV2Refs_RepeatedLocalRotationsBeforePushPublishesAllArchives(t *testing.T) {
+	ctx := context.Background()
+	fullCurrentRef := plumbing.ReferenceName(paths.V2FullCurrentRefName)
+	archive1Ref := plumbing.ReferenceName(paths.V2FullRefPrefix + "0000000000001")
+	archive2Ref := plumbing.ReferenceName(paths.V2FullRefPrefix + "0000000000002")
+	gen1CPs := []id.CheckpointID{
+		id.MustCheckpointID("000000000001"),
+		id.MustCheckpointID("000000000002"),
+	}
+	gen2CPs := []id.CheckpointID{
+		id.MustCheckpointID("000000000003"),
+		id.MustCheckpointID("000000000004"),
+	}
+	currentCP := id.MustCheckpointID("000000000005")
+
+	localDir := setupRepoWithV2Ref(t)
+	localRepo, err := git.PlainOpen(localDir)
+	require.NoError(t, err)
+	localStore := checkpoint.NewV2GitStore(localRepo, "origin")
+
+	for i, cpID := range gen1CPs {
+		writeV2Checkpoint(t, localRepo, cpID, "session-gen-1-"+string(rune('a'+i)))
+	}
+
+	bareDir := t.TempDir()
+	initCmd := exec.CommandContext(ctx, "git", "init", "--bare")
+	initCmd.Dir = bareDir
+	initCmd.Env = testutil.GitIsolatedEnv()
+	out, err := initCmd.CombinedOutput()
+	require.NoError(t, err, "git init --bare failed: %s", out)
+
+	pushCurrent := exec.CommandContext(ctx, "git", "push", bareDir,
+		string(fullCurrentRef)+":"+string(fullCurrentRef))
+	pushCurrent.Dir = localDir
+	out, err = pushCurrent.CombinedOutput()
+	require.NoError(t, err, "initial full/current push failed: %s", out)
+
+	refName, rotated, err := localStore.RotateCurrentGenerationIfNeeded(ctx, len(gen1CPs))
+	require.NoError(t, err)
+	require.True(t, rotated)
+	require.Equal(t, archive1Ref, refName)
+
+	// A second rotation before any push queues a second pending archive.
+	for i, cpID := range gen2CPs {
+		writeV2Checkpoint(t, localRepo, cpID, "session-gen-2-"+string(rune('a'+i)))
+	}
+	refName, rotated, err = localStore.RotateCurrentGenerationIfNeeded(ctx, len(gen2CPs))
+	require.NoError(t, err)
+	require.True(t, rotated)
+	require.Equal(t, archive2Ref, refName)
+
+	writeV2Checkpoint(t, localRepo, currentCP, "session-current")
+
+	t.Chdir(localDir)
+	restore := captureStderr(t)
+	pushV2Refs(ctx, bareDir)
+	_ = restore() // stderr output is not asserted in this test
+
+	// Expected: each checkpoint ends up in exactly one remote ref — its own
+	// generation's archive, or /full/current for post-rotation work.
+	bareRepo, err := git.PlainOpen(bareDir)
+	require.NoError(t, err)
+	for _, cpID := range gen1CPs {
+		assert.True(t, refContainsV2Checkpoint(t, bareRepo, archive1Ref, cpID),
+			"archive 1 should contain generation 1 checkpoint %s", cpID)
+		assert.False(t, refContainsV2Checkpoint(t, bareRepo, archive2Ref, cpID),
+			"archive 2 should not contain generation 1 checkpoint %s", cpID)
+		assert.False(t, refContainsV2Checkpoint(t, bareRepo, fullCurrentRef, cpID),
+			"current should not contain generation 1 checkpoint %s", cpID)
+	}
+	for _, cpID := range gen2CPs {
+		assert.False(t, refContainsV2Checkpoint(t, bareRepo, archive1Ref, cpID),
+			"archive 1 should not contain generation 2 checkpoint %s", cpID)
+		assert.True(t, refContainsV2Checkpoint(t, bareRepo, archive2Ref, cpID),
+			"archive 2 should contain generation 2 checkpoint %s", cpID)
+		assert.False(t, refContainsV2Checkpoint(t, bareRepo, fullCurrentRef, cpID),
+			"current should not contain generation 2 checkpoint %s", cpID)
+	}
+	assert.False(t, refContainsV2Checkpoint(t, bareRepo, archive1Ref, currentCP),
+		"archive 1 should not contain current checkpoint")
+	assert.False(t, refContainsV2Checkpoint(t, bareRepo, archive2Ref, currentCP),
+		"archive 2 should not contain current checkpoint")
+	assert.True(t, refContainsV2Checkpoint(t, bareRepo, fullCurrentRef, currentCP),
+		"current should contain current checkpoint")
+}
+
+// TestDetectRemoteRotationArchives_IncludesSameNameDifferentHash verifies a
+// remote archive generation is reported even when a local ref with the same
+// name already exists at a different hash.
+func TestDetectRemoteRotationArchives_IncludesSameNameDifferentHash(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	archiveRef := plumbing.ReferenceName(paths.V2FullRefPrefix + "0000000000001")
+
+	localDir := setupRepoWithV2Ref(t)
+	localRepo, err := git.PlainOpen(localDir)
+	require.NoError(t, err)
+	localHash := writeV2ArchiveRef(t, localRepo, archiveRef, "local archive")
+
+	remoteDir := setupRepoWithV2Ref(t)
+	remoteRepo, err := git.PlainOpen(remoteDir)
+	require.NoError(t, err)
+	remoteHash := writeV2ArchiveRef(t, remoteRepo, archiveRef, "remote archive")
+	require.NotEqual(t, localHash, remoteHash)
+
+	bareDir := t.TempDir()
+	initCmd := exec.CommandContext(ctx, "git", "init", "--bare")
+	initCmd.Dir = bareDir
+	initCmd.Env = testutil.GitIsolatedEnv()
+	out, err := initCmd.CombinedOutput()
+	require.NoError(t, err, "git init --bare failed: %s", out)
+
+	pushArchive := exec.CommandContext(ctx, "git", "push", bareDir,
+		string(archiveRef)+":"+string(archiveRef))
+	pushArchive.Dir = remoteDir
+	out, err = pushArchive.CombinedOutput()
+	require.NoError(t, err, "archive push failed: %s", out)
+
+	archives, err := detectRemoteRotationArchives(ctx, bareDir, localRepo)
+	require.NoError(t, err)
+	assert.Contains(t, archives, "0000000000001")
+}
+
 func TestPrintV2PartialPushResult(t *testing.T) {
 	t.Parallel()
 
@@ -446,3 +1016,156 @@ func TestFetchAndMergeRef_RotationConflict(t *testing.T) {
 	_, err = archiveTree.Tree("11/2233445566")
 	assert.NoError(t, err, "archived generation should contain remote checkpoint 112233445566")
 }
+
+func TestFetchAndMergeRef_RotationConflictWithFilteredFetches(t *testing.T) {
+	ctx := context.Background()
+	fullCurrentRef := plumbing.ReferenceName(paths.V2FullCurrentRefName)
+	archiveRefName := plumbing.ReferenceName(paths.V2FullRefPrefix + "0000000000001")
+	sharedCP := id.MustCheckpointID("aabbccddeeff")
+	remoteCP := id.MustCheckpointID("112233445566")
+	localOnlyCP := id.MustCheckpointID("ffeeddccbbaa")
+
+	bareDir := t.TempDir()
+	initCmd := exec.CommandContext(ctx, "git", "init", "--bare")
+	initCmd.Dir = bareDir
+	initCmd.Env = testutil.GitIsolatedEnv()
+	require.NoError(t, initCmd.Run())
+	enableFilteredFetchServingForTest(t, bareDir)
+	bareURL := "file://" + bareDir // file:// URL so filtered fetches go through the transport
+
+	localDir := t.TempDir()
+	testutil.InitRepo(t, localDir)
+	testutil.WriteFile(t, localDir, "f.txt", "init")
+	testutil.GitAdd(t, localDir, "f.txt")
+	testutil.GitCommit(t, localDir, "init")
+	enableFilteredFetchesForTest(t, localDir)
+
+	localRepo, err := git.PlainOpen(localDir)
+	require.NoError(t, err)
+	writeV2Checkpoint(t, localRepo, sharedCP, "shared-session")
+
+	pushCurrent := exec.CommandContext(ctx, "git", "push", bareDir,
+		string(fullCurrentRef)+":"+string(fullCurrentRef))
+	pushCurrent.Dir = localDir
+	out, err := pushCurrent.CombinedOutput()
+	require.NoError(t, err, "initial full/current push failed: %s", out)
+
+	remoteDir := t.TempDir()
+	testutil.InitRepo(t, remoteDir)
+	testutil.WriteFile(t, remoteDir, "f.txt", "init")
+	testutil.GitAdd(t, remoteDir, "f.txt")
+	testutil.GitCommit(t, remoteDir, "init")
+
+	fetchCurrent := exec.CommandContext(ctx, "git", "fetch", bareDir,
+		"+"+string(fullCurrentRef)+":"+string(fullCurrentRef))
+	fetchCurrent.Dir = remoteDir
+	out, err = fetchCurrent.CombinedOutput()
+	require.NoError(t, err, "fetch full/current failed: %s", out)
+
+	remoteRepo, err := git.PlainOpen(remoteDir)
+	require.NoError(t, err)
+	writeV2Checkpoint(t, remoteRepo, remoteCP, "remote-session")
+	rotateV2CurrentForTest(t, remoteRepo, archiveRefName)
+
+	pushRotated := exec.CommandContext(ctx, "git", "push", "--force", bareDir,
+		string(fullCurrentRef)+":"+string(fullCurrentRef),
+		string(archiveRefName)+":"+string(archiveRefName))
+	pushRotated.Dir = remoteDir
+	out, err = pushRotated.CombinedOutput()
+	require.NoError(t, err, "push rotated state failed: %s", out)
+
+	writeV2Checkpoint(t, localRepo, localOnlyCP, "local-session")
+
+	t.Chdir(localDir)
+	err = fetchAndMergeRef(ctx, bareURL, fullCurrentRef)
+	require.NoError(t, err)
+
+	localRepo, err = git.PlainOpen(localDir)
+	require.NoError(t, err)
+	assert.True(t, refContainsV2Checkpoint(t, localRepo, archiveRefName, sharedCP))
+	assert.True(t, refContainsV2Checkpoint(t, localRepo, archiveRefName, remoteCP))
+	assert.True(t, refContainsV2Checkpoint(t, localRepo, archiveRefName, localOnlyCP))
+	assert.False(t, refContainsV2Checkpoint(t, localRepo, fullCurrentRef, sharedCP))
+	assert.False(t, refContainsV2Checkpoint(t, localRepo, fullCurrentRef, localOnlyCP))
+}
+
+func TestFetchAndMergeRef_RemoteRotatedMultipleTimesUsesRelatedArchive(t *testing.T) {
+	ctx := context.Background()
+	fullCurrentRef := plumbing.ReferenceName(paths.V2FullCurrentRefName)
+	archive1Ref := plumbing.ReferenceName(paths.V2FullRefPrefix + "0000000000001")
+	archive2Ref := plumbing.ReferenceName(paths.V2FullRefPrefix + "0000000000002")
+	sharedCP := id.MustCheckpointID("aabbccddeeff")
+	remoteGen1CP := id.MustCheckpointID("112233445566")
+	remoteGen2CP := id.MustCheckpointID("223344556677")
+	localOnlyCP := id.MustCheckpointID("ffeeddccbbaa")
+
+	bareDir := t.TempDir()
+	initCmd := exec.CommandContext(ctx, "git", "init", "--bare")
+	initCmd.Dir = bareDir
+	initCmd.Env = testutil.GitIsolatedEnv()
+	require.NoError(t, initCmd.Run())
+
+	localDir := t.TempDir()
+	testutil.InitRepo(t, localDir)
+	testutil.WriteFile(t, localDir, "f.txt", "init")
+	testutil.GitAdd(t, localDir, "f.txt")
+	testutil.GitCommit(t, localDir, "init")
+	localRepo, err := git.PlainOpen(localDir)
+	require.NoError(t, err)
+	writeV2Checkpoint(t, localRepo, sharedCP, "shared-session")
+
+	pushCurrent := exec.CommandContext(ctx, "git", "push", bareDir,
+		string(fullCurrentRef)+":"+string(fullCurrentRef))
+	pushCurrent.Dir = localDir
+	require.NoError(t, pushCurrent.Run()) // NOTE(review): sibling tests use CombinedOutput for failure diagnostics — consider matching
+
+	remoteDir := t.TempDir()
+	testutil.InitRepo(t, remoteDir)
+	testutil.WriteFile(t, remoteDir, "f.txt", "init")
+	testutil.GitAdd(t, remoteDir, "f.txt")
+	testutil.GitCommit(t, remoteDir, "init")
+	fetchCurrent := exec.CommandContext(ctx, "git", "fetch", bareDir,
+		"+"+string(fullCurrentRef)+":"+string(fullCurrentRef))
+	fetchCurrent.Dir = remoteDir
+	require.NoError(t, fetchCurrent.Run()) // NOTE(review): same — CombinedOutput would aid debugging on failure
+
+	remoteRepo, err := git.PlainOpen(remoteDir)
+	require.NoError(t, err)
+	writeV2Checkpoint(t, remoteRepo, remoteGen1CP, "remote-gen-1")
+	rotateV2CurrentForTest(t, remoteRepo, archive1Ref)
+	writeV2Checkpoint(t, remoteRepo, remoteGen2CP, "remote-gen-2")
+	rotateV2CurrentForTest(t, remoteRepo, archive2Ref)
+
+	pushRotated := exec.CommandContext(ctx, "git", "push", "--force", bareDir,
+		string(fullCurrentRef)+":"+string(fullCurrentRef),
+		string(archive1Ref)+":"+string(archive1Ref),
+		string(archive2Ref)+":"+string(archive2Ref))
+	pushRotated.Dir = remoteDir
+	out, pushErr := pushRotated.CombinedOutput()
+	require.NoError(t, pushErr, "push rotated state failed: %s", out)
+
+	writeV2Checkpoint(t, localRepo, localOnlyCP, "local-session")
+
+	t.Chdir(localDir)
+	err = fetchAndMergeRef(ctx, bareDir, fullCurrentRef)
+	require.NoError(t, err)
+
+	localRepo, err = git.PlainOpen(localDir)
+	require.NoError(t, err)
+	_, err = localRepo.Reference(archiveTmpRefName("0000000000001"), true)
+	require.Error(t, err, "selected archive temp ref should be removed after rotation recovery")
+	_, err = localRepo.Reference(archiveTmpRefName("0000000000002"), true)
+	require.Error(t, err, "unselected archive temp ref should be removed after rotation recovery")
+
+	assert.True(t, refContainsV2Checkpoint(t, localRepo, archive1Ref, localOnlyCP),
+		"local checkpoint from the first generation should be merged into archive 1")
+	assert.True(t, refContainsV2Checkpoint(t, localRepo, archive1Ref, sharedCP),
+		"shared first-generation checkpoint should remain in archive 1")
+
+	bareRepo, err := git.PlainOpen(bareDir) // NOTE(review): fetchAndMergeRef does not appear to push — confirm asserting on bareRepo (vs localRepo's archive 2) is intended
+	require.NoError(t, err)
+	assert.False(t, refContainsV2Checkpoint(t, bareRepo, archive2Ref, localOnlyCP),
+		"local first-generation checkpoint must not be merged into later remote archive 2")
+	assert.True(t, refContainsV2Checkpoint(t, bareRepo, archive2Ref, remoteGen2CP),
+		"remote generation 2 checkpoint should remain in archive 2")
+}