Skip to content

Commit

Permalink
[colocated-join] Adds Support for instancePartitionsMap in Table Conf…
Browse files Browse the repository at this point in the history
…ig (#8989)
  • Loading branch information
ankitsultana authored Aug 11, 2022
1 parent 56927a1 commit 54d2813
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Preconditions;
import java.util.Map;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
Expand Down Expand Up @@ -55,6 +56,9 @@ public static boolean shouldRelocateCompletedSegments(TableConfig tableConfig) {
*/
public static boolean allowInstanceAssignment(TableConfig tableConfig,
InstancePartitionsType instancePartitionsType) {
if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) {
return true;
}
TableType tableType = tableConfig.getTableType();
Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
tableConfig.getInstanceAssignmentConfigMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ public ZNRecord toZNRecord() {
return znRecord;
}

/**
 * Returns a new instance of InstancePartitions with the given name.
 * <p>The new instance is constructed from {@code newName} and the value returned by
 * {@code getPartitionToInstancesMap()}; this instance is not modified by the call itself.
 * <p>NOTE(review): the map is handed to the constructor as returned by the getter. Whether the returned copy shares
 * that map with this instance depends on the getter/constructor, which are not visible here - confirm before mutating
 * either object.
 *
 * @param newName name to assign to the returned InstancePartitions
 * @return a new InstancePartitions carrying {@code newName}
 */
public InstancePartitions withName(String newName) {
return new InstancePartitions(newName, getPartitionToInstancesMap());
}

public String toJsonString() {
try {
return JsonUtils.objectToString(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.spi.config.table.TableConfig;
Expand Down Expand Up @@ -60,6 +61,14 @@ public static String getInstancePartitionsName(String tableName, String instance
public static InstancePartitions fetchOrComputeInstancePartitions(HelixManager helixManager, TableConfig tableConfig,
InstancePartitionsType instancePartitionsType) {
String tableNameWithType = tableConfig.getTableName();
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);

// If table has pre-configured instance partitions.
if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) {
return fetchInstancePartitionsWithRename(helixManager.getHelixPropertyStore(),
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
instancePartitionsType.getInstancePartitionsName(rawTableName));
}

// Fetch the instance partitions from property store if it exists
ZkHelixPropertyStore<ZNRecord> propertyStore = helixManager.getHelixPropertyStore();
Expand All @@ -84,6 +93,20 @@ public static InstancePartitions fetchInstancePartitions(HelixPropertyStore<ZNRe
return znRecord != null ? InstancePartitions.fromZNRecord(znRecord) : null;
}

/**
 * Fetches the instance partitions stored under {@code instancePartitionsName} and returns a re-named copy of it.
 * <p>This method is useful for tables that use an instancePartitionsMap, since in that case the table's instance
 * partitions are copied over from an existing instance partitions entry.
 *
 * @param propertyStore property store to read the existing instance partitions from
 * @param instancePartitionsName name of the existing instance partitions to look up
 * @param newName name to assign to the returned copy
 * @return the copy produced by {@code InstancePartitions.withName(newName)}
 * @throws NullPointerException if no instance partitions exist under {@code instancePartitionsName}
 */
public static InstancePartitions fetchInstancePartitionsWithRename(HelixPropertyStore<ZNRecord> propertyStore,
    String instancePartitionsName, String newName) {
  InstancePartitions instancePartitions = fetchInstancePartitions(propertyStore, instancePartitionsName);
  // Use the template overload so the message is only formatted when the lookup actually failed, instead of
  // paying for String.format on every successful call
  Preconditions.checkNotNull(instancePartitions,
      "Couldn't find instance-partitions with name=%s. Cannot rename to %s", instancePartitionsName, newName);
  return instancePartitions.withName(newName);
}

/**
* Computes the default instance partitions. Sort all qualified instances and rotate the list based on the table name
* to prevent creating hotspot servers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,16 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)
});
}

Map<InstancePartitionsType, String> instancePartitionsMap = null;
String instancePartitionsMapString = simpleFields.get(TableConfig.INSTANCE_PARTITIONS_MAP_CONFIG_KEY);
if (instancePartitionsMapString != null) {
instancePartitionsMap = JsonUtils.stringToObject(instancePartitionsMapString,
new TypeReference<Map<InstancePartitionsType, String>>() { });
}

return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
dedupConfig, ingestionConfig, tierConfigList, isDimTable, tunerConfigList);
dedupConfig, ingestionConfig, tierConfigList, isDimTable, tunerConfigList, instancePartitionsMap);
}

public static ZNRecord toZNRecord(TableConfig tableConfig)
Expand Down Expand Up @@ -223,6 +230,10 @@ public static ZNRecord toZNRecord(TableConfig tableConfig)
if (tunerConfigList != null) {
simpleFields.put(TableConfig.TUNER_CONFIG_LIST_KEY, JsonUtils.objectToString(tunerConfigList));
}
if (tableConfig.getInstancePartitionsMap() != null) {
simpleFields.put(TableConfig.INSTANCE_PARTITIONS_MAP_CONFIG_KEY,
JsonUtils.objectToString(tableConfig.getInstancePartitionsMap()));
}

ZNRecord znRecord = new ZNRecord(tableConfig.getTableName());
znRecord.setSimpleFields(simpleFields);
Expand Down Expand Up @@ -294,4 +305,20 @@ public static void convertFromLegacyTableConfig(TableConfig tableConfig) {
validationConfig.setSegmentPushFrequency(null);
validationConfig.setSegmentPushType(null);
}

/**
 * Checks whether the table config carries a non-empty instancePartitionsMap, i.e. whether instance partitions are
 * pre-configured for at least one type (OFFLINE/CONSUMING/COMPLETED).
 *
 * @param tableConfig table config to inspect
 * @return true if the instancePartitionsMap is neither null nor empty
 */
public static boolean hasPreConfiguredInstancePartitions(TableConfig tableConfig) {
  boolean noneConfigured = MapUtils.isEmpty(tableConfig.getInstancePartitionsMap());
  return !noneConfigured;
}

/**
 * Checks whether the table config has a pre-configured instance partitions entry for the given type.
 *
 * @param tableConfig table config to inspect
 * @param instancePartitionsType instance partitions type to look for in the instancePartitionsMap
 * @return true if the instancePartitionsMap is non-empty and contains {@code instancePartitionsType} as a key
 */
public static boolean hasPreConfiguredInstancePartitions(TableConfig tableConfig,
    InstancePartitionsType instancePartitionsType) {
  // Guard first: the map may be null or empty, in which case no type can be pre-configured
  if (!hasPreConfiguredInstancePartitions(tableConfig)) {
    return false;
  }
  return tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
Expand Down Expand Up @@ -210,6 +211,13 @@ private void assignInstancesForInstancePartitionsType(
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, TableConfig tableConfig,
List<InstanceConfig> instanceConfigs, InstancePartitionsType instancePartitionsType) {
String tableNameWithType = tableConfig.getTableName();
if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig, instancePartitionsType)) {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
instancePartitionsMap.put(instancePartitionsType, InstancePartitionsUtils.fetchInstancePartitionsWithRename(
_resourceManager.getPropertyStore(), tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
instancePartitionsType.getInstancePartitionsName(rawTableName)));
return;
}
InstancePartitions existingInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1702,10 +1702,22 @@ private void assignInstances(TableConfig tableConfig, boolean override) {
InstanceAssignmentDriver instanceAssignmentDriver = new InstanceAssignmentDriver(tableConfig);
List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
for (InstancePartitionsType instancePartitionsType : instancePartitionsTypesToAssign) {
InstancePartitions instancePartitions =
instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null);
LOGGER.info("Persisting instance partitions: {}", instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
boolean hasPreConfiguredInstancePartitions = TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType);
InstancePartitions instancePartitions;
if (!hasPreConfiguredInstancePartitions) {
instancePartitions = instanceAssignmentDriver.assignInstances(instancePartitionsType, instanceConfigs, null);
LOGGER.info("Persisting instance partitions: {}", instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
} else {
String referenceInstancePartitionsName =
tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
instancePartitions = InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
referenceInstancePartitionsName, instancePartitionsType.getInstancePartitionsName(rawTableName));
LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions,
referenceInstancePartitionsName);
InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pinot.common.tier.PinotServerTierStorage;
import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
Expand All @@ -63,6 +64,7 @@
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.RebalanceConfigConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -418,6 +420,22 @@ private InstancePartitions getInstancePartitions(TableConfig tableConfig,
String tableNameWithType = tableConfig.getTableName();
if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig, instancePartitionsType)) {
if (reassignInstances) {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
boolean hasPreConfiguredInstancePartitions = TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType);
if (hasPreConfiguredInstancePartitions) {
String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
InstancePartitions instancePartitions = InstancePartitionsUtils.fetchInstancePartitionsWithRename(
_helixManager.getHelixPropertyStore(), referenceInstancePartitionsName,
instancePartitionsType.getInstancePartitionsName(rawTableName));
if (!dryRun) {
LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions,
referenceInstancePartitionsName);
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
instancePartitions);
}
return instancePartitions;
}
InstancePartitions existingInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
Expand Down Expand Up @@ -131,6 +132,7 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema, @N
validateTierConfigList(tableConfig.getTierConfigsList());
validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
validateFieldConfigList(tableConfig.getFieldConfigList(), tableConfig.getIndexingConfig(), schema);
validateInstancePartitionsTypeMapConfig(tableConfig);
if (!skipTypes.contains(ValidationType.UPSERT)) {
validateUpsertAndDedupConfig(tableConfig, schema);
validatePartialUpsertStrategies(tableConfig, schema);
Expand Down Expand Up @@ -559,6 +561,24 @@ static void validateUpsertAndDedupConfig(TableConfig tableConfig, Schema schema)
validateAggregateMetricsForUpsertConfig(tableConfig);
}

/**
 * Detects whether both InstanceAssignmentConfig and InstancePartitionsMap are set for the same instance partitions
 * type. Validation fails in that case because the table would ignore the InstanceAssignmentConfig when the
 * partitions are already set.
 * <p>If either map is null or empty there is nothing that can conflict and the method returns without checking.
 *
 * @param tableConfig table config to validate
 * @throws IllegalStateException if a type present in the instancePartitionsMap also has an entry in the
 *         instanceAssignmentConfigMap
 */
@VisibleForTesting
static void validateInstancePartitionsTypeMapConfig(TableConfig tableConfig) {
  if (MapUtils.isEmpty(tableConfig.getInstancePartitionsMap())
      || MapUtils.isEmpty(tableConfig.getInstanceAssignmentConfigMap())) {
    return;
  }
  for (InstancePartitionsType instancePartitionsType : tableConfig.getInstancePartitionsMap().keySet()) {
    // Template overload: the message is only built when the check fails, rather than calling String.format on
    // every loop iteration
    Preconditions.checkState(!tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType),
        "Both InstanceAssignmentConfigMap and InstancePartitionsMap set for %s", instancePartitionsType);
  }
}

/**
* Validates metrics aggregation when upsert config is enabled
* - Metrics aggregation cannot be enabled when Upsert Config is enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
Expand All @@ -55,6 +57,7 @@
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -1543,6 +1546,38 @@ public void testTaskConfig() {
}
}

/**
 * Exercises validateInstancePartitionsTypeMapConfig with three table configs: one with neither map set, one with only
 * the instancePartitionsMap set, and one where both maps contain the same instance partitions type.
 */
@Test
public void testValidateInstancePartitionsMap() {
  // Neither instance partitions nor instance assignment config are set: validation must pass
  TableConfig bareTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
  TableConfigUtils.validateInstancePartitionsTypeMapConfig(bareTableConfig);

  // Only the instance partitions map is set: validation must pass
  TableConfig partitionsOnlyTableConfig = new TableConfigBuilder(TableType.OFFLINE)
      .setTableName(TABLE_NAME)
      .setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE"))
      .build();
  TableConfigUtils.validateInstancePartitionsTypeMapConfig(partitionsOnlyTableConfig);

  // Both maps contain the OFFLINE type: validation must fail
  InstanceAssignmentConfig mockedAssignmentConfig = Mockito.mock(InstanceAssignmentConfig.class);
  TableConfig conflictingTableConfig = new TableConfigBuilder(TableType.OFFLINE)
      .setTableName(TABLE_NAME)
      .setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, "test_OFFLINE"))
      .setInstanceAssignmentConfigMap(ImmutableMap.of(InstancePartitionsType.OFFLINE, mockedAssignmentConfig))
      .build();
  try {
    TableConfigUtils.validateInstancePartitionsTypeMapConfig(conflictingTableConfig);
    Assert.fail("Validation should have failed since both instancePartitionsMap and config are set");
  } catch (IllegalStateException ignored) {
    // Expected: the same type is configured in both maps
  }
}

private Map<String, String> getStreamConfigs() {
Map<String, String> streamConfigs = new HashMap<>();
streamConfigs.put("streamType", "kafka");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class TableConfig extends BaseJsonConfig {
public static final String ROUTING_CONFIG_KEY = "routing";
public static final String QUERY_CONFIG_KEY = "query";
public static final String INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY = "instanceAssignmentConfigMap";
public static final String INSTANCE_PARTITIONS_MAP_CONFIG_KEY = "instancePartitionsMap";
public static final String FIELD_CONFIG_LIST_KEY = "fieldConfigList";
public static final String UPSERT_CONFIG_KEY = "upsertConfig";
public static final String DEDUP_CONFIG_KEY = "dedupConfig";
Expand Down Expand Up @@ -84,6 +85,10 @@ public class TableConfig extends BaseJsonConfig {
private RoutingConfig _routingConfig;
private QueryConfig _queryConfig;
private Map<InstancePartitionsType, InstanceAssignmentConfig> _instanceAssignmentConfigMap;

@JsonPropertyDescription(value = "Point to an existing instance partitions")
private Map<InstancePartitionsType, String> _instancePartitionsMap;

private List<FieldConfig> _fieldConfigList;

@JsonPropertyDescription(value = "upsert related config")
Expand Down Expand Up @@ -121,7 +126,9 @@ public TableConfig(@JsonProperty(value = TABLE_NAME_KEY, required = true) String
@JsonProperty(INGESTION_CONFIG_KEY) @Nullable IngestionConfig ingestionConfig,
@JsonProperty(TIER_CONFIGS_LIST_KEY) @Nullable List<TierConfig> tierConfigsList,
@JsonProperty(IS_DIM_TABLE_KEY) boolean dimTable,
@JsonProperty(TUNER_CONFIG_LIST_KEY) @Nullable List<TunerConfig> tunerConfigList) {
@JsonProperty(TUNER_CONFIG_LIST_KEY) @Nullable List<TunerConfig> tunerConfigList,
@JsonProperty(INSTANCE_PARTITIONS_MAP_CONFIG_KEY) @Nullable
Map<InstancePartitionsType, String> instancePartitionsMap) {
Preconditions.checkArgument(tableName != null, "'tableName' must be configured");
Preconditions.checkArgument(!tableName.contains(TABLE_NAME_FORBIDDEN_SUBSTRING),
"'tableName' cannot contain double underscore ('__')");
Expand Down Expand Up @@ -150,6 +157,7 @@ public TableConfig(@JsonProperty(value = TABLE_NAME_KEY, required = true) String
_tierConfigsList = tierConfigsList;
_dimTable = dimTable;
_tunerConfigList = tunerConfigList;
_instancePartitionsMap = instancePartitionsMap;
}

@JsonProperty(TABLE_NAME_KEY)
Expand Down Expand Up @@ -254,6 +262,15 @@ public void setInstanceAssignmentConfigMap(
_instanceAssignmentConfigMap = instanceAssignmentConfigMap;
}

/**
 * Returns the configured mapping from instance partitions type to the name of an existing instance partitions
 * entry that the table should use for that type.
 *
 * @return the instance partitions map, or null if none has been configured
 */
@JsonProperty(INSTANCE_PARTITIONS_MAP_CONFIG_KEY)
@Nullable
public Map<InstancePartitionsType, String> getInstancePartitionsMap() {
  return _instancePartitionsMap;
}

/**
 * Replaces the table's instance partitions map with the given one.
 *
 * @param instancePartitionsMap mapping from instance partitions type to the name of the instance partitions the
 *        table should point to; stored as-is, without copying or validation
 */
public void setInstancePartitionsMap(Map<InstancePartitionsType, String> instancePartitionsMap) {
_instancePartitionsMap = instancePartitionsMap;
}

@JsonProperty(FIELD_CONFIG_LIST_KEY)
@Nullable
public List<FieldConfig> getFieldConfigList() {
Expand Down
Loading

0 comments on commit 54d2813

Please sign in to comment.