Allow segment upload via Metadata in MergeRollup Minion task #9825

Merged
16 commits merged on Dec 6, 2022
MergeRollupMinionClusterIntegrationTest.java
@@ -50,6 +50,7 @@
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
@@ -70,6 +71,7 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
private static final String SINGLE_LEVEL_CONCAT_TEST_TABLE = "myTable1";
private static final String SINGLE_LEVEL_ROLLUP_TEST_TABLE = "myTable2";
private static final String MULTI_LEVEL_CONCAT_TEST_TABLE = "myTable3";
private static final String SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE = "myTable4";
private static final long TIMEOUT_IN_MS = 10_000L;

protected PinotHelixTaskResourceManager _helixTaskResourceManager;
@@ -79,15 +81,18 @@ public class MergeRollupMinionClusterIntegrationTest extends BaseClusterIntegrat
protected final File _segmentDir1 = new File(_tempDir, "segmentDir1");
protected final File _segmentDir2 = new File(_tempDir, "segmentDir2");
protected final File _segmentDir3 = new File(_tempDir, "segmentDir3");
protected final File _segmentDir4 = new File(_tempDir, "segmentDir4");
protected final File _tarDir1 = new File(_tempDir, "tarDir1");
protected final File _tarDir2 = new File(_tempDir, "tarDir2");
protected final File _tarDir3 = new File(_tempDir, "tarDir3");
protected final File _tarDir4 = new File(_tempDir, "tarDir4");

@BeforeClass
public void setUp()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir1, _segmentDir2, _segmentDir3, _tarDir1, _tarDir2,
_tarDir3);
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir,
_segmentDir1, _segmentDir2, _segmentDir3, _segmentDir4,
_tarDir1, _tarDir2, _tarDir3, _tarDir4);

// Start the Pinot cluster
startZk();
@@ -105,9 +110,12 @@ public void setUp()
getMultiColumnsSegmentPartitionConfig());
TableConfig multiLevelConcatTableConfig =
createOfflineTableConfig(MULTI_LEVEL_CONCAT_TEST_TABLE, getMultiLevelConcatTaskConfig());
TableConfig singleLevelConcatMetadataTableConfig =
createOfflineTableConfig(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE, getSingleLevelConcatMetadataTaskConfig());
addTableConfig(singleLevelConcatTableConfig);
addTableConfig(singleLevelRollupTableConfig);
addTableConfig(multiLevelConcatTableConfig);
addTableConfig(singleLevelConcatMetadataTableConfig);

// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);
@@ -119,9 +127,13 @@ public void setUp()
buildSegmentsFromAvroWithPostfix(avroFiles, singleLevelRollupTableConfig, schema, 0, _segmentDir2, _tarDir2, "2");
ClusterIntegrationTestUtils
.buildSegmentsFromAvro(avroFiles, multiLevelConcatTableConfig, schema, 0, _segmentDir3, _tarDir3);
ClusterIntegrationTestUtils
.buildSegmentsFromAvro(avroFiles, singleLevelConcatMetadataTableConfig, schema, 0, _segmentDir4, _tarDir4);
uploadSegments(SINGLE_LEVEL_CONCAT_TEST_TABLE, _tarDir1);
uploadSegments(SINGLE_LEVEL_ROLLUP_TEST_TABLE, _tarDir2);
uploadSegments(MULTI_LEVEL_CONCAT_TEST_TABLE, _tarDir3);
uploadSegments(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE, _tarDir4);


// Set up the H2 connection
setUpH2Connection(avroFiles);
@@ -160,6 +172,21 @@ private TableTaskConfig getSingleLevelConcatTaskConfig() {
tableTaskConfigs.put("100days.maxNumRecordsPerTask", "15000");
tableTaskConfigs.put("ActualElapsedTime.aggregationType", "min");
tableTaskConfigs.put("WeatherDelay.aggregationType", "sum");
tableTaskConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
return new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs));
}

private TableTaskConfig getSingleLevelConcatMetadataTaskConfig() {
Map<String, String> tableTaskConfigs = new HashMap<>();
tableTaskConfigs.put("100days.mergeType", "concat");
tableTaskConfigs.put("100days.bufferTimePeriod", "1d");
tableTaskConfigs.put("100days.bucketTimePeriod", "100d");
tableTaskConfigs.put("100days.maxNumRecordsPerSegment", "15000");
tableTaskConfigs.put("100days.maxNumRecordsPerTask", "15000");
tableTaskConfigs.put("ActualElapsedTime.aggregationType", "min");
tableTaskConfigs.put("WeatherDelay.aggregationType", "sum");
tableTaskConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
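// Push the merged segments via metadata push instead of the default tar upload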
tableTaskConfigs.put(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.METADATA.toString());
return new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs));
}

@@ -169,6 +196,7 @@ private TableTaskConfig getSingleLevelRollupTaskConfig() {
tableTaskConfigs.put("150days.bufferTimePeriod", "1d");
tableTaskConfigs.put("150days.bucketTimePeriod", "150d");
tableTaskConfigs.put("150days.roundBucketTimePeriod", "7d");
tableTaskConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
return new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs));
}

@@ -185,6 +213,7 @@ private TableTaskConfig getMultiLevelConcatTaskConfig() {
tableTaskConfigs.put("90days.bucketTimePeriod", "90d");
tableTaskConfigs.put("90days.maxNumRecordsPerSegment", "100000");
tableTaskConfigs.put("90days.maxNumRecordsPerTask", "100000");
tableTaskConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
return new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs));
}

@@ -345,6 +374,119 @@ public void testSingleLevelConcat()
verifyTableDelete(offlineTableName);
}

/**
* Test single level concat task with maxNumRecordsPerTask, maxNumRecordsPerSegment constraints.
* The push type is set to METADATA.
*/
@Test
public void testSingleLevelConcatWithMetadataPush()
throws Exception {
// The original segments are time partitioned by month:
// segmentName (totalDocs)
// myTable4_16071_16101_3 (9746)
// myTable4_16102_16129_4 (8690)
// myTable4_16130_16159_5 (9621)
// myTable4_16160_16189_6 (9454)
// myTable4_16190_16220_7 (10329)
// myTable4_16221_16250_8 (10468)
// myTable4_16251_16281_9 (10499)
// myTable4_16282_16312_10 (10196)
// myTable4_16313_16342_11 (9136)
// myTable4_16343_16373_0 (9292)
// myTable4_16374_16404_1 (8736)
// myTable4_16405_16435_2 (9378)

// Expected merge tasks and result segments:
// 1.
// {myTable4_16071_16101_3}
// -> {merged_100days_T1_0_myTable4_16071_16099_0, merged_100days_T1_0_myTable4_16100_16101_1}
// 2.
// {merged_100days_T1_0_myTable4_16100_16101_1, myTable4_16102_16129_4, myTable4_16130_16159_5}
// -> {merged_100days_T2_0_myTable4_16100_???_0(15000), merged_100days_T2_0_myTable4_???_16159_1}
// {myTable4_16160_16189_6, myTable4_16190_16220_7}
// -> {merged_100days_T2_1_myTable4_16160_16199_0, merged_100days_T2_1_myTable4_16200_16220_1}
// 3.
// {merged_100days_T2_1_myTable4_16200_16220_1, myTable4_16221_16250_8}
// -> {merged_100days_T3_0_myTable4_16200_???_0(15000), merged_100days_T3_0_myTable4_???_16250_1}
// {myTable4_16251_16281_9, myTable4_16282_16312_10}
// -> {merged_100days_T3_1_myTable4_16251_???_0(15000), merged_100days_T3_1_myTable4_???_16299_1,
// merged_100days_T3_1_myTable4_16300_16312_2}
// 4.
// {merged_100days_T3_1_myTable4_16300_16312_2, myTable4_16313_16342_11, myTable4_16343_16373_0}
// -> {merged_100days_T4_0_myTable4_16300_???_0(15000), merged_100days_T4_0_myTable4_???_16373_1}
// {myTable4_16374_16404_1}
// -> {merged_100days_T4_1_16374_16399_0, merged_100days_T4_1_16400_16404_1}
// 5.
// {merged_100days_T4_1_16400_16404_1, myTable4_16405_16435_2}
// -> {merged_100days_T5_0_myTable4_16400_16435_0}

String sqlQuery = "SELECT count(*) FROM myTable4"; // 115545 rows for the test table
JsonNode expectedJson = postQuery(sqlQuery, _brokerBaseApiUrl);
int[] expectedNumSubTasks = {1, 2, 2, 2, 1};
int[] expectedNumSegmentsQueried = {13, 12, 13, 13, 12};
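// The earliest input segment starts on day 16071, which falls into the 100-day bucket that starts at
// day 16000, so the first watermark is expected to be 16000 days in epoch millis; each completed task
// below advances it by one 100-day bucket (the expectedWatermark += 100 * 86_400_000L update).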
long expectedWatermark = 16000 * 86_400_000L;
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE);
int numTasks = 0;
for (String tasks = _taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
tasks != null; tasks =
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), expectedNumSubTasks[numTasks]);
assertTrue(_helixTaskResourceManager.getTaskQueues()
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
// Should not schedule a new task while there is an incomplete task
assertNull(
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
waitForTaskToComplete();

// Check watermark
MergeRollupTaskMetadata minionTaskMetadata = MergeRollupTaskMetadata.fromZNRecord(
_taskManager.getClusterInfoAccessor()
.getMinionTaskMetadataZNRecord(MinionConstants.MergeRollupTask.TASK_TYPE, offlineTableName));
assertNotNull(minionTaskMetadata);
assertEquals((long) minionTaskMetadata.getWatermarkMap().get("100days"), expectedWatermark);
expectedWatermark += 100 * 86_400_000L;

// Check metadata of merged segments
for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
if (metadata.getSegmentName().startsWith("merged")) {
// Check merged segment zk metadata
assertNotNull(metadata.getCustomMap());
assertEquals("100days",
metadata.getCustomMap().get(MinionConstants.MergeRollupTask.SEGMENT_ZK_METADATA_MERGE_LEVEL_KEY));
// Check merged segments are time partitioned
assertEquals(metadata.getEndTimeMs() / (86_400_000L * 100), metadata.getStartTimeMs() / (86_400_000L * 100));
}
}

final int finalNumTasks = numTasks;
TestUtils.waitForCondition(aVoid -> {
try {
// Check that the total doc count of the merged segments matches the original segments
JsonNode actualJson = postQuery(sqlQuery, _brokerBaseApiUrl);
if (!SqlResultComparator.areEqual(actualJson, expectedJson, sqlQuery)) {
return false;
}
// Check query routing
int numSegmentsQueried = actualJson.get("numSegmentsQueried").asInt();
return numSegmentsQueried == expectedNumSegmentsQueried[finalNumTasks];
} catch (Exception e) {
throw new RuntimeException(e);
}
}, TIMEOUT_IN_MS, "Timeout while validating segments");
}
// Check total tasks
assertEquals(numTasks, 5);

assertTrue(_controllerStarter.getControllerMetrics()
.containsGauge("mergeRollupTaskDelayInNumBuckets.myTable4_OFFLINE.100days"));

// Drop the table
dropOfflineTable(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE);

// Check if the task metadata is cleaned up on table deletion
verifyTableDelete(offlineTableName);
}

/**
* Test single level rollup task with duplicate data (original segments * 2)
*/
BaseMultipleSegmentsConversionExecutor.java
@@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -47,7 +48,15 @@
import org.apache.pinot.minion.event.MinionEventObservers;
import org.apache.pinot.minion.exception.TaskCancelledException;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
import org.apache.pinot.spi.utils.retry.RetriableOperationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -66,6 +75,10 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
private static final Logger LOGGER = LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
private static final String CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID = "lineageEntryId";

private static final int DEFAULT_PUSH_ATTEMPTS = 5;
private static final int DEFAULT_PUSH_PARALLELISM = 1;
private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L;

protected MinionConf _minionConf;

// Tracking finer grained progress status.
@@ -242,6 +255,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
segmentZKMetadataCustomMapModifier.toJsonString());

URI outputSegmentTarURI = moveSegmentToOutputPinotFS(pinotTaskConfig.getConfigs(), convertedTarredSegmentFile);
Review comment (Contributor): I feel that we should check push mode first before moving the segment. Otherwise, this middle step is not needed.

Review comment (Contributor, author): Makes sense.

Review comment (Contributor): +1. As @zhtaoxiang mentioned, we should NOT move the segments to deep storage when we do the segment tar file push. In that case, the controller will move the segment to the deep storage during the upload phase.
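A minimal sketch of the reordering suggested in this thread, using only names already present in this diff (BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType, moveSegmentToOutputPinotFS, convertedTarredSegmentFile); defaulting a missing push mode to TAR is an assumption for illustration, not the PR's behavior:

// Illustration only: stage the tar on the output PinotFS only when the push mode needs a remote copy.
String pushMode = pinotTaskConfig.getConfigs()
    .getOrDefault(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.toString());
URI outputSegmentTarURI;
if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
    == BatchConfigProperties.SegmentPushType.TAR) {
  // Tar push: keep the local tar; the controller copies it to deep storage during the upload.
  outputSegmentTarURI = convertedTarredSegmentFile.toURI();
} else {
  // URI/metadata push: the minion stages the tar on the output PinotFS first.
  outputSegmentTarURI = moveSegmentToOutputPinotFS(pinotTaskConfig.getConfigs(), convertedTarredSegmentFile);
}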

LOGGER.info("Moved generated segment from [{}] to location: [{}]",
Review comment (Contributor, @snleee, Nov 18, 2022): What happens if the Minion somehow gets killed (e.g. restarted) after finishing the segment move to the output PinotFS but before finishing the metadata push to the controller?
I think this can potentially leave some files in the deep storage without any Pinot-side segment metadata pointing to those files. This will potentially become a problem unless we have a cleanup mechanism. Do we have any design/solution to address this?

Review comment (Contributor, author): One solution can be to always use a separate OUTPUT_DIR and always clean that dir in case the task fails. Even if we fail to clean the output dir, it should not be a problem if it is separate from the deep store.

Review comment (Contributor): Personally, I think this may not be a problem. Because the Minion task should be idempotent, a later retry should generate segments with the same names, and the existing ones will be reused or overwritten.

Review comment (Contributor): We should not assume that a later retry will always come to finish this particular task. Similarly, the controller can be down and fail the metadata update. If the controller recovery happens after the minion's retry attempts fail, I think this will cause some files to sit in the deep storage.

convertedTarredSegmentFile, outputSegmentTarURI);
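One possible mitigation for the orphaned-file concern raised in the thread above, following the author's suggestion of a dedicated output directory: a best-effort cleanup of the staged tar when the push fails. The cleanupStagedSegmentTar helper below is hypothetical and not part of this PR; it relies only on standard PinotFS methods (exists, delete) and the TaskUtils.getOutputPinotFS helper already used in this class:

// Hypothetical helper (illustration only): remove the staged tar from the output PinotFS
// when the subsequent push to the controller did not succeed.
private void cleanupStagedSegmentTar(Map<String, String> taskConfigs, URI outputSegmentTarURI) {
  if (!taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
    return; // Nothing was staged remotely; the tar is only a local temp file.
  }
  URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
  try (PinotFS outputFileFS = TaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
    if (outputFileFS.exists(outputSegmentTarURI)) {
      outputFileFS.delete(outputSegmentTarURI, true);
    }
  } catch (Exception e) {
    LOGGER.warn("Failed to clean up staged segment tar: {}", outputSegmentTarURI, e);
  }
}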

List<Header> httpHeaders = new ArrayList<>();
httpHeaders.add(segmentZKMetadataCustomMapModifierHeader);
httpHeaders.addAll(AuthProviderUtils.toRequestHeaders(authProvider));
@@ -253,9 +270,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
TableNameBuilder.extractRawTableName(tableNameWithType));
List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter);

SegmentConversionUtils
.uploadSegment(configs, httpHeaders, parameters, tableNameWithType, resultSegmentName, uploadURL,
convertedTarredSegmentFile);
pushSegment(tableNameWithType, pinotTaskConfig.getConfigs(), outputSegmentTarURI, httpHeaders, parameters);
if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath());
}
@@ -276,6 +294,98 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
}
}

private void pushSegment(String tableName, Map<String, String> taskConfigs, URI outputSegmentTarURI,
List<Header> headers, List<NameValuePair> parameters) throws Exception {
String pushMode = taskConfigs.get(BatchConfigProperties.PUSH_MODE);
LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI);

PushJobSpec pushJobSpec = new PushJobSpec();
pushJobSpec.setPushAttempts(DEFAULT_PUSH_ATTEMPTS);
pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX));

SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec);

URI outputSegmentDirURI = null;
if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
}
try (PinotFS outputFileFS = TaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
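// Tar push: upload the tar file bytes directly to the controller, which extracts it and copies it to deep storage.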
case TAR:
try (PinotFS pinotFS = TaskUtils.getLocalPinotFs()) {
MinionPushUtils.pushSegments(
spec, pinotFS, Arrays.asList(outputSegmentTarURI.toString()), headers, parameters);
} catch (RetriableOperationException | AttemptsExceededException e) {
throw new RuntimeException(e);
}
break;
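// URI push: send only the segment tar URI to the controller, which downloads the tar from that location itself.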
case URI:
try {
List<String> segmentUris = new ArrayList<>();
URI updatedURI = MinionPushUtils.generateSegmentTarURI(outputSegmentDirURI, outputSegmentTarURI,
pushJobSpec.getSegmentUriPrefix(), pushJobSpec.getSegmentUriSuffix());
segmentUris.add(updatedURI.toString());
MinionPushUtils.sendSegmentUris(spec, segmentUris, headers, parameters);
} catch (RetriableOperationException | AttemptsExceededException e) {
throw new RuntimeException(e);
}
break;
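// Metadata push: upload only the extracted segment metadata; the tar stays on the output PinotFS and its URI is recorded as the segment download location.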
case METADATA:
try {
Map<String, String> segmentUriToTarPathMap =
MinionPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
new String[]{outputSegmentTarURI.toString()});
MinionPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters);
} catch (RetriableOperationException | AttemptsExceededException e) {
Review comment (Contributor): same here

throw new RuntimeException(e);
}
break;
default:
throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode);
}
}
}

private SegmentGenerationJobSpec generatePushJobSpec(String tableName, Map<String, String> taskConfigs,
PushJobSpec pushJobSpec) {

TableSpec tableSpec = new TableSpec();
tableSpec.setTableName(tableName);

PinotClusterSpec pinotClusterSpec = new PinotClusterSpec();
pinotClusterSpec.setControllerURI(taskConfigs.get(BatchConfigProperties.PUSH_CONTROLLER_URI));
PinotClusterSpec[] pinotClusterSpecs = new PinotClusterSpec[]{pinotClusterSpec};

SegmentGenerationJobSpec spec = new SegmentGenerationJobSpec();
spec.setPushJobSpec(pushJobSpec);
spec.setTableSpec(tableSpec);
spec.setPinotClusterSpecs(pinotClusterSpecs);
spec.setAuthToken(taskConfigs.get(BatchConfigProperties.AUTH_TOKEN));

return spec;
}

private URI moveSegmentToOutputPinotFS(Map<String, String> taskConfigs, File localSegmentTarFile)
throws Exception {
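// If no output dir is configured, the local tar is pushed as-is; otherwise stage the tar on the output PinotFS so URI/metadata push can reference it.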
if (!taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
return localSegmentTarFile.toURI();
}
URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
try (PinotFS outputFileFS = TaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
URI outputSegmentTarURI = URI.create(outputSegmentDirURI + localSegmentTarFile.getName());
if (!Boolean.parseBoolean(taskConfigs.get(BatchConfigProperties.OVERWRITE_OUTPUT)) && outputFileFS.exists(
outputSegmentTarURI)) {
LOGGER.warn("Not overwriting existing output segment tar file: {}", outputSegmentTarURI);
} else {
outputFileFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
}
return outputSegmentTarURI;
}
}

// SegmentUploadContext holds the info to conduct certain actions
// before and after uploading multiple segments.
protected static class SegmentUploadContext {