Skip to content

Commit 4fae3ed

Browse files
authored
Merge pull request #1114 from entireio/more-migration-speedups
Checkpoints v2: Fix migration rerun logic (speedups, avoid duplicated migration efforts)
2 parents 95f9d61 + 788d33e commit 4fae3ed

9 files changed

Lines changed: 962 additions & 874 deletions

File tree

cmd/entire/cli/checkpoint/v2_committed.go

Lines changed: 111 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"io"
99
"log/slog"
1010
"os"
11+
"strconv"
1112
"strings"
1213

1314
"github.com/entireio/cli/cmd/entire/cli/agent"
@@ -135,6 +136,116 @@ func (s *V2GitStore) findFullSessionArtifacts(checkpointID id.CheckpointID, sess
135136
return fullSessionArtifacts{}, nil
136137
}
137138

139+
// FullSessionArtifactsIndex answers "does this session have complete /full/*
140+
// artifacts?" with an O(1) map lookup. Build it once via
141+
// BuildFullSessionArtifactsIndex.
142+
type FullSessionArtifactsIndex map[string]struct{}
143+
144+
// Has reports whether the given session has a complete pair of
145+
// raw_transcript and raw_transcript_hash.txt entries in some /full/* ref.
146+
func (idx FullSessionArtifactsIndex) Has(checkpointID id.CheckpointID, sessionIndex int) bool {
147+
if idx == nil {
148+
return false
149+
}
150+
_, ok := idx[fullArtifactsIndexKey(checkpointID, sessionIndex)]
151+
return ok
152+
}
153+
154+
func fullArtifactsIndexKey(checkpointID id.CheckpointID, sessionIndex int) string {
155+
return string(checkpointID) + "/" + strconv.Itoa(sessionIndex)
156+
}
157+
158+
// BuildFullSessionArtifactsIndex walks every /full/* ref's tree once and
159+
// records sessions whose subtree contains both raw_transcript[/.NNN] and
160+
// raw_transcript_hash.txt. Amortizes per-session HasFullSessionArtifacts
161+
// calls — each of which would otherwise list every git ref and re-walk every
162+
// /full/* tree — across the rest of the run.
163+
func (s *V2GitStore) BuildFullSessionArtifactsIndex() (FullSessionArtifactsIndex, error) {
164+
refNames, err := s.fullRefSearchOrder()
165+
if err != nil {
166+
return nil, err
167+
}
168+
169+
index := make(FullSessionArtifactsIndex)
170+
for _, refName := range refNames {
171+
_, rootTreeHash, refErr := s.GetRefState(refName)
172+
if refErr != nil {
173+
if errors.Is(refErr, plumbing.ErrReferenceNotFound) {
174+
continue
175+
}
176+
return nil, fmt.Errorf("read %s: %w", refName, refErr)
177+
}
178+
rootTree, treeErr := s.repo.TreeObject(rootTreeHash)
179+
if treeErr != nil {
180+
return nil, fmt.Errorf("read %s root tree: %w", refName, treeErr)
181+
}
182+
keys, err := s.listFullSessionsInTree(rootTree)
183+
if err != nil {
184+
return nil, fmt.Errorf("walk %s: %w", refName, err)
185+
}
186+
for _, key := range keys {
187+
index[key] = struct{}{}
188+
}
189+
}
190+
return index, nil
191+
}
192+
193+
func (s *V2GitStore) listFullSessionsInTree(rootTree *object.Tree) ([]string, error) {
194+
var keys []string
195+
for _, shardEntry := range rootTree.Entries {
196+
if shardEntry.Mode != filemode.Dir || len(shardEntry.Name) != 2 {
197+
continue
198+
}
199+
shardTree, err := s.repo.TreeObject(shardEntry.Hash)
200+
if err != nil {
201+
return nil, fmt.Errorf("read shard %s: %w", shardEntry.Name, err)
202+
}
203+
for _, cpEntry := range shardTree.Entries {
204+
if cpEntry.Mode != filemode.Dir {
205+
continue
206+
}
207+
cpTree, err := s.repo.TreeObject(cpEntry.Hash)
208+
if err != nil {
209+
return nil, fmt.Errorf("read checkpoint tree %s/%s: %w", shardEntry.Name, cpEntry.Name, err)
210+
}
211+
cpid := id.CheckpointID(shardEntry.Name + cpEntry.Name)
212+
for _, sessionEntry := range cpTree.Entries {
213+
if sessionEntry.Mode != filemode.Dir {
214+
continue
215+
}
216+
sessionIdx, atoiErr := strconv.Atoi(sessionEntry.Name)
217+
if atoiErr != nil {
218+
continue
219+
}
220+
sessionTree, err := s.repo.TreeObject(sessionEntry.Hash)
221+
if err != nil {
222+
return nil, fmt.Errorf("read session tree %s/%s/%d: %w", shardEntry.Name, cpEntry.Name, sessionIdx, err)
223+
}
224+
if !sessionHasCompleteFullArtifacts(sessionTree.Entries) {
225+
continue
226+
}
227+
keys = append(keys, fullArtifactsIndexKey(cpid, sessionIdx))
228+
}
229+
}
230+
}
231+
return keys, nil
232+
}
233+
234+
func sessionHasCompleteFullArtifacts(entries []object.TreeEntry) bool {
235+
hasTranscript := false
236+
hasHash := false
237+
for _, entry := range entries {
238+
switch {
239+
case entry.Name == paths.V2RawTranscriptFileName,
240+
strings.HasPrefix(entry.Name, paths.V2RawTranscriptFileName+"."):
241+
hasTranscript = true
242+
case entry.Name == paths.V2RawTranscriptHashFileName:
243+
hasHash = true
244+
}
245+
}
246+
return hasTranscript && hasHash
247+
}
248+
138249
func (s *V2GitStore) fullRefSearchOrder() ([]plumbing.ReferenceName, error) {
139250
refNames := []plumbing.ReferenceName{plumbing.ReferenceName(paths.V2FullCurrentRefName)}
140251

@@ -861,57 +972,3 @@ func (s *V2GitStore) UpdateSummary(ctx context.Context, checkpointID id.Checkpoi
861972
commitMsg := fmt.Sprintf("Update summary for checkpoint %s (session: %s)", checkpointID, metadata.SessionID)
862973
return s.updateRef(ctx, refName, newTreeHash, parentHash, commitMsg, authorName, authorEmail)
863974
}
864-
865-
// CleanupV1TranscriptFiles removes legacy v1-named transcript files (full.jsonl,
866-
// full.jsonl.*, content_hash.txt) from /full/current for a given checkpoint.
867-
// Older CLI versions wrote these before the rename to raw_transcript.
868-
// Returns nil if /full/current doesn't exist or no v1 files were found.
869-
func (s *V2GitStore) CleanupV1TranscriptFiles(ctx context.Context, checkpointID id.CheckpointID, sessionCount int) error {
870-
refName := plumbing.ReferenceName(paths.V2FullCurrentRefName)
871-
parentHash, rootTreeHash, err := s.GetRefState(refName)
872-
if err != nil {
873-
if errors.Is(err, plumbing.ErrReferenceNotFound) {
874-
return nil // /full/current doesn't exist yet — nothing to clean
875-
}
876-
return err
877-
}
878-
879-
checkpointPath := checkpointID.Path()
880-
basePath := checkpointPath + "/"
881-
882-
entries, err := s.gs.flattenCheckpointEntries(rootTreeHash, checkpointPath)
883-
if err != nil {
884-
return err
885-
}
886-
887-
changed := false
888-
for sessionIdx := range sessionCount {
889-
sessionPath := fmt.Sprintf("%s%d/", basePath, sessionIdx)
890-
v1TranscriptPath := sessionPath + paths.TranscriptFileName
891-
v1HashPath := sessionPath + paths.ContentHashFileName
892-
893-
for key := range entries {
894-
switch {
895-
case key == v1TranscriptPath,
896-
strings.HasPrefix(key, v1TranscriptPath+"."),
897-
key == v1HashPath:
898-
delete(entries, key)
899-
changed = true
900-
}
901-
}
902-
}
903-
904-
if !changed {
905-
return nil
906-
}
907-
908-
newTreeHash, err := s.gs.spliceCheckpointSubtree(ctx, rootTreeHash, checkpointID, basePath, entries)
909-
if err != nil {
910-
return fmt.Errorf("tree surgery failed: %w", err)
911-
}
912-
913-
authorName, authorEmail := GetGitAuthorFromRepo(s.repo)
914-
return s.updateRef(ctx, refName, newTreeHash, parentHash,
915-
fmt.Sprintf("Clean up v1 transcript files for %s\n", checkpointID),
916-
authorName, authorEmail)
917-
}

cmd/entire/cli/checkpoint/v2_generation.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,23 @@ func checkpointTimestampRangeFromFullTree(cpTree *object.Tree) (GenerationMetada
333333
return gen, found
334334
}
335335

336+
// AggregateTranscriptTimestamps derives a generation timestamp envelope from
337+
// transcripts already in memory, using the same first/last-event semantics
338+
// as ComputeGenerationTimestampsFromTrees but skipping the blob reads.
339+
func AggregateTranscriptTimestamps(transcripts [][]byte) (GenerationMetadata, bool) {
340+
var gen GenerationMetadata
341+
found := false
342+
for _, transcript := range transcripts {
343+
if len(transcript) == 0 {
344+
continue
345+
}
346+
if r, ok := timestampRangeFromTranscript(transcript); ok {
347+
mergeGenerationRange(&gen, &found, r)
348+
}
349+
}
350+
return gen, found
351+
}
352+
336353
func timestampRangeFromTranscript(transcript []byte) (GenerationMetadata, bool) {
337354
reader := bufio.NewReader(bytes.NewReader(transcript))
338355
var gen GenerationMetadata

cmd/entire/cli/checkpoint/v2_read.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,53 @@ func (s *V2GitStore) ReadSessionCompactTranscript(ctx context.Context, checkpoin
183183
return []byte(content), nil
184184
}
185185

186+
// ReadSessionMetadata reads only the session's metadata.json from the v2
187+
// /main ref — cheaper than ReadSessionMetadataAndPrompts when the caller
188+
// only needs metadata fields (e.g. SessionID). Returns ErrCheckpointNotFound
189+
// if the checkpoint or session doesn't exist on /main.
190+
func (s *V2GitStore) ReadSessionMetadata(ctx context.Context, checkpointID id.CheckpointID, sessionIndex int) (CommittedMetadata, error) {
191+
if err := ctx.Err(); err != nil {
192+
return CommittedMetadata{}, err //nolint:wrapcheck // Propagating context cancellation
193+
}
194+
195+
refName := plumbing.ReferenceName(paths.V2MainRefName)
196+
_, rootTreeHash, err := s.GetRefState(refName)
197+
if err != nil {
198+
return CommittedMetadata{}, ErrCheckpointNotFound
199+
}
200+
201+
rootTree, err := s.repo.TreeObject(rootTreeHash)
202+
if err != nil {
203+
return CommittedMetadata{}, ErrCheckpointNotFound
204+
}
205+
206+
cpTree, err := rootTree.Tree(checkpointID.Path())
207+
if err != nil {
208+
return CommittedMetadata{}, ErrCheckpointNotFound
209+
}
210+
211+
sessionTree, err := cpTree.Tree(strconv.Itoa(sessionIndex))
212+
if err != nil {
213+
return CommittedMetadata{}, ErrCheckpointNotFound
214+
}
215+
216+
sessionFT := s.wrapWithFetcher(ctx, sessionTree)
217+
metadataFile, err := sessionFT.File(paths.MetadataFileName)
218+
if err != nil {
219+
return CommittedMetadata{}, fmt.Errorf("read session metadata file: %w", err)
220+
}
221+
content, err := metadataFile.Contents()
222+
if err != nil {
223+
return CommittedMetadata{}, fmt.Errorf("read session metadata contents: %w", err)
224+
}
225+
226+
var meta CommittedMetadata
227+
if err := json.Unmarshal([]byte(content), &meta); err != nil {
228+
return CommittedMetadata{}, fmt.Errorf("parse session metadata: %w", err)
229+
}
230+
return meta, nil
231+
}
232+
186233
// ReadSessionMetadataAndPrompts reads a session's metadata and prompts from the
187234
// v2 /main ref without requiring the raw transcript from /full/* refs.
188235
// Used by explain when the raw transcript is unavailable but compact transcript

cmd/entire/cli/checkpoint/v2_read_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package checkpoint
22

33
import (
44
"context"
5+
"os"
6+
"path/filepath"
57
"testing"
68

79
"github.com/entireio/cli/cmd/entire/cli/checkpoint/id"
@@ -10,6 +12,7 @@ import (
1012
"github.com/stretchr/testify/assert"
1113
"github.com/stretchr/testify/require"
1214

15+
"github.com/go-git/go-git/v6"
1316
"github.com/go-git/go-git/v6/plumbing"
1417
"github.com/go-git/go-git/v6/plumbing/filemode"
1518
"github.com/go-git/go-git/v6/plumbing/object"
@@ -166,6 +169,75 @@ func TestV2ReadSessionMetadataAndPrompts_ReturnsWithoutTranscript(t *testing.T)
166169
assert.Empty(t, content.Transcript)
167170
}
168171

172+
func TestV2ReadSessionMetadata_ReturnsMetadata(t *testing.T) {
173+
t.Parallel()
174+
repo := initTestRepo(t)
175+
store := NewV2GitStore(repo, "origin")
176+
cpID := id.MustCheckpointID("f1f2f3f4f5fa")
177+
ctx := context.Background()
178+
179+
err := store.WriteCommitted(ctx, WriteCommittedOptions{
180+
CheckpointID: cpID,
181+
SessionID: "session-metadata-only",
182+
Strategy: "manual-commit",
183+
Prompts: []string{"test prompt"},
184+
AuthorName: "Test",
185+
AuthorEmail: "test@test.com",
186+
})
187+
require.NoError(t, err)
188+
189+
meta, err := store.ReadSessionMetadata(ctx, cpID, 0)
190+
require.NoError(t, err)
191+
assert.Equal(t, "session-metadata-only", meta.SessionID)
192+
}
193+
194+
func TestV2ReadSessionMetadata_FetchesMissingMetadataBlob(t *testing.T) {
195+
t.Parallel()
196+
repo := initTestRepo(t)
197+
store := NewV2GitStore(repo, "origin")
198+
cpID := id.MustCheckpointID("f1f2f3f4f5fb")
199+
ctx := context.Background()
200+
201+
err := store.WriteCommitted(ctx, WriteCommittedOptions{
202+
CheckpointID: cpID,
203+
SessionID: "session-fetch-metadata",
204+
Strategy: "manual-commit",
205+
Prompts: []string{"test prompt"},
206+
AuthorName: "Test",
207+
AuthorEmail: "test@test.com",
208+
})
209+
require.NoError(t, err)
210+
211+
wt, err := repo.Worktree()
212+
require.NoError(t, err)
213+
repoRoot := wt.Filesystem.Root()
214+
mainTree := v2MainTree(t, repo)
215+
sessionTree, err := mainTree.Tree(cpID.Path() + "/0")
216+
require.NoError(t, err)
217+
metadataEntry, err := sessionTree.FindEntry(paths.MetadataFileName)
218+
require.NoError(t, err)
219+
metadataContent := v2ReadFile(t, mainTree, cpID.Path()+"/0/"+paths.MetadataFileName)
220+
221+
metadataObjectPath := filepath.Join(repoRoot, ".git", "objects", metadataEntry.Hash.String()[:2], metadataEntry.Hash.String()[2:])
222+
require.NoError(t, os.Remove(metadataObjectPath))
223+
224+
reopenedRepo, err := git.PlainOpen(repoRoot)
225+
require.NoError(t, err)
226+
reopenedStore := NewV2GitStore(reopenedRepo, "origin")
227+
fetchCalled := false
228+
reopenedStore.SetBlobFetcher(func(_ context.Context, hashes []plumbing.Hash) error {
229+
fetchCalled = true
230+
require.Equal(t, []plumbing.Hash{metadataEntry.Hash}, hashes)
231+
_, createErr := CreateBlobFromContent(reopenedRepo, []byte(metadataContent))
232+
return createErr
233+
})
234+
235+
meta, err := reopenedStore.ReadSessionMetadata(ctx, cpID, 0)
236+
require.NoError(t, err)
237+
assert.True(t, fetchCalled)
238+
assert.Equal(t, "session-fetch-metadata", meta.SessionID)
239+
}
240+
169241
func TestV2ReadSessionMetadataAndPrompts_MissingCheckpoint(t *testing.T) {
170242
t.Parallel()
171243
repo := initTestRepo(t)

0 commit comments

Comments
 (0)