Skip to content
Merged
5 changes: 5 additions & 0 deletions cmd/entire/cli/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,11 @@ type SessionContent struct {
// Transcript is the session transcript content
Transcript []byte

// TranscriptBlobHashes are the stored raw transcript blob hashes in chunk
// order. Callers that rewrite the same transcript under a different path can
// reuse these content-addressed blobs instead of storing duplicate blobs.
TranscriptBlobHashes []plumbing.Hash

// Prompts contains user prompts from this session
Prompts string
}
Expand Down
49 changes: 49 additions & 0 deletions cmd/entire/cli/checkpoint/committed.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,7 @@ func (s *GitStore) ReadSessionContent(ctx context.Context, checkpointID id.Check
// Read transcript (auto-fetches blobs if needed)
if transcript, transcriptErr := readTranscriptFromTree(ctx, sessionTree, agentType); transcriptErr == nil && transcript != nil {
result.Transcript = transcript
result.TranscriptBlobHashes = transcriptBlobHashesFromTreeEntries(sessionTree.RawEntries())
}

// Read prompts (auto-fetches blob if needed)
Expand Down Expand Up @@ -1981,6 +1982,54 @@ func readTranscriptFromTree(ctx context.Context, tree *FetchingTree, agentType t
return nil, nil
}

// transcriptBlobHashesFromTreeEntries collects the blob hashes of the raw
// transcript files found among entries, in chunk order.
//
// Only file-mode entries are considered. When numbered chunk files
// (paths.TranscriptFileName + ".<n>" with a chunk index > 0) are present, the
// result is the base transcript hash (if a base file exists) followed by the
// chunk hashes in the order produced by agent.SortChunkFiles. Without chunk
// files the result is the single base hash, or failing that the single legacy
// transcript hash. It returns nil when no transcript file is present.
func transcriptBlobHashesFromTreeEntries(entries []object.TreeEntry) []plumbing.Hash {
	var (
		baseHash, legacyHash   plumbing.Hash
		foundBase, foundLegacy bool
		chunkNames             []string
	)
	chunkHashes := make(map[string]plumbing.Hash)

	for i := range entries {
		entry := &entries[i]
		if !entry.Mode.IsFile() {
			continue
		}
		name := entry.Name
		switch {
		case name == paths.TranscriptFileName:
			foundBase, baseHash = true, entry.Hash
		case name == paths.TranscriptFileNameLegacy:
			foundLegacy, legacyHash = true, entry.Hash
		case strings.HasPrefix(name, paths.TranscriptFileName+".") &&
			agent.ParseChunkIndex(name, paths.TranscriptFileName) > 0:
			chunkNames = append(chunkNames, name)
			chunkHashes[name] = entry.Hash
		}
	}

	switch {
	case len(chunkNames) > 0:
		ordered := agent.SortChunkFiles(chunkNames, paths.TranscriptFileName)
		result := make([]plumbing.Hash, 0, len(ordered)+1)
		// The base file is the first chunk, so it precedes the numbered ones.
		if foundBase {
			result = append(result, baseHash)
		}
		for _, chunkName := range ordered {
			result = append(result, chunkHashes[chunkName])
		}
		return result
	case foundBase:
		return []plumbing.Hash{baseHash}
	case foundLegacy:
		return []plumbing.Hash{legacyHash}
	default:
		return nil
	}
}

// Author contains author information for a checkpoint.
type Author struct {
Name string
Expand Down
279 changes: 279 additions & 0 deletions cmd/entire/cli/checkpoint/v2_committed.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,285 @@ func (s *V2GitStore) WriteCommittedWithSessionIndex(ctx context.Context, opts Wr
return sessionIndex, nil
}

// WriteCommittedMainBatch writes /main entries for every (checkpoint, session)
// pair in batch using a single commit and a single ref CAS. The /full ref is
// left untouched — callers handle full-transcript artifacts via the existing
// pack flow. Returns session indexes parallel to batch.
//
// Matches per-session writeCommittedMain semantics within each checkpoint
// group: existing-SessionID dedupe, slot-0 refuse-overwrite, last non-nil
// combinedAttribution, and sticky HasReview.
func (s *V2GitStore) WriteCommittedMainBatch(ctx context.Context, batch []WriteCommittedOptions) ([]int, error) {
if len(batch) == 0 {
return nil, nil
}
for i, opts := range batch {
if err := validateWriteOpts(opts); err != nil {
return nil, fmt.Errorf("batch entry %d: %w", i, err)
}
}

refName := plumbing.ReferenceName(paths.V2MainRefName)
if err := s.ensureRef(ctx, refName); err != nil {
return nil, fmt.Errorf("failed to ensure /main ref: %w", err)
}
parentHash, rootTreeHash, err := s.GetRefState(refName)
if err != nil {
return nil, err
}

type cpGroup struct {
cpID id.CheckpointID
ordinals []int
}
groups := []*cpGroup{}
byID := map[id.CheckpointID]*cpGroup{}
for i, opts := range batch {
g, ok := byID[opts.CheckpointID]
if !ok {
g = &cpGroup{cpID: opts.CheckpointID}
byID[opts.CheckpointID] = g
groups = append(groups, g)
}
g.ordinals = append(g.ordinals, i)
Comment thread
computermode marked this conversation as resolved.
}

sessionIndexes := make([]int, len(batch))
existingCheckpoints, err := s.existingMainCheckpointIDs(rootTreeHash)
if err != nil {
return nil, err
}
changes := make([]TreeChange, 0, len(groups))
for _, g := range groups {
var checkpointTreeHash plumbing.Hash
if _, exists := existingCheckpoints[g.cpID]; exists {
checkpointTreeHash, err = s.buildMainBatchGroupTree(ctx, rootTreeHash, g.cpID, g.ordinals, batch, sessionIndexes)
} else {
checkpointTreeHash, err = s.buildFreshMainBatchGroupTree(ctx, g.cpID, g.ordinals, batch, sessionIndexes)
}
if err != nil {
return nil, err
}
changes = append(changes, TreeChange{
Path: g.cpID.Path(),
Entry: &object.TreeEntry{
Mode: filemode.Dir,
Hash: checkpointTreeHash,
},
})
}
if len(changes) > 0 {
rootTreeHash, err = ApplyTreeChanges(ctx, s.repo, rootTreeHash, changes)
if err != nil {
return nil, fmt.Errorf("failed to apply batched /main checkpoint trees: %w", err)
}
}

// One commit, one ref update for the entire batch.
commitMsg := fmt.Sprintf("Migrate batch: %d checkpoint(s), %d session(s)\n", len(groups), len(batch))
last := batch[len(batch)-1]
authorName, authorEmail := last.AuthorName, last.AuthorEmail
if authorName == "" || authorEmail == "" {
fallbackName, fallbackEmail := GetGitAuthorFromRepo(s.repo)
if authorName == "" {
authorName = fallbackName
}
if authorEmail == "" {
authorEmail = fallbackEmail
}
}
if err := s.updateRef(ctx, refName, rootTreeHash, parentHash, commitMsg, authorName, authorEmail); err != nil {
return nil, err
}
return sessionIndexes, nil
}

// existingMainCheckpointIDs returns the set of checkpoint IDs reported by
// WalkCheckpointShards for the tree at rootTreeHash. A zero root hash yields
// an empty, non-nil set without reading anything from the repository.
func (s *V2GitStore) existingMainCheckpointIDs(rootTreeHash plumbing.Hash) (map[id.CheckpointID]struct{}, error) {
	found := map[id.CheckpointID]struct{}{}
	if rootTreeHash == plumbing.ZeroHash {
		return found, nil
	}

	rootTree, err := s.repo.TreeObject(rootTreeHash)
	if err != nil {
		return nil, fmt.Errorf("failed to read /main root tree: %w", err)
	}

	// Only the checkpoint ID matters here; the hash passed by the walker is ignored.
	record := func(cpID id.CheckpointID, _ plumbing.Hash) error {
		found[cpID] = struct{}{}
		return nil
	}
	if walkErr := WalkCheckpointShards(s.repo, rootTree, record); walkErr != nil {
		return nil, fmt.Errorf("failed to walk existing /main checkpoints: %w", walkErr)
	}
	return found, nil
}

func (s *V2GitStore) buildFreshMainBatchGroupTree(ctx context.Context, cpID id.CheckpointID, ordinals []int, batch []WriteCommittedOptions, sessionIndexes []int) (plumbing.Hash, error) {
basePath := cpID.Path() + "/"
entries := make(map[string]object.TreeEntry)
sessions := make([]SessionFilePaths, len(ordinals))

for sessionIndex, ordinal := range ordinals {
opts := batch[ordinal]
sessionPath := fmt.Sprintf("%s%d/", basePath, sessionIndex)
filePaths, err := s.writeMainSessionToSubdirectory(opts, sessionPath, entries)
if err != nil {
return plumbing.ZeroHash, err
}
sessions[sessionIndex] = filePaths
Comment thread
computermode marked this conversation as resolved.
sessionIndexes[ordinal] = sessionIndex
}

lastOpts := batch[ordinals[len(ordinals)-1]]
if err := s.writeFreshMainBatchCheckpointSummary(lastOpts, basePath, entries, sessions, ordinals, batch); err != nil {
return plumbing.ZeroHash, err
}
return s.buildCheckpointSubtree(ctx, basePath, entries)
}

// writeFreshMainBatchCheckpointSummary builds the checkpoint-level summary for
// a freshly written group and stores it in entries under
// basePath + paths.MetadataFileName.
//
// CheckpointID, Strategy and Branch come from lastOpts. The remaining fields
// are folded over the batch entries named by ordinals, in order:
// CheckpointsCount is summed, FilesTouched is merged via mergeFilesTouched,
// TokenUsage is combined via aggregateTokenUsage, CombinedAttribution is the
// last non-nil value, and HasReview is true if any entry sets it.
func (s *V2GitStore) writeFreshMainBatchCheckpointSummary(lastOpts WriteCommittedOptions, basePath string, entries map[string]object.TreeEntry, sessions []SessionFilePaths, ordinals []int, batch []WriteCommittedOptions) error {
	summary := CheckpointSummary{
		CheckpointID: lastOpts.CheckpointID,
		CLIVersion:   versioninfo.Version,
		Strategy:     lastOpts.Strategy,
		Branch:       lastOpts.Branch,
		Sessions:     sessions,
	}

	for _, ordinal := range ordinals {
		item := batch[ordinal]
		summary.CheckpointsCount += item.CheckpointsCount
		summary.FilesTouched = mergeFilesTouched(summary.FilesTouched, item.FilesTouched)
		summary.TokenUsage = aggregateTokenUsage(summary.TokenUsage, item.TokenUsage)
		if item.CombinedAttribution != nil {
			summary.CombinedAttribution = item.CombinedAttribution
		}
		if item.HasReview {
			summary.HasReview = true
		}
	}

	encoded, err := jsonutil.MarshalIndentWithNewline(summary, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal checkpoint summary: %w", err)
	}
	blobHash, err := CreateBlobFromContent(s.repo, encoded)
	if err != nil {
		return err
	}

	metadataPath := basePath + paths.MetadataFileName
	entries[metadataPath] = object.TreeEntry{
		Name: metadataPath,
		Mode: filemode.Regular,
		Hash: blobHash,
	}
	return nil
}

// buildMainBatchGroupTree writes every session in one checkpoint group into an
// isolated checkpoint subtree. WriteCommittedMainBatch splices all returned
// checkpoint trees into /main in one pass, instead of rewriting the root and
// shard trees once per checkpoint.
//
// The group starts from the checkpoint's current entries under rootTreeHash.
// For each batch entry named by ordinals (processed in order) a session slot
// is chosen via findSessionIndex, the session files are written into that
// slot, and the slot is recorded in sessionIndexes[ordinal]. After the loop
// the checkpoint summary is rewritten using the last entry of the group, and
// the hash of the rebuilt checkpoint subtree is returned.
//
// It returns an error, without producing a tree, when slot 0 already holds
// metadata for a different SessionID than the entry about to be written there.
func (s *V2GitStore) buildMainBatchGroupTree(ctx context.Context, rootTreeHash plumbing.Hash, cpID id.CheckpointID, ordinals []int, batch []WriteCommittedOptions, sessionIndexes []int) (plumbing.Hash, error) {
basePath := cpID.Path() + "/"
checkpointPath := cpID.Path()

// Load the checkpoint's current files. The lookups below address them by
// their basePath-prefixed path.
entries, err := s.gs.flattenCheckpointEntries(rootTreeHash, checkpointPath)
if err != nil {
return plumbing.ZeroHash, err
}

// Best-effort read of the existing summary: if the metadata entry is absent
// or its blob cannot be decoded, existingSummary simply stays nil.
var existingSummary *CheckpointSummary
if entry, exists := entries[basePath+paths.MetadataFileName]; exists {
if existing, readErr := readJSONFromBlob[CheckpointSummary](s.repo, entry.Hash); readErr == nil {
existingSummary = existing
}
}

// Track the sessions slice as we write so writeCheckpointSummary at the
// end of the group sees a complete picture and findSessionIndex on later
// entries in the group can dedupe against earlier ones.
var sessions []SessionFilePaths
if existingSummary != nil {
// Work on a copy so the decoded summary's slice is never mutated.
sessions = make([]SessionFilePaths, len(existingSummary.Sessions))
copy(sessions, existingSummary.Sessions)
}

for _, ordinal := range ordinals {
opts := batch[ordinal]

// findSessionIndex needs a summary whose Sessions length reflects
// what we've written so far in this group.
runningSummary := existingSummary
if len(sessions) > 0 {
runningSummary = &CheckpointSummary{Sessions: sessions}
}
sessionIndex := s.gs.findSessionIndex(ctx, basePath, runningSummary, entries, opts.SessionID)

// Slot-0 guard: when slot 0 already has readable metadata belonging to a
// different SessionID, log and abort rather than overwrite it. If the
// metadata entry is missing or unreadable the guard is skipped.
if sessionIndex == 0 {
if entry, exists := entries[fmt.Sprintf("%s0/%s", basePath, paths.MetadataFileName)]; exists {
if existingMeta, readErr := s.gs.readMetadataFromBlob(entry.Hash); readErr == nil && existingMeta.SessionID != opts.SessionID {
logging.Error(ctx, "refusing v2 checkpoint write: session 0 holds a different sessionID",
slog.String("checkpoint_id", opts.CheckpointID.String()),
slog.String("existing_session_id", existingMeta.SessionID),
slog.String("write_session_id", opts.SessionID),
slog.Bool("existing_summary_nil", existingSummary == nil))
return plumbing.ZeroHash, fmt.Errorf(
"refusing to overwrite session 0 of checkpoint %s: existing session ID %q differs from write session ID %q. The v2 checkpoint tree is inconsistent (session 0 belongs to a different session than this write claims). No automated repair exists for this shape — please report it along with the output of `git ls-tree %s %s/`",
opts.CheckpointID, existingMeta.SessionID, opts.SessionID, paths.V2MainRefName, opts.CheckpointID.Path(),
)
}
}
}

// Write (or overwrite) the session's files under its slot directory; the
// shared entries map is updated in place.
sessionPath := fmt.Sprintf("%s%d/", basePath, sessionIndex)
filePaths, err := s.writeMainSessionToSubdirectory(opts, sessionPath, entries)
if err != nil {
return plumbing.ZeroHash, err
}

// Extend sessions so sessionIndex is addressable; any slots skipped by the
// growth keep their zero value.
if sessionIndex >= len(sessions) {
grown := make([]SessionFilePaths, sessionIndex+1)
copy(grown, sessions)
sessions = grown
}
sessions[sessionIndex] = filePaths
sessionIndexes[ordinal] = sessionIndex
}

// Last write wins for combinedAttribution / HasReview, matching the
// behavior of N sequential writeCommittedMain calls where each rewrites
// the summary blob.
lastOpts := batch[ordinals[len(ordinals)-1]]
if err := s.gs.writeCheckpointSummary(lastOpts, basePath, entries, sessions); err != nil {
return plumbing.ZeroHash, err
}

return s.buildCheckpointSubtree(ctx, basePath, entries)
}

// buildCheckpointSubtree builds a git tree from the entries that live under
// basePath, keyed by their path relative to basePath, and returns its hash.
//
// Entries whose key is left unchanged by stripping basePath are skipped; this
// covers keys that do not start with basePath and, when basePath is empty,
// every key.
func (s *V2GitStore) buildCheckpointSubtree(ctx context.Context, basePath string, entries map[string]object.TreeEntry) (plumbing.Hash, error) {
	relative := make(map[string]object.TreeEntry, len(entries))
	for fullPath, entry := range entries {
		trimmed := strings.TrimPrefix(fullPath, basePath)
		// TrimPrefix returns its input untouched when nothing was stripped,
		// so an unchanged length means the entry is not under basePath.
		if len(trimmed) != len(fullPath) {
			relative[trimmed] = entry
		}
	}

	subtreeHash, err := BuildTreeFromEntries(ctx, s.repo, relative)
	if err != nil {
		return plumbing.ZeroHash, fmt.Errorf("failed to build checkpoint subtree: %w", err)
	}
	return subtreeHash, nil
}

// UpdateCommitted replaces the prompts and/or transcript for an existing v2
// checkpoint. Called at stop time to finalize checkpoints with the complete
// session transcript.
Expand Down
Loading
Loading