Skip to content

Commit

Permalink
[improve][broker] Fix non-persistent system topic schema compatibility (
Browse files Browse the repository at this point in the history
#23286)

When upgrading broker version from `3.0.x` to `3.3.x` with `ExtensibleLoadManagerImpl` enabled, it will have an `Unable to read schema` exception. And the broker will fail to start. This issue is caused by #22055 .

Add a new class `NonPersistentSystemTopic`, and it will use for system non-persistent topic.

(cherry picked from commit 7dbd8a5)
  • Loading branch information
Demogorgon314 authored and lhotari committed Feb 20, 2025
1 parent 2fb10d8 commit a1cfbc3
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
Expand Down Expand Up @@ -1286,7 +1287,11 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
NonPersistentTopic nonPersistentTopic;
try {
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
if (isSystemTopic(topic)) {
nonPersistentTopic = new NonPersistentSystemTopic(topic, this);
} else {
nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
}
nonPersistentTopic.setCreateFuture(topicFuture);
} catch (Throwable e) {
log.warn("Failed to create topic {}", topic, e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.nonpersistent;

import org.apache.pulsar.broker.service.BrokerService;

public class NonPersistentSystemTopic extends NonPersistentTopic {
public NonPersistentSystemTopic(String topic, BrokerService brokerService) {
super(topic, brokerService);
}

@Override
public boolean isSystemTopic() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.Cleanup;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.reflect.FieldUtils;
Expand All @@ -103,6 +104,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
Expand All @@ -111,6 +113,7 @@
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSystemTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
Expand All @@ -131,13 +134,15 @@
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -1924,6 +1929,43 @@ public void compactionScheduleTest() {
});
}

@Test
public void testSystemNonPersistentTopicSchemaCompatibility() throws Exception {
String topicName = ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
NonPersistentSystemTopic topic = new NonPersistentSystemTopic(topicName, pulsar.getBrokerService());
Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());

var brokerLoadDataStore = LoadDataStoreFactory.create(pulsar, topicName, BrokerLoadDataV1.class);
brokerLoadDataStore.init();
brokerLoadDataStore.pushAsync("key", new BrokerLoadDataV1()).get();
Awaitility.await().until(() -> {
var data = brokerLoadDataStore.get("key");
return data.isPresent();
});
brokerLoadDataStore.pushAsync("key", null).get();
brokerLoadDataStore.close();
}

@Data
private static class BrokerLoadDataV1 {
private ResourceUsage cpu;
private ResourceUsage memory;
private ResourceUsage directMemory;
private ResourceUsage bandwidthIn;
private ResourceUsage bandwidthOut;
private double msgThroughputIn;
private double msgThroughputOut;
private double msgRateIn;
private double msgRateOut;
private int bundleCount;
private int topics;
private double maxResourceUsage;
private double weightedMaxEMA;
private double msgThroughputEMA;
private long updatedAt;
private long reportedAt;
}

@Test(timeOut = 10 * 1000, priority = 5000)
public void unloadTimeoutCheckTest()
throws Exception {
Expand Down

0 comments on commit a1cfbc3

Please sign in to comment.