24 changes: 21 additions & 3 deletions cmd/internal/upload/ui/ui.go
@@ -21,10 +21,12 @@ import (
"github.com/storacha/go-libstoracha/digestutil"
"github.com/storacha/guppy/internal/largeupload/bubbleup"
"github.com/storacha/guppy/pkg/preparation"
scansmodel "github.com/storacha/guppy/pkg/preparation/scans/model"
shardsmodel "github.com/storacha/guppy/pkg/preparation/shards/model"
"github.com/storacha/guppy/pkg/preparation/sqlrepo"
"github.com/storacha/guppy/pkg/preparation/types"
"github.com/storacha/guppy/pkg/preparation/types/id"
"github.com/storacha/guppy/pkg/preparation/uploads"
uploadsmodel "github.com/storacha/guppy/pkg/preparation/uploads/model"
)

@@ -46,6 +48,7 @@ type uploadModel struct {
// State (Maps for multi-upload support)
recentAddedShards map[id.UploadID][]*shardsmodel.Shard
recentClosedShards map[id.UploadID][]*shardsmodel.Shard
uploadingShards map[id.UploadID][]*shardsmodel.Shard
filesToDAGScan map[id.UploadID][]sqlrepo.FileInfo
shardedFiles map[id.UploadID][]sqlrepo.FileInfo

@@ -99,6 +102,21 @@ func (m uploadModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
}
return m, nil

	case uploads.FSEntryAddedMessage:
		if file, ok := msg.FSEntry.(*scansmodel.File); ok {
			// Keep the newest entry plus at most six previous ones; slicing the
			// existing list with [:6] unconditionally would panic while it still
			// holds fewer than six entries.
			recent := m.filesToDAGScan[msg.UploadID]
			if len(recent) > 6 {
				recent = recent[:6]
			}
			m.filesToDAGScan[msg.UploadID] = append([]sqlrepo.FileInfo{
				{
					Path: file.Path(),
					Size: file.Size(),
				},
			}, recent...)
			m.dagScanStats[msg.UploadID] += file.Size()
		}
		return m, nil

case uploads.NodeAddedMesssage:

case statsMsg:
var cmds []tea.Cmd

@@ -157,7 +175,7 @@ func (m uploadModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
m.dagScans += size
}

return m, tea.Batch(append(cmds, checkStats(m.ctx, m.repo, msg.uploadID))...)
	return m, tea.Batch(cmds...)

case error:

@@ -179,7 +197,7 @@ func (m uploadModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
m.err = fmt.Errorf("could not find upload %s to retry after retriable error", msg.id)
return m, tea.Quit
}
return m, tea.Batch(executeUpload(m.ctx, m.api, upload))
return m, nil
}
m.err = msg.err
return m, tea.Quit
@@ -303,7 +321,7 @@ type uploadErrMsg struct {
err error
}

func executeUpload(ctx context.Context, api preparation.API, upload *uploadsmodel.Upload) tea.Cmd {
// Note: the draft dropped the tea.Cmd return type here while the body below
// still returns a closure; tea.Cmd is kept as the assumed intent so the
// function compiles.
func executeUpload(ctx context.Context, api preparation.API, upload *uploadsmodel.Upload) tea.Cmd {
return func() tea.Msg {
rootCID, err := api.ExecuteUpload(ctx, upload)
if err != nil {
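A minimal sketch of how the new `send` parameter of `doUpload` (see cmd/upload.go below) might be wired to this TUI. Everything here beyond what the diff shows is an assumption: `newUploadModel` is a hypothetical constructor, `cmd`, `uploadsToRun`, `repo`, and `api` are assumed in scope, and `(*tea.Program).Send` is bubbletea's standard way to inject messages from outside the event loop.

```go
// Hypothetical wiring, not part of this diff: run uploads off the UI
// goroutine and stream progress into the program via p.Send, whose
// signature matches the assumed `send func(tea.Msg)` parameter.
p := tea.NewProgram(newUploadModel(ctx, repo, api)) // newUploadModel is hypothetical
go func() {
	if err := doUpload(ctx, cmd, uploadsToRun, api, p.Send); err != nil {
		p.Send(uploadErrMsg{err: err}) // uploadErrMsg is defined in this file
	}
}()
if _, err := p.Run(); err != nil {
	log.Errorw("running upload UI", "err", err)
}
```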
83 changes: 83 additions & 0 deletions cmd/upload.go
@@ -385,3 +385,86 @@
func makeRepo(ctx context.Context) (*sqlrepo.Repo, error) {
return preparation.OpenRepo(ctx, uploadFlags.dbPath)
}

// Note: the draft left `send` untyped here — the CI annotation on this line
// read "GitHub Actions / check: missing parameter type" (line 389 of
// cmd/upload.go). The `send func(tea.Msg)` type and the added
// `cmd *cobra.Command` parameter (needed by the Printf/Println calls below)
// are assumptions, not confirmed by the diff.
func doUpload(ctx context.Context, cmd *cobra.Command, uploadsToRun []*uploadsmodel.Upload, api preparation.API, send func(tea.Msg)) error {

type uploadResult struct {
upload *uploadsmodel.Upload
cid cid.Cid
attempts int
}

type uploadFailure struct {
upload *uploadsmodel.Upload
err error
attempts int
}

var completedUploads []uploadResult
var failedUploads []uploadFailure
for _, u := range uploadsToRun {
start := time.Now()
log.Infow("Starting upload", "upload", u.ID())
attempt := 0
var uploadCID cid.Cid
var lastErr error

		for {
			attempt++
			var err error
			// The draft assigned to an undeclared `err` and called ExecuteUpload
			// with its old two-argument shape; a local declaration and a no-op
			// progress callback (matching this PR's tests) are assumed here.
			uploadCID, err = api.ExecuteUpload(ctx, u, func(uploads.UploadProgressMessage) error { return nil })
if err == nil {
lastErr = nil
break
}

var re types.RetriableError
if errors.As(err, &re) {
lastErr = err
if uploadFlags.retry {
log.Warnw("Retriable upload error encountered, retrying", "upload", u.ID(), "attempt", attempt,
"err", err)
continue
}

log.Errorw("Retriable upload error encountered (retry disabled)", "upload", u.ID(), "attempt",
attempt, "err", err)
break
}

lastErr = err
log.Errorw("Upload failed with non-retriable error", "upload", u.ID(), "attempt", attempt, "err", err)
break
}

if lastErr != nil {
failedUploads = append(failedUploads, uploadFailure{
upload: u,
err: lastErr,
attempts: attempt,
})
log.Errorw("Upload failed", "upload", u.ID(), "duration", time.Since(start), "attempts", attempt, "err",
lastErr)
continue
}

completedUploads = append(completedUploads, uploadResult{
upload: u,
cid: uploadCID,
attempts: attempt,
})
log.Infow("Completed upload", "upload", u.ID(), "cid", uploadCID.String(), "duration", time.Since(start), "attempts", attempt)
}

for _, u := range completedUploads {
cmd.Printf("Upload completed successfully: %s\n", u.cid.String())
}

if len(failedUploads) > 0 {
cmd.Println("Uploads failed:")
for _, u := range failedUploads {
cmd.Printf("- %s: %v\n", u.upload.ID(), u.err)
}
return cmdutil.NewHandledCliError(fmt.Errorf("%d upload(s) failed", len(failedUploads)))
}
return nil
}
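The loop above classifies failures with errors.As against types.RetriableError. Here is a self-contained sketch of that classification, using a locally defined stand-in type since the concrete shape of types.RetriableError isn't shown in this diff:

```go
package main

import (
	"errors"
	"fmt"
)

// retriableError stands in for types.RetriableError; the real type's
// definition is not part of this diff.
type retriableError struct{ cause error }

func (e retriableError) Error() string { return "retriable: " + e.cause.Error() }
func (e retriableError) Unwrap() error { return e.cause }

func main() {
	// Even wrapped several layers deep, errors.As still finds the marker
	// type, so doUpload would retry this failure when --retry is set.
	err := fmt.Errorf("uploading shard: %w", retriableError{cause: errors.New("connection reset")})
	var re retriableError
	fmt.Println(errors.As(err, &re)) // true
}
```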
2 changes: 1 addition & 1 deletion pkg/preparation/dags/dags.go
@@ -49,7 +49,7 @@ var _ uploads.ExecuteDagScansForUploadFunc = API{}.ExecuteDagScansForUpload
var _ uploads.RemoveBadNodesFunc = API{}.RemoveBadNodes

// ExecuteDagScansForUpload runs all pending and awaiting-children DAG scans for the given upload until there are no more scans to process.
func (a API) ExecuteDagScansForUpload(ctx context.Context, uploadID id.UploadID, nodeCB func(node model.Node, data []byte) error) error {
func (a API) ExecuteDagScansForUpload(ctx context.Context, uploadID id.UploadID, scanCB func(scan *model.DAGScan) error, nodeCB func(node model.Node, data []byte) error) error {
var badFsEntryErrs []types.BadFSEntryError
for {
ctx, span := tracer.Start(ctx, "dag-scans-batch", trace.WithAttributes(
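An assumed call site for the widened signature, to show the shape of the two callbacks: the new `scanCB` observes each DAG scan, while the existing `nodeCB` continues to receive each node with its encoded bytes. The `dagsAPI` receiver name and the logging are illustrative only.

```go
// Sketch only: the exact timing of scanCB invocations isn't shown in this diff.
err := dagsAPI.ExecuteDagScansForUpload(ctx, uploadID,
	func(scan *model.DAGScan) error {
		log.Infow("dag scan progressed", "scan", scan)
		return nil
	},
	func(node model.Node, data []byte) error {
		log.Infow("node added", "cid", node.CID(), "bytes", len(data))
		return nil
	},
)
if err != nil {
	return err
}
```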
4 changes: 2 additions & 2 deletions pkg/preparation/preparation.go
@@ -213,6 +213,6 @@ func (a API) AddSourceToSpace(ctx context.Context, spaceDID did.DID, sourceID id
return a.Repo.AddSourceToSpace(ctx, spaceDID, sourceID)
}

func (a API) ExecuteUpload(ctx context.Context, upload *uploadsmodel.Upload) (cid.Cid, error) {
return a.Uploads.ExecuteUpload(ctx, upload.ID(), upload.SpaceDID())
func (a API) ExecuteUpload(ctx context.Context, upload *uploadsmodel.Upload, progressCallback func(uploads.UploadProgressMessage) error) (cid.Cid, error) {
return a.Uploads.ExecuteUpload(ctx, upload.ID(), upload.SpaceDID(), progressCallback)
}
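An assumed non-test call of the new ExecuteUpload shape. Whether an error returned from the callback aborts the upload isn't shown in this diff; the logging callback below is illustrative.

```go
// Sketch: stream progress messages to a logger instead of the no-op
// callback used by the tests below.
rootCID, err := api.ExecuteUpload(ctx, upload, func(msg uploads.UploadProgressMessage) error {
	log.Infow("upload progress", "msg", msg)
	return nil
})
if err != nil {
	return err
}
log.Infow("upload complete", "root", rootCID)
```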
7 changes: 4 additions & 3 deletions pkg/preparation/preparation_test.go
@@ -53,6 +53,7 @@ import (
spacesmodel "github.com/storacha/guppy/pkg/preparation/spaces/model"
"github.com/storacha/guppy/pkg/preparation/sqlrepo"
gtypes "github.com/storacha/guppy/pkg/preparation/types"
"github.com/storacha/guppy/pkg/preparation/uploads"
uploadsmodel "github.com/storacha/guppy/pkg/preparation/uploads/model"
)

@@ -250,7 +251,7 @@ func TestExecuteUpload(t *testing.T) {

upload := createUpload(t, uploadSourcePath, repo, space.DID(), api)

returnedRootCID, err := api.ExecuteUpload(t.Context(), upload)
returnedRootCID, err := api.ExecuteUpload(t.Context(), upload, func(upm uploads.UploadProgressMessage) error { return nil })
require.NoError(t, err)
require.NotEmpty(t, returnedRootCID, "expected non-empty root CID")

@@ -373,7 +374,7 @@ func TestExecuteUpload(t *testing.T) {
upload := createUpload(t, uploadSourcePath, repo, space.DID(), api)

// The first time, it should hit an error (on the third PUT)
_, err = api.ExecuteUpload(t.Context(), upload)
_, err = api.ExecuteUpload(t.Context(), upload, func(upm uploads.UploadProgressMessage) error { return nil })

var shardUploadErrors gtypes.ShardUploadErrors
require.ErrorAs(t, err, &shardUploadErrors, "expected a ShardUploadErrors error")
@@ -396,7 +397,7 @@ func TestExecuteUpload(t *testing.T) {
require.NoError(t, err)

// The second time, it should succeed
returnedRootCID, err := api.ExecuteUpload(t.Context(), upload)
returnedRootCID, err := api.ExecuteUpload(t.Context(), upload, func(upm uploads.UploadProgressMessage) error { return nil })
require.NoError(t, err, "expected upload to succeed on retry")
require.NotEmpty(t, returnedRootCID, "expected non-empty root CID")

2 changes: 1 addition & 1 deletion pkg/preparation/scans/scans.go
@@ -110,7 +110,7 @@ func (a API) getFileByID(ctx context.Context, fileID id.FSEntryID) (*model.File,
}

// OpenFileByID retrieves a file by its ID and opens it for reading, returning an error if not found or if the file cannot be opened.
func (a API) OpenFileByID(ctx context.Context, fileID id.FSEntryID) (fs.File, id.SourceID, string, error) {
func (a API) OpenFileByID(ctx context.Context, fileID id.FSEntryID) (fs.File, error) {
file, err := a.getFileByID(ctx, fileID)
if err != nil {
		// Adjusted to match the narrowed two-value signature above; the draft
		// left the old four-value return in place here.
		return nil, err
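An assumed caller of the narrowed OpenFileByID, which now yields only the opened file; callers that previously also received the source ID and path would obtain those elsewhere. The `scansAPI` receiver name is illustrative.

```go
// Sketch: open, read, and close a file by its FS entry ID.
f, err := scansAPI.OpenFileByID(ctx, fileID)
if err != nil {
	return err
}
defer f.Close()
data, err := io.ReadAll(f) // fs.File implements io.Reader
if err != nil {
	return err
}
log.Infow("read file", "bytes", len(data))
```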
50 changes: 26 additions & 24 deletions pkg/preparation/shards/shards.go
@@ -44,44 +44,45 @@ var _ uploads.CloseUploadShardsFunc = API{}.CloseUploadShards
var _ storacha.ReaderForShardFunc = API{}.ReaderForShard
var _ storacha.IndexesForUploadFunc = API{}.IndexesForUpload

func (a API) AddNodeToUploadShards(ctx context.Context, uploadID id.UploadID, spaceDID did.DID, nodeCID cid.Cid, data []byte) (bool, error) {
func (a API) AddNodeToUploadShards(ctx context.Context, uploadID id.UploadID, spaceDID did.DID, nodeCID cid.Cid, data []byte, callback func(shard *model.Shard) error) error {
space, err := a.Repo.GetSpaceByDID(ctx, spaceDID)
if err != nil {
return false, fmt.Errorf("failed to get space %s: %w", spaceDID, err)
return fmt.Errorf("failed to get space %s: %w", spaceDID, err)
}

openShards, err := a.Repo.ShardsForUploadByState(ctx, uploadID, model.ShardStateOpen)
if err != nil {
return false, fmt.Errorf("failed to get open shards for upload %s: %w", uploadID, err)
return fmt.Errorf("failed to get open shards for upload %s: %w", uploadID, err)
}

node, err := a.Repo.FindNodeByCIDAndSpaceDID(ctx, nodeCID, spaceDID)
if err != nil {
return false, fmt.Errorf("failed to find node %s: %w", nodeCID, err)
return fmt.Errorf("failed to find node %s: %w", nodeCID, err)
}
if node == nil {
return false, fmt.Errorf("node %s not found", nodeCID)
return fmt.Errorf("node %s not found", nodeCID)
}

var shard *model.Shard
var closed bool

// Look for an open shard that has room for the node, and close any that don't
// have room. (There should only be at most one open shard, but there's no
// harm handling multiple if they exist.)
for _, s := range openShards {
hasRoom, err := roomInShard(a.ShardEncoder, s, node, space)
if err != nil {
return false, fmt.Errorf("failed to check room in shard %s for node %s: %w", s.ID(), node.CID(), err)
return fmt.Errorf("failed to check room in shard %s for node %s: %w", s.ID(), node.CID(), err)
}
if hasRoom {
shard = s
break
}
if err := a.finalizeShardDigests(ctx, s); err != nil {
return false, fmt.Errorf("finalizing shard %s: %w", s.ID(), err)
return fmt.Errorf("finalizing shard %s: %w", s.ID(), err)
}
if err := callback(s); err != nil {
return fmt.Errorf("callback for shard %s: %w", s.ID(), err)
}
closed = true
}

// If no such shard exists, create a new one
@@ -91,33 +92,35 @@ func (a API) AddNodeToUploadShards(ctx context.Context, uploadID id.UploadID, sp
a.ShardEncoder.HeaderEncodingLength(),
a.ShardEncoder.HeaderDigestState(),
a.ShardEncoder.HeaderPieceCIDState())

		if err != nil {
			return false, fmt.Errorf("failed to create new shard for upload %s: %w", uploadID, err)
			return fmt.Errorf("failed to create new shard for upload %s: %w", uploadID, err)
		}
		// Invoke the callback only after the NewShard error check, so it never
		// receives a nil shard.
		if err := callback(shard); err != nil {
			return fmt.Errorf("callback for new shard %s: %w", shard.ID(), err)
		}
hasRoom, err := roomInShard(a.ShardEncoder, shard, node, space)
if err != nil {
return false, fmt.Errorf("failed to check room in new shard for node %s: %w", node.CID(), err)
return fmt.Errorf("failed to check room in new shard for node %s: %w", node.CID(), err)
}
if !hasRoom {
return false, fmt.Errorf("node %s (%d bytes) too large to fit in new shard for upload %s (shard size %d bytes)", node.CID(), node.Size(), uploadID, space.ShardSize())
return fmt.Errorf("node %s (%d bytes) too large to fit in new shard for upload %s (shard size %d bytes)", node.CID(), node.Size(), uploadID, space.ShardSize())
}
}

var addNodeOptions []AddNodeToShardOption
if data != nil {
digestStateUpdate, err := a.addNodeToDigestState(ctx, shard, node, data)
if err != nil {
return false, fmt.Errorf("failed to add node %s to shard %s digest state: %w", node.CID(), shard.ID(), err)
return fmt.Errorf("failed to add node %s to shard %s digest state: %w", node.CID(), shard.ID(), err)
}
addNodeOptions = append(addNodeOptions, WithDigestStateUpdate(digestStateUpdate.digestStateUpTo, digestStateUpdate.digestState, digestStateUpdate.pieceCIDState))
}

if err := a.Repo.AddNodeToShard(ctx, shard.ID(), node.CID(), spaceDID, a.ShardEncoder.NodeEncodingLength(node)-node.Size(), addNodeOptions...); err != nil {
return false, fmt.Errorf("failed to add node %s to shard %s for upload %s: %w", node.CID(), shard.ID(), uploadID, err)
return fmt.Errorf("failed to add node %s to shard %s for upload %s: %w", node.CID(), shard.ID(), uploadID, err)
}

return closed, nil
return nil
}
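An assumed call of the reworked AddNodeToUploadShards, whose closed-shard signal moved from a returned bool into the callback: it now fires for each open shard it finalizes and for a newly created shard. The `shardsAPI` receiver name and the logging are illustrative.

```go
// Sketch: react to shard lifecycle events instead of a returned `closed bool`.
err := shardsAPI.AddNodeToUploadShards(ctx, uploadID, spaceDID, nodeCID, data,
	func(shard *model.Shard) error {
		log.Infow("shard finalized or opened", "shard", shard.ID())
		return nil
	},
)
if err != nil {
	return err
}
```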

func roomInShard(encoder ShardEncoder, shard *model.Shard, node dagsmodel.Node, space *spacesmodel.Space) (bool, error) {
@@ -195,23 +198,22 @@ func (a API) finalizeShardDigests(ctx context.Context, shard *model.Shard) error
return a.Repo.UpdateShard(ctx, shard)
}

func (a API) CloseUploadShards(ctx context.Context, uploadID id.UploadID) (bool, error) {
func (a API) CloseUploadShards(ctx context.Context, uploadID id.UploadID, callback func(shard *model.Shard) error) error {
openShards, err := a.Repo.ShardsForUploadByState(ctx, uploadID, model.ShardStateOpen)
	if err != nil {
		return false, fmt.Errorf("failed to get open shards for upload %s: %w", uploadID, err)
		return fmt.Errorf("failed to get open shards for upload %s: %w", uploadID, err)
}

var closed bool

for _, s := range openShards {
if err := a.finalizeShardDigests(ctx, s); err != nil {
return false, fmt.Errorf("updating shard %s for upload %s: %w", s.ID(), uploadID, err)
return fmt.Errorf("updating shard %s for upload %s: %w", s.ID(), uploadID, err)
}
if err := callback(s); err != nil {
return fmt.Errorf("callback for shard %s for upload %s: %w", s.ID(), uploadID, err)
}
closed = true
}

return closed, nil
return nil
}
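A companion sketch for the reworked CloseUploadShards, called once all nodes are added: it flushes any still-open shards and reports each through the same callback shape, in place of the old `closed bool` return. The `shardClosedMsg` type and the `send` function are hypothetical.

```go
// Sketch: flush remaining open shards at the end of an upload, forwarding
// each closed shard as a hypothetical message.
err := shardsAPI.CloseUploadShards(ctx, uploadID, func(shard *model.Shard) error {
	send(shardClosedMsg{uploadID: uploadID, shard: shard})
	return nil
})
if err != nil {
	return err
}
```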

// ReaderForShard uses fastWriteShard connected to a pipe to provide an io.Reader