From 182b7047bac664649381a461bc9793e3f31c0259 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 10 Jul 2024 10:28:47 +0800 Subject: [PATCH] [fix][admin] Fix half deletion when attempt to topic with a incorrect API (#23002) (cherry picked from commit 1f3449736e614428ea4d625e48cafa09b35e608d) (cherry picked from commit 0fab9ed7a1713e42bc506f5592a3930b26b60f1b) --- .../admin/impl/PersistentTopicsBase.java | 12 +++- .../broker/admin/v2/PersistentTopics.java | 14 ++++- .../broker/admin/AdminTopicApiTest.java | 62 +++++++++++++++++++ 3 files changed, 86 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index a899eef63d57b..3b67808403f60 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -752,7 +752,17 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, .thenCompose(partitionedMeta -> { final int numPartitions = partitionedMeta.partitions; if (numPartitions < 1) { - return CompletableFuture.completedFuture(null); + return pulsar().getNamespaceService().checkNonPartitionedTopicExists(topicName) + .thenApply(exists -> { + if (exists) { + throw new RestException(Response.Status.CONFLICT, + String.format("%s is a non-partitioned topic. Instead of calling" + + " delete-partitioned-topic please call delete.", topicName)); + } else { + throw new RestException(Status.NOT_FOUND, + String.format("Topic %s not found.", topicName)); + } + }); } return internalRemovePartitionsAuthenticationPoliciesAsync() .thenCompose(unused -> internalRemovePartitionsTopicAsync(numPartitions, force)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index e619769d81ea0..1f28283737f73 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1157,7 +1157,17 @@ public void deleteTopic( @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - internalDeleteTopicAsync(authoritative, force) + + getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExistsAsync(topicName).thenAccept(exists -> { + if (exists) { + RestException restException = new RestException(Response.Status.CONFLICT, + String.format("%s is a partitioned topic, instead of calling delete topic, please call" + + " delete-partitioned-topic.", topicName)); + resumeAsyncResponseExceptionally(asyncResponse, restException); + return; + } + internalDeleteTopicAsync(authoritative, force) .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { Throwable t = FutureUtil.unwrapCompletionException(ex); @@ -1176,6 +1186,8 @@ public void deleteTopic( resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); + }); + } @GET diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java index 93bf2349103c3..45bbb3a2912f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTopicApiTest.java @@ -19,21 +19,27 @@ package org.apache.pulsar.broker.admin; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.time.Duration; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -59,6 +65,62 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Test + public void testDeleteNonExistTopic() throws Exception { + // Case 1: call delete for a partitioned topic. + final String topic1 = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createPartitionedTopic(topic1, 2); + admin.schemas().createSchemaAsync(topic1, Schema.STRING.getSchemaInfo()); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin.schemas().getAllSchemas(topic1).size(), 1); + }); + try { + admin.topics().delete(topic1); + fail("expected a 409 error"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("please call delete-partitioned-topic")); + } + Awaitility.await().pollDelay(Duration.ofSeconds(2)).untilAsserted(() -> { + assertEquals(admin.schemas().getAllSchemas(topic1).size(), 1); + }); + // cleanup. + admin.topics().deletePartitionedTopic(topic1, false); + + // Case 2: call delete-partitioned-topi for a non-partitioned topic. + final String topic2 = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(topic2); + admin.schemas().createSchemaAsync(topic2, Schema.STRING.getSchemaInfo()); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin.schemas().getAllSchemas(topic2).size(), 1); + }); + try { + admin.topics().deletePartitionedTopic(topic2); + fail("expected a 409 error"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Instead of calling delete-partitioned-topic please call delete")); + } + Awaitility.await().pollDelay(Duration.ofSeconds(2)).untilAsserted(() -> { + assertEquals(admin.schemas().getAllSchemas(topic2).size(), 1); + }); + // cleanup. + admin.topics().delete(topic2, false); + + // Case 3: delete topic does not exist. + final String topic3 = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + try { + admin.topics().delete(topic3); + fail("expected a 404 error"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("not found")); + } + try { + admin.topics().deletePartitionedTopic(topic3); + fail("expected a 404 error"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("not found")); + } + } + @Test public void testPeekMessages() throws Exception { @Cleanup