Skip to content

Commit

Permalink
Add two more topicId tests (#10536)
Browse files Browse the repository at this point in the history
Signed-off-by: Federico Valeri <[email protected]>
  • Loading branch information
fvaleri authored Sep 5, 2024
1 parent 5ddb18e commit 8a5da71
Showing 1 changed file with 43 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -2346,4 +2347,46 @@ public void shouldUpdateAnUnmanagedTopic(
resourceVersionAfterUpdate.equals(kt.getMetadata().getResourceVersion())
);
}

@Test
public void shouldUpdateTopicIdIfDeletedWhileUnmanaged(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
TopicOperatorConfig config = topicOperatorConfig(NAMESPACE, kafkaCluster, true, 500);
kafkaAdminClientOp = new Admin[]{Mockito.spy(Admin.create(config.adminClientConfig()))};

var created = createTopic(kafkaCluster,
kafkaTopic(NAMESPACE, "my-topic", SELECTOR, null, true, "my-topic", 1, 1, Map.of()));

unmanageTopic(NAMESPACE, "my-topic");

kafkaAdminClientOp[0].deleteTopics(Set.of("my-topic"));
kafkaAdminClientOp[0].createTopics(Set.of(new NewTopic("my-topic", 1, (short) 1)));

var updated = manageTopic(NAMESPACE, "my-topic");

assertNotEquals(created.getStatus().getTopicId(), updated.getStatus().getTopicId());
}

@Test
public void shouldUpdateTopicIdIfDeletedWhilePaused(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster
) throws ExecutionException, InterruptedException {
TopicOperatorConfig config = topicOperatorConfig(NAMESPACE, kafkaCluster, true, 500);
kafkaAdminClientOp = new Admin[]{Mockito.spy(Admin.create(config.adminClientConfig()))};

var created = createTopic(kafkaCluster,
kafkaTopic(NAMESPACE, "my-topic", SELECTOR, null, true, "my-topic", 1, 1, Map.of()));

pauseTopic(NAMESPACE, "my-topic");

kafkaAdminClientOp[0].deleteTopics(Set.of("my-topic"));
kafkaAdminClientOp[0].createTopics(Set.of(new NewTopic("my-topic", 1, (short) 1)));

var updated = unpauseTopic(NAMESPACE, "my-topic");

assertNotEquals(created.getStatus().getTopicId(), updated.getStatus().getTopicId());
}
}

0 comments on commit 8a5da71

Please sign in to comment.