diff --git a/cmd/internal/upload/ui/ui.go b/cmd/internal/upload/ui/ui.go
index 63f4bd73..80b9b6ea 100644
--- a/cmd/internal/upload/ui/ui.go
+++ b/cmd/internal/upload/ui/ui.go
@@ -21,10 +21,12 @@ import (
 	"github.com/storacha/go-libstoracha/digestutil"
 	"github.com/storacha/guppy/internal/largeupload/bubbleup"
 	"github.com/storacha/guppy/pkg/preparation"
+	scansmodel "github.com/storacha/guppy/pkg/preparation/scans/model"
 	shardsmodel "github.com/storacha/guppy/pkg/preparation/shards/model"
 	"github.com/storacha/guppy/pkg/preparation/sqlrepo"
 	"github.com/storacha/guppy/pkg/preparation/types"
 	"github.com/storacha/guppy/pkg/preparation/types/id"
+	"github.com/storacha/guppy/pkg/preparation/uploads"
 	uploadsmodel "github.com/storacha/guppy/pkg/preparation/uploads/model"
 )
@@ -46,6 +48,7 @@ type uploadModel struct {
 	// State (Maps for multi-upload support)
 	recentAddedShards  map[id.UploadID][]*shardsmodel.Shard
 	recentClosedShards map[id.UploadID][]*shardsmodel.Shard
+	uploadingShards    map[id.UploadID][]*shardsmodel.Shard
 	filesToDAGScan     map[id.UploadID][]sqlrepo.FileInfo
 	shardedFiles       map[id.UploadID][]sqlrepo.FileInfo
@@ -99,6 +102,22 @@ func (m uploadModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
 		}
 		return m, nil

+	case uploads.FSEntryAddedMessage:
+		if file, ok := msg.FSEntry.(*scansmodel.File); ok {
+			// Prepend the new file, keeping at most the six most recent prior
+			// entries (slicing past len() would panic).
+			existing := m.filesToDAGScan[msg.UploadID]
+			m.filesToDAGScan[msg.UploadID] = append([]sqlrepo.FileInfo{
+				{
+					Path: file.Path(),
+					Size: file.Size(),
+				},
+			}, existing[:min(6, len(existing))]...)
+			m.dagScanStats[msg.UploadID] += file.Size()
+		}
+		return m, nil
+
+	case uploads.NodeAddedMessage:
+
 	case statsMsg:
 		var cmds []tea.Cmd
@@ -157,7 +175,7 @@ func (m uploadModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
 			m.dagScans += size
 		}

-		return m, tea.Batch(append(cmds, checkStats(m.ctx, m.repo, msg.uploadID))...)
+		return m, tea.Batch(cmds...)
 	case error:
@@ -179,7 +197,7 @@ func (m uploadModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
 				m.err = fmt.Errorf("could not find upload %s to retry after retriable error", msg.id)
 				return m, tea.Quit
 			}
-			return m, tea.Batch(executeUpload(m.ctx, m.api, upload))
+			return m, nil
 		}
 		m.err = msg.err
 		return m, tea.Quit
@@ -303,7 +321,7 @@ type uploadErrMsg struct {
 	err error
 }

-func executeUpload(ctx context.Context, api preparation.API, upload *uploadsmodel.Upload) tea.Cmd {
+func executeUpload(ctx context.Context, api preparation.API, upload *uploadsmodel.Upload, progressCallback func(uploads.UploadProgressMessage) error) tea.Cmd {
 	return func() tea.Msg {
-		rootCID, err := api.ExecuteUpload(ctx, upload)
+		rootCID, err := api.ExecuteUpload(ctx, upload, progressCallback)
 		if err != nil {
diff --git a/cmd/upload.go b/cmd/upload.go
index 61c46a3d..84f60bfe 100644
--- a/cmd/upload.go
+++ b/cmd/upload.go
@@ -385,3 +385,87 @@ func init() {
 func makeRepo(ctx context.Context) (*sqlrepo.Repo, error) {
 	return preparation.OpenRepo(ctx, uploadFlags.dbPath)
 }
+
+func doUpload(ctx context.Context, cmd *cobra.Command, uploadsToRun []*uploadsmodel.Upload, api preparation.API, send func(uploads.UploadProgressMessage) error) error {
+
+	type uploadResult struct {
+		upload   *uploadsmodel.Upload
+		cid      cid.Cid
+		attempts int
+	}
+
+	type uploadFailure struct {
+		upload   *uploadsmodel.Upload
+		err      error
+		attempts int
+	}
+
+	var completedUploads []uploadResult
+	var failedUploads []uploadFailure
+	for _, u := range uploadsToRun {
+		start := time.Now()
+		log.Infow("Starting upload", "upload", u.ID())
+		attempt := 0
+		var uploadCID cid.Cid
+		var lastErr error
+
+		for {
+			attempt++
+			var err error
+			uploadCID, err = api.ExecuteUpload(ctx, u, send)
+			if err == nil {
+				lastErr = nil
+				break
+			}
+
+			var re types.RetriableError
+			if errors.As(err, &re) {
+				lastErr = err
+				if uploadFlags.retry {
+					log.Warnw("Retriable upload error encountered, retrying", "upload", u.ID(), "attempt", attempt,
+						"err", err)
+					continue
+				}
+
+				log.Errorw("Retriable upload error encountered (retry disabled)", "upload", u.ID(), "attempt",
+					attempt, "err", err)
+				break
+			}
+
+			lastErr = err
+			log.Errorw("Upload failed with non-retriable error", "upload", u.ID(), "attempt", attempt, "err", err)
+			break
+		}
+
+		if lastErr != nil {
+			failedUploads = append(failedUploads, uploadFailure{
+				upload:   u,
+				err:      lastErr,
+				attempts: attempt,
+			})
+			log.Errorw("Upload failed", "upload", u.ID(), "duration", time.Since(start), "attempts", attempt, "err",
+				lastErr)
+			continue
+		}
+
+		completedUploads = append(completedUploads, uploadResult{
+			upload:   u,
+			cid:      uploadCID,
+			attempts: attempt,
+		})
+		log.Infow("Completed upload", "upload", u.ID(), "cid", uploadCID.String(), "duration", time.Since(start), "attempts", attempt)
+	}
+
+	for _, u := range completedUploads {
+		cmd.Printf("Upload completed successfully: %s\n", u.cid.String())
+	}
+
+	if len(failedUploads) > 0 {
+		cmd.Println("Uploads failed:")
+		for _, u := range failedUploads {
+			cmd.Printf("- %s: %v\n", u.upload.ID(), u.err)
+		}
+		return cmdutil.NewHandledCliError(fmt.Errorf("%d upload(s) failed", len(failedUploads)))
+	}
+	return nil
+}
diff --git a/pkg/preparation/dags/dags.go b/pkg/preparation/dags/dags.go
index e69d7947..c4e5eed9 100644
--- a/pkg/preparation/dags/dags.go
+++ b/pkg/preparation/dags/dags.go
@@ -49,7 +49,7 @@
 var _ uploads.ExecuteDagScansForUploadFunc = API{}.ExecuteDagScansForUpload
 var _ uploads.RemoveBadNodesFunc = API{}.RemoveBadNodes

 // ExecuteDagScansForUpload runs all pending and awaiting children DAG scans for the given upload, until there are no more scans to process.
-func (a API) ExecuteDagScansForUpload(ctx context.Context, uploadID id.UploadID, nodeCB func(node model.Node, data []byte) error) error {
+func (a API) ExecuteDagScansForUpload(ctx context.Context, uploadID id.UploadID, scanCB func(scan *model.DAGScan) error, nodeCB func(node model.Node, data []byte) error) error {
 	var badFsEntryErrs []types.BadFSEntryError
 	for {
 		ctx, span := tracer.Start(ctx, "dag-scans-batch", trace.WithAttributes(
diff --git a/pkg/preparation/preparation.go b/pkg/preparation/preparation.go
index 1d56a94b..df6c1c1d 100644
--- a/pkg/preparation/preparation.go
+++ b/pkg/preparation/preparation.go
@@ -213,6 +213,6 @@ func (a API) AddSourceToSpace(ctx context.Context, spaceDID did.DID, sourceID id
 	return a.Repo.AddSourceToSpace(ctx, spaceDID, sourceID)
 }

-func (a API) ExecuteUpload(ctx context.Context, upload *uploadsmodel.Upload) (cid.Cid, error) {
-	return a.Uploads.ExecuteUpload(ctx, upload.ID(), upload.SpaceDID())
+func (a API) ExecuteUpload(ctx context.Context, upload *uploadsmodel.Upload, progressCallback func(uploads.UploadProgressMessage) error) (cid.Cid, error) {
+	return a.Uploads.ExecuteUpload(ctx, upload.ID(), upload.SpaceDID(), progressCallback)
 }
diff --git a/pkg/preparation/preparation_test.go b/pkg/preparation/preparation_test.go
index 5e1008c2..7ee937c3 100644
--- a/pkg/preparation/preparation_test.go
+++ b/pkg/preparation/preparation_test.go
@@ -53,6 +53,7 @@ import (
 	spacesmodel "github.com/storacha/guppy/pkg/preparation/spaces/model"
 	"github.com/storacha/guppy/pkg/preparation/sqlrepo"
 	gtypes "github.com/storacha/guppy/pkg/preparation/types"
+	"github.com/storacha/guppy/pkg/preparation/uploads"
 	uploadsmodel "github.com/storacha/guppy/pkg/preparation/uploads/model"
 )
@@ -250,7 +251,7 @@ func TestExecuteUpload(t *testing.T) {
 	upload := createUpload(t, uploadSourcePath, repo, space.DID(), api)

-	returnedRootCID, err := api.ExecuteUpload(t.Context(), upload)
+	returnedRootCID, err := api.ExecuteUpload(t.Context(), upload, func(upm uploads.UploadProgressMessage) error { return nil })
 	require.NoError(t, err)

 	require.NotEmpty(t, returnedRootCID, "expected non-empty root CID")
@@ -373,7 +374,7 @@ func TestExecuteUpload(t *testing.T) {
 	upload := createUpload(t, uploadSourcePath, repo, space.DID(), api)

 	// The first time, it should hit an error (on the third PUT)
-	_, err = api.ExecuteUpload(t.Context(), upload)
+	_, err = api.ExecuteUpload(t.Context(), upload, func(upm uploads.UploadProgressMessage) error { return nil })
 	var shardUploadErrors gtypes.ShardUploadErrors
 	require.ErrorAs(t, err, &shardUploadErrors, "expected a ShardUploadErrors error")
@@ -396,7 +397,7 @@ func TestExecuteUpload(t *testing.T) {
 	require.NoError(t, err)

 	// The second time, it should succeed
-	returnedRootCID, err := api.ExecuteUpload(t.Context(), upload)
+	returnedRootCID, err := api.ExecuteUpload(t.Context(), upload, func(upm uploads.UploadProgressMessage) error { return nil })
 	require.NoError(t, err, "expected upload to succeed on retry")

 	require.NotEmpty(t, returnedRootCID, "expected non-empty root CID")
diff --git a/pkg/preparation/scans/scans.go b/pkg/preparation/scans/scans.go
index 92fba79e..3ec31a30 100644
--- a/pkg/preparation/scans/scans.go
+++ b/pkg/preparation/scans/scans.go
@@ -110,7 +110,7 @@ func (a API) getFileByID(ctx context.Context, fileID id.FSEntryID) (*model.File,
 }

 // OpenFileByID retrieves a file by its ID and opens it for reading, returning an error if not found or if the file cannot be opened.
-func (a API) OpenFileByID(ctx context.Context, fileID id.FSEntryID) (fs.File, id.SourceID, string, error) {
+func (a API) OpenFileByID(ctx context.Context, fileID id.FSEntryID) (fs.File, error) {
 	file, err := a.getFileByID(ctx, fileID)
 	if err != nil {
-		return nil, id.Nil, "", err
+		return nil, err
 	}
diff --git a/pkg/preparation/shards/shards.go b/pkg/preparation/shards/shards.go
index bff94701..5e3e9240 100644
--- a/pkg/preparation/shards/shards.go
+++ b/pkg/preparation/shards/shards.go
@@ -44,27 +44,26 @@
 var _ uploads.CloseUploadShardsFunc = API{}.CloseUploadShards
 var _ storacha.ReaderForShardFunc = API{}.ReaderForShard
 var _ storacha.IndexesForUploadFunc = API{}.IndexesForUpload

-func (a API) AddNodeToUploadShards(ctx context.Context, uploadID id.UploadID, spaceDID did.DID, nodeCID cid.Cid, data []byte) (bool, error) {
+func (a API) AddNodeToUploadShards(ctx context.Context, uploadID id.UploadID, spaceDID did.DID, nodeCID cid.Cid, data []byte, callback func(shard *model.Shard) error) error {
 	space, err := a.Repo.GetSpaceByDID(ctx, spaceDID)
 	if err != nil {
-		return false, fmt.Errorf("failed to get space %s: %w", spaceDID, err)
+		return fmt.Errorf("failed to get space %s: %w", spaceDID, err)
 	}
 	openShards, err := a.Repo.ShardsForUploadByState(ctx, uploadID, model.ShardStateOpen)
 	if err != nil {
-		return false, fmt.Errorf("failed to get open shards for upload %s: %w", uploadID, err)
+		return fmt.Errorf("failed to get open shards for upload %s: %w", uploadID, err)
 	}
 	node, err := a.Repo.FindNodeByCIDAndSpaceDID(ctx, nodeCID, spaceDID)
 	if err != nil {
-		return false, fmt.Errorf("failed to find node %s: %w", nodeCID, err)
+		return fmt.Errorf("failed to find node %s: %w", nodeCID, err)
 	}
 	if node == nil {
-		return false, fmt.Errorf("node %s not found", nodeCID)
+		return fmt.Errorf("node %s not found", nodeCID)
 	}

 	var shard *model.Shard
-	var closed bool

 	// Look for an open shard that has room for the node, and close any that don't
 	// have room. (There should only be at most one open shard, but there's no
@@ -72,16 +71,18 @@ func (a API) AddNodeToUploadShards(ctx context.Context, uploadID id.UploadID, sp
 	for _, s := range openShards {
 		hasRoom, err := roomInShard(a.ShardEncoder, s, node, space)
 		if err != nil {
-			return false, fmt.Errorf("failed to check room in shard %s for node %s: %w", s.ID(), node.CID(), err)
+			return fmt.Errorf("failed to check room in shard %s for node %s: %w", s.ID(), node.CID(), err)
 		}
 		if hasRoom {
 			shard = s
 			break
 		}
 		if err := a.finalizeShardDigests(ctx, s); err != nil {
-			return false, fmt.Errorf("finalizing shard %s: %w", s.ID(), err)
+			return fmt.Errorf("finalizing shard %s: %w", s.ID(), err)
+		}
+		if err := callback(s); err != nil {
+			return fmt.Errorf("callback for shard %s: %w", s.ID(), err)
 		}
-		closed = true
 	}

 	// If no such shard exists, create a new one
@@ -91,16 +92,19 @@ func (a API) AddNodeToUploadShards(ctx context.Context, uploadID id.UploadID, sp
 			a.ShardEncoder.HeaderEncodingLength(),
 			a.ShardEncoder.HeaderDigestState(),
 			a.ShardEncoder.HeaderPieceCIDState())
-
 		if err != nil {
-			return false, fmt.Errorf("failed to create new shard for upload %s: %w", uploadID, err)
+			return fmt.Errorf("failed to create new shard for upload %s: %w", uploadID, err)
+		}
+		// Only report the new shard once we know it was actually created.
+		if err := callback(shard); err != nil {
+			return fmt.Errorf("callback for new shard %s: %w", shard.ID(), err)
 		}
 		hasRoom, err := roomInShard(a.ShardEncoder, shard, node, space)
 		if err != nil {
-			return false, fmt.Errorf("failed to check room in new shard for node %s: %w", node.CID(), err)
+			return fmt.Errorf("failed to check room in new shard for node %s: %w", node.CID(), err)
 		}
 		if !hasRoom {
-			return false, fmt.Errorf("node %s (%d bytes) too large to fit in new shard for upload %s (shard size %d bytes)", node.CID(), node.Size(), uploadID, space.ShardSize())
+			return fmt.Errorf("node %s (%d bytes) too large to fit in new shard for upload %s (shard size %d bytes)", node.CID(), node.Size(), uploadID, space.ShardSize())
 		}
 	}
@@ -108,16 +111,16 @@ func (a API) AddNodeToUploadShards(ctx context.Context, uploadID id.UploadID, sp
 	if data != nil {
 		digestStateUpdate, err := a.addNodeToDigestState(ctx, shard, node, data)
 		if err != nil {
-			return false, fmt.Errorf("failed to add node %s to shard %s digest state: %w", node.CID(), shard.ID(), err)
+			return fmt.Errorf("failed to add node %s to shard %s digest state: %w", node.CID(), shard.ID(), err)
 		}
 		addNodeOptions = append(addNodeOptions, WithDigestStateUpdate(digestStateUpdate.digestStateUpTo, digestStateUpdate.digestState, digestStateUpdate.pieceCIDState))
 	}

 	if err := a.Repo.AddNodeToShard(ctx, shard.ID(), node.CID(), spaceDID, a.ShardEncoder.NodeEncodingLength(node)-node.Size(), addNodeOptions...); err != nil {
-		return false, fmt.Errorf("failed to add node %s to shard %s for upload %s: %w", node.CID(), shard.ID(), uploadID, err)
+		return fmt.Errorf("failed to add node %s to shard %s for upload %s: %w", node.CID(), shard.ID(), uploadID, err)
 	}

-	return closed, nil
+	return nil
 }

 func roomInShard(encoder ShardEncoder, shard *model.Shard, node dagsmodel.Node, space *spacesmodel.Space) (bool, error) {
@@ -195,23 +198,22 @@ func (a API) finalizeShardDigests(ctx context.Context, shard *model.Shard) error
 	return a.Repo.UpdateShard(ctx, shard)
 }

-func (a API) CloseUploadShards(ctx context.Context, uploadID id.UploadID) (bool, error) {
+func (a API) CloseUploadShards(ctx context.Context, uploadID id.UploadID, callback func(shard *model.Shard) error) error {
 	openShards, err := a.Repo.ShardsForUploadByState(ctx, uploadID, model.ShardStateOpen)
 	if err != nil {
-
-		return false, fmt.Errorf("failed to get open shards for upload %s: %w", uploadID, err)
+		return fmt.Errorf("failed to get open shards for upload %s: %w", uploadID, err)
 	}

-	var closed bool
-
 	for _, s := range openShards {
 		if err := a.finalizeShardDigests(ctx, s); err != nil {
-			return false, fmt.Errorf("updating shard %s for upload %s: %w", s.ID(), uploadID, err)
+			return fmt.Errorf("updating shard %s for upload %s: %w", s.ID(), uploadID, err)
+		}
+		if err := callback(s); err != nil {
+			return fmt.Errorf("callback for shard %s for upload %s: %w", s.ID(), uploadID, err)
 		}
-		closed = true
 	}

-	return closed, nil
+	return nil
 }

 // ReaderForShard uses fastWriteShard connected to a pipe to provide an io.Reader
diff --git a/pkg/preparation/shards/shards_test.go b/pkg/preparation/shards/shards_test.go
index 5542f4c9..8306f094 100644
--- a/pkg/preparation/shards/shards_test.go
+++ b/pkg/preparation/shards/shards_test.go
@@ -56,10 +56,7 @@ func TestAddNodeToUploadShardsAndCloseUploadShards(t *testing.T) {
 	require.NoError(t, err)
 	data := testutil.RandomBytes(t, int(n.Size()))

-	shardClosed, err := api.AddNodeToUploadShards(t.Context(), upload.ID(), space.DID(), nodeCID1.(cidlink.Link).Cid, data)
-	require.NoError(t, err)
-
-	require.False(t, shardClosed)
+	mustAddNodeToUploadShards(t, api, upload.ID(), space.DID(), nodeCID1.(cidlink.Link).Cid, data, false)
 	openShards, err = repo.ShardsForUploadByState(t.Context(), upload.ID(), model.ShardStateOpen)
 	require.NoError(t, err)
 	require.Len(t, openShards, 1)
@@ -79,10 +76,8 @@ func TestAddNodeToUploadShardsAndCloseUploadShards(t *testing.T) {
 	n, _, err = repo.FindOrCreateRawNode(t.Context(), nodeCID2.(cidlink.Link).Cid, 1<<14, space.DID(), "some/other/path", source.ID(), 0)
 	require.NoError(t, err)
 	data = testutil.RandomBytes(t, int(n.Size()))
-	shardClosed, err = api.AddNodeToUploadShards(t.Context(), upload.ID(), space.DID(), nodeCID2.(cidlink.Link).Cid, data)
-	require.NoError(t, err)
+	mustAddNodeToUploadShards(t, api, upload.ID(), space.DID(), nodeCID2.(cidlink.Link).Cid, data, false)

-	require.False(t, shardClosed)
 	openShards, err = repo.ShardsForUploadByState(t.Context(), upload.ID(), model.ShardStateOpen)
 	require.NoError(t, err)
 	require.Len(t, openShards, 1)
@@ -101,10 +96,8 @@ func TestAddNodeToUploadShardsAndCloseUploadShards(t *testing.T) {
 	require.NoError(t, err)
 	data = testutil.RandomBytes(t, int(n.Size()))

-	shardClosed, err = api.AddNodeToUploadShards(t.Context(), upload.ID(), space.DID(), nodeCID3.(cidlink.Link).Cid, data)
-	require.NoError(t, err)
+	mustAddNodeToUploadShards(t, api, upload.ID(), space.DID(), nodeCID3.(cidlink.Link).Cid, data, true)

-	require.True(t, shardClosed)
 	closedShards, err := repo.ShardsForUploadByState(t.Context(), upload.ID(), model.ShardStateClosed)
 	require.NoError(t, err)
 	require.Len(t, closedShards, 1)
@@ -125,9 +118,7 @@ func TestAddNodeToUploadShardsAndCloseUploadShards(t *testing.T) {

 	// finally, close the last shard with CloseUploadShards()

-	shardClosed, err = api.CloseUploadShards(t.Context(), upload.ID())
-	require.NoError(t, err)
-	require.True(t, shardClosed)
+	mustCloseUploadShards(t, api, upload.ID(), true)

 	closedShards, err = repo.ShardsForUploadByState(t.Context(), upload.ID(), model.ShardStateClosed)
 	require.NoError(t, err)
@@ -162,7 +153,7 @@ func TestAddNodeToUploadShardsAndCloseUploadShards(t *testing.T) {
 	_, _, err = repo.FindOrCreateRawNode(t.Context(), nodeCID1.(cidlink.Link).Cid, 120, space.DID(), "some/path", source.ID(), 0)
 	require.NoError(t, err)

-	_, err = api.AddNodeToUploadShards(t.Context(), upload.ID(), space.DID(), nodeCID1.(cidlink.Link).Cid, nil)
+	err = api.AddNodeToUploadShards(t.Context(), upload.ID(), space.DID(), nodeCID1.(cidlink.Link).Cid, nil, func(shard *model.Shard) error { return nil })
 	require.ErrorContains(t, err, "too large to fit in new shard for upload")
 	})
 }
@@ -499,13 +490,10 @@ func TestComputedShardCIDs(t *testing.T) {
 		nodeCID := cid.NewCidV1(cid.Raw, testutil.Must(multihash.Sum(data, multihash.SHA2_256, -1))(t))
 		_, _, err = repo.FindOrCreateRawNode(t.Context(), nodeCID, 1<<12, space.DID(), fmt.Sprintf("some/path/%d", offset), source.ID(), 0)
 		require.NoError(t, err)
-		_, err = api.AddNodeToUploadShards(t.Context(), upload.ID(), space.DID(), nodeCID, data)
-		require.NoError(t, err)
+		mustAddNodeToUploadShards(t, api, upload.ID(), space.DID(), nodeCID, data, false)
 	}

-	shardClosed, err := api.CloseUploadShards(t.Context(), upload.ID())
-	require.NoError(t, err)
-	require.True(t, shardClosed)
+	mustCloseUploadShards(t, api, upload.ID(), true)

 	closedShards, err := repo.ShardsForUploadByState(t.Context(), upload.ID(), model.ShardStateClosed)
 	require.NoError(t, err)
@@ -516,3 +504,27 @@ func TestComputedShardCIDs(t *testing.T) {
 	require.Equal(t, expectedDigest, shard.Digest())
 	require.Equal(t, expectedPieceCID, shard.PieceCID())
 }
+
+func mustAddNodeToUploadShards(t *testing.T, api shards.API, uploadID id.UploadID, spaceDID did.DID, nodeCID cid.Cid, data []byte, expectShardClosed bool) {
+	var shardClosed bool
+	err := api.AddNodeToUploadShards(t.Context(), uploadID, spaceDID, nodeCID, data, func(shard *model.Shard) error {
+		if shard.State() == model.ShardStateClosed {
+			shardClosed = true
+		}
+		return nil
+	})
+	require.NoError(t, err)
+	require.Equal(t, expectShardClosed, shardClosed)
+}
+
+func mustCloseUploadShards(t *testing.T, api shards.API, uploadID id.UploadID, expectShardClosed bool) {
+	var shardClosed bool
+	err := api.CloseUploadShards(t.Context(), uploadID, func(shard *model.Shard) error {
+		if shard.State() == model.ShardStateClosed {
+			shardClosed = true
+		}
+		return nil
+	})
+	require.NoError(t, err)
+	require.Equal(t, expectShardClosed, shardClosed)
+}
diff --git a/pkg/preparation/sqlrepo/stats.go b/pkg/preparation/sqlrepo/stats.go
index 99e43fbc..1ff925c2 100644
--- a/pkg/preparation/sqlrepo/stats.go
+++ b/pkg/preparation/sqlrepo/stats.go
@@ -36,7 +36,7 @@ func (r *Repo) FilesToDAGScan(ctx context.Context, uploadID id.UploadID, count i
 	JOIN dag_scans ON dag_scans.fs_entry_id = fs_entries.id
 	WHERE dag_scans.upload_id = $1
 	AND dag_scans.cid IS NULL
-	ORDER BY dag_scans.created_at ASC
+	ORDER BY dag_scans.created_at DESC
 	LIMIT $2
 `,
 		uploadID,
@@ -71,10 +71,9 @@ func (r *Repo) ShardedFiles(ctx context.Context, uploadID id.UploadID, count int
 	JOIN dag_scans ON dag_scans.fs_entry_id = fs_entries.id
 	JOIN nodes_in_shards ON nodes_in_shards.node_cid = dag_scans.cid
 	JOIN shards ON shards.id = nodes_in_shards.shard_id
--- WHERE dag_scans.upload_id = X'637a2e3a0ce5448882a080576829696f'
-	WHERE shards.state = 'open'
-	ORDER BY dag_scans.created_at ASC
-	LIMIT 6
+	WHERE dag_scans.upload_id = $1 AND shards.state = 'open'
+	ORDER BY dag_scans.created_at DESC
+	LIMIT $2
 `,
 		uploadID,
 		count,
diff --git a/pkg/preparation/storacha/storacha.go b/pkg/preparation/storacha/storacha.go
index 982e3676..1e9c41f9 100644
--- a/pkg/preparation/storacha/storacha.go
+++ b/pkg/preparation/storacha/storacha.go
@@ -68,7 +68,7 @@
 var _ uploads.AddShardsForUploadFunc = API{}.AddShardsForUpload
 var _ uploads.AddIndexesForUploadFunc = API{}.AddIndexesForUpload
 var _ uploads.AddStorachaUploadForUploadFunc = API{}.AddStorachaUploadForUpload

-func (a API) AddShardsForUpload(ctx context.Context, uploadID id.UploadID, spaceDID did.DID) error {
+func (a API) AddShardsForUpload(ctx context.Context, uploadID id.UploadID, spaceDID did.DID, cb func(shard *shardsmodel.Shard) error) error {
 	ctx, span := tracer.Start(ctx, "add-shards-for-upload")
 	defer span.End()
 	closedShards, err := a.Repo.ShardsForUploadByState(ctx, uploadID, shardsmodel.ShardStateClosed)
@@ -89,6 +89,10 @@ func (a API) AddShardsForUpload(ctx context.Context, uploadID id.UploadID, space
 		sem <- struct{}{}
 		eg.Go(func() error {
 			defer func() { <-sem }()
+			// The shard is already closed here; this first callback marks the start of its upload.
+			if err := cb(shard); err != nil {
+				return fmt.Errorf("callback for shard %s failed: %w", shard.ID(), err)
+			}
 			if err := a.addShard(gctx, shard, spaceDID); err != nil {
 				err = fmt.Errorf("failed to add shard %s for upload %s: %w", shard.ID(), uploadID, err)
 				var errShardUpload gtypes.ShardUploadError
@@ -99,6 +103,9 @@ func (a API) AddShardsForUpload(ctx context.Context, uploadID id.UploadID, space
 				log.Errorf("%v", err)
 				return err
 			}
+			if err := cb(shard); err != nil {
+				return fmt.Errorf("callback for shard %s failed: %w", shard.ID(), err)
+			}
 			log.Infof("Successfully added shard %s for upload %s", shard.ID(), uploadID)
 			return nil
 		})
diff --git a/pkg/preparation/storacha/storacha_test.go b/pkg/preparation/storacha/storacha_test.go
index 6932af8a..4287bc0f 100644
--- a/pkg/preparation/storacha/storacha_test.go
+++ b/pkg/preparation/storacha/storacha_test.go
@@ -73,17 +73,17 @@ func TestAddShardsForUpload(t *testing.T) {
 	n, _, err := repo.FindOrCreateRawNode(t.Context(), nodeCID1, 1<<14, spaceDID, "some/path", source.ID(), 0)
 	require.NoError(t, err)
 	data := testutil.RandomBytes(t, int(n.Size()))
-	_, err = shardsApi.AddNodeToUploadShards(t.Context(), upload.ID(), spaceDID, nodeCID1, data)
+	err = shardsApi.AddNodeToUploadShards(t.Context(), upload.ID(), spaceDID, nodeCID1, data, func(shard *model.Shard) error { return nil })
 	require.NoError(t, err)
 	n, _, err = repo.FindOrCreateRawNode(t.Context(), nodeCID2, 1<<14, spaceDID, "some/other/path", source.ID(), 0)
 	data = testutil.RandomBytes(t, int(n.Size()))
 	require.NoError(t, err)
-	_, err = shardsApi.AddNodeToUploadShards(t.Context(), upload.ID(), spaceDID, nodeCID2, data)
+	err = shardsApi.AddNodeToUploadShards(t.Context(), upload.ID(), spaceDID, nodeCID2, data, func(shard *model.Shard) error { return nil })
 	require.NoError(t, err)
 	n, _, err = repo.FindOrCreateRawNode(t.Context(), nodeCID3, 1<<15, spaceDID, "yet/other/path", source.ID(), 0)
 	require.NoError(t, err)
 	data = testutil.RandomBytes(t, int(n.Size()))
-	_, err = shardsApi.AddNodeToUploadShards(t.Context(), upload.ID(), spaceDID, nodeCID3, data)
+	err = shardsApi.AddNodeToUploadShards(t.Context(), upload.ID(), spaceDID, nodeCID3, data, func(shard *model.Shard) error { return nil })
 	require.NoError(t, err)

 	shards, err := repo.ShardsForUploadByState(t.Context(), upload.ID(), model.ShardStateClosed)
@@ -97,7 +97,7 @@ func TestAddShardsForUpload(t *testing.T) {
 	secondShard := shards[0]

 	// Upload shards that are ready to go.
- err = api.AddShardsForUpload(t.Context(), upload.ID(), spaceDID) + err = api.AddShardsForUpload(t.Context(), upload.ID(), spaceDID, func(shard *model.Shard) error { return nil }) require.NoError(t, err) // Reload shards @@ -129,9 +129,9 @@ func TestAddShardsForUpload(t *testing.T) { require.Equal(t, client.SpaceBlobAddInvocations[0].ReturnedPDPAccept, client.FilecoinOfferInvocations[0].Options.PDPAcceptInvocation()) // Now close the upload shards and run it again. - _, err = shardsApi.CloseUploadShards(t.Context(), upload.ID()) + err = shardsApi.CloseUploadShards(t.Context(), upload.ID(), func(shard *model.Shard) error { return nil }) require.NoError(t, err) - err = api.AddShardsForUpload(t.Context(), upload.ID(), spaceDID) + err = api.AddShardsForUpload(t.Context(), upload.ID(), spaceDID, func(shard *model.Shard) error { return nil }) require.NoError(t, err) // Reload second shard @@ -277,12 +277,12 @@ func TestAddShardsForUpload(t *testing.T) { _, _, err = repo.FindOrCreateRawNode(t.Context(), nodeCID1, uint64(len(data)), spaceDID, "some/path", source.ID(), 0) require.NoError(t, err) - _, err = shardsApi.AddNodeToUploadShards(t.Context(), upload.ID(), spaceDID, nodeCID1, data) + err = shardsApi.AddNodeToUploadShards(t.Context(), upload.ID(), spaceDID, nodeCID1, data, func(shard *model.Shard) error { return nil }) require.NoError(t, err) - _, err = shardsApi.CloseUploadShards(t.Context(), upload.ID()) + err = shardsApi.CloseUploadShards(t.Context(), upload.ID(), func(shard *model.Shard) error { return nil }) require.NoError(t, err) - err = api.AddShardsForUpload(t.Context(), upload.ID(), spaceDID) + err = api.AddShardsForUpload(t.Context(), upload.ID(), spaceDID, func(shard *model.Shard) error { return nil }) require.NoError(t, err) // It should `space/blob/add`... 
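Reviewer note: with the `closed bool` return gone, callers now observe shard lifecycle transitions only through the callback. A minimal sketch of the new call shape, not part of this patch; the `onShardClosed` hook is hypothetical:

```go
// Sketch: reacting to shard transitions via the callback-based API.
err := shardsApi.AddNodeToUploadShards(ctx, uploadID, spaceDID, nodeCID, data,
	func(shard *model.Shard) error {
		// The callback fires both for newly created shards and for shards
		// that were just closed; inspect the state to tell them apart.
		if shard.State() == model.ShardStateClosed {
			onShardClosed(shard) // hypothetical hook
		}
		return nil
	})
if err != nil {
	return fmt.Errorf("adding node to upload shards: %w", err)
}
```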
diff --git a/pkg/preparation/uploads/uploads.go b/pkg/preparation/uploads/uploads.go
index 51fdfda5..6b8597ca 100644
--- a/pkg/preparation/uploads/uploads.go
+++ b/pkg/preparation/uploads/uploads.go
@@ -11,6 +11,7 @@ import (
 	"github.com/storacha/guppy/pkg/preparation/bettererrgroup"
 	dagmodel "github.com/storacha/guppy/pkg/preparation/dags/model"
 	scanmodel "github.com/storacha/guppy/pkg/preparation/scans/model"
+	shardsmodel "github.com/storacha/guppy/pkg/preparation/shards/model"
 	"github.com/storacha/guppy/pkg/preparation/types"
 	"github.com/storacha/guppy/pkg/preparation/types/id"
 	"github.com/storacha/guppy/pkg/preparation/uploads/model"
@@ -25,10 +26,10 @@ var (
 )

 type ExecuteScanFunc func(ctx context.Context, uploadID id.UploadID, nodeCB func(node scanmodel.FSEntry) error) error
-type ExecuteDagScansForUploadFunc func(ctx context.Context, uploadID id.UploadID, nodeCB func(node dagmodel.Node, data []byte) error) error
-type AddNodeToUploadShardsFunc func(ctx context.Context, uploadID id.UploadID, spaceDID did.DID, nodeCID cid.Cid, data []byte) (bool, error)
-type CloseUploadShardsFunc func(ctx context.Context, uploadID id.UploadID) (bool, error)
-type AddShardsForUploadFunc func(ctx context.Context, uploadID id.UploadID, spaceDID did.DID) error
+type ExecuteDagScansForUploadFunc func(ctx context.Context, uploadID id.UploadID, scanCB func(dagScan *dagmodel.DAGScan) error, nodeCB func(node dagmodel.Node, data []byte) error) error
+type AddNodeToUploadShardsFunc func(ctx context.Context, uploadID id.UploadID, spaceDID did.DID, nodeCID cid.Cid, data []byte, callback func(shard *shardsmodel.Shard) error) error
+type CloseUploadShardsFunc func(ctx context.Context, uploadID id.UploadID, callback func(shard *shardsmodel.Shard) error) error
+type AddShardsForUploadFunc func(ctx context.Context, uploadID id.UploadID, spaceDID did.DID, cb func(shard *shardsmodel.Shard) error) error
 type AddIndexesForUploadFunc func(ctx context.Context, uploadID id.UploadID, spaceDID did.DID) error
 type AddStorachaUploadForUploadFunc func(ctx context.Context, uploadID id.UploadID, spaceDID did.DID) error
 type RemoveBadFSEntryFunc func(ctx context.Context, spaceDID did.DID, fsEntryID id.FSEntryID) error
@@ -92,8 +93,55 @@ func signal(work chan<- struct{}) {
 	}
 }

+type UploadProgressMessage interface {
+	isUploadProgressMessage()
+}
+
+type FSEntryAddedMessage struct {
+	UploadID id.UploadID
+	FSEntry  scanmodel.FSEntry
+}
+
+type FSEntryScannedMessage struct {
+	UploadID id.UploadID
+	FSEntry  scanmodel.FSEntry
+}
+
+type NodeAddedMessage struct {
+	UploadID id.UploadID
+	Node     dagmodel.Node
+}
+
+type ShardCreatedMessage struct {
+	UploadID id.UploadID
+	Shard    shardsmodel.Shard
+}
+
+type ShardClosedMessage struct {
+	UploadID id.UploadID
+	Shard    shardsmodel.Shard
+}
+
+type ShardUploadStartedMessage struct {
+	UploadID id.UploadID
+	Shard    shardsmodel.Shard
+}
+
+type ShardUploadCompletedMessage struct {
+	UploadID id.UploadID
+	Shard    shardsmodel.Shard
+}
+
+func (FSEntryAddedMessage) isUploadProgressMessage()         {}
+func (FSEntryScannedMessage) isUploadProgressMessage()       {}
+func (NodeAddedMessage) isUploadProgressMessage()            {}
+func (ShardCreatedMessage) isUploadProgressMessage()         {}
+func (ShardClosedMessage) isUploadProgressMessage()          {}
+func (ShardUploadStartedMessage) isUploadProgressMessage()   {}
+func (ShardUploadCompletedMessage) isUploadProgressMessage() {}
+
 // ExecuteUpload executes the upload process for a given upload, handling its state transitions and processing steps.
-func (a API) ExecuteUpload(ctx context.Context, uploadID id.UploadID, spaceDID did.DID) (cid.Cid, error) {
+func (a API) ExecuteUpload(ctx context.Context, uploadID id.UploadID, spaceDID did.DID, progressCallback func(message UploadProgressMessage) error) (cid.Cid, error) {
 	ctx, span := tracer.Start(ctx, "execute-upload", trace.WithAttributes(
 		attribute.String("upload.id", uploadID.String()),
 		attribute.String("space.did", spaceDID.String()),
@@ -109,21 +156,21 @@ func (a API) ExecuteUpload(ctx context.Context, uploadID id.UploadID, spaceDID d
 	// Start the workers
 	eg, wCtx := bettererrgroup.WithContext(ctx)
 	eg.Go(func() error {
-		err := runScanWorker(wCtx, a, uploadID, spaceDID, scansAvailable, dagScansAvailable)
+		err := runScanWorker(wCtx, a, uploadID, spaceDID, scansAvailable, dagScansAvailable, progressCallback)
 		if err != nil {
 			return fmt.Errorf("scan worker: %w", err)
 		}
 		return nil
 	})
 	eg.Go(func() error {
-		err := runDAGScanWorker(wCtx, a, uploadID, spaceDID, dagScansAvailable, closedShardsAvailable)
+		err := runDAGScanWorker(wCtx, a, uploadID, spaceDID, dagScansAvailable, closedShardsAvailable, progressCallback)
 		if err != nil {
 			return fmt.Errorf("DAG scan worker: %w", err)
 		}
 		return nil
 	})
 	eg.Go(func() error {
-		err := runStorachaWorker(wCtx, a, uploadID, spaceDID, closedShardsAvailable)
+		err := runStorachaWorker(wCtx, a, uploadID, spaceDID, closedShardsAvailable, progressCallback)
 		if err != nil {
 			return fmt.Errorf("storacha worker: %w", err)
 		}
@@ -239,7 +286,7 @@ func (a API) handleBadNodes(ctx context.Context, uploadID id.UploadID, spaceDID
 	log.Debug("Adding good CIDs back to upload in a different shard", uploadID, ": ", badNodesErr.GoodCIDs())

 	for _, goodCID := range badNodesErr.GoodCIDs().Keys() {
-		_, err := a.AddNodeToUploadShards(ctx, uploadID, spaceDID, goodCID, nil)
+		err := a.AddNodeToUploadShards(ctx, uploadID, spaceDID, goodCID, nil, handleShardCreationUpdate(uploadID, func() {}, progressCallback))
 		if err != nil {
 			return fmt.Errorf("adding good CID %s back to upload %s: %w", goodCID, uploadID, err)
 		}
@@ -257,7 +304,7 @@ func (a API) handleBadNodes(ctx context.Context, uploadID id.UploadID, spaceDID
 	return nil
 }

-func runScanWorker(ctx context.Context, api API, uploadID id.UploadID, spaceDID did.DID, scansAvailable <-chan struct{}, dagScansAvailable chan<- struct{}) error {
+func runScanWorker(ctx context.Context, api API, uploadID id.UploadID, spaceDID did.DID, scansAvailable <-chan struct{}, dagScansAvailable chan<- struct{}, progressCallback func(message UploadProgressMessage) error) error {
 	ctx, span := tracer.Start(ctx, "scan-worker", trace.WithAttributes(
 		attribute.String("upload.id", uploadID.String()),
 		attribute.String("space.did", spaceDID.String()),
@@ -272,6 +319,12 @@ func runScanWorker(ctx context.Context, api API, uploadID id.UploadID, spaceDID
 		// doWork
 		func() error {
 			err := api.ExecuteScan(ctx, uploadID, func(entry scanmodel.FSEntry) error {
+				if err := progressCallback(FSEntryAddedMessage{
+					UploadID: uploadID,
+					FSEntry:  entry,
+				}); err != nil {
+					return fmt.Errorf("upload progress callback for fs entry %s: %w", entry.ID(), err)
+				}
 				_, isDirectory := entry.(*scanmodel.Directory)
 				_, err := api.Repo.CreateDAGScan(ctx, entry.ID(), isDirectory, uploadID, spaceDID)
 				if err != nil {
@@ -298,7 +351,7 @@ func runScanWorker(ctx context.Context, api API, uploadID id.UploadID, spaceDID

 // runDAGScanWorker runs the worker that scans files and directories into blocks,
 // and buckets them into shards.
-func runDAGScanWorker(ctx context.Context, api API, uploadID id.UploadID, spaceDID did.DID, dagScansAvailable <-chan struct{}, closedShardsAvailable chan<- struct{}) error {
+func runDAGScanWorker(ctx context.Context, api API, uploadID id.UploadID, spaceDID did.DID, dagScansAvailable <-chan struct{}, closedShardsAvailable chan<- struct{}, progressCallback func(message UploadProgressMessage) error) error {
 	ctx, span := tracer.Start(ctx, "dag-scan-worker", trace.WithAttributes(
 		attribute.String("upload.id", uploadID.String()),
 		attribute.String("space.did", spaceDID.String()),
@@ -306,6 +359,8 @@ func runDAGScanWorker(ctx context.Context, api API, uploadID id.UploadID, spaceD
 	defer log.Debugf("DAG scan worker for upload %s exiting", uploadID)
 	defer span.End()

+	shardCreateUpdateCallback := handleShardCreationUpdate(uploadID, func() { signal(closedShardsAvailable) }, progressCallback)
+
 	return Worker(
 		ctx,
 		dagScansAvailable,
@@ -314,15 +369,17 @@ func runDAGScanWorker(ctx context.Context, api API, uploadID id.UploadID, spaceD
 		func() error {
-			err := api.ExecuteDagScansForUpload(ctx, uploadID, func(node dagmodel.Node, data []byte) error {
+			err := api.ExecuteDagScansForUpload(ctx, uploadID, func(dagScan *dagmodel.DAGScan) error { return nil }, func(node dagmodel.Node, data []byte) error {
 				log.Debugf("Adding node %s to upload shards for upload %s", node.CID(), uploadID)
-				shardClosed, err := api.AddNodeToUploadShards(ctx, uploadID, spaceDID, node.CID(), data)
+				if err := progressCallback(NodeAddedMessage{
+					UploadID: uploadID,
+					Node:     node,
+				}); err != nil {
+					return fmt.Errorf("upload progress callback for node %s: %w", node.CID(), err)
+				}
+				err := api.AddNodeToUploadShards(ctx, uploadID, spaceDID, node.CID(), data, shardCreateUpdateCallback)
 				if err != nil {
 					return fmt.Errorf("adding node to upload shard: %w", err)
 				}

-				if shardClosed {
-					signal(closedShardsAvailable)
-				}
-
 				return nil
 			})
@@ -336,15 +393,11 @@ func runDAGScanWorker(ctx context.Context, api API, uploadID id.UploadID, spaceD
 		// finalize
 		func() error {
 			// We're out of nodes, so we can close any open shards for this upload.
-			shardClosed, err := api.CloseUploadShards(ctx, uploadID)
+			err := api.CloseUploadShards(ctx, uploadID, shardCreateUpdateCallback)
 			if err != nil {
 				return fmt.Errorf("closing upload shards for upload %s: %w", uploadID, err)
 			}

-			if shardClosed {
-				signal(closedShardsAvailable)
-			}
-
 			// Reload the upload to get the latest state from the DB.
 			upload, err := api.Repo.GetUploadByID(ctx, uploadID)
 			if err != nil {
@@ -369,7 +422,7 @@ func runDAGScanWorker(ctx context.Context, api API, uploadID id.UploadID, spaceD
 }

 // runStorachaWorker runs the worker that adds shards and indexes to Storacha.
-func runStorachaWorker(ctx context.Context, api API, uploadID id.UploadID, spaceDID did.DID, blobWork <-chan struct{}) error { +func runStorachaWorker(ctx context.Context, api API, uploadID id.UploadID, spaceDID did.DID, blobWork <-chan struct{}, progressCallback func(UploadProgressMessage) error) error { ctx, span := tracer.Start(ctx, "storacha-worker", trace.WithAttributes( attribute.String("upload.id", uploadID.String()), attribute.String("space.did", spaceDID.String()), @@ -383,7 +436,24 @@ func runStorachaWorker(ctx context.Context, api API, uploadID id.UploadID, space // doWork func() error { - err := api.AddShardsForUpload(ctx, uploadID, spaceDID) + err := api.AddShardsForUpload(ctx, uploadID, spaceDID, func(shard *shardsmodel.Shard) error { + if shard.State() == shardsmodel.ShardStateClosed { + if err := progressCallback(ShardUploadStartedMessage{ + UploadID: uploadID, + Shard: *shard, + }); err != nil { + return fmt.Errorf("upload progress callback for started shard upload %s: %w", shard.ID(), err) + } + } else { + if err := progressCallback(ShardUploadCompletedMessage{ + UploadID: uploadID, + Shard: *shard, + }); err != nil { + return fmt.Errorf("upload progress callback for completed shard upload %s: %w", shard.ID(), err) + } + } + return nil + }) if err != nil { return fmt.Errorf("`space/blob/add`ing shards for upload %s: %w", uploadID, err) } @@ -407,3 +477,25 @@ func runStorachaWorker(ctx context.Context, api API, uploadID id.UploadID, space }, ) } + +func handleShardCreationUpdate(uploadID id.UploadID, signal func(), progressCallback func(UploadProgressMessage) error) func(shard *shardsmodel.Shard) error { + return func(shard *shardsmodel.Shard) error { + if shard.State() == shardsmodel.ShardStateClosed { + if err := progressCallback(ShardClosedMessage{ + UploadID: uploadID, + Shard: *shard, + }); err != nil { + return fmt.Errorf("upload progress callback for closed shard %s: %w", shard.ID(), err) + } + signal() + } else { + if err := progressCallback(ShardCreatedMessage{ + UploadID: uploadID, + Shard: *shard, + }); err != nil { + return fmt.Errorf("upload progress callback for created shard %s: %w", shard.ID(), err) + } + } + return nil + } +}
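Reviewer note: downstream consumers receive these events through the `progressCallback` threaded into `ExecuteUpload`. A minimal sketch of a consumer, assuming a logger in scope; a real consumer such as `cmd/internal/upload/ui` would forward each message into its own event loop instead of logging:

```go
// Sketch: branch on the progress message variants with a type switch.
progressCallback := func(msg uploads.UploadProgressMessage) error {
	switch m := msg.(type) {
	case uploads.FSEntryAddedMessage:
		log.Debugf("fs entry added for upload %s", m.UploadID)
	case uploads.NodeAddedMessage:
		log.Debugf("node %s added for upload %s", m.Node.CID(), m.UploadID)
	case uploads.ShardClosedMessage:
		log.Debugf("shard %s closed for upload %s", m.Shard.ID(), m.UploadID)
	case uploads.ShardUploadCompletedMessage:
		log.Debugf("shard %s uploaded for upload %s", m.Shard.ID(), m.UploadID)
	}
	return nil
}

rootCID, err := api.ExecuteUpload(ctx, upload, progressCallback)
```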