Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions ledger/complete/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,15 +335,6 @@ func (l *Ledger) Trie(rootHash ledger.RootHash) (*trie.MTrie, error) {
return l.forest.GetTrie(rootHash)
}

// Checkpointer asks the ledger's WAL for a new checkpointer and hands it back.
// If the WAL cannot create one, the error is wrapped with context and a nil
// checkpointer is returned.
func (l *Ledger) Checkpointer() (*realWAL.Checkpointer, error) {
	cp, err := l.wal.NewCheckpointer()
	if err != nil {
		// Wrap with %w so callers can still inspect the underlying WAL error.
		return nil, fmt.Errorf("cannot create checkpointer for compactor: %w", err)
	}

	return cp, nil
}

func (l *Ledger) MigrateAt(
state ledger.State,
migration ledger.Migration,
Expand Down
2 changes: 0 additions & 2 deletions ledger/complete/ledger_with_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ func NewLedgerWithCompactor(
logger zerolog.Logger,
pathFinderVersion uint8,
) (*LedgerWithCompactor, error) {
logger = logger.With().Str("ledger_mod", "complete").Logger()

// Create the ledger
l, err := NewLedger(diskWAL, ledgerCapacity, metrics, logger, pathFinderVersion)
if err != nil {
Expand Down
104 changes: 52 additions & 52 deletions ledger/complete/payloadless_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,58 +239,6 @@ Loop:
}
}

// processTrieUpdate handles a single trie update: it records the update in
// the WAL, reports the outcome on update.ResultCh, stages the trie received on
// update.TrieCh into trieQueue, and tells the caller whether a segment
// rollover has reached the checkpoint trigger.
// Mirrors [Compactor.processTrieUpdate].
//
// Returns:
//   - _activeSegmentNum: the segment number the caller should track from now on.
//   - checkpointNum: nextCheckpointNum when a checkpoint should be taken,
//     otherwise -1.
//   - checkpointTries: the tries returned by trieQueue.Tries() when a
//     checkpoint should be taken, otherwise nil.
func (c *PayloadlessCompactor) processTrieUpdate(
	update *WALPayloadlessTrieUpdate,
	trieQueue *realWAL.PayloadlessTrieQueue,
	activeSegmentNum int,
	nextCheckpointNum int,
) (_activeSegmentNum int, checkpointNum int, checkpointTries []*payloadless.MTrie) {

	// Write the record first and report the result to the sender immediately.
	recordedSegment, wasSkipped, recordErr := c.wal.RecordUpdate(update.Update)
	update.ResultCh <- recordErr

	// Deferred so it runs on every return path. Return values are evaluated
	// before deferred calls run, so on the checkpoint path trieQueue.Tries()
	// is called before the trie received here is pushed.
	defer func() {
		if updated := <-update.TrieCh; updated != nil {
			trieQueue.Push(updated)
			return
		}
		c.logger.Error().Msg("payloadless compactor failed to get updated trie")
	}()

	switch {
	case activeSegmentNum == -1:
		// No active segment is being tracked yet: adopt the one the WAL reported.
		return recordedSegment, -1, nil
	case recordErr != nil, wasSkipped, recordedSegment == activeSegmentNum:
		// Nothing was written to a different segment, so nothing changes.
		return activeSegmentNum, -1, nil
	}

	// The WAL reported a segment other than the tracked one. The next
	// consecutive number is expected; anything else is logged but still
	// adopted as the new active segment.
	if recordedSegment != activeSegmentNum+1 {
		c.logger.Error().Msgf("payloadless compactor got unexpected new segment %d, want %d", recordedSegment, activeSegmentNum+1)
	}

	finishedSegment := activeSegmentNum
	activeSegmentNum = recordedSegment

	c.logger.Info().Msgf("finish writing segment file %v, payloadless trie update writing to segment %v; checkpoint triggers at segment %v",
		finishedSegment, activeSegmentNum, nextCheckpointNum)

	// A checkpoint is due once the finished segment has reached the trigger.
	if nextCheckpointNum <= finishedSegment {
		return activeSegmentNum, nextCheckpointNum, trieQueue.Tries()
	}

	return activeSegmentNum, -1, nil
}

// checkpoint serializes a V7 checkpoint, then prunes older V7 files per the
// retention policy, and notifies observers.
func (c *PayloadlessCompactor) checkpoint(ctx context.Context, tries []*payloadless.MTrie, checkpointNum int) error {
Expand Down Expand Up @@ -370,6 +318,58 @@ func cleanupCheckpointsV7(checkpointer *realWAL.Checkpointer, checkpointsToKeep
return nil
}

// processTrieUpdate handles a single trie update: it records the update in
// the WAL, reports the outcome on update.ResultCh, stages the trie received on
// update.TrieCh into trieQueue, and tells the caller whether a segment
// rollover has reached the checkpoint trigger.
// Mirrors [Compactor.processTrieUpdate].
//
// Returns:
//   - _activeSegmentNum: the segment number the caller should track from now on.
//   - checkpointNum: nextCheckpointNum when a checkpoint should be taken,
//     otherwise -1.
//   - checkpointTries: the tries returned by trieQueue.Tries() when a
//     checkpoint should be taken, otherwise nil.
func (c *PayloadlessCompactor) processTrieUpdate(
	update *WALPayloadlessTrieUpdate,
	trieQueue *realWAL.PayloadlessTrieQueue,
	activeSegmentNum int,
	nextCheckpointNum int,
) (_activeSegmentNum int, checkpointNum int, checkpointTries []*payloadless.MTrie) {

	// Write the record first and report the result to the sender immediately.
	recordedSegment, wasSkipped, recordErr := c.wal.RecordUpdate(update.Update)
	update.ResultCh <- recordErr

	// Deferred so it runs on every return path. Return values are evaluated
	// before deferred calls run, so on the checkpoint path trieQueue.Tries()
	// is called before the trie received here is pushed.
	defer func() {
		if updated := <-update.TrieCh; updated != nil {
			trieQueue.Push(updated)
			return
		}
		c.logger.Error().Msg("payloadless compactor failed to get updated trie")
	}()

	switch {
	case activeSegmentNum == -1:
		// No active segment is being tracked yet: adopt the one the WAL reported.
		return recordedSegment, -1, nil
	case recordErr != nil, wasSkipped, recordedSegment == activeSegmentNum:
		// Nothing was written to a different segment, so nothing changes.
		return activeSegmentNum, -1, nil
	}

	// The WAL reported a segment other than the tracked one. The next
	// consecutive number is expected; anything else is logged but still
	// adopted as the new active segment.
	if recordedSegment != activeSegmentNum+1 {
		c.logger.Error().Msgf("payloadless compactor got unexpected new segment %d, want %d", recordedSegment, activeSegmentNum+1)
	}

	finishedSegment := activeSegmentNum
	activeSegmentNum = recordedSegment

	c.logger.Info().Msgf("finish writing segment file %v, payloadless trie update writing to segment %v; checkpoint triggers at segment %v",
		finishedSegment, activeSegmentNum, nextCheckpointNum)

	// A checkpoint is due once the finished segment has reached the trigger.
	if nextCheckpointNum <= finishedSegment {
		return activeSegmentNum, nextCheckpointNum, trieQueue.Tries()
	}

	return activeSegmentNum, -1, nil
}

// latestV7CheckpointNum returns the highest V7 checkpoint number on disk,
// or -1 if none exist or listing fails (with the error logged).
func latestV7CheckpointNum(checkpointer *realWAL.Checkpointer, logger zerolog.Logger) int {
Expand Down
61 changes: 61 additions & 0 deletions ledger/complete/wal/checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,67 @@ func (c *Checkpointer) LoadRootCheckpointV7() ([]*payloadless.MTrie, error) {
return OpenAndReadCheckpointV7(c.dir, fileName, c.wal.log)
}

// LoadLatestCheckpointV7 loads the most recent usable V7 (payloadless)
// checkpoint and returns its tries together with the number of the checkpoint
// that was loaded.
//
// Selection order:
//  1. Numbered V7 checkpoints, newest first. A checkpoint that fails to load is
//     logged as a warning and the next older one is tried. This mirrors the V6
//     checkpoint selection in [DiskWAL.replay]. On success `loadedCheckpoint`
//     is the number of that checkpoint, which callers use to determine the
//     first WAL segment to replay.
//  2. The V7 root checkpoint, if present. Like the root-checkpoint branch in
//     [DiskWAL.replay], this does not advance the replay start:
//     `loadedCheckpoint` is -1, so all segments are replayed on top of the
//     root state.
//  3. Nothing found: nil tries, `loadedCheckpoint` -1 and a nil error.
//
// Errors are returned when the V7 checkpoints cannot be listed, when the
// presence of the V7 root checkpoint cannot be determined, or when an existing
// V7 root checkpoint fails to load. No error returns are expected during
// normal operation.
func (c *Checkpointer) LoadLatestCheckpointV7() (tries []*payloadless.MTrie, loadedCheckpoint int, err error) {
	numbered, err := c.CheckpointsV7()
	if err != nil {
		return nil, -1, fmt.Errorf("cannot list V7 checkpoints: %w", err)
	}

	// Walk the listed checkpoints from the last entry to the first and stop at
	// the first one that loads.
	for remaining := len(numbered); remaining > 0; remaining-- {
		candidate := numbered[remaining-1]
		loaded, loadErr := OpenAndReadCheckpointV7(c.dir, NumberToFilenameV7(candidate), c.wal.log)
		if loadErr == nil {
			c.wal.log.Info().Int("checkpoint", candidate).Int("trie_count", len(loaded)).
				Msg("loaded V7 checkpoint")
			return loaded, candidate, nil
		}
		c.wal.log.Warn().Int("checkpoint", candidate).Err(loadErr).
			Msg("V7 checkpoint loading failed; falling back to older checkpoint")
	}

	// No numbered V7 checkpoint could be loaded: consider the V7 root checkpoint.
	rootExists, err := c.HasRootCheckpointV7()
	if err != nil {
		return nil, -1, fmt.Errorf("cannot check for V7 root checkpoint: %w", err)
	}
	if !rootExists {
		// No V7 checkpoint of either kind exists.
		return nil, -1, nil
	}

	rootTries, err := c.LoadRootCheckpointV7()
	if err != nil {
		return nil, -1, fmt.Errorf("failed to load V7 root checkpoint: %w", err)
	}
	c.wal.log.Info().Int("trie_count", len(rootTries)).
		Msg("loaded V7 root checkpoint")

	return rootTries, -1, nil
}

// HasRootCheckpoint forwards to the package-level HasRootCheckpoint, passing
// the checkpointer's directory (c.dir), and returns its result unchanged.
// NOTE(review): presumably reports whether a root checkpoint file exists in
// that directory — confirm against the package-level function.
func (c *Checkpointer) HasRootCheckpoint() (bool, error) {
return HasRootCheckpoint(c.dir)
}
Expand Down
121 changes: 121 additions & 0 deletions ledger/complete/wal/payloadless_replay_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package wal

import (
"os"
"path"
"testing"

"github.com/rs/zerolog"
"github.com/stretchr/testify/require"

"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/complete/mtrie"
"github.com/onflow/flow-go/ledger/complete/payloadless"
"github.com/onflow/flow-go/model/bootstrap"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/utils/unittest"
)

// TestReplayOnPayloadlessForest_IgnoresV6RootCheckpoint is a regression test
// for a payloadless node that boots with both a V7 root checkpoint (the real
// seed) and a V6 root.checkpoint in the trie dir. The forest must be seeded
// from the V7 checkpoint and the V6 root.checkpoint must NOT be read.
//
// The V6 file written here is deliberately corrupt. The previous
// implementation routed payloadless segment replay through [DiskWAL.replay],
// which falls back to loading the V6 root checkpoint when replaying from
// segment 0; that fallback would fail on the corrupt file. With the fix, the
// V6 file is ignored and replay succeeds.
func TestReplayOnPayloadlessForest_IgnoresV6RootCheckpoint(t *testing.T) {
	unittest.RunWithTempDir(t, func(dir string) {
		nopLogger := zerolog.Nop()

		// Convert a simple trie to its V7 form and store it as the payloadless
		// root checkpoint. Remember the root hash we expect to find after replay.
		fullTries := createSimpleTrie(t)
		wantRoot := fullTries[0].RootHash()
		payloadlessTries, err := FromV6Tries(fullTries)
		require.NoError(t, err)
		err = StoreCheckpointV7Concurrently(payloadlessTries, dir, RootCheckpointFilenameV7(), nopLogger)
		require.NoError(t, err)

		// Drop junk bytes where the V6 root checkpoint would live. Any attempt
		// by the payloadless replay path to load it fails, which is exactly
		// the regression being guarded against.
		corruptV6Root := path.Join(dir, bootstrap.FilenameWALRootCheckpoint)
		err = os.WriteFile(corruptV6Root, []byte("not a valid v6 checkpoint"), 0644)
		require.NoError(t, err)

		diskWAL, err := NewDiskWAL(nopLogger, nil, metrics.NewNoopCollector(), dir, 10, pathByteSize, segmentSize)
		require.NoError(t, err)
		// Wait for the WAL's Done channel when the test body exits.
		defer func() { <-diskWAL.Done() }()

		target, err := payloadless.NewForest(100, &metrics.NoopCollector{}, nil)
		require.NoError(t, err)

		replayErr := diskWAL.ReplayOnPayloadlessForest(target)
		require.NoError(t, replayErr, "replay must seed from V7 and must not load the V6 root checkpoint")

		require.True(t, target.HasTrie(wantRoot), "forest must be seeded from the V7 root checkpoint")
	})
}

// TestReplayOnPayloadlessForest_ReplaysWALSegments checks that, after the
// forest is seeded from the V7 root checkpoint, WAL segment records newer than
// that checkpoint are still replayed onto the payloadless forest. It guards
// against the segment-replay refactor accidentally skipping segments.
func TestReplayOnPayloadlessForest_ReplaysWALSegments(t *testing.T) {
	unittest.RunWithTempDir(t, func(dir string) {
		nopLogger := zerolog.Nop()

		// Reference full forest: apply an initial update and capture the
		// resulting state as the V7 root checkpoint.
		reference, err := mtrie.NewForest(100, &metrics.NoopCollector{}, nil)
		require.NoError(t, err)

		seedPaths, seedPayloads := randNPathPayloads(10)
		checkpointRoot, err := reference.Update(&ledger.TrieUpdate{
			RootHash: reference.GetEmptyRootHash(),
			Paths:    seedPaths,
			Payloads: toPayloadPtrs(seedPayloads),
		})
		require.NoError(t, err)

		fullTries, err := reference.GetTries()
		require.NoError(t, err)
		payloadlessTries, err := FromV6Tries(fullTries)
		require.NoError(t, err)
		err = StoreCheckpointV7Concurrently(payloadlessTries, dir, RootCheckpointFilenameV7(), nopLogger)
		require.NoError(t, err)

		// Second update, built on the checkpointed root. It goes into the WAL
		// only, NOT into the checkpoint, so replay has to apply it to reach
		// replayedRoot.
		walPaths, walPayloads := randNPathPayloads(10)
		walUpdate := &ledger.TrieUpdate{
			RootHash: checkpointRoot,
			Paths:    walPaths,
			Payloads: toPayloadPtrs(walPayloads),
		}
		replayedRoot, err := reference.Update(walUpdate)
		require.NoError(t, err)

		// Record the second update through one WAL instance and wait on its
		// Done channel before opening another instance on the same directory.
		writer, err := NewDiskWAL(nopLogger, nil, metrics.NewNoopCollector(), dir, 10, pathByteSize, segmentSize)
		require.NoError(t, err)
		_, _, err = writer.RecordUpdate(walUpdate)
		require.NoError(t, err)
		<-writer.Done()

		// Fresh WAL instance: seed from V7 (checkpointRoot), then replay the
		// segment carrying walUpdate to reach replayedRoot.
		reader, err := NewDiskWAL(nopLogger, nil, metrics.NewNoopCollector(), dir, 10, pathByteSize, segmentSize)
		require.NoError(t, err)
		defer func() { <-reader.Done() }()

		target, err := payloadless.NewForest(100, &metrics.NoopCollector{}, nil)
		require.NoError(t, err)

		err = reader.ReplayOnPayloadlessForest(target)
		require.NoError(t, err)

		require.True(t, target.HasTrie(checkpointRoot), "forest must contain the V7 checkpoint root")
		require.True(t, target.HasTrie(replayedRoot), "forest must contain the root produced by replaying the WAL segment")
	})
}
Loading