Skip to content

Commit

Permalink
Improve progress reporter in SegmentCreationMapper (#8129)
Browse files Browse the repository at this point in the history
Currently, the progress reporter only runs during the segment
creation phase, and we stop it once that phase ends. However,
tarring the segment and copying it from the local to the remote
location can take a long time for a large file. This change makes the
progress reporter run until the end of `map()` in SegmentCreationMapper.
  • Loading branch information
Seunghyun Lee authored Feb 5, 2022
1 parent 9785ea3 commit 8bbf93a
Showing 1 changed file with 29 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,35 @@ protected void map(LongWritable key, Text value, Context context)
driver.init(segmentGeneratorConfig);
validateSchema(driver.getIngestionSchemaValidator());
driver.build();
String segmentName = driver.getSegmentName();
_logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);

File localSegmentDir = new File(_localSegmentDir, segmentName);
String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT;
File localSegmentTarFile = new File(_localSegmentTarDir, segmentTarFileName);
_logger.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);

long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
_logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));

Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
if (_useRelativePath) {
Path relativeOutputPath =
getRelativeOutputPath(new Path(_jobConf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(),
hdfsInputFile.toUri(), _hdfsSegmentTarDir);
hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName);
}
_logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf)
.copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);

context.write(new LongWritable(sequenceId), new Text(segmentTarFileName));
_logger
.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
sequenceId);
} catch (Exception e) {
_logger.error("Caught exception while creating segment with HDFS input file: {}, sequence id: {}", hdfsInputFile,
sequenceId, e);
Expand All @@ -286,34 +315,6 @@ protected void map(LongWritable key, Text value, Context context)
_logger.error("Failed to interrupt progress reporter thread: {}", progressReporterThread);
}
}
String segmentName = driver.getSegmentName();
_logger.info("Finish creating segment: {} with sequence id: {}", segmentName, sequenceId);

File localSegmentDir = new File(_localSegmentDir, segmentName);
String segmentTarFileName = segmentName + JobConfigConstants.TAR_GZ_FILE_EXT;
File localSegmentTarFile = new File(_localSegmentTarDir, segmentTarFileName);
_logger.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);

long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
_logger.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));

Path hdfsSegmentTarFile = new Path(_hdfsSegmentTarDir, segmentTarFileName);
if (_useRelativePath) {
Path relativeOutputPath =
getRelativeOutputPath(new Path(_jobConf.get(JobConfigConstants.PATH_TO_INPUT)).toUri(), hdfsInputFile.toUri(),
_hdfsSegmentTarDir);
hdfsSegmentTarFile = new Path(relativeOutputPath, segmentTarFileName);
}
_logger.info("Copying segment tar file from: {} to: {}", localSegmentTarFile, hdfsSegmentTarFile);
FileSystem.get(hdfsSegmentTarFile.toUri(), _jobConf)
.copyFromLocalFile(true, true, new Path(localSegmentTarFile.getAbsolutePath()), hdfsSegmentTarFile);

context.write(new LongWritable(sequenceId), new Text(segmentTarFileName));
_logger.info("Finish generating segment: {} with HDFS input file: {}, sequence id: {}", segmentName, hdfsInputFile,
sequenceId);
}

protected FileFormat getFileFormat(String fileName) {
Expand Down

0 comments on commit 8bbf93a

Please sign in to comment.