Skip to content

Commit

Permalink
Make CC port configurable in KafkaRebalanceAssemblyOperator (#10096)
Browse files Browse the repository at this point in the history
Signed-off-by: Federico Valeri <[email protected]>
  • Loading branch information
fvaleri authored May 15, 2024
1 parent a900d30 commit f83de1a
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,38 @@ public class KafkaRebalanceAssemblyOperator
private final SecretOperator secretOperations;
private final LabelSelector kafkaSelector;
private boolean usingJbodStorage;

private final ConfigMapOperator configMapOperator;
private int cruiseControlPort;

/**
* @param vertx The Vertx instance
* @param supplier Supplies the operators for different resources
* @param config Cluster Operator configuration
*/
public KafkaRebalanceAssemblyOperator(Vertx vertx,
ResourceOperatorSupplier supplier, ClusterOperatorConfig config) {
ResourceOperatorSupplier supplier,
ClusterOperatorConfig config) {
this(vertx, supplier, config, CruiseControl.REST_API_PORT);
}

/**
* @param vertx The Vertx instance
* @param supplier Supplies the operators for different resources
* @param config Cluster Operator configuration
* @param cruiseControlPort Cruise Control server port
*/
/* test */ KafkaRebalanceAssemblyOperator(Vertx vertx,
ResourceOperatorSupplier supplier,
ClusterOperatorConfig config,
int cruiseControlPort) {
super(vertx, KafkaRebalance.RESOURCE_KIND, supplier.kafkaRebalanceOperator, supplier.metricsProvider, null);
this.kafkaSelector = (config.getCustomResourceSelector() == null || config.getCustomResourceSelector().toMap().isEmpty()) ? null : new LabelSelector(null, config.getCustomResourceSelector().toMap());
this.kafkaSelector = (config.getCustomResourceSelector() == null || config.getCustomResourceSelector().toMap().isEmpty())
? null : new LabelSelector(null, config.getCustomResourceSelector().toMap());
this.kafkaRebalanceOperator = supplier.kafkaRebalanceOperator;
this.kafkaOperator = supplier.kafkaOperator;
this.configMapOperator = supplier.configMapOperations;
this.secretOperations = supplier.secretOperations;
this.cruiseControlPort = cruiseControlPort;
}

/**
Expand Down Expand Up @@ -918,15 +935,15 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onRebalancing(Reco
String sessionId = kafkaRebalance.getStatus().getSessionId();
if (rebalanceAnnotation(kafkaRebalance) == KafkaRebalanceAnnotation.stop) {
LOGGER.infoCr(reconciliation, "Stopping current Cruise Control rebalance user task");
apiClient.stopExecution(host, CruiseControl.REST_API_PORT)
apiClient.stopExecution(host, cruiseControlPort)
.onSuccess(r -> p.complete(buildRebalanceStatus(null, KafkaRebalanceState.Stopped, StatusUtils.validate(reconciliation, kafkaRebalance))))
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Cruise Control stopping execution failed", e.getCause());
p.fail(e.getCause());
});
} else if (rebalanceAnnotation(kafkaRebalance) == KafkaRebalanceAnnotation.refresh) {
LOGGER.infoCr(reconciliation, "Stopping current Cruise Control rebalance user task since refresh annotation is applied on the KafkaRebalance resource and requesting a new proposal");
apiClient.stopExecution(host, CruiseControl.REST_API_PORT)
apiClient.stopExecution(host, cruiseControlPort)
.onSuccess(r -> {
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete);
})
Expand All @@ -938,7 +955,7 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onRebalancing(Reco
LOGGER.infoCr(reconciliation, "Getting Cruise Control rebalance user task status");
Set<Condition> conditions = StatusUtils.validate(reconciliation, kafkaRebalance);
validateAnnotation(reconciliation, conditions, KafkaRebalanceState.Rebalancing, rebalanceAnnotation(kafkaRebalance), kafkaRebalance);
apiClient.getUserTaskStatus(host, CruiseControl.REST_API_PORT, sessionId)
apiClient.getUserTaskStatus(host, cruiseControlPort, sessionId)
.onSuccess(cruiseControlResponse -> {
JsonObject taskStatusJson = cruiseControlResponse.getJson();
CruiseControlUserTaskStatus taskStatus = CruiseControlUserTaskStatus.lookup(taskStatusJson.getString("Status"));
Expand Down Expand Up @@ -1195,13 +1212,13 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> requestRebalance(R
Future<CruiseControlRebalanceResponse> future;
switch (mode) {
case ADD_BROKERS:
future = apiClient.addBroker(host, CruiseControl.REST_API_PORT, ((AddBrokerOptions.AddBrokerOptionsBuilder) rebalanceOptionsBuilder).build(), userTaskID);
future = apiClient.addBroker(host, cruiseControlPort, ((AddBrokerOptions.AddBrokerOptionsBuilder) rebalanceOptionsBuilder).build(), userTaskID);
break;
case REMOVE_BROKERS:
future = apiClient.removeBroker(host, CruiseControl.REST_API_PORT, ((RemoveBrokerOptions.RemoveBrokerOptionsBuilder) rebalanceOptionsBuilder).build(), userTaskID);
future = apiClient.removeBroker(host, cruiseControlPort, ((RemoveBrokerOptions.RemoveBrokerOptionsBuilder) rebalanceOptionsBuilder).build(), userTaskID);
break;
default:
future = apiClient.rebalance(host, CruiseControl.REST_API_PORT, ((RebalanceOptions.RebalanceOptionsBuilder) rebalanceOptionsBuilder).build(), userTaskID);
future = apiClient.rebalance(host, cruiseControlPort, ((RebalanceOptions.RebalanceOptionsBuilder) rebalanceOptionsBuilder).build(), userTaskID);
break;
}
return future.map(response -> handleRebalanceResponse(reconciliation, kafkaRebalance, dryrun, response));
Expand Down
Loading

0 comments on commit f83de1a

Please sign in to comment.