Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix topic operator loop for unmanaged topics #10451

Merged
merged 1 commit into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import io.strimzi.api.kafka.model.common.ConditionBuilder;
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.api.kafka.model.topic.KafkaTopicBuilder;
import io.strimzi.api.kafka.model.topic.KafkaTopicStatus;
import io.strimzi.api.kafka.model.topic.KafkaTopicStatusBuilder;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.model.StatusDiff;
import io.strimzi.operator.common.model.StatusUtils;
import io.strimzi.operator.topic.metrics.TopicOperatorMetricsHolder;
import io.strimzi.operator.topic.model.Either;
Expand Down Expand Up @@ -186,7 +186,7 @@ static String resourceVersion(KafkaTopic kt) {
private List<ReconcilableTopic> addOrRemoveFinalizer(boolean useFinalizer, List<ReconcilableTopic> reconcilableTopics) {
List<ReconcilableTopic> collect = reconcilableTopics.stream()
.map(reconcilableTopic ->
new ReconcilableTopic(reconcilableTopic.reconciliation(), useFinalizer
new ReconcilableTopic(reconcilableTopic.reconciliation(), useFinalizer
? addFinalizer(reconcilableTopic) : removeFinalizer(reconcilableTopic), reconcilableTopic.topicName()))
.collect(Collectors.toList());
LOGGER.traceOp("{} {} topics", useFinalizer ? "Added finalizers to" : "Removed finalizers from", reconcilableTopics.size());
Expand Down Expand Up @@ -1132,7 +1132,7 @@ private void updateStatusForSuccess(ReconcilableTopic reconcilableTopic) {
} else if (TopicOperatorUtil.isPaused(reconcilableTopic.kt())) {
conditionType = "ReconciliationPaused";
}

conditions.add(new ConditionBuilder()
.withType(conditionType)
.withStatus("True")
Expand Down Expand Up @@ -1210,10 +1210,10 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) {
var oldStatus = Crds.topicOperation(kubeClient)
.inNamespace(reconcilableTopic.kt().getMetadata().getNamespace())
.withName(reconcilableTopic.kt().getMetadata().getName()).get().getStatus();

// the observedGeneration is a marker that shows that the operator works and that it saw the last update to the resource
reconcilableTopic.kt().getStatus().setObservedGeneration(reconcilableTopic.kt().getMetadata().getGeneration());

// set or reset the topicName
reconcilableTopic.kt().getStatus().setTopicName(
!TopicOperatorUtil.isManaged(reconcilableTopic.kt())
Expand All @@ -1222,8 +1222,9 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) {
? oldStatus.getTopicName()
: TopicOperatorUtil.topicName(reconcilableTopic.kt())
);

if (statusChanged(reconcilableTopic.kt(), oldStatus)) {

StatusDiff statusDiff = new StatusDiff(oldStatus, reconcilableTopic.kt().getStatus());
if (!statusDiff.isEmpty()) {
try {
var updatedTopic = new KafkaTopicBuilder(reconcilableTopic.kt())
.editOrNewMetadata()
Expand All @@ -1242,49 +1243,4 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) {
}
}
}

private boolean statusChanged(KafkaTopic kt, KafkaTopicStatus oldStatus) {
return oldStatusOrTopicNameMissing(oldStatus)
|| nonPausedAndDifferentGenerations(kt, oldStatus)
|| differentConditions(kt.getStatus().getConditions(), oldStatus.getConditions())
|| replicasChangesDiffer(kt, oldStatus);
}

private boolean oldStatusOrTopicNameMissing(KafkaTopicStatus oldStatus) {
return oldStatus == null || oldStatus.getTopicName() == null;
}

private boolean nonPausedAndDifferentGenerations(KafkaTopic kt, KafkaTopicStatus oldStatus) {
return !TopicOperatorUtil.isPaused(kt) && oldStatus.getObservedGeneration() != kt.getMetadata().getGeneration();
}

private boolean differentConditions(List<Condition> newConditions, List<Condition> oldConditions) {
if (Objects.equals(newConditions, oldConditions)) {
return false;
} else if (newConditions == null || oldConditions == null || newConditions.size() != oldConditions.size()) {
return true;
} else {
for (int i = 0; i < newConditions.size(); i++) {
if (conditionsDiffer(newConditions.get(i), oldConditions.get(i))) {
return true;
}
}
}
return false;
}

private boolean conditionsDiffer(Condition newCondition, Condition oldCondition) {
return !Objects.equals(newCondition.getType(), oldCondition.getType())
|| !Objects.equals(newCondition.getStatus(), oldCondition.getStatus())
|| !Objects.equals(newCondition.getReason(), oldCondition.getReason())
|| !Objects.equals(newCondition.getMessage(), oldCondition.getMessage());
}

@SuppressWarnings("BooleanExpressionComplexity")
private boolean replicasChangesDiffer(KafkaTopic kt, KafkaTopicStatus oldStatus) {
return kt.getStatus().getReplicasChange() == null && oldStatus.getReplicasChange() != null
|| kt.getStatus().getReplicasChange() != null && oldStatus.getReplicasChange() == null
|| (kt.getStatus().getReplicasChange() != null && oldStatus.getReplicasChange() != null
&& !Objects.equals(kt.getStatus().getReplicasChange(), oldStatus.getReplicasChange()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,23 @@ private static Predicate<KafkaTopic> readyIsFalseAndReasonIs(String requiredReas
}

private static Predicate<KafkaTopic> readyIsTrueOrFalse() {
return typeIsTrueOrFalse("Ready");
}

private static Predicate<KafkaTopic> unmanagedIsTrueOrFalse() {
return typeIsTrueOrFalse("Unmanaged");
}

private static Predicate<KafkaTopic> typeIsTrueOrFalse(String type) {
Predicate<Condition> conditionPredicate = condition ->
"Ready".equals(condition.getType())
&& "True".equals(condition.getStatus())
|| "False".equals(condition.getStatus());
return isReconcilatedAndHasConditionMatching("Ready=True or False", conditionPredicate);
type.equals(condition.getType())
&& "True".equals(condition.getStatus())
|| "False".equals(condition.getStatus());
return isReconcilatedAndHasConditionMatching(type + "=True or False", conditionPredicate);
}

private static Predicate<KafkaTopic> unmanagedStatusTrue() {
return typeIsTrueOrFalse("Unmanaged");
}

private static Predicate<KafkaTopic> unmanagedIsTrue() {
Expand Down Expand Up @@ -665,7 +677,7 @@ public void shouldNotCreateTopicInKafkaWhenUnmanagedTopicCreatedInKube(
// given

// when
var reconciled = createTopic(kafkaCluster, kt);
var reconciled = createTopic(kafkaCluster, kt, unmanagedStatusTrue());

// then
assertNull(reconciled.getStatus().getTopicName());
Expand Down Expand Up @@ -1137,7 +1149,7 @@ public void shouldRestoreFinalizerIfRemoved(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException, TimeoutException {
// given
var created = createTopic(kafkaCluster, kt);
var created = createTopic(kafkaCluster, kt, TopicOperatorUtil.isManaged(kt) ? readyIsTrueOrFalse() : unmanagedIsTrueOrFalse());
if (TopicOperatorUtil.isManaged(kt)) {
assertCreateSuccess(kt, created);
}
Expand Down Expand Up @@ -2153,4 +2165,63 @@ public void shouldReconcileOnTopicExistsException(
KafkaTopic kafkaTopic = createTopic(kafkaCluster, kafkaTopic(NAMESPACE, topicName, true, topicName, 2, 1));
assertTrue(readyIsTrue().test(kafkaTopic));
}

@Test
public void shouldUpdateAnUnmanagedTopic(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
var topicName = "my-topic";

// create the topic
var topic = createTopic(kafkaCluster,
kafkaTopic(NAMESPACE, topicName, SELECTOR, null, null, topicName, 1, 1,
Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000")));
topic = Crds.topicOperation(kubernetesClient).resource(topic).get();

TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt ->
Optional.of(kt)
.map(KafkaTopic::getStatus)
.map(KafkaTopicStatus::getConditions)
.flatMap(c -> Optional.of(c.get(0)))
.map(Condition::getType)
.filter("Ready"::equals)
.isPresent()
);

// set unmanaged
topic = Crds.topicOperation(kubernetesClient).resource(topic).get();
topic.setStatus(null);
topic.getMetadata().getAnnotations().put(TopicOperatorUtil.MANAGED, "false");
topic = Crds.topicOperation(kubernetesClient).resource(topic).update();

TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt ->
Optional.of(kt)
.map(KafkaTopic::getStatus)
.map(KafkaTopicStatus::getConditions)
.flatMap(c -> Optional.of(c.get(0)))
.map(Condition::getType)
.filter("Unmanaged"::equals)
.isPresent()
);

// apply a change to the unmanaged topic
topic = Crds.topicOperation(kubernetesClient).resource(topic).get();
topic.setStatus(null);
topic.getSpec().getConfig().put(TopicConfig.RETENTION_MS_CONFIG, "1001");
topic = Crds.topicOperation(kubernetesClient).resource(topic).update();
var resourceVersionOnUpdate = topic.getMetadata().getResourceVersion();

TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt ->
!resourceVersionOnUpdate.equals(kt.getMetadata().getResourceVersion())
);
topic = Crds.topicOperation(kubernetesClient).resource(topic).get();
var resourceVersionAfterUpdate = topic.getMetadata().getResourceVersion();

// Wait a bit to check the resource is not getting updated continuously
Thread.sleep(500L);
TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt ->
resourceVersionAfterUpdate.equals(kt.getMetadata().getResourceVersion())
);
}

}
Loading