diff --git a/market/indexstore/indexstore.go b/market/indexstore/indexstore.go
index 7587ff9cc..43820955e 100644
--- a/market/indexstore/indexstore.go
+++ b/market/indexstore/indexstore.go
@@ -5,6 +5,7 @@ import (
 	"embed"
 	"errors"
 	"fmt"
+	"math"
 	"math/rand"
 	"sort"
 	"strconv"
@@ -18,6 +19,8 @@ import (
 	"golang.org/x/sync/errgroup"
 	"golang.org/x/xerrors"
 
+	commcid "github.com/filecoin-project/go-fil-commcid"
+
 	"github.com/filecoin-project/curio/deps/config"
 )
@@ -348,10 +351,10 @@ func (i *IndexStore) RemoveIndexes(ctx context.Context, pieceCidv2 cid.Cid) erro
 	return nil
 }
 
-// PieceInfo contains PieceCidV2 and BlockSize
+// PieceInfo contains PieceCid and BlockSize. PieceCid can be either v1 or v2.
 type PieceInfo struct {
-	PieceCidV2 cid.Cid
-	BlockSize  uint64
+	PieceCid  cid.Cid
+	BlockSize uint64
 }
 
 // PiecesContainingMultihash gets all pieces that contain a multihash along with their BlockSize
@@ -368,8 +371,8 @@ func (i *IndexStore) PiecesContainingMultihash(ctx context.Context, m multihash.
 			return nil, fmt.Errorf("parsing piece cid: %w", err)
 		}
 		pieces = append(pieces, PieceInfo{
-			PieceCidV2: pcid,
-			BlockSize:  blockSize,
+			PieceCid:  pcid,
+			BlockSize: blockSize,
 		})
 	}
 	if err := iter.Close(); err != nil {
@@ -396,21 +399,41 @@ func (i *IndexStore) GetOffset(ctx context.Context, pieceCidv2 cid.Cid, hash mul
 }
 
 func (i *IndexStore) GetPieceHashRange(ctx context.Context, piecev2 cid.Cid, start multihash.Multihash, num int64) ([]multihash.Multihash, error) {
-	qry := "SELECT PayloadMultihash FROM PieceBlockOffsetSize WHERE PieceCid = ? AND PayloadMultihash >= ? ORDER BY PayloadMultihash ASC LIMIT ?"
-	iter := i.session.Query(qry, piecev2.Bytes(), []byte(start), num).WithContext(ctx).Iter()
-
-	var hashes []multihash.Multihash
-	var r []byte
-	for iter.Scan(&r) {
-		m := multihash.Multihash(r)
-		hashes = append(hashes, m)
+	getHashes := func(pieceCid cid.Cid, start multihash.Multihash, num int64) ([]multihash.Multihash, error) {
+		qry := "SELECT PayloadMultihash FROM PieceBlockOffsetSize WHERE PieceCid = ? AND PayloadMultihash >= ? ORDER BY PayloadMultihash ASC LIMIT ?"
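+		// Results come back ordered by PayloadMultihash, so a caller can page through a
+		// piece's index by passing the last hash returned here as the next `start` value.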
+		iter := i.session.Query(qry, pieceCid.Bytes(), []byte(start), num).WithContext(ctx).Iter()
+
+		var hashes []multihash.Multihash
+		var r []byte
+		for iter.Scan(&r) {
+			m := multihash.Multihash(r)
+			hashes = append(hashes, m)
+
+			// Allocate new r, preallocating the typical size of a multihash (36 bytes)
+			r = make([]byte, 0, 36)
+		}
+		if err := iter.Close(); err != nil {
+			return nil, xerrors.Errorf("iterating piece hash range (P:0x%02x, H:0x%02x, n:%d): %w", pieceCid.Bytes(), []byte(start), num, err)
+		}
+		return hashes, nil
+	}
 
-		// Allocate new r, preallocating the typical size of a multihash (36 bytes)
-		r = make([]byte, 0, 36)
+	hashes, err := getHashes(piecev2, start, num)
+	if err != nil {
+		return nil, err
 	}
-	if err := iter.Close(); err != nil {
-		return nil, xerrors.Errorf("iterating piece hash range (P:0x%02x, H:0x%02x, n:%d): %w", piecev2.Bytes(), []byte(start), num, err)
+
+	if len(hashes) == 0 {
+		pcid1, _, err := commcid.PieceCidV1FromV2(piecev2)
+		if err != nil {
+			return nil, xerrors.Errorf("getting piece cid v1 from v2: %w", err)
+		}
+		hashes, err = getHashes(pcid1, start, num)
+		if err != nil {
+			return nil, err
+		}
 	}
+
 	if len(hashes) != int(num) {
 		return nil, xerrors.Errorf("expected %d hashes, got %d (possibly missing indexes)", num, len(hashes))
 	}
@@ -602,71 +625,100 @@ func (i *IndexStore) UpdatePieceCidV1ToV2(ctx context.Context, pieceCidV1 cid.Ci
 	p1 := pieceCidV1.Bytes()
 	p2 := pieceCidV2.Bytes()
 
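+	// PieceCid is a key column in both index tables, so it cannot simply be rewritten in
+	// place; the two passes below copy each row under the v2 CID and then delete the v1 row.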
-	// First, select all PayloadMultihash for the given PieceCid from PieceBlockOffsetSize
-	selectQry := `SELECT PayloadMultihash FROM PieceBlockOffsetSize WHERE PieceCid = ?`
-	iter := i.session.Query(selectQry, p1).WithContext(ctx).Iter()
-
-	var payloadMultihashBytes []byte
-	var payloadMultihashes [][]byte
-	for iter.Scan(&payloadMultihashBytes) {
-		// Copy the bytes since the slice will be overwritten
-		mhCopy := make([]byte, len(payloadMultihashBytes))
-		copy(mhCopy, payloadMultihashBytes)
-		payloadMultihashes = append(payloadMultihashes, mhCopy)
-	}
-	if err := iter.Close(); err != nil {
-		return xerrors.Errorf("scanning PayloadMultihash for piece %s: %w", pieceCidV1.String(), err)
+	batchLimit := i.settings.InsertBatchSize
+	if batchLimit <= 0 {
+		batchLimit = 15000
 	}
 
-	// Prepare batch replace for PayloadToPieces
-	updatePiecesQry := `UPDATE PayloadToPieces SET PieceCid = ? WHERE PayloadMultihash = ? AND PieceCid = ?`
-	batch := i.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
-	batchSize := i.settings.InsertBatchSize
+	pageSize := int(math.Floor(float64(batchLimit) / 2))
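+	// Each scanned row turns into two batch entries (an INSERT and a DELETE), so the
+	// source query pages at half the batch limit to keep one page within one batch.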
 
-	for idx, payloadMH := range payloadMultihashes {
-		batch.Entries = append(batch.Entries, gocql.BatchEntry{
-			Stmt:       updatePiecesQry,
-			Args:       []interface{}{p2, payloadMH, p1},
-			Idempotent: true,
-		})
-
-		if len(batch.Entries) >= batchSize || idx == len(payloadMultihashes)-1 {
-			if err := i.executeBatchWithRetry(ctx, batch, pieceCidV1); err != nil {
-				return xerrors.Errorf("executing batch replace for PayloadToPieces for piece %s: %w", pieceCidV1, err)
-			}
-			batch = i.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
+	flush := func(batch *gocql.Batch) error {
+		if len(batch.Entries) == 0 {
+			return nil
 		}
-	}
-
-	if len(batch.Entries) >= 0 {
-		if err := i.executeBatchWithRetry(ctx, batch, pieceCidV1); err != nil {
-			return xerrors.Errorf("executing batch replace for PayloadToPieces for piece %s: %w", pieceCidV1, err)
+		if err := i.executeBatchWithRetry(ctx, batch, pieceCidV2); err != nil {
+			return xerrors.Errorf("executing batch for updating index from piece %s to %s: %w", pieceCidV1.String(), pieceCidV2.String(), err)
 		}
+		return nil
 	}
 
-	// Prepare batch replace for PieceBlockOffsetSize
-	updatePiecesQry = `UPDATE PieceBlockOffsetSize SET PieceCid = ? WHERE PayloadMultihash = ? AND PieceCid = ?`
-	batch = i.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
-	batchSize = i.settings.InsertBatchSize
-
-	for idx, payloadMH := range payloadMultihashes {
-		batch.Entries = append(batch.Entries, gocql.BatchEntry{
-			Stmt:       updatePiecesQry,
-			Args:       []interface{}{p2, payloadMH, p1},
-			Idempotent: true,
-		})
-
-		if len(batch.Entries) >= batchSize || idx == len(payloadMultihashes)-1 {
-			if err := i.executeBatchWithRetry(ctx, batch, pieceCidV1); err != nil {
-				return xerrors.Errorf("executing batch replace for PieceBlockOffsetSize for piece %s: %w", pieceCidV1, err)
+	// -------- Pass 1: PayloadToPieces --------
+	{
+		iter := i.session.Query(`SELECT PayloadMultihash, BlockSize FROM PayloadToPieces WHERE PieceCid = ?`, p1).WithContext(ctx).PageSize(pageSize).Iter()
+
+		batch := i.session.NewBatch(gocql.LoggedBatch).WithContext(ctx) // Batches must be logged for consistency
+		var mh []byte
+		var bs int64
+		for iter.Scan(&mh, &bs) {
+			mhCopy := make([]byte, len(mh))
+			copy(mhCopy, mh)
+
+			// INSERT new mapping
+			batch.Entries = append(batch.Entries, gocql.BatchEntry{
+				Stmt:       `INSERT INTO PayloadToPieces (PayloadMultihash, PieceCid, BlockSize) VALUES (?, ?, ?)`,
+				Args:       []any{mhCopy, p2, bs},
+				Idempotent: true,
+			})
+			// DELETE old mapping
+			batch.Entries = append(batch.Entries, gocql.BatchEntry{
+				Stmt:       `DELETE FROM PayloadToPieces WHERE PayloadMultihash = ? AND PieceCid = ?`,
+				Args:       []any{mhCopy, p1},
+				Idempotent: true,
+			})
+
+			if len(batch.Entries) >= batchLimit {
+				if err := flush(batch); err != nil {
+					_ = iter.Close()
+					return err
+				}
+				batch = i.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
 			}
-			batch = i.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
+		}
+		if err := iter.Close(); err != nil {
+			return xerrors.Errorf("scan PayloadToPieces for piece %s: %w", pieceCidV1, err)
+		}
+		if err := flush(batch); err != nil {
+			return err
 		}
 	}
 
-	if len(batch.Entries) >= 0 {
-		if err := i.executeBatchWithRetry(ctx, batch, pieceCidV1); err != nil {
-			return xerrors.Errorf("executing batch replace for PieceBlockOffsetSize for piece %s: %w", pieceCidV1, err)
+	// -------- Pass 2: PieceBlockOffsetSize --------
+	{
+		iter := i.session.Query(`SELECT PayloadMultihash, BlockOffset FROM PieceBlockOffsetSize WHERE PieceCid = ?`, p1).WithContext(ctx).PageSize(pageSize).Iter()
+
+		batch := i.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
+		var mh []byte
+		var off int64
+		for iter.Scan(&mh, &off) {
+			mhCopy := make([]byte, len(mh))
+			copy(mhCopy, mh)
+
+			// INSERT new mapping
+			batch.Entries = append(batch.Entries, gocql.BatchEntry{
+				Stmt:       `INSERT INTO PieceBlockOffsetSize (PieceCid, PayloadMultihash, BlockOffset) VALUES (?, ?, ?)`,
+				Args:       []any{p2, mhCopy, off},
+				Idempotent: true,
+			})
+			// DELETE old mapping
+			batch.Entries = append(batch.Entries, gocql.BatchEntry{
+				Stmt:       `DELETE FROM PieceBlockOffsetSize WHERE PieceCid = ? AND PayloadMultihash = ?`,
+				Args:       []any{p1, mhCopy},
+				Idempotent: true,
+			})
+
+			if len(batch.Entries) >= batchLimit {
+				if err := flush(batch); err != nil {
+					_ = iter.Close()
+					return err
+				}
+				batch = i.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
+			}
+		}
+		if err := iter.Close(); err != nil {
+			return xerrors.Errorf("scan PieceBlockOffsetSize for piece %s: %w", pieceCidV1, err)
+		}
+		if err := flush(batch); err != nil {
+			return err
 		}
 	}
diff --git a/market/indexstore/indexstore_test.go b/market/indexstore/indexstore_test.go
index 33e0cb44f..f245abb49 100644
--- a/market/indexstore/indexstore_test.go
+++ b/market/indexstore/indexstore_test.go
@@ -102,7 +102,7 @@ func TestNewIndexStore(t *testing.T) {
 	// Add index to the store
 	var eg errgroup.Group
 	eg.Go(func() error {
-		serr := idxStore.AddIndex(ctx, pcid2, recs)
+		serr := idxStore.AddIndex(ctx, pcid1, recs)
 		return serr
 	})
 
@@ -132,10 +132,18 @@ func TestNewIndexStore(t *testing.T) {
 	pcids, err := idxStore.PiecesContainingMultihash(ctx, m)
 	require.NoError(t, err)
 	require.Len(t, pcids, 1)
-	require.Equal(t, pcids[0].PieceCidV2.String(), pcid2.String())
+	require.Equal(t, pcids[0].PieceCid.String(), pcid1.String())
+
+	// Migrate V1 to V2
+	err = idxStore.UpdatePieceCidV1ToV2(ctx, pcid1, pcid2)
+	require.NoError(t, err)
+	pcids, err = idxStore.PiecesContainingMultihash(ctx, m)
+	require.NoError(t, err)
+	require.Len(t, pcids, 1)
+	require.Equal(t, pcids[0].PieceCid.String(), pcid2.String())
 
 	// Remove all indexes from the store
-	err = idxStore.RemoveIndexes(ctx, pcids[0].PieceCidV2)
+	err = idxStore.RemoveIndexes(ctx, pcids[0].PieceCid)
 	require.NoError(t, err)
 
 	err = idxStore.session.Query("SELECT * FROM piece_by_aggregate").Exec()
diff --git a/market/ipni/chunker/serve-chunker.go b/market/ipni/chunker/serve-chunker.go
index 975403c4d..01b00ff0a 100644
--- a/market/ipni/chunker/serve-chunker.go
+++ b/market/ipni/chunker/serve-chunker.go
@@ -6,6 +6,7 @@ import (
 	"context"
 	"encoding/hex"
 	"errors"
+	"fmt"
 	"io"
 	"time"
 
@@ -177,6 +178,43 @@ func (p *ServeChunker) getEntry(rctx context.Context, block cid.Cid, speculated
 		return nil, xerrors.Errorf("parsing piece CID: %w", err)
 	}
 
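+	// The index may still be keyed by a v1 piece CID. Deriving the v2 CID needs the raw
+	// (unpadded) size, which is looked up below and only trusted when exactly one
+	// metadata row matches; otherwise the v1 CID is ambiguous and the caller must use v2.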
+	// Convert to pcid2 if needed
+	yes := commcidv2.IsPieceCidV2(pieceCidv2)
+	if !yes {
+		var rawSize int64
+		var singlePiece bool
+		err := p.db.QueryRow(ctx, `WITH meta AS (
+				SELECT piece_size
+				FROM market_piece_metadata
+				WHERE piece_cid = $1
+			),
+			exact AS (
+				SELECT COUNT(*) AS n, MIN(piece_size) AS piece_size
+				FROM meta
+			),
+			raw AS (
+				SELECT MAX(mpd.raw_size) AS raw_size
+				FROM market_piece_deal mpd
+				WHERE mpd.piece_cid = $1
+				  AND mpd.piece_length = (SELECT piece_size FROM exact)
+				  AND (SELECT n FROM exact) = 1
+			)
+			SELECT
+				COALESCE((SELECT raw_size FROM raw), 0) AS raw_size,
+				((SELECT n FROM exact) = 1) AS has_single_metadata;`, pieceCidv2.String()).Scan(&rawSize, &singlePiece)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get piece metadata: %w", err)
+		}
+		if !singlePiece {
+			return nil, fmt.Errorf("more than 1 piece metadata found for piece cid %s, please use piece cid v2", pieceCidv2.String())
+		}
+		pcid2, err := commcidv2.PieceCidV2FromV1(pieceCidv2, uint64(rawSize))
+		if err != nil {
+			return nil, fmt.Errorf("failed to convert piece cid v1 to v2: %w", err)
+		}
+		pieceCidv2 = pcid2
+	}
+
 	if leave, ok := p.noSkipCache.Get(pieceCidv2); !ok || time.Now().After(leave) {
 		skip, err := p.checkIsEntrySkip(ctx, block)
 		if err != nil {
diff --git a/market/retrieval/remoteblockstore/remoteblockstore.go b/market/retrieval/remoteblockstore/remoteblockstore.go
index f4b1c09f3..30eb8ee39 100644
--- a/market/retrieval/remoteblockstore/remoteblockstore.go
+++ b/market/retrieval/remoteblockstore/remoteblockstore.go
@@ -115,7 +115,7 @@ func (ro *RemoteBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks.Block,
 	var merr error
 	for _, piece := range pieces {
 		data, err := func() ([]byte, error) {
-			reader, _, err := ro.cpr.GetSharedPieceReader(ctx, piece.PieceCidV2, true)
+			reader, _, err := ro.cpr.GetSharedPieceReader(ctx, piece.PieceCid, true)
 			if err != nil {
 				return nil, fmt.Errorf("getting piece reader: %w", err)
 			}
@@ -124,19 +124,19 @@ func (ro *RemoteBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks.Block,
 			}(reader)
 
 			// Get the offset of the block within the piece (CAR file)
-			offset, err := ro.idxApi.GetOffset(ctx, piece.PieceCidV2, c.Hash())
+			offset, err := ro.idxApi.GetOffset(ctx, piece.PieceCid, c.Hash()) // This can be pieceCidV2 or pieceCidV1, but we don't care because we are feeding back the db output
 			if err != nil {
-				return nil, fmt.Errorf("getting offset/size for cid %s in piece %s: %w", c, piece.PieceCidV2, err)
+				return nil, fmt.Errorf("getting offset/size for cid %s in piece %s: %w", c, piece.PieceCid, err)
 			}
 
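+			// The section reader spans BlockSize plus MaxCarBlockPrefixSize so that the CAR
+			// section prefix (varint length and CID) ahead of the block data fits as well.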
 			// Seek to the section offset
 			readerAt := io.NewSectionReader(reader, int64(offset), int64(piece.BlockSize+MaxCarBlockPrefixSize))
 			readCid, data, err := util.ReadNode(bufio.NewReader(readerAt))
 			if err != nil {
-				return nil, fmt.Errorf("reading data for block %s from reader for piece %s: %w", c, piece.PieceCidV2, err)
+				return nil, fmt.Errorf("reading data for block %s from reader for piece %s: %w", c, piece.PieceCid, err)
 			}
 			if !bytes.Equal(readCid.Hash(), c.Hash()) {
-				return nil, fmt.Errorf("read block %s from reader for piece %s, but expected block %s", readCid, piece.PieceCidV2, c)
+				return nil, fmt.Errorf("read block %s from reader for piece %s, but expected block %s", readCid, piece.PieceCid, c)
 			}
 			return data, nil
 		}()
diff --git a/tasks/gc/task_cleanup_piece.go b/tasks/gc/task_cleanup_piece.go
index a4512d64e..977f6b4f3 100644
--- a/tasks/gc/task_cleanup_piece.go
+++ b/tasks/gc/task_cleanup_piece.go
@@ -321,7 +321,7 @@ func (p *PieceCleanupTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
 	}
 
 	if dropIndex {
-		err = dropIndexes(ctx, p.indexStore, pcid2)
+		err = dropIndexes(ctx, p.indexStore, pcid2, pi.PieceCIDV1)
 		if err != nil {
 			return false, xerrors.Errorf("failed to drop indexes for piece %s: %w", pcid2, err)
 		}
@@ -392,11 +392,16 @@ func (p *PieceCleanupTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
 	return true, nil
 }
 
-func dropIndexes(ctx context.Context, indexStore *indexstore.IndexStore, pieceCid cid.Cid) error {
+func dropIndexes(ctx context.Context, indexStore *indexstore.IndexStore, pieceCid, pieceCid2 cid.Cid) error {
 	err := indexStore.RemoveIndexes(ctx, pieceCid)
 	if err != nil {
 		return xerrors.Errorf("failed to remove indexes for piece %s: %w", pieceCid, err)
 	}
+
+	err = indexStore.RemoveIndexes(ctx, pieceCid2)
+	if err != nil {
+		return xerrors.Errorf("failed to remove indexes for piece %s: %w", pieceCid2, err)
+	}
 	return nil
 }
diff --git a/tasks/indexing/task_check_indexes.go b/tasks/indexing/task_check_indexes.go
index c92363cfe..29c5445c4 100644
--- a/tasks/indexing/task_check_indexes.go
+++ b/tasks/indexing/task_check_indexes.go
@@ -154,12 +154,6 @@ func (c *CheckIndexesTask) checkIndexing(ctx context.Context, taskID harmonytask
 		}
 
 		if hasEnt {
-			err = c.indexStore.UpdatePieceCidV1ToV2(ctx, p.PieceCID, pieceCid)
-			if err != nil {
-				return xerrors.Errorf("updating piece cid v1 to v2: %w", err)
-			}
-			log.Infow("piece cid v1 to v2 updated", "piece", p.PieceCID, "task", taskID)
-
 			have++
 			continue
 		}
diff --git a/tasks/indexing/task_ipni.go b/tasks/indexing/task_ipni.go
index d01d4c81c..97bf5f375 100644
--- a/tasks/indexing/task_ipni.go
+++ b/tasks/indexing/task_ipni.go
@@ -20,6 +20,7 @@ import (
 	"github.com/ipni/go-libipni/metadata"
 	"github.com/libp2p/go-libp2p/core/crypto"
 	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/multiformats/go-varint"
 	"github.com/oklog/ulid"
 	"github.com/yugabyte/pgx/v5"
 	"golang.org/x/sync/errgroup"
@@ -100,6 +101,10 @@ func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
 		return false, xerrors.Errorf("getting ipni task params: %w", err)
 	}
 
+	if len(tasks) == 0 {
+		return true, nil
+	}
+
 	if len(tasks) != 1 {
 		return false, xerrors.Errorf("expected 1 ipni task params, got %d", len(tasks))
 	}
@@ -309,7 +314,11 @@ func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
 	eg.Go(func() error {
 		defer close(addFail)
 		for rec := range recs {
-			serr := chk.Accept(rec.Cid.Hash(), int64(rec.Offset), rec.Size)
+			// CAR sections are [varint (length), CID, blockData]
+			combinedSize := rec.Size + uint64(rec.Cid.ByteLen())
+			lenSize := uint64(varint.UvarintSize(combinedSize))
+			sectionSize := combinedSize + lenSize
+			serr := chk.Accept(rec.Cid.Hash(), int64(rec.Offset), sectionSize)
 			if serr != nil {
 				addFail <- struct{}{}
 				return serr
@@ -503,7 +512,130 @@ func (I *IPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
 }
 
 func (I *IPNITask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
-	return &ids[0], nil
+	type task struct {
+		TaskID    harmonytask.TaskID `db:"task_id"`
+		ID        string             `db:"id"`
+		StorageID sql.NullString     `db:"storage_id"`
+		IsRm      bool               `db:"is_rm"`
+	}
+
+	if storiface.FTUnsealed != 1 {
+		panic("storiface.FTUnsealed != 1")
+	}
+
+	if storiface.FTPiece != 32 {
+		panic("storiface.FTPiece != 32")
+	}
+
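+	// The panics above guard the hard-coded sector_filetype literals in the queries
+	// below (1 == storiface.FTUnsealed, 32 == storiface.FTPiece).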
+	ctx := context.Background()
+
+	indIDs := make([]int64, len(ids))
+	for i, id := range ids {
+		indIDs[i] = int64(id)
+	}
+
+	var tasks []task
+
+	err := I.db.Select(ctx, &tasks, `
+		SELECT task_id, id, is_rm FROM ipni_task WHERE task_id = ANY($1)`, indIDs)
+	if err != nil {
+		return nil, xerrors.Errorf("getting task details: %w", err)
+	}
+
+	var mk12TaskIds []harmonytask.TaskID
+	var mk20TaskIds []harmonytask.TaskID
+
+	for _, t := range tasks {
+		if t.IsRm {
+			return &ids[0], nil // If this is rm task then storage is not needed
+		}
+		_, err := ulid.Parse(t.ID)
+		if err == nil {
+			mk20TaskIds = append(mk20TaskIds, t.TaskID)
+		} else {
+			_, err := uuid.Parse(t.ID)
+			if err != nil {
+				return nil, xerrors.Errorf("parsing task id: %w", err)
+			}
+			mk12TaskIds = append(mk12TaskIds, t.TaskID)
+		}
+	}
+
+	var finalTasks []task
+
+	if len(mk12TaskIds) > 0 {
+		var mk12Tasks []task
+		err := I.db.Select(ctx, &mk12Tasks, `
+			SELECT dp.task_id, dp.id, l.storage_id FROM ipni_task dp
+				LEFT JOIN sector_location l ON dp.sp_id = l.miner_id AND dp.sector = l.sector_num
+				WHERE dp.task_id = ANY ($1) AND (l.sector_filetype IS NULL OR l.sector_filetype = 1)`, indIDs)
+		if err != nil {
+			return nil, xerrors.Errorf("getting storage details: %w", err)
+		}
+		finalTasks = append(finalTasks, mk12Tasks...)
+	}
+
+	if len(mk20TaskIds) > 0 {
+		var mk20Tasks []task
+		err := I.db.Select(ctx, &mk20Tasks, `
+			SELECT
+				dp.task_id,
+				dp.id,
+				l.storage_id
+			FROM ipni_task dp
+			JOIN market_piece_deal mpd ON mpd.id = dp.id
+			JOIN parked_piece_refs pr ON pr.ref_id = mpd.piece_ref
+			JOIN sector_location l ON l.miner_id = 0
+				AND l.sector_num = pr.piece_id
+				AND l.sector_filetype = 32
+			WHERE dp.task_id = ANY ($1);
+		`, indIDs)
+		if err != nil {
+			return nil, xerrors.Errorf("getting storage details: %w", err)
+		}
+		finalTasks = append(finalTasks, mk20Tasks...)
+	}
+
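+	// A task is accepted when its unsealed/piece copy sits on storage attached to this
+	// node, or when no copy is recorded at all; tasks that matched neither storage query
+	// (e.g. for non-announced pieces) fall through to the orphan handling at the end.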
+	ls, err := I.sc.LocalStorage(ctx)
+	if err != nil {
+		return nil, xerrors.Errorf("getting local storage: %w", err)
+	}
+
+	acceptables := map[harmonytask.TaskID]bool{}
+
+	for _, t := range ids {
+		acceptables[t] = true
+	}
+
+	for _, t := range finalTasks {
+		if _, ok := acceptables[t.TaskID]; !ok {
+			continue
+		}
+
+		acceptables[t.TaskID] = false // note the task was found
+
+		if !t.StorageID.Valid {
+			// no unsealed copy
+			return &t.TaskID, nil
+		}
+
+		for _, l := range ls {
+			if string(l.ID) == t.StorageID.String {
+				return &t.TaskID, nil
+			}
+		}
+	}
+
+	// special case for orphan tasks which are created for non-announced pieces
+	for taskID, notAccepted := range acceptables {
+		if !notAccepted {
+			continue
+		}
+
+		return &taskID, nil
+	}
+
+	return nil, nil
 }
 
 func (I *IPNITask) TypeDetails() harmonytask.TaskTypeDetails {
diff --git a/tasks/indexing/task_pdp_ipni.go b/tasks/indexing/task_pdp_ipni.go
index ee9f4c279..eac4f5ab7 100644
--- a/tasks/indexing/task_pdp_ipni.go
+++ b/tasks/indexing/task_pdp_ipni.go
@@ -17,6 +17,7 @@ import (
 	"github.com/ipni/go-libipni/metadata"
 	"github.com/libp2p/go-libp2p/core/crypto"
 	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/multiformats/go-varint"
 	"github.com/oklog/ulid"
 	"github.com/yugabyte/pgx/v5"
 	"golang.org/x/sync/errgroup"
@@ -82,6 +83,10 @@ func (P *PDPIPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (don
 		return false, xerrors.Errorf("getting ipni task params: %w", err)
 	}
 
+	if len(tasks) == 0 {
+		return true, nil
+	}
+
 	if len(tasks) != 1 {
 		return false, xerrors.Errorf("expected 1 ipni task params, got %d", len(tasks))
 	}
@@ -264,7 +269,11 @@ func (P *PDPIPNITask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (don
 	eg.Go(func() error {
 		defer close(addFail)
 		for rec := range recs {
-			serr := chk.Accept(rec.Cid.Hash(), int64(rec.Offset), rec.Size)
+			// CAR sections are [varint (length), CID, blockData]
+			combinedSize := rec.Size + uint64(rec.Cid.ByteLen())
+			lenSize := uint64(varint.UvarintSize(combinedSize))
+			sectionSize := combinedSize + lenSize
+			serr := chk.Accept(rec.Cid.Hash(), int64(rec.Offset), sectionSize)
 			if serr != nil {
 				addFail <- struct{}{}
 				return serr