Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[colocated-join] Adds Support for instancePartitionsMap in Table Config #8989

Merged
merged 18 commits into from
Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,33 @@
package org.apache.pinot.common.assignment;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.config.TableGroupConfigUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableGroupConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class InstanceAssignmentConfigUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(InstanceAssignmentConfigUtils.class);

private InstanceAssignmentConfigUtils() {
}

Expand All @@ -55,6 +68,9 @@ public static boolean shouldRelocateCompletedSegments(TableConfig tableConfig) {
*/
public static boolean allowInstanceAssignment(TableConfig tableConfig,
InstancePartitionsType instancePartitionsType) {
if (TableConfigUtils.isTableInGroup(tableConfig)) {
return allowInstanceAssignmentForGroup(tableConfig, instancePartitionsType);
}
TableType tableType = tableConfig.getTableType();
Map<InstancePartitionsType, InstanceAssignmentConfig> instanceAssignmentConfigMap =
tableConfig.getInstanceAssignmentConfigMap();
Expand Down Expand Up @@ -122,4 +138,49 @@ public static InstanceAssignmentConfig getInstanceAssignmentConfig(TableConfig t

return new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig);
}

/**
* Retrieves instance assignment config for the given group. A group should always have
* an instance assignment config, and hence this method will throw an exception if it
* can't find it in Zookeeper.
*/
public static InstanceAssignmentConfig getGroupInstanceAssignmentConfig(
ZkHelixPropertyStore<ZNRecord> propertyStore, String groupName) throws IOException {
Preconditions.checkArgument(StringUtils.isNotBlank(groupName));
String path = ZKMetadataProvider.constructPropertyStorePathForTableGroup(groupName);
if (!propertyStore.exists(path, AccessOption.PERSISTENT)) {
throw new RuntimeException(String.format("Path=%s does not exist in ZK (for group=%s)", path, groupName));
}
ZNRecord znRecord = propertyStore.get(path, null, AccessOption.PERSISTENT);
TableGroupConfig tableGroupConfig = TableGroupConfigUtils.fromZNRecord(znRecord);
return tableGroupConfig.getInstanceAssignmentConfig();
}

public static void setGroupInstanceAssignmentConfig(ZkHelixPropertyStore<ZNRecord> propertyStore,
String groupName, InstanceAssignmentConfig instanceAssignmentConfig) throws IOException {
Preconditions.checkArgument(StringUtils.isNotBlank(groupName));
String path = ZKMetadataProvider.constructPropertyStorePathForTableGroup(groupName);
ZNRecord znRecord = propertyStore.get(path, null, AccessOption.PERSISTENT);
if (znRecord == null) {
throw new RuntimeException(String.format("Group=%s does not exist", groupName));
}
TableGroupConfig tableGroupConfig = TableGroupConfigUtils.fromZNRecord(znRecord);
tableGroupConfig.setInstanceAssignmentConfig(instanceAssignmentConfig);
ZNRecord newZnRecord = TableGroupConfigUtils.toZNRecord(tableGroupConfig);
propertyStore.set(path, newZnRecord, AccessOption.PERSISTENT);
}

/**
* For a table in a table-group, allow instance assignment for OFFLINE instance-partitions for offline tables
* and CONSUMING instance-partitions for realtime tables. This will return false for COMPLETED
* instance-partitions since we don't want/need separate COMPLETED instance partitions. In case COMPLETED
* Instance Partitions are omitted, realtime segment assignment will use the same instance partitions as CONSUMING.
*/
private static boolean allowInstanceAssignmentForGroup(TableConfig tableConfig,
InstancePartitionsType instancePartitionsType) {
Preconditions.checkState(StringUtils.isNotBlank(tableConfig.getTableGroupName()));
TableType tableType = tableConfig.getTableType();
return (tableType == TableType.OFFLINE && instancePartitionsType == InstancePartitionsType.OFFLINE)
|| (tableType == TableType.REALTIME && instancePartitionsType == InstancePartitionsType.CONSUMING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ public ZNRecord toZNRecord() {
return znRecord;
}

public InstancePartitions withName(String newName) {
return new InstancePartitions(newName, getPartitionToInstancesMap());
}

public String toJsonString() {
try {
return JsonUtils.objectToString(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.spi.config.table.TableConfig;
Expand All @@ -54,13 +55,24 @@ public static String getInstancePartitionsName(String tableName, String instance
return TableNameBuilder.extractRawTableName(tableName) + TYPE_SUFFIX_SEPARATOR + instancePartitionsType;
}

public static String getGroupInstancePartitionsName(String groupName) {
return String.format("%s_GROUP", groupName);
}

/**
* Fetches the instance partitions from Helix property store if it exists, or computes it for backward-compatibility.
*/
public static InstancePartitions fetchOrComputeInstancePartitions(HelixManager helixManager, TableConfig tableConfig,
InstancePartitionsType instancePartitionsType) {
String tableNameWithType = tableConfig.getTableName();

// If table is in a group, use pre-computed instance partitions
if (TableConfigUtils.isTableInGroup(tableConfig)) {
InstancePartitions instancePartitions = fetchGroupInstancePartitions(helixManager.getHelixPropertyStore(),
tableConfig.getTableGroupName());
return instancePartitions.withName(instancePartitionsType.getInstancePartitionsName(tableNameWithType));
}

// Fetch the instance partitions from property store if it exists
ZkHelixPropertyStore<ZNRecord> propertyStore = helixManager.getHelixPropertyStore();
InstancePartitions instancePartitions = fetchInstancePartitions(propertyStore,
Expand All @@ -84,6 +96,17 @@ public static InstancePartitions fetchInstancePartitions(HelixPropertyStore<ZNRe
return znRecord != null ? InstancePartitions.fromZNRecord(znRecord) : null;
}

public static InstancePartitions fetchGroupInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
String groupName) {
String path = ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(
getGroupInstancePartitionsName(groupName));
ZNRecord znRecord = propertyStore.get(path, null, AccessOption.PERSISTENT);
if (znRecord == null) {
throw new RuntimeException("No instance partitions for group found");
}
return InstancePartitions.fromZNRecord(znRecord);
}

/**
* Computes the default instance partitions. Sort all qualified instances and rotate the list based on the table name
* to prevent creating hotspot servers.
Expand Down Expand Up @@ -144,6 +167,18 @@ public static void persistInstancePartitions(HelixPropertyStore<ZNRecord> proper
}
}

/**
* Persists instance partitions for the group to Zookeeper.
*/
public static void persistGroupInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore,
String groupName, InstancePartitions instancePartitions) {
String path = ZKMetadataProvider
.constructPropertyStorePathForInstancePartitions(getGroupInstancePartitionsName(groupName));
if (!propertyStore.set(path, instancePartitions.toZNRecord(), AccessOption.PERSISTENT)) {
throw new ZkException("Failed to persist instance partitions: " + instancePartitions);
}
}

/**
* Removes the instance partitions from Helix property store.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ private ZKMetadataProvider() {
private static final String PROPERTYSTORE_SCHEMAS_PREFIX = "/SCHEMAS";
private static final String PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX = "/INSTANCE_PARTITIONS";
private static final String PROPERTYSTORE_TABLE_CONFIGS_PREFIX = "/CONFIGS/TABLE";
private static final String PROPERTYSTORE_TABLE_GROUP_PREFIX = "/CONFIGS/TABLE_GROUP";
private static final String PROPERTYSTORE_USER_CONFIGS_PREFIX = "/CONFIGS/USER";
private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX = "/CONFIGS/INSTANCE";
private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX = "/CONFIGS/CLUSTER";
Expand Down Expand Up @@ -110,6 +111,10 @@ public static String constructPropertyStorePathForInstancePartitions(String inst
return StringUtil.join("/", PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX, instancePartitionsName);
}

public static String constructPropertyStorePathForTableGroup(String groupName) {
return StringUtil.join("/", PROPERTYSTORE_TABLE_GROUP_PREFIX, groupName);
}

public static String constructPropertyStorePathForResource(String resourceName) {
return StringUtil.join("/", PROPERTYSTORE_SEGMENTS_PREFIX, resourceName);
}
Expand Down Expand Up @@ -319,6 +324,19 @@ private static TableConfig toTableConfig(@Nullable ZNRecord znRecord) {
}
}

public static List<String> getAllGroups(ZkHelixPropertyStore<ZNRecord> propertyStore) {
List<ZNRecord> znRecords = propertyStore.getChildren(PROPERTYSTORE_TABLE_GROUP_PREFIX, null,
AccessOption.PERSISTENT, CommonConstants.Helix.ZkClient.RETRY_COUNT,
CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
if (znRecords != null) {
return znRecords.stream()
.map(ZNRecord::getId)
.collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}

public static void setSchema(ZkHelixPropertyStore<ZNRecord> propertyStore, Schema schema) {
propertyStore.set(constructPropertyStorePathForSchema(schema.getSchemaName()), SchemaUtils.toZNRecord(schema),
AccessOption.PERSISTENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.ZNRecord;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
Expand Down Expand Up @@ -56,6 +57,7 @@ private TableConfigUtils() {

private static final String FIELD_MISSING_MESSAGE_TEMPLATE = "Mandatory field '%s' is missing";

// TODO: Add check to ensure partitioning is enabled if table-group is set.
public static TableConfig fromZNRecord(ZNRecord znRecord)
throws IOException {
Map<String, String> simpleFields = znRecord.getSimpleFields();
Expand All @@ -77,6 +79,8 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)
Preconditions.checkState(tenantConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.TENANT_CONFIG_KEY);
TenantConfig tenantConfig = JsonUtils.stringToObject(tenantConfigString, TenantConfig.class);

String tableGroupName = simpleFields.get(TableConfig.TABLE_GROUP_CONFIG_KEY);

String indexingConfigString = simpleFields.get(TableConfig.INDEXING_CONFIG_KEY);
Preconditions
.checkState(indexingConfigString != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.INDEXING_CONFIG_KEY);
Expand Down Expand Up @@ -160,7 +164,7 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)

return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
dedupConfig, ingestionConfig, tierConfigList, isDimTable, tunerConfigList);
dedupConfig, ingestionConfig, tierConfigList, isDimTable, tunerConfigList, tableGroupName);
}

public static ZNRecord toZNRecord(TableConfig tableConfig)
Expand Down Expand Up @@ -223,6 +227,9 @@ public static ZNRecord toZNRecord(TableConfig tableConfig)
if (tunerConfigList != null) {
simpleFields.put(TableConfig.TUNER_CONFIG_LIST_KEY, JsonUtils.objectToString(tunerConfigList));
}
if (tableConfig.getTableGroupName() != null) {
simpleFields.put(TableConfig.TABLE_GROUP_CONFIG_KEY, tableConfig.getTableGroupName());
}

ZNRecord znRecord = new ZNRecord(tableConfig.getTableName());
znRecord.setSimpleFields(simpleFields);
Expand Down Expand Up @@ -294,4 +301,8 @@ public static void convertFromLegacyTableConfig(TableConfig tableConfig) {
validationConfig.setSegmentPushFrequency(null);
validationConfig.setSegmentPushType(null);
}

public static boolean isTableInGroup(TableConfig tableConfig) {
return StringUtils.isNotBlank(tableConfig.getTableGroupName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.utils.config;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.ZNRecord;
import org.apache.pinot.spi.config.table.TableGroupConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.utils.JsonUtils;


public class TableGroupConfigUtils {
private TableGroupConfigUtils() {
}

public static ZNRecord toZNRecord(TableGroupConfig tableGroupConfig)
throws IOException {
Preconditions.checkArgument(tableGroupConfig != null, "Table group config cannot be null");
Preconditions.checkArgument(StringUtils.isNotBlank(tableGroupConfig.getGroupName()), "Table group name cannot be "
+ "blank");
Preconditions.checkArgument(tableGroupConfig.getInstanceAssignmentConfig() != null, "Instance assignment config "
+ "cannot be null for a table-group");
String groupName = tableGroupConfig.getGroupName();
ZNRecord znRecord = new ZNRecord(groupName);
Map<String, String> mapFields = new HashMap<>();
mapFields.put(TableGroupConfig.ASSIGNMENT_CONFIG_KEY,
JsonUtils.objectToString(tableGroupConfig.getInstanceAssignmentConfig()));
znRecord.setMapField("config", mapFields);
return znRecord;
}

public static TableGroupConfig fromZNRecord(ZNRecord znRecord)
throws IOException {
String groupName = znRecord.getId();
Map<String, String> configMap = znRecord.getMapFields().get("config");
InstanceAssignmentConfig instanceAssignmentConfig = JsonUtils.stringToObject(configMap.get(
TableGroupConfig.ASSIGNMENT_CONFIG_KEY), InstanceAssignmentConfig.class);
return new TableGroupConfig(groupName, instanceAssignmentConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ private Constants() {

public static final String CLUSTER_TAG = "Cluster";
public static final String TABLE_TAG = "Table";
public static final String TABLE_GROUP_TAG = "Group";
public static final String USER_TAG = "User";
public static final String VERSION_TAG = "Version";
public static final String HEALTH_TAG = "Health";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
Expand Down Expand Up @@ -210,6 +211,11 @@ private void assignInstancesForInstancePartitionsType(
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, TableConfig tableConfig,
List<InstanceConfig> instanceConfigs, InstancePartitionsType instancePartitionsType) {
String tableNameWithType = tableConfig.getTableName();
if (TableConfigUtils.isTableInGroup(tableConfig)) {
instancePartitionsMap.put(instancePartitionsType, InstancePartitionsUtils.fetchGroupInstancePartitions(
_resourceManager.getPropertyStore(), tableConfig.getTableGroupName()));
return;
}
InstancePartitions existingInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType, instancePartitionsType.toString()));
Expand Down
Loading