Skip to content

Conversation

virajbhartiya
Copy link
Member

Changes:

  • Extend POST /pdp/data-sets/ to accept optional pieces parameter
  • Add pdp_pending_piece_adds table to store combined-flow intents
  • Update dataset creation watcher to automatically submit addPieces transactions

Implementation details:

  • Handler stores pending piece intent when pieces payload provided
  • Watcher detects successful dataset creation and processes pending intents
  • Reuses existing validation logic from add-pieces handler

Closes #678

type RequestBody struct {
RecordKeeper string `json:"recordKeeper"`
ExtraData *string `json:"extraData,omitempty"`
Pieces *struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's type this struct rather than using an anonymous one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, this is going to need to be mandatory, so don't make it a pointer; the old CreateDataSet was removed (I didn't realise that when I wrote the original issue)

@rvagg
Copy link
Member

rvagg commented Oct 8, 2025

@virajbhartiya please look at FilOzone/pdp#201 - we have a breaking change that we're adapting to here, instead of doing two operations, which you are still doing here, there's just one operation, createDataSet has been removed and addPieces has been extended to optionally take a new listener address so it can act as createDataSet as well.

So, you shouldn't need any "pending" pieces to add, it's all just done in one go. You need to remove the plain createDataSet path, and when the pieces are added, if there's not a data set to add them to, you need to add the listener and the data set ID should be 0. Look carefully at that PDP PR.

You'll also need to update the ABI here so it knows how to call the new contract.

Copy link
Contributor

@LexLuthr LexLuthr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a better way to do this once I look at this PR. Instead of trying to handle things in a single place, we should keep them as is. Just add same tx to different tables. This will allow create watcher and add_piece watcher to track the same message for different things. You can add a check in add watcher so, it skips when create watcher is not done adding the dataSet itself. Let me know if this explanation does not make sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this.

Comment on lines +146 to +338
var raw json.RawMessage
err = db.QueryRow(ctx, `SELECT payload FROM pdp_pending_piece_adds WHERE create_message_hash = $1 AND service = $2`, psc.CreateMessageHash, psc.Service).Scan(&raw)
if err == nil && len(raw) > 0 {
var req struct {
RecordKeeper string `json:"recordKeeper"`
ExtraData *string `json:"extraData,omitempty"`
Pieces *struct {
Pieces []struct {
PieceCID string `json:"pieceCid"`
SubPieces []struct {
SubPieceCID string `json:"subPieceCid"`
} `json:"subPieces"`
} `json:"pieces"`
ExtraData *string `json:"extraData,omitempty"`
} `json:"pieces,omitempty"`
}
if err := json.Unmarshal(raw, &req); err != nil {
return xerrors.Errorf("failed to decode pending piece intent: %w", err)
}
if req.Pieces == nil || len(req.Pieces.Pieces) == 0 {
_, _ = db.Exec(ctx, `DELETE FROM pdp_pending_piece_adds WHERE create_message_hash = $1`, psc.CreateMessageHash)
return nil
}

// Build pieceData array and sub-piece info similar to handler
type subInfo struct {
PaddedSize uint64
RawSize uint64
Offset uint64
RefID int64
}
subPieceInfoMap := make(map[string]*subInfo)

unique := make(map[string]struct{})
for _, p := range req.Pieces.Pieces {
for _, sp := range p.SubPieces {
unique[sp.SubPieceCID] = struct{}{}
}
}
subList := make([]string, 0, len(unique))
for k := range unique {
subList = append(subList, k)
}

rows, err := db.Query(ctx, `
SELECT ppr.piece_cid, ppr.id AS pdp_pieceref_id, pp.piece_padded_size, pp.piece_raw_size
FROM pdp_piecerefs ppr
JOIN parked_piece_refs pprf ON pprf.ref_id = ppr.piece_ref
JOIN parked_pieces pp ON pp.id = pprf.piece_id
WHERE ppr.service = $1 AND ppr.piece_cid = ANY($2)
`, psc.Service, subList)
if err != nil {
return xerrors.Errorf("failed subpiece lookup: %w", err)
}
for rows.Next() {
var cidStr string
var refID int64
var pad, raw uint64
if err := rows.Scan(&cidStr, &refID, &pad, &raw); err != nil {
return err
}
subPieceInfoMap[cidStr] = &subInfo{PaddedSize: pad, RawSize: raw, RefID: refID}
}
rows.Close()

type PieceData struct{ Data []byte }
var pieceDataArray []PieceData

type staged struct {
PieceCIDv1 string
SubCIDv1 string
Offset uint64
Size uint64
RefID int64
MsgIdx int
}
var stagedRows []staged

for msgIdx, p := range req.Pieces.Pieces {
var totalOffset uint64
c, err := cid.Decode(p.PieceCID)
if err != nil {
return xerrors.Errorf("invalid pieceCid: %w", err)
}
_, rawSize, err := commcid.PieceCidV1FromV2(c)
if err != nil {
return xerrors.Errorf("invalid commP v2: %w", err)
}

pieceInfos := make([]abi.PieceInfo, len(p.SubPieces))
var sum uint64
var prevSize uint64
for i, sp := range p.SubPieces {
info, ok := subPieceInfoMap[sp.SubPieceCID]
if !ok {
return fmt.Errorf("subPiece not found: %s", sp.SubPieceCID)
}
if i == 0 {
prevSize = info.PaddedSize
} else if info.PaddedSize > prevSize {
return fmt.Errorf("subPieces must be in descending size")
} else {
prevSize = info.PaddedSize
}

subPieceCid, err := cid.Decode(sp.SubPieceCID)
if err != nil {
return xerrors.Errorf("invalid subPieceCid %s: %w", sp.SubPieceCID, err)
}

pieceInfos[i] = abi.PieceInfo{
Size: abi.PaddedPieceSize(info.PaddedSize),
PieceCID: subPieceCid,
}

stagedRows = append(stagedRows, staged{PieceCIDv1: p.PieceCID, SubCIDv1: sp.SubPieceCID, Offset: totalOffset, Size: info.PaddedSize, RefID: info.RefID, MsgIdx: msgIdx})
totalOffset += info.PaddedSize
sum += info.RawSize
}
if sum != rawSize {
return fmt.Errorf("raw size mismatch")
}

proofType := abi.RegisteredSealProof_StackedDrg64GiBV1_1
generatedPieceCid, err := nonffi.GenerateUnsealedCID(proofType, pieceInfos)
if err != nil {
return xerrors.Errorf("failed to generate PieceCid: %w", err)
}
providedPieceCidv1, _, err := commcid.PieceCidV1FromV2(c)
if err != nil {
return xerrors.Errorf("invalid provided PieceCid: %w", err)
}
if !providedPieceCidv1.Equals(generatedPieceCid) {
return fmt.Errorf("provided PieceCid does not match generated PieceCid: %s != %s", providedPieceCidv1, generatedPieceCid)
}

pieceDataArray = append(pieceDataArray, PieceData{Data: c.Bytes()})
}

abiData, err := contract.PDPVerifierMetaData.GetAbi()
if err != nil {
return err
}

extraDataBytes := []byte{}
if req.Pieces.ExtraData != nil {
extraDataHexStr := *req.Pieces.ExtraData
decodedBytes, err := hex.DecodeString(strings.TrimPrefix(extraDataHexStr, "0x"))
if err != nil {
return xerrors.Errorf("failed to decode hex extraData: %w", err)
}
extraDataBytes = decodedBytes
}

data, err := abiData.Pack("addPieces", new(big.Int).SetUint64(dataSetId), pieceDataArray, extraDataBytes)
if err != nil {
return err
}

fromAddr, err := getSenderAddress(ctx, db)
if err != nil {
return err
}
tx := types.NewTransaction(0, contract.ContractAddresses().PDPVerifier, big.NewInt(0), 0, nil, data)
txHash, err := sender.Send(ctx, fromAddr, tx, "pdp-addpieces")
if err != nil {
return err
}

txLower := strings.ToLower(txHash.Hex())
_, err = db.BeginTransaction(ctx, func(txdb *harmonydb.Tx) (bool, error) {
if _, err := txdb.Exec(`INSERT INTO message_waits_eth (signed_tx_hash, tx_status) VALUES ($1, $2)`, txLower, "pending"); err != nil {
return false, err
}
for _, r := range stagedRows {
if _, err := txdb.Exec(`
INSERT INTO pdp_data_set_piece_adds (data_set, piece, add_message_hash, add_message_index, sub_piece, sub_piece_offset, sub_piece_size, pdp_pieceref)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
`, dataSetId, r.PieceCIDv1, txLower, r.MsgIdx, r.SubCIDv1, r.Offset, r.Size, r.RefID); err != nil {
return false, err
}
}
return true, nil
})
if err != nil {
return err
}
_, err = db.Exec(ctx, `DELETE FROM pdp_pending_piece_adds WHERE create_message_hash = $1`, psc.CreateMessageHash)
if err != nil {
log.Warnf("Failed to cleanup pending piece add intent for %s: %v", psc.CreateMessageHash, err)
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this?

@LexLuthr
Copy link
Contributor

LexLuthr commented Oct 8, 2025

Also, I think #680 and this PR are related will need coordination. They are basically trying to do the same thing.

@rvagg
Copy link
Member

rvagg commented Oct 8, 2025

indeed, @virajbhartiya you should rebase on top of the phi/update-contract-calibnet branch to get the ABI updates that you need

@rvagg
Copy link
Member

rvagg commented Oct 8, 2025

Oh, its' actually #685 that goes further and steps on toes here. I'm going to comment in there @virajbhartiya, I think maybe you should help on that branch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Update Curio to use the M3 FWSS contracts

3 participants