diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java index 38fe5d697d3..1635e86414f 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.NewPartitionReassignment; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -2346,4 +2347,46 @@ public void shouldUpdateAnUnmanagedTopic( resourceVersionAfterUpdate.equals(kt.getMetadata().getResourceVersion()) ); } + + @Test + public void shouldUpdateTopicIdIfDeletedWhileUnmanaged( + @BrokerConfig(name = "auto.create.topics.enable", value = "false") + KafkaCluster kafkaCluster + ) throws ExecutionException, InterruptedException { + TopicOperatorConfig config = topicOperatorConfig(NAMESPACE, kafkaCluster, true, 500); + kafkaAdminClientOp = new Admin[]{Mockito.spy(Admin.create(config.adminClientConfig()))}; + + var created = createTopic(kafkaCluster, + kafkaTopic(NAMESPACE, "my-topic", SELECTOR, null, true, "my-topic", 1, 1, Map.of())); + + unmanageTopic(NAMESPACE, "my-topic"); + + kafkaAdminClientOp[0].deleteTopics(Set.of("my-topic")); + kafkaAdminClientOp[0].createTopics(Set.of(new NewTopic("my-topic", 1, (short) 1))); + + var updated = manageTopic(NAMESPACE, "my-topic"); + + assertNotEquals(created.getStatus().getTopicId(), updated.getStatus().getTopicId()); + } + + @Test + public void shouldUpdateTopicIdIfDeletedWhilePaused( + @BrokerConfig(name = "auto.create.topics.enable", value = "false") + KafkaCluster kafkaCluster + ) throws ExecutionException, InterruptedException { + TopicOperatorConfig config = topicOperatorConfig(NAMESPACE, kafkaCluster, true, 500); + kafkaAdminClientOp = new Admin[]{Mockito.spy(Admin.create(config.adminClientConfig()))}; + + var created = createTopic(kafkaCluster, + kafkaTopic(NAMESPACE, "my-topic", SELECTOR, null, true, "my-topic", 1, 1, Map.of())); + + pauseTopic(NAMESPACE, "my-topic"); + + kafkaAdminClientOp[0].deleteTopics(Set.of("my-topic")); + kafkaAdminClientOp[0].createTopics(Set.of(new NewTopic("my-topic", 1, (short) 1))); + + var updated = unpauseTopic(NAMESPACE, "my-topic"); + + assertNotEquals(created.getStatus().getTopicId(), updated.getStatus().getTopicId()); + } }