diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index 9d60a1a43..35212bc85 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -292,7 +292,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan es := getSenderEth() sdeps.EthSender = es - pdp.NewWatcherCreate(db, must.One(dependencies.EthClient.Val()), chainSched) + pdp.NewWatcherCreate(db, must.One(dependencies.EthClient.Val()), chainSched, es) pdp.NewWatcherPieceAdd(db, must.One(dependencies.EthClient.Val()), chainSched) pdpProveTask := pdp.NewProveTask(chainSched, db, must.One(dependencies.EthClient.Val()), dependencies.Chain, es, dependencies.CachedPieceReader) diff --git a/harmony/harmonydb/sql/20251007-pdp-upload-and-create.sql b/harmony/harmonydb/sql/20251007-pdp-upload-and-create.sql new file mode 100644 index 000000000..70b70d30c --- /dev/null +++ b/harmony/harmonydb/sql/20251007-pdp-upload-and-create.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS pdp_pending_piece_adds ( + create_message_hash TEXT PRIMARY KEY, + service TEXT NOT NULL, + payload JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_pdp_pending_piece_adds_service ON pdp_pending_piece_adds(service); + + diff --git a/pdp/handlers.go b/pdp/handlers.go index 49924b5ef..143fb0ff2 100644 --- a/pdp/handlers.go +++ b/pdp/handlers.go @@ -144,6 +144,15 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) type RequestBody struct { RecordKeeper string `json:"recordKeeper"` ExtraData *string `json:"extraData,omitempty"` + Pieces *struct { + Pieces []struct { + PieceCID string `json:"pieceCid"` + SubPieces []struct { + SubPieceCID string `json:"subPieceCid"` + } `json:"subPieces"` + } `json:"pieces"` + ExtraData *string `json:"extraData,omitempty"` + } `json:"pieces,omitempty"` } body, err := io.ReadAll(r.Body) @@ -245,6 +254,17 @@ func (p *PDPService) handleCreateDataSet(w http.ResponseWriter, r *http.Request) return } + if reqBody.Pieces != nil { + _, err := p.db.Exec(ctx, ` + INSERT INTO pdp_pending_piece_adds (create_message_hash, service, payload) + VALUES ($1, $2, $3) + ON CONFLICT (create_message_hash) DO UPDATE SET payload = EXCLUDED.payload + `, txHashLower, serviceLabel, body) + if err != nil { + log.Errorf("Failed to persist pending piece add intent: %v", err) + } + } + // Step 7: Respond with 201 Created and Location header w.Header().Set("Location", path.Join("/pdp/data-sets/created", txHashLower)) w.WriteHeader(http.StatusCreated) diff --git a/tasks/pdp/proofset_create_watch.go b/tasks/pdp/proofset_create_watch.go index 7d8432ce4..2d11d5628 100644 --- a/tasks/pdp/proofset_create_watch.go +++ b/tasks/pdp/proofset_create_watch.go @@ -2,18 +2,27 @@ package pdp import ( "context" + "encoding/hex" "encoding/json" + "fmt" "math/big" + "strings" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" + "github.com/ipfs/go-cid" "golang.org/x/xerrors" + "github.com/filecoin-project/go-commp-utils/nonffi" + commcid "github.com/filecoin-project/go-fil-commcid" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/lib/chainsched" "github.com/filecoin-project/curio/pdp/contract" + "github.com/filecoin-project/curio/tasks/message" chainTypes "github.com/filecoin-project/lotus/chain/types" ) @@ -23,9 +32,9 @@ type DataSetCreate struct { Service string `db:"service"` } -func NewWatcherCreate(db *harmonydb.DB, ethClient *ethclient.Client, pcs *chainsched.CurioChainSched) { +func NewWatcherCreate(db *harmonydb.DB, ethClient *ethclient.Client, pcs *chainsched.CurioChainSched, sender *message.SenderETH) { if err := pcs.AddHandler(func(ctx context.Context, revert, apply *chainTypes.TipSet) error { - err := processPendingDataSetCreates(ctx, db, ethClient) + err := processPendingDataSetCreates(ctx, db, ethClient, sender) if err != nil { log.Warnf("Failed to process pending data set creates: %v", err) } @@ -35,7 +44,7 @@ func NewWatcherCreate(db *harmonydb.DB, ethClient *ethclient.Client, pcs *chains } } -func processPendingDataSetCreates(ctx context.Context, db *harmonydb.DB, ethClient *ethclient.Client) error { +func processPendingDataSetCreates(ctx context.Context, db *harmonydb.DB, ethClient *ethclient.Client, sender *message.SenderETH) error { // Query for pdp_data_set_creates entries where ok = TRUE and data_set_created = FALSE var dataSetCreates []DataSetCreate @@ -60,7 +69,7 @@ func processPendingDataSetCreates(ctx context.Context, db *harmonydb.DB, ethClie log.Infow("Processing data set create", "txHash", psc.CreateMessageHash, "service", psc.Service) - err := processDataSetCreate(ctx, db, psc, ethClient) + err := processDataSetCreate(ctx, db, psc, ethClient, sender) if err != nil { log.Warnf("Failed to process data set create for tx %s: %v", psc.CreateMessageHash, err) continue @@ -71,7 +80,7 @@ func processPendingDataSetCreates(ctx context.Context, db *harmonydb.DB, ethClie return nil } -func processDataSetCreate(ctx context.Context, db *harmonydb.DB, psc DataSetCreate, ethClient *ethclient.Client) error { +func processDataSetCreate(ctx context.Context, db *harmonydb.DB, psc DataSetCreate, ethClient *ethclient.Client, sender *message.SenderETH) error { // Retrieve the tx_receipt from message_waits_eth var txReceiptJSON []byte log.Debugw("Fetching tx_receipt from message_waits_eth", "txHash", psc.CreateMessageHash) @@ -134,9 +143,210 @@ func processDataSetCreate(ctx context.Context, db *harmonydb.DB, psc DataSetCrea return xerrors.Errorf("failed to update data_set_creates for tx %s: %w", psc.CreateMessageHash, err) } + var raw json.RawMessage + err = db.QueryRow(ctx, `SELECT payload FROM pdp_pending_piece_adds WHERE create_message_hash = $1 AND service = $2`, psc.CreateMessageHash, psc.Service).Scan(&raw) + if err == nil && len(raw) > 0 { + var req struct { + RecordKeeper string `json:"recordKeeper"` + ExtraData *string `json:"extraData,omitempty"` + Pieces *struct { + Pieces []struct { + PieceCID string `json:"pieceCid"` + SubPieces []struct { + SubPieceCID string `json:"subPieceCid"` + } `json:"subPieces"` + } `json:"pieces"` + ExtraData *string `json:"extraData,omitempty"` + } `json:"pieces,omitempty"` + } + if err := json.Unmarshal(raw, &req); err != nil { + return xerrors.Errorf("failed to decode pending piece intent: %w", err) + } + if req.Pieces == nil || len(req.Pieces.Pieces) == 0 { + _, _ = db.Exec(ctx, `DELETE FROM pdp_pending_piece_adds WHERE create_message_hash = $1`, psc.CreateMessageHash) + return nil + } + + // Build pieceData array and sub-piece info similar to handler + type subInfo struct { + PaddedSize uint64 + RawSize uint64 + Offset uint64 + RefID int64 + } + subPieceInfoMap := make(map[string]*subInfo) + + unique := make(map[string]struct{}) + for _, p := range req.Pieces.Pieces { + for _, sp := range p.SubPieces { + unique[sp.SubPieceCID] = struct{}{} + } + } + subList := make([]string, 0, len(unique)) + for k := range unique { + subList = append(subList, k) + } + + rows, err := db.Query(ctx, ` + SELECT ppr.piece_cid, ppr.id AS pdp_pieceref_id, pp.piece_padded_size, pp.piece_raw_size + FROM pdp_piecerefs ppr + JOIN parked_piece_refs pprf ON pprf.ref_id = ppr.piece_ref + JOIN parked_pieces pp ON pp.id = pprf.piece_id + WHERE ppr.service = $1 AND ppr.piece_cid = ANY($2) + `, psc.Service, subList) + if err != nil { + return xerrors.Errorf("failed subpiece lookup: %w", err) + } + for rows.Next() { + var cidStr string + var refID int64 + var pad, raw uint64 + if err := rows.Scan(&cidStr, &refID, &pad, &raw); err != nil { + return err + } + subPieceInfoMap[cidStr] = &subInfo{PaddedSize: pad, RawSize: raw, RefID: refID} + } + rows.Close() + + type PieceData struct{ Data []byte } + var pieceDataArray []PieceData + + type staged struct { + PieceCIDv1 string + SubCIDv1 string + Offset uint64 + Size uint64 + RefID int64 + MsgIdx int + } + var stagedRows []staged + + for msgIdx, p := range req.Pieces.Pieces { + var totalOffset uint64 + c, err := cid.Decode(p.PieceCID) + if err != nil { + return xerrors.Errorf("invalid pieceCid: %w", err) + } + _, rawSize, err := commcid.PieceCidV1FromV2(c) + if err != nil { + return xerrors.Errorf("invalid commP v2: %w", err) + } + + pieceInfos := make([]abi.PieceInfo, len(p.SubPieces)) + var sum uint64 + var prevSize uint64 + for i, sp := range p.SubPieces { + info, ok := subPieceInfoMap[sp.SubPieceCID] + if !ok { + return fmt.Errorf("subPiece not found: %s", sp.SubPieceCID) + } + if i == 0 { + prevSize = info.PaddedSize + } else if info.PaddedSize > prevSize { + return fmt.Errorf("subPieces must be in descending size") + } else { + prevSize = info.PaddedSize + } + + subPieceCid, err := cid.Decode(sp.SubPieceCID) + if err != nil { + return xerrors.Errorf("invalid subPieceCid %s: %w", sp.SubPieceCID, err) + } + + pieceInfos[i] = abi.PieceInfo{ + Size: abi.PaddedPieceSize(info.PaddedSize), + PieceCID: subPieceCid, + } + + stagedRows = append(stagedRows, staged{PieceCIDv1: p.PieceCID, SubCIDv1: sp.SubPieceCID, Offset: totalOffset, Size: info.PaddedSize, RefID: info.RefID, MsgIdx: msgIdx}) + totalOffset += info.PaddedSize + sum += info.RawSize + } + if sum != rawSize { + return fmt.Errorf("raw size mismatch") + } + + proofType := abi.RegisteredSealProof_StackedDrg64GiBV1_1 + generatedPieceCid, err := nonffi.GenerateUnsealedCID(proofType, pieceInfos) + if err != nil { + return xerrors.Errorf("failed to generate PieceCid: %w", err) + } + providedPieceCidv1, _, err := commcid.PieceCidV1FromV2(c) + if err != nil { + return xerrors.Errorf("invalid provided PieceCid: %w", err) + } + if !providedPieceCidv1.Equals(generatedPieceCid) { + return fmt.Errorf("provided PieceCid does not match generated PieceCid: %s != %s", providedPieceCidv1, generatedPieceCid) + } + + pieceDataArray = append(pieceDataArray, PieceData{Data: c.Bytes()}) + } + + abiData, err := contract.PDPVerifierMetaData.GetAbi() + if err != nil { + return err + } + + extraDataBytes := []byte{} + if req.Pieces.ExtraData != nil { + extraDataHexStr := *req.Pieces.ExtraData + decodedBytes, err := hex.DecodeString(strings.TrimPrefix(extraDataHexStr, "0x")) + if err != nil { + return xerrors.Errorf("failed to decode hex extraData: %w", err) + } + extraDataBytes = decodedBytes + } + + data, err := abiData.Pack("addPieces", new(big.Int).SetUint64(dataSetId), pieceDataArray, extraDataBytes) + if err != nil { + return err + } + + fromAddr, err := getSenderAddress(ctx, db) + if err != nil { + return err + } + tx := types.NewTransaction(0, contract.ContractAddresses().PDPVerifier, big.NewInt(0), 0, nil, data) + txHash, err := sender.Send(ctx, fromAddr, tx, "pdp-addpieces") + if err != nil { + return err + } + + txLower := strings.ToLower(txHash.Hex()) + _, err = db.BeginTransaction(ctx, func(txdb *harmonydb.Tx) (bool, error) { + if _, err := txdb.Exec(`INSERT INTO message_waits_eth (signed_tx_hash, tx_status) VALUES ($1, $2)`, txLower, "pending"); err != nil { + return false, err + } + for _, r := range stagedRows { + if _, err := txdb.Exec(` + INSERT INTO pdp_data_set_piece_adds (data_set, piece, add_message_hash, add_message_index, sub_piece, sub_piece_offset, sub_piece_size, pdp_pieceref) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8) + `, dataSetId, r.PieceCIDv1, txLower, r.MsgIdx, r.SubCIDv1, r.Offset, r.Size, r.RefID); err != nil { + return false, err + } + } + return true, nil + }) + if err != nil { + return err + } + _, err = db.Exec(ctx, `DELETE FROM pdp_pending_piece_adds WHERE create_message_hash = $1`, psc.CreateMessageHash) + if err != nil { + log.Warnf("Failed to cleanup pending piece add intent for %s: %v", psc.CreateMessageHash, err) + } + } + return nil } +func getSenderAddress(ctx context.Context, db *harmonydb.DB) (common.Address, error) { + var addressStr string + if err := db.QueryRow(ctx, `SELECT address FROM eth_keys WHERE role = 'pdp' LIMIT 1`).Scan(&addressStr); err != nil { + return common.Address{}, err + } + return common.HexToAddress(addressStr), nil +} + func extractDataSetIdFromReceipt(receipt *types.Receipt) (uint64, error) { pdpABI, err := contract.PDPVerifierMetaData.GetAbi() if err != nil {