Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions docs/src/content/docs/developer-guides/storage/storage-context.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ The SDK intelligently manages data sets to minimize on-chain transactions. The s
```ts twoslash
import {
PieceCID,
PieceRecord,
UploadResult,
ProviderInfo,
PreflightInfo,
Expand All @@ -145,35 +146,33 @@ import {
import { ethers } from "ethers";
type Transaction = Promise<ethers.TransactionResponse>;
type Hex = `0x${string}`;
/**
* Callbacks for tracking upload progress
*
* These callbacks provide visibility into the upload process stages:
* 1. Upload completion (piece uploaded to provider)
* 2. Piece addition (transaction submitted to chain)
* 3. Confirmation (transaction confirmed on-chain)
*/
export interface UploadCallbacks {
/** Called periodically during upload with bytes uploaded so far */
onProgress?: (bytesUploaded: number) => void;
/** Called when upload to service provider completes */
onUploadComplete?: (pieceCid: PieceCID) => void;
/** Called when the service provider has added the piece and submitted the transaction to the chain */
/** Called when the service provider has added the piece(s) and submitted the transaction to the chain */
onPiecesAdded?: (transaction?: Hex, pieces?: { pieceCid: PieceCID }[]) => void;
/** @deprecated Use onPiecesAdded instead */
onPieceAdded?: (transaction?: Hex) => void;
/** Called when the service provider agrees that the piece addition is confirmed on-chain */
/** Called when the service provider agrees that the piece addition(s) are confirmed on-chain */
onPiecesConfirmed?: (dataSetId: number, pieces: PieceRecord[]) => void;
/** @deprecated Use onPiecesConfirmed instead */
onPieceConfirmed?: (pieceIds: number[]) => void;
}

/**
* Options for uploading individual pieces to an existing storage context
* @param metadata - Custom metadata for this specific piece (key-value pairs)
* @param onUploadComplete - Called when upload to service provider completes
* @param onPieceAdded - Called when the service provider has added the piece and submitted the transaction to the chain
* @param onPieceConfirmed - Called when the service provider agrees that the piece addition is confirmed on-chain
* @param onPiecesAdded - Called when the service provider has added the piece(s) and submitted the transaction to the chain
* @param onPiecesConfirmed - Called when the service provider agrees that the piece addition(s) are confirmed on-chain and provides the dataSetId
*/
type UploadOptions = {
metadata?: Record<string, string>;
onUploadComplete?: (pieceCid: PieceCID) => void;
onPieceAdded?: (transaction?: Hex) => void;
onPieceConfirmed?: (pieceIds: number[]) => void;
onPiecesAdded?: (transaction?: Hex, pieces?: { pieceCid: PieceCID }[]) => void;
onPiecesConfirmed?: (dataSetId: number, pieces: PieceRecord[]) => void;
};
// ---cut---
interface StorageContextAPI {
Expand Down Expand Up @@ -241,13 +240,23 @@ const { pieceCid, size, pieceId } = await storageContext.upload(data, {
`Uploaded PieceCID: ${piece.toV1().toString()} to storage provider!`
);
},
onPieceAdded: (hash) => {
onPiecesAdded: (hash, pieces) => {
console.log(
`🔄 Waiting for transaction to be confirmed on chain (txHash: ${hash})`
);
console.log(
`Batch includes PieceCIDs: ${
pieces?.map(({ pieceCid }) => pieceCid.toString()).join(", ") ?? ""
}`
);
},
onPieceConfirmed: () => {
console.log("Data pieces added to data set successfully");
onPiecesConfirmed: (dataSetId, pieces) => {
console.log(`Data set ${dataSetId} confirmed with provider`);
console.log(
`Piece ID mapping: ${pieces
.map(({ pieceId, pieceCid }) => `${pieceId}:${pieceCid}`)
.join(", ")}`
);
},
});

Expand Down
10 changes: 5 additions & 5 deletions docs/src/content/docs/guides/storage.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,12 @@ const result = await context.upload(fileData, {
onUploadComplete: (pieceCid) => {
console.log(`Upload complete! PieceCID: ${pieceCid}`)
},
onPieceAdded: (hash) => {
console.log(`Piece added, tx: ${hash}`)
onPiecesAdded: (hash, pieces) => {
console.log(`Pieces added in tx ${hash}: ${pieces?.length ?? 0}`)
},
onPiecesConfirmed: (dataSetId, pieces) => {
console.log(`Data set ${dataSetId} confirmed with pieces: ${pieces.map(({ pieceId }) => pieceId).join(', ')}`)
},
onPieceConfirmed: (pieceIds) => {
console.log(`Piece IDs: ${pieceIds.join(', ')}`)
}
})
```

Expand Down
16 changes: 12 additions & 4 deletions examples/cli/src/commands/upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,19 @@ export const upload: Command = command(
metadata: {
name: path.basename(absolutePath),
},
onPieceAdded(transactionHash) {
p.log.info(`Piece added, tx: ${transactionHash}`)
onPiecesAdded(transactionHash, pieces) {
p.log.info(`Pieces added in tx: ${transactionHash}`)
if (pieces?.length) {
p.log.info(
`PieceCIDs: ${pieces.map(({ pieceCid }) => pieceCid.toString()).join(', ')}`
)
}
},
onPieceConfirmed(pieceIds) {
p.log.info(`Piece confirmed: ${pieceIds.join(', ')}`)
onPiecesConfirmed(dataSetId, pieces) {
p.log.info(`Data set ${dataSetId} confirmed`)
p.log.info(
`Piece IDs: ${pieces.map(({ pieceId }) => pieceId).join(', ')}`
)
},
onUploadComplete(pieceCid) {
p.log.info(`Upload complete! PieceCID: ${pieceCid}`)
Expand Down
19 changes: 15 additions & 4 deletions packages/synapse-sdk/src/storage/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import type {
EnhancedDataSetInfo,
MetadataEntry,
PieceCID,
PieceRecord,
PieceStatus,
PreflightInfo,
ProviderSelectionResult,
Expand Down Expand Up @@ -981,6 +982,7 @@ export class StorageContext {
const pieceCids: PieceCID[] = batch.map((item) => item.pieceCid)
const metadataArray: MetadataEntry[][] = batch.map((item) => item.metadata ?? [])
const confirmedPieceIds: number[] = []
const addedPieceRecords = pieceCids.map((pieceCid) => ({ pieceCid }))

if (this.dataSetId) {
const [, dataSetInfo] = await Promise.all([
Expand All @@ -997,14 +999,20 @@ export class StorageContext {

// Notify callbacks with transaction
batch.forEach((item) => {
item.callbacks?.onPiecesAdded?.(addPiecesResult.txHash as Hex, addedPieceRecords)
item.callbacks?.onPieceAdded?.(addPiecesResult.txHash as Hex)
})
const addPiecesResponse = await SP.pollForAddPiecesStatus(addPiecesResult)

// Handle transaction tracking if available
confirmedPieceIds.push(...(addPiecesResponse.confirmedPieceIds ?? []))

const confirmedPieceRecords: PieceRecord[] = confirmedPieceIds.map((pieceId, index) => ({
pieceId,
pieceCid: pieceCids[index],
}))
batch.forEach((item) => {
item.callbacks?.onPiecesConfirmed?.(this.dataSetId as number, confirmedPieceRecords)
item.callbacks?.onPieceConfirmed?.(confirmedPieceIds)
})
} else {
Expand All @@ -1031,6 +1039,7 @@ export class StorageContext {
}
)
batch.forEach((item) => {
item.callbacks?.onPiecesAdded?.(createAndAddPiecesResult.txHash as Hex, addedPieceRecords)
item.callbacks?.onPieceAdded?.(createAndAddPiecesResult.txHash as Hex)
})
const confirmedDataset = await SP.pollForDataSetCreationStatus(createAndAddPiecesResult)
Expand All @@ -1045,7 +1054,12 @@ export class StorageContext {

confirmedPieceIds.push(...(confirmedPieces.confirmedPieceIds ?? []))

const confirmedPieceRecords: PieceRecord[] = confirmedPieceIds.map((pieceId, index) => ({
pieceId,
pieceCid: pieceCids[index],
}))
batch.forEach((item) => {
item.callbacks?.onPiecesConfirmed?.(this.dataSetId as number, confirmedPieceRecords)
item.callbacks?.onPieceConfirmed?.(confirmedPieceIds)
})
}
Expand Down Expand Up @@ -1132,10 +1146,7 @@ export class StorageContext {
* @param options.signal - Optional AbortSignal to cancel the operation
* @yields Object with pieceCid and pieceId - the piece ID is needed for certain operations like deletion
*/
async *getPieces(options?: {
batchSize?: number
signal?: AbortSignal
}): AsyncGenerator<{ pieceCid: PieceCID; pieceId: number }> {
async *getPieces(options?: { batchSize?: number; signal?: AbortSignal }): AsyncGenerator<PieceRecord> {
if (this._dataSetId == null) {
return
}
Expand Down
31 changes: 29 additions & 2 deletions packages/synapse-sdk/src/test/storage-upload.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import { assert } from 'chai'
import { ethers } from 'ethers'
import { setup } from 'iso-web/msw'
import { HttpResponse, http } from 'msw'
import type { Hex } from 'viem'
import { Synapse } from '../synapse.ts'
import type { PieceCID, PieceRecord } from '../types.ts'
import { SIZE_CONSTANTS } from '../utils/constants.ts'
import { JSONRPC, PRIVATE_KEYS, presets } from './mocks/jsonrpc/index.ts'
import { findAnyPieceHandler, streamingUploadHandlers } from './mocks/pdp/handlers.ts'
Expand Down Expand Up @@ -451,7 +453,10 @@ describe('Storage Upload', () => {
it('should handle new server with transaction tracking', async () => {
let pieceAddedCallbackFired = false
let pieceConfirmedCallbackFired = false
let piecesAddedArgs: { transaction?: Hex; pieces?: Array<{ pieceCid: PieceCID }> } | null = null
let piecesConfirmedArgs: { dataSetId?: number; pieces?: PieceRecord[] } | null = null
let uploadCompleteCallbackFired = false
let resolvedDataSetId: number | undefined
const txHash = '0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef123456'
const pdpOptions = {
baseUrl: 'https://pdp.example.com',
Expand All @@ -470,11 +475,12 @@ describe('Storage Upload', () => {
})
}),
http.get<{ id: string }>(`https://pdp.example.com/pdp/data-sets/:id/pieces/added/:txHash`, ({ params }) => {
resolvedDataSetId = parseInt(params.id, 10)
return HttpResponse.json(
{
addMessageOk: true,
confirmedPieceIds: [0],
dataSetId: parseInt(params.id, 10),
dataSetId: resolvedDataSetId,
pieceCount: 1,
piecesAdded: true,
txHash,
Expand All @@ -493,7 +499,13 @@ describe('Storage Upload', () => {
})

const expectedSize = SIZE_CONSTANTS.MIN_UPLOAD_SIZE
await context.upload(new Uint8Array(expectedSize).fill(1), {
const uploadResult = await context.upload(new Uint8Array(expectedSize).fill(1), {
onPiecesAdded(transaction: Hex | undefined, pieces: Array<{ pieceCid: PieceCID }> | undefined) {
piecesAddedArgs = { transaction, pieces }
},
onPiecesConfirmed(dataSetId: number, pieces: PieceRecord[]) {
piecesConfirmedArgs = { dataSetId, pieces }
},
onPieceAdded() {
pieceAddedCallbackFired = true
},
Expand All @@ -508,6 +520,21 @@ describe('Storage Upload', () => {
assert.isTrue(pieceAddedCallbackFired, 'pieceAddedCallback should have been called')
assert.isTrue(pieceConfirmedCallbackFired, 'pieceConfirmedCallback should have been called')
assert.isTrue(uploadCompleteCallbackFired, 'uploadCompleteCallback should have been called')
assert.isNotNull(piecesAddedArgs, 'onPiecesAdded args should be captured')
assert.isNotNull(piecesConfirmedArgs, 'onPiecesConfirmed args should be captured')
if (piecesAddedArgs == null || piecesConfirmedArgs == null) {
throw new Error('Callbacks should have been called')
}
const addedArgs: { transaction?: Hex; pieces?: Array<{ pieceCid: PieceCID }> } = piecesAddedArgs
const confirmedArgs: { dataSetId?: number; pieces?: PieceRecord[] } = piecesConfirmedArgs
assert.strictEqual(addedArgs.transaction, txHash, 'onPiecesAdded should receive transaction hash')
assert.strictEqual(
addedArgs.pieces?.[0].pieceCid.toString(),
uploadResult.pieceCid.toString(),
'onPiecesAdded should provide matching pieceCid'
)
assert.strictEqual(confirmedArgs.dataSetId, resolvedDataSetId, 'onPiecesConfirmed should provide the dataset id')
assert.strictEqual(confirmedArgs.pieces?.[0].pieceId, 0, 'onPiecesConfirmed should include piece IDs')
})

it('should handle ArrayBuffer input', async () => {
Expand Down
27 changes: 17 additions & 10 deletions packages/synapse-sdk/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,25 +396,32 @@ export interface PreflightInfo {
// that combines context creation + upload in one call)
// ============================================================================

/**
* Callbacks for tracking upload progress
*
* These callbacks provide visibility into the upload process stages:
* 1. Upload completion (piece uploaded to provider)
* 2. Piece addition (transaction submitted to chain)
* 3. Confirmation (transaction confirmed on-chain)
*/
export interface UploadCallbacks {
/** Called periodically during upload with bytes uploaded so far */
onProgress?: (bytesUploaded: number) => void
/** Called when upload to service provider completes */
onUploadComplete?: (pieceCid: PieceCID) => void
/** Called when the service provider has added the piece and submitted the transaction to the chain */
/** Called when the service provider has added the piece(s) and submitted the transaction to the chain */
onPiecesAdded?: (transaction?: Hex, pieces?: { pieceCid: PieceCID }[]) => void
/** @deprecated Use onPiecesAdded instead */
onPieceAdded?: (transaction?: Hex) => void
/** Called when the service provider agrees that the piece addition is confirmed on-chain */
/** Called when the service provider agrees that the piece addition(s) are confirmed on-chain */
onPiecesConfirmed?: (dataSetId: number, pieces: PieceRecord[]) => void
/** @deprecated Use onPiecesConfirmed instead */
onPieceConfirmed?: (pieceIds: number[]) => void
}

/**
* Canonical representation of a piece within a data set.
*
* This is used when reporting confirmed pieces and when iterating over pieces
* in a data set.
*/
export interface PieceRecord {
pieceId: number
pieceCid: PieceCID
}

/**
* Options for uploading individual pieces to an existing storage context
*
Expand Down