Skip to content

Commit 8a4c3e9

Browse files
committed
feat(pdp): new PieceCID-last flow (internal only)
First pass at supporting the new flow @ filecoin-project/curio#668 Second pass at this would expose the stream up through the SDK so you're not forced to pass down byte arrays. For now, this is just internal and hides the details outside of PDPServer.
1 parent 9527d2d commit 8a4c3e9

File tree

2 files changed

+154
-88
lines changed

2 files changed

+154
-88
lines changed

packages/synapse-sdk/src/pdp/server.ts

Lines changed: 66 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
*/
2828

2929
import { ethers } from 'ethers'
30-
import { asPieceCID, calculate as calculatePieceCID, downloadAndValidate } from '../piece/index.ts'
30+
import { asPieceCID, createPieceCIDStream, downloadAndValidate } from '../piece/index.ts'
3131
import type { DataSetData, MetadataEntry, PieceCID } from '../types.ts'
3232
import { validateDataSetMetadata, validatePieceMetadata } from '../utils/metadata.ts'
3333
import { constructFindPieceUrl, constructPieceUrl } from '../utils/piece.ts'
@@ -442,38 +442,23 @@ export class PDPServer {
442442
async uploadPiece(data: Uint8Array | ArrayBuffer): Promise<UploadResponse> {
443443
// Convert ArrayBuffer to Uint8Array if needed
444444
const uint8Data = data instanceof ArrayBuffer ? new Uint8Array(data) : data
445-
446-
// Calculate PieceCID
447-
performance.mark('synapse:calculatePieceCID-start')
448-
const pieceCid = calculatePieceCID(uint8Data)
449-
performance.mark('synapse:calculatePieceCID-end')
450-
performance.measure('synapse:calculatePieceCID', 'synapse:calculatePieceCID-start', 'synapse:calculatePieceCID-end')
451445
const size = uint8Data.length
452446

453-
const requestBody = {
454-
pieceCid: pieceCid.toString(),
455-
// No notify URL needed
456-
}
457-
458-
// Create upload session or check if piece exists
459-
performance.mark('synapse:POST.pdp.piece-start')
460-
const createResponse = await fetch(`${this._serviceURL}/pdp/piece`, {
447+
// Create upload session
448+
performance.mark('synapse:POST.pdp.piece.uploads-start')
449+
const createResponse = await fetch(`${this._serviceURL}/pdp/piece/uploads`, {
461450
method: 'POST',
462451
headers: {
463452
'Content-Type': 'application/json',
464453
},
465-
body: JSON.stringify(requestBody),
454+
body: JSON.stringify({}),
466455
})
467-
performance.mark('synapse:POST.pdp.piece-end')
468-
performance.measure('synapse:POST.pdp.piece', 'synapse:POST.pdp.piece-start', 'synapse:POST.pdp.piece-end')
469-
470-
if (createResponse.status === 200) {
471-
// Piece already exists on server
472-
return {
473-
pieceCid,
474-
size,
475-
}
476-
}
456+
performance.mark('synapse:POST.pdp.piece.uploads-end')
457+
performance.measure(
458+
'synapse:POST.pdp.piece.uploads',
459+
'synapse:POST.pdp.piece.uploads-start',
460+
'synapse:POST.pdp.piece.uploads-end'
461+
)
477462

478463
if (createResponse.status !== 201) {
479464
const errorText = await createResponse.text()
@@ -489,37 +474,80 @@ export class PDPServer {
489474
}
490475

491476
// Validate the location format and extract UUID
492-
// Match /pdp/piece/upload/UUID or /piece/upload/UUID anywhere in the path
493-
const locationMatch = location.match(/\/(?:pdp\/)?piece\/upload\/([a-fA-F0-9-]+)/)
477+
// Match /pdp/piece/uploads/UUID anywhere in the path
478+
const locationMatch = location.match(/\/pdp\/piece\/uploads\/([a-fA-F0-9-]+)/)
494479
if (locationMatch == null) {
495480
throw new Error(`Invalid Location header format: ${location}`)
496481
}
497482

498483
const uploadUuid = locationMatch[1] // Extract just the UUID
499484

500-
// Upload the data
501-
performance.mark('synapse:PUT.pdp.piece.upload-start')
502-
const uploadResponse = await fetch(`${this._serviceURL}/pdp/piece/upload/${uploadUuid}`, {
485+
// Create streaming CommP calculator
486+
const { stream: pieceCidStream, getPieceCID } = createPieceCIDStream()
487+
488+
// Convert Uint8Array to ReadableStream for streaming through the CommP calculator
489+
const dataStream = new ReadableStream({
490+
start(controller) {
491+
controller.enqueue(uint8Data)
492+
controller.close()
493+
},
494+
})
495+
496+
// Upload the data while calculating CommP in parallel
497+
performance.mark('synapse:PUT.pdp.piece.uploads-start')
498+
const uploadResponse = await fetch(`${this._serviceURL}/pdp/piece/uploads/${uploadUuid}`, {
503499
method: 'PUT',
504500
headers: {
505501
'Content-Type': 'application/octet-stream',
506502
'Content-Length': uint8Data.length.toString(),
507-
// No Authorization header needed
508503
},
509-
body: uint8Data,
510-
})
511-
performance.mark('synapse:PUT.pdp.piece.upload-end')
504+
body: dataStream.pipeThrough(pieceCidStream),
505+
duplex: 'half',
506+
} as RequestInit)
507+
performance.mark('synapse:PUT.pdp.piece.uploads-end')
512508
performance.measure(
513-
'synapse:PUT.pdp.piece.upload',
514-
'synapse:PUT.pdp.piece.upload-start',
515-
'synapse:PUT.pdp.piece.upload-end'
509+
'synapse:PUT.pdp.piece.uploads',
510+
'synapse:PUT.pdp.piece.uploads-start',
511+
'synapse:PUT.pdp.piece.uploads-end'
516512
)
517513

518514
if (uploadResponse.status !== 204) {
519515
const errorText = await uploadResponse.text()
520516
throw new Error(`Failed to upload piece: ${uploadResponse.status} ${uploadResponse.statusText} - ${errorText}`)
521517
}
522518

519+
// Get the calculated PieceCID (available after stream completes)
520+
const pieceCid = getPieceCID()
521+
if (pieceCid == null) {
522+
throw new Error('Failed to calculate PieceCID during upload')
523+
}
524+
525+
// Finalize the upload with CommP validation
526+
performance.mark('synapse:POST.pdp.piece.uploads.finalize-start')
527+
const finalizeResponse = await fetch(`${this._serviceURL}/pdp/piece/uploads/${uploadUuid}`, {
528+
method: 'POST',
529+
headers: {
530+
'Content-Type': 'application/json',
531+
},
532+
body: JSON.stringify({
533+
pieceCid: pieceCid.toString(),
534+
size,
535+
}),
536+
})
537+
performance.mark('synapse:POST.pdp.piece.uploads.finalize-end')
538+
performance.measure(
539+
'synapse:POST.pdp.piece.uploads.finalize',
540+
'synapse:POST.pdp.piece.uploads.finalize-start',
541+
'synapse:POST.pdp.piece.uploads.finalize-end'
542+
)
543+
544+
if (finalizeResponse.status !== 200) {
545+
const errorText = await finalizeResponse.text()
546+
throw new Error(
547+
`Failed to finalize upload: ${finalizeResponse.status} ${finalizeResponse.statusText} - ${errorText}`
548+
)
549+
}
550+
523551
return {
524552
pieceCid,
525553
size,

packages/synapse-sdk/src/test/pdp-server.test.ts

Lines changed: 88 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -502,26 +502,23 @@ describe('PDPServer', () => {
502502
const mockUuid = '12345678-90ab-cdef-1234-567890abcdef'
503503

504504
server.use(
505-
http.post<Record<string, never>, { pieceCid: string }>('http://pdp.local/pdp/piece', async ({ request }) => {
506-
try {
507-
const body = await request.json()
508-
assert.exists(body.pieceCid)
509-
return HttpResponse.text('Created', {
510-
status: 201,
511-
headers: {
512-
Location: `/pdp/piece/upload/${mockUuid}`,
513-
},
514-
})
515-
} catch (error) {
516-
return HttpResponse.text((error as Error).message, {
517-
status: 400,
518-
})
519-
}
505+
http.post('http://pdp.local/pdp/piece/uploads', async () => {
506+
return HttpResponse.text('Created', {
507+
status: 201,
508+
headers: {
509+
Location: `/pdp/piece/uploads/${mockUuid}`,
510+
},
511+
})
520512
}),
521-
http.put('http://pdp.local/pdp/piece/upload/:uuid', async () => {
513+
http.put('http://pdp.local/pdp/piece/uploads/:uuid', async () => {
522514
return HttpResponse.text('No Content', {
523515
status: 204,
524516
})
517+
}),
518+
http.post('http://pdp.local/pdp/piece/uploads/:uuid', async () => {
519+
return HttpResponse.text('OK', {
520+
status: 200,
521+
})
525522
})
526523
)
527524

@@ -537,26 +534,23 @@ describe('PDPServer', () => {
537534
const mockUuid = 'fedcba09-8765-4321-fedc-ba0987654321'
538535

539536
server.use(
540-
http.post<Record<string, never>, { pieceCid: string }>('http://pdp.local/pdp/piece', async ({ request }) => {
541-
try {
542-
const body = await request.json()
543-
assert.exists(body.pieceCid)
544-
return HttpResponse.text('Created', {
545-
status: 201,
546-
headers: {
547-
Location: `/pdp/piece/upload/${mockUuid}`,
548-
},
549-
})
550-
} catch (error) {
551-
return HttpResponse.text((error as Error).message, {
552-
status: 400,
553-
})
554-
}
537+
http.post('http://pdp.local/pdp/piece/uploads', async () => {
538+
return HttpResponse.text('Created', {
539+
status: 201,
540+
headers: {
541+
Location: `/pdp/piece/uploads/${mockUuid}`,
542+
},
543+
})
555544
}),
556-
http.put('http://pdp.local/pdp/piece/upload/:uuid', async () => {
545+
http.put('http://pdp.local/pdp/piece/uploads/:uuid', async () => {
557546
return HttpResponse.text('No Content', {
558547
status: 204,
559548
})
549+
}),
550+
http.post('http://pdp.local/pdp/piece/uploads/:uuid', async () => {
551+
return HttpResponse.text('OK', {
552+
status: 200,
553+
})
560554
})
561555
)
562556

@@ -565,33 +559,42 @@ describe('PDPServer', () => {
565559
assert.equal(result.size, 5)
566560
})
567561

568-
it('should handle existing piece (200 response)', async () => {
562+
it('should throw on create upload session error', async () => {
569563
const testData = new Uint8Array([1, 2, 3, 4, 5])
570-
const mockPieceCid = 'bafkzcibcd4bdomn3tgwgrh3g532zopskstnbrd2n3sxfqbze7rxt7vqn7veigmy'
571564

572565
server.use(
573-
http.post<Record<string, never>, { pieceCid: string }>('http://pdp.local/pdp/piece', async () => {
574-
return HttpResponse.json(
575-
{ pieceCid: mockPieceCid },
576-
{
577-
status: 200,
578-
}
579-
)
566+
http.post('http://pdp.local/pdp/piece/uploads', async () => {
567+
return HttpResponse.text('Database error', {
568+
status: 500,
569+
})
580570
})
581571
)
582572

583-
// Should not throw - existing piece is OK
584-
const result = await pdpServer.uploadPiece(testData)
585-
assert.exists(result.pieceCid)
586-
assert.equal(result.size, 5)
573+
try {
574+
await pdpServer.uploadPiece(testData)
575+
assert.fail('Should have thrown error')
576+
} catch (error: any) {
577+
assert.include(error.message, 'Failed to create upload session')
578+
assert.include(error.message, '500')
579+
assert.include(error.message, 'Database error')
580+
}
587581
})
588582

589-
it('should throw on create upload session error', async () => {
583+
it('should throw on upload error', async () => {
590584
const testData = new Uint8Array([1, 2, 3, 4, 5])
585+
const mockUuid = '12345678-90ab-cdef-1234-567890abcdef'
591586

592587
server.use(
593-
http.post<Record<string, never>, { pieceCid: string }>('http://pdp.local/pdp/piece', async () => {
594-
return HttpResponse.text('Database error', {
588+
http.post('http://pdp.local/pdp/piece/uploads', async () => {
589+
return HttpResponse.text('Created', {
590+
status: 201,
591+
headers: {
592+
Location: `/pdp/piece/uploads/${mockUuid}`,
593+
},
594+
})
595+
}),
596+
http.put('http://pdp.local/pdp/piece/uploads/:uuid', async () => {
597+
return HttpResponse.text('Upload failed', {
595598
status: 500,
596599
})
597600
})
@@ -601,9 +604,44 @@ describe('PDPServer', () => {
601604
await pdpServer.uploadPiece(testData)
602605
assert.fail('Should have thrown error')
603606
} catch (error: any) {
604-
assert.include(error.message, 'Failed to create upload session')
607+
assert.include(error.message, 'Failed to upload piece')
605608
assert.include(error.message, '500')
606-
assert.include(error.message, 'Database error')
609+
assert.include(error.message, 'Upload failed')
610+
}
611+
})
612+
613+
it('should throw on finalization error', async () => {
614+
const testData = new Uint8Array([1, 2, 3, 4, 5])
615+
const mockUuid = '12345678-90ab-cdef-1234-567890abcdef'
616+
617+
server.use(
618+
http.post('http://pdp.local/pdp/piece/uploads', async () => {
619+
return HttpResponse.text('Created', {
620+
status: 201,
621+
headers: {
622+
Location: `/pdp/piece/uploads/${mockUuid}`,
623+
},
624+
})
625+
}),
626+
http.put('http://pdp.local/pdp/piece/uploads/:uuid', async () => {
627+
return HttpResponse.text('No Content', {
628+
status: 204,
629+
})
630+
}),
631+
http.post('http://pdp.local/pdp/piece/uploads/:uuid', async () => {
632+
return HttpResponse.text('CommP mismatch', {
633+
status: 400,
634+
})
635+
})
636+
)
637+
638+
try {
639+
await pdpServer.uploadPiece(testData)
640+
assert.fail('Should have thrown error')
641+
} catch (error: any) {
642+
assert.include(error.message, 'Failed to finalize upload')
643+
assert.include(error.message, '400')
644+
assert.include(error.message, 'CommP mismatch')
607645
}
608646
})
609647
})

0 commit comments

Comments
 (0)