diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java index 011a9723e8c..3ac85a09d75 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java @@ -768,7 +768,12 @@ private void handleUserTaskStatusResponse(Reconciliation reconciliation, CruiseC // 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted // 2. Task's retention time expired, or the cache has become full LOGGER.warnCr(reconciliation, "User task {} not found, going to generate a new proposal", sessionId); - requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete); + requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder) + .onSuccess(p::complete) + .onFailure(e -> { + LOGGER.errorCr(reconciliation, "Generating a new proposal failed", e); + p.fail(e); + }); return; } @@ -923,8 +928,8 @@ private Future> onRebalancing(Reco apiClient.getUserTaskStatus(reconciliation, host, cruiseControlPort, sessionId) .onSuccess(cruiseControlResponse -> handleUserTaskStatusResponse(reconciliation, cruiseControlResponse, p, sessionId, conditions, kafkaRebalance, configMapOperator, false, host, apiClient, rebalanceOptionsBuilder)) .onFailure(e -> { - LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance task status failed", e.getCause()); - p.fail(new CruiseControlRestException("Cruise Control getting rebalance task status failed")); + LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance task status failed", e); + p.fail(e); }); } return p.future(); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperatorTest.java index 6fa207d8c57..9f89c3b52bc 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperatorTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperatorTest.java @@ -1133,6 +1133,48 @@ public void testNewToPendingProposalDeleteRemoveBroker(VertxTestContext context) this.krNewToPendingProposalDelete(context, CruiseControlEndpoints.REMOVE_BROKER, kr); } + /** + * Tests the transition from 'Rebalancing' to 'NotReady' due to a user task not existing and an error + * on re-issuing the optimization proposal request + * + * 1. A new KafkaRebalance resource is created; it is in the Rebalancing state + * 2. The operator requests the status of the corresponding user task through the Cruise Control REST API + * 3. The user task doesn't exist anymore and the KafkaRebalance resource moves to the 'NotReady' state + */ + @Test + public void testRebalancingToNotReadyRemoveBroker(VertxTestContext context) { + cruiseControlServer.setupUserTasktoEmpty(); + cruiseControlServer.setupCCBrokerDoesNotExist(CruiseControlEndpoints.REMOVE_BROKER); + + KafkaRebalance kr = new KafkaRebalanceBuilder(createKafkaRebalance(namespace, CLUSTER_NAME, RESOURCE_NAME, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, false)) + .withNewStatus() + .withObservedGeneration(1L) + .withSessionId("test-session-id") + .withConditions(new ConditionBuilder() + .withType(KafkaRebalanceState.Rebalancing.name()) + .withStatus("True") + .build()) + .endStatus() + .build(); + + Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).create(); + Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).updateStatus(); + + crdCreateKafka(); + crdCreateCruiseControlSecrets(); + + Checkpoint checkpoint = context.checkpoint(); + krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName())) + .onComplete(context.succeeding(v -> context.verify(() -> { + KafkaRebalance kr1 = Crds.kafkaRebalanceOperation(client).inNamespace(namespace).withName(kr.getMetadata().getName()).get(); + assertThat(kr1, StateMatchers.hasState()); + Condition condition = KafkaRebalanceUtils.rebalanceStateCondition(kr1.getStatus()); + assertThat(condition, StateMatchers.hasStateInCondition(KafkaRebalanceState.NotReady, IllegalArgumentException.class, + "Some/all brokers specified don't exist")); + checkpoint.flag(); + }))); + } + private void krNewToPendingProposalDelete(VertxTestContext context, CruiseControlEndpoints endpoint, KafkaRebalance kr) throws IOException, URISyntaxException { // Set up the rebalance endpoint with the number of pending calls before a response is received. cruiseControlServer.setupCCRebalanceResponse(1, endpoint);