Skip to content

Commit

Permalink
[fix][broker] Fix namespace deletion if __change_events topic has not…
Browse files Browse the repository at this point in the history
… been created yet (apache#18804)
  • Loading branch information
nicoloboschi authored and lifepuzzlefun committed Jan 10, 2023
1 parent f1e7f0f commit 158d497
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3285,16 +3285,8 @@ public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
if (!pulsarService.getConfig().isTopicLevelPoliciesEnabled()) {
return CompletableFuture.completedFuture(null);
}
return pulsarService.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject())
.thenCompose(optPolicies -> {
if (optPolicies.isPresent() && optPolicies.get().deleted) {
// We can return the completed future directly if the namespace is already deleted.
return CompletableFuture.completedFuture(null);
}
TopicName cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
});
return pulsar.getTopicPoliciesService()
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
}

private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,50 +105,53 @@ private CompletableFuture<Void> sendTopicPolicyEvent(TopicName topicName, Action
return CompletableFuture.failedFuture(
new BrokerServiceException.NotAllowedException("Not allowed to send event to health check topic"));
}
CompletableFuture<Void> result = new CompletableFuture<>();
try {
createSystemTopicFactoryIfNeeded();
} catch (PulsarServerException e) {
result.completeExceptionally(e);
return result;
}
return pulsarService.getPulsarResources().getNamespaceResources()
.getPoliciesAsync(topicName.getNamespaceObject())
.thenCompose(namespacePolicies -> {
if (namespacePolicies.isPresent() && namespacePolicies.get().deleted) {
log.debug("[{}] skip sending topic policy event since the namespace is deleted", topicName);
return CompletableFuture.completedFuture(null);
}

SystemTopicClient<PulsarEvent> systemTopicClient =
namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
try {
createSystemTopicFactoryIfNeeded();
} catch (PulsarServerException e) {
return CompletableFuture.failedFuture(e);
}

CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writerFuture = systemTopicClient.newWriterAsync();
writerFuture.whenComplete((writer, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
} else {
PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
CompletableFuture<MessageId> actionFuture =
ActionType.DELETE.equals(actionType) ? writer.deleteAsync(getEventKey(event), event)
: writer.writeAsync(getEventKey(event), event);
actionFuture.whenComplete(((messageId, e) -> {
if (e != null) {
result.completeExceptionally(e);
} else {
if (messageId != null) {
result.complete(null);
} else {
result.completeExceptionally(new RuntimeException("Got message id is null."));
}
}
writer.closeAsync().whenComplete((v, cause) -> {
if (cause != null) {
log.error("[{}] Close writer error.", topicName, cause);
SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());

return systemTopicClient.newWriterAsync()
.thenCompose(writer -> {
PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
CompletableFuture<MessageId> writeFuture =
ActionType.DELETE.equals(actionType) ? writer.deleteAsync(getEventKey(event), event)
: writer.writeAsync(getEventKey(event), event);
return writeFuture.handle((messageId, e) -> {
if (e != null) {
return CompletableFuture.failedFuture(e);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Close writer success.", topicName);
if (messageId != null) {
return CompletableFuture.completedFuture(null);
} else {
return CompletableFuture.failedFuture(
new RuntimeException("Got message id is null."));
}
}
});
})
);
}
});
return result;
}).thenRun(() ->
writer.closeAsync().whenComplete((v, cause) -> {
if (cause != null) {
log.error("[{}] Close writer error.", topicName, cause);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Close writer success.", topicName);
}
}
})
);
});
});
}

private PulsarEvent getPulsarEvent(TopicName topicName, ActionType actionType, TopicPolicies policies) {
Expand Down Expand Up @@ -390,25 +393,25 @@ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean

private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> reader) {
reader.readNextAsync()
.thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
notifyListener(msg);
})
.whenComplete((__, ex) -> {
if (ex == null) {
readMorePolicies(reader);
} else {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof PulsarClientException.AlreadyClosedException) {
log.warn("Read more topic policies exception, close the read now!", ex);
cleanCacheAndCloseReader(
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
} else {
log.warn("Read more topic polices exception, read again.", ex);
readMorePolicies(reader);
}
}
});
.thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
notifyListener(msg);
})
.whenComplete((__, ex) -> {
if (ex == null) {
readMorePolicies(reader);
} else {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof PulsarClientException.AlreadyClosedException) {
log.warn("Read more topic policies exception, close the read now!", ex);
cleanCacheAndCloseReader(
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
} else {
log.warn("Read more topic polices exception, read again.", ex);
readMorePolicies(reader);
}
}
});
}

private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
Expand Down Expand Up @@ -477,7 +480,7 @@ private boolean hasReplicateTo(Message<?> message) {
if (message instanceof MessageImpl) {
return ((MessageImpl<?>) message).hasReplicateTo()
? (((MessageImpl<?>) message).getReplicateTo().size() == 1
? !((MessageImpl<?>) message).getReplicateTo().contains(clusterName) : true)
? !((MessageImpl<?>) message).getReplicateTo().contains(clusterName) : true)
: false;
}
if (message instanceof TopicMessageImpl) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,11 @@ public void testDeleteNamespaceWithTopicPolicies() throws Exception {

// create namespace2
String namespace = tenant + "/test-ns2";
admin.namespaces().createNamespace(namespace, Set.of("test"));
admin.topics().createNonPartitionedTopic(namespace + "/tobedeleted");
// verify namespace can be deleted even without topic policy events
admin.namespaces().deleteNamespace(namespace, true);

admin.namespaces().createNamespace(namespace, Set.of("test"));
// create topic
String topic = namespace + "/test-topic2";
Expand Down Expand Up @@ -1873,7 +1878,6 @@ public void testListOfNamespaceBundles() throws Exception {

@Test
public void testForceDeleteNamespace() throws Exception {
conf.setForceDeleteNamespaceAllowed(true);
final String namespaceName = "prop-xyz2/ns1";
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("prop-xyz2", tenantInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
@BeforeClass
@Override
public void setup() throws Exception {
setupConfigAndStart(null);
}

private void applyDefaultConfig() {
conf.setSystemTopicEnabled(false);
conf.setTopicLevelPoliciesEnabled(false);
conf.setLoadBalancerEnabled(true);
Expand All @@ -178,6 +182,13 @@ public void setup() throws Exception {
conf.setSubscriptionExpiryCheckIntervalInMinutes(1);
conf.setBrokerDeleteInactiveTopicsEnabled(false);
conf.setNumExecutorThreadPoolSize(5);
}

private void setupConfigAndStart(java.util.function.Consumer<ServiceConfiguration> configurationConsumer) throws Exception {
applyDefaultConfig();
if (configurationConsumer != null) {
configurationConsumer.accept(conf);
}

super.internalSetup();

Expand Down Expand Up @@ -215,6 +226,7 @@ public void resetClusters() throws Exception {
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);

resetConfig();
applyDefaultConfig();
setupClusters();
}

Expand Down Expand Up @@ -1718,9 +1730,9 @@ public void testNamespaceSplitBundleWithInvalidAlgorithm() {
@Test
public void testNamespaceSplitBundleWithDefaultTopicCountEquallyDivideAlgorithm() throws Exception {
cleanup();
setup();
setupConfigAndStart(conf -> conf
.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE));

conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE);
// Force to create a topic
final String namespace = "prop-xyz/ns1";
List<String> topicNames = Lists.newArrayList(
Expand Down Expand Up @@ -1756,7 +1768,6 @@ public void testNamespaceSplitBundleWithDefaultTopicCountEquallyDivideAlgorithm(
for (Producer<byte[]> producer : producers) {
producer.close();
}
conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
}

@Test
Expand Down Expand Up @@ -1911,9 +1922,6 @@ public void testNamespaceUnloadBundle() throws Exception {

@Test(dataProvider = "numBundles")
public void testNamespaceBundleUnload(Integer numBundles) throws Exception {
cleanup();
setup();

admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
admin.namespaces().setNamespaceReplicationClusters("prop-xyz/ns1-bundles", Set.of("test"));

Expand Down Expand Up @@ -3259,7 +3267,8 @@ public void testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages() throws Exc

@Test
public void testGetTtlDurationDefaultInSeconds() throws Exception {
conf.setTtlDurationDefaultInSeconds(3600);
cleanup();
setupConfigAndStart(conf -> conf.setTtlDurationDefaultInSeconds(3600));
Integer seconds = admin.namespaces().getPolicies("prop-xyz/ns1").message_ttl_in_seconds;
assertNull(seconds);
}
Expand Down Expand Up @@ -3309,8 +3318,11 @@ public void testPartitionedTopicMsgDelayedAggregated() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testPartitionedTopicMsgDelayedAggregated-" + UUID.randomUUID().toString();
final String subName = "my-sub";
final int numPartitions = 2;
conf.setSubscriptionRedeliveryTrackerEnabled(true);
conf.setDelayedDeliveryEnabled(true);
cleanup();
setupConfigAndStart(conf -> {
conf.setSubscriptionRedeliveryTrackerEnabled(true);
conf.setDelayedDeliveryEnabled(true);
});
admin.topics().createPartitionedTopic(topic, numPartitions);

for (int i = 0; i < 2; i++) {
Expand Down Expand Up @@ -3367,7 +3379,7 @@ public void testPartitionedTopicMsgDelayedAggregated() throws Exception {

@Test(timeOut = 20000)
public void testPartitionedTopicTruncate() throws Exception {
final String topicName = "persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
final String topicName = "persistent://prop-xyz/ns1/testTruncateTopic2-" + UUID.randomUUID().toString();
final String subName = "my-sub";
admin.topics().createPartitionedTopic(topicName,6);
admin.namespaces().setRetention("prop-xyz/ns1", new RetentionPolicies(60, 50));
Expand All @@ -3387,9 +3399,13 @@ public void testPartitionedTopicTruncate() throws Exception {

@Test(timeOut = 20000)
public void testNonPartitionedTopicTruncate() throws Exception {
final String topicName = "persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
final String topicName = "persistent://prop-xyz/ns1/testTruncateTopic1-" + UUID.randomUUID().toString();
final String subName = "my-sub";
this.conf.setTopicLevelPoliciesEnabled(true);
cleanup();
setupConfigAndStart(conf -> {
conf.setTopicLevelPoliciesEnabled(true);
conf.setSystemTopicEnabled(true);
});
admin.topics().createNonPartitionedTopic(topicName);
admin.namespaces().setRetention("prop-xyz/ns1", new RetentionPolicies(60, 50));
List<MessageId> messageIds = publishMessagesOnPersistentTopic(topicName, 10);
Expand All @@ -3405,7 +3421,7 @@ public void testNonPartitionedTopicTruncate() throws Exception {

@Test(timeOut = 20000)
public void testNonPersistentTopicTruncate() throws Exception {
final String topicName = "non-persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
final String topicName = "non-persistent://prop-xyz/ns1/testTruncateTopic3-" + UUID.randomUUID().toString();
admin.topics().createNonPartitionedTopic(topicName);
assertThrows(() -> {admin.topics().truncate(topicName);});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,4 +373,15 @@ public void run() {
}
});
}

@Test
public void testHandleNamespaceBeingDeleted() throws Exception {
SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
pulsar.getPulsarResources().getNamespaceResources().setPolicies(NamespaceName.get(NAMESPACE1),
old -> {
old.deleted = true;
return old;
});
service.deleteTopicPoliciesAsync(TOPIC1).get();
}
}

0 comments on commit 158d497

Please sign in to comment.