Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
es := getSenderEth()
sdeps.EthSender = es

pdp.NewWatcherCreate(db, must.One(dependencies.EthClient.Val()), chainSched)
pdp.NewWatcherCreate(db, must.One(dependencies.EthClient.Val()), chainSched, es)
pdp.NewWatcherPieceAdd(db, must.One(dependencies.EthClient.Val()), chainSched)

pdpProveTask := pdp.NewProveTask(chainSched, db, must.One(dependencies.EthClient.Val()), dependencies.Chain, es, dependencies.CachedPieceReader)
Expand Down
10 changes: 10 additions & 0 deletions harmony/harmonydb/sql/20251007-pdp-upload-and-create.sql
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this.

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE TABLE IF NOT EXISTS pdp_pending_piece_adds (
create_message_hash TEXT PRIMARY KEY,
service TEXT NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX IF NOT EXISTS idx_pdp_pending_piece_adds_service ON pdp_pending_piece_adds(service);


20 changes: 20 additions & 0 deletions pdp/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request)
type RequestBody struct {
RecordKeeper string `json:"recordKeeper"`
ExtraData *string `json:"extraData,omitempty"`
Pieces *struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's type this struct rather than using an anonymous one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, this is going to need to be mandatory, so don't make it a pointer; the old CreateDataSet was removed (I didn't realise that when I wrote the original issue)

Pieces []struct {
PieceCID string `json:"pieceCid"`
SubPieces []struct {
SubPieceCID string `json:"subPieceCid"`
} `json:"subPieces"`
} `json:"pieces"`
ExtraData *string `json:"extraData,omitempty"`
} `json:"pieces,omitempty"`
}

body, err := io.ReadAll(r.Body)
Expand Down Expand Up @@ -245,6 +254,17 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request)
return
}

if reqBody.Pieces != nil {
_, err := p.db.Exec(ctx, `
INSERT INTO pdp_pending_piece_adds (create_message_hash, service, payload)
VALUES ($1, $2, $3)
ON CONFLICT (create_message_hash) DO UPDATE SET payload = EXCLUDED.payload
`, txHashLower, serviceLabel, body)
if err != nil {
log.Errorf("Failed to persist pending piece add intent: %v", err)
}
}

// Step 7: Respond with 201 Created and Location header
w.Header().Set("Location", path.Join("/pdp/data-sets/created", txHashLower))
w.WriteHeader(http.StatusCreated)
Expand Down
220 changes: 215 additions & 5 deletions tasks/pdp/proofset_create_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,27 @@ package pdp

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"math/big"
"strings"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-commp-utils/nonffi"
commcid "github.com/filecoin-project/go-fil-commcid"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/filecoin-project/curio/lib/chainsched"
"github.com/filecoin-project/curio/pdp/contract"
"github.com/filecoin-project/curio/tasks/message"

chainTypes "github.com/filecoin-project/lotus/chain/types"
)
Expand All @@ -23,9 +32,9 @@ type DataSetCreate struct {
Service string `db:"service"`
}

func NewWatcherCreate(db *harmonydb.DB, ethClient *ethclient.Client, pcs *chainsched.CurioChainSched) {
func NewWatcherCreate(db *harmonydb.DB, ethClient *ethclient.Client, pcs *chainsched.CurioChainSched, sender *message.SenderETH) {
if err := pcs.AddHandler(func(ctx context.Context, revert, apply *chainTypes.TipSet) error {
err := processPendingDataSetCreates(ctx, db, ethClient)
err := processPendingDataSetCreates(ctx, db, ethClient, sender)
if err != nil {
log.Warnf("Failed to process pending data set creates: %v", err)
}
Expand All @@ -35,7 +44,7 @@ func NewWatcherCreate(db *harmonydb.DB, ethClient *ethclient.Client, pcs *chains
}
}

func processPendingDataSetCreates(ctx context.Context, db *harmonydb.DB, ethClient *ethclient.Client) error {
func processPendingDataSetCreates(ctx context.Context, db *harmonydb.DB, ethClient *ethclient.Client, sender *message.SenderETH) error {
// Query for pdp_data_set_creates entries where ok = TRUE and data_set_created = FALSE
var dataSetCreates []DataSetCreate

Expand All @@ -60,7 +69,7 @@ func processPendingDataSetCreates(ctx context.Context, db *harmonydb.DB, ethClie
log.Infow("Processing data set create",
"txHash", psc.CreateMessageHash,
"service", psc.Service)
err := processDataSetCreate(ctx, db, psc, ethClient)
err := processDataSetCreate(ctx, db, psc, ethClient, sender)
if err != nil {
log.Warnf("Failed to process data set create for tx %s: %v", psc.CreateMessageHash, err)
continue
Expand All @@ -71,7 +80,7 @@ func processPendingDataSetCreates(ctx context.Context, db *harmonydb.DB, ethClie
return nil
}

func processDataSetCreate(ctx context.Context, db *harmonydb.DB, psc DataSetCreate, ethClient *ethclient.Client) error {
func processDataSetCreate(ctx context.Context, db *harmonydb.DB, psc DataSetCreate, ethClient *ethclient.Client, sender *message.SenderETH) error {
// Retrieve the tx_receipt from message_waits_eth
var txReceiptJSON []byte
log.Debugw("Fetching tx_receipt from message_waits_eth", "txHash", psc.CreateMessageHash)
Expand Down Expand Up @@ -134,9 +143,210 @@ func processDataSetCreate(ctx context.Context, db *harmonydb.DB, psc DataSetCrea
return xerrors.Errorf("failed to update data_set_creates for tx %s: %w", psc.CreateMessageHash, err)
}

var raw json.RawMessage
err = db.QueryRow(ctx, `SELECT payload FROM pdp_pending_piece_adds WHERE create_message_hash = $1 AND service = $2`, psc.CreateMessageHash, psc.Service).Scan(&raw)
if err == nil && len(raw) > 0 {
var req struct {
RecordKeeper string `json:"recordKeeper"`
ExtraData *string `json:"extraData,omitempty"`
Pieces *struct {
Pieces []struct {
PieceCID string `json:"pieceCid"`
SubPieces []struct {
SubPieceCID string `json:"subPieceCid"`
} `json:"subPieces"`
} `json:"pieces"`
ExtraData *string `json:"extraData,omitempty"`
} `json:"pieces,omitempty"`
}
if err := json.Unmarshal(raw, &req); err != nil {
return xerrors.Errorf("failed to decode pending piece intent: %w", err)
}
if req.Pieces == nil || len(req.Pieces.Pieces) == 0 {
_, _ = db.Exec(ctx, `DELETE FROM pdp_pending_piece_adds WHERE create_message_hash = $1`, psc.CreateMessageHash)
return nil
}

// Build pieceData array and sub-piece info similar to handler
type subInfo struct {
PaddedSize uint64
RawSize uint64
Offset uint64
RefID int64
}
subPieceInfoMap := make(map[string]*subInfo)

unique := make(map[string]struct{})
for _, p := range req.Pieces.Pieces {
for _, sp := range p.SubPieces {
unique[sp.SubPieceCID] = struct{}{}
}
}
subList := make([]string, 0, len(unique))
for k := range unique {
subList = append(subList, k)
}

rows, err := db.Query(ctx, `
SELECT ppr.piece_cid, ppr.id AS pdp_pieceref_id, pp.piece_padded_size, pp.piece_raw_size
FROM pdp_piecerefs ppr
JOIN parked_piece_refs pprf ON pprf.ref_id = ppr.piece_ref
JOIN parked_pieces pp ON pp.id = pprf.piece_id
WHERE ppr.service = $1 AND ppr.piece_cid = ANY($2)
`, psc.Service, subList)
if err != nil {
return xerrors.Errorf("failed subpiece lookup: %w", err)
}
for rows.Next() {
var cidStr string
var refID int64
var pad, raw uint64
if err := rows.Scan(&cidStr, &refID, &pad, &raw); err != nil {
return err
}
subPieceInfoMap[cidStr] = &subInfo{PaddedSize: pad, RawSize: raw, RefID: refID}
}
rows.Close()

type PieceData struct{ Data []byte }
var pieceDataArray []PieceData

type staged struct {
PieceCIDv1 string
SubCIDv1 string
Offset uint64
Size uint64
RefID int64
MsgIdx int
}
var stagedRows []staged

for msgIdx, p := range req.Pieces.Pieces {
var totalOffset uint64
c, err := cid.Decode(p.PieceCID)
if err != nil {
return xerrors.Errorf("invalid pieceCid: %w", err)
}
_, rawSize, err := commcid.PieceCidV1FromV2(c)
if err != nil {
return xerrors.Errorf("invalid commP v2: %w", err)
}

pieceInfos := make([]abi.PieceInfo, len(p.SubPieces))
var sum uint64
var prevSize uint64
for i, sp := range p.SubPieces {
info, ok := subPieceInfoMap[sp.SubPieceCID]
if !ok {
return fmt.Errorf("subPiece not found: %s", sp.SubPieceCID)
}
if i == 0 {
prevSize = info.PaddedSize
} else if info.PaddedSize > prevSize {
return fmt.Errorf("subPieces must be in descending size")
} else {
prevSize = info.PaddedSize
}

subPieceCid, err := cid.Decode(sp.SubPieceCID)
if err != nil {
return xerrors.Errorf("invalid subPieceCid %s: %w", sp.SubPieceCID, err)
}

pieceInfos[i] = abi.PieceInfo{
Size: abi.PaddedPieceSize(info.PaddedSize),
PieceCID: subPieceCid,
}

stagedRows = append(stagedRows, staged{PieceCIDv1: p.PieceCID, SubCIDv1: sp.SubPieceCID, Offset: totalOffset, Size: info.PaddedSize, RefID: info.RefID, MsgIdx: msgIdx})
totalOffset += info.PaddedSize
sum += info.RawSize
}
if sum != rawSize {
return fmt.Errorf("raw size mismatch")
}

proofType := abi.RegisteredSealProof_StackedDrg64GiBV1_1
generatedPieceCid, err := nonffi.GenerateUnsealedCID(proofType, pieceInfos)
if err != nil {
return xerrors.Errorf("failed to generate PieceCid: %w", err)
}
providedPieceCidv1, _, err := commcid.PieceCidV1FromV2(c)
if err != nil {
return xerrors.Errorf("invalid provided PieceCid: %w", err)
}
if !providedPieceCidv1.Equals(generatedPieceCid) {
return fmt.Errorf("provided PieceCid does not match generated PieceCid: %s != %s", providedPieceCidv1, generatedPieceCid)
}

pieceDataArray = append(pieceDataArray, PieceData{Data: c.Bytes()})
}

abiData, err := contract.PDPVerifierMetaData.GetAbi()
if err != nil {
return err
}

extraDataBytes := []byte{}
if req.Pieces.ExtraData != nil {
extraDataHexStr := *req.Pieces.ExtraData
decodedBytes, err := hex.DecodeString(strings.TrimPrefix(extraDataHexStr, "0x"))
if err != nil {
return xerrors.Errorf("failed to decode hex extraData: %w", err)
}
extraDataBytes = decodedBytes
}

data, err := abiData.Pack("addPieces", new(big.Int).SetUint64(dataSetId), pieceDataArray, extraDataBytes)
if err != nil {
return err
}

fromAddr, err := getSenderAddress(ctx, db)
if err != nil {
return err
}
tx := types.NewTransaction(0, contract.ContractAddresses().PDPVerifier, big.NewInt(0), 0, nil, data)
txHash, err := sender.Send(ctx, fromAddr, tx, "pdp-addpieces")
if err != nil {
return err
}

txLower := strings.ToLower(txHash.Hex())
_, err = db.BeginTransaction(ctx, func(txdb *harmonydb.Tx) (bool, error) {
if _, err := txdb.Exec(`INSERT INTO message_waits_eth (signed_tx_hash, tx_status) VALUES ($1, $2)`, txLower, "pending"); err != nil {
return false, err
}
for _, r := range stagedRows {
if _, err := txdb.Exec(`
INSERT INTO pdp_data_set_piece_adds (data_set, piece, add_message_hash, add_message_index, sub_piece, sub_piece_offset, sub_piece_size, pdp_pieceref)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
`, dataSetId, r.PieceCIDv1, txLower, r.MsgIdx, r.SubCIDv1, r.Offset, r.Size, r.RefID); err != nil {
return false, err
}
}
return true, nil
})
if err != nil {
return err
}
_, err = db.Exec(ctx, `DELETE FROM pdp_pending_piece_adds WHERE create_message_hash = $1`, psc.CreateMessageHash)
if err != nil {
log.Warnf("Failed to cleanup pending piece add intent for %s: %v", psc.CreateMessageHash, err)
}
}

Comment on lines +146 to +338
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this?

return nil
}

func getSenderAddress(ctx context.Context, db *harmonydb.DB) (common.Address, error) {
var addressStr string
if err := db.QueryRow(ctx, `SELECT address FROM eth_keys WHERE role = 'pdp' LIMIT 1`).Scan(&addressStr); err != nil {
return common.Address{}, err
}
return common.HexToAddress(addressStr), nil
}

func extractDataSetIdFromReceipt(receipt *types.Receipt) (uint64, error) {
pdpABI, err := contract.PDPVerifierMetaData.GetAbi()
if err != nil {
Expand Down