Skip to content

Commit

Permalink
Automatically update broker resource on broker changes (#8249)
Browse files Browse the repository at this point in the history
- Add a boolean flag `updateBrokerResource` to create/update instance rest APIs to automatically update the broker resource when enabled (disabled by default to keep the current behavior because updating broker resource can be costly for large cluster)
- Add a rest API to update the broker resource for a specified broker
- For single-tenant cluster, when broker joins the cluster for the first time as `DefaultTenant`, automatically update the broker resource so that it can build the routing tables properly.
  • Loading branch information
Jackie-Jiang authored Feb 28, 2022
1 parent e6330bb commit 90b5492
Show file tree
Hide file tree
Showing 9 changed files with 532 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ public void start()
.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
LOGGER.info("Starting Grpc BrokerRequestHandler.");
_brokerRequestHandler =
new GrpcBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, null);
new GrpcBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager,
tableCache, _brokerMetrics, null);
} else { // default request handler type, e.g. netty
LOGGER.info("Starting Netty BrokerRequestHandler.");
if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
Expand Down Expand Up @@ -310,7 +310,7 @@ public void start()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
new BrokerUserDefinedMessageHandlerFactory(_routingManager, queryQuotaManager));
_participantHelixManager.connect();
updateInstanceConfigIfNeeded();
updateInstanceConfigAndBrokerResourceIfNeeded();
_brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME,
() -> _participantHelixManager.isConnected() ? 1L : 0L);
_participantHelixManager.addPreConnectCallback(
Expand All @@ -323,19 +323,35 @@ public void start()
LOGGER.info("Finish starting Pinot broker");
}

private void updateInstanceConfigIfNeeded() {
private void updateInstanceConfigAndBrokerResourceIfNeeded() {
InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_participantHelixManager, _instanceId);
boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port);
updated |= HelixHelper.addDefaultTags(instanceConfig, () -> {
boolean instanceConfigUpdated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port);
boolean shouldUpdateBrokerResource = false;
String brokerTag = null;
List<String> instanceTags = instanceConfig.getTags();
if (instanceTags.isEmpty()) {
// This is a new broker (first time joining the cluster)
if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_propertyStore)) {
return Collections.singletonList(TagNameUtils.getBrokerTagForTenant(null));
brokerTag = TagNameUtils.getBrokerTagForTenant(null);
shouldUpdateBrokerResource = true;
} else {
return Collections.singletonList(Helix.UNTAGGED_BROKER_INSTANCE);
brokerTag = Helix.UNTAGGED_BROKER_INSTANCE;
}
});
if (updated) {
instanceConfig.addTag(brokerTag);
instanceConfigUpdated = true;
}
if (instanceConfigUpdated) {
HelixHelper.updateInstanceConfig(_participantHelixManager, instanceConfig);
}
if (shouldUpdateBrokerResource) {
// Update broker resource to include the new broker
long startTimeMs = System.currentTimeMillis();
List<String> tablesAdded = new ArrayList<>();
HelixHelper.updateBrokerResource(_participantHelixManager, _instanceId, Collections.singletonList(brokerTag),
tablesAdded, null);
LOGGER.info("Updated broker resource for new joining broker: {} in {}ms, tables added: {}", _instanceId,
System.currentTimeMillis() - startTimeMs, tablesAdded);
}
}

/**
Expand Down Expand Up @@ -365,8 +381,7 @@ private void registerServiceStatusHandler() {
_clusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup),
new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_participantHelixManager,
_clusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup),
new ServiceStatus.LifecycleServiceStatusCallback(this::isStarting, this::isShuttingDown)
)));
new ServiceStatus.LifecycleServiceStatusCallback(this::isStarting, this::isShuttingDown))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ private ZKMetadataProvider() {

public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String realtimeTableName,
ZNRecord znRecord) {
propertyStore
.set(constructPropertyStorePathForResourceConfig(realtimeTableName), znRecord, AccessOption.PERSISTENT);
propertyStore.set(constructPropertyStorePathForResourceConfig(realtimeTableName), znRecord,
AccessOption.PERSISTENT);
}

public static void setOfflineTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String offlineTableName,
Expand All @@ -79,8 +79,8 @@ public static void setInstanceZKMetadata(ZkHelixPropertyStore<ZNRecord> property

public static InstanceZKMetadata getInstanceZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore,
String instanceId) {
ZNRecord znRecord = propertyStore
.get(StringUtil.join("/", PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX, instanceId), null, AccessOption.PERSISTENT);
ZNRecord znRecord = propertyStore.get(StringUtil.join("/", PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX, instanceId), null,
AccessOption.PERSISTENT);
if (znRecord == null) {
return null;
}
Expand Down Expand Up @@ -121,8 +121,8 @@ public static String constructPropertyStorePathForMinionTaskMetadata(String task

public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> propertyStore, String resourceNameForResource,
String segmentName) {
return propertyStore
.exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName), AccessOption.PERSISTENT);
return propertyStore.exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName),
AccessOption.PERSISTENT);
}

public static void removeResourceSegmentsFromPropertyStore(ZkHelixPropertyStore<ZNRecord> propertyStore,
Expand Down Expand Up @@ -153,9 +153,9 @@ public static void removeResourceConfigFromPropertyStore(ZkHelixPropertyStore<ZN
public static boolean createSegmentZkMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType,
SegmentZKMetadata segmentZKMetadata) {
try {
return propertyStore
.create(constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()),
segmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT);
return propertyStore.create(
constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()),
segmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT);
} catch (Exception e) {
LOGGER.error("Caught exception while creating segmentZkMetadata for table: {}", tableNameWithType, e);
return false;
Expand All @@ -166,9 +166,9 @@ public static boolean setSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> proper
SegmentZKMetadata segmentZKMetadata, int expectedVersion) {
// NOTE: Helix will throw ZkBadVersionException if version does not match
try {
return propertyStore
.set(constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()),
segmentZKMetadata.toZNRecord(), expectedVersion, AccessOption.PERSISTENT);
return propertyStore.set(
constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()),
segmentZKMetadata.toZNRecord(), expectedVersion, AccessOption.PERSISTENT);
} catch (ZkBadVersionException e) {
return false;
}
Expand All @@ -194,25 +194,15 @@ public static ZNRecord getZnRecord(ZkHelixPropertyStore<ZNRecord> propertyStore,
@Nullable
public static SegmentZKMetadata getSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore,
String tableNameWithType, String segmentName) {
ZNRecord znRecord = propertyStore
.get(constructPropertyStorePathForSegment(tableNameWithType, segmentName), null, AccessOption.PERSISTENT);
ZNRecord znRecord = propertyStore.get(constructPropertyStorePathForSegment(tableNameWithType, segmentName), null,
AccessOption.PERSISTENT);
return znRecord != null ? new SegmentZKMetadata(znRecord) : null;
}

@Nullable
public static TableConfig getTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType) {
ZNRecord znRecord = propertyStore
.get(constructPropertyStorePathForResourceConfig(tableNameWithType), null, AccessOption.PERSISTENT);
if (znRecord == null) {
return null;
}
try {
TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
return (TableConfig) ConfigUtils.applyConfigWithEnvVariables(tableConfig);
} catch (Exception e) {
LOGGER.error("Caught exception while getting table configuration for table: {}", tableNameWithType, e);
return null;
}
return toTableConfig(propertyStore.get(constructPropertyStorePathForResourceConfig(tableNameWithType), null,
AccessOption.PERSISTENT));
}

@Nullable
Expand All @@ -225,6 +215,43 @@ public static TableConfig getRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord>
return getTableConfig(propertyStore, TableNameBuilder.REALTIME.tableNameWithType(tableName));
}

public static List<TableConfig> getAllTableConfigs(ZkHelixPropertyStore<ZNRecord> propertyStore) {
List<ZNRecord> znRecords =
propertyStore.getChildren(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, null, AccessOption.PERSISTENT,
CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
if (znRecords != null) {
int numZNRecords = znRecords.size();
List<TableConfig> tableConfigs = new ArrayList<>(numZNRecords);
for (ZNRecord znRecord : znRecords) {
TableConfig tableConfig = toTableConfig(znRecord);
if (tableConfig != null) {
tableConfigs.add(tableConfig);
}
}
if (numZNRecords > tableConfigs.size()) {
LOGGER.warn("Failed to read {}/{} table configs", numZNRecords - tableConfigs.size(), numZNRecords);
}
return tableConfigs;
} else {
LOGGER.warn("Path: {} does not exist", PROPERTYSTORE_TABLE_CONFIGS_PREFIX);
return Collections.emptyList();
}
}

@Nullable
private static TableConfig toTableConfig(@Nullable ZNRecord znRecord) {
if (znRecord == null) {
return null;
}
try {
TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
return ConfigUtils.applyConfigWithEnvVariables(tableConfig);
} catch (Exception e) {
LOGGER.error("Caught exception while creating table config from ZNRecord: {}", znRecord.getId(), e);
return null;
}
}

public static void setSchema(ZkHelixPropertyStore<ZNRecord> propertyStore, Schema schema) {
propertyStore.set(constructPropertyStorePathForSchema(schema.getSchemaName()), SchemaUtils.toZNRecord(schema),
AccessOption.PERSISTENT);
Expand Down Expand Up @@ -296,8 +323,8 @@ public static Schema getTableSchema(ZkHelixPropertyStore<ZNRecord> propertyStore
public static List<SegmentZKMetadata> getSegmentsZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore,
String tableNameWithType) {
String parentPath = constructPropertyStorePathForResource(tableNameWithType);
List<ZNRecord> znRecords = propertyStore
.getChildren(parentPath, null, AccessOption.PERSISTENT, CommonConstants.Helix.ZkClient.RETRY_COUNT,
List<ZNRecord> znRecords =
propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT, CommonConstants.Helix.ZkClient.RETRY_COUNT,
CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
if (znRecords != null) {
int numZNRecords = znRecords.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -45,9 +46,12 @@
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.slf4j.Logger;
Expand Down Expand Up @@ -185,6 +189,52 @@ public static void updateIdealState(final HelixManager helixManager, final Strin
updateIdealState(helixManager, resourceName, updater, policy, false);
}

/**
* Updates broker resource ideal state for the given broker with the given broker tags. Optional {@code tablesAdded}
* and {@code tablesRemoved} can be provided to track the tables added/removed during the update.
*/
public static void updateBrokerResource(HelixManager helixManager, String brokerId, List<String> brokerTags,
@Nullable List<String> tablesAdded, @Nullable List<String> tablesRemoved) {
Preconditions.checkArgument(brokerId.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE),
"Invalid broker id: %s", brokerId);
for (String brokerTag : brokerTags) {
Preconditions.checkArgument(TagNameUtils.isBrokerTag(brokerTag), "Invalid broker tag: %s", brokerTag);
}

Set<String> tablesForBrokerTag;
int numBrokerTags = brokerTags.size();
if (numBrokerTags == 0) {
tablesForBrokerTag = Collections.emptySet();
} else if (numBrokerTags == 1) {
tablesForBrokerTag = getTablesForBrokerTag(helixManager, brokerTags.get(0));
} else {
tablesForBrokerTag = getTablesForBrokerTags(helixManager, brokerTags);
}

updateIdealState(helixManager, BROKER_RESOURCE, idealState -> {
if (tablesAdded != null) {
tablesAdded.clear();
}
if (tablesRemoved != null) {
tablesRemoved.clear();
}
for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) {
String tableNameWithType = entry.getKey();
Map<String, String> brokerAssignment = entry.getValue();
if (tablesForBrokerTag.contains(tableNameWithType)) {
if (brokerAssignment.put(brokerId, BrokerResourceStateModel.ONLINE) == null && tablesAdded != null) {
tablesAdded.add(tableNameWithType);
}
} else {
if (brokerAssignment.remove(brokerId) != null && tablesRemoved != null) {
tablesRemoved.add(tableNameWithType);
}
}
}
return idealState;
});
}

/**
* Returns all instances for the given cluster.
*
Expand Down Expand Up @@ -320,8 +370,8 @@ public IdealState apply(IdealState idealState) {

// Removing partitions from ideal state
LOGGER.info("Trying to remove resource {} from idealstate", resourceTag);
HelixHelper
.updateIdealState(helixManager, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, updater, DEFAULT_RETRY_POLICY);
HelixHelper.updateIdealState(helixManager, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, updater,
DEFAULT_RETRY_POLICY);
}

/**
Expand Down Expand Up @@ -495,12 +545,12 @@ public static Set<String> getServerInstancesForTenantWithType(List<InstanceConfi
TableType tableType) {
Set<String> serverInstancesWithType = new HashSet<>();
if (tableType == null || tableType == TableType.OFFLINE) {
serverInstancesWithType
.addAll(HelixHelper.getInstancesWithTag(instanceConfigs, TagNameUtils.getOfflineTagForTenant(tenant)));
serverInstancesWithType.addAll(
HelixHelper.getInstancesWithTag(instanceConfigs, TagNameUtils.getOfflineTagForTenant(tenant)));
}
if (tableType == null || tableType == TableType.REALTIME) {
serverInstancesWithType
.addAll(HelixHelper.getInstancesWithTag(instanceConfigs, TagNameUtils.getRealtimeTagForTenant(tenant)));
serverInstancesWithType.addAll(
HelixHelper.getInstancesWithTag(instanceConfigs, TagNameUtils.getRealtimeTagForTenant(tenant)));
}
return serverInstancesWithType;
}
Expand All @@ -519,6 +569,28 @@ public static Set<InstanceConfig> getBrokerInstanceConfigsForTenant(List<Instanc
return new HashSet<>(getInstancesConfigsWithTag(instanceConfigs, TagNameUtils.getBrokerTagForTenant(tenant)));
}

public static Set<String> getTablesForBrokerTag(HelixManager helixManager, String brokerTag) {
Set<String> tablesForBrokerTag = new HashSet<>();
List<TableConfig> tableConfigs = ZKMetadataProvider.getAllTableConfigs(helixManager.getHelixPropertyStore());
for (TableConfig tableConfig : tableConfigs) {
if (TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker()).equals(brokerTag)) {
tablesForBrokerTag.add(tableConfig.getTableName());
}
}
return tablesForBrokerTag;
}

public static Set<String> getTablesForBrokerTags(HelixManager helixManager, List<String> brokerTags) {
Set<String> tablesForBrokerTags = new HashSet<>();
List<TableConfig> tableConfigs = ZKMetadataProvider.getAllTableConfigs(helixManager.getHelixPropertyStore());
for (TableConfig tableConfig : tableConfigs) {
if (brokerTags.contains(TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker()))) {
tablesForBrokerTags.add(tableConfig.getTableName());
}
}
return tablesForBrokerTags;
}

/**
* Returns the instance config for a specific instance.
*/
Expand All @@ -535,9 +607,9 @@ public static void updateInstanceConfig(HelixManager helixManager, InstanceConfi
// NOTE: Use HelixDataAccessor.setProperty() instead of HelixAdmin.setInstanceConfig() because the latter explicitly
// forbids instance host/port modification
HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
Preconditions.checkState(helixDataAccessor
.setProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceConfig.getId()), instanceConfig),
"Failed to update instance config for instance: " + instanceConfig.getId());
Preconditions.checkState(
helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceConfig.getId()),
instanceConfig), "Failed to update instance config for instance: " + instanceConfig.getId());
}

/**
Expand Down
Loading

0 comments on commit 90b5492

Please sign in to comment.