Skip to content

Commit 1ad54ea

Browse files
author
Alan Shaw
authored
feat: use batching for billing UCAN stream handler (storacha#410)
This enables the UCAN stream handler to accept messages in batches, which should hopefully speed up processing of the stream and stop the backlog from growing. It's ok for the whole batch to be retried on failure since the primary key does not change over time, so any records that succeeded to be written in a failed batch will just be overwritten on retry.
1 parent 76e98ba commit 1ad54ea

File tree

9 files changed

+152
-73
lines changed

9 files changed

+152
-73
lines changed

Diff for: billing/functions/ucan-stream.js

+4-11
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { LRUCache } from 'lru-cache'
55
import { createSpaceDiffStore } from '../tables/space-diff.js'
66
import { createConsumerStore } from '../tables/consumer.js'
77
import { expect } from './lib.js'
8-
import { findSpaceUsageDeltas, storeSpaceUsageDelta } from '../lib/ucan-stream.js'
8+
import { findSpaceUsageDeltas, storeSpaceUsageDeltas } from '../lib/ucan-stream.js'
99
import { mustGetEnv } from '../../lib/env.js'
1010

1111
Sentry.AWSLambda.init({
@@ -35,24 +35,17 @@ export const handler = Sentry.AWSLambda.wrapHandler(
3535
const region = customContext?.region ?? mustGetEnv('AWS_REGION')
3636

3737
const messages = parseUcanStreamEvent(event)
38-
if (!messages || messages.length > 1) {
39-
throw new Error(`invalid batch size, expected: 1, actual: ${messages.length}`)
40-
}
41-
4238
const deltas = findSpaceUsageDeltas(messages)
4339
if (!deltas.length) {
4440
console.log("No messages found that contain space usage deltas", "capabilities", messages[0].value.att.map((att) => att.can), "resources", messages[0].value.att.map((att) => att.with) )
4541
return
4642
}
47-
console.log("Storing space usage delta", deltas[0])
48-
43+
console.log(`Storing ${deltas.length} space usage deltas`)
44+
4945
const consumerStore = createConsumerStore({ region }, { tableName: consumerTable })
5046
const spaceDiffStore = createSpaceDiffStore({ region }, { tableName: spaceDiffTable })
5147
const ctx = { spaceDiffStore, consumerStore: withConsumerListCache(consumerStore) }
52-
expect(
53-
await storeSpaceUsageDelta(deltas[0], ctx),
54-
`storing space usage delta for: ${deltas[0].resource}, cause: ${deltas[0].cause}`
55-
)
48+
expect(await storeSpaceUsageDeltas(deltas, ctx), 'storing space usage deltas')
5649
}
5750
)
5851

Diff for: billing/lib/api.ts

+20-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ export interface SpaceDiffListKey {
4343
}
4444

4545
export type SpaceDiffStore =
46-
& StorePutter<SpaceDiff>
46+
& StoreBatchPutter<SpaceDiff>
4747
& StoreLister<SpaceDiffListKey, SpaceDiff>
4848

4949
/** Captures size of a space at a given point in time. */
@@ -294,12 +294,31 @@ export interface RecordNotFound<K> extends Failure {
294294
key: K
295295
}
296296

297+
/** Not enough records were provided for the operation. */
298+
export interface InsufficientRecords extends Failure {
299+
name: 'InsufficientRecords'
300+
}
301+
297302
/** StorePutter allows a single item to be put in the store by it's key. */
298303
export interface StorePutter<T> {
299304
/** Puts a single item into the store by it's key */
300305
put: (rec: T) => Promise<Result<Unit, EncodeFailure|StoreOperationFailure|Failure>>
301306
}
302307

308+
/**
309+
* StoreBatchPutter allows multiple items to be put in the store. Note: this is
310+
* not transactional. A failure may mean 1 or more records succeeded to
311+
* be written.
312+
*/
313+
export interface StoreBatchPutter<T> {
314+
/**
315+
* Puts multiple items into the store by their key. Note: this is
316+
* not transactional. A failure may mean 1 or more records succeeded to
317+
* be written.
318+
*/
319+
batchPut: (rec: Iterable<T>) => Promise<Result<Unit, InsufficientRecords|EncodeFailure|StoreOperationFailure|Failure>>
320+
}
321+
303322
/** StoreGetter allows a single item to be retrieved by it's key. */
304323
export interface StoreGetter<K extends {}, V> {
305324
/** Gets a single item by it's key. */

Diff for: billing/lib/ucan-stream.js

+27-19
Original file line numberDiff line numberDiff line change
@@ -62,33 +62,41 @@ export const findSpaceUsageDeltas = messages => {
6262
* multiple calls to this function with the same data must not add _another_
6363
* record to the store.
6464
*
65-
* @param {import('./api.js').UsageDelta} delta
65+
* @param {import('./api.js').UsageDelta[]} deltas
6666
* @param {{
6767
* spaceDiffStore: import('./api').SpaceDiffStore
6868
* consumerStore: import('./api').ConsumerStore
6969
* }} ctx
70-
* @returns {Promise<import('@ucanto/interface').Result<import('@ucanto/interface').Unit>>}
7170
*/
72-
export const storeSpaceUsageDelta = async (delta, ctx) => {
73-
const consumerList = await ctx.consumerStore.list({ consumer: delta.resource })
74-
if (consumerList.error) return consumerList
71+
export const storeSpaceUsageDeltas = async (deltas, ctx) => {
72+
const spaceDiffResults = await Promise.all(deltas.map(async delta => {
73+
const consumerList = await ctx.consumerStore.list({ consumer: delta.resource })
74+
if (consumerList.error) return consumerList
7575

76-
// There should only be one subscription per provider, but in theory you
77-
// could have multiple providers for the same consumer (space).
78-
for (const consumer of consumerList.ok.results) {
79-
const spaceDiffPut = await ctx.spaceDiffStore.put({
80-
provider: consumer.provider,
81-
subscription: consumer.subscription,
82-
space: delta.resource,
83-
cause: delta.cause,
84-
delta: delta.delta,
85-
receiptAt: delta.receiptAt,
86-
insertedAt: new Date()
87-
})
88-
if (spaceDiffPut.error) return spaceDiffPut
76+
const diffs = []
77+
// There should only be one subscription per provider, but in theory you
78+
// could have multiple providers for the same consumer (space).
79+
for (const consumer of consumerList.ok.results) {
80+
diffs.push({
81+
provider: consumer.provider,
82+
subscription: consumer.subscription,
83+
space: delta.resource,
84+
cause: delta.cause,
85+
delta: delta.delta,
86+
receiptAt: delta.receiptAt,
87+
insertedAt: new Date()
88+
})
89+
}
90+
return { ok: diffs, error: undefined }
91+
}))
92+
93+
const spaceDiffs = []
94+
for (const res of spaceDiffResults) {
95+
if (res.error) return res
96+
spaceDiffs.push(...res.ok)
8997
}
9098

91-
return { ok: {} }
99+
return ctx.spaceDiffStore.batchPut(spaceDiffs)
92100
}
93101

94102
/**

Diff for: billing/tables/client.js

+49-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import { DynamoDBClient, GetItemCommand, PutItemCommand, QueryCommand, ScanCommand } from '@aws-sdk/client-dynamodb'
1+
import { BatchWriteItemCommand, DynamoDBClient, GetItemCommand, PutItemCommand, QueryCommand, ScanCommand } from '@aws-sdk/client-dynamodb'
22
import { marshall, unmarshall, convertToAttr } from '@aws-sdk/util-dynamodb'
33
import retry from 'p-retry'
4-
import { RecordNotFound, StoreOperationFailure } from './lib.js'
4+
import { InsufficientRecords, RecordNotFound, StoreOperationFailure } from './lib.js'
55
import { getDynamoClient } from '../../lib/aws/dynamo.js'
66

77
/** @param {{ region: string } | DynamoDBClient} target */
@@ -54,6 +54,53 @@ export const createStorePutterClient = (conf, context) => {
5454
}
5555
}
5656

57+
/**
58+
* @template T
59+
* @param {{ region: string } | import('@aws-sdk/client-dynamodb').DynamoDBClient} conf
60+
* @param {object} context
61+
* @param {string} context.tableName
62+
* @param {import('../lib/api').Validator<T>} context.validate
63+
* @param {import('../lib/api').Encoder<T, import('../types').StoreRecord>} context.encode
64+
* @returns {import('../lib/api').StoreBatchPutter<T>}
65+
*/
66+
export const createStoreBatchPutterClient = (conf, context) => {
67+
const client = connectTable(conf)
68+
return {
69+
batchPut: async (records) => {
70+
/** @type {import('@aws-sdk/client-dynamodb').WriteRequest[]} */
71+
const writeRequests = []
72+
for (const record of records) {
73+
const validation = context.validate(record)
74+
if (validation.error) return validation
75+
76+
const encoding = context.encode(record)
77+
if (encoding.error) return encoding
78+
writeRequests.push(({ PutRequest: { Item: marshall(encoding.ok, { removeUndefinedValues: true }) } }))
79+
}
80+
81+
if (!writeRequests.length) {
82+
return { error: new InsufficientRecords('records must have length greater than or equal to 1') }
83+
}
84+
85+
try {
86+
let requestItems = { [context.tableName]: writeRequests }
87+
await retry(async () => {
88+
const cmd = new BatchWriteItemCommand({ RequestItems: requestItems })
89+
const res = await client.send(cmd)
90+
if (res.UnprocessedItems && Object.keys(res.UnprocessedItems).length) {
91+
requestItems = res.UnprocessedItems
92+
throw new Error('unprocessed items')
93+
}
94+
}, { onFailedAttempt: console.warn })
95+
return { ok: {} }
96+
} catch (/** @type {any} */ err) {
97+
console.error(err)
98+
return { error: new StoreOperationFailure(err.message, { cause: err }) }
99+
}
100+
}
101+
}
102+
}
103+
57104
/**
58105
* @template {object} K
59106
* @template V

Diff for: billing/tables/lib.js

+16
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,19 @@ export class RecordNotFound extends Failure {
3535
return { ...super.toJSON(), key: this.key }
3636
}
3737
}
38+
39+
export class InsufficientRecords extends Failure {
40+
/**
41+
* @param {string} [message] Context for the message.
42+
* @param {ErrorOptions} [options]
43+
*/
44+
constructor (message, options) {
45+
super(undefined, options)
46+
this.name = /** @type {const} */ ('InsufficientRecords')
47+
this.detail = message
48+
}
49+
50+
describe () {
51+
return this.detail ?? 'insufficient records were provided for the operation'
52+
}
53+
}

Diff for: billing/tables/space-diff.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { createStoreListerClient, createStorePutterClient } from './client.js'
1+
import { createStoreBatchPutterClient, createStoreListerClient } from './client.js'
22
import { validate, encode, lister, decode } from '../data/space-diff.js'
33

44
/**
@@ -36,6 +36,6 @@ export const spaceDiffTableProps = {
3636
* @returns {import('../lib/api').SpaceDiffStore}
3737
*/
3838
export const createSpaceDiffStore = (conf, { tableName }) => ({
39-
...createStorePutterClient(conf, { tableName, validate, encode }),
39+
...createStoreBatchPutterClient(conf, { tableName, validate, encode }),
4040
...createStoreListerClient(conf, { tableName, encodeKey: lister.encodeKey, decode })
4141
})

Diff for: billing/test/lib/space-billing-queue.js

+27-26
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@ export const test = {
1414
const to = startOfMonth(now)
1515
const delta = 1024 * 1024 * 1024 // 1GiB
1616

17-
await ctx.spaceDiffStore.put({
17+
await ctx.spaceDiffStore.batchPut([{
1818
provider: consumer.provider,
1919
space: consumer.consumer,
2020
subscription: consumer.subscription,
2121
cause: randomLink(),
2222
delta,
2323
receiptAt: from,
2424
insertedAt: new Date()
25-
})
25+
}])
2626

2727
/** @type {import('../../lib/api.js').SpaceBillingInstruction} */
2828
const instruction = {
@@ -75,28 +75,29 @@ export const test = {
7575
insertedAt: new Date()
7676
})
7777

78-
// add 1GiB
79-
await ctx.spaceDiffStore.put({
80-
provider: consumer.provider,
81-
space: consumer.consumer,
82-
subscription: consumer.subscription,
83-
cause: randomLink(),
84-
delta,
85-
receiptAt: from,
86-
insertedAt: new Date()
87-
})
88-
89-
// remove 1GiB
90-
await ctx.spaceDiffStore.put({
91-
provider: consumer.provider,
92-
space: consumer.consumer,
93-
subscription: consumer.subscription,
94-
cause: randomLink(),
95-
delta: -delta,
96-
// removed exactly half way through the month
97-
receiptAt: new Date(from.getTime() + ((to.getTime() - from.getTime()) / 2)),
98-
insertedAt: new Date()
99-
})
78+
await ctx.spaceDiffStore.batchPut([
79+
// add 1GiB
80+
{
81+
provider: consumer.provider,
82+
space: consumer.consumer,
83+
subscription: consumer.subscription,
84+
cause: randomLink(),
85+
delta,
86+
receiptAt: from,
87+
insertedAt: new Date()
88+
},
89+
// remove 1GiB
90+
{
91+
provider: consumer.provider,
92+
space: consumer.consumer,
93+
subscription: consumer.subscription,
94+
cause: randomLink(),
95+
delta: -delta,
96+
// removed exactly half way through the month
97+
receiptAt: new Date(from.getTime() + ((to.getTime() - from.getTime()) / 2)),
98+
insertedAt: new Date()
99+
}
100+
])
100101

101102
/** @type {import('../../lib/api.js').SpaceBillingInstruction} */
102103
const instruction = {
@@ -157,7 +158,7 @@ export const test = {
157158
return yest
158159
}
159160

160-
await ctx.spaceDiffStore.put({
161+
await ctx.spaceDiffStore.batchPut([{
161162
provider: consumer.provider,
162163
space: consumer.consumer,
163164
subscription: consumer.subscription,
@@ -166,7 +167,7 @@ export const test = {
166167
// store/add 24h prior to end of billing
167168
receiptAt: yesterday(to),
168169
insertedAt: new Date()
169-
})
170+
}])
170171

171172
/** @type {import('../../lib/api.js').SpaceBillingInstruction} */
172173
const instruction = {

Diff for: billing/test/lib/ucan-stream.js

+5-11
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Schema } from '@ucanto/core'
22
import * as ServiceBlobCaps from '@web3-storage/capabilities/web3.storage/blob'
33
import * as BlobCaps from '@web3-storage/capabilities/blob'
44
import * as StoreCaps from '@web3-storage/capabilities/store'
5-
import { findSpaceUsageDeltas, storeSpaceUsageDelta } from '../../lib/ucan-stream.js'
5+
import { findSpaceUsageDeltas, storeSpaceUsageDeltas } from '../../lib/ucan-stream.js'
66
import { randomConsumer } from '../helpers/consumer.js'
77
import { randomLink } from '../helpers/dag.js'
88
import { randomDID, randomDIDKey } from '../helpers/did.js'
@@ -174,11 +174,8 @@ export const test = {
174174
}]
175175

176176
const deltas = findSpaceUsageDeltas(receipts)
177-
178-
for (const d of deltas) {
179-
const res = await storeSpaceUsageDelta(d, ctx)
180-
assert.ok(res.ok)
181-
}
177+
const storeDeltasRes = await storeSpaceUsageDeltas(deltas, ctx)
178+
assert.ok(storeDeltasRes.ok)
182179

183180
const res = await ctx.spaceDiffStore.list({
184181
provider: consumer.provider,
@@ -230,11 +227,8 @@ export const test = {
230227
}]
231228

232229
const deltas = findSpaceUsageDeltas(receipts)
233-
234-
for (const d of deltas) {
235-
const res = await storeSpaceUsageDelta(d, ctx)
236-
assert.ok(res.ok)
237-
}
230+
const storeDeltasRes = await storeSpaceUsageDeltas(deltas, ctx)
231+
assert.equal(storeDeltasRes.error?.name, 'InsufficientRecords')
238232

239233
const res = await ctx.spaceDiffStore.list({
240234
provider: consumer.provider,

Diff for: stacks/billing-stack.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ export function BillingStack ({ stack, app }) {
107107
function: ucanStreamHandler,
108108
cdk: {
109109
eventSource: {
110-
batchSize: 1,
110+
batchSize: 25, // max dynamo BatchWriteItems size
111+
bisectBatchOnError: true,
111112
startingPosition: StartingPosition.LATEST,
112113
filters: [
113114
FilterCriteria.filter({

0 commit comments

Comments
 (0)