diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index e415078ee28f..89e71283975a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -243,8 +243,8 @@ public void start() .equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) { LOGGER.info("Starting Grpc BrokerRequestHandler."); _brokerRequestHandler = - new GrpcBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, - queryQuotaManager, tableCache, _brokerMetrics, null); + new GrpcBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager, + tableCache, _brokerMetrics, null); } else { // default request handler type, e.g. netty LOGGER.info("Starting Netty BrokerRequestHandler."); if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) { @@ -310,7 +310,7 @@ public void start() .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), new BrokerUserDefinedMessageHandlerFactory(_routingManager, queryQuotaManager)); _participantHelixManager.connect(); - updateInstanceConfigIfNeeded(); + updateInstanceConfigAndBrokerResourceIfNeeded(); _brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, () -> _participantHelixManager.isConnected() ? 1L : 0L); _participantHelixManager.addPreConnectCallback( @@ -323,19 +323,35 @@ public void start() LOGGER.info("Finish starting Pinot broker"); } - private void updateInstanceConfigIfNeeded() { + private void updateInstanceConfigAndBrokerResourceIfNeeded() { InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_participantHelixManager, _instanceId); - boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port); - updated |= HelixHelper.addDefaultTags(instanceConfig, () -> { + boolean instanceConfigUpdated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port); + boolean shouldUpdateBrokerResource = false; + String brokerTag = null; + List instanceTags = instanceConfig.getTags(); + if (instanceTags.isEmpty()) { + // This is a new broker (first time joining the cluster) if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_propertyStore)) { - return Collections.singletonList(TagNameUtils.getBrokerTagForTenant(null)); + brokerTag = TagNameUtils.getBrokerTagForTenant(null); + shouldUpdateBrokerResource = true; } else { - return Collections.singletonList(Helix.UNTAGGED_BROKER_INSTANCE); + brokerTag = Helix.UNTAGGED_BROKER_INSTANCE; } - }); - if (updated) { + instanceConfig.addTag(brokerTag); + instanceConfigUpdated = true; + } + if (instanceConfigUpdated) { HelixHelper.updateInstanceConfig(_participantHelixManager, instanceConfig); } + if (shouldUpdateBrokerResource) { + // Update broker resource to include the new broker + long startTimeMs = System.currentTimeMillis(); + List tablesAdded = new ArrayList<>(); + HelixHelper.updateBrokerResource(_participantHelixManager, _instanceId, Collections.singletonList(brokerTag), + tablesAdded, null); + LOGGER.info("Updated broker resource for new joining broker: {} in {}ms, tables added: {}", _instanceId, + System.currentTimeMillis() - startTimeMs, tablesAdded); + } } /** @@ -365,8 +381,7 @@ private void registerServiceStatusHandler() { _clusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup), new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_participantHelixManager, _clusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup), - new ServiceStatus.LifecycleServiceStatusCallback(this::isStarting, this::isShuttingDown) - ))); + new ServiceStatus.LifecycleServiceStatusCallback(this::isStarting, this::isShuttingDown)))); } @Override diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 501a72c22fa4..505c681e9ea5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -61,8 +61,8 @@ private ZKMetadataProvider() { public static void setRealtimeTableConfig(ZkHelixPropertyStore propertyStore, String realtimeTableName, ZNRecord znRecord) { - propertyStore - .set(constructPropertyStorePathForResourceConfig(realtimeTableName), znRecord, AccessOption.PERSISTENT); + propertyStore.set(constructPropertyStorePathForResourceConfig(realtimeTableName), znRecord, + AccessOption.PERSISTENT); } public static void setOfflineTableConfig(ZkHelixPropertyStore propertyStore, String offlineTableName, @@ -79,8 +79,8 @@ public static void setInstanceZKMetadata(ZkHelixPropertyStore property public static InstanceZKMetadata getInstanceZKMetadata(ZkHelixPropertyStore propertyStore, String instanceId) { - ZNRecord znRecord = propertyStore - .get(StringUtil.join("/", PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX, instanceId), null, AccessOption.PERSISTENT); + ZNRecord znRecord = propertyStore.get(StringUtil.join("/", PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX, instanceId), null, + AccessOption.PERSISTENT); if (znRecord == null) { return null; } @@ -121,8 +121,8 @@ public static String constructPropertyStorePathForMinionTaskMetadata(String task public static boolean isSegmentExisted(ZkHelixPropertyStore propertyStore, String resourceNameForResource, String segmentName) { - return propertyStore - .exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName), AccessOption.PERSISTENT); + return propertyStore.exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName), + AccessOption.PERSISTENT); } public static void removeResourceSegmentsFromPropertyStore(ZkHelixPropertyStore propertyStore, @@ -153,9 +153,9 @@ public static void removeResourceConfigFromPropertyStore(ZkHelixPropertyStore propertyStore, String tableNameWithType, SegmentZKMetadata segmentZKMetadata) { try { - return propertyStore - .create(constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()), - segmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT); + return propertyStore.create( + constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()), + segmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT); } catch (Exception e) { LOGGER.error("Caught exception while creating segmentZkMetadata for table: {}", tableNameWithType, e); return false; @@ -166,9 +166,9 @@ public static boolean setSegmentZKMetadata(ZkHelixPropertyStore proper SegmentZKMetadata segmentZKMetadata, int expectedVersion) { // NOTE: Helix will throw ZkBadVersionException if version does not match try { - return propertyStore - .set(constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()), - segmentZKMetadata.toZNRecord(), expectedVersion, AccessOption.PERSISTENT); + return propertyStore.set( + constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()), + segmentZKMetadata.toZNRecord(), expectedVersion, AccessOption.PERSISTENT); } catch (ZkBadVersionException e) { return false; } @@ -194,25 +194,15 @@ public static ZNRecord getZnRecord(ZkHelixPropertyStore propertyStore, @Nullable public static SegmentZKMetadata getSegmentZKMetadata(ZkHelixPropertyStore propertyStore, String tableNameWithType, String segmentName) { - ZNRecord znRecord = propertyStore - .get(constructPropertyStorePathForSegment(tableNameWithType, segmentName), null, AccessOption.PERSISTENT); + ZNRecord znRecord = propertyStore.get(constructPropertyStorePathForSegment(tableNameWithType, segmentName), null, + AccessOption.PERSISTENT); return znRecord != null ? new SegmentZKMetadata(znRecord) : null; } @Nullable public static TableConfig getTableConfig(ZkHelixPropertyStore propertyStore, String tableNameWithType) { - ZNRecord znRecord = propertyStore - .get(constructPropertyStorePathForResourceConfig(tableNameWithType), null, AccessOption.PERSISTENT); - if (znRecord == null) { - return null; - } - try { - TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord); - return (TableConfig) ConfigUtils.applyConfigWithEnvVariables(tableConfig); - } catch (Exception e) { - LOGGER.error("Caught exception while getting table configuration for table: {}", tableNameWithType, e); - return null; - } + return toTableConfig(propertyStore.get(constructPropertyStorePathForResourceConfig(tableNameWithType), null, + AccessOption.PERSISTENT)); } @Nullable @@ -225,6 +215,43 @@ public static TableConfig getRealtimeTableConfig(ZkHelixPropertyStore return getTableConfig(propertyStore, TableNameBuilder.REALTIME.tableNameWithType(tableName)); } + public static List getAllTableConfigs(ZkHelixPropertyStore propertyStore) { + List znRecords = + propertyStore.getChildren(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, null, AccessOption.PERSISTENT, + CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS); + if (znRecords != null) { + int numZNRecords = znRecords.size(); + List tableConfigs = new ArrayList<>(numZNRecords); + for (ZNRecord znRecord : znRecords) { + TableConfig tableConfig = toTableConfig(znRecord); + if (tableConfig != null) { + tableConfigs.add(tableConfig); + } + } + if (numZNRecords > tableConfigs.size()) { + LOGGER.warn("Failed to read {}/{} table configs", numZNRecords - tableConfigs.size(), numZNRecords); + } + return tableConfigs; + } else { + LOGGER.warn("Path: {} does not exist", PROPERTYSTORE_TABLE_CONFIGS_PREFIX); + return Collections.emptyList(); + } + } + + @Nullable + private static TableConfig toTableConfig(@Nullable ZNRecord znRecord) { + if (znRecord == null) { + return null; + } + try { + TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord); + return ConfigUtils.applyConfigWithEnvVariables(tableConfig); + } catch (Exception e) { + LOGGER.error("Caught exception while creating table config from ZNRecord: {}", znRecord.getId(), e); + return null; + } + } + public static void setSchema(ZkHelixPropertyStore propertyStore, Schema schema) { propertyStore.set(constructPropertyStorePathForSchema(schema.getSchemaName()), SchemaUtils.toZNRecord(schema), AccessOption.PERSISTENT); @@ -296,8 +323,8 @@ public static Schema getTableSchema(ZkHelixPropertyStore propertyStore public static List getSegmentsZKMetadata(ZkHelixPropertyStore propertyStore, String tableNameWithType) { String parentPath = constructPropertyStorePathForResource(tableNameWithType); - List znRecords = propertyStore - .getChildren(parentPath, null, AccessOption.PERSISTENT, CommonConstants.Helix.ZkClient.RETRY_COUNT, + List znRecords = + propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT, CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS); if (znRecords != null) { int numZNRecords = znRecords.size(); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java index 99525888d9d4..8534edec0521 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java @@ -21,6 +21,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -45,9 +46,12 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.utils.config.TagNameUtils; +import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel; import org.apache.pinot.spi.utils.retry.RetryPolicies; import org.apache.pinot.spi.utils.retry.RetryPolicy; import org.slf4j.Logger; @@ -185,6 +189,52 @@ public static void updateIdealState(final HelixManager helixManager, final Strin updateIdealState(helixManager, resourceName, updater, policy, false); } + /** + * Updates broker resource ideal state for the given broker with the given broker tags. Optional {@code tablesAdded} + * and {@code tablesRemoved} can be provided to track the tables added/removed during the update. + */ + public static void updateBrokerResource(HelixManager helixManager, String brokerId, List brokerTags, + @Nullable List tablesAdded, @Nullable List tablesRemoved) { + Preconditions.checkArgument(brokerId.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE), + "Invalid broker id: %s", brokerId); + for (String brokerTag : brokerTags) { + Preconditions.checkArgument(TagNameUtils.isBrokerTag(brokerTag), "Invalid broker tag: %s", brokerTag); + } + + Set tablesForBrokerTag; + int numBrokerTags = brokerTags.size(); + if (numBrokerTags == 0) { + tablesForBrokerTag = Collections.emptySet(); + } else if (numBrokerTags == 1) { + tablesForBrokerTag = getTablesForBrokerTag(helixManager, brokerTags.get(0)); + } else { + tablesForBrokerTag = getTablesForBrokerTags(helixManager, brokerTags); + } + + updateIdealState(helixManager, BROKER_RESOURCE, idealState -> { + if (tablesAdded != null) { + tablesAdded.clear(); + } + if (tablesRemoved != null) { + tablesRemoved.clear(); + } + for (Map.Entry> entry : idealState.getRecord().getMapFields().entrySet()) { + String tableNameWithType = entry.getKey(); + Map brokerAssignment = entry.getValue(); + if (tablesForBrokerTag.contains(tableNameWithType)) { + if (brokerAssignment.put(brokerId, BrokerResourceStateModel.ONLINE) == null && tablesAdded != null) { + tablesAdded.add(tableNameWithType); + } + } else { + if (brokerAssignment.remove(brokerId) != null && tablesRemoved != null) { + tablesRemoved.add(tableNameWithType); + } + } + } + return idealState; + }); + } + /** * Returns all instances for the given cluster. * @@ -320,8 +370,8 @@ public IdealState apply(IdealState idealState) { // Removing partitions from ideal state LOGGER.info("Trying to remove resource {} from idealstate", resourceTag); - HelixHelper - .updateIdealState(helixManager, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, updater, DEFAULT_RETRY_POLICY); + HelixHelper.updateIdealState(helixManager, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, updater, + DEFAULT_RETRY_POLICY); } /** @@ -495,12 +545,12 @@ public static Set getServerInstancesForTenantWithType(List serverInstancesWithType = new HashSet<>(); if (tableType == null || tableType == TableType.OFFLINE) { - serverInstancesWithType - .addAll(HelixHelper.getInstancesWithTag(instanceConfigs, TagNameUtils.getOfflineTagForTenant(tenant))); + serverInstancesWithType.addAll( + HelixHelper.getInstancesWithTag(instanceConfigs, TagNameUtils.getOfflineTagForTenant(tenant))); } if (tableType == null || tableType == TableType.REALTIME) { - serverInstancesWithType - .addAll(HelixHelper.getInstancesWithTag(instanceConfigs, TagNameUtils.getRealtimeTagForTenant(tenant))); + serverInstancesWithType.addAll( + HelixHelper.getInstancesWithTag(instanceConfigs, TagNameUtils.getRealtimeTagForTenant(tenant))); } return serverInstancesWithType; } @@ -519,6 +569,28 @@ public static Set getBrokerInstanceConfigsForTenant(List(getInstancesConfigsWithTag(instanceConfigs, TagNameUtils.getBrokerTagForTenant(tenant))); } + public static Set getTablesForBrokerTag(HelixManager helixManager, String brokerTag) { + Set tablesForBrokerTag = new HashSet<>(); + List tableConfigs = ZKMetadataProvider.getAllTableConfigs(helixManager.getHelixPropertyStore()); + for (TableConfig tableConfig : tableConfigs) { + if (TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker()).equals(brokerTag)) { + tablesForBrokerTag.add(tableConfig.getTableName()); + } + } + return tablesForBrokerTag; + } + + public static Set getTablesForBrokerTags(HelixManager helixManager, List brokerTags) { + Set tablesForBrokerTags = new HashSet<>(); + List tableConfigs = ZKMetadataProvider.getAllTableConfigs(helixManager.getHelixPropertyStore()); + for (TableConfig tableConfig : tableConfigs) { + if (brokerTags.contains(TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker()))) { + tablesForBrokerTags.add(tableConfig.getTableName()); + } + } + return tablesForBrokerTags; + } + /** * Returns the instance config for a specific instance. */ @@ -535,9 +607,9 @@ public static void updateInstanceConfig(HelixManager helixManager, InstanceConfi // NOTE: Use HelixDataAccessor.setProperty() instead of HelixAdmin.setInstanceConfig() because the latter explicitly // forbids instance host/port modification HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor(); - Preconditions.checkState(helixDataAccessor - .setProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceConfig.getId()), instanceConfig), - "Failed to update instance config for instance: " + instanceConfig.getId()); + Preconditions.checkState( + helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceConfig.getId()), + instanceConfig), "Failed to update instance config for instance: " + instanceConfig.getId()); } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java index dd36be744303..0c93eba5da0e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java @@ -28,8 +28,10 @@ import java.util.List; import java.util.Map; import javax.inject.Inject; +import javax.ws.rs.ClientErrorException; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.PUT; @@ -159,12 +161,21 @@ private Map getSystemResourceInfo(InstanceConfig instanceConfig) @ApiResponse(code = 409, message = "Instance already exists"), @ApiResponse(code = 500, message = "Internal error") }) - public SuccessResponse addInstance(Instance instance) { - LOGGER.info("Instance creation request received for instance: {}", InstanceUtils.getHelixInstanceId(instance)); - if (!_pinotHelixResourceManager.addInstance(instance).isSuccessful()) { - throw new ControllerApplicationException(LOGGER, "Instance already exists", Response.Status.CONFLICT); + public SuccessResponse addInstance( + @ApiParam("Whether to update broker resource for broker instance") @QueryParam("updateBrokerResource") + @DefaultValue("false") boolean updateBrokerResource, Instance instance) { + String instanceId = InstanceUtils.getHelixInstanceId(instance); + LOGGER.info("Instance creation request received for instance: {}, updateBrokerResource: {}", instanceId, + updateBrokerResource); + try { + PinotResourceManagerResponse response = _pinotHelixResourceManager.addInstance(instance, updateBrokerResource); + return new SuccessResponse(response.getMessage()); + } catch (ClientErrorException e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), e.getResponse().getStatus()); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, "Failed to create instance: " + instanceId, + Response.Status.INTERNAL_SERVER_ERROR, e); } - return new SuccessResponse("Instance successfully created"); } @POST @@ -256,14 +267,21 @@ public SuccessResponse dropInstance( }) public SuccessResponse updateInstance( @ApiParam(value = "Instance name", required = true, example = "Server_a.b.com_20000 | Broker_my.broker.com_30000") - @PathParam("instanceName") String instanceName, Instance instance) { - LOGGER.info("Instance update request received for instance: {}", instanceName); - PinotResourceManagerResponse response = _pinotHelixResourceManager.updateInstance(instanceName, instance); - if (!response.isSuccessful()) { - throw new ControllerApplicationException(LOGGER, "Failure to update instance. Reason: " + response.getMessage(), - Response.Status.INTERNAL_SERVER_ERROR); + @PathParam("instanceName") String instanceName, + @ApiParam("Whether to update broker resource for broker instance") @QueryParam("updateBrokerResource") + @DefaultValue("false") boolean updateBrokerResource, Instance instance) { + LOGGER.info("Instance update request received for instance: {}, updateBrokerResource: {}", instanceName, + updateBrokerResource); + try { + PinotResourceManagerResponse response = + _pinotHelixResourceManager.updateInstance(instanceName, instance, updateBrokerResource); + return new SuccessResponse(response.getMessage()); + } catch (ClientErrorException e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), e.getResponse().getStatus()); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, "Failed to update instance: " + instanceName, + Response.Status.INTERNAL_SERVER_ERROR, e); } - return new SuccessResponse("Instance successfully updated"); } @PUT @@ -275,22 +293,60 @@ public SuccessResponse updateInstance( notes = "Update the tags of the specified instance") @ApiResponses(value = { @ApiResponse(code = 200, message = "Success"), + @ApiResponse(code = 400, message = "Bad Request"), + @ApiResponse(code = 404, message = "Instance not found"), @ApiResponse(code = 500, message = "Internal error") }) public SuccessResponse updateInstanceTags( @ApiParam(value = "Instance name", required = true, example = "Server_a.b.com_20000 | Broker_my.broker.com_30000") @PathParam("instanceName") String instanceName, - @ApiParam(value = "Comma separated tags list", required = true) @QueryParam("tags") String tags) { - LOGGER.info("Instance update request received for instance: {} and tags: {}", instanceName, tags); + @ApiParam(value = "Comma separated tags list", required = true) @QueryParam("tags") String tags, + @ApiParam("Whether to update broker resource for broker instance") @QueryParam("updateBrokerResource") + @DefaultValue("false") boolean updateBrokerResource) { + LOGGER.info("Instance update request received for instance: {}, tags: {}, updateBrokerResource: {}", instanceName, + tags, updateBrokerResource); if (tags == null) { throw new ControllerApplicationException(LOGGER, "Must provide tags to update", Response.Status.BAD_REQUEST); } - PinotResourceManagerResponse response = _pinotHelixResourceManager.updateInstanceTags(instanceName, tags); - if (!response.isSuccessful()) { + try { + PinotResourceManagerResponse response = + _pinotHelixResourceManager.updateInstanceTags(instanceName, tags, updateBrokerResource); + return new SuccessResponse(response.getMessage()); + } catch (ClientErrorException e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), e.getResponse().getStatus()); + } catch (Exception e) { throw new ControllerApplicationException(LOGGER, - "Failure to update instance: " + instanceName + " with tags: " + tags + ". Reason: " + response.getMessage(), - Response.Status.INTERNAL_SERVER_ERROR); + String.format("Failed to update instance: %s with tags: %s", instanceName, tags), + Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + @POST + @Path("/instances/{instanceName}/updateBrokerResource") + @Authenticate(AccessType.UPDATE) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Update the tables served by the specified broker instance in the broker resource", notes = + "Broker resource should be updated when a new broker instance is added, or the tags for an existing broker are " + + "changed. Updating broker resource requires reading all the table configs, which can be costly for large " + + "cluster. Consider updating broker resource for each table individually.") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), + @ApiResponse(code = 400, message = "Bad Request"), + @ApiResponse(code = 404, message = "Instance not found"), + @ApiResponse(code = 500, message = "Internal error") + }) + public SuccessResponse updateBrokerResource( + @ApiParam(value = "Instance name", required = true, example = "Broker_my.broker.com_30000") + @PathParam("instanceName") String instanceName) { + LOGGER.info("Update broker resource request received for instance: {}", instanceName); + try { + PinotResourceManagerResponse response = _pinotHelixResourceManager.updateBrokerResource(instanceName); + return new SuccessResponse(response.getMessage()); + } catch (ClientErrorException e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), e.getResponse().getStatus()); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, "Failed to update broker resource for instance: " + instanceName, + Response.Status.INTERNAL_SERVER_ERROR, e); } - return new SuccessResponse("Successfully updated tags for instance: " + instanceName + " tags: " + tags); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java index 30041fd4c65c..7670ab4061a3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java @@ -61,7 +61,12 @@ public String forInstance(String instanceName) { } public String forInstanceUpdateTags(String instanceName, List tags) { - return StringUtil.join("/", _baseUrl, "instances", instanceName, "updateTags?tags=" + StringUtils.join(tags, ",")); + return forInstanceUpdateTags(instanceName, tags, false); + } + + public String forInstanceUpdateTags(String instanceName, List tags, boolean updateBrokerResource) { + return StringUtil.join("/", _baseUrl, "instances", instanceName, + "updateTags?tags=" + StringUtils.join(tags, ",") + "&updateBrokerResource=" + updateBrokerResource); } public String forInstanceList() { @@ -353,9 +358,9 @@ public String forInstanceReplace(String tableName, @Nullable InstancePartitionsT public String forIngestFromFile(String tableNameWithType, String batchConfigMapStr) throws UnsupportedEncodingException { - return String - .format("%s?tableNameWithType=%s&batchConfigMapStr=%s", StringUtil.join("/", _baseUrl, "ingestFromFile"), - tableNameWithType, URLEncoder.encode(batchConfigMapStr, StandardCharsets.UTF_8.toString())); + return String.format("%s?tableNameWithType=%s&batchConfigMapStr=%s", + StringUtil.join("/", _baseUrl, "ingestFromFile"), tableNameWithType, + URLEncoder.encode(batchConfigMapStr, StandardCharsets.UTF_8.toString())); } public String forIngestFromFile(String tableNameWithType, Map batchConfigMap) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index b28dbe69bf7e..107edcc6b950 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -46,6 +46,10 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.annotation.Nullable; +import javax.ws.rs.BadRequestException; +import javax.ws.rs.ClientErrorException; +import javax.ws.rs.NotFoundException; +import javax.ws.rs.core.Response; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.StringUtils; @@ -400,50 +404,152 @@ public List getInstancesWithTag(String tag) { * Add an instance into the Helix cluster. * * @param instance Instance to be added + * @param updateBrokerResource Whether to update broker resource for broker instance * @return Request response */ - public synchronized PinotResourceManagerResponse addInstance(Instance instance) { - List instances = getAllInstances(); - String instanceIdToAdd = InstanceUtils.getHelixInstanceId(instance); - if (instances.contains(instanceIdToAdd)) { - return PinotResourceManagerResponse.failure("Instance " + instanceIdToAdd + " already exists"); + public synchronized PinotResourceManagerResponse addInstance(Instance instance, boolean updateBrokerResource) { + String instanceId = InstanceUtils.getHelixInstanceId(instance); + InstanceConfig instanceConfig = getHelixInstanceConfig(instanceId); + if (instanceConfig != null) { + throw new ClientErrorException(String.format("Instance: %s already exists", instanceId), + Response.Status.CONFLICT); + } + + instanceConfig = InstanceUtils.toHelixInstanceConfig(instance); + _helixAdmin.addInstance(_helixClusterName, instanceConfig); + + // Update broker resource if necessary + boolean shouldUpdateBrokerResource = false; + List newBrokerTags = null; + if (instanceId.startsWith(Helix.PREFIX_OF_BROKER_INSTANCE) && updateBrokerResource) { + List newTags = instance.getTags(); + if (CollectionUtils.isNotEmpty(newTags)) { + newBrokerTags = newTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList()); + shouldUpdateBrokerResource = !newBrokerTags.isEmpty(); + } + } + if (shouldUpdateBrokerResource) { + long startTimeMs = System.currentTimeMillis(); + List tablesAdded = new ArrayList<>(); + HelixHelper.updateBrokerResource(_helixZkManager, instanceId, newBrokerTags, tablesAdded, null); + LOGGER.info("Updated broker resource for broker: {} with tags: {} in {}ms, tables added: {}", instanceId, + newBrokerTags, System.currentTimeMillis() - startTimeMs, tablesAdded); + return PinotResourceManagerResponse.success( + String.format("Added instance: %s, and updated broker resource - tables added: %s", instanceId, tablesAdded)); } else { - _helixAdmin.addInstance(_helixClusterName, InstanceUtils.toHelixInstanceConfig(instance)); - return PinotResourceManagerResponse.SUCCESS; + return PinotResourceManagerResponse.success("Added instance: " + instanceId); } } /** * Update a given instance for the specified Instance ID */ - public synchronized PinotResourceManagerResponse updateInstance(String instanceIdToUpdate, Instance newInstance) { - InstanceConfig instanceConfig = getHelixInstanceConfig(instanceIdToUpdate); + public synchronized PinotResourceManagerResponse updateInstance(String instanceId, Instance newInstance, + boolean updateBrokerResource) { + InstanceConfig instanceConfig = getHelixInstanceConfig(instanceId); if (instanceConfig == null) { - return PinotResourceManagerResponse.failure("Instance " + instanceIdToUpdate + " does not exists"); + throw new NotFoundException("Failed to find instance config for instance: " + instanceId); + } + + List newTags = newInstance.getTags(); + List oldTags = instanceConfig.getTags(); + InstanceUtils.updateHelixInstanceConfig(instanceConfig, newInstance); + if (!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceId), instanceConfig)) { + throw new RuntimeException("Failed to set instance config for instance: " + instanceId); + } + + // Update broker resource if necessary + boolean shouldUpdateBrokerResource = false; + List newBrokerTags = null; + if (instanceId.startsWith(Helix.PREFIX_OF_BROKER_INSTANCE) && updateBrokerResource) { + newBrokerTags = + newTags != null ? newTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList()) + : Collections.emptyList(); + List oldBrokerTags = + oldTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList()); + shouldUpdateBrokerResource = !newBrokerTags.equals(oldBrokerTags); + } + if (shouldUpdateBrokerResource) { + long startTimeMs = System.currentTimeMillis(); + List tablesAdded = new ArrayList<>(); + List tablesRemoved = new ArrayList<>(); + HelixHelper.updateBrokerResource(_helixZkManager, instanceId, newBrokerTags, tablesAdded, tablesRemoved); + LOGGER.info("Updated broker resource for broker: {} with tags: {} in {}ms, tables added: {}, tables removed: {}", + instanceId, newBrokerTags, System.currentTimeMillis() - startTimeMs, tablesAdded, tablesRemoved); + return PinotResourceManagerResponse.success( + String.format("Updated instance: %s, and updated broker resource - tables added: %s, tables removed: %s", + instanceId, tablesAdded, tablesRemoved)); } else { - InstanceUtils.updateHelixInstanceConfig(instanceConfig, newInstance); - if (!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceIdToUpdate), instanceConfig)) { - return PinotResourceManagerResponse.failure("Unable to update instance: " + instanceIdToUpdate); - } - return PinotResourceManagerResponse.SUCCESS; + return PinotResourceManagerResponse.success("Updated instance: " + instanceId); } } /** * Updates the tags of the specified instance ID */ - public synchronized PinotResourceManagerResponse updateInstanceTags(String instanceIdToUpdate, String tags) { - InstanceConfig instanceConfig = getHelixInstanceConfig(instanceIdToUpdate); + public synchronized PinotResourceManagerResponse updateInstanceTags(String instanceId, String tagsString, + boolean updateBrokerResource) { + InstanceConfig instanceConfig = getHelixInstanceConfig(instanceId); if (instanceConfig == null) { - return PinotResourceManagerResponse.failure("Instance " + instanceIdToUpdate + " does not exists"); + throw new NotFoundException("Failed to find instance config for instance: " + instanceId); + } + + List newTags = Arrays.asList(StringUtils.split(tagsString, ',')); + List oldTags = instanceConfig.getTags(); + instanceConfig.getRecord().setListField(InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), newTags); + if (!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceId), instanceConfig)) { + throw new RuntimeException("Failed to set instance config for instance: " + instanceId); + } + + // Update broker resource if necessary + boolean shouldUpdateBrokerResource = false; + List newBrokerTags = null; + if (instanceId.startsWith(Helix.PREFIX_OF_BROKER_INSTANCE) && updateBrokerResource) { + newBrokerTags = newTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList()); + List oldBrokerTags = + oldTags.stream().filter(TagNameUtils::isBrokerTag).sorted().collect(Collectors.toList()); + shouldUpdateBrokerResource = !newBrokerTags.equals(oldBrokerTags); + } + if (shouldUpdateBrokerResource) { + long startTimeMs = System.currentTimeMillis(); + List tablesAdded = new ArrayList<>(); + List tablesRemoved = new ArrayList<>(); + HelixHelper.updateBrokerResource(_helixZkManager, instanceId, newBrokerTags, tablesAdded, tablesRemoved); + LOGGER.info("Updated broker resource for broker: {} with tags: {} in {}ms, tables added: {}, tables removed: {}", + instanceId, newBrokerTags, System.currentTimeMillis() - startTimeMs, tablesAdded, tablesRemoved); + return PinotResourceManagerResponse.success(String.format( + "Updated tags: %s for instance: %s, and updated broker resource - tables added: %s, tables removed: %s", + newTags, instanceId, tablesAdded, tablesRemoved)); + } else { + return PinotResourceManagerResponse.success( + String.format("Updated tags: %s for instance: %s", newTags, instanceId)); } - List tagList = Arrays.asList(StringUtils.split(tags, ',')); - instanceConfig.getRecord().setListField(InstanceConfig.InstanceConfigProperty.TAG_LIST.name(), tagList); - if (!_helixDataAccessor.setProperty(_keyBuilder.instanceConfig(instanceIdToUpdate), instanceConfig)) { - return PinotResourceManagerResponse - .failure("Unable to update instance: " + instanceIdToUpdate + " to tags: " + tags); + } + + /** + * Updates the tables served by the specified broker instance in the broker resource. + * NOTE: This method will read all the table configs, so can be costly. + */ + public PinotResourceManagerResponse updateBrokerResource(String instanceId) { + if (!instanceId.startsWith(Helix.PREFIX_OF_BROKER_INSTANCE)) { + throw new BadRequestException("Cannot update broker resource for non-broker instance: " + instanceId); } - return PinotResourceManagerResponse.SUCCESS; + InstanceConfig instanceConfig = getHelixInstanceConfig(instanceId); + if (instanceConfig == null) { + throw new NotFoundException("Failed to find instance config for instance: " + instanceId); + } + + long startTimeMs = System.currentTimeMillis(); + List brokerTags = + instanceConfig.getTags().stream().filter(TagNameUtils::isBrokerTag).collect(Collectors.toList()); + List tablesAdded = new ArrayList<>(); + List tablesRemoved = new ArrayList<>(); + HelixHelper.updateBrokerResource(_helixZkManager, instanceId, brokerTags, tablesAdded, tablesRemoved); + LOGGER.info("Updated broker resource for broker: {} with tags: {} in {}ms, tables added: {}, tables removed: {}", + instanceId, brokerTags, System.currentTimeMillis() - startTimeMs, tablesAdded, tablesRemoved); + return PinotResourceManagerResponse.success( + String.format("Updated broker resource for broker: %s - tables added: %s, tables removed: %s", instanceId, + tablesAdded, tablesRemoved)); } /** diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java index 3ab0d48525e4..ac5716ff9614 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java @@ -18,11 +18,21 @@ */ package org.apache.pinot.controller.helix.core; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.common.utils.config.TagNameUtils; +import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.api.exception.InvalidTableConfigException; import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.spi.config.instance.Instance; +import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TagOverrideConfig; @@ -32,29 +42,22 @@ import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + public class PinotHelixResourceManagerStatelessTest extends ControllerTest { private static final int BASE_SERVER_ADMIN_PORT = 10000; private static final int NUM_INSTANCES = 5; private static final String BROKER_TENANT_NAME = "brokerTenant"; private static final String SERVER_TENANT_NAME = "serverTenant"; - private static final String TABLE_NAME = "testTable"; - private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME); - private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME); - - private static final String SEGMENTS_REPLACE_TEST_TABLE_NAME = "segmentsReplaceTestTable"; - private static final String OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME = - TableNameBuilder.OFFLINE.tableNameWithType(SEGMENTS_REPLACE_TEST_TABLE_NAME); - - private static final int CONNECTION_TIMEOUT_IN_MILLISECOND = 10_000; - private static final int MAX_TIMEOUT_IN_MILLISECOND = 5_000; - private static final int MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES = 10; - private static final long TIMEOUT_IN_MS = 10_000L; + private static final String RAW_TABLE_NAME = "testTable"; + private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME); @BeforeClass public void setUp() @@ -88,7 +91,7 @@ public void testValidateDimTableTenantConfig() { dimTableConfig.setTenantConfig(new TenantConfig(null, SERVER_TENANT_NAME, null)); try { _helixResourceManager.validateTableTenantConfig(dimTableConfig); - Assert.fail("Expected InvalidTableConfigException"); + fail("Expected InvalidTableConfigException"); } catch (InvalidTableConfigException e) { // expected } @@ -104,13 +107,12 @@ public void testValidateTenantConfig() { Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, 3, 0, 0); _helixResourceManager.createBrokerTenant(brokerTenant); - String rawTableName = "testTable"; - TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).build(); + TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); // Empty broker tag (DefaultTenant_BROKER) try { _helixResourceManager.validateTableTenantConfig(offlineTableConfig); - Assert.fail("Expected InvalidTableConfigException"); + fail("Expected InvalidTableConfigException"); } catch (InvalidTableConfigException e) { // expected } @@ -119,7 +121,7 @@ public void testValidateTenantConfig() { offlineTableConfig.setTenantConfig(new TenantConfig(BROKER_TENANT_NAME, null, null)); try { _helixResourceManager.validateTableTenantConfig(offlineTableConfig); - Assert.fail("Expected InvalidTableConfigException"); + fail("Expected InvalidTableConfigException"); } catch (InvalidTableConfigException e) { // expected } @@ -129,13 +131,13 @@ public void testValidateTenantConfig() { _helixResourceManager.validateTableTenantConfig(offlineTableConfig); TableConfig realtimeTableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setBrokerTenant(BROKER_TENANT_NAME) + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME) .setServerTenant(SERVER_TENANT_NAME).build(); // Empty server tag (serverTenant_REALTIME) try { _helixResourceManager.validateTableTenantConfig(realtimeTableConfig); - Assert.fail("Expected InvalidTableConfigException"); + fail("Expected InvalidTableConfigException"); } catch (InvalidTableConfigException e) { // expected } @@ -146,7 +148,7 @@ public void testValidateTenantConfig() { realtimeTableConfig.setTenantConfig(new TenantConfig(BROKER_TENANT_NAME, SERVER_TENANT_NAME, tagOverrideConfig)); try { _helixResourceManager.validateTableTenantConfig(realtimeTableConfig); - Assert.fail("Expected InvalidTableConfigException"); + fail("Expected InvalidTableConfigException"); } catch (InvalidTableConfigException e) { // expected } @@ -156,7 +158,7 @@ public void testValidateTenantConfig() { realtimeTableConfig.setTenantConfig(new TenantConfig(BROKER_TENANT_NAME, SERVER_TENANT_NAME, tagOverrideConfig)); try { _helixResourceManager.validateTableTenantConfig(realtimeTableConfig); - Assert.fail("Expected InvalidTableConfigException"); + fail("Expected InvalidTableConfigException"); } catch (InvalidTableConfigException e) { // expected } @@ -167,7 +169,7 @@ public void testValidateTenantConfig() { realtimeTableConfig.setTenantConfig(new TenantConfig(BROKER_TENANT_NAME, SERVER_TENANT_NAME, tagOverrideConfig)); try { _helixResourceManager.validateTableTenantConfig(realtimeTableConfig); - Assert.fail("Expected InvalidTableConfigException"); + fail("Expected InvalidTableConfigException"); } catch (InvalidTableConfigException e) { // expected } @@ -178,7 +180,7 @@ public void testValidateTenantConfig() { realtimeTableConfig.setTenantConfig(new TenantConfig(BROKER_TENANT_NAME, SERVER_TENANT_NAME, tagOverrideConfig)); try { _helixResourceManager.validateTableTenantConfig(realtimeTableConfig); - Assert.fail("Expected InvalidTableConfigException"); + fail("Expected InvalidTableConfigException"); } catch (InvalidTableConfigException e) { // expected } @@ -190,17 +192,85 @@ public void testValidateTenantConfig() { _helixResourceManager.validateTableTenantConfig(realtimeTableConfig); untagBrokers(); - Assert.assertEquals(_helixResourceManager.getOnlineUnTaggedBrokerInstanceList().size(), NUM_INSTANCES); + assertEquals(_helixResourceManager.getOnlineUnTaggedBrokerInstanceList().size(), NUM_INSTANCES); } private void untagBrokers() { for (String brokerInstance : _helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) { - _helixAdmin.removeInstanceTag(getHelixClusterName(), brokerInstance, - TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME)); - _helixAdmin.addInstanceTag(getHelixClusterName(), brokerInstance, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE); + _helixResourceManager.updateInstanceTags(brokerInstance, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE, false); } } + @Test + public void testUpdateBrokerResource() + throws Exception { + // Create broker tenant on 3 brokers + Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, 3, 0, 0); + _helixResourceManager.createBrokerTenant(brokerTenant); + + String brokerTag = TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME); + List instanceConfigs = HelixHelper.getInstanceConfigs(_helixManager); + List taggedBrokers = HelixHelper.getInstancesWithTag(instanceConfigs, brokerTag); + assertEquals(taggedBrokers.size(), 3); + List untaggedBrokers = + HelixHelper.getInstancesWithTag(instanceConfigs, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE); + assertEquals(untaggedBrokers.size(), 2); + + // Add a table + TableConfig offlineTableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME) + .setServerTenant(SERVER_TENANT_NAME).build(); + _helixResourceManager.addTable(offlineTableConfig); + checkBrokerResource(taggedBrokers); + + // Untag a tagged broker with instance update + String brokerToUntag = taggedBrokers.remove(ThreadLocalRandom.current().nextInt(3)); + Instance instance = + new Instance("localhost", brokerToUntag.charAt(brokerToUntag.length() - 1) - '0', InstanceType.BROKER, + Collections.singletonList(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE), null, 0, 0, false); + assertTrue(_helixResourceManager.updateInstance(brokerToUntag, instance, true).isSuccessful()); + untaggedBrokers.add(brokerToUntag); + checkBrokerResource(taggedBrokers); + + // Tag an untagged broker with tags update + String brokerToTag = untaggedBrokers.remove(ThreadLocalRandom.current().nextInt(3)); + assertTrue(_helixResourceManager.updateInstanceTags(brokerToTag, brokerTag, true).isSuccessful()); + taggedBrokers.add(brokerToTag); + checkBrokerResource(taggedBrokers); + + // Add a new broker instance + Instance newBrokerInstance = + new Instance("localhost", 5, InstanceType.BROKER, Collections.singletonList(brokerTag), null, 0, 0, false); + assertTrue(_helixResourceManager.addInstance(newBrokerInstance, true).isSuccessful()); + String newBrokerId = InstanceUtils.getHelixInstanceId(newBrokerInstance); + taggedBrokers.add(newBrokerId); + checkBrokerResource(taggedBrokers); + + // Untag the new broker and update the broker resource + assertTrue( + _helixResourceManager.updateInstanceTags(newBrokerId, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE, false) + .isSuccessful()); + assertTrue(_helixResourceManager.updateBrokerResource(newBrokerId).isSuccessful()); + taggedBrokers.remove(taggedBrokers.size() - 1); + checkBrokerResource(taggedBrokers); + + // Drop the new broker and delete the table + assertTrue(_helixResourceManager.dropInstance(newBrokerId).isSuccessful()); + _helixResourceManager.deleteOfflineTable(OFFLINE_TABLE_NAME); + + IdealState brokerResource = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName()); + assertTrue(brokerResource.getPartitionSet().isEmpty()); + + untagBrokers(); + } + + private void checkBrokerResource(List expectedBrokers) { + IdealState brokerResource = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName()); + assertEquals(brokerResource.getPartitionSet().size(), 1); + Map instanceStateMap = brokerResource.getInstanceStateMap(OFFLINE_TABLE_NAME); + assertEquals(instanceStateMap.keySet(), new HashSet<>(expectedBrokers)); + } + @AfterClass public void tearDown() { stopFakeInstances(); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java index addec19a1819..dfacc0cd7bfc 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java @@ -205,7 +205,7 @@ private void addAndRemoveNewInstanceConfig(ZkClient zkClient) { // Add new instance. Instance instance = new Instance("localhost", biggerRandomNumber, InstanceType.SERVER, Collections.singletonList(UNTAGGED_SERVER_INSTANCE), null, 0, 0, false); - ControllerTestUtils.getHelixResourceManager().addInstance(instance); + ControllerTestUtils.getHelixResourceManager().addInstance(instance, false); List allInstances = ControllerTestUtils.getHelixResourceManager().getAllInstances(); Assert.assertTrue(allInstances.contains(instanceName)); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java index 949acfbb27a6..947db73d2112 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java @@ -18,8 +18,21 @@ */ package org.apache.pinot.integration.tests; +import java.util.Collections; +import java.util.Map; +import org.apache.helix.model.IdealState; +import org.apache.pinot.broker.broker.helix.HelixBrokerStarter; +import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel; +import org.apache.pinot.spi.utils.NetUtils; import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.fail; + /** * Integration test that extends OfflineClusterIntegrationTest but start multiple brokers and servers. @@ -43,6 +56,49 @@ protected void startServers() { startServers(NUM_SERVERS); } + @Test + public void testUpdateBrokerResource() + throws Exception { + // Add a new broker to the cluster + Map properties = getDefaultBrokerConfiguration().toMap(); + properties.put(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME, getHelixClusterName()); + properties.put(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl()); + int port = NetUtils.findOpenPort(DEFAULT_BROKER_PORT); + properties.put(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, port); + properties.put(CommonConstants.Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0); + + HelixBrokerStarter brokerStarter = new HelixBrokerStarter(); + brokerStarter.init(new PinotConfiguration(properties)); + brokerStarter.start(); + + // Check if broker is added to all the tables in broker resource + String brokerId = brokerStarter.getInstanceId(); + IdealState brokerResource = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName()); + for (Map brokerAssignment : brokerResource.getRecord().getMapFields().values()) { + assertEquals(brokerAssignment.get(brokerId), BrokerResourceStateModel.ONLINE); + } + + // Stop and drop the broker + brokerStarter.stop(); + try { + sendDeleteRequest(_controllerRequestURLBuilder.forInstance(brokerId)); + fail("Dropping instance should fail because it is still in the broker resource"); + } catch (Exception e) { + // Expected + } + // Untag the broker and update the broker resource so that it is removed from the broker resource + sendPutRequest(_controllerRequestURLBuilder.forInstanceUpdateTags(brokerId, Collections.emptyList(), true)); + // Check if broker is removed from all the tables in broker resource + brokerResource = HelixHelper.getBrokerIdealStates(_helixAdmin, getHelixClusterName()); + for (Map brokerAssignment : brokerResource.getRecord().getMapFields().values()) { + assertFalse(brokerAssignment.containsKey(brokerId)); + } + // Dropping instance should success + sendDeleteRequest(_controllerRequestURLBuilder.forInstance(brokerId)); + // Check if broker is dropped from the cluster + assertFalse(_helixAdmin.getInstancesInCluster(getHelixClusterName()).contains(brokerId)); + } + @Test(enabled = false) @Override public void testHardcodedServerPartitionedSqlQueries() {