From ebfce147c47d4ebcde802f4ba4d2281fd2073847 Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Wed, 10 Jan 2024 19:58:34 +0400 Subject: [PATCH 1/4] colossus: use Map/Set.has instead of lodash.difference _.difference and _.differenceWith are very slow with large arrays --- storage-node/src/services/caching/localDataObjects.ts | 9 +++++++++ storage-node/src/services/sync/cleanupService.ts | 5 ++--- storage-node/src/services/sync/synchronizer.ts | 9 +++------ 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/storage-node/src/services/caching/localDataObjects.ts b/storage-node/src/services/caching/localDataObjects.ts index 65d264200a..920131096c 100644 --- a/storage-node/src/services/caching/localDataObjects.ts +++ b/storage-node/src/services/caching/localDataObjects.ts @@ -113,6 +113,15 @@ export function getDataObjectIdFromCache( } } +/** + * Checks if data object is present. + * @param dataObjectId + * @returns boolean + */ +export function isDataObjectIdInCache(dataObjectId: string): boolean { + return idCache.has(dataObjectId) +} + /** * Returns file names from the local directory, ignoring subfolders. 
* diff --git a/storage-node/src/services/sync/cleanupService.ts b/storage-node/src/services/sync/cleanupService.ts index 89cf68f66c..5642ecd2e0 100644 --- a/storage-node/src/services/sync/cleanupService.ts +++ b/storage-node/src/services/sync/cleanupService.ts @@ -1,4 +1,3 @@ -import _ from 'lodash' import superagent from 'superagent' import urljoin from 'url-join' import { getDataObjectIDs } from '../../services/caching/localDataObjects' @@ -73,8 +72,8 @@ export async function performCleanup( getStorageObligationsFromRuntime(qnApi, buckets), getDataObjectIDs(), ]) - const assignedObjectsIds = model.dataObjects.map((obj) => obj.id) - const removedIds = _.difference(storedObjectsIds, assignedObjectsIds) + const assignedObjectsIds = new Set(model.dataObjects.map((obj) => obj.id)) + const removedIds = storedObjectsIds.filter((id) => !assignedObjectsIds.has(id)) const removedObjects = await getDataObjectsByIDs(qnApi, removedIds) logger.debug(`Cleanup - pruning ${removedIds.length} obsolete objects`) diff --git a/storage-node/src/services/sync/synchronizer.ts b/storage-node/src/services/sync/synchronizer.ts index a5685fd00a..0f71033299 100644 --- a/storage-node/src/services/sync/synchronizer.ts +++ b/storage-node/src/services/sync/synchronizer.ts @@ -1,5 +1,4 @@ -import _ from 'lodash' -import { getDataObjectIDs } from '../../services/caching/localDataObjects' +import { isDataObjectIdInCache } from '../../services/caching/localDataObjects' import logger from '../../services/logger' import { QueryNodeApi } from '../queryNode/api' import { DataObligations, getStorageObligationsFromRuntime } from './storageObligations' @@ -45,15 +44,13 @@ export async function performSync( selectedOperatorUrl?: string ): Promise { logger.info('Started syncing...') - const [model, files] = await Promise.all([getStorageObligationsFromRuntime(qnApi, buckets), getDataObjectIDs()]) + const model = await getStorageObligationsFromRuntime(qnApi, buckets) const required = model.dataObjects - const 
added = _.differenceWith(required, files, (required, file) => required.id === file) - const removed = _.differenceWith(files, required, (file, required) => file === required.id) + const added = required.filter((obj) => !isDataObjectIdInCache(obj.id)) logger.debug(`Sync - new objects: ${added.length}`) - logger.debug(`Sync - obsolete objects: ${removed.length}`) const workingStack = new WorkingStack() From 995c9320314e5ac19c498435906e1c80be3f0b9e Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Wed, 10 Jan 2024 20:06:22 +0400 Subject: [PATCH 2/4] colossus: task worker - fix to run through all tasks and catch task failures to prevent worker failing --- storage-node/src/services/sync/workingProcess.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/storage-node/src/services/sync/workingProcess.ts b/storage-node/src/services/sync/workingProcess.ts index 59adedf2d0..1fa6ce88dc 100644 --- a/storage-node/src/services/sync/workingProcess.ts +++ b/storage-node/src/services/sync/workingProcess.ts @@ -89,7 +89,12 @@ export class TaskProcessor { if (task !== null) { logger.debug(task.description()) - await task.execute() + try { + await task.execute() + } catch (err) { + // Catch the task failure to prevent the worker process from failing + logger.warn(`task failed: ${err.message}`) + } } else { if (this.exitOnCompletion) { return @@ -126,6 +131,6 @@ export class TaskProcessorSpawner { processes.push(processor.process()) } - await Promise.all(processes) + await Promise.allSettled(processes) } } From e6ab052ac85b71b5a30e91d2ec7736fe32e68b27 Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Wed, 10 Jan 2024 20:10:18 +0400 Subject: [PATCH 3/4] colossus: bump package to v3.10.2 --- storage-node/CHANGELOG.md | 5 +++++ storage-node/package.json | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/storage-node/CHANGELOG.md b/storage-node/CHANGELOG.md index 6e383a1d48..d3f90a09c1 100644 --- a/storage-node/CHANGELOG.md +++ 
b/storage-node/CHANGELOG.md @@ -1,3 +1,8 @@ +### 3.10.2 +- Fix processing large arrays causing high CPU during sync and cleanup runs [#5033](https://github.com/Joystream/joystream/pull/5033) + +- Fix task runner to avoid ending prematurely on individual task failure [#5033](https://github.com/Joystream/joystream/pull/5033) + ### 3.10.1 - Bug fix: call stack size exceeded error - [#5021](https://github.com/Joystream/joystream/pull/5021) diff --git a/storage-node/package.json b/storage-node/package.json index 02f216ac09..23c8588254 100644 --- a/storage-node/package.json +++ b/storage-node/package.json @@ -1,7 +1,7 @@ { "name": "storage-node", "description": "Joystream storage subsystem.", - "version": "3.10.1", + "version": "3.10.2", "author": "Joystream contributors", "bin": { "storage-node": "./bin/run" From 6a04fd03ee0d9ae99cda94a7680263d882169bd2 Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Thu, 11 Jan 2024 17:32:45 +0400 Subject: [PATCH 4/4] colossus: keep use of _.difference and log removed ids in sync run --- storage-node/src/commands/util/fetch-bucket.ts | 11 ++++------- storage-node/src/services/sync/cleanupService.ts | 12 ++++++------ storage-node/src/services/sync/synchronizer.ts | 11 ++++++++--- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/storage-node/src/commands/util/fetch-bucket.ts b/storage-node/src/commands/util/fetch-bucket.ts index c46ce98f4c..fc3fc2e0ad 100644 --- a/storage-node/src/commands/util/fetch-bucket.ts +++ b/storage-node/src/commands/util/fetch-bucket.ts @@ -4,10 +4,10 @@ import { QueryNodeApi } from '../..//services/queryNode/api' import logger from '../../services/logger' import stringify from 'fast-safe-stringify' import path from 'path' - +import { loadDataObjectIdCache } from '../../services/caching/localDataObjects' /** * CLI command: - * Fetch all data objects from a bucket into local store. + * Fetch data objects assigned to the bucket from remote node(s) into local store. 
* * @remarks * Should not be executed while server is running. @@ -18,11 +18,6 @@ export default class FetchBucket extends Command { static flags = { help: flags.help({ char: 'h' }), - workerId: flags.integer({ - char: 'w', - required: true, - description: 'Storage node operator worker ID.', - }), bucketId: flags.integer({ char: 'b', required: true, @@ -66,6 +61,8 @@ export default class FetchBucket extends Command { const { flags } = this.parse(FetchBucket) const bucketId = flags.bucketId.toString() const qnApi = new QueryNodeApi(flags.queryNodeEndpoint) + await loadDataObjectIdCache(flags.uploads) + logger.info('Fetching bucket...') try { diff --git a/storage-node/src/services/sync/cleanupService.ts b/storage-node/src/services/sync/cleanupService.ts index 5642ecd2e0..ef3afe7d76 100644 --- a/storage-node/src/services/sync/cleanupService.ts +++ b/storage-node/src/services/sync/cleanupService.ts @@ -7,6 +7,7 @@ import { DataObjectDetailsFragment } from '../queryNode/generated/queries' import { DataObligations, getDataObjectsByIDs, getStorageObligationsFromRuntime } from './storageObligations' import { DeleteLocalFileTask } from './tasks' import { TaskProcessorSpawner, WorkingStack } from './workingProcess' +import _ from 'lodash' /** * The maximum allowed threshold by which the QN processor can lag behind @@ -68,12 +69,11 @@ export async function performCleanup( ) } - const [model, storedObjectsIds] = await Promise.all([ - getStorageObligationsFromRuntime(qnApi, buckets), - getDataObjectIDs(), - ]) - const assignedObjectsIds = new Set(model.dataObjects.map((obj) => obj.id)) - const removedIds = storedObjectsIds.filter((id) => !assignedObjectsIds.has(id)) + const model = await getStorageObligationsFromRuntime(qnApi, buckets) + const storedObjectsIds = getDataObjectIDs() + + const assignedObjectsIds = model.dataObjects.map((obj) => obj.id) + const removedIds = _.difference(storedObjectsIds, assignedObjectsIds) const removedObjects = await getDataObjectsByIDs(qnApi, 
removedIds) logger.debug(`Cleanup - pruning ${removedIds.length} obsolete objects`) diff --git a/storage-node/src/services/sync/synchronizer.ts b/storage-node/src/services/sync/synchronizer.ts index 0f71033299..a3298779f7 100644 --- a/storage-node/src/services/sync/synchronizer.ts +++ b/storage-node/src/services/sync/synchronizer.ts @@ -1,9 +1,10 @@ -import { isDataObjectIdInCache } from '../../services/caching/localDataObjects' +import { getDataObjectIDs, isDataObjectIdInCache } from '../../services/caching/localDataObjects' import logger from '../../services/logger' import { QueryNodeApi } from '../queryNode/api' import { DataObligations, getStorageObligationsFromRuntime } from './storageObligations' import { DownloadFileTask } from './tasks' import { TaskProcessorSpawner, WorkingStack } from './workingProcess' +import _ from 'lodash' /** * Temporary directory name for data uploading. @@ -45,12 +46,16 @@ export async function performSync( ): Promise { logger.info('Started syncing...') const model = await getStorageObligationsFromRuntime(qnApi, buckets) + const storedObjectIds = getDataObjectIDs() - const required = model.dataObjects + const assignedObjects = model.dataObjects + const assignedObjectIds = assignedObjects.map((obj) => obj.id) - const added = required.filter((obj) => !isDataObjectIdInCache(obj.id)) + const added = assignedObjects.filter((obj) => !isDataObjectIdInCache(obj.id)) + const removed = _.difference(storedObjectIds, assignedObjectIds) logger.debug(`Sync - new objects: ${added.length}`) + logger.debug(`Sync - obsolete objects: ${removed.length}`) const workingStack = new WorkingStack()