Skip to content

Commit

Permalink
Remove blocks available in DBS but not in Rucio
Browse files Browse the repository at this point in the history
  • Loading branch information
amaltaro committed Dec 9, 2024
1 parent 65dea1a commit 41d9f8b
Showing 1 changed file with 11 additions and 5 deletions.
16 changes (11 additions & 5 deletions): src/python/WMCore/WMSpec/Steps/Fetchers/PileupFetcher.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -71,13 +71,16 @@ def _queryDbsAndGetPileupConfig(self, stepHelper, dbsReader):
             blockDict = {}
             for dataset in datasets:
                 # using the original dataset, resolve blocks, files and number of events with DBS
+                fCounter = 0
                 for fileInfo in dbsReader.getFileListByDataset(dataset=dataset, detail=True):
                     blockDict.setdefault(fileInfo['block_name'], {'FileList': [],
                                                                   'NumberOfEvents': 0,
                                                                   'PhEDExNodeNames': []})
                     blockDict[fileInfo['block_name']]['FileList'].append(fileInfo['logical_file_name'])
                     blockDict[fileInfo['block_name']]['NumberOfEvents'] += fileInfo['event_count']
+                    fCounter += 1

+                logging.info(f"Found {len(blockDict)} blocks in DBS for dataset {dataset} with {fCounter} files")
                 self._getDatasetLocation(dataset, blockDict, msPileupUrl)

             resultDict[pileupType] = blockDict
Expand Down Expand Up @@ -112,12 +115,15 @@ def _getDatasetLocation(self, dset, blockDict, msPileupUrl):
         blockReplicas = self.rucio.getBlocksInContainer(container=dset, scope=puScope)
         logging.info(f"Found {len(blockReplicas)} blocks in container {dset} for scope {puScope}")

-        # Finally, update blocks present in Rucio with the MSPileup currentRSEs
-        for blockName in blockReplicas:
-            try:
+        # Finally, update blocks present in Rucio with the MSPileup currentRSEs.
+        # Blocks not present in Rucio - hence only in DBS - are meant to be removed.
+        for blockName in list(blockDict):
+            if blockName not in blockReplicas:
+                logging.warning(f"Block {blockName} present in DBS but not in Rucio. Removing it.")
+                blockDict.pop(blockName)
+            else:
                 blockDict[blockName]['PhEDExNodeNames'] = doc["currentRSEs"]
-            except KeyError:
-                logging.warning(f"Block {blockName} present in Rucio but not in DBS")
+        logging.info(f"Final pileup dataset {dset} has a total of {len(blockDict)} blocks.")

     def _getCacheFilePath(self, stepHelper):

Expand Down

0 comments on commit 41d9f8b

Please sign in to comment.