Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 255 additions & 0 deletions cmd/pdptool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/minio/sha256-simd"
"github.com/schollz/progressbar/v3"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"

"github.com/filecoin-project/go-commp-utils/nonffi"
commcid "github.com/filecoin-project/go-fil-commcid"
Expand Down Expand Up @@ -66,6 +67,8 @@ func main() {
uploadFileCmd, // upload a file to a pdp service in many chunks
downloadFileCmd, // download a file from curio

streamingPieceUploadCmd, // upload a piece to a pdp service in streaming mode

createDataSetCmd, // create a new data set on the PDP service
getDataSetStatusCmd, // get the status of a data set creation on the PDP service
getDataSetCmd, // retrieve the details of a data set from the PDP service
Expand Down Expand Up @@ -1505,3 +1508,255 @@ var removePiecesCmd = &cli.Command{
return nil
},
}

// streamingPieceUploadCmd uploads a single file to a PDP service using the
// streaming upload flow:
//
//  1. POST /pdp/piece/uploads creates an upload session; the server answers
//     201 Created with the session path in the Location header.
//  2. PUT <location> streams the file body while CommP is computed locally
//     over the same bytes.
//  3. POST <location> finalizes the upload with the computed piece CID (v2)
//     and an optional notification URL.
//
// With --local-notif-wait the command starts a temporary local HTTP server,
// passes its URL as the notify URL and blocks until a notification arrives.
var streamingPieceUploadCmd = &cli.Command{
	Name:      "upload",
	Usage:     "Upload a piece to a PDP service",
	ArgsUsage: "<input-file>",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:     "service-url",
			Usage:    "URL of the PDP service",
			Required: true,
		},
		&cli.StringFlag{
			Name:  "jwt-token",
			Usage: "JWT token for authentication (optional if --service-name is provided)",
		},
		&cli.StringFlag{
			Name:  "service-name",
			Usage: "Service Name to include in the JWT token (used if --jwt-token is not provided)",
		},
		&cli.StringFlag{
			Name:     "notify-url",
			Usage:    "Notification URL",
			Required: false,
		},
		&cli.StringFlag{
			Name:  "hash-type",
			Usage: "Hash type to use for verification (sha256 or commp)",
			Value: "commp",
		},
		&cli.BoolFlag{
			Name:  "local-notif-wait",
			Usage: "Wait for server notification by spawning a temporary local HTTP server",
		},
	},
	Action: func(cctx *cli.Context) error {
		inputFile := cctx.Args().Get(0)
		if inputFile == "" {
			return fmt.Errorf("input file is required")
		}

		serviceURL := cctx.String("service-url")
		jwtToken := cctx.String("jwt-token")
		notifyURL := cctx.String("notify-url")
		serviceName := cctx.String("service-name")
		hashType := cctx.String("hash-type")
		localNotifWait := cctx.Bool("local-notif-wait")

		if jwtToken == "" {
			if serviceName == "" {
				return fmt.Errorf("either --jwt-token or --service-name must be provided")
			}
			var err error
			jwtToken, err = getJWTTokenForService(serviceName)
			if err != nil {
				return err
			}
		}

		// NOTE(review): hashType is only validated here; the upload below
		// always computes CommP regardless of the selected value.
		if hashType != "sha256" && hashType != "commp" {
			return fmt.Errorf("invalid hash type: %s", hashType)
		}

		if localNotifWait && notifyURL != "" {
			return fmt.Errorf("cannot specify both --notify-url and --local-notif-wait")
		}

		var notifyReceived chan struct{}
		var err error

		if localNotifWait {
			notifyURL, notifyReceived, err = startLocalNotifyServer()
			if err != nil {
				return fmt.Errorf("failed to start local HTTP server: %w", err)
			}
		}

		// Open the input file.
		file, err := os.Open(inputFile)
		if err != nil {
			return fmt.Errorf("failed to open input file: %w", err)
		}
		defer func() {
			_ = file.Close()
		}()

		// The raw (unpadded) size is needed both to verify the copy and to
		// derive the piece CID v2.
		fi, err := file.Stat()
		if err != nil {
			return fmt.Errorf("failed to stat input file: %w", err)
		}
		rawSize := fi.Size()

		client := &http.Client{}

		// Step 1: create the upload session. The service registers this
		// endpoint for POST and replies with 201 Created + Location.
		createReq, err := http.NewRequest(http.MethodPost, serviceURL+"/pdp/piece/uploads", nil)
		if err != nil {
			return fmt.Errorf("failed to create upload request: %w", err)
		}
		if jwtToken != "" {
			createReq.Header.Set("Authorization", "Bearer "+jwtToken)
		}
		createReq.Header.Set("Content-Type", "application/json")

		// Each response gets its own variable: a deferred closure captures the
		// variable, not its value, so reusing one `resp` for several requests
		// would leak earlier bodies and dereference nil if a later Do fails.
		createResp, err := client.Do(createReq)
		if err != nil {
			return fmt.Errorf("failed to send request: %w", err)
		}
		defer func() {
			_ = createResp.Body.Close()
		}()

		if createResp.StatusCode != http.StatusCreated {
			ret, err := io.ReadAll(createResp.Body)
			if err != nil {
				return fmt.Errorf("failed to get upload URL, status code %d, failed to read body %s", createResp.StatusCode, err.Error())
			}
			return fmt.Errorf("failed to create upload, status code %d: %s", createResp.StatusCode, string(ret))
		}

		location := createResp.Header.Get("Location")
		if location == "" {
			return fmt.Errorf("failed to get upload URL, status code %d, no Location header", createResp.StatusCode)
		}

		// Step 2: stream the file to the server while hashing it. The file is
		// read once; every chunk goes to both the CommP calculator and the
		// pipe feeding the HTTP request body.
		cp := commp.Calc{}
		pipeReader, pipeWriter := io.Pipe()
		multiWriter := io.MultiWriter(&cp, pipeWriter)

		var g errgroup.Group

		// Producer: file -> (CommP, pipe).
		g.Go(func() (err error) {
			defer func() {
				// Propagate a failure to the reader so the PUT is aborted
				// instead of ending with a clean EOF on truncated data.
				// CloseWithError(nil) is equivalent to Close.
				_ = pipeWriter.CloseWithError(err)
			}()
			n, err := io.Copy(multiWriter, file)
			if err != nil {
				return fmt.Errorf("failed to copy data to multiwriter: %w", err)
			}
			if n != rawSize {
				return fmt.Errorf("failed to copy all data to multiwriter, only copied %d/%d bytes", n, rawSize)
			}
			return nil
		})

		// Consumer: pipe -> PUT request body.
		g.Go(func() error {
			defer func() {
				// Closing the read side unblocks the producer if the request
				// ends before the whole body has been consumed.
				_ = pipeReader.Close()
			}()

			putReq, err := http.NewRequest(http.MethodPut, serviceURL+location, pipeReader)
			if err != nil {
				return fmt.Errorf("failed to create upload request: %w", err)
			}
			if jwtToken != "" {
				putReq.Header.Set("Authorization", "Bearer "+jwtToken)
			}
			putReq.Header.Set("Content-Type", "application/octet-stream")

			putResp, err := client.Do(putReq)
			if err != nil {
				return fmt.Errorf("failed to send upload request: %w", err)
			}
			defer func() {
				_ = putResp.Body.Close()
			}()

			if putResp.StatusCode != http.StatusNoContent {
				ret, err := io.ReadAll(putResp.Body)
				if err != nil {
					return fmt.Errorf("failed to upload, status code %d, failed to read body %s", putResp.StatusCode, err.Error())
				}
				return fmt.Errorf("upload failed, status code %d: %s", putResp.StatusCode, string(ret))
			}
			return nil
		})

		if err := g.Wait(); err != nil {
			return fmt.Errorf("upload process failed: %w", err)
		}

		// All bytes have passed through the calculator; derive the piece CID.
		digest, _, err := cp.Digest()
		if err != nil {
			return fmt.Errorf("failed to calculate digest: %w", err)
		}

		pcid2, err := commcid.DataCommitmentToPieceCidv2(digest, uint64(rawSize))
		if err != nil {
			return fmt.Errorf("failed to compute piece CID: %w", err)
		}

		fmt.Printf("CommP: %s\n", pcid2.String())

		// Step 3: finalize the upload with the computed piece CID.
		type finalize struct {
			PieceCID string `json:"pieceCid"`
			Notify   string `json:"notify,omitempty"`
		}

		bd := finalize{
			PieceCID: pcid2.String(),
		}
		if notifyURL != "" {
			bd.Notify = notifyURL
		}

		bodyBytes, err := json.Marshal(bd)
		if err != nil {
			return fmt.Errorf("failed to marshal finalize request body: %w", err)
		}

		finalizeReq, err := http.NewRequest(http.MethodPost, serviceURL+location, bytes.NewBuffer(bodyBytes))
		if err != nil {
			return fmt.Errorf("failed to create finalize request: %w", err)
		}
		if jwtToken != "" {
			finalizeReq.Header.Set("Authorization", "Bearer "+jwtToken)
		}
		finalizeReq.Header.Set("Content-Type", "application/json")

		finalizeResp, err := client.Do(finalizeReq)
		if err != nil {
			return fmt.Errorf("failed to send finalize request: %w", err)
		}
		defer func() {
			_ = finalizeResp.Body.Close()
		}()

		if finalizeResp.StatusCode != http.StatusOK {
			ret, err := io.ReadAll(finalizeResp.Body)
			if err != nil {
				return fmt.Errorf("failed to finalize, status code %d, failed to read body %s", finalizeResp.StatusCode, err.Error())
			}
			return fmt.Errorf("failed to finalize, status code %d: %s", finalizeResp.StatusCode, string(ret))
		}

		fmt.Println("Piece uploaded successfully.")
		if localNotifWait {
			fmt.Println("Waiting for server notification...")
			<-notifyReceived
		}
		return nil
	},
}
2 changes: 1 addition & 1 deletion cuhttp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func attachRouters(ctx context.Context, r *chi.Mux, d *deps.Deps, sd *ServiceDep
libp2p.Router(r, rd)

if sd.EthSender != nil {
pdsvc := pdp.NewPDPService(d.DB, d.LocalStore, must.One(d.EthClient.Get()), d.Chain, sd.EthSender)
pdsvc := pdp.NewPDPService(ctx, d.DB, d.LocalStore, must.One(d.EthClient.Get()), d.Chain, sd.EthSender)
pdp.Routes(r, pdsvc)
}

Expand Down
14 changes: 14 additions & 0 deletions harmony/harmonydb/sql/20250930-streaming-upload.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- One row per streaming piece upload session.
CREATE TABLE pdp_piece_streaming_uploads (
-- Upload session identifier.
id UUID PRIMARY KEY NOT NULL,
service TEXT NOT NULL, -- pdp_services.id

-- Piece details; nullable, presumably filled in once the data has been received (TODO confirm).
piece_cid TEXT, -- piece cid v1
piece_size BIGINT,
raw_size BIGINT,

piece_ref BIGINT, -- parked_piece_refs.ref_id

-- Row creation time; defaults to the current time in UTC.
created_at TIMESTAMPTZ NOT NULL DEFAULT TIMEZONE('UTC', NOW()),
-- complete/completed_at are read by the periodic cleanup, which selects rows
-- with complete = TRUE whose completed_at is at least 60 minutes old.
complete bool,
completed_at TIMESTAMPTZ
);
2 changes: 1 addition & 1 deletion lib/proof/merkle_sha254_memtree.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/filecoin-project/lotus/storage/sealer/fr32"
)

const MaxMemtreeSize = 256 << 20
const MaxMemtreeSize = 1 << 30

// BuildSha254Memtree builds a sha256 memtree from the input data
// Returned slice should be released to the pool after use
Expand Down
51 changes: 49 additions & 2 deletions pdp/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path"
"strconv"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -52,8 +53,8 @@ type PDPServiceNodeApi interface {
}

// NewPDPService creates a new instance of PDPService with the provided stores
func NewPDPService(db *harmonydb.DB, stor paths.StashStore, ec *ethclient.Client, fc PDPServiceNodeApi, sn *message.SenderETH) *PDPService {
return &PDPService{
func NewPDPService(ctx context.Context, db *harmonydb.DB, stor paths.StashStore, ec *ethclient.Client, fc PDPServiceNodeApi, sn *message.SenderETH) *PDPService {
p := &PDPService{
Auth: &NullAuth{},
db: db,
storage: stor,
Expand All @@ -62,6 +63,9 @@ func NewPDPService(db *harmonydb.DB, stor paths.StashStore, ec *ethclient.Client
ethClient: ec,
filClient: fc,
}

go p.cleanup(ctx)
return p
}

// Routes registers the HTTP routes with the provided router
Expand Down Expand Up @@ -113,6 +117,15 @@ func Routes(r *chi.Mux, p *PDPService) {

// PUT /pdp/piece/upload/{uploadUUID}
r.Put(path.Join(PDPRoutePath, "/piece/upload/{uploadUUID}"), p.handlePieceUpload)

// POST /pdp/piece/uploads
r.Post(path.Join(PDPRoutePath, "/piece/uploads"), p.handleStreamingUploadURL)

// PUT /pdp/piece/uploads/{uploadUUID}
r.Put(path.Join(PDPRoutePath, "/piece/uploads/{uploadUUID}"), p.handleStreamingUpload)

// POST /pdp/piece/uploads/{uploadUUID}
r.Post(path.Join(PDPRoutePath, "/piece/uploads/{uploadUUID}"), p.handleFinalizeStreamingUpload)
}

// Handler functions
Expand Down Expand Up @@ -908,6 +921,7 @@ func (p *PDPService) handleAddPieceToDataSet(w http.ResponseWriter, r *http.Requ
}
if height > 50 {
http.Error(w, "Invalid height", http.StatusBadRequest)
return
}

// Get raw size by summing up the sizes of subPieces
Expand Down Expand Up @@ -1590,3 +1604,36 @@ func asPieceCIDv2(cidStr string, size uint64) (cid.Cid, uint64, error) {
return cid.Undef, 0, fmt.Errorf("unsupported piece CID type: %d", pieceCid.Prefix().MhType)
}
}

// cleanup runs until ctx is cancelled. Every five minutes it looks up
// streaming uploads that were marked complete at least 60 minutes ago and
// deletes the parked piece references they point to.
//
// Failures are logged and retried on the next tick; they never stop the loop.
func (p *PDPService) cleanup(ctx context.Context) {
	rm := func(ctx context.Context, db *harmonydb.DB) {
		var refIDs []int64

		// pdp_piece_streaming_uploads stores the parked piece reference in
		// piece_ref (it has no ref_id column). NULL refs are skipped so the
		// aggregated array can always be scanned into []int64.
		// NOTE(review): the log messages say "non-finalized" while the query
		// selects rows with complete = TRUE — confirm the intended predicate.
		err := db.QueryRow(ctx, `SELECT COALESCE(array_agg(piece_ref), '{}') AS ref_ids
								FROM pdp_piece_streaming_uploads
								WHERE complete = TRUE
								  AND piece_ref IS NOT NULL
								  AND completed_at <= TIMEZONE('UTC', NOW()) - INTERVAL '60 minutes';`).Scan(&refIDs)
		if err != nil {
			log.Errorw("failed to get non-finalized uploads", "error", err)
			// Nothing reliable to delete; try again on the next tick.
			return
		}

		if len(refIDs) == 0 {
			return
		}

		if _, err := db.Exec(ctx, `DELETE FROM parked_piece_refs WHERE ref_id = ANY($1);`, refIDs); err != nil {
			log.Errorw("failed to delete non-finalized uploads", "error", err)
		}
	}

	ticker := time.NewTicker(time.Minute * 5)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			rm(ctx, p.db)
		case <-ctx.Done():
			return
		}
	}
}
Loading