Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow segment upload via Metadata in MergeRollup Minion task #9825

Merged
merged 16 commits into from
Dec 6, 2022

Conversation

KKcorps
Copy link
Contributor

@KKcorps KKcorps commented Nov 17, 2022

Users need to use the same configs as SegmentGenerationAndPushTask to switch to metadata push.
e.g.
pushType: METADATA

@@ -242,6 +255,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
segmentZKMetadataCustomMapModifier.toJsonString());

URI outputSegmentTarURI = moveSegmentToOutputPinotFS(pinotTaskConfig.getConfigs(), convertedTarredSegmentFile);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that we should check push mode first before moving the segment. Otherwise, this middle step is not needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
As @zhtaoxiang mentioned, we should NOT move the segments to deep storage when we do the segment tar file push. In that case, the controller will move the segment to the deep storage during the upload phase.

singleFileGenerationTaskConfig.put(
BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputSegmentDirURI.toString());
}
if ((outputDirURI == null) || (pushMode == null)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When push mode is MetadataPush, we should set the default location data_dir/tablename when OUTPUT_SEGMENT_DIR_URI is not set by users. Most of the time, users will not need to set it.

String downloadUrl = downloadURLList[0];
URI downloadURI = URI.create(downloadUrl);
URI outputDirURI = null;
if (!downloadURI.getScheme().contentEquals("http")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I misunderstand something, but I don't fully follow the logic here. Why is the outputDir decided by the downloadUrl?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the segment needs to be copied to some place that is accessible by the controller in case of metadata push.
If user doesn't set the outputDirUri then the best place would be to copy it in deep store. The download url refers to deep store path and hence using it to derive the metadata.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that's the goal, then we just need to use controllerConfig.getDataDir()/tableName. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes more sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the commit to use dataDir.

@KKcorps KKcorps changed the title Allow segment upload via Metadata in MergeRollup Minion tas Allow segment upload via Metadata in MergeRollup Minion task Nov 18, 2022
@@ -242,6 +255,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
segmentZKMetadataCustomMapModifier.toJsonString());

URI outputSegmentTarURI = moveSegmentToOutputPinotFS(pinotTaskConfig.getConfigs(), convertedTarredSegmentFile);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1
As @zhtaoxiang mentioned, we should NOT move the segments to deep storage when we do the segment tar file push. In that case, the controller will move the segment to the deep storage during the upload phase.

@@ -242,6 +256,10 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
segmentZKMetadataCustomMapModifier.toJsonString());

URI outputSegmentTarURI = moveSegmentToOutputPinotFS(pinotTaskConfig.getConfigs(), convertedTarredSegmentFile);
LOGGER.info("Moved generated segment from [{}] to location: [{}]",
Copy link
Contributor

@snleee snleee Nov 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the Minion somehow got killed (e.g. restart) after finish the segment move to outputPinotFS but before finishing the metadata push to controller?

I think that this can potentially leave some files in the deep storage without any Pinot side segment metadata pointing those files. This will potentially become the problem unless we have the cleanup mechanism. Do we have any design/solution to address this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One solution can be to always mention a seperate OUTPUT_DIR and always clean that dir in case the task fails. Even if we fail to clean the outputDir it should not be a problem if it is seperate from the deep store.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I think this may not be a problem. Because Minion task should be idempotent, a later retry should generate segments with the same name and the existing ones will be reused or be override.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not assume that the later retry will always come to finish this particular task. Similarly, the controller can be down and fail the metadata update. If the controller recovery happens after the minion's retry attempt fails, I think that this will cause some files to sit at the deep storage.

@codecov-commenter
Copy link

codecov-commenter commented Nov 21, 2022

Codecov Report

Merging #9825 (72efe9e) into master (e307106) will increase coverage by 42.36%.
The diff coverage is 42.63%.

@@              Coverage Diff              @@
##             master    #9825       +/-   ##
=============================================
+ Coverage     28.06%   70.43%   +42.36%     
- Complexity       53     4984     +4931     
=============================================
  Files          1949     1981       +32     
  Lines        104632   106370     +1738     
  Branches      15847    16117      +270     
=============================================
+ Hits          29362    74918    +45556     
+ Misses        72395    26235    -46160     
- Partials       2875     5217     +2342     
Flag Coverage Δ
integration1 24.99% <42.63%> (-0.23%) ⬇️
integration2 24.45% <28.68%> (+0.02%) ⬆️
unittests1 67.96% <0.00%> (?)
unittests2 15.78% <9.30%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...he/pinot/segment/local/utils/SegmentPushUtils.java 11.97% <0.00%> (+11.97%) ⬆️
...che/pinot/plugin/minion/tasks/MinionTaskUtils.java 40.90% <40.90%> (ø)
.../tasks/BaseMultipleSegmentsConversionExecutor.java 80.00% <51.85%> (-9.86%) ⬇️
...nandpush/SegmentGenerationAndPushTaskExecutor.java 67.88% <75.00%> (ø)
...on/tasks/mergerollup/MergeRollupTaskGenerator.java 84.89% <100.00%> (+7.92%) ⬆️
...gments/RealtimeToOfflineSegmentsTaskGenerator.java 94.85% <100.00%> (+24.26%) ⬆️
...andpush/SegmentGenerationAndPushTaskGenerator.java 33.87% <100.00%> (ø)
...ery/aggregation/DoubleAggregationResultHolder.java 60.00% <0.00%> (-15.00%) ⬇️
...apache/pinot/common/function/FunctionRegistry.java 81.63% <0.00%> (-5.21%) ⬇️
...nction/DistinctCountBitmapAggregationFunction.java 47.66% <0.00%> (-5.19%) ⬇️
... and 1391 more

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

public static Map<String, String> getPushTaskConfig(String tableName, Map<String, String> taskConfigs,
ClusterInfoAccessor clusterInfoAccessor) {
try {
URI outputDirURI = URI.create(clusterInfoAccessor.getDataDir() + "/" + tableName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should check push mode first before deciding the outputDir?

If push mode is tar, we simply need to build segment locally and tar push to Pinot.

Let me know if I misunderstood something

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not really used in the case when pushMode is TAR.

if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
    != BatchConfigProperties.SegmentPushType.TAR) {
  outputSegmentTarURI = moveSegmentToOutputPinotFS(configs, convertedTarredSegmentFile);
  LOGGER.info("Moved generated segment from [{}] to location: [{}]", convertedTarredSegmentFile,
      outputSegmentTarURI);
} else {
  outputSegmentTarURI = convertedTarredSegmentFile.toURI();
}

......
case TAR:
  try (PinotFS pinotFS = MinionTaskUtils.getLocalPinotFs()) {
    SegmentPushUtils.pushSegments(
        spec, pinotFS, Arrays.asList(outputSegmentTarURI.toString()), headers, parameters);
  } catch (RetriableOperationException | AttemptsExceededException e) {
    throw new RuntimeException(e);
  }
  break;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it is not really used in the case when push mode is TAR. However, I feel the current logic is a little bit hard to follow if we don't have the full context.

If we can check push mode first then decide what to do, it's easier to follow. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. Made the changes. A lot of the code used here has been borrowed from SegmentGenerationAndPushTask. I will also raise another PR to port some of these changes from here to there.

@@ -97,6 +97,49 @@ public static URI generateSegmentTarURI(URI dirURI, URI fileURI, String prefix,
public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS fileSystem, List<String> tarFilePaths)
throws RetriableOperationException, AttemptsExceededException {
String tableName = spec.getTableSpec().getTableName();
AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious: why do we need to create those 3 new methods? It seems that there is no logic change?

Copy link
Contributor Author

@KKcorps KKcorps Nov 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to pass this custom header in the new flow
Older methods don't allow passing new headers

// Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata
SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
    getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, segmentConversionResult);
Header segmentZKMetadataCustomMapModifierHeader =
    new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
        segmentZKMetadataCustomMapModifier.toJsonString());

public static Map<String, String> getPushTaskConfig(String tableName, Map<String, String> taskConfigs,
ClusterInfoAccessor clusterInfoAccessor) {
try {
URI outputDirURI = URI.create(clusterInfoAccessor.getDataDir() + "/" + tableName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it is not really used in the case when push mode is TAR. However, I feel the current logic is a little bit hard to follow if we don't have the full context.

If we can check push mode first then decide what to do, it's easier to follow. WDYT?


if (!isLocalOutputDir(outputDirURIScheme)) {
singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputDirURI.toString());
singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, pushMode);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we set push_mode to metadata when the provided one is URI? and we should log this change.

@KKcorps KKcorps force-pushed the minion_metadata_push branch from 370bc3d to 72efe9e Compare December 2, 2022 18:51
Copy link
Contributor

@zhtaoxiang zhtaoxiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

snleee
snleee previously approved these changes Dec 5, 2022
Copy link
Contributor

@snleee snleee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please address the following issue.

@snleee snleee self-requested a review December 5, 2022 18:09
@snleee snleee dismissed their stale review December 5, 2022 18:11

dismissing approval

Copy link
Contributor

@snleee snleee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's fix the test. LGTM other than minor comments. Thank you for working on this!

String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, tableNameWithType, segmentName,
uploadURL, tarFile);
} catch (RetriableOperationException | AttemptsExceededException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to catch specific exceptions and wrap it with RuntimeException here? Our top caller function's definition: executeTask() throw Exception

SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
new String[]{outputSegmentTarURI.toString()});
SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters);
} catch (RetriableOperationException | AttemptsExceededException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

@KKcorps KKcorps merged commit fb48288 into apache:master Dec 6, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants