Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed stuck reconciliation on failed request rebalance #10717

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,12 @@ private void handleUserTaskStatusResponse(Reconciliation reconciliation, CruiseC
// 1. Cruise Control restarted so resetting the state because the tasks queue is not persisted
// 2. Task's retention time expired, or the cache has become full
LOGGER.warnCr(reconciliation, "User task {} not found, going to generate a new proposal", sessionId);
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder).onSuccess(p::complete);
requestRebalance(reconciliation, host, apiClient, kafkaRebalance, true, rebalanceOptionsBuilder)
.onSuccess(p::complete)
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Generating a new proposal failed", e);
p.fail(e);
});
return;
}

Expand Down Expand Up @@ -923,8 +928,8 @@ private Future<MapAndStatus<ConfigMap, KafkaRebalanceStatus>> onRebalancing(Reco
apiClient.getUserTaskStatus(reconciliation, host, cruiseControlPort, sessionId)
.onSuccess(cruiseControlResponse -> handleUserTaskStatusResponse(reconciliation, cruiseControlResponse, p, sessionId, conditions, kafkaRebalance, configMapOperator, false, host, apiClient, rebalanceOptionsBuilder))
.onFailure(e -> {
LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance task status failed", e.getCause());
p.fail(new CruiseControlRestException("Cruise Control getting rebalance task status failed"));
LOGGER.errorCr(reconciliation, "Cruise Control getting rebalance task status failed", e);
p.fail(e);
});
}
return p.future();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,48 @@ public void testNewToPendingProposalDeleteRemoveBroker(VertxTestContext context)
this.krNewToPendingProposalDelete(context, CruiseControlEndpoints.REMOVE_BROKER, kr);
}

/**
* Tests the transition from 'Rebalancing' to 'NotReady' due to a user task not existing and an error
* on re-issuing the optimization proposal request
*
* 1. A new KafkaRebalance resource is created; it is in the Rebalancing state
* 2. The operator requests the status of the corresponding user task through the Cruise Control REST API
* 3. The user task doesn't exist anymore and the KafkaRebalance resource moves to the 'NotReady' state
*/
@Test
public void testRebalancingToNotReadyRemoveBroker(VertxTestContext context) {
cruiseControlServer.setupUserTasktoEmpty();
cruiseControlServer.setupCCBrokerDoesNotExist(CruiseControlEndpoints.REMOVE_BROKER);

KafkaRebalance kr = new KafkaRebalanceBuilder(createKafkaRebalance(namespace, CLUSTER_NAME, RESOURCE_NAME, REMOVE_BROKER_KAFKA_REBALANCE_SPEC, false))
.withNewStatus()
.withObservedGeneration(1L)
.withSessionId("test-session-id")
.withConditions(new ConditionBuilder()
.withType(KafkaRebalanceState.Rebalancing.name())
.withStatus("True")
.build())
.endStatus()
.build();

Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).create();
Crds.kafkaRebalanceOperation(client).inNamespace(namespace).resource(kr).updateStatus();

crdCreateKafka();
crdCreateCruiseControlSecrets();

Checkpoint checkpoint = context.checkpoint();
krao.reconcile(new Reconciliation("test-trigger", KafkaRebalance.RESOURCE_KIND, namespace, kr.getMetadata().getName()))
.onComplete(context.succeeding(v -> context.verify(() -> {
KafkaRebalance kr1 = Crds.kafkaRebalanceOperation(client).inNamespace(namespace).withName(kr.getMetadata().getName()).get();
assertThat(kr1, StateMatchers.hasState());
Condition condition = KafkaRebalanceUtils.rebalanceStateCondition(kr1.getStatus());
assertThat(condition, StateMatchers.hasStateInCondition(KafkaRebalanceState.NotReady, IllegalArgumentException.class,
"Some/all brokers specified don't exist"));
checkpoint.flag();
})));
}

private void krNewToPendingProposalDelete(VertxTestContext context, CruiseControlEndpoints endpoint, KafkaRebalance kr) throws IOException, URISyntaxException {
// Set up the rebalance endpoint with the number of pending calls before a response is received.
cruiseControlServer.setupCCRebalanceResponse(1, endpoint);
Expand Down
Loading