diff --git a/pkg/pdp/smartcontracts/verifier.go b/pkg/pdp/smartcontracts/verifier.go index be3c2b64..28e3e5bf 100644 --- a/pkg/pdp/smartcontracts/verifier.go +++ b/pkg/pdp/smartcontracts/verifier.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" @@ -27,6 +28,8 @@ type Verifier interface { FindPieceIds(ctx context.Context, setId *big.Int, leafIndexs []*big.Int) ([]bindings.IPDPTypesPieceIdAndOffset, error) CalculateProofFee(ctx context.Context, setId *big.Int) (*big.Int, error) MaxPieceSizeLog2(ctx context.Context) (*big.Int, error) + GetActivePieces(ctx context.Context, setID *big.Int, offset *big.Int, limit *big.Int) (*ActivePieces, error) + GetActivePieceCount(ctx context.Context, setId *big.Int) (*big.Int, error) // not part of contract code, added for convience in testing and usage Address() common.Address @@ -60,6 +63,47 @@ func NewVerifierContract(address common.Address, backend bind.ContractBackend) ( }, nil } +type ActivePieces struct { + Pieces []cid.Cid + PieceIds []*big.Int + HasMore bool +} + +func (v *verifierContract) GetActivePieces( + ctx context.Context, + setID *big.Int, + offset *big.Int, + limit *big.Int, +) (*ActivePieces, error) { + res, err := v.verifier.GetActivePieces(&bind.CallOpts{Context: ctx}, setID, offset, limit) + if err != nil { + return nil, err + } + + pieces := make([]cid.Cid, len(res.Pieces)) + for i, piece := range res.Pieces { + parsedCid, err := cid.Cast(piece.Data) + if err != nil { + return nil, fmt.Errorf("failed to parse piece CID at index %d: %w", i, err) + } + pieces[i] = parsedCid + } + + out := &ActivePieces{ + Pieces: pieces, + PieceIds: res.PieceIds, + HasMore: res.HasMore, + } + + log.Debugw("cached GetActivePieces result", "setID", setID, "offset", offset, "limit", limit) + + return out, nil +} + +func (v *verifierContract) GetActivePieceCount(ctx context.Context, setId *big.Int) (*big.Int, error) { + return v.verifier.GetActivePieceCount(&bind.CallOpts{Context: ctx}, setId) +} + func (v *verifierContract) MaxPieceSizeLog2(ctx context.Context) (*big.Int, error) { return v.verifier.MAXPIECESIZELOG2(&bind.CallOpts{Context: ctx}) } diff --git a/pkg/pdp/tasks/watch_addroot.go b/pkg/pdp/tasks/watch_addroot.go index 41349b53..6c6957cd 100644 --- a/pkg/pdp/tasks/watch_addroot.go +++ b/pkg/pdp/tasks/watch_addroot.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "math/big" "github.com/ethereum/go-ethereum/core/types" "golang.org/x/xerrors" @@ -16,26 +17,6 @@ import ( "github.com/storacha/piri/pkg/pdp/smartcontracts" ) -// Structures to represent database records -type ProofSetRootAdd struct { - ProofSet uint64 `db:"proofset"` - AddMessageHash string `db:"add_message_hash"` -} - -// RootAddEntry represents entries from pdp_proofset_root_adds -type RootAddEntry struct { - ProofSet uint64 `db:"proofset"` - Root string `db:"root"` - AddMessageHash string `db:"add_message_hash"` - AddMessageIndex uint64 `db:"add_message_index"` - Subroot string `db:"subroot"` - SubrootOffset int64 `db:"subroot_offset"` - SubrootSize int64 `db:"subroot_size"` - PDPPieceRefID int64 `db:"pdp_pieceref"` - AddMessageOK *bool `db:"add_message_ok"` - PDPProofSetID uint64 `db:"proofset"` -} - // NewWatcherRootAdd sets up the watcher for proof set root additions func NewWatcherRootAdd(db *gorm.DB, pcs *chainsched.Scheduler, verifier smartcontracts.Verifier) error { if err := pcs.AddHandler(func(ctx context.Context, revert, apply *chainyypes.TipSet) error { @@ -87,26 +68,32 @@ func processProofSetRootAdd(ctx context.Context, db *gorm.DB, rootAdd models.PDP Select("tx_receipt"). Where("signed_tx_hash = ?", rootAdd.AddMessageHash). First(&msgWait).Error - if err != nil { - return fmt.Errorf("failed to get tx_receipt for tx %s: %w", rootAdd.AddMessageHash, err) - } - txReceiptJSON := msgWait.TxReceipt - // Unmarshal the tx_receipt JSON into types.Receipt - var txReceipt types.Receipt - err = json.Unmarshal(txReceiptJSON, &txReceipt) - if err != nil { - return xerrors.Errorf("failed to unmarshal tx_receipt for tx %s: %w", rootAdd.AddMessageHash, err) + // NB(forrest): the below handles the case where the operator was unhealthy for > 16 hours + // lotus snapshots only contain 2000 epochs of state, and therefor it is possible for a + // receipt to be irretrievable from a lotus node when its from a block outside that time frame. + + var txReceipt *types.Receipt + if err == nil && msgWait.TxReceipt != nil && len(msgWait.TxReceipt) > 0 { + var receipt types.Receipt + if err := json.Unmarshal(msgWait.TxReceipt, &receipt); err == nil { + txReceipt = &receipt + } else { + log.Warnf("Failed to unmarshal tx_receipt for tx %s: %v", rootAdd.AddMessageHash, err) + } + } else if err != nil { + log.Warnf("Failed to get tx_receipt from database for tx %s: %v", rootAdd.AddMessageHash, err) } - rootIds, err := verifier.GetPieceIdsFromReceipt(&txReceipt) + // Use fallback strategy to get piece IDs + rootIds, err := getPieceIdsWithFallback(ctx, db, verifier, rootAdd, txReceipt) if err != nil { - return err + return fmt.Errorf("failed to get piece IDs for tx %s: %w", rootAdd.AddMessageHash, err) } - // Parse the logs to extract root IDs and other data + // Insert the root IDs if err := insertRootIds(ctx, db, rootAdd, rootIds); err != nil { - return xerrors.Errorf("failed to extract roots from receipt for tx %s: %w", rootAdd.AddMessageHash, err) + return xerrors.Errorf("failed to insert root IDs for tx %s: %w", rootAdd.AddMessageHash, err) } return nil @@ -172,3 +159,178 @@ func insertRootIds( return nil } + +// getPieceIdsWithFallback attempts to get piece IDs from the transaction receipt first, +// and falls back to querying active pieces from the contract if the receipt is unavailable. +func getPieceIdsWithFallback( + ctx context.Context, + db *gorm.DB, + verifier smartcontracts.Verifier, + rootAdd models.PDPProofsetRootAdd, + txReceipt *types.Receipt, +) ([]uint64, error) { + // Try to get piece IDs from receipt first if it exists, else we request from the contract state. + if txReceipt != nil { + pieceIds, err := verifier.GetPieceIdsFromReceipt(txReceipt) + if err == nil { + return pieceIds, nil + } + log.Warnf("Failed to get piece IDs from receipt for tx %s: %v, falling back to getActivePieces", rootAdd.AddMessageHash, err) + } + + // Fallback Use verifier contract getActivePieces to reconstruct piece IDs + return getPieceIdsByMatching(ctx, db, verifier, rootAdd) +} + +// getPieceIdsByMatching fetches active pieces from the contract and matches them +// with the pieces in the database by their CID to determine piece IDs. +func getPieceIdsByMatching( + ctx context.Context, + db *gorm.DB, + verifier smartcontracts.Verifier, + rootAdd models.PDPProofsetRootAdd, +) ([]uint64, error) { + var rootAddEntries []models.PDPProofsetRootAdd + err := db.WithContext(ctx). + Where("proofset_id = ? AND add_message_hash = ?", rootAdd.ProofsetID, rootAdd.AddMessageHash). + Order("add_message_index ASC, subroot_offset ASC"). + Find(&rootAddEntries).Error + if err != nil { + return nil, fmt.Errorf("failed to get root add entries: %w", err) + } + + // pieces we wanted receipts for but failed to find + targetCIDs := make(map[string]int) // CID string -> index in result array + for _, entry := range rootAddEntries { + if entry.AddMessageIndex != nil { + targetCIDs[entry.Root] = int(*entry.AddMessageIndex) + } + } + + // piece we get from looking in the contract, returned to caller eventually. + pieceIDMap := make(map[string]uint64) // CID string -> piece ID + + // Need to fetch missing pieces from contract + err = fetchPieces(ctx, verifier, rootAdd, targetCIDs, pieceIDMap) + if err != nil { + return nil, err + } + + // Build and return result + return buildPieceIdResult(rootAddEntries, pieceIDMap, targetCIDs, rootAdd) +} + +// fetchPieces fetches active pieces from the contract in batches and matches them +// with target CIDs to determine piece IDs. Uses batch processing for efficiency. +func fetchPieces( + ctx context.Context, + verifier smartcontracts.Verifier, + rootAdd models.PDPProofsetRootAdd, + targetCIDs map[string]int, + pieceIDMap map[string]uint64, +) error { + // Find the maximum piece offset we might need + totalPieces, err := verifier.GetActivePieceCount(ctx, big.NewInt(rootAdd.ProofsetID)) + if err != nil { + return fmt.Errorf("failed to get active piece count: %w", err) + } + maxNeeded := totalPieces.Uint64() + + // Batch configuration + offset := big.NewInt(0) + limit := big.NewInt(500) + proofsetID := big.NewInt(rootAdd.ProofsetID) + + log.Infof("Starting to fetch pieces for proofset %d, need %d pieces, max available: %d", + rootAdd.ProofsetID, len(targetCIDs), maxNeeded) + + // Fetch pieces in batches until we find all needed pieces or reach the end + for offset.Uint64() < maxNeeded { + activePieces, err := verifier.GetActivePieces(ctx, proofsetID, offset, limit) + if err != nil { + return fmt.Errorf("failed to get active pieces at offset %d: %w", offset.Int64(), err) + } + + // Process the pieces in this batch + for i, piece := range activePieces.Pieces { + cidStr := piece.String() + if _, found := targetCIDs[cidStr]; found { + pieceIDMap[cidStr] = activePieces.PieceIds[i].Uint64() + } + } + + log.Infof("Fetched batch at offset %d: found %d/%d pieces so far", + offset.Int64(), len(pieceIDMap), len(targetCIDs)) + + // Check if we found all pieces + if len(pieceIDMap) == len(targetCIDs) { + log.Infof("Found all %d pieces after fetching %d items", + len(targetCIDs), offset.Int64()+int64(len(activePieces.PieceIds))) + return nil + } + + // Check if there are more pieces to fetch + if !activePieces.HasMore { + log.Infof("Reached end of active pieces at offset %d", + offset.Int64()+int64(len(activePieces.PieceIds))) + break + } + + // Move to next batch - use actual number of pieces returned, not the limit + actualBatchSize := big.NewInt(int64(len(activePieces.PieceIds))) + offset.Add(offset, actualBatchSize) + } + + // Check if we found all required pieces + if len(pieceIDMap) < len(targetCIDs) { + // Log which pieces weren't found for debugging + missing := []string{} + for cid := range targetCIDs { + if _, found := pieceIDMap[cid]; !found { + missing = append(missing, cid) + } + } + return fmt.Errorf("failed to find all pieces: found %d/%d, missing CIDs: %v", + len(pieceIDMap), len(targetCIDs), missing) + } + + return nil +} + +// buildPieceIdResult constructs the final result array from the piece ID map +func buildPieceIdResult( + rootAddEntries []models.PDPProofsetRootAdd, + pieceIDMap map[string]uint64, + targetCIDs map[string]int, + rootAdd models.PDPProofsetRootAdd, +) ([]uint64, error) { + maxIndex := -1 + for _, idx := range targetCIDs { + if idx > maxIndex { + maxIndex = idx + } + } + + result := make([]uint64, maxIndex+1) + foundCount := 0 + + for _, entry := range rootAddEntries { + if entry.AddMessageIndex == nil { + continue + } + + pieceID, found := pieceIDMap[entry.Root] + if !found { + return nil, fmt.Errorf("piece CID %s not found in active pieces for proofset %d", entry.Subroot, rootAdd.ProofsetID) + } + + result[*entry.AddMessageIndex] = pieceID + foundCount++ + } + + if foundCount == 0 { + return nil, fmt.Errorf("no pieces found for tx %s", rootAdd.AddMessageHash) + } + + return result, nil +}