
Commit 6fcd6a7
Configurable cleanup/sync batch size and workersNum, limit async requests during cleanup, improve logging
Lezek123 committed Jan 8, 2025
1 parent 1e76ce8 commit 6fcd6a7
Showing 9 changed files with 133 additions and 36 deletions.
6 changes: 4 additions & 2 deletions storage-node/CHANGELOG.md
@@ -1,10 +1,12 @@
### 4.4.0

- **Optimizations:** The way data objects / data object ids are queried and processed during sync and cleanup has been optimized:
- Sync and cleanup services now process tasks in batches of `10_000` to avoid overflowing the memory.
- Sync and cleanup services now process tasks in batches of configurable size (`--syncBatchSize`, `--cleanupBatchSize`) to avoid overflowing the memory.
- Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`).
- Enforced a limit of max. `10,000` results per single GraphQL query and max. `1,000` input arguments per query.
- Added `--cleanupWorkersNumber` flag to limit the number of concurrent async requests during cleanup.
- A safety mechanism was added to avoid removing "deleted" objects for which a related `DataObjectDeleted` event cannot be found in storage squid.
- Improved logging during cleanup.
- Improved logging during sync and cleanup.

### 4.3.0

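The `.filter(Set.has(...))` change mentioned in the changelog above is a standard lookup optimization; a minimal illustrative sketch (placeholder data, not the exact code from this commit):

```ts
// Illustrative sketch of the lookup optimization described in the changelog above.
// Membership checks against a large array are O(n) per element with Array.includes;
// converting the array to a Set first makes each check O(1) on average.
const storedObjectIds: string[] = ['1', '2', '3'] // placeholder data
const assignedObjectIds: string[] = ['2', '3'] // placeholder data

// Before: effectively quadratic for large inputs
const obsoleteSlow = storedObjectIds.filter((id) => !assignedObjectIds.includes(id))

// After: linear overall
const assignedSet = new Set(assignedObjectIds)
const obsoleteFast = storedObjectIds.filter((id) => !assignedSet.has(id))
```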
1 change: 1 addition & 0 deletions storage-node/package.json
@@ -54,6 +54,7 @@
"multihashes": "^4.0.2",
"node-cache": "^5.1.2",
"openapi-editor": "^0.3.0",
"p-limit": "^3",
"promise-timeout": "^1.3.0",
"proper-lockfile": "^4.1.2",
"react": "^18.2.0",
32 changes: 29 additions & 3 deletions storage-node/src/commands/server.ts
@@ -78,16 +78,29 @@ export default class Server extends ApiCommandBase {
description: 'Interval before retrying failed synchronization run (in minutes)',
default: 3,
}),
syncBatchSize: flags.integer({
description: 'Maximum number of objects to process in a single batch during synchronization.',
default: 10_000,
}),
cleanup: flags.boolean({
char: 'c',
description: 'Enable cleanup/pruning of no-longer assigned assets.',
default: false,
}),
cleanupBatchSize: flags.integer({
description: 'Maximum number of objects to process in a single batch during cleanup.',
default: 10_000,
}),
cleanupInterval: flags.integer({
char: 'i',
description: 'Interval between periodic cleanup actions (in minutes)',
default: 360,
}),
cleanupWorkersNumber: flags.integer({
required: false,
description: 'Cleanup workers number (max async operations in progress).',
default: 100,
}),
storageSquidEndpoint: flags.string({
char: 'q',
required: true,
@@ -299,6 +312,7 @@ Supported values: warn, error, debug, info. Default:debug`,
flags.syncWorkersTimeout,
flags.syncInterval,
flags.syncRetryInterval,
flags.syncBatchSize,
X_HOST_ID
),
0
@@ -319,8 +333,9 @@
api,
qnApi,
flags.uploads,
flags.syncWorkersNumber,
flags.cleanupWorkersNumber,
flags.cleanupInterval,
flags.cleanupBatchSize,
X_HOST_ID
),
0
@@ -397,14 +412,24 @@ async function runSyncWithInterval(
syncWorkersTimeout: number,
syncIntervalMinutes: number,
syncRetryIntervalMinutes: number,
syncBatchSize: number,
hostId: string
) {
const sleepInterval = syncIntervalMinutes * 60 * 1000
const retrySleepInterval = syncRetryIntervalMinutes * 60 * 1000
while (true) {
try {
logger.info(`Resume syncing....`)
await performSync(buckets, syncWorkersNumber, syncWorkersTimeout, qnApi, uploadsDirectory, tempDirectory, hostId)
await performSync(
buckets,
syncWorkersNumber,
syncWorkersTimeout,
qnApi,
uploadsDirectory,
tempDirectory,
syncBatchSize,
hostId
)
logger.info(`Sync run complete. Next run in ${syncIntervalMinutes} minute(s).`)
await sleep(sleepInterval)
} catch (err) {
@@ -434,6 +459,7 @@ async function runCleanupWithInterval(
uploadsDirectory: string,
syncWorkersNumber: number,
cleanupIntervalMinutes: number,
cleanupBatchSize: number,
hostId: string
) {
const sleepInterval = cleanupIntervalMinutes * 60 * 1000
@@ -442,7 +468,7 @@
await sleep(sleepInterval)
try {
logger.info(`Resume cleanup....`)
await performCleanup(buckets, syncWorkersNumber, api, qnApi, uploadsDirectory, hostId)
await performCleanup(buckets, syncWorkersNumber, api, qnApi, uploadsDirectory, cleanupBatchSize, hostId)
} catch (err) {
logger.error(`Critical cleanup error: ${err}`)
}
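The sync and cleanup loops above share the same shape: run, then sleep for the configured interval, with a shorter retry interval after a failed run. A minimal sketch of that pattern (the function name and the error handling in the catch branch are assumptions, not the exact implementation):

```ts
// Sketch of the periodic-run pattern used by runSyncWithInterval / runCleanupWithInterval.
// Assumption: a failed run sleeps for the (shorter) retry interval before trying again.
async function runPeriodically(
  run: () => Promise<void>,
  intervalMinutes: number,
  retryIntervalMinutes: number
): Promise<void> {
  const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))
  while (true) {
    try {
      await run()
      await sleep(intervalMinutes * 60 * 1000)
    } catch (err) {
      console.error(`Run failed: ${err}`)
      await sleep(retryIntervalMinutes * 60 * 1000)
    }
  }
}
```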
14 changes: 13 additions & 1 deletion storage-node/src/commands/util/cleanup.ts
@@ -27,6 +27,10 @@ export default class Cleanup extends ApiCommandBase {
required: true,
description: 'The bucketId to sync prune/cleanup',
}),
cleanupBatchSize: flags.integer({
description: 'Maximum number of objects to process in a single batch during cleanup.',
default: 10_000,
}),
cleanupWorkersNumber: flags.integer({
char: 'p',
required: false,
@@ -57,7 +61,15 @@
logger.info('Cleanup...')

try {
await performCleanup([bucketId], flags.cleanupWorkersNumber, api, qnApi, flags.uploads, '')
await performCleanup(
[bucketId],
flags.cleanupWorkersNumber,
api,
qnApi,
flags.uploads,
flags.cleanupBatchSize,
''
)
} catch (err) {
logger.error(err)
logger.error(stringify(err))
5 changes: 5 additions & 0 deletions storage-node/src/commands/util/fetch-bucket.ts
@@ -36,6 +36,10 @@ export default class FetchBucket extends Command {
description: 'Asset downloading timeout for the synchronization (in minutes).',
default: 30,
}),
syncBatchSize: flags.integer({
description: 'Maximum number of objects to process in a single batch.',
default: 10_000,
}),
queryNodeEndpoint: flags.string({
char: 'q',
required: false,
@@ -74,6 +78,7 @@
qnApi,
flags.uploads,
flags.tempFolder ? flags.tempFolder : path.join(flags.uploads, 'temp'),
flags.syncBatchSize,
'',
flags.dataSourceOperatorUrl
)
101 changes: 75 additions & 26 deletions storage-node/src/services/sync/cleanupService.ts
@@ -10,6 +10,7 @@ import { DeleteLocalFileTask } from './tasks'
import { TaskProcessorSpawner, WorkingStack } from '../processing/workingProcess'
import { DataObjectWithBagDetailsFragment } from '../queryNode/generated/queries'
import { Logger } from 'winston'
import pLimit from 'p-limit'

/**
* The maximum allowed threshold by which the QN processor can lag behind
@@ -41,21 +42,21 @@ export const MINIMUM_REPLICATION_THRESHOLD = parseInt(process.env.CLEANUP_MIN_RE
* - If the asset being pruned from this storage-node is currently being downloaded
* by some external actors, then the cleanup action for this asset would be postponed
*
* @param api - (optional) runtime API promise
* @param workerId - current storage provider ID
* @param buckets - Selected storage buckets
* @param buckets - selected storage buckets
* @param asyncWorkersNumber - maximum parallel cleanups number
* @param asyncWorkersTimeout - downloading asset timeout
* @param api - runtime API promise
* @param qnApi - Query Node API
* @param uploadDirectory - local directory to get file names from
* @param tempDirectory - local directory for temporary data uploading
* @param batchSize - max. number of data objects to process in a single batch
* @param hostId
*/
export async function performCleanup(
buckets: string[],
asyncWorkersNumber: number,
api: ApiPromise,
qnApi: QueryNodeApi,
uploadDirectory: string,
batchSize: number,
hostId: string
): Promise<void> {
const logger = rootLogger.child({ label: 'Cleanup' })
@@ -98,11 +99,11 @@ export async function performCleanup(
const workingStack = new WorkingStack()
const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber)

// Execute deleted objects removal tasks in batches of 10_000
// Execute deleted objects removal tasks in batches
if (deletedDataObjectIds.size) {
let deletedProcessed = 0
logger.info(`removing ${deletedDataObjectIds.size} deleted objects...`)
for (let deletedObjectsIdsBatch of _.chunk([...deletedDataObjectIds], 10_000)) {
for (let deletedObjectsIdsBatch of _.chunk([...deletedDataObjectIds], batchSize)) {
// Confirm whether the objects were actually deleted by fetching the related deletion events
const dataObjectDeletedEvents = await qnApi.getDataObjectDeletedEvents(deletedObjectsIdsBatch)
const confirmedIds = new Set(dataObjectDeletedEvents.map((e) => e.data.dataObjectId))
@@ -120,26 +121,35 @@
deletedProcessed += deletedObjectsIdsBatch.length
logger.debug(`${deletedProcessed} / ${deletedDataObjectIds.size} deleted objects processed...`)
}
logger.info(`${deletedProcessed}/${deletedDataObjectIds.size} deleted data objects successfully cleared.`)
}

// Execute moved objects removal tasks in batches of 10_000
// Execute moved objects removal tasks in batches
if (movedObjectIds.size) {
let movedProcessed = 0
logger.info(`removing ${movedObjectIds.size} moved objects...`)
for (const movedObjectsIdsBatch of _.chunk([...movedObjectIds], 10_000)) {
for (const movedObjectsIdsBatch of _.chunk([...movedObjectIds], batchSize)) {
const movedDataObjectsBatch = await qnApi.getDataObjectsWithBagDetails(movedObjectsIdsBatch)
const deletionTasksOfMovedDataObjects = await getDeletionTasksFromMovedDataObjects(
logger,
uploadDirectory,
model,
movedDataObjectsBatch,
asyncWorkersNumber,
hostId
)
const numberOfTasks = deletionTasksOfMovedDataObjects.length
if (numberOfTasks !== movedObjectsIdsBatch.length) {
logger.warn(
`Only ${numberOfTasks} / ${movedObjectsIdsBatch.length} moved objects will be removed in this batch...`
)
}
await workingStack.add(deletionTasksOfMovedDataObjects)
await processSpawner.process()
movedProcessed += movedDataObjectsBatch.length
movedProcessed += numberOfTasks
logger.debug(`${movedProcessed} / ${movedObjectIds.size} moved objects processed...`)
}
logger.info(`${movedProcessed}/${movedObjectIds.size} moved data objects successfully cleared.`)
}
} else {
logger.info('No objects to prune, skipping...')
@@ -155,40 +165,79 @@
* @param uploadDirectory - local directory for data uploading
* @param dataObligations - defines the current data obligations for the node
* @param movedDataObjects - obsolete (no longer assigned) data objects that have been moved to other buckets
* @param asyncWorkersNumber - number of async workers assigned for cleanup tasks
* @param hostId - host id of the current node
*/
async function getDeletionTasksFromMovedDataObjects(
logger: Logger,
uploadDirectory: string,
dataObligations: DataObligations,
movedDataObjects: DataObjectWithBagDetailsFragment[],
asyncWorkersNumber: number,
hostId: string
): Promise<DeleteLocalFileTask[]> {
const timeoutMs = 60 * 1000 // 1 minute since it's only a HEAD request
const deletionTasks: DeleteLocalFileTask[] = []

const { bucketOperatorUrlById } = dataObligations
await Promise.allSettled(
movedDataObjects.map(async (movedDataObject) => {
let dataObjectReplicationCount = 0

for (const { storageBucket } of movedDataObject.storageBag.storageBuckets) {
const nodeUrl = bucketOperatorUrlById.get(storageBucket.id)
if (nodeUrl) {
const fileUrl = urljoin(nodeUrl, 'api/v1/files', movedDataObject.id)
const limit = pLimit(asyncWorkersNumber)
let checkedObjects = 0
const checkReplicationThreshold = async (movedDataObject: DataObjectWithBagDetailsFragment) => {
++checkedObjects
if (checkedObjects % asyncWorkersNumber === 0) {
logger.debug(
`Checking replication: ${checkedObjects}/${movedDataObjects.length} (active: ${limit.activeCount}, pending: ${limit.pendingCount})`
)
}

const externaBucketEndpoints = movedDataObject.storageBag.storageBuckets
.map(({ storageBucket: { id } }) => {
return bucketOperatorUrlById.get(id)
})
.filter((url): url is string => !!url)
let lastErr = ''
let successes = 0
let failures = 0

if (externaBucketEndpoints.length >= MINIMUM_REPLICATION_THRESHOLD) {
for (const nodeUrl of externaBucketEndpoints) {
const fileUrl = urljoin(nodeUrl, 'api/v1/files', movedDataObject.id)
try {
await superagent.head(fileUrl).timeout(timeoutMs).set('X-COLOSSUS-HOST-ID', hostId)
dataObjectReplicationCount++
++successes
} catch (e) {
++failures
lastErr = e instanceof Error ? e.message : e.toString()
}
if (successes >= MINIMUM_REPLICATION_THRESHOLD) {
break
}
}
}

if (dataObjectReplicationCount < MINIMUM_REPLICATION_THRESHOLD) {
logger.warn(`data object replication threshold unmet - file deletion canceled: ${movedDataObject.id}`)
return
}
if (successes < MINIMUM_REPLICATION_THRESHOLD) {
logger.debug(
`Replication threshold unmet for object ${movedDataObject.id} ` +
`(buckets: ${externaBucketEndpoints.length}, successes: ${successes}, failures: ${failures}). ` +
(lastErr ? `Last error: ${lastErr}. ` : '') +
`File deletion canceled...`
)
return
}

deletionTasks.push(new DeleteLocalFileTask(uploadDirectory, movedDataObject.id))
}

await Promise.all(movedDataObjects.map((movedDataObject) => limit(() => checkReplicationThreshold(movedDataObject))))

const failedCount = movedDataObjects.length - deletionTasks.length
if (failedCount > 0) {
logger.warn(
`Replication threshold was unmet or couldn't be verified for ${failedCount} / ${movedDataObjects.length} objects in the current batch.`
)
}

deletionTasks.push(new DeleteLocalFileTask(uploadDirectory, movedDataObject.id))
})
)
logger.debug('Checking replication: Done')

return deletionTasks
}
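The new `p-limit` dependency is what caps how many replication checks run concurrently (`--cleanupWorkersNumber`). A minimal sketch of that bounded-concurrency pattern, with a placeholder endpoint and object IDs (not the exact code from `getDeletionTasksFromMovedDataObjects` above):

```ts
import pLimit from 'p-limit'
import superagent from 'superagent'

// Sketch of the bounded-concurrency pattern used for replication checks above.
// `concurrency` plays the role of cleanupWorkersNumber; the URL and IDs are placeholders.
async function countReachableObjects(
  nodeUrl: string,
  objectIds: string[],
  concurrency: number
): Promise<number> {
  const limit = pLimit(concurrency) // at most `concurrency` HEAD requests in flight
  const results = await Promise.all(
    objectIds.map((id) =>
      limit(async () => {
        try {
          // HEAD request only: we just need to know whether the object exists remotely
          await superagent.head(`${nodeUrl}/api/v1/files/${id}`).timeout(60_000)
          return true
        } catch {
          return false
        }
      })
    )
  )
  return results.filter(Boolean).length
}
```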
2 changes: 1 addition & 1 deletion storage-node/src/services/sync/storageObligations.ts
@@ -194,7 +194,7 @@ async function getAllBuckets(api: QueryNodeApi): Promise<StorageBucketDetailsFra
return await getAllObjectsWithPaging(async (offset, limit) => {
const idsPart = ids.slice(offset, offset + limit)
if (!_.isEmpty(idsPart)) {
logger.debug(`Sync - getting all storage buckets: offset = ${offset}, limit = ${limit}`)
logger.debug(`Getting all storage buckets: offset = ${offset}, limit = ${limit}`)
return await api.getStorageBucketDetails(idsPart)
} else {
return false
6 changes: 4 additions & 2 deletions storage-node/src/services/sync/synchronizer.ts
@@ -35,6 +35,7 @@ export const PendingDirName = 'pending'
* @param qnApi - Query Node API
* @param uploadDirectory - local directory to get file names from
* @param tempDirectory - local directory for temporary data uploading
* @param batchSize - maximum number of data objects to process in a single batch
* @param selectedOperatorUrl - (optional) defines the data source URL. If not set
* the source URL is resolved for each data object separately using the Query
* Node information about the storage providers.
Expand All @@ -46,6 +47,7 @@ export async function performSync(
qnApi: QueryNodeApi,
uploadDirectory: string,
tempDirectory: string,
batchSize: number,
hostId: string,
selectedOperatorUrl?: string
): Promise<void> {
@@ -64,10 +66,10 @@
const workingStack = new WorkingStack()
const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber)

// Process unsynced objects in batches od 10_000
// Process unsynced objects in batches
logger.debug(`Sync - started processing...`)
let processed = 0
for (const unsyncedIdsBatch of _.chunk(unsyncedObjectIds, 10_000)) {
for (const unsyncedIdsBatch of _.chunk(unsyncedObjectIds, batchSize)) {
const objectsBatch = await getDataObjectsByIDs(qnApi, unsyncedIdsBatch)
const syncTasks = await getDownloadTasks(
model,
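Both `performSync` and `performCleanup` now walk their ID lists in `_.chunk` batches of the configured size instead of materializing everything at once. A minimal sketch of that batching loop (the processing callback and function name are placeholders):

```ts
import _ from 'lodash'

// Sketch of the batch-processing loop: `batchSize` corresponds to
// --syncBatchSize / --cleanupBatchSize; `processBatch` stands in for the real work
// (fetching object details, queueing download/deletion tasks, etc.).
async function processInBatches(
  objectIds: string[],
  batchSize: number,
  processBatch: (batch: string[]) => Promise<void>
): Promise<void> {
  let processed = 0
  for (const batch of _.chunk(objectIds, batchSize)) {
    await processBatch(batch)
    processed += batch.length
    console.debug(`${processed} / ${objectIds.length} objects processed...`)
  }
}
```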
2 changes: 1 addition & 1 deletion yarn.lock
@@ -18274,7 +18274,7 @@ p-is-promise@^2.0.0:
resolved "https://registry.npmjs.org/p-is-promise/-/p-is-promise-2.1.0.tgz"
integrity sha512-Y3W0wlRPK8ZMRbNq97l4M5otioeA5lm1z7bkNkxCka8HSPjR0xRWmpCmc9utiaLP9Jb1eD8BgeIxTW4AIF45Pg==

[email protected], p-limit@^3.0.2:
[email protected], p-limit@^3, p-limit@^3.0.2:
version "3.1.0"
resolved "https://registry.npmjs.org/p-limit/-/p-limit-3.1.0.tgz"
integrity sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ==
