Commit 7144462
Merge pull request #5194 from Lezek123/rework-sync-and-cleanup
Colossus: Rework sync and cleanup
mnaamani authored Jan 10, 2025
2 parents 9308382 + 03ff3ba commit 7144462
Showing 15 changed files with 777 additions and 613 deletions.
10 changes: 10 additions & 0 deletions storage-node/CHANGELOG.md
@@ -1,3 +1,13 @@
+### 4.4.0
+
+- **Optimizations:** The way data objects / data object ids are queried and processed during sync and cleanup has been optimized:
+  - Sync and cleanup services now process tasks in batches of configurable size (`--syncBatchSize`, `--cleanupBatchSize`) to avoid overflowing the memory.
+  - Synchronous operations like `sort` or `filter` on large arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`).
+  - Enforced a limit of max. `10,000` results per single GraphQL query and max. `1,000` input arguments per query.
+  - Added the `--cleanupWorkersNumber` flag to limit the number of concurrent async requests during cleanup.
+- A safety mechanism was added to avoid removing "deleted" objects for which a related `DataObjectDeleted` event cannot be found in the storage squid.
+- Improved logging during sync and cleanup.
+
 ### 4.3.0
 
 - Adds `archive` mode / command, which allows downloading, compressing and uploading all data objects to an external S3 bucket that can be used as a backup.
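
As an aside on the `.filter(Set.has(...))` optimization mentioned in the changelog: array membership checks rescan the whole array on every call, while a `Set` lookup is effectively constant time. A minimal sketch (illustrative data, not the actual Colossus code):

```ts
const assignedIds = ['1', '2', '3'] // in practice: hundreds of thousands of ids
const localIds = ['2', '4']

// Before: O(n * m), .includes() rescans assignedIds for every local id
const obsoleteSlow = localIds.filter((id) => !assignedIds.includes(id))

// After: O(n + m), Set.has() is a constant-time hash lookup
const assignedSet = new Set(assignedIds)
const obsoleteFast = localIds.filter((id) => !assignedSet.has(id))

console.log(obsoleteSlow, obsoleteFast) // [ '4' ] [ '4' ]
```
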
552 changes: 260 additions & 292 deletions storage-node/README.md

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion storage-node/package.json
@@ -1,7 +1,7 @@
{
"name": "storage-node",
"description": "Joystream storage subsystem.",
"version": "4.3.0",
"version": "4.4.0",
"author": "Joystream contributors",
"bin": {
"storage-node": "./bin/run"
@@ -54,6 +54,7 @@
     "multihashes": "^4.0.2",
     "node-cache": "^5.1.2",
     "openapi-editor": "^0.3.0",
+    "p-limit": "^3",
     "promise-timeout": "^1.3.0",
     "proper-lockfile": "^4.1.2",
     "react": "^18.2.0",
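
The new `p-limit` dependency presumably backs the concurrency cap behind `--cleanupWorkersNumber`. A sketch of the usual pattern, with a hypothetical `deleteObject` helper standing in for a single cleanup request:

```ts
import pLimit from 'p-limit'

// Hypothetical stand-in for one async cleanup request
async function deleteObject(id: string): Promise<void> {
  // ...issue the actual deletion request here...
}

const limit = pLimit(100) // e.g. the --cleanupWorkersNumber value

// All deletions are queued up front, but at most 100 run at any one time
async function cleanupObjects(ids: string[]): Promise<void> {
  await Promise.all(ids.map((id) => limit(() => deleteObject(id))))
}
```
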
32 changes: 29 additions & 3 deletions storage-node/src/commands/server.ts
@@ -78,16 +78,29 @@ export default class Server extends ApiCommandBase {
       description: 'Interval before retrying failed synchronization run (in minutes)',
       default: 3,
     }),
+    syncBatchSize: flags.integer({
+      description: 'Maximum number of objects to process in a single batch during synchronization.',
+      default: 10_000,
+    }),
     cleanup: flags.boolean({
       char: 'c',
       description: 'Enable cleanup/pruning of no-longer assigned assets.',
       default: false,
     }),
+    cleanupBatchSize: flags.integer({
+      description: 'Maximum number of objects to process in a single batch during cleanup.',
+      default: 10_000,
+    }),
     cleanupInterval: flags.integer({
       char: 'i',
       description: 'Interval between periodic cleanup actions (in minutes)',
       default: 360,
     }),
+    cleanupWorkersNumber: flags.integer({
+      required: false,
+      description: 'Cleanup workers number (max async operations in progress).',
+      default: 100,
+    }),
     storageSquidEndpoint: flags.string({
       char: 'q',
       required: true,
@@ -299,6 +312,7 @@ Supported values: warn, error, debug, info. Default:debug`,
         flags.syncWorkersTimeout,
         flags.syncInterval,
         flags.syncRetryInterval,
+        flags.syncBatchSize,
         X_HOST_ID
       ),
       0
@@ -319,8 +333,9 @@
         api,
         qnApi,
         flags.uploads,
-        flags.syncWorkersNumber,
+        flags.cleanupWorkersNumber,
         flags.cleanupInterval,
+        flags.cleanupBatchSize,
         X_HOST_ID
       ),
       0
@@ -397,14 +412,24 @@ async function runSyncWithInterval(
   syncWorkersTimeout: number,
   syncIntervalMinutes: number,
   syncRetryIntervalMinutes: number,
+  syncBatchSize: number,
   hostId: string
 ) {
   const sleepInterval = syncIntervalMinutes * 60 * 1000
   const retrySleepInterval = syncRetryIntervalMinutes * 60 * 1000
   while (true) {
     try {
       logger.info(`Resume syncing....`)
-      await performSync(buckets, syncWorkersNumber, syncWorkersTimeout, qnApi, uploadsDirectory, tempDirectory, hostId)
+      await performSync(
+        buckets,
+        syncWorkersNumber,
+        syncWorkersTimeout,
+        qnApi,
+        uploadsDirectory,
+        tempDirectory,
+        syncBatchSize,
+        hostId
+      )
       logger.info(`Sync run complete. Next run in ${syncIntervalMinutes} minute(s).`)
       await sleep(sleepInterval)
     } catch (err) {
@@ -434,6 +459,7 @@ async function runCleanupWithInterval(
   uploadsDirectory: string,
   syncWorkersNumber: number,
   cleanupIntervalMinutes: number,
+  cleanupBatchSize: number,
   hostId: string
 ) {
   const sleepInterval = cleanupIntervalMinutes * 60 * 1000
@@ -442,7 +468,7 @@
     await sleep(sleepInterval)
     try {
       logger.info(`Resume cleanup....`)
-      await performCleanup(buckets, syncWorkersNumber, api, qnApi, uploadsDirectory, hostId)
+      await performCleanup(buckets, syncWorkersNumber, api, qnApi, uploadsDirectory, cleanupBatchSize, hostId)
     } catch (err) {
       logger.error(`Critical cleanup error: ${err}`)
     }
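
For context, a schematic invocation of the server command exercising the new flags (values shown are the defaults from the diff above; the command's other required flags, such as worker id, uploads directory, and API endpoints, are omitted here):

```
storage-node server \
  --sync --syncBatchSize 10000 \
  --cleanup --cleanupBatchSize 10000 --cleanupWorkersNumber 100 --cleanupInterval 360 \
  # ...plus the usual required flags
```
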
14 changes: 13 additions & 1 deletion storage-node/src/commands/util/cleanup.ts
@@ -27,6 +27,10 @@ export default class Cleanup extends ApiCommandBase {
       required: true,
       description: 'The bucketId to sync/prune/cleanup',
     }),
+    cleanupBatchSize: flags.integer({
+      description: 'Maximum number of objects to process in a single batch during cleanup.',
+      default: 10_000,
+    }),
     cleanupWorkersNumber: flags.integer({
       char: 'p',
       required: false,
@@ -57,7 +61,15 @@
     logger.info('Cleanup...')
 
     try {
-      await performCleanup([bucketId], flags.cleanupWorkersNumber, api, qnApi, flags.uploads, '')
+      await performCleanup(
+        [bucketId],
+        flags.cleanupWorkersNumber,
+        api,
+        qnApi,
+        flags.uploads,
+        flags.cleanupBatchSize,
+        ''
+      )
     } catch (err) {
       logger.error(err)
       logger.error(stringify(err))
5 changes: 5 additions & 0 deletions storage-node/src/commands/util/fetch-bucket.ts
@@ -36,6 +36,10 @@ export default class FetchBucket extends Command {
       description: 'Asset downloading timeout for the synchronization (in minutes).',
       default: 30,
     }),
+    syncBatchSize: flags.integer({
+      description: 'Maximum number of objects to process in a single batch.',
+      default: 10_000,
+    }),
     queryNodeEndpoint: flags.string({
       char: 'q',
       required: false,
@@ -74,6 +78,7 @@
       qnApi,
       flags.uploads,
       flags.tempFolder ? flags.tempFolder : path.join(flags.uploads, 'temp'),
+      flags.syncBatchSize,
       '',
       flags.dataSourceOperatorUrl
     )
71 changes: 40 additions & 31 deletions storage-node/src/services/archive/ArchiveService.ts
@@ -13,7 +13,7 @@ import {
   OBJECTS_TRACKING_FILENAME,
 } from './tracking'
 import { QueryNodeApi } from '../queryNode/api'
-import { getStorageObligationsFromRuntime } from '../sync/storageObligations'
+import { getDataObjectsByIDs, getStorageObligationsFromRuntime } from '../sync/storageObligations'
 import { getDownloadTasks } from '../sync/synchronizer'
 import sleep from 'sleep-promise'
 import { Logger } from 'winston'
@@ -369,40 +369,49 @@ export class ArchiveService {
   public async performSync(): Promise<void> {
     const model = await getStorageObligationsFromRuntime(this.queryNodeApi)
 
-    const assignedObjects = model.dataObjects
-    const added = assignedObjects.filter((obj) => !this.objectTrackingService.isTracked(obj.id))
-    added.sort((a, b) => parseInt(b.id) - parseInt(a.id))
+    const unsyncedIds = (await model.getAssignedDataObjectIds(true))
+      .filter((id) => !this.objectTrackingService.isTracked(id))
+      .map((id) => parseInt(id))
+      // Sort unsynced ids in ASCENDING order (oldest first)
+      .sort((a, b) => a - b)
 
-    this.logger.info(`Sync - new objects: ${added.length}`)
+    this.logger.info(`Sync - new objects: ${unsyncedIds.length}`)
 
-    // Add new download tasks while the upload dir size limit allows
-    while (added.length) {
-      const uploadDirectorySize = await this.getUploadDirSize()
-      while (true) {
-        const object = added.pop()
-        if (!object) {
-          break
-        }
-        if (object.size + uploadDirectorySize + this.syncQueueObjectsSize > this.uploadDirSizeLimit) {
-          this.logger.debug(
-            `Waiting for some disk space to free ` +
-              `(upload_dir: ${uploadDirectorySize} / ${this.uploadDirSizeLimit}, ` +
-              `sync_q=${this.syncQueueObjectsSize}, obj_size=${object.size})... `
-          )
-          added.push(object)
-          await sleep(60_000)
-          break
-        }
-        const [downloadTask] = await getDownloadTasks(
-          model,
-          [],
-          [object],
-          this.uploadQueueDir,
-          this.tmpDownloadDir,
-          this.syncWorkersTimeout,
-          this.hostId
-        )
-        await this.addDownloadTask(downloadTask, object.size)
+    // Sync objects in batches of 10_000
+    for (const unsyncedIdsBatch of _.chunk(unsyncedIds, 10_000)) {
+      const objectIdsBatch = unsyncedIdsBatch.map((id) => id.toString())
+      // Sort objectsBatch by ids in DESCENDING order (because we're using .pop() to get the next object)
+      const objectsBatch = (await getDataObjectsByIDs(this.queryNodeApi, objectIdsBatch)).sort(
+        (a, b) => parseInt(b.id) - parseInt(a.id)
+      )
+      // Add new download tasks while the upload dir size limit allows
+      while (objectsBatch.length) {
+        const uploadDirectorySize = await this.getUploadDirSize()
+        while (true) {
+          const object = objectsBatch.pop()
+          if (!object) {
+            break
+          }
+          if (object.size + uploadDirectorySize + this.syncQueueObjectsSize > this.uploadDirSizeLimit) {
+            this.logger.debug(
+              `Waiting for some disk space to free ` +
+                `(upload_dir: ${uploadDirectorySize} / ${this.uploadDirSizeLimit}, ` +
+                `sync_q=${this.syncQueueObjectsSize}, obj_size=${object.size})... `
+            )
+            objectsBatch.push(object)
+            await sleep(60_000)
+            break
+          }
+          const [downloadTask] = await getDownloadTasks(
+            model,
+            [object],
+            this.uploadQueueDir,
+            this.tmpDownloadDir,
+            this.syncWorkersTimeout,
+            this.hostId
+          )
+          await this.addDownloadTask(downloadTask, object.size)
+        }
       }
     }
   }
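
The core of the rework visible in this hunk: instead of loading every assigned data object into memory at once, the service now fetches only the ids, then hydrates and processes them in fixed-size batches. Stripped of the download-queue details, the pattern is roughly as follows (a simplified sketch; the `declare`d names are hypothetical stand-ins for the service's actual members, not its real signatures):

```ts
import _ from 'lodash'

// Hypothetical stand-ins for the service's dependencies:
declare const model: { getAssignedDataObjectIds(ownedOnly: boolean): Promise<string[]> }
declare const queryNodeApi: unknown
declare function getDataObjectsByIDs(api: unknown, ids: string[]): Promise<{ id: string; size: number }[]>
declare function processBatch(objects: { id: string; size: number }[]): Promise<void>

async function syncInBatches(batchSize = 10_000): Promise<void> {
  // 1. Fetch lightweight ids only, instead of every full data object
  const ids = await model.getAssignedDataObjectIds(true)

  // 2. Hydrate and process one bounded batch at a time to keep memory flat
  for (const idsBatch of _.chunk(ids, batchSize)) {
    const objects = await getDataObjectsByIDs(queryNodeApi, idsBatch)
    await processBatch(objects) // e.g. enqueue download tasks
  }
}
```
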