Skip to content

Commit

Permalink
colossus: keep use of _.difference and log removed ids in sync run
Browse files Browse the repository at this point in the history
  • Loading branch information
mnaamani committed Jan 11, 2024
1 parent e6ab052 commit 6a04fd0
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 16 deletions.
11 changes: 4 additions & 7 deletions storage-node/src/commands/util/fetch-bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import { QueryNodeApi } from '../..//services/queryNode/api'
import logger from '../../services/logger'
import stringify from 'fast-safe-stringify'
import path from 'path'

import { loadDataObjectIdCache } from '../../services/caching/localDataObjects'
/**
* CLI command:
* Fetch all data objects from a bucket into local store.
* Fetch data objects assigned to the given bucket from remote node(s) into local store.
*
* @remarks
* Should not be executed while server is running.
Expand All @@ -18,11 +18,6 @@ export default class FetchBucket extends Command {

static flags = {
help: flags.help({ char: 'h' }),
workerId: flags.integer({
char: 'w',
required: true,
description: 'Storage node operator worker ID.',
}),
bucketId: flags.integer({
char: 'b',
required: true,
Expand Down Expand Up @@ -66,6 +61,8 @@ export default class FetchBucket extends Command {
const { flags } = this.parse(FetchBucket)
const bucketId = flags.bucketId.toString()
const qnApi = new QueryNodeApi(flags.queryNodeEndpoint)
await loadDataObjectIdCache(flags.uploads)

logger.info('Fetching bucket...')

try {
Expand Down
12 changes: 6 additions & 6 deletions storage-node/src/services/sync/cleanupService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { DataObjectDetailsFragment } from '../queryNode/generated/queries'
import { DataObligations, getDataObjectsByIDs, getStorageObligationsFromRuntime } from './storageObligations'
import { DeleteLocalFileTask } from './tasks'
import { TaskProcessorSpawner, WorkingStack } from './workingProcess'
import _ from 'lodash'

/**
* The maximum allowed threshold by which the QN processor can lag behind
Expand Down Expand Up @@ -68,12 +69,11 @@ export async function performCleanup(
)
}

const [model, storedObjectsIds] = await Promise.all([
getStorageObligationsFromRuntime(qnApi, buckets),
getDataObjectIDs(),
])
const assignedObjectsIds = new Set(model.dataObjects.map((obj) => obj.id))
const removedIds = storedObjectsIds.filter((id) => !assignedObjectsIds.has(id))
const model = await getStorageObligationsFromRuntime(qnApi, buckets)
const storedObjectsIds = getDataObjectIDs()

const assignedObjectsIds = model.dataObjects.map((obj) => obj.id)
const removedIds = _.difference(storedObjectsIds, assignedObjectsIds)
const removedObjects = await getDataObjectsByIDs(qnApi, removedIds)

logger.debug(`Cleanup - pruning ${removedIds.length} obsolete objects`)
Expand Down
11 changes: 8 additions & 3 deletions storage-node/src/services/sync/synchronizer.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { isDataObjectIdInCache } from '../../services/caching/localDataObjects'
import { getDataObjectIDs, isDataObjectIdInCache } from '../../services/caching/localDataObjects'
import logger from '../../services/logger'
import { QueryNodeApi } from '../queryNode/api'
import { DataObligations, getStorageObligationsFromRuntime } from './storageObligations'
import { DownloadFileTask } from './tasks'
import { TaskProcessorSpawner, WorkingStack } from './workingProcess'
import _ from 'lodash'

/**
* Temporary directory name for data uploading.
Expand Down Expand Up @@ -45,12 +46,16 @@ export async function performSync(
): Promise<void> {
logger.info('Started syncing...')
const model = await getStorageObligationsFromRuntime(qnApi, buckets)
const storedObjectIds = getDataObjectIDs()

const required = model.dataObjects
const assignedObjects = model.dataObjects
const assignedObjectIds = assignedObjects.map((obj) => obj.id)

const added = required.filter((obj) => !isDataObjectIdInCache(obj.id))
const added = assignedObjects.filter((obj) => !isDataObjectIdInCache(obj.id))
const removed = _.difference(storedObjectIds, assignedObjectIds)

logger.debug(`Sync - new objects: ${added.length}`)
logger.debug(`Sync - obsolete objects: ${removed.length}`)

const workingStack = new WorkingStack()

Expand Down

0 comments on commit 6a04fd0

Please sign in to comment.