Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[colocated-join] Adds Support for instancePartitionsMap in Table Config #8989

Merged
merged 18 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Preconditions;
import java.util.Map;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
Expand Down Expand Up @@ -55,6 +56,9 @@ public static boolean shouldRelocateCompletedSegments(TableConfig tableConfig) {
*/
public static boolean allowInstanceAssignment(TableConfig tableConfig,
InstancePartitionsType instancePartitionsType) {
if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) {
return true;
}
TableType tableType = tableConfig.getTableType();
Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
tableConfig.getInstanceAssignmentConfigMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ public ZNRecord toZNRecord() {
return znRecord;
}

/**
* Returns a new instance of InstancePartitions with the given name
*/
public InstancePartitions withName(String newName) {
return new InstancePartitions(newName, getPartitionToInstancesMap());
}

public String toJsonString() {
try {
return JsonUtils.objectToString(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.spi.config.table.TableConfig;
Expand Down Expand Up @@ -60,6 +61,14 @@ public static String getInstancePartitionsName(String tableName, String instance
public static InstancePartitions fetchOrComputeInstancePartitions(HelixManager helixManager, TableConfig tableConfig,
InstancePartitionsType instancePartitionsType) {
String tableNameWithType = tableConfig.getTableName();
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);

// If table has pre-configured instance partitions.
if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) {
return fetchInstancePartitionsWithRename(helixManager.getHelixPropertyStore(),
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
instancePartitionsType.getInstancePartitionsName(rawTableName));
}

// Fetch the instance partitions from property store if it exists
ZkHelixPropertyStore<ZNRecord> propertyStore = helixManager.getHelixPropertyStore();
Expand All @@ -84,6 +93,20 @@ public static InstancePartitions fetchInstancePartitions(HelixPropertyStore<ZNRe
return znRecord != null ? InstancePartitions.fromZNRecord(znRecord) : null;
}

/**
* Gets the instance partitions with the given name, and returns a re-named copy of the same.
* This method is useful when we use a table with instancePartitionsMap since in that case
* the value of a table's instance partitions are copied over from an existing instancePartitions.
*/
public static InstancePartitions fetchInstancePartitionsWithRename(HelixPropertyStore<ZNRecord> propertyStore,
String instancePartitionsName, String newName) {
InstancePartitions instancePartitions = fetchInstancePartitions(propertyStore, instancePartitionsName);
Preconditions.checkNotNull(instancePartitions,
String.format("Couldn't find instance-partitions with name=%s. Cannot rename to %s",
instancePartitionsName, newName));
return instancePartitions.withName(newName);
}

/**
* Computes the default instance partitions. Sort all qualified instances and rotate the list based on the table name
* to prevent creating hotspot servers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,16 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)
});
}

Map<InstancePartitionsType, String> instancePartitionsMap = null;
String instancePartitionsMapString = simpleFields.get(TableConfig.INSTANCE_PARTITIONS_MAP_CONFIG_KEY);
if (instancePartitionsMapString != null) {
instancePartitionsMap = JsonUtils.stringToObject(instancePartitionsMapString,
new TypeReference<Map<InstancePartitionsType, String>>() { });
}

return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
dedupConfig, ingestionConfig, tierConfigList, isDimTable, tunerConfigList);
dedupConfig, ingestionConfig, tierConfigList, isDimTable, tunerConfigList, instancePartitionsMap);
}

public static ZNRecord toZNRecord(TableConfig tableConfig)
Expand Down Expand Up @@ -223,6 +230,10 @@ public static ZNRecord toZNRecord(TableConfig tableConfig)
if (tunerConfigList != null) {
simpleFields.put(TableConfig.TUNER_CONFIG_LIST_KEY, JsonUtils.objectToString(tunerConfigList));
}
if (tableConfig.getInstancePartitionsMap() != null) {
simpleFields.put(TableConfig.INSTANCE_PARTITIONS_MAP_CONFIG_KEY,
JsonUtils.objectToString(tableConfig.getInstancePartitionsMap()));
}

ZNRecord znRecord = new ZNRecord(tableConfig.getTableName());
znRecord.setSimpleFields(simpleFields);
Expand Down Expand Up @@ -294,4 +305,20 @@ public static void convertFromLegacyTableConfig(TableConfig tableConfig) {
validationConfig.setSegmentPushFrequency(null);
validationConfig.setSegmentPushType(null);
}

/**
* Returns true if the table has pre-configured instance partitions for any type (OFFLINE/CONSUMING/COMPLETED).
*/
public static boolean hasPreConfiguredInstancePartitions(TableConfig tableConfig) {
return MapUtils.isNotEmpty(tableConfig.getInstancePartitionsMap());
}

/**
* Returns true if the table has pre-configured instance partitions for the given type.
*/
public static boolean hasPreConfiguredInstancePartitions(TableConfig tableConfig,
InstancePartitionsType instancePartitionsType) {
return hasPreConfiguredInstancePartitions(tableConfig)
&& tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
Expand Down Expand Up @@ -210,6 +211,13 @@ private void assignInstancesForInstancePartitionsType(
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, TableConfig tableConfig,
List<InstanceConfig> instanceConfigs, InstancePartitionsType instancePartitionsType) {
String tableNameWithType = tableConfig.getTableName();
if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
instancePartitionsMap.put(instancePartitionsType, InstancePartitionsUtils.fetchInstancePartitionsWithRename(
_resourceManager.getPropertyStore(), tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
instancePartitionsType.getInstancePartitionsName(rawTableName)));
return;
}
InstancePartitions existingInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1693,10 +1693,22 @@ private void assignInstances(TableConfig tableConfig, boolean override) {
InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) {
InstancePartitions instancePartitions =
instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null);
LOGGER.info("Persisting instance partitions: {}", instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
boolean hasPreConfiguredInstancePartitions = TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType);
InstancePartitions instancePartitions;
if (!hasPreConfiguredInstancePartitions) {
instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null);
LOGGER.info("Persisting instance partitions: {}", instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
} else {
String referenceInstancePartitionsName =
tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
instancePartitions = InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName));
LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions,
referenceInstancePartitionsName);
InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pinot.common.tier.PinotServerTierStorage;
import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
Expand All @@ -63,6 +64,7 @@
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -418,6 +420,22 @@ private InstancePartitions getInstancePartitions(TableConfig tableConfig,
String tableNameWithType = tableConfig.getTableName();
if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) {
if (reassignInstances) {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
boolean hasPreConfiguredInstancePartitions = TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType);
if (hasPreConfiguredInstancePartitions) {
String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
InstancePartitions instancePartitions = InstancePartitionsUtils.fetchInstancePartitionsWithRename(
_helixManager.getHelixPropertyStore(), referenceInstancePartitionsName,
instancePartitionsType.getInstancePartitionsName(rawTableName));
if (!dryRun) {
LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions,
referenceInstancePartitionsName);
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
instancePartitions);
}
return instancePartitions;
}
InstancePartitions existingInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
Expand Down Expand Up @@ -131,6 +132,7 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema, @N
validateTierConfigList(tableConfig.getTierConfigsList());
validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
validateFieldConfigList(tableConfig.getFieldConfigList(), tableConfig.getIndexingConfig(), schema);
validateInstancePartitionsTypeMapConfig(tableConfig);
if (!skipTypes.contains(ValidationType.UPSERT)) {
validateUpsertAndDedupConfig(tableConfig, schema);
validatePartialUpsertStrategies(tableConfig, schema);
Expand Down Expand Up @@ -559,6 +561,24 @@ static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema)
validateAggregateMetricsForUpsertConfig(tableConfig);
}

/**
* Detects whether both InstanceAssignmentConfig and InstancePartitionsMap are set for a given
* instance partitions type. Validation fails because the table would ignore InstanceAssignmentConfig
* when the partitions are already set.
*/
@VisibleForTesting
static void validateInstancePartitionsTypeMapConfig(TableConfig tableConfig) {
if (MapUtils.isEmpty(tableConfig.getInstancePartitionsMap())
|| MapUtils.isEmpty(tableConfig.getInstanceAssignmentConfigMap())) {
return;
}
for (InstancePartitionsType instancePartitionsType : tableConfig.getInstancePartitionsMap().keySet()) {
Preconditions.checkState(!tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType),
String.format("Both InstanceAssignmentConfigMap and InstancePartitionsMap set for %s",
instancePartitionsType));
}
}

/**
* Validates metrics aggregation when upsert config is enabled
* - Metrics aggregation cannot be enabled when Upsert Config is enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
Expand All @@ -55,6 +57,7 @@
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -1557,6 +1560,38 @@ public void testTaskConfig() {
}
}

@Test
public void testValidateInstancePartitionsMap() {
InstanceAssignmentConfig instanceAssignmentConfig = Mockito.mock(InstanceAssignmentConfig.class);

TableConfig tableConfigWithoutInstancePartitionsMap =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.build();

// Call validate with a table-config without any instance partitions or instance assignment config
TableConfigUtils.validateInstancePartitionsTypeMapConfig(tableConfigWithoutInstancePartitionsMap);

TableConfig tableConfigWithInstancePartitionsMap =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE"))
.build();

// Call validate with a table-config with instance partitions set but not instance assignment config
TableConfigUtils.validateInstancePartitionsTypeMapConfig(tableConfigWithInstancePartitionsMap);

TableConfig invalidTableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE"))
.setInstanceAssignmentConfigMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, instanceAssignmentConfig))
.build();
try {
// Call validate with instance partitions and config set for the same type
TableConfigUtils.validateInstancePartitionsTypeMapConfig(invalidTableConfig);
Assert.fail("Validation should have failed since both instancePartitionsMap and config are set");
} catch (IllegalStateException ignored) {
}
}

private Map<String, String> getStreamConfigs() {
Map<String, String> streamConfigs = new HashMap<>();
streamConfigs.put("streamType", "kafka");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class TableConfig extends BaseJsonConfig {
public static final String ROUTING_CONFIG_KEY = "routing";
public static final String QUERY_CONFIG_KEY = "query";
public static final String INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY = "instanceAssignmentConfigMap";
public static final String INSTANCE_PARTITIONS_MAP_CONFIG_KEY = "instancePartitionsMap";
public static final String FIELD_CONFIG_LIST_KEY = "fieldConfigList";
public static final String UPSERT_CONFIG_KEY = "upsertConfig";
public static final String DEDUP_CONFIG_KEY = "dedupConfig";
Expand Down Expand Up @@ -84,6 +85,10 @@ public class TableConfig extends BaseJsonConfig {
private RoutingConfig _routingConfig;
private QueryConfig _queryConfig;
private Map<InstancePartitionsType, InstanceAssignmentConfig> _instanceAssignmentConfigMap;

@JsonPropertyDescription(value = "Point to an existing instance partitions")
private Map<InstancePartitionsType, String> _instancePartitionsMap;

private List<FieldConfig> _fieldConfigList;

@JsonPropertyDescription(value = "upsert related config")
Expand Down Expand Up @@ -121,7 +126,9 @@ public TableConfig(@JsonProperty(value = TABLE_NAME_KEY, required = true) String
@JsonProperty(INGESTION_CONFIG_KEY) @Nullable IngestionConfig ingestionConfig,
@JsonProperty(TIER_CONFIGS_LIST_KEY) @Nullable List<TierConfig> tierConfigsList,
@JsonProperty(IS_DIM_TABLE_KEY) boolean dimTable,
@JsonProperty(TUNER_CONFIG_LIST_KEY) @Nullable List<TunerConfig> tunerConfigList) {
@JsonProperty(TUNER_CONFIG_LIST_KEY) @Nullable List<TunerConfig> tunerConfigList,
@JsonProperty(INSTANCE_PARTITIONS_MAP_CONFIG_KEY) @Nullable
Map<InstancePartitionsType, String> instancePartitionsMap) {
Preconditions.checkArgument(tableName != null, "'tableName' must be configured");
Preconditions.checkArgument(!tableName.contains(TABLE_NAME_FORBIDDEN_SUBSTRING),
"'tableName' cannot contain double underscore ('__')");
Expand Down Expand Up @@ -150,6 +157,7 @@ public TableConfig(@JsonProperty(value = TABLE_NAME_KEY, required = true) String
_tierConfigsList = tierConfigsList;
_dimTable = dimTable;
_tunerConfigList = tunerConfigList;
_instancePartitionsMap = instancePartitionsMap;
}

@JsonProperty(TABLE_NAME_KEY)
Expand Down Expand Up @@ -254,6 +262,15 @@ public void setInstanceAssignmentConfigMap(
_instanceAssignmentConfigMap = instanceAssignmentConfigMap;
}

@JsonProperty(INSTANCE_PARTITIONS_MAP_CONFIG_KEY)
public Map<InstancePartitionsType, String> getInstancePartitionsMap() {
return _instancePartitionsMap;
}

public void setInstancePartitionsMap(Map<InstancePartitionsType, String> instancePartitionsMap) {
_instancePartitionsMap = instancePartitionsMap;
}

@JsonProperty(FIELD_CONFIG_LIST_KEY)
@Nullable
public List<FieldConfig> getFieldConfigList() {
Expand Down
Loading