Colossus: Rework sync and cleanup #5194

Merged · 10 commits · Jan 10, 2025
10 changes: 10 additions & 0 deletions storage-node/CHANGELOG.md
@@ -1,3 +1,13 @@
### 4.4.0

- **Optimizations:** The way data objects / data object ids are queried and processed during sync and cleanup has been optimized:
- Sync and cleanup services now process tasks in batches of configurable size (`--syncBatchSize`, `--cleanupBatchSize`) to avoid overflowing the memory.
- Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`).
- Enforced limits of max. `10,000` results and max. `1,000` input arguments per single GraphQL query.
- Added `--cleanupWorkersNumber` flag to limit the number of concurrent async requests during cleanup.
- A safety mechanism was added to avoid removing "deleted" objects for which a related `DataObjectDeleted` event cannot be found in storage squid.
- Improved logging during sync and cleanup.
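
The `.filter(Array.includes(...))` → `.filter(Set.has(...))` change called out above can be sketched as follows (the types and function names are illustrative, not the actual storage-node code): a `Set` gives O(1) membership checks, so filtering `n` objects against `m` ids drops from O(n·m) to O(n+m).

```typescript
type DataObject = { id: string; size: number }

// Slow variant: O(n * m) — every object scans the whole id array
function filterUntrackedSlow(objects: DataObject[], trackedIds: string[]): DataObject[] {
  return objects.filter((obj) => !trackedIds.includes(obj.id))
}

// Optimized variant: O(n + m) — ids are hashed into a Set once
function filterUntrackedFast(objects: DataObject[], trackedIds: string[]): DataObject[] {
  const tracked = new Set(trackedIds)
  return objects.filter((obj) => !tracked.has(obj.id))
}
```

Both variants return the same result; only the asymptotic cost differs, which matters at the scale of the batch sizes above.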

### 4.3.0

- Adds `archive` mode / command, which allows downloading, compressing and uploading all data objects to an external S3 bucket that can be used as a backup.
552 changes: 260 additions & 292 deletions storage-node/README.md

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion storage-node/package.json
@@ -1,7 +1,7 @@
{
"name": "storage-node",
"description": "Joystream storage subsystem.",
"version": "4.3.0",
"version": "4.4.0",
"author": "Joystream contributors",
"bin": {
"storage-node": "./bin/run"
@@ -54,6 +54,7 @@
"multihashes": "^4.0.2",
"node-cache": "^5.1.2",
"openapi-editor": "^0.3.0",
"p-limit": "^3",
"promise-timeout": "^1.3.0",
"proper-lockfile": "^4.1.2",
"react": "^18.2.0",
32 changes: 29 additions & 3 deletions storage-node/src/commands/server.ts
@@ -78,16 +78,29 @@ export default class Server extends ApiCommandBase {
description: 'Interval before retrying failed synchronization run (in minutes)',
default: 3,
}),
syncBatchSize: flags.integer({
description: 'Maximum number of objects to process in a single batch during synchronization.',
default: 10_000,
}),
cleanup: flags.boolean({
char: 'c',
description: 'Enable cleanup/pruning of no-longer assigned assets.',
default: false,
}),
cleanupBatchSize: flags.integer({
description: 'Maximum number of objects to process in a single batch during cleanup.',
default: 10_000,
}),
cleanupInterval: flags.integer({
char: 'i',
description: 'Interval between periodic cleanup actions (in minutes)',
default: 360,
}),
cleanupWorkersNumber: flags.integer({
required: false,
description: 'Cleanup workers number (max async operations in progress).',
default: 100,
}),
storageSquidEndpoint: flags.string({
char: 'q',
required: true,
@@ -299,6 +312,7 @@ Supported values: warn, error, debug, info. Default:debug`,
flags.syncWorkersTimeout,
flags.syncInterval,
flags.syncRetryInterval,
flags.syncBatchSize,
X_HOST_ID
),
0
@@ -319,8 +333,9 @@
api,
qnApi,
flags.uploads,
flags.syncWorkersNumber,
flags.cleanupWorkersNumber,
flags.cleanupInterval,
flags.cleanupBatchSize,
X_HOST_ID
),
0
@@ -397,14 +412,24 @@ async function runSyncWithInterval(
syncWorkersTimeout: number,
syncIntervalMinutes: number,
syncRetryIntervalMinutes: number,
syncBatchSize: number,
hostId: string
) {
const sleepInterval = syncIntervalMinutes * 60 * 1000
const retrySleepInterval = syncRetryIntervalMinutes * 60 * 1000
while (true) {
try {
logger.info(`Resume syncing....`)
await performSync(buckets, syncWorkersNumber, syncWorkersTimeout, qnApi, uploadsDirectory, tempDirectory, hostId)
await performSync(
buckets,
syncWorkersNumber,
syncWorkersTimeout,
qnApi,
uploadsDirectory,
tempDirectory,
syncBatchSize,
hostId
)
logger.info(`Sync run complete. Next run in ${syncIntervalMinutes} minute(s).`)
await sleep(sleepInterval)
} catch (err) {
@@ -434,6 +459,7 @@ async function runCleanupWithInterval(
uploadsDirectory: string,
syncWorkersNumber: number,
cleanupIntervalMinutes: number,
cleanupBatchSize: number,
hostId: string
) {
const sleepInterval = cleanupIntervalMinutes * 60 * 1000
@@ -442,7 +468,7 @@
await sleep(sleepInterval)
try {
logger.info(`Resume cleanup....`)
await performCleanup(buckets, syncWorkersNumber, api, qnApi, uploadsDirectory, hostId)
await performCleanup(buckets, syncWorkersNumber, api, qnApi, uploadsDirectory, cleanupBatchSize, hostId)
} catch (err) {
logger.error(`Critical cleanup error: ${err}`)
}
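
The `--cleanupWorkersNumber` flag introduced in this PR caps how many async cleanup requests run at once, via the newly added `p-limit` dependency. A minimal, dependency-free stand-in for that pattern can be sketched as below — `createLimiter` and `runCleanupTasks` are illustrative names, not the storage-node API:

```typescript
// Minimal sketch of a concurrency limiter in the spirit of `p-limit`:
// at most `concurrency` wrapped promises are in flight at any time.
function createLimiter(concurrency: number) {
  let active = 0
  const queue: (() => void)[] = []
  return async <T>(fn: () => Promise<T>): Promise<T> => {
    if (active >= concurrency) {
      // Wait until a running task finishes and wakes us up
      await new Promise<void>((resolve) => queue.push(resolve))
    }
    active++
    try {
      return await fn()
    } finally {
      active--
      queue.shift()?.() // hand the slot to the next queued task, if any
    }
  }
}

// Run one async operation per object id, but never more than
// `workersNumber` at once (what `--cleanupWorkersNumber` controls).
async function runCleanupTasks(
  objectIds: string[],
  removeObject: (id: string) => Promise<void>,
  workersNumber: number
): Promise<void> {
  const limit = createLimiter(workersNumber)
  await Promise.all(objectIds.map((id) => limit(() => removeObject(id))))
}
```

With `p-limit` itself the body is the same shape: `const limit = pLimit(workersNumber)` followed by `Promise.all(ids.map((id) => limit(() => removeObject(id))))`.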
14 changes: 13 additions & 1 deletion storage-node/src/commands/util/cleanup.ts
@@ -27,6 +27,10 @@ export default class Cleanup extends ApiCommandBase {
required: true,
description: 'The bucketId to prune/cleanup',
}),
cleanupBatchSize: flags.integer({
description: 'Maximum number of objects to process in a single batch during cleanup.',
default: 10_000,
}),
cleanupWorkersNumber: flags.integer({
char: 'p',
required: false,
@@ -57,7 +61,15 @@
logger.info('Cleanup...')

try {
await performCleanup([bucketId], flags.cleanupWorkersNumber, api, qnApi, flags.uploads, '')
await performCleanup(
[bucketId],
flags.cleanupWorkersNumber,
api,
qnApi,
flags.uploads,
flags.cleanupBatchSize,
''
)
} catch (err) {
logger.error(err)
logger.error(stringify(err))
5 changes: 5 additions & 0 deletions storage-node/src/commands/util/fetch-bucket.ts
@@ -36,6 +36,10 @@ export default class FetchBucket extends Command {
description: 'Asset downloading timeout for the synchronization (in minutes).',
default: 30,
}),
syncBatchSize: flags.integer({
description: 'Maximum number of objects to process in a single batch.',
default: 10_000,
}),
queryNodeEndpoint: flags.string({
char: 'q',
required: false,
@@ -74,6 +78,7 @@
qnApi,
flags.uploads,
flags.tempFolder ? flags.tempFolder : path.join(flags.uploads, 'temp'),
flags.syncBatchSize,
'',
flags.dataSourceOperatorUrl
)
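
The `--syncBatchSize` / `--cleanupBatchSize` flags threaded through these call sites (default `10_000`) bound how many objects are fetched and held in memory at once. The pattern can be sketched as below; `chunk` mirrors the lodash `_.chunk` the diff uses, and `fetchObjects` / `processObject` are hypothetical callbacks standing in for the batched GraphQL query and per-object work:

```typescript
// Equivalent to lodash `_.chunk`, inlined to keep the sketch dependency-free
function chunk<T>(items: T[], size: number): T[][] {
  const batches: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size))
  }
  return batches
}

// Fetch and process objects one batch at a time, so that only
// `batchSize` objects are ever resident in memory.
async function processInBatches<T>(
  ids: string[],
  batchSize: number,
  fetchObjects: (idsBatch: string[]) => Promise<T[]>,
  processObject: (obj: T) => Promise<void>
): Promise<void> {
  for (const idsBatch of chunk(ids, batchSize)) {
    const objects = await fetchObjects(idsBatch)
    for (const obj of objects) {
      await processObject(obj)
    }
  }
}
```

Keeping `batchSize` at or below the GraphQL limits mentioned in the changelog also keeps each `fetchObjects` call within a single query's result and input-argument bounds.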
71 changes: 40 additions & 31 deletions storage-node/src/services/archive/ArchiveService.ts
@@ -13,7 +13,7 @@ import {
OBJECTS_TRACKING_FILENAME,
} from './tracking'
import { QueryNodeApi } from '../queryNode/api'
import { getStorageObligationsFromRuntime } from '../sync/storageObligations'
import { getDataObjectsByIDs, getStorageObligationsFromRuntime } from '../sync/storageObligations'
import { getDownloadTasks } from '../sync/synchronizer'
import sleep from 'sleep-promise'
import { Logger } from 'winston'
@@ -369,40 +369,49 @@
public async performSync(): Promise<void> {
const model = await getStorageObligationsFromRuntime(this.queryNodeApi)

const assignedObjects = model.dataObjects
const added = assignedObjects.filter((obj) => !this.objectTrackingService.isTracked(obj.id))
added.sort((a, b) => parseInt(b.id) - parseInt(a.id))
const unsyncedIds = (await model.getAssignedDataObjectIds(true))
.filter((id) => !this.objectTrackingService.isTracked(id))
.map((id) => parseInt(id))
// Sort unsynced ids in ASCENDING order (oldest first)
.sort((a, b) => a - b)

this.logger.info(`Sync - new objects: ${added.length}`)
this.logger.info(`Sync - new objects: ${unsyncedIds.length}`)

// Add new download tasks while the upload dir size limit allows
while (added.length) {
const uploadDirectorySize = await this.getUploadDirSize()
while (true) {
const object = added.pop()
if (!object) {
break
}
if (object.size + uploadDirectorySize + this.syncQueueObjectsSize > this.uploadDirSizeLimit) {
this.logger.debug(
`Waiting for some disk space to free ` +
`(upload_dir: ${uploadDirectorySize} / ${this.uploadDirSizeLimit}, ` +
`sync_q=${this.syncQueueObjectsSize}, obj_size=${object.size})... `
// Sync objects in batches of 10_000
for (const unsyncedIdsBatch of _.chunk(unsyncedIds, 10_000)) {
const objectIdsBatch = unsyncedIdsBatch.map((id) => id.toString())
// Sort objectsBatch by ids in DESCENDING order (because we're using .pop() to get the next object)
const objectsBatch = (await getDataObjectsByIDs(this.queryNodeApi, objectIdsBatch)).sort(
(a, b) => parseInt(b.id) - parseInt(a.id)
)
// Add new download tasks while the upload dir size limit allows
while (objectsBatch.length) {
const uploadDirectorySize = await this.getUploadDirSize()
while (true) {
const object = objectsBatch.pop()
if (!object) {
break
}
if (object.size + uploadDirectorySize + this.syncQueueObjectsSize > this.uploadDirSizeLimit) {
this.logger.debug(
`Waiting for some disk space to free ` +
`(upload_dir: ${uploadDirectorySize} / ${this.uploadDirSizeLimit}, ` +
`sync_q=${this.syncQueueObjectsSize}, obj_size=${object.size})... `
)
objectsBatch.push(object)
await sleep(60_000)
break
}
const [downloadTask] = await getDownloadTasks(
model,
[object],
this.uploadQueueDir,
this.tmpDownloadDir,
this.syncWorkersTimeout,
this.hostId
)
added.push(object)
await sleep(60_000)
break
await this.addDownloadTask(downloadTask, object.size)
}
const [downloadTask] = await getDownloadTasks(
model,
[],
[object],
this.uploadQueueDir,
this.tmpDownloadDir,
this.syncWorkersTimeout,
this.hostId
)
await this.addDownloadTask(downloadTask, object.size)
}
}
}