Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions pkg/pdp/smartcontracts/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

Expand All @@ -27,6 +28,8 @@ type Verifier interface {
FindPieceIds(ctx context.Context, setId *big.Int, leafIndexs []*big.Int) ([]bindings.IPDPTypesPieceIdAndOffset, error)
CalculateProofFee(ctx context.Context, setId *big.Int) (*big.Int, error)
MaxPieceSizeLog2(ctx context.Context) (*big.Int, error)
GetActivePieces(ctx context.Context, setID *big.Int, offset *big.Int, limit *big.Int) (*ActivePieces, error)
GetActivePieceCount(ctx context.Context, setId *big.Int) (*big.Int, error)

// not part of contract code, added for convience in testing and usage
Address() common.Address
Expand Down Expand Up @@ -60,6 +63,47 @@ func NewVerifierContract(address common.Address, backend bind.ContractBackend) (
}, nil
}

type ActivePieces struct {
Pieces []cid.Cid
PieceIds []*big.Int
HasMore bool
}

func (v *verifierContract) GetActivePieces(
ctx context.Context,
setID *big.Int,
offset *big.Int,
limit *big.Int,
) (*ActivePieces, error) {
res, err := v.verifier.GetActivePieces(&bind.CallOpts{Context: ctx}, setID, offset, limit)
if err != nil {
return nil, err
}

pieces := make([]cid.Cid, len(res.Pieces))
for i, piece := range res.Pieces {
parsedCid, err := cid.Cast(piece.Data)
if err != nil {
return nil, fmt.Errorf("failed to parse piece CID at index %d: %w", i, err)
}
pieces[i] = parsedCid
}

out := &ActivePieces{
Pieces: pieces,
PieceIds: res.PieceIds,
HasMore: res.HasMore,
}

log.Debugw("cached GetActivePieces result", "setID", setID, "offset", offset, "limit", limit)

return out, nil
}

func (v *verifierContract) GetActivePieceCount(ctx context.Context, setId *big.Int) (*big.Int, error) {
return v.verifier.GetActivePieceCount(&bind.CallOpts{Context: ctx}, setId)
}

func (v *verifierContract) MaxPieceSizeLog2(ctx context.Context) (*big.Int, error) {
return v.verifier.MAXPIECESIZELOG2(&bind.CallOpts{Context: ctx})
}
Expand Down
228 changes: 195 additions & 33 deletions pkg/pdp/tasks/watch_addroot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/core/types"
"golang.org/x/xerrors"
Expand All @@ -16,26 +17,6 @@ import (
"github.com/storacha/piri/pkg/pdp/smartcontracts"
)

// Structures to represent database records
type ProofSetRootAdd struct {
ProofSet uint64 `db:"proofset"`
AddMessageHash string `db:"add_message_hash"`
}

// RootAddEntry represents entries from pdp_proofset_root_adds
type RootAddEntry struct {
ProofSet uint64 `db:"proofset"`
Root string `db:"root"`
AddMessageHash string `db:"add_message_hash"`
AddMessageIndex uint64 `db:"add_message_index"`
Subroot string `db:"subroot"`
SubrootOffset int64 `db:"subroot_offset"`
SubrootSize int64 `db:"subroot_size"`
PDPPieceRefID int64 `db:"pdp_pieceref"`
AddMessageOK *bool `db:"add_message_ok"`
PDPProofSetID uint64 `db:"proofset"`
}

// NewWatcherRootAdd sets up the watcher for proof set root additions
func NewWatcherRootAdd(db *gorm.DB, pcs *chainsched.Scheduler, verifier smartcontracts.Verifier) error {
if err := pcs.AddHandler(func(ctx context.Context, revert, apply *chainyypes.TipSet) error {
Expand Down Expand Up @@ -87,26 +68,32 @@ func processProofSetRootAdd(ctx context.Context, db *gorm.DB, rootAdd models.PDP
Select("tx_receipt").
Where("signed_tx_hash = ?", rootAdd.AddMessageHash).
First(&msgWait).Error
if err != nil {
return fmt.Errorf("failed to get tx_receipt for tx %s: %w", rootAdd.AddMessageHash, err)
}
txReceiptJSON := msgWait.TxReceipt

// Unmarshal the tx_receipt JSON into types.Receipt
var txReceipt types.Receipt
err = json.Unmarshal(txReceiptJSON, &txReceipt)
if err != nil {
return xerrors.Errorf("failed to unmarshal tx_receipt for tx %s: %w", rootAdd.AddMessageHash, err)
// NB(forrest): the below handles the case where the operator was unhealthy for > 16 hours
// lotus snapshots only contain 2000 epochs of state, and therefor it is possible for a
// receipt to be irretrievable from a lotus node when its from a block outside that time frame.

var txReceipt *types.Receipt
if err == nil && msgWait.TxReceipt != nil && len(msgWait.TxReceipt) > 0 {
var receipt types.Receipt
if err := json.Unmarshal(msgWait.TxReceipt, &receipt); err == nil {
txReceipt = &receipt
} else {
log.Warnf("Failed to unmarshal tx_receipt for tx %s: %v", rootAdd.AddMessageHash, err)
}
} else if err != nil {
log.Warnf("Failed to get tx_receipt from database for tx %s: %v", rootAdd.AddMessageHash, err)
}

rootIds, err := verifier.GetPieceIdsFromReceipt(&txReceipt)
// Use fallback strategy to get piece IDs
rootIds, err := getPieceIdsWithFallback(ctx, db, verifier, rootAdd, txReceipt)
if err != nil {
return err
return fmt.Errorf("failed to get piece IDs for tx %s: %w", rootAdd.AddMessageHash, err)
}

// Parse the logs to extract root IDs and other data
// Insert the root IDs
if err := insertRootIds(ctx, db, rootAdd, rootIds); err != nil {
return xerrors.Errorf("failed to extract roots from receipt for tx %s: %w", rootAdd.AddMessageHash, err)
return xerrors.Errorf("failed to insert root IDs for tx %s: %w", rootAdd.AddMessageHash, err)
}

return nil
Expand Down Expand Up @@ -172,3 +159,178 @@ func insertRootIds(

return nil
}

// getPieceIdsWithFallback attempts to get piece IDs from the transaction receipt first,
// and falls back to querying active pieces from the contract if the receipt is unavailable.
func getPieceIdsWithFallback(
ctx context.Context,
db *gorm.DB,
verifier smartcontracts.Verifier,
rootAdd models.PDPProofsetRootAdd,
txReceipt *types.Receipt,
) ([]uint64, error) {
// Try to get piece IDs from receipt first if it exists, else we request from the contract state.
if txReceipt != nil {
pieceIds, err := verifier.GetPieceIdsFromReceipt(txReceipt)
if err == nil {
return pieceIds, nil
}
log.Warnf("Failed to get piece IDs from receipt for tx %s: %v, falling back to getActivePieces", rootAdd.AddMessageHash, err)
}

// Fallback Use verifier contract getActivePieces to reconstruct piece IDs
return getPieceIdsByMatching(ctx, db, verifier, rootAdd)
}

// getPieceIdsByMatching fetches active pieces from the contract and matches them
// with the pieces in the database by their CID to determine piece IDs.
func getPieceIdsByMatching(
ctx context.Context,
db *gorm.DB,
verifier smartcontracts.Verifier,
rootAdd models.PDPProofsetRootAdd,
) ([]uint64, error) {
var rootAddEntries []models.PDPProofsetRootAdd
err := db.WithContext(ctx).
Where("proofset_id = ? AND add_message_hash = ?", rootAdd.ProofsetID, rootAdd.AddMessageHash).
Order("add_message_index ASC, subroot_offset ASC").
Find(&rootAddEntries).Error
if err != nil {
return nil, fmt.Errorf("failed to get root add entries: %w", err)
}

// pieces we wanted receipts for but failed to find
targetCIDs := make(map[string]int) // CID string -> index in result array
for _, entry := range rootAddEntries {
if entry.AddMessageIndex != nil {
targetCIDs[entry.Root] = int(*entry.AddMessageIndex)
}
}

// piece we get from looking in the contract, returned to caller eventually.
pieceIDMap := make(map[string]uint64) // CID string -> piece ID

// Need to fetch missing pieces from contract
err = fetchPieces(ctx, verifier, rootAdd, targetCIDs, pieceIDMap)
if err != nil {
return nil, err
}

// Build and return result
return buildPieceIdResult(rootAddEntries, pieceIDMap, targetCIDs, rootAdd)
}

// fetchPieces fetches active pieces from the contract in batches and matches them
// with target CIDs to determine piece IDs. Uses batch processing for efficiency.
func fetchPieces(
ctx context.Context,
verifier smartcontracts.Verifier,
rootAdd models.PDPProofsetRootAdd,
targetCIDs map[string]int,
pieceIDMap map[string]uint64,
) error {
// Find the maximum piece offset we might need
totalPieces, err := verifier.GetActivePieceCount(ctx, big.NewInt(rootAdd.ProofsetID))
if err != nil {
return fmt.Errorf("failed to get active piece count: %w", err)
}
maxNeeded := totalPieces.Uint64()

// Batch configuration
offset := big.NewInt(0)
limit := big.NewInt(500)
proofsetID := big.NewInt(rootAdd.ProofsetID)

log.Infof("Starting to fetch pieces for proofset %d, need %d pieces, max available: %d",
rootAdd.ProofsetID, len(targetCIDs), maxNeeded)

// Fetch pieces in batches until we find all needed pieces or reach the end
for offset.Uint64() < maxNeeded {
activePieces, err := verifier.GetActivePieces(ctx, proofsetID, offset, limit)
if err != nil {
return fmt.Errorf("failed to get active pieces at offset %d: %w", offset.Int64(), err)
}

// Process the pieces in this batch
for i, piece := range activePieces.Pieces {
cidStr := piece.String()
if _, found := targetCIDs[cidStr]; found {
pieceIDMap[cidStr] = activePieces.PieceIds[i].Uint64()
}
}

log.Infof("Fetched batch at offset %d: found %d/%d pieces so far",
offset.Int64(), len(pieceIDMap), len(targetCIDs))

// Check if we found all pieces
if len(pieceIDMap) == len(targetCIDs) {
log.Infof("Found all %d pieces after fetching %d items",
len(targetCIDs), offset.Int64()+int64(len(activePieces.PieceIds)))
return nil
}

// Check if there are more pieces to fetch
if !activePieces.HasMore {
log.Infof("Reached end of active pieces at offset %d",
offset.Int64()+int64(len(activePieces.PieceIds)))
break
}

// Move to next batch - use actual number of pieces returned, not the limit
actualBatchSize := big.NewInt(int64(len(activePieces.PieceIds)))
offset.Add(offset, actualBatchSize)
}

// Check if we found all required pieces
if len(pieceIDMap) < len(targetCIDs) {
// Log which pieces weren't found for debugging
missing := []string{}
for cid := range targetCIDs {
if _, found := pieceIDMap[cid]; !found {
missing = append(missing, cid)
}
}
return fmt.Errorf("failed to find all pieces: found %d/%d, missing CIDs: %v",
len(pieceIDMap), len(targetCIDs), missing)
}

return nil
}

// buildPieceIdResult constructs the final result array from the piece ID map
func buildPieceIdResult(
rootAddEntries []models.PDPProofsetRootAdd,
pieceIDMap map[string]uint64,
targetCIDs map[string]int,
rootAdd models.PDPProofsetRootAdd,
) ([]uint64, error) {
maxIndex := -1
for _, idx := range targetCIDs {
if idx > maxIndex {
maxIndex = idx
}
}

result := make([]uint64, maxIndex+1)
foundCount := 0

for _, entry := range rootAddEntries {
if entry.AddMessageIndex == nil {
continue
}

pieceID, found := pieceIDMap[entry.Root]
if !found {
return nil, fmt.Errorf("piece CID %s not found in active pieces for proofset %d", entry.Subroot, rootAdd.ProofsetID)
}

result[*entry.AddMessageIndex] = pieceID
foundCount++
}

if foundCount == 0 {
return nil, fmt.Errorf("no pieces found for tx %s", rootAdd.AddMessageHash)
}

return result, nil
}
Loading