fix: remove duplicated values in the space-diff table #459

Draft · wants to merge 1 commit into main

2 changes: 2 additions & 0 deletions billing/package.json
@@ -19,11 +19,13 @@
"lru-cache": "^11.0.0",
"multiformats": "^13.1.0",
"p-retry": "^6.2.0",
"stream-json": "^1.9.1",
"stripe": "^14.2.0",
"uint8arrays": "^4.0.6"
},
"devDependencies": {
"@types/big.js": "^6.2.1",
"@types/stream-json": "^1.7.8",
"aws-lambda": "^1.0.7",
"c8": "^8.0.1",
"csv-parser": "^3.0.0",
94 changes: 94 additions & 0 deletions billing/scripts/dedupe-space-diff/1-identify-duplicates.js
@@ -0,0 +1,94 @@
import all from 'p-all'
import fs from 'node:fs'
import path from 'node:path'
import zlib from 'node:zlib'
import Stream from 'stream-json'
import { unmarshall } from '@aws-sdk/util-dynamodb'
import StreamValues from 'stream-json/streamers/StreamValues.js'

const args = process.argv.slice(2)
const folderPath = args[0] ?? 's3-export'

const concurrency = 5
/** Map of `cause` -> the first item seen with that cause. */
const seenCauses = new Map()
/** @type {{pk: string, sk: string}[]} */
const itemsToDelete = []

/**
 * Track the first item seen per `cause` and mark later duplicates for deletion.
 * @param {any} item
 */
function processItem(item) {
  const seenItem = seenCauses.get(item.cause)
  if (seenItem) {
    if (new Date(seenItem.receiptAt) < new Date(item.receiptAt)) {
      // The stored item is older: delete the more recently inserted one.
      itemsToDelete.push({ pk: item.pk, sk: item.sk })
    } else {
      // The incoming item is older: delete the stored one and keep this item,
      // so later duplicates are compared against the item we actually keep.
      itemsToDelete.push({ pk: seenItem.pk, sk: seenItem.sk })
      seenCauses.set(item.cause, {
        receiptAt: item.receiptAt,
        pk: item.pk,
        sk: item.sk,
      })
    }
  } else {
    seenCauses.set(item.cause, {
      receiptAt: item.receiptAt,
      pk: item.pk,
      sk: item.sk,
    })
  }
}

/**
 * @param {string} filePath
 * @returns {Promise<void>}
 */
async function processFile(filePath) {
  return new Promise((resolve, reject) => {
    const fileStream = fs.createReadStream(filePath)
    const gunzipStream = zlib.createGunzip()
    // DynamoDB S3 exports are gzipped files of newline-delimited JSON documents,
    // so the parser runs in jsonStreaming mode to accept multiple top-level values.
    const jsonStream = Stream.parser({ jsonStreaming: true })
    const pipeline = fileStream
      .pipe(gunzipStream)
      .pipe(jsonStream)
      .pipe(StreamValues.streamValues())

    pipeline.on('data', ({ value }) => {
      if (value.Item) {
        processItem(unmarshall(value.Item))
      }
    })

    pipeline.on('end', resolve)
    pipeline.on('error', reject)
  })
}

export async function main() {
  const files = fs
    .readdirSync(folderPath)
    .filter((file) => file.endsWith('.json.gz'))

  if (files.length === 0) {
    throw new Error('No relevant files found in the folder.')
  }

  await all(
    files.map((file) => async () => {
      const filePath = path.join(folderPath, file)
      console.log(`Processing file: ${filePath}`)
      await processFile(filePath)
    }),
    { concurrency }
  )

  console.log(`Unique items: ${seenCauses.size}`)
  console.log(`Items to delete: ${itemsToDelete.length}`)

  await fs.promises.writeFile(
    './items-to-delete.json',
    JSON.stringify(itemsToDelete)
  )
}

try {
  await main()
} catch (e) {
  console.error(e)
}
129 changes: 129 additions & 0 deletions billing/scripts/dedupe-space-diff/2-remove-duplicates.js
@@ -0,0 +1,129 @@
import all from 'p-all'
import fs from 'node:fs'
import dotenv from 'dotenv'
import Stream from 'stream-json'
import { BatchWriteItemCommand, DynamoDBClient } from '@aws-sdk/client-dynamodb'
import StreamArray from 'stream-json/streamers/StreamArray.js'

import { mustGetEnv } from '../../../lib/env.js'
import { marshall, unmarshall } from '@aws-sdk/util-dynamodb'

dotenv.config({ path: '.env.local' })

/**
* @typedef {object} ItemKey
* @property {string} pk
* @property {string} sk
*/

const SPACE_DIFF_TABLE_NAME = mustGetEnv('SPACE_DIFF_TABLE_NAME')

// DynamoDB's BatchWriteItem accepts at most 25 requests per call.
const BATCH_SIZE = 25
const MAX_RETRIES = 3
const concurrency = 5
const dynamo = new DynamoDBClient()

/**
* @param {number} ms - Delay in milliseconds
* @returns {Promise<void>}
*/
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

/**
 * Delete a batch of items, retrying unprocessed items with exponential backoff.
 * @param {ItemKey[]} items
 * @param {number} retryCount
 * @param {number} delay
 */
async function processBatch(items, retryCount = 0, delay = 100) {
  console.log(`Processing batch with ${items.length} items...`)
  const deleteRequests = items.map((item) => ({
    DeleteRequest: {
      Key: marshall(item),
    },
  }))

  const batchDeleteCommand = new BatchWriteItemCommand({
    RequestItems: {
      [SPACE_DIFF_TABLE_NAME]: deleteRequests,
    },
  })

  try {
    const response = await dynamo.send(batchDeleteCommand)

    if (response.UnprocessedItems && response.UnprocessedItems[SPACE_DIFF_TABLE_NAME]) {
      const unprocessedItems = /** @type {ItemKey[]} */ (
        response.UnprocessedItems[SPACE_DIFF_TABLE_NAME].map((item) =>
          unmarshall(/** @type {Record<string, any>} */ (item.DeleteRequest?.Key))
        )
      )
      if (retryCount < MAX_RETRIES) {
        console.log(`Retrying ${unprocessedItems.length} unprocessed items...`)
        await sleep(delay)
        return processBatch(
          unprocessedItems,
          retryCount + 1,
          delay * 2 // Increase delay exponentially
        )
      } else {
        console.error(
          'Max retries reached. Some items could not be processed:',
          unprocessedItems
        )
      }
    }
  } catch (err) {
    console.error('Failed to delete batch!', err)
  }
}

/**
 * @param {string} filePath
 * @returns {Promise<void>}
 */
async function processFile(filePath) {
  return new Promise((resolve, reject) => {
    const fileStream = fs.createReadStream(filePath)
    const pipeline = fileStream
      .pipe(Stream.parser())
      .pipe(StreamArray.streamArray())

    /** @type {ItemKey[]} */
    let batch = []
    /** @type {(() => Promise<void>)[]} */
    const tasks = []

    // Collect delete tasks in batches of BATCH_SIZE; they run after the whole
    // file has been read so the stream is never blocked on DynamoDB calls.
    pipeline.on('data', ({ value }) => {
      if (value) {
        batch.push(value)
        if (batch.length === BATCH_SIZE) {
          const copyBatch = batch.slice()
          tasks.push(() => processBatch(copyBatch))
          batch = []
        }
      }
    })

    pipeline.on('end', async () => {
      if (batch.length > 0) {
        tasks.push(() => processBatch(batch))
      }
      await all(tasks, { concurrency })
      resolve()
    })
    pipeline.on('error', reject)
  })
}

export async function main() {
  const file = 'items-to-delete.json'
  console.log(`Processing ${file}...`)
  await processFile(file)
}

try {
  await main()
} catch (e) {
  console.error(e)
}
9 changes: 9 additions & 0 deletions billing/scripts/dedupe-space-diff/README.md
@@ -0,0 +1,9 @@
# Dedupe Space Diff Table

A bug that caused duplicate entries in the `space-diff` table has been identified and fixed. To restore the table to its correct state, it must contain at most one item per `cause`. This requires identifying the duplicates and removing the most recently inserted copy of each.
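
For illustration, a minimal sketch of the rule with two hypothetical records sharing a `cause` (key values invented; `receiptAt`, `pk`, and `sk` are the field names the scripts read):

```js
// Both rows were recorded for the same receipt, so they share a `cause`.
const kept = { pk: 'space#a', sk: 'diff#1', cause: 'bafy...abc', receiptAt: '2024-06-01T10:00:00Z' }
const dupe = { pk: 'space#a', sk: 'diff#2', cause: 'bafy...abc', receiptAt: '2024-06-02T10:00:00Z' }
// `dupe` has the later `receiptAt`, so it is the copy marked for deletion.
```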

Follow these steps:

1. **Get the data:** Export the table data to S3 and download the `.json.gz` files.
2. **Identify the duplicates:** Run `1-identify-duplicates.js`, passing the path to the folder containing the exported files.
3. **Remove the duplicates:** Run `2-remove-duplicates.js` (see the example invocation below).
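
A sketch of the two invocations, assuming the scripts are run from the `billing` directory, the export was downloaded to `./s3-export`, and `.env.local` provides `SPACE_DIFF_TABLE_NAME` (AWS credentials must also be available in the environment):

```sh
# Step 2: scan the export and write items-to-delete.json
node scripts/dedupe-space-diff/1-identify-duplicates.js ./s3-export

# Step 3: batch-delete the duplicates listed in items-to-delete.json
node scripts/dedupe-space-diff/2-remove-duplicates.js
```
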
34 changes: 34 additions & 0 deletions package-lock.json

(Generated lockfile; diff not rendered.)