Skip to content

Commit

Permalink
Improve handling of invalid topic configurations
Browse files Browse the repository at this point in the history
An invalid topic configuration results in a RuntimeException that fails the whole reconciliation and following periodic reconciliations.
Additionally, if you have a batch of 100 topics or more, it can be difficult to spot where the configuration errors is.

With this change the TO reports the error in the status of the Kafka topic containing the invalid configuration, and continues with the reconciliation.

Signed-off-by: Federico Valeri <[email protected]>
  • Loading branch information
fvaleri committed Aug 29, 2024
1 parent ec17e90 commit d294c0f
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,15 @@ private static Either<TopicOperatorException, Boolean> validateUnchangedTopicNam
}

private PartitionedByError<ReconcilableTopic, Void> createTopics(List<ReconcilableTopic> kts) {
Map<ReconcilableTopic, TopicOperatorException> newTopicsErrors = new HashMap<>();
var newTopics = kts.stream().map(reconcilableTopic -> {
// Admin create
return buildNewTopic(reconcilableTopic.kt(), reconcilableTopic.topicName());
}).collect(Collectors.toSet());
try {
return buildNewTopic(reconcilableTopic.kt(), reconcilableTopic.topicName());
} catch (Exception e) {
newTopicsErrors.put(reconcilableTopic, new TopicOperatorException.InternalError(e));
return null;
}
}).filter(Objects::nonNull).collect(Collectors.toSet());

LOGGER.debugOp("Admin.createTopics({})", newTopics);
var timerSample = TopicOperatorUtil.startExternalRequestTimer(metrics, enableAdditionalMetrics);
Expand All @@ -310,6 +315,9 @@ private PartitionedByError<ReconcilableTopic, Void> createTopics(List<Reconcilab
});
Map<String, KafkaFuture<Void>> values = ctr.values();
return partitionedByError(kts.stream().map(reconcilableTopic -> {
if (newTopicsErrors.containsKey(reconcilableTopic)) {
return new Pair<>(reconcilableTopic, Either.ofLeft(newTopicsErrors.get(reconcilableTopic)));
}
try {
values.get(reconcilableTopic.topicName()).get();
reconcilableTopic.kt().setStatus(new KafkaTopicStatusBuilder()
Expand Down Expand Up @@ -363,7 +371,7 @@ private static String configValueAsString(Object value) {
.map(BatchingTopicController::configValueAsString)
.collect(Collectors.joining(","));
} else {
throw new RuntimeException("Cannot convert " + value);
throw new RuntimeException("Invalid config value: " + value);
}
return valueStr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1769,6 +1769,29 @@ public void shouldFailAlterConfigIfNoTopicAuthz(KafkaTopic kt,
assertEquals("KafkaError", condition.getReason());
assertEquals("org.apache.kafka.common.errors.TopicAuthorizationException: not allowed", condition.getMessage());
}

@Test
public void shouldFailTheReconciliationWithInvalidConfig(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
KafkaTopic kafkaTopic = new KafkaTopicBuilder()
.withNewMetadata()
.withNamespace(NAMESPACE)
.withName("my-topic")
.withLabels(SELECTOR)
.endMetadata()
.withNewSpec()
.withConfig(new HashMap<>() {{ put("cleanup.policy", null); }})
.withPartitions(1)
.withReplicas(1)
.endSpec()
.build();
var created = createTopic(kafkaCluster, kafkaTopic);
var condition = assertExactlyOneCondition(created);
assertEquals("InternalError", condition.getReason());
assertEquals("java.lang.RuntimeException: Invalid config value: null", condition.getMessage());
}

private static KafkaTopic setGzipCompression(KafkaTopic kt) {
return setCompression(kt, "gzip");
Expand Down

0 comments on commit d294c0f

Please sign in to comment.