Skip to content

Commit

Permalink
Allow subclass to customize what happens pre/post segment uploading (#…
Browse files Browse the repository at this point in the history
…8203)

Added pre/postUploadSegments() methods for BaseMultipleSegmentsConversionExecutor, so that subclass can customize what happens before/after segment uploading, e.g. RT2OFF task should use segment lineage against the dest offline table instead of the src realtime table, but MergeRollup uses segment lineage against a single table that is both src/dest table.
  • Loading branch information
klsince authored Feb 16, 2022
1 parent 8042408 commit e8c849e
Showing 1 changed file with 98 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicHeader;
Expand Down Expand Up @@ -57,6 +59,7 @@
*/
public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
private static final String CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID = "lineageEntryId";

protected MinionConf _minionConf;

Expand Down Expand Up @@ -89,6 +92,33 @@ protected void preProcess(PinotTaskConfig pinotTaskConfig) {
protected void postProcess(PinotTaskConfig pinotTaskConfig) {
}

protected void preUploadSegments(SegmentUploadContext context)
throws Exception {
// Update the segment lineage to indicate that the segment replacement is in progress.
if (context.isReplaceSegmentsEnabled()) {
List<String> segmentsFrom =
Arrays.stream(StringUtils.split(context.getInputSegmentNames(), MinionConstants.SEGMENT_NAME_SEPARATOR))
.map(String::trim).collect(Collectors.toList());
List<String> segmentsTo =
context.getSegmentConversionResults().stream().map(SegmentConversionResult::getSegmentName)
.collect(Collectors.toList());
String lineageEntryId =
SegmentConversionUtils.startSegmentReplace(context.getTableNameWithType(), context.getUploadURL(),
new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo), context.getAuthToken());
context.setCustomContext(CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID, lineageEntryId);
}
}

protected void postUploadSegments(SegmentUploadContext context)
throws Exception {
// Update the segment lineage to indicate that the segment replacement is done.
if (context.isReplaceSegmentsEnabled()) {
String lineageEntryId = (String) context.getCustomContext(CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID);
SegmentConversionUtils.endSegmentReplace(context.getTableNameWithType(), context.getUploadURL(), lineageEntryId,
_minionConf.getEndReplaceSegmentsTimeoutMs(), context.getAuthToken());
}
}

@Override
public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig)
throws Exception {
Expand All @@ -102,8 +132,6 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
String authToken = configs.get(MinionConstants.AUTH_TOKEN);
String replaceSegmentsString = configs.get(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY);
boolean replaceSegmentsEnabled = Boolean.parseBoolean(replaceSegmentsString);

LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType,
tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
Expand Down Expand Up @@ -168,16 +196,8 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
taskType + " on table: " + tableNameWithType + ", segments: " + inputSegmentNames + " got cancelled");
}

// Update the segment lineage to indicate that the segment replacement is in progress.
String lineageEntryId = null;
if (replaceSegmentsEnabled) {
List<String> segmentsFrom =
Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
List<String> segmentsTo =
segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
lineageEntryId = SegmentConversionUtils.startSegmentReplace(tableNameWithType, uploadURL,
new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo), authToken);
}
SegmentUploadContext segmentUploadContext = new SegmentUploadContext(pinotTaskConfig, segmentConversionResults);
preUploadSegments(segmentUploadContext);

// Upload the tarred segments
for (int i = 0; i < numOutputSegments; i++) {
Expand Down Expand Up @@ -211,11 +231,7 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
}
}

// Update the segment lineage to indicate that the segment replacement is done.
if (replaceSegmentsEnabled) {
SegmentConversionUtils.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
_minionConf.getEndReplaceSegmentsTimeoutMs(), authToken);
}
postUploadSegments(segmentUploadContext);

String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
.collect(Collectors.joining(","));
Expand All @@ -229,4 +245,69 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
FileUtils.deleteQuietly(tempDataDir);
}
}

// SegmentUploadContext holds the info to conduct certain actions
// before and after uploading multiple segments.
protected static class SegmentUploadContext {
private final PinotTaskConfig _pinotTaskConfig;
private final List<SegmentConversionResult> _segmentConversionResults;

private final String _tableNameWithType;
private final String _uploadURL;
private final String _authToken;
private final String _inputSegmentNames;
private final boolean _replaceSegmentsEnabled;
private final Map<String, Object> _customMap;

public SegmentUploadContext(PinotTaskConfig pinotTaskConfig,
List<SegmentConversionResult> segmentConversionResults) {
_pinotTaskConfig = pinotTaskConfig;
_segmentConversionResults = segmentConversionResults;

Map<String, String> configs = pinotTaskConfig.getConfigs();
_tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
_uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
_authToken = configs.get(MinionConstants.AUTH_TOKEN);
_inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY);
String replaceSegmentsString = configs.get(MinionConstants.ENABLE_REPLACE_SEGMENTS_KEY);
_replaceSegmentsEnabled = Boolean.parseBoolean(replaceSegmentsString);
_customMap = new HashMap<>();
}

public PinotTaskConfig getPinotTaskConfig() {
return _pinotTaskConfig;
}

public List<SegmentConversionResult> getSegmentConversionResults() {
return _segmentConversionResults;
}

public String getTableNameWithType() {
return _tableNameWithType;
}

public String getUploadURL() {
return _uploadURL;
}

public String getAuthToken() {
return _authToken;
}

public String getInputSegmentNames() {
return _inputSegmentNames;
}

public boolean isReplaceSegmentsEnabled() {
return _replaceSegmentsEnabled;
}

public Object getCustomContext(String key) {
return _customMap.get(key);
}

public void setCustomContext(String key, Object value) {
_customMap.put(key, value);
}
}
}

0 comments on commit e8c849e

Please sign in to comment.