From 92ecb3dee3275cab6eafef1d4db0f770590202a6 Mon Sep 17 00:00:00 2001
From: Xiaobing Li
Date: Mon, 14 Nov 2022 13:52:24 -0800
Subject: [PATCH] fix potential fd leakage for SegmentProcessorFramework

---
 .../framework/SegmentProcessorFramework.java | 45 ++++++++++---------
 1 file changed, 24 insertions(+), 21 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 7a1b81fe27c0..25f2b82eba06 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -142,28 +142,31 @@ public List process()
     for (Map.Entry entry : partitionToFileManagerMap.entrySet()) {
       String partitionId = entry.getKey();
       GenericRowFileManager fileManager = entry.getValue();
-      GenericRowFileReader fileReader = fileManager.getFileReader();
-      int numRows = fileReader.getNumRows();
-      int numSortFields = fileReader.getNumSortFields();
-      LOGGER.info("Start creating segments on partition: {}, numRows: {}, numSortFields: {}", partitionId, numRows,
-          numSortFields);
-      GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
-      for (int startRowId = 0; startRowId < numRows; startRowId += maxNumRecordsPerSegment, sequenceId++) {
-        int endRowId = Math.min(startRowId + maxNumRecordsPerSegment, numRows);
-        LOGGER.info("Start creating segment of sequenceId: {} with row range: {} to {}", sequenceId, startRowId,
-            endRowId);
-        observer.accept(String.format(
-            "Creating segment of sequentId: %d with data from partition: %s and row range: [%d, %d) out of [0, %d)",
-            sequenceId, partitionId, startRowId, endRowId, numRows));
-        generatorConfig.setSequenceId(sequenceId);
-        GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId);
-        SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-        driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
-            TransformPipeline.getPassThroughPipeline());
-        driver.build();
-        outputSegmentDirs.add(driver.getOutputDirectory());
+      try {
+        GenericRowFileReader fileReader = fileManager.getFileReader();
+        int numRows = fileReader.getNumRows();
+        int numSortFields = fileReader.getNumSortFields();
+        LOGGER.info("Start creating segments on partition: {}, numRows: {}, numSortFields: {}", partitionId, numRows,
+            numSortFields);
+        GenericRowFileRecordReader recordReader = fileReader.getRecordReader();
+        for (int startRowId = 0; startRowId < numRows; startRowId += maxNumRecordsPerSegment, sequenceId++) {
+          int endRowId = Math.min(startRowId + maxNumRecordsPerSegment, numRows);
+          LOGGER.info("Start creating segment of sequenceId: {} with row range: {} to {}", sequenceId, startRowId,
+              endRowId);
+          observer.accept(String.format(
+              "Creating segment of sequentId: %d with data from partition: %s and row range: [%d, %d) out of [0, %d)",
+              sequenceId, partitionId, startRowId, endRowId, numRows));
+          generatorConfig.setSequenceId(sequenceId);
+          GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId);
+          SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+          driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
+              TransformPipeline.getPassThroughPipeline());
+          driver.build();
+          outputSegmentDirs.add(driver.getOutputDirectory());
+        }
+      } finally {
+        fileManager.cleanUp();
       }
-      fileManager.cleanUp();
     }
     FileUtils.deleteDirectory(_mapperOutputDir);
     FileUtils.deleteDirectory(_reducerOutputDir);