Skip to content

Commit

Permalink
Make allowDownloadFromServer minion-cluster-level config (apache#13247)
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 authored and gortiz committed Jun 14, 2024
1 parent b1acb5e commit 28ddd22
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public void start()
minionMetrics.setValueOfGlobalGauge(MinionGauge.VERSION, PinotVersion.VERSION_METRIC_NAME, 1);
MinionMetrics.register(minionMetrics);
minionContext.setMinionMetrics(minionMetrics);
minionContext.setAllowDownloadFromServer(_config.isAllowDownloadFromServer());

// Install default SSL context if necessary (even if not force-enabled everywhere)
TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_config, CommonConstants.Minion.MINION_TLS_PREFIX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public int getEndReplaceSegmentsTimeoutMs() {
return getProperty(END_REPLACE_SEGMENTS_TIMEOUT_MS_KEY, DEFAULT_END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS);
}

public boolean isAllowDownloadFromServer() {
return Boolean.parseBoolean(getProperty(CommonConstants.Minion.CONFIG_OF_ALLOW_DOWNLOAD_FROM_SERVER,
CommonConstants.Minion.DEFAULT_ALLOW_DOWNLOAD_FROM_SERVER));
}

public PinotConfiguration getMetricsConfig() {
return subset(CommonConstants.Minion.METRICS_CONFIG_PREFIX);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public static MinionContext getInstance() {
// For PurgeTask
private SegmentPurger.RecordPurgerFactory _recordPurgerFactory;
private SegmentPurger.RecordModifierFactory _recordModifierFactory;
private boolean _allowDownloadFromServer;

public File getDataDir() {
return _dataDir;
Expand Down Expand Up @@ -119,4 +120,12 @@ public void setHelixManager(HelixManager helixManager) {
public HelixManager getHelixManager() {
return _helixManager;
}

public void setAllowDownloadFromServer(boolean allowDownloadFromServer) {
_allowDownloadFromServer = allowDownloadFromServer;
}

public boolean isAllowDownloadFromServer() {
return _allowDownloadFromServer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ protected void downloadSegmentToLocal(String tableNameWithType, String segmentNa
} catch (Exception e) {
LOGGER.error("Segment download failed from deepstore for {}, crypter:{}", deepstoreURL, crypterName, e);
String peerDownloadScheme = tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
if (MinionTaskUtils.extractMinionAllowDownloadFromServer(tableConfig, taskType) && peerDownloadScheme != null) {
if (MinionTaskUtils.extractMinionAllowDownloadFromServer(tableConfig, taskType,
MINION_CONTEXT.isAllowDownloadFromServer()) && peerDownloadScheme != null) {
LOGGER.info("Trying to download from servers for segment {} post deepstore download failed", segmentName);
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName, peerDownloadScheme, () -> {
List<URI> uris =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,16 @@ public static List<String> getServers(String segmentName, String tableNameWithTy
/**
* Extract allowDownloadFromServer config from table task config
*/
public static boolean extractMinionAllowDownloadFromServer(TableConfig tableConfig, String taskType) {
public static boolean extractMinionAllowDownloadFromServer(TableConfig tableConfig, String taskType,
boolean defaultValue) {
TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
if (tableTaskConfig != null) {
Map<String, String> configs = tableTaskConfig.getConfigsForTaskType(taskType);
if (configs != null && !configs.isEmpty()) {
return Boolean.parseBoolean(configs.getOrDefault(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER,
String.valueOf(TableTaskConfig.DEFAULT_MINION_ALLOW_DOWNLOAD_FROM_SERVER)));
String.valueOf(defaultValue)));
}
}
return TableTaskConfig.DEFAULT_MINION_ALLOW_DOWNLOAD_FROM_SERVER;
return defaultValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,22 @@ public void testExtractMinionAllowDownloadFromServer() {

// Test when the configuration is not set, should return the default value which is false
assertFalse(MinionTaskUtils.extractMinionAllowDownloadFromServer(tableConfig,
MinionConstants.MergeRollupTask.TASK_TYPE));
MinionConstants.MergeRollupTask.TASK_TYPE, false));

// Test when the configuration is set to true
configs.put(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER, "true");
tableTaskConfig = new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, configs));
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("sampleTable")
.setTaskConfig(tableTaskConfig).build();
assertTrue(MinionTaskUtils.extractMinionAllowDownloadFromServer(tableConfig,
MinionConstants.MergeRollupTask.TASK_TYPE));
MinionConstants.MergeRollupTask.TASK_TYPE, false));

// Test when the configuration is set to false
configs.put(TableTaskConfig.MINION_ALLOW_DOWNLOAD_FROM_SERVER, "false");
tableTaskConfig = new TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE, configs));
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("sampleTable")
.setTaskConfig(tableTaskConfig).build();
assertFalse(MinionTaskUtils.extractMinionAllowDownloadFromServer(tableConfig,
MinionConstants.MergeRollupTask.TASK_TYPE));
MinionConstants.MergeRollupTask.TASK_TYPE, false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,8 @@ public static class Minion {
public static final String CONFIG_OF_EVENT_OBSERVER_CLEANUP_DELAY_IN_SEC =
"pinot.minion.event.observer.cleanupDelayInSec";
public static final char TASK_LIST_SEPARATOR = ',';
public static final String CONFIG_OF_ALLOW_DOWNLOAD_FROM_SERVER = "pinot.minion.task.allow.download.from.server";
public static final String DEFAULT_ALLOW_DOWNLOAD_FROM_SERVER = "false";
}

public static class ControllerJob {
Expand Down

0 comments on commit 28ddd22

Please sign in to comment.