Skip to content

Commit 13566fe

Browse files
authored
feat: add blob protocol to infra (storacha#353)
This PR creates stores and wires up new `upload-api` running `blob/add`, `web3.storage/blob/allocate`, `web3.storage/blob/accept` and `ucan/conclude` capabilities. Tests are also imported from `upload-api` implementation and run here. As agreed on storacha/w3up#1343 , there won't be any deduping between old world and new world. Therefore, we have new `allocations` table, and use different key schema in `carpark`. We are writing blobs keyed as `base58btc` as previously discussed as `${base58btcEncodedMultihash}/${base58btcEncodedMultihash}.blob`. I added `.blob` suffix but I am happy to other suggestions. Depending on how we progress with the reads side, we can consider creating a new bucket to fully isolate new content? The `receipts` and `tasks` storage end up being more complicated as they need to follow https://github.com/web3-storage/w3infra/blob/main/docs/ucan-invocation-stream.md#buckets, and is essentially the same as what happens on https://github.com/web3-storage/w3infra/blob/main/upload-api/ucan-invocation.js#L66 but at a different level as this is a proactive write of tasks and receipts.
1 parent 50b266f commit 13566fe

22 files changed

+1415
-376
lines changed

billing/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
"@sentry/serverless": "^7.74.1",
1414
"@ucanto/interface": "^10.0.1",
1515
"@ucanto/server": "^10.0.0",
16-
"@web3-storage/capabilities": "^13.3.0",
16+
"@web3-storage/capabilities": "^13.3.1",
1717
"big.js": "^6.2.1",
1818
"multiformats": "^13.1.0",
1919
"p-retry": "^6.2.0",

package-lock.json

Lines changed: 219 additions & 349 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@
2323
"@ipld/dag-ucan": "^3.0.1",
2424
"@tsconfig/node16": "^1.0.3",
2525
"@types/git-rev-sync": "^2.0.0",
26-
"@ucanto/client": "^9.0.0",
27-
"@ucanto/core": "^9.0.1",
28-
"@ucanto/interface": "^9.0.0",
29-
"@ucanto/principal": "^9.0.0",
30-
"@ucanto/transport": "^9.0.0",
31-
"@ucanto/validator": "^9.0.1",
26+
"@ucanto/client": "^9.0.1",
27+
"@ucanto/core": "^10.0.1",
28+
"@ucanto/interface": "^10.0.1",
29+
"@ucanto/principal": "^9.0.1",
30+
"@ucanto/transport": "^9.1.1",
31+
"@ucanto/validator": "^9.0.2",
3232
"@web-std/blob": "^3.0.4",
3333
"@web-std/fetch": "^4.1.0",
3434
"@web3-storage/data-segment": "5.0.0",

stacks/upload-api-stack.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export function UploadApiStack({ stack, app }) {
2525

2626
// Get references to constructs created in other stacks
2727
const { carparkBucket } = use(CarparkStack)
28-
const { storeTable, uploadTable, delegationBucket, delegationTable, revocationTable, adminMetricsTable, spaceMetricsTable, consumerTable, subscriptionTable, rateLimitTable, pieceTable, privateKey } = use(UploadDbStack)
28+
const { allocationTable, storeTable, uploadTable, delegationBucket, delegationTable, revocationTable, adminMetricsTable, spaceMetricsTable, consumerTable, subscriptionTable, rateLimitTable, pieceTable, privateKey } = use(UploadDbStack)
2929
const { invocationBucket, taskBucket, workflowBucket, ucanStream } = use(UcanInvocationStack)
3030
const { customerTable, spaceDiffTable, spaceSnapshotTable, stripeSecretKey } = use(BillingDbStack)
3131
const { pieceOfferQueue, filecoinSubmitQueue } = use(FilecoinStack)
@@ -41,6 +41,7 @@ export function UploadApiStack({ stack, app }) {
4141
defaults: {
4242
function: {
4343
permissions: [
44+
allocationTable,
4445
storeTable,
4546
uploadTable,
4647
customerTable,
@@ -66,6 +67,7 @@ export function UploadApiStack({ stack, app }) {
6667
environment: {
6768
DID: process.env.UPLOAD_API_DID ?? '',
6869
AGGREGATOR_DID,
70+
ALLOCATION_TABLE_NAME: allocationTable.tableName,
6971
STORE_TABLE_NAME: storeTable.tableName,
7072
STORE_BUCKET_NAME: carparkBucket.bucketName,
7173
UPLOAD_TABLE_NAME: uploadTable.tableName,
@@ -92,6 +94,7 @@ export function UploadApiStack({ stack, app }) {
9294
COMMIT: git.commmit,
9395
STAGE: stack.stage,
9496
ACCESS_SERVICE_URL: getServiceURL(stack, customDomain) ?? '',
97+
UPLOAD_SERVICE_URL: getServiceURL(stack, customDomain) ?? '',
9598
POSTMARK_TOKEN: process.env.POSTMARK_TOKEN ?? '',
9699
PROVIDERS: process.env.PROVIDERS ?? '',
97100
R2_ACCESS_KEY_ID: process.env.R2_ACCESS_KEY_ID ?? '',

stacks/upload-db-stack.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Table, Bucket, Config } from 'sst/constructs'
22

33
import {
4+
allocationTableProps,
45
storeTableProps,
56
uploadTableProps,
67
consumerTableProps,
@@ -31,6 +32,12 @@ export function UploadDbStack({ stack, app }) {
3132
// TODO: we should look into creating a trust layer for content claims
3233
const contentClaimsPrivateKey = new Config.Secret(stack, 'CONTENT_CLAIMS_PRIVATE_KEY')
3334

35+
/**
36+
* The allocation table tracks allocated multihashes per space.
37+
* Used by the blob/* service capabilities.
38+
*/
39+
const allocationTable = new Table(stack, 'allocation', allocationTableProps)
40+
3441
/**
3542
* This table takes a stored CAR and makes an entry in the store table
3643
* Used by the store/* service capabilities.
@@ -99,6 +106,7 @@ export function UploadDbStack({ stack, app }) {
99106
const spaceMetricsTable = new Table(stack, 'space-metrics', spaceMetricsTableProps)
100107

101108
return {
109+
allocationTable,
102110
storeTable,
103111
uploadTable,
104112
pieceTable,

upload-api/functions/ucan-invocation-router.js

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ import { createUcantoServer } from '../service.js'
2323
import { Config } from 'sst/node/config'
2424
import { CAR, Legacy, Codec } from '@ucanto/transport'
2525
import { Email } from '../email.js'
26+
import { createTasksStorage } from '../stores/tasks.js'
27+
import { createReceiptsStorage } from '../stores/receipts.js'
28+
import { createAllocationsStorage } from '../stores/allocations.js'
29+
import { createBlobsStorage, composeblobStoragesWithOrderedHas } from '../stores/blobs.js'
2630
import { useProvisionStore } from '../stores/provisions.js'
2731
import { useSubscriptionsStore } from '../stores/subscriptions.js'
2832
import { createDelegationsTable } from '../tables/delegations.js'
@@ -39,6 +43,7 @@ import { createSpaceDiffStore } from '@web3-storage/w3infra-billing/tables/space
3943
import { createSpaceSnapshotStore } from '@web3-storage/w3infra-billing/tables/space-snapshot.js'
4044
import { useUsageStore } from '../stores/usage.js'
4145
import { createStripeBillingProvider } from '../billing.js'
46+
import { createTasksScheduler } from '../scheduler.js'
4247

4348
Sentry.AWSLambda.init({
4449
environment: process.env.SST_STAGE,
@@ -99,6 +104,7 @@ export async function ucanInvocationRouter(request) {
99104
storeTableName,
100105
storeBucketName,
101106
uploadTableName,
107+
allocationTableName,
102108
consumerTableName,
103109
customerTableName,
104110
subscriptionTableName,
@@ -122,6 +128,7 @@ export async function ucanInvocationRouter(request) {
122128
aggregatorDid,
123129
dealTrackerDid,
124130
dealTrackerUrl,
131+
uploadServiceURL,
125132
pieceOfferQueueUrl,
126133
filecoinSubmitQueueUrl,
127134
requirePaymentPlan,
@@ -144,6 +151,22 @@ export async function ucanInvocationRouter(request) {
144151
const { PRIVATE_KEY, STRIPE_SECRET_KEY } = Config
145152
const serviceSigner = getServiceSigner({ did: UPLOAD_API_DID, privateKey: PRIVATE_KEY })
146153

154+
const tasksStorage = createTasksStorage(AWS_REGION, invocationBucketName, workflowBucketName)
155+
const receiptsStorage = createReceiptsStorage(AWS_REGION, taskBucketName, invocationBucketName, workflowBucketName)
156+
const allocationsStorage = createAllocationsStorage(AWS_REGION, allocationTableName, {
157+
endpoint: dbEndpoint,
158+
})
159+
const blobsStorage = composeblobStoragesWithOrderedHas(
160+
createBlobsStorage(R2_REGION, carparkBucketName, {
161+
endpoint: carparkBucketEndpoint,
162+
credentials: {
163+
accessKeyId: carparkBucketAccessKeyId,
164+
secretAccessKey: carparkBucketSecretAccessKey,
165+
},
166+
}),
167+
createBlobsStorage(AWS_REGION, storeBucketName),
168+
)
169+
147170
const invocationBucket = createInvocationStore(
148171
AWS_REGION,
149172
invocationBucketName
@@ -172,16 +195,29 @@ export async function ucanInvocationRouter(request) {
172195
const spaceSnapshotStore = createSpaceSnapshotStore({ region: AWS_REGION }, { tableName: spaceSnapshotTableName })
173196
const usageStorage = useUsageStore({ spaceDiffStore, spaceSnapshotStore })
174197

175-
const connection = getServiceConnection({
198+
const dealTrackerConnection = getServiceConnection({
176199
did: dealTrackerDid,
177200
url: dealTrackerUrl
178201
})
202+
const selfConnection = getServiceConnection({
203+
did: serviceSigner.did(),
204+
url: uploadServiceURL
205+
})
206+
const tasksScheduler = createTasksScheduler(() => selfConnection)
179207

180208
const server = createUcantoServer(serviceSigner, {
181209
codec,
210+
allocationsStorage,
211+
blobsStorage,
212+
tasksStorage,
213+
receiptsStorage,
214+
tasksScheduler,
215+
getServiceConnection: () => selfConnection,
216+
// TODO: to be deprecated with `store/*` protocol
182217
storeTable: createStoreTable(AWS_REGION, storeTableName, {
183218
endpoint: dbEndpoint,
184219
}),
220+
// TODO: to be deprecated with `store/*` protocol
185221
carStoreBucket: composeCarStoresWithOrderedHas(
186222
createCarStore(AWS_REGION, storeBucketName),
187223
createCarStore(R2_REGION, carparkBucketName, {
@@ -192,6 +228,7 @@ export async function ucanInvocationRouter(request) {
192228
},
193229
}),
194230
),
231+
// TODO: to be deprecated with `store/*` protocol
195232
dudewhereBucket: createDudewhereStore(R2_REGION, R2_DUDEWHERE_BUCKET_NAME, {
196233
endpoint: R2_ENDPOINT,
197234
credentials: {
@@ -218,10 +255,10 @@ export async function ucanInvocationRouter(request) {
218255
pieceOfferQueue: createPieceOfferQueueClient({ region: AWS_REGION }, { queueUrl: pieceOfferQueueUrl }),
219256
filecoinSubmitQueue: createFilecoinSubmitQueueClient({ region: AWS_REGION }, { queueUrl: filecoinSubmitQueueUrl }),
220257
dealTrackerService: {
221-
connection,
258+
connection: dealTrackerConnection,
222259
invocationConfig: {
223260
issuer: serviceSigner,
224-
audience: connection.id,
261+
audience: dealTrackerConnection.id,
225262
with: serviceSigner.did()
226263
}
227264
},
@@ -316,6 +353,7 @@ function getLambdaEnv () {
316353
storeTableName: mustGetEnv('STORE_TABLE_NAME'),
317354
storeBucketName: mustGetEnv('STORE_BUCKET_NAME'),
318355
uploadTableName: mustGetEnv('UPLOAD_TABLE_NAME'),
356+
allocationTableName: mustGetEnv('ALLOCATION_TABLE_NAME'),
319357
consumerTableName: mustGetEnv('CONSUMER_TABLE_NAME'),
320358
customerTableName: mustGetEnv('CUSTOMER_TABLE_NAME'),
321359
subscriptionTableName: mustGetEnv('SUBSCRIPTION_TABLE_NAME'),
@@ -339,6 +377,7 @@ function getLambdaEnv () {
339377
postmarkToken: mustGetEnv('POSTMARK_TOKEN'),
340378
providers: mustGetEnv('PROVIDERS'),
341379
accessServiceURL: mustGetEnv('ACCESS_SERVICE_URL'),
380+
uploadServiceURL: mustGetEnv('UPLOAD_SERVICE_URL'),
342381
aggregatorDid: mustGetEnv('AGGREGATOR_DID'),
343382
requirePaymentPlan: (process.env.REQUIRE_PAYMENT_PLAN === 'true'),
344383
dealTrackerDid: mustGetEnv('DEAL_TRACKER_DID'),

upload-api/package.json

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
"@ucanto/transport": "^9.1.1",
2323
"@ucanto/validator": "^9.0.2",
2424
"@web-std/fetch": "^4.1.0",
25-
"@web3-storage/access": "^18.3.0",
26-
"@web3-storage/capabilities": "^13.3.0",
25+
"@web3-storage/access": "^18.3.1",
26+
"@web3-storage/capabilities": "^13.3.1",
2727
"@web3-storage/did-mailto": "^2.1.0",
28-
"@web3-storage/upload-api": "^9.0.1",
28+
"@web3-storage/upload-api": "^9.1.5",
2929
"multiformats": "^13.1.0",
3030
"nanoid": "^5.0.2",
3131
"preact": "^10.14.1",
@@ -65,7 +65,8 @@
6565
"eslintConfig": {
6666
"rules": {
6767
"unicorn/consistent-destructuring": "off",
68-
"unicorn/prefer-array-flat-map": "off"
68+
"unicorn/prefer-array-flat-map": "off",
69+
"unicorn/no-useless-undefined": "off"
6970
}
7071
}
7172
}

upload-api/scheduler.js

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* @typedef {import('@web3-storage/upload-api/types').TasksScheduler} TasksSchedulerInterface
3+
* @typedef {import('@web3-storage/upload-api/types').Service} Service
4+
* @typedef {import('@ucanto/interface').ConnectionView<Service>} Connection
5+
* @typedef {import('@ucanto/interface').ServiceInvocation} ServiceInvocation
6+
* @typedef {import('@ucanto/interface').Failure} Failure
7+
* @typedef {import('@ucanto/interface').Unit} Unit
8+
* @typedef {import('@ucanto/interface').Result<Unit, Failure>} Result
9+
*/
10+
11+
/**
12+
* @param {() => Connection} getServiceConnection
13+
*/
14+
export const createTasksScheduler = (getServiceConnection) => new TasksScheduler(getServiceConnection)
15+
16+
/**
17+
* @implements {TasksSchedulerInterface}
18+
*/
19+
export class TasksScheduler {
20+
/**
21+
*
22+
* @param {() => Connection} getServiceConnection
23+
*/
24+
constructor (getServiceConnection) {
25+
this.getServiceConnection = getServiceConnection
26+
}
27+
28+
/**
29+
* @param {ServiceInvocation} invocation
30+
* @returns {Promise<Result>}
31+
*/
32+
async schedule(invocation) {
33+
const connection = this.getServiceConnection()
34+
// This performs a HTTP Request to the Service URL.
35+
// upload-api service URL stores received invocations and produced
36+
// receipts on the server.
37+
const [res] = await connection.execute(invocation)
38+
39+
if (res.out.error) {
40+
return res.out
41+
}
42+
return {
43+
ok: {},
44+
}
45+
}
46+
}

0 commit comments

Comments
 (0)