Skip to content

Commit

Permalink
feat: extract TraversalResumerPathState interface and allow it to be …
Browse files Browse the repository at this point in the history
…shared across traversals
  • Loading branch information
rvagg committed Aug 12, 2022
1 parent e9c7d8a commit d2e5313
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 33 deletions.
2 changes: 2 additions & 0 deletions v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/multiformats/go-multicodec"

"github.com/ipld/go-car/v2/internal/carv1"
resumetraversal "github.com/ipld/go-car/v2/traversal"
)

// DefaultMaxIndexCidSize specifies the maximum size in bytes accepted as a section CID by CARv2 index.
Expand Down Expand Up @@ -62,6 +63,7 @@ type Options struct {
TraversalPrototypeChooser traversal.LinkTargetNodePrototypeChooser
DataPayloadSize uint64
SkipOffset uint64
TraversalResumerPathState resumetraversal.TraversalResumerPathState

MaxAllowedHeaderSize uint64
MaxAllowedSectionSize uint64
Expand Down
20 changes: 19 additions & 1 deletion v2/selective.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ func WithDataPayloadSize(size uint64) Option {
}
}

// WithTraversalResumerPathState is an Option that installs a caller-supplied
// TraversalResumerPathState, so that the same state can be reused between
// selective CAR creations whose traversals may need to be resumed at
// arbitrary points within the DAG.
//
// When one TraversalResumerPathState is shared across several traversals that
// use the same selector and DAG, each traversal yields the same state. That
// makes it possible to resume at an arbitrary point within the DAG while
// loading only the minimal additional blocks required to resume there.
func WithTraversalResumerPathState(pathState resumetraversal.TraversalResumerPathState) Option {
	return func(opts *Options) {
		opts.TraversalResumerPathState = pathState
	}
}

// NewSelectiveWriter walks through the proposed dag traversal to learn its total size in order to be able to
// stream out a car to a writer in the expected traversal order in one go.
func NewSelectiveWriter(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (Writer, error) {
Expand Down Expand Up @@ -321,7 +335,11 @@ func (tc *traversalCar) setup(ctx context.Context, ls *ipld.LinkSystem, opts Opt
}

ls.TrustedStorage = true
resumer, err := resumetraversal.WithTraversingLinksystem(&progress)
pathState := opts.TraversalResumerPathState
if pathState == nil {
pathState = resumetraversal.NewTraversalResumerPathState()
}
resumer, err := resumetraversal.WithTraversingLinksystem(&progress, pathState)
if err != nil {
return err
}
Expand Down
78 changes: 49 additions & 29 deletions v2/traversal/resumption.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,42 @@ import (
"github.com/ipld/go-ipld-prime/traversal"
)

// TraverseResumer allows resuming a progress from a previously encountered path
// in the selector.
type TraverseResumer interface {
// RewindToPath asks for the traversal to be resumed from the given,
// previously encountered, path.
RewindToPath(from datamodel.Path) error
// RewindToOffset asks for the traversal to be resumed from the given offset.
// NOTE(review): presumably a byte offset into the CAR data — confirm.
RewindToOffset(offset uint64) error
// Position reports the resumer's current position.
// NOTE(review): presumably in the same units as RewindToOffset — confirm.
Position() uint64
}

// TraversalResumerPathState tracks a traversal state for the purpose of
// building a CAR. For each block in the CAR it tracks the path to that block,
// the Link of the block and where in the CAR the block is located.
//
// A TraversalResumerPathState shared across multiple traversals using the same
// selector and DAG will yield the same state. This allows us to resume at
// arbitrary points within the DAG and load the minimal additional blocks
// required to resume the traversal at that point.
type TraversalResumerPathState interface {
// AddPath records the link reached via the given path segments together
// with the offset at which it was encountered.
AddPath(path []datamodel.PathSegment, link datamodel.Link, atOffset uint64)
// GetLinks returns the links recorded in the tree rooted at root.
GetLinks(root datamodel.Path) []datamodel.Link
// GetOffsetAfter returns the offset that follows the given path, or an
// error when no such offset can be determined.
// NOTE(review): the in-package implementation looks for the offset of the
// next sibling, walking up the path segments until one is found — confirm
// that other implementations are expected to behave the same way.
GetOffsetAfter(root datamodel.Path) (uint64, error)
}

// pathNode is one node in a tree of traversed paths; each child is keyed by
// the path segment that leads to it.
type pathNode struct {
// link is the link supplied when this node was created.
link datamodel.Link
// offset is the offset supplied when this node was created.
// NOTE(review): presumably the block's location within the CAR — confirm.
offset uint64
// children maps the next path segment to the node for that sub-path.
children map[datamodel.PathSegment]*pathNode
}

// NewTraversalResumerPathState returns a fresh, empty
// TraversalResumerPathState backed by a root path node that has no link and
// a zero offset.
//
// The value returned by this factory is not thread-safe.
func NewTraversalResumerPathState() TraversalResumerPathState {
	root := newPath(nil, 0)
	return root
}

func newPath(link datamodel.Link, at uint64) *pathNode {
return &pathNode{
link: link,
Expand All @@ -31,15 +61,15 @@ func newPath(link datamodel.Link, at uint64) *pathNode {
}
}

func (pn pathNode) addPath(p []datamodel.PathSegment, link datamodel.Link, at uint64) {
func (pn pathNode) AddPath(p []datamodel.PathSegment, link datamodel.Link, atOffset uint64) {
if len(p) == 0 {
return
}
if _, ok := pn.children[p[0]]; !ok {
child := newPath(link, at)
child := newPath(link, atOffset)
pn.children[p[0]] = child
}
pn.children[p[0]].addPath(p[1:], link, at)
pn.children[p[0]].AddPath(p[1:], link, atOffset)
}

func (pn pathNode) allLinks() []datamodel.Link {
Expand All @@ -57,7 +87,7 @@ func (pn pathNode) allLinks() []datamodel.Link {
}

// GetLinks returns the links of the reconstructed paths in the tree rooted at 'root'
func (pn pathNode) getLinks(root datamodel.Path) []datamodel.Link {
func (pn pathNode) GetLinks(root datamodel.Path) []datamodel.Link {
segs := root.Segments()
switch len(segs) {
case 0:
Expand All @@ -80,12 +110,12 @@ func (pn pathNode) getLinks(root datamodel.Path) []datamodel.Link {
// base case 2: not registered sub-path.
return []datamodel.Link{}
}
return pn.children[next].getLinks(datamodel.NewPathNocopy(segs[1:]))
return pn.children[next].GetLinks(datamodel.NewPathNocopy(segs[1:]))
}

var errInvalid = fmt.Errorf("invalid path")

func (pn pathNode) offsetAfter(root datamodel.Path) (uint64, error) {
func (pn pathNode) GetOffsetAfter(root datamodel.Path) (uint64, error) {
// we look for offset of next sibling.
// if no next sibling recurse up the path segments until we find a next sibling.
segs := root.Segments()
Expand All @@ -100,7 +130,7 @@ func (pn pathNode) offsetAfter(root datamodel.Path) (uint64, error) {
closest := chld.offset
// try recursive path
if len(segs) > 1 {
co, err := chld.offsetAfter(datamodel.NewPathNocopy(segs[1:]))
co, err := chld.GetOffsetAfter(datamodel.NewPathNocopy(segs[1:]))
if err == nil {
return co, err
}
Expand All @@ -121,35 +151,28 @@ func (pn pathNode) offsetAfter(root datamodel.Path) (uint64, error) {
return 0, errInvalid
}

// TraverseResumer allows resuming a progress from a previously encountered path in the selector.
type TraverseResumer interface {
RewindToPath(from datamodel.Path) error
RewindToOffset(offset uint64) error
Position() uint64
}

type traversalState struct {
wrappedLinksystem *linking.LinkSystem
lsCounter *loader.Counter
blockNumber int
pathOrder map[int]datamodel.Path
pathTree *pathNode
pathTree TraversalResumerPathState
rewindPathTarget *datamodel.Path
rewindOffsetTarget uint64
pendingBlockStart uint64 // on rewinds, we store where the counter was in order to know the length of the last read block.
progress *traversal.Progress
}

var _ TraverseResumer = (*traversalState)(nil)

func (ts *traversalState) RewindToPath(from datamodel.Path) error {
if ts.progress == nil {
return nil
}
// reset progress and traverse until target.
ts.progress.SeenLinks = make(map[datamodel.Link]struct{})
ts.blockNumber = 0
ts.pendingBlockStart = ts.lsCounter.Size()
ts.lsCounter.TotalRead = 0
ts.rewindPathTarget = &from
ts.rewindOffsetTarget = 0
return nil
}

Expand All @@ -163,10 +186,10 @@ func (ts *traversalState) RewindToOffset(offset uint64) error {
}
// reset progress and traverse until target.
ts.progress.SeenLinks = make(map[datamodel.Link]struct{})
ts.blockNumber = 0
ts.pendingBlockStart = ts.lsCounter.Size()
ts.lsCounter.TotalRead = 0
ts.rewindOffsetTarget = offset
ts.rewindPathTarget = nil
return nil
}

Expand All @@ -177,9 +200,7 @@ func (ts *traversalState) Position() uint64 {
func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Reader, error) {
// when not in replay mode, we track metadata
if ts.rewindPathTarget == nil && ts.rewindOffsetTarget == 0 {
ts.pathOrder[ts.blockNumber] = lc.LinkPath
ts.pathTree.addPath(lc.LinkPath.Segments(), l, ts.lsCounter.Size())
ts.blockNumber++
ts.pathTree.AddPath(lc.LinkPath.Segments(), l, ts.lsCounter.Size())
return ts.wrappedLinksystem.StorageReadOpener(lc, l)
}

Expand All @@ -205,12 +226,12 @@ func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Read
break
}
if targetSegments[i].String() != s.String() {
links := ts.pathTree.getLinks(datamodel.NewPathNocopy(seg[0 : i+1]))
links := ts.pathTree.GetLinks(datamodel.NewPathNocopy(seg[0 : i+1]))
for _, l := range links {
ts.progress.SeenLinks[l] = struct{}{}
}
var err error
ts.lsCounter.TotalRead, err = ts.pathTree.offsetAfter(datamodel.NewPathNocopy(seg[0 : i+1]))
ts.lsCounter.TotalRead, err = ts.pathTree.GetOffsetAfter(datamodel.NewPathNocopy(seg[0 : i+1]))
if err == errInvalid {
ts.lsCounter.TotalRead = ts.pendingBlockStart
} else if err != nil {
Expand All @@ -222,12 +243,12 @@ func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Read
}
}
if ts.rewindOffsetTarget != 0 {
links := ts.pathTree.getLinks(lc.LinkPath)
links := ts.pathTree.GetLinks(lc.LinkPath)
for _, l := range links {
ts.progress.SeenLinks[l] = struct{}{}
}
var err error
ts.lsCounter.TotalRead, err = ts.pathTree.offsetAfter(lc.LinkPath)
ts.lsCounter.TotalRead, err = ts.pathTree.GetOffsetAfter(lc.LinkPath)
if err == errInvalid {
ts.lsCounter.TotalRead = ts.pendingBlockStart
} else if err != nil {
Expand All @@ -243,13 +264,12 @@ func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Read
// WithTraversingLinksystem extends a progress for traversal such that it can
// subsequently resume and perform subsets of the walk efficiently from
// an arbitrary position within the selector traversal.
func WithTraversingLinksystem(p *traversal.Progress) (TraverseResumer, error) {
func WithTraversingLinksystem(p *traversal.Progress, pathState TraversalResumerPathState) (TraverseResumer, error) {
wls, ctr := loader.CountingLinkSystem(p.Cfg.LinkSystem)
ts := &traversalState{
wrappedLinksystem: &wls,
lsCounter: ctr.(*loader.Counter),
pathOrder: make(map[int]datamodel.Path),
pathTree: newPath(nil, 0),
pathTree: pathState,
progress: p,
}
p.Cfg.LinkSystem.StorageReadOpener = ts.traverse
Expand Down
6 changes: 3 additions & 3 deletions v2/traversal/resumption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestWalkResumeByPath(t *testing.T) {
LinkTargetNodePrototypeChooser: basicnode.Chooser,
},
}
resumer, err := cartraversal.WithTraversingLinksystem(&p)
resumer, err := cartraversal.WithTraversingLinksystem(&p, cartraversal.NewTraversalResumerPathState())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestWalkResumeByPathPartialWalk(t *testing.T) {
LinkTargetNodePrototypeChooser: basicnode.Chooser,
},
}
resumer, err := cartraversal.WithTraversingLinksystem(&p)
resumer, err := cartraversal.WithTraversingLinksystem(&p, cartraversal.NewTraversalResumerPathState())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestWalkResumeByOffset(t *testing.T) {
LinkTargetNodePrototypeChooser: basicnode.Chooser,
},
}
resumer, err := cartraversal.WithTraversingLinksystem(&p)
resumer, err := cartraversal.WithTraversingLinksystem(&p, cartraversal.NewTraversalResumerPathState())
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit d2e5313

Please sign in to comment.