Skip to content

Commit a6e5f72

Browse files
committed
feat(pdp): new PieceCID-last flow (internal only)
First pass at supporting the new flow @ filecoin-project/curio#668 Second pass at this would expose the stream up through the SDK so you're not forced to pass down byte arrays. For now, this is just internal and hides the details outside of PDPServer.
1 parent eeaf91d commit a6e5f72

File tree

6 files changed

+170
-100
lines changed

6 files changed

+170
-100
lines changed

packages/synapse-sdk/src/pdp/auth.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,10 @@ export class PDPAuthHelper {
209209
metadata: MetadataEntry[] = []
210210
): Promise<AuthSignature> {
211211
let signature: string
212-
const types = { CreateDataSet: EIP712_TYPES.CreateDataSet, MetadataEntry: EIP712_TYPES.MetadataEntry }
212+
const types = {
213+
CreateDataSet: EIP712_TYPES.CreateDataSet,
214+
MetadataEntry: EIP712_TYPES.MetadataEntry,
215+
}
213216

214217
// Check if we should use MetaMask-friendly signing
215218
const useMetaMask = await this.isMetaMaskSigner()

packages/synapse-sdk/src/pdp/server.ts

Lines changed: 65 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
*/
2828

2929
import { ethers } from 'ethers'
30-
import { asPieceCID, calculate as calculatePieceCID, downloadAndValidate } from '../piece/index.ts'
30+
import { asPieceCID, createPieceCIDStream, downloadAndValidate } from '../piece/index.ts'
3131
import type { DataSetData, MetadataEntry, PieceCID } from '../types.ts'
3232
import { validateDataSetMetadata, validatePieceMetadata } from '../utils/metadata.ts'
3333
import { constructFindPieceUrl, constructPieceUrl } from '../utils/piece.ts'
@@ -444,38 +444,19 @@ export class PDPServer {
444444
async uploadPiece(data: Uint8Array | ArrayBuffer): Promise<UploadResponse> {
445445
// Convert ArrayBuffer to Uint8Array if needed
446446
const uint8Data = data instanceof ArrayBuffer ? new Uint8Array(data) : data
447-
448-
// Calculate PieceCID
449-
performance.mark('synapse:calculatePieceCID-start')
450-
const pieceCid = calculatePieceCID(uint8Data)
451-
performance.mark('synapse:calculatePieceCID-end')
452-
performance.measure('synapse:calculatePieceCID', 'synapse:calculatePieceCID-start', 'synapse:calculatePieceCID-end')
453447
const size = uint8Data.length
454448

455-
const requestBody = {
456-
pieceCid: pieceCid.toString(),
457-
// No notify URL needed
458-
}
459-
460-
// Create upload session or check if piece exists
461-
performance.mark('synapse:POST.pdp.piece-start')
462-
const createResponse = await fetch(`${this._serviceURL}/pdp/piece`, {
449+
// Create upload session
450+
performance.mark('synapse:POST.pdp.piece.uploads-start')
451+
const createResponse = await fetch(`${this._serviceURL}/pdp/piece/uploads`, {
463452
method: 'POST',
464-
headers: {
465-
'Content-Type': 'application/json',
466-
},
467-
body: JSON.stringify(requestBody),
468453
})
469-
performance.mark('synapse:POST.pdp.piece-end')
470-
performance.measure('synapse:POST.pdp.piece', 'synapse:POST.pdp.piece-start', 'synapse:POST.pdp.piece-end')
471-
472-
if (createResponse.status === 200) {
473-
// Piece already exists on server
474-
return {
475-
pieceCid,
476-
size,
477-
}
478-
}
454+
performance.mark('synapse:POST.pdp.piece.uploads-end')
455+
performance.measure(
456+
'synapse:POST.pdp.piece.uploads',
457+
'synapse:POST.pdp.piece.uploads-start',
458+
'synapse:POST.pdp.piece.uploads-end'
459+
)
479460

480461
if (createResponse.status !== 201) {
481462
const errorText = await createResponse.text()
@@ -491,37 +472,80 @@ export class PDPServer {
491472
}
492473

493474
// Validate the location format and extract UUID
494-
// Match /pdp/piece/upload/UUID or /piece/upload/UUID anywhere in the path
495-
const locationMatch = location.match(/\/(?:pdp\/)?piece\/upload\/([a-fA-F0-9-]+)/)
475+
// Match /pdp/piece/uploads/UUID anywhere in the path
476+
const locationMatch = location.match(/\/pdp\/piece\/uploads\/([a-fA-F0-9-]+)/)
496477
if (locationMatch == null) {
497478
throw new Error(`Invalid Location header format: ${location}`)
498479
}
499480

500481
const uploadUuid = locationMatch[1] // Extract just the UUID
501482

502-
// Upload the data
503-
performance.mark('synapse:PUT.pdp.piece.upload-start')
504-
const uploadResponse = await fetch(`${this._serviceURL}/pdp/piece/upload/${uploadUuid}`, {
483+
// Create streaming CommP calculator
484+
const { stream: pieceCidStream, getPieceCID } = createPieceCIDStream()
485+
486+
// Convert Uint8Array to ReadableStream for streaming through the CommP calculator
487+
const dataStream = new ReadableStream({
488+
start(controller) {
489+
controller.enqueue(uint8Data)
490+
controller.close()
491+
},
492+
})
493+
494+
// Upload the data while calculating CommP in parallel
495+
performance.mark('synapse:PUT.pdp.piece.uploads-start')
496+
const uploadResponse = await fetch(`${this._serviceURL}/pdp/piece/uploads/${uploadUuid}`, {
505497
method: 'PUT',
506498
headers: {
507499
'Content-Type': 'application/octet-stream',
508500
'Content-Length': uint8Data.length.toString(),
509-
// No Authorization header needed
510501
},
511-
body: uint8Data,
512-
})
513-
performance.mark('synapse:PUT.pdp.piece.upload-end')
502+
body: dataStream.pipeThrough(pieceCidStream),
503+
duplex: 'half',
504+
} as RequestInit)
505+
performance.mark('synapse:PUT.pdp.piece.uploads-end')
514506
performance.measure(
515-
'synapse:PUT.pdp.piece.upload',
516-
'synapse:PUT.pdp.piece.upload-start',
517-
'synapse:PUT.pdp.piece.upload-end'
507+
'synapse:PUT.pdp.piece.uploads',
508+
'synapse:PUT.pdp.piece.uploads-start',
509+
'synapse:PUT.pdp.piece.uploads-end'
518510
)
519511

520512
if (uploadResponse.status !== 204) {
521513
const errorText = await uploadResponse.text()
522514
throw new Error(`Failed to upload piece: ${uploadResponse.status} ${uploadResponse.statusText} - ${errorText}`)
523515
}
524516

517+
// Get the calculated PieceCID (available after stream completes)
518+
const pieceCid = getPieceCID()
519+
if (pieceCid == null) {
520+
throw new Error('Failed to calculate PieceCID during upload')
521+
}
522+
523+
// Finalize the upload with CommP validation
524+
performance.mark('synapse:POST.pdp.piece.uploads.finalize-start')
525+
const finalizeResponse = await fetch(`${this._serviceURL}/pdp/piece/uploads/${uploadUuid}`, {
526+
method: 'POST',
527+
headers: {
528+
'Content-Type': 'application/json',
529+
},
530+
body: JSON.stringify({
531+
pieceCid: pieceCid.toString(),
532+
// notify field is optional - we don't use it
533+
}),
534+
})
535+
performance.mark('synapse:POST.pdp.piece.uploads.finalize-end')
536+
performance.measure(
537+
'synapse:POST.pdp.piece.uploads.finalize',
538+
'synapse:POST.pdp.piece.uploads.finalize-start',
539+
'synapse:POST.pdp.piece.uploads.finalize-end'
540+
)
541+
542+
if (finalizeResponse.status !== 200) {
543+
const errorText = await finalizeResponse.text()
544+
throw new Error(
545+
`Failed to finalize upload: ${finalizeResponse.status} ${finalizeResponse.statusText} - ${errorText}`
546+
)
547+
}
548+
525549
return {
526550
pieceCid,
527551
size,

packages/synapse-sdk/src/test/pdp-server.test.ts

Lines changed: 88 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -542,26 +542,23 @@ describe('PDPServer', () => {
542542
const mockUuid = '12345678-90ab-cdef-1234-567890abcdef'
543543

544544
server.use(
545-
http.post<Record<string, never>, { pieceCid: string }>('http://pdp.local/pdp/piece', async ({ request }) => {
546-
try {
547-
const body = await request.json()
548-
assert.exists(body.pieceCid)
549-
return HttpResponse.text('Created', {
550-
status: 201,
551-
headers: {
552-
Location: `/pdp/piece/upload/${mockUuid}`,
553-
},
554-
})
555-
} catch (error) {
556-
return HttpResponse.text((error as Error).message, {
557-
status: 400,
558-
})
559-
}
545+
http.post('http://pdp.local/pdp/piece/uploads', async () => {
546+
return HttpResponse.text('Created', {
547+
status: 201,
548+
headers: {
549+
Location: `/pdp/piece/uploads/${mockUuid}`,
550+
},
551+
})
560552
}),
561-
http.put('http://pdp.local/pdp/piece/upload/:uuid', async () => {
553+
http.put('http://pdp.local/pdp/piece/uploads/:uuid', async () => {
562554
return HttpResponse.text('No Content', {
563555
status: 204,
564556
})
557+
}),
558+
http.post('http://pdp.local/pdp/piece/uploads/:uuid', async () => {
559+
return HttpResponse.text('OK', {
560+
status: 200,
561+
})
565562
})
566563
)
567564

@@ -577,26 +574,23 @@ describe('PDPServer', () => {
577574
const mockUuid = 'fedcba09-8765-4321-fedc-ba0987654321'
578575

579576
server.use(
580-
http.post<Record<string, never>, { pieceCid: string }>('http://pdp.local/pdp/piece', async ({ request }) => {
581-
try {
582-
const body = await request.json()
583-
assert.exists(body.pieceCid)
584-
return HttpResponse.text('Created', {
585-
status: 201,
586-
headers: {
587-
Location: `/pdp/piece/upload/${mockUuid}`,
588-
},
589-
})
590-
} catch (error) {
591-
return HttpResponse.text((error as Error).message, {
592-
status: 400,
593-
})
594-
}
577+
http.post('http://pdp.local/pdp/piece/uploads', async () => {
578+
return HttpResponse.text('Created', {
579+
status: 201,
580+
headers: {
581+
Location: `/pdp/piece/uploads/${mockUuid}`,
582+
},
583+
})
595584
}),
596-
http.put('http://pdp.local/pdp/piece/upload/:uuid', async () => {
585+
http.put('http://pdp.local/pdp/piece/uploads/:uuid', async () => {
597586
return HttpResponse.text('No Content', {
598587
status: 204,
599588
})
589+
}),
590+
http.post('http://pdp.local/pdp/piece/uploads/:uuid', async () => {
591+
return HttpResponse.text('OK', {
592+
status: 200,
593+
})
600594
})
601595
)
602596

@@ -605,33 +599,42 @@ describe('PDPServer', () => {
605599
assert.equal(result.size, 5)
606600
})
607601

608-
it('should handle existing piece (200 response)', async () => {
602+
it('should throw on create upload session error', async () => {
609603
const testData = new Uint8Array([1, 2, 3, 4, 5])
610-
const mockPieceCid = 'bafkzcibcd4bdomn3tgwgrh3g532zopskstnbrd2n3sxfqbze7rxt7vqn7veigmy'
611604

612605
server.use(
613-
http.post<Record<string, never>, { pieceCid: string }>('http://pdp.local/pdp/piece', async () => {
614-
return HttpResponse.json(
615-
{ pieceCid: mockPieceCid },
616-
{
617-
status: 200,
618-
}
619-
)
606+
http.post('http://pdp.local/pdp/piece/uploads', async () => {
607+
return HttpResponse.text('Database error', {
608+
status: 500,
609+
})
620610
})
621611
)
622612

623-
// Should not throw - existing piece is OK
624-
const result = await pdpServer.uploadPiece(testData)
625-
assert.exists(result.pieceCid)
626-
assert.equal(result.size, 5)
613+
try {
614+
await pdpServer.uploadPiece(testData)
615+
assert.fail('Should have thrown error')
616+
} catch (error: any) {
617+
assert.include(error.message, 'Failed to create upload session')
618+
assert.include(error.message, '500')
619+
assert.include(error.message, 'Database error')
620+
}
627621
})
628622

629-
it('should throw on create upload session error', async () => {
623+
it('should throw on upload error', async () => {
630624
const testData = new Uint8Array([1, 2, 3, 4, 5])
625+
const mockUuid = '12345678-90ab-cdef-1234-567890abcdef'
631626

632627
server.use(
633-
http.post<Record<string, never>, { pieceCid: string }>('http://pdp.local/pdp/piece', async () => {
634-
return HttpResponse.text('Database error', {
628+
http.post('http://pdp.local/pdp/piece/uploads', async () => {
629+
return HttpResponse.text('Created', {
630+
status: 201,
631+
headers: {
632+
Location: `/pdp/piece/uploads/${mockUuid}`,
633+
},
634+
})
635+
}),
636+
http.put('http://pdp.local/pdp/piece/uploads/:uuid', async () => {
637+
return HttpResponse.text('Upload failed', {
635638
status: 500,
636639
})
637640
})
@@ -641,9 +644,44 @@ describe('PDPServer', () => {
641644
await pdpServer.uploadPiece(testData)
642645
assert.fail('Should have thrown error')
643646
} catch (error: any) {
644-
assert.include(error.message, 'Failed to create upload session')
647+
assert.include(error.message, 'Failed to upload piece')
645648
assert.include(error.message, '500')
646-
assert.include(error.message, 'Database error')
649+
assert.include(error.message, 'Upload failed')
650+
}
651+
})
652+
653+
it('should throw on finalization error', async () => {
654+
const testData = new Uint8Array([1, 2, 3, 4, 5])
655+
const mockUuid = '12345678-90ab-cdef-1234-567890abcdef'
656+
657+
server.use(
658+
http.post('http://pdp.local/pdp/piece/uploads', async () => {
659+
return HttpResponse.text('Created', {
660+
status: 201,
661+
headers: {
662+
Location: `/pdp/piece/uploads/${mockUuid}`,
663+
},
664+
})
665+
}),
666+
http.put('http://pdp.local/pdp/piece/uploads/:uuid', async () => {
667+
return HttpResponse.text('No Content', {
668+
status: 204,
669+
})
670+
}),
671+
http.post('http://pdp.local/pdp/piece/uploads/:uuid', async () => {
672+
return HttpResponse.text('CommP mismatch', {
673+
status: 400,
674+
})
675+
})
676+
)
677+
678+
try {
679+
await pdpServer.uploadPiece(testData)
680+
assert.fail('Should have thrown error')
681+
} catch (error: any) {
682+
assert.include(error.message, 'Failed to finalize upload')
683+
assert.include(error.message, '400')
684+
assert.include(error.message, 'CommP mismatch')
647685
}
648686
})
649687
})

packages/synapse-sdk/src/utils/constants.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ export const CONTRACT_ADDRESSES = {
319319
*/
320320
WARM_STORAGE: {
321321
mainnet: '0x81DFD9813aDd354f03704F31419b0c6268d46232',
322-
calibration: '0x80617b65FD2EEa1D7fDe2B4F85977670690ed348',
322+
calibration: '0xbe7027f4e84B4a261A531eFD1c4ad878FC4B5C77',
323323
} as const satisfies Record<FilecoinNetworkType, string>,
324324

325325
/**

utils/example-storage-e2e.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,16 @@
1818
* PRIVATE_KEY=0x... node example-storage-e2e.js <file-path>
1919
*/
2020

21+
import { ethers } from 'ethers'
22+
import { readFile } from 'fs/promises'
2123
import {
2224
ADD_PIECES_TYPEHASH,
2325
CREATE_DATA_SET_TYPEHASH,
2426
PDP_PERMISSION_NAMES,
2527
SIZE_CONSTANTS,
2628
Synapse,
2729
TIME_CONSTANTS,
28-
} from '@filoz/synapse-sdk'
29-
import { ethers } from 'ethers'
30-
import { readFile } from 'fs/promises'
30+
} from '../packages/synapse-sdk/dist/src/index.js'
3131

3232
// Configuration from environment
3333
const PRIVATE_KEY = process.env.PRIVATE_KEY

utils/post-deploy-setup.js

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,15 @@
9393
*/
9494

9595
import { ethers } from 'ethers'
96-
import { PaymentsService } from '../dist/payments/service.js'
97-
import { SPRegistryService } from '../dist/sp-registry/service.js'
98-
import { CONTRACT_ADDRESSES, RPC_URLS, TIME_CONSTANTS, TOKENS } from '../dist/utils/constants.js'
99-
import { WarmStorageService } from '../dist/warm-storage/service.js'
96+
import { PaymentsService } from '../packages/synapse-sdk/dist/src/payments/service.js'
97+
import { SPRegistryService } from '../packages/synapse-sdk/dist/src/sp-registry/service.js'
98+
import {
99+
CONTRACT_ADDRESSES,
100+
RPC_URLS,
101+
TIME_CONSTANTS,
102+
TOKENS,
103+
} from '../packages/synapse-sdk/dist/src/utils/constants.js'
104+
import { WarmStorageService } from '../packages/synapse-sdk/dist/src/warm-storage/service.js'
100105

101106
// Constants for payment approvals
102107
const RATE_ALLOWANCE_PER_EPOCH = ethers.parseUnits('0.1', 18) // 0.1 USDFC per epoch

0 commit comments

Comments
 (0)