diff --git a/billing/package.json b/billing/package.json
index f878e651..058c1e0a 100644
--- a/billing/package.json
+++ b/billing/package.json
@@ -19,11 +19,13 @@
     "lru-cache": "^11.0.0",
     "multiformats": "^13.1.0",
     "p-retry": "^6.2.0",
+    "stream-json": "^1.9.1",
     "stripe": "^14.2.0",
     "uint8arrays": "^4.0.6"
   },
   "devDependencies": {
     "@types/big.js": "^6.2.1",
+    "@types/stream-json": "^1.7.8",
     "aws-lambda": "^1.0.7",
     "c8": "^8.0.1",
     "csv-parser": "^3.0.0",
diff --git a/billing/scripts/dedupe-space-diff/1-identify-duplicates.js b/billing/scripts/dedupe-space-diff/1-identify-duplicates.js
new file mode 100644
index 00000000..6e094858
--- /dev/null
+++ b/billing/scripts/dedupe-space-diff/1-identify-duplicates.js
@@ -0,0 +1,94 @@
+import all from 'p-all'
+import fs from 'node:fs'
+import path from 'node:path'
+import zlib from 'node:zlib'
+import Stream from 'stream-json'
+import { unmarshall } from '@aws-sdk/util-dynamodb'
+import StreamValues from 'stream-json/streamers/StreamValues.js'
+
+const args = process.argv.slice(2)
+const folderPath = args[0] ?? 's3-export'
+
+const concurrency = 5
+const seenCauses = new Map()
+/** @type {{pk: string, sk: string}[]} */
+const itemsToDelete = []
+
+/**
+ * Track the first item seen per cause; mark later duplicates for deletion.
+ * @param {any} item
+ */
+function processItem(item) {
+  const seenItem = seenCauses.get(item.cause)
+  if (seenItem) {
+    const duplicateItemPk = new Date(seenItem.receiptAt) < new Date(item.receiptAt)
+      ? { pk: item.pk, sk: item.sk }
+      : { pk: seenItem.pk, sk: seenItem.sk }
+    itemsToDelete.push(duplicateItemPk)
+  } else {
+    seenCauses.set(item.cause, {
+      receiptAt: item.receiptAt,
+      pk: item.pk,
+      sk: item.sk,
+    })
+  }
+}
+
+/**
+ * Stream a gzipped DynamoDB export file and process each record.
+ * @param {string} filePath
+ * @returns {Promise<void>}
+ */
+async function processFile(filePath) {
+  return new Promise((resolve, reject) => {
+    const fileStream = fs.createReadStream(filePath)
+    const gunzipStream = zlib.createGunzip()
+    const jsonStream = Stream.parser({ jsonStreaming: true })
+    const pipeline = fileStream
+      .pipe(gunzipStream)
+      .pipe(jsonStream)
+      .pipe(StreamValues.streamValues())
+
+    pipeline.on('data', ({ value }) => {
+      if (value.Item) {
+        processItem(unmarshall(value.Item))
+      }
+    })
+
+    pipeline.on('end', resolve)
+    pipeline.on('error', reject)
+  })
+}
+
+export async function main() {
+  const files = fs
+    .readdirSync(folderPath)
+    .filter((file) => file.endsWith('.json.gz'))
+
+  if (files.length === 0) {
+    throw new Error('No relevant files found in the folder.')
+  }
+
+  await all(
+    files.map((file) => async () => {
+      const filePath = path.join(folderPath, file)
+      console.log(`Processing file: ${filePath}`)
+      await processFile(filePath)
+    }),
+    { concurrency }
+  )
+
+  console.log(`Unique items: ${seenCauses.size}`)
+  console.log(`Items to delete: ${itemsToDelete.length}`)
+
+  await fs.promises.writeFile(
+    './items-to-delete.json',
+    JSON.stringify(itemsToDelete)
+  )
+}
+
+try {
+  await main()
+} catch (e) {
+  console.error(e)
+}
diff --git a/billing/scripts/dedupe-space-diff/2-remove-duplicates.js b/billing/scripts/dedupe-space-diff/2-remove-duplicates.js
new file mode 100644
index 00000000..27132eb3
--- /dev/null
+++ b/billing/scripts/dedupe-space-diff/2-remove-duplicates.js
@@ -0,0 +1,130 @@
+import all from 'p-all'
+import fs from 'node:fs'
+import dotenv from 'dotenv'
+import Stream from 'stream-json'
+import { BatchWriteItemCommand, DynamoDBClient } from '@aws-sdk/client-dynamodb'
+import StreamArray from 'stream-json/streamers/StreamArray.js'
+
+import { mustGetEnv } from '../../../lib/env.js'
+import { marshall, unmarshall } from '@aws-sdk/util-dynamodb'
+
+dotenv.config({ path: '.env.local' })
+
+/**
+ * @typedef {object} ItemKey
+ * @property {string} pk
+ * @property {string} sk
+ */
+
+const SPACE_DIFF_TABLE_NAME = mustGetEnv('SPACE_DIFF_TABLE_NAME')
+
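+// DynamoDB BatchWriteItem accepts at most 25 items per request, hence BATCH_SIZE.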
+const BATCH_SIZE = 25
+const MAX_RETRIES = 3
+const concurrency = 5
+const dynamo = new DynamoDBClient()
+
+/**
+ * @param {number} ms - Delay in milliseconds
+ * @returns {Promise<void>}
+ */
+const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))
+
+/**
+ * Delete a batch of items, retrying unprocessed keys with exponential backoff.
+ * @param {ItemKey[]} items
+ * @param {number} retryCount
+ * @param {number} delay
+ */
+async function processBatch(items, retryCount = 0, delay = 100) {
+  console.log(`Processing batch with ${items.length} items...`)
+  const deleteRequests = items.map((item) => ({
+    DeleteRequest: {
+      Key: marshall(item),
+    },
+  }))
+
+  const batchDeleteCommand = new BatchWriteItemCommand({
+    RequestItems: {
+      [SPACE_DIFF_TABLE_NAME]: deleteRequests,
+    },
+  })
+
+  try {
+    const response = await dynamo.send(batchDeleteCommand)
+
+    if (response.UnprocessedItems && response.UnprocessedItems[SPACE_DIFF_TABLE_NAME]) {
+      const unprocessedItems = /** @type {ItemKey[]} */ (
+        response.UnprocessedItems[SPACE_DIFF_TABLE_NAME].map((item) =>
+          unmarshall(/** @type {Record<string, any>} */ (item.DeleteRequest?.Key))
+        )
+      )
+      if (retryCount < MAX_RETRIES) {
+        console.log(`Retrying ${unprocessedItems.length} unprocessed items...`)
+        await sleep(delay)
+        return processBatch(
+          unprocessedItems,
+          retryCount + 1,
+          delay * 2 // Increase delay exponentially
+        )
+      } else {
+        console.error(
+          'Max retries reached. Some items could not be processed:',
+          unprocessedItems
+        )
+      }
+    }
+  } catch (err) {
+    console.error('Failed to delete batch!', err)
+  }
+}
+
+/**
+ * @param {string} filePath
+ * @returns {Promise<void>}
+ */
+async function processFile(filePath) {
+  return new Promise((resolve, reject) => {
+    const fileStream = fs.createReadStream(filePath)
+    const pipeline = fileStream
+      .pipe(Stream.parser())
+      .pipe(StreamArray.streamArray())
+
+    /** @type {ItemKey[]} */
+    let batch = []
+    /** @type {(() => Promise<void>)[]} */
+    const tasks = []
+
+    pipeline.on('data', ({ value }) => {
+      if (value) {
+        batch.push(value)
+        if (batch.length === BATCH_SIZE) {
+          const copyBatch = batch.slice()
+          tasks.push(() => processBatch(copyBatch))
+          batch = []
+        }
+      }
+    })
+
+    pipeline.on('end', async () => {
+      if (batch.length > 0) {
+        tasks.push(() => processBatch(batch))
+      }
+      await all(tasks, { concurrency })
+      resolve()
+    })
+    pipeline.on('error', reject)
+  })
+}
+
+export async function main() {
+  const file = 'items-to-delete.json'
+  console.log(`Processing ${file}...`)
+  await processFile(file)
+}
+
+try {
+  await main()
+} catch (e) {
+  console.error(e)
+}
diff --git a/billing/scripts/dedupe-space-diff/README.md b/billing/scripts/dedupe-space-diff/README.md
new file mode 100644
index 00000000..08404e26
--- /dev/null
+++ b/billing/scripts/dedupe-space-diff/README.md
@@ -0,0 +1,16 @@
+# Dedupe Space Diff Table
+
+A bug that caused duplicate entries in the `space-diff` table has been identified and fixed. To restore the table to a correct state, only one item per `cause` may remain, so the duplicates must be identified and the more recently received one (the later `receiptAt`) removed.
+
+Follow the steps below:
+
+1. **Get the data:** Export the table data to S3 and download the `.json.gz` files.
+2. **Identify the Duplicates:** Run `1-identify-duplicates.js`, passing the path to the folder with the exported files.
+3. **Remove the Duplicates:** Run `2-remove-duplicates.js`.
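+
+For example, assuming the export was downloaded to `s3-export` (script 1's default) and `SPACE_DIFF_TABLE_NAME` is set in `.env.local`:
+
+```sh
+node 1-identify-duplicates.js ./s3-export  # writes items-to-delete.json
+node 2-remove-duplicates.js                # batch-deletes the listed keys
+```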
diff --git a/package-lock.json b/package-lock.json
index 00f8a032..26599b28 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -84,11 +84,13 @@
     "lru-cache": "^11.0.0",
     "multiformats": "^13.1.0",
     "p-retry": "^6.2.0",
+    "stream-json": "^1.9.1",
     "stripe": "^14.2.0",
     "uint8arrays": "^4.0.6"
   },
   "devDependencies": {
     "@types/big.js": "^6.2.1",
+    "@types/stream-json": "^1.7.8",
     "aws-lambda": "^1.0.7",
     "c8": "^8.0.1",
     "csv-parser": "^3.0.0",
@@ -11722,6 +11724,25 @@
       "dev": true,
       "peer": true
     },
+    "node_modules/@types/stream-chain": {
+      "version": "2.1.0",
+      "resolved": "https://registry.npmjs.org/@types/stream-chain/-/stream-chain-2.1.0.tgz",
+      "integrity": "sha512-guDyAl6s/CAzXUOWpGK2bHvdiopLIwpGu8v10+lb9hnQOyo4oj/ZUQFOvqFjKGsE3wJP1fpIesCcMvbXuWsqOg==",
+      "dev": true,
+      "dependencies": {
+        "@types/node": "*"
+      }
+    },
+    "node_modules/@types/stream-json": {
+      "version": "1.7.8",
+      "resolved": "https://registry.npmjs.org/@types/stream-json/-/stream-json-1.7.8.tgz",
+      "integrity": "sha512-MU1OB1eFLcYWd1LjwKXrxdoPtXSRzRmAnnxs4Js/ayB5O/NvHraWwuOaqMWIebpYwM6khFlsJOHEhI9xK/ab4Q==",
+      "dev": true,
+      "dependencies": {
+        "@types/node": "*",
+        "@types/stream-chain": "*"
+      }
+    },
     "node_modules/@types/ws": {
       "version": "8.5.10",
       "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.5.10.tgz",
@@ -27312,6 +27333,19 @@
       "url": "https://github.com/sponsors/sindresorhus"
     }
   },
+    "node_modules/stream-chain": {
+      "version": "2.2.5",
+      "resolved": "https://registry.npmjs.org/stream-chain/-/stream-chain-2.2.5.tgz",
+      "integrity": "sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA=="
+    },
+    "node_modules/stream-json": {
+      "version": "1.9.1",
+      "resolved": "https://registry.npmjs.org/stream-json/-/stream-json-1.9.1.tgz",
+      "integrity": "sha512-uWkjJ+2Nt/LO9Z/JyKZbMusL8Dkh97uUBTv3AJQ74y07lVahLY4eEFsPsE97pxYBwr8nnjMAIch5eqI0gPShyw==",
+      "dependencies": {
+        "stream-chain": "^2.2.5"
+      }
+    },
     "node_modules/stream-shift": {
       "version": "1.0.3",
       "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz",