From b8518e860651e84fa1c520e6001ea105d083a4d0 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 27 Sep 2024 00:45:21 +0800 Subject: [PATCH 01/11] Add test framework --- .../MetadataStoreSessionExpiredTest.java | 91 ++++ .../extensions/MockManagedCursor.java | 463 ++++++++++++++++++ .../extensions/MockManagedLedger.java | 458 +++++++++++++++++ .../extensions/MockManagedLedgerFactory.java | 191 ++++++++ .../extensions/MockManagedLedgerMXBean.java | 215 ++++++++ .../extensions/MockManagedLedgerStorage.java | 63 +++ .../extensions/MockSchemaStorage.java | 122 +++++ 7 files changed, 1603 insertions(+) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MetadataStoreSessionExpiredTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedCursor.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedger.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerFactory.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerMXBean.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerStorage.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockSchemaStorage.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MetadataStoreSessionExpiredTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MetadataStoreSessionExpiredTest.java new file mode 100644 index 0000000000000..508ae930843b7 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MetadataStoreSessionExpiredTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import java.util.Collections; +import java.util.Optional; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.schema.SchemaStorageFactory; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.protocol.schema.SchemaStorage; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class MetadataStoreSessionExpiredTest { + + private static final String clusterName = "test"; + private PulsarService pulsar; + + @BeforeClass + protected void setup() throws Exception { + pulsar = new PulsarService(brokerConfig()); + pulsar.start(); + final var admin = pulsar.getAdminClient(); + admin.clusters().createCluster(clusterName, ClusterData.builder().build()); + admin.tenants().createTenant("public", TenantInfo.builder() + .allowedClusters(Collections.singleton(clusterName)).build()); + admin.namespaces().createNamespace("public/default"); + } + + @AfterClass(alwaysRun = true) + protected void cleanup() throws Exception { + pulsar.close(); + } + + @Test + public void testLookupAfterSessionTimeout() throws Exception { + final var topic = "test-lookup-after-session-timeout"; + pulsar.getAdminClient().topics().createPartitionedTopic(topic, 1); + } + + private ServiceConfiguration brokerConfig() { + final var config = new ServiceConfiguration(); + config.setClusterName(clusterName); + config.setAdvertisedAddress("localhost"); + config.setBrokerServicePort(Optional.of(0)); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("memory:local"); + config.setSchemaRegistryStorageClassName(MockSchemaStorageFactory.class.getName()); + config.setManagedLedgerStorageClassName(MockManagedLedgerStorage.class.getName()); + + config.setSystemTopicEnabled(false); + config.setTopicLevelPoliciesEnabled(false); + + config.setManagedLedgerDefaultWriteQuorum(1); + config.setManagedLedgerDefaultAckQuorum(1); + config.setManagedLedgerDefaultEnsembleSize(1); + config.setDefaultNumberOfNamespaceBundles(16); + config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + config.setLoadBalancerDebugModeEnabled(true); + config.setBrokerShutdownTimeoutMs(100); + return config; + } + + public static class MockSchemaStorageFactory implements SchemaStorageFactory { + + @Override + public SchemaStorage create(PulsarService pulsar) throws Exception { + return new MockSchemaStorage(); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedCursor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedCursor.java new file mode 100644 index 0000000000000..a91aecb3a7710 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedCursor.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import com.google.common.collect.Range; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import lombok.RequiredArgsConstructor; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedCursorMXBean; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; + +@RequiredArgsConstructor +public class MockManagedCursor implements ManagedCursor { + + private final String name; + private final MockManagedLedger managedLedger; + private long offset = 0L; + + @Override + public String getName() { + return name; + } + + @Override + public long getLastActive() { + return 0; + } + + @Override + public void updateLastActive() { + } + + @Override + public Map getProperties() { + return Map.of(); + } + + @Override + public Map getCursorProperties() { + return Map.of(); + } + + @Override + public CompletableFuture putCursorProperty(String key, String value) { + return null; + } + + @Override + public CompletableFuture setCursorProperties(Map cursorProperties) { + return null; + } + + @Override + public CompletableFuture removeCursorProperty(String key) { + return null; + } + + @Override + public boolean putProperty(String key, Long value) { + return false; + } + + @Override + public boolean removeProperty(String key) { + return false; + } + + @Override + public synchronized List readEntries(int numberOfEntriesToRead) { + synchronized (managedLedger) { + final var nextOffset = Math.min(offset + numberOfEntriesToRead, managedLedger.entries.size()); + final var entries = new ArrayList(); + for (long i = offset; i < nextOffset; i++) { + entries.add(EntryImpl.create(PositionFactory.create(managedLedger.ledgerId, i), + managedLedger.entries.get((int) i))); + } + offset = nextOffset; + return entries; + } + } + + @Override + public void asyncReadEntries(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback, Object ctx, + Position maxPosition) { + callback.readEntriesComplete(readEntries(numberOfEntriesToRead), ctx); + } + + @Override + public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, + AsyncCallbacks.ReadEntriesCallback callback, Object ctx, Position maxPosition) { + callback.readEntriesComplete(readEntries(numberOfEntriesToRead), ctx); + } + + @Override + public Entry getNthEntry(int n, IndividualDeletedEntries deletedEntries) { + throw new RuntimeException("getNthEntry is not supported"); + } + + @Override + public void asyncGetNthEntry(int n, IndividualDeletedEntries deletedEntries, + AsyncCallbacks.ReadEntryCallback callback, Object ctx) { + callback.readEntryFailed(new ManagedLedgerException("getNthEntry is not supported"), ctx); + } + + @Override + public List readEntriesOrWait(int numberOfEntriesToRead) { + return List.of(); + } + + @Override + public List readEntriesOrWait(int maxEntries, long maxSizeBytes) { + final var future = new CompletableFuture>(); + asyncReadEntriesOrWait(maxEntries, Long.MAX_VALUE, new AsyncCallbacks.ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + future.complete(entries); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null, PositionFactory.LATEST); + return future.join(); + } + + @Override + public void asyncReadEntriesOrWait(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback, + Object ctx, Position maxPosition) { + asyncReadEntriesOrWait(numberOfEntriesToRead, Long.MAX_VALUE, callback, ctx, maxPosition); + } + + @Override + public synchronized void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, + AsyncCallbacks.ReadEntriesCallback callback, + Object ctx, Position maxPosition) { + final var entries = readEntries(maxEntries); + if (entries.isEmpty()) { + CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS).execute(() -> + callback.readEntriesComplete(entries, ctx)); + } else { + callback.readEntriesComplete(entries, ctx); + } + } + + @Override + public boolean cancelPendingReadRequest() { + return false; + } + + @Override + public synchronized boolean hasMoreEntries() { + synchronized (managedLedger) { + return offset >= managedLedger.entries.size(); + } + } + + @Override + public long getNumberOfEntries() { + return managedLedger.getNumberOfEntries(); + } + + @Override + public long getNumberOfEntriesInBacklog(boolean isPrecise) { + return 0; + } + + @Override + public void markDelete(Position position) { + + } + + @Override + public void markDelete(Position position, Map properties) { + } + + @Override + public void asyncMarkDelete(Position position, AsyncCallbacks.MarkDeleteCallback callback, Object ctx) { + callback.markDeleteComplete(ctx); + } + + @Override + public void asyncMarkDelete(Position position, Map properties, + AsyncCallbacks.MarkDeleteCallback callback, Object ctx) { + callback.markDeleteComplete(ctx); + } + + @Override + public void delete(Position position) throws InterruptedException, ManagedLedgerException { + } + + @Override + public void asyncDelete(Position position, AsyncCallbacks.DeleteCallback callback, Object ctx) { + callback.deleteComplete(ctx); + } + + @Override + public void delete(Iterable positions) throws InterruptedException, ManagedLedgerException { + } + + @Override + public void asyncDelete(Iterable position, AsyncCallbacks.DeleteCallback callback, Object ctx) { + callback.deleteComplete(ctx); + } + + @Override + public synchronized Position getReadPosition() { + return PositionFactory.create(managedLedger.ledgerId, offset); + } + + @Override + public Position getMarkDeletedPosition() { + return getReadPosition(); + } + + @Override + public Position getPersistentMarkDeletedPosition() { + return getMarkDeletedPosition(); + } + + @Override + public synchronized void rewind() { + offset = 0L; + } + + @Override + public synchronized void seek(Position newReadPosition, boolean force) { + if (newReadPosition.getLedgerId() != managedLedger.ledgerId) { + throw new RuntimeException("Failed to seek " + newReadPosition); + } + offset = newReadPosition.getEntryId(); + } + + @Override + public void clearBacklog() { + } + + @Override + public void asyncClearBacklog(AsyncCallbacks.ClearBacklogCallback callback, Object ctx) { + callback.clearBacklogComplete(ctx); + } + + @Override + public void skipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries) { + } + + @Override + public void asyncSkipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries, + AsyncCallbacks.SkipEntriesCallback callback, Object ctx) { + callback.skipEntriesComplete(ctx); + } + + @Override + public Position findNewestMatching(Predicate condition) { + return getFirstPosition(); + } + + @Override + public Position findNewestMatching(FindPositionConstraint constraint, Predicate condition) { + return getFirstPosition(); + } + + @Override + public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + AsyncCallbacks.FindEntryCallback callback, Object ctx) { + callback.findEntryComplete(getFirstPosition(), ctx); + } + + @Override + public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + AsyncCallbacks.FindEntryCallback callback, Object ctx, + boolean isFindFromLedger) { + callback.findEntryComplete(getFirstPosition(), ctx); + } + + @Override + public void resetCursor(Position position) { + seek(position); + } + + @Override + public void asyncResetCursor(Position position, boolean forceReset, AsyncCallbacks.ResetCursorCallback callback) { + seek(position); + callback.resetComplete(null); + } + + @Override + public List replayEntries(Set positions) { + return List.of(); + } + + @Override + public Set asyncReplayEntries(Set positions, + AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { + callback.readEntriesComplete(List.of(), ctx); + return Set.of(); + } + + @Override + public Set asyncReplayEntries(Set positions, + AsyncCallbacks.ReadEntriesCallback callback, Object ctx, + boolean sortEntries) { + return asyncReplayEntries(positions, callback, ctx); + } + + @Override + public void close() { + } + + @Override + public void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx) { + callback.closeComplete(ctx); + } + + @Override + public Position getFirstPosition() { + return managedLedger.getFirstPosition(); + } + + @Override + public void setActive() { + } + + @Override + public void setInactive() { + } + + @Override + public void setAlwaysInactive() { + } + + @Override + public boolean isActive() { + return false; + } + + @Override + public boolean isDurable() { + return false; + } + + @Override + public long getNumberOfEntriesSinceFirstNotAckedMessage() { + return 0; + } + + @Override + public int getTotalNonContiguousDeletedMessagesRange() { + return 0; + } + + @Override + public int getNonContiguousDeletedMessagesRangeSerializedSize() { + return 0; + } + + @Override + public long getEstimatedSizeSinceMarkDeletePosition() { + return 0; + } + + @Override + public double getThrottleMarkDelete() { + return 0; + } + + @Override + public void setThrottleMarkDelete(double throttleMarkDelete) { + } + + @Override + public ManagedLedger getManagedLedger() { + return managedLedger; + } + + @Override + public Range getLastIndividualDeletedRange() { + throw new RuntimeException("getLastIndividualDeletedRange is not supported"); + } + + @Override + public void trimDeletedEntries(List entries) { + } + + @Override + public long[] getDeletedBatchIndexesAsLongArray(Position position) { + return new long[0]; + } + + @Override + public ManagedCursorMXBean getStats() { + return null; + } + + @Override + public boolean checkAndUpdateReadPositionChanged() { + return false; + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public ManagedLedgerInternalStats.CursorStats getCursorStats() { + return null; + } + + @Override + public boolean isMessageDeleted(Position position) { + return false; + } + + @Override + public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) throws ManagedLedgerException { + return null; + } + + @Override + public long[] getBatchPositionAckSet(Position position) { + return new long[0]; + } + + @Override + public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) { + return 0; + } + + @Override + public void updateReadStats(int readEntriesCount, long readEntriesSize) { + + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedger.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedger.java new file mode 100644 index 0000000000000..3df0d36bcc190 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedger.java @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import com.google.common.collect.Range; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; +import lombok.RequiredArgsConstructor; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerMXBean; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.PositionBound; +import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; + +@RequiredArgsConstructor +public class MockManagedLedger implements ManagedLedger { + + private static final AtomicLong ledgerIdGenerator = new AtomicLong(0L); + + final long ledgerId = ledgerIdGenerator.getAndIncrement(); + final List entries = new ArrayList<>(); + private final Map cursors = new ConcurrentHashMap<>(); + private final String name; + + @Override + public String getName() { + return name; + } + + @Override + public synchronized Position addEntry(byte[] data) { + final var buf = Unpooled.wrappedBuffer(data); + entries.add(buf); + return PositionFactory.create(ledgerId, entries.size() - 1); + } + + @Override + public Position addEntry(byte[] data, int numberOfMessages) { + return addEntry(data); + } + + @Override + public synchronized void asyncAddEntry(byte[] data, AsyncCallbacks.AddEntryCallback callback, Object ctx) { + final var position = addEntry(data); + callback.addComplete(addEntry(data), entries.get((int) position.getEntryId()), ctx); + } + + @Override + public Position addEntry(byte[] data, int offset, int length) { + return addEntry(data); + } + + @Override + public Position addEntry(byte[] data, int numberOfMessages, int offset, int length) { + return addEntry(data); + } + + @Override + public void asyncAddEntry(byte[] data, int offset, int length, AsyncCallbacks.AddEntryCallback callback, + Object ctx) { + asyncAddEntry(data, callback, ctx); + } + + @Override + public void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, + AsyncCallbacks.AddEntryCallback callback, Object ctx) { + asyncAddEntry(data, callback, ctx); + } + + @Override + public synchronized void asyncAddEntry(ByteBuf buffer, AsyncCallbacks.AddEntryCallback callback, Object ctx) { + final var buf = Unpooled.wrappedBuffer(buffer); + entries.add(buf); + final var position = PositionFactory.create(ledgerId, entries.size() - 1); + callback.addComplete(position, buf, ctx); + } + + @Override + public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AsyncCallbacks.AddEntryCallback callback, + Object ctx) { + asyncAddEntry(buffer, callback, ctx); + } + + @Override + public ManagedCursor openCursor(String name) { + return cursors.computeIfAbsent(name, __ -> new MockManagedCursor(name, this)); + } + + @Override + public ManagedCursor openCursor(String name, CommandSubscribe.InitialPosition initialPosition) { + return openCursor(name); + } + + @Override + public ManagedCursor openCursor(String name, CommandSubscribe.InitialPosition initialPosition, + Map properties, Map cursorProperties) { + return openCursor(name); + } + + @Override + public ManagedCursor newNonDurableCursor(Position startCursorPosition) { + return openCursor(name); + } + + @Override + public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) { + return openCursor(subscriptionName); + } + + @Override + public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName, + CommandSubscribe.InitialPosition initialPosition, + boolean isReadCompacted) { + return openCursor(subscriptionName); + } + + @Override + public void asyncDeleteCursor(String name, AsyncCallbacks.DeleteCursorCallback callback, Object ctx) { + cursors.remove(name); + callback.deleteCursorComplete(ctx); + } + + @Override + public void deleteCursor(String name) { + cursors.remove(name); + } + + @Override + public void removeWaitingCursor(ManagedCursor cursor) { + } + + @Override + public void asyncOpenCursor(String name, AsyncCallbacks.OpenCursorCallback callback, Object ctx) { + callback.openCursorComplete(openCursor(name), ctx); + } + + @Override + public void asyncOpenCursor(String name, CommandSubscribe.InitialPosition initialPosition, + AsyncCallbacks.OpenCursorCallback callback, Object ctx) { + callback.openCursorComplete(openCursor(name), ctx); + } + + @Override + public void asyncOpenCursor(String name, CommandSubscribe.InitialPosition initialPosition, + Map properties, Map cursorProperties, + AsyncCallbacks.OpenCursorCallback callback, Object ctx) { + callback.openCursorComplete(openCursor(name), ctx); + } + + @Override + public Iterable getCursors() { + return this.cursors.values(); + } + + @Override + public Iterable getActiveCursors() { + return getCursors(); + } + + @Override + public long getNumberOfEntries() { + return cursors.size(); + } + + @Override + public long getNumberOfEntries(Range range) { + return 0; + } + + @Override + public long getNumberOfActiveEntries() { + return 0; + } + + @Override + public long getTotalSize() { + return 0; + } + + @Override + public long getEstimatedBacklogSize() { + return 0; + } + + @Override + public CompletableFuture getEarliestMessagePublishTimeInBacklog() { + return CompletableFuture.completedFuture(System.currentTimeMillis()); + } + + @Override + public long getOffloadedSize() { + return 0; + } + + @Override + public long getLastOffloadedLedgerId() { + return 0; + } + + @Override + public long getLastOffloadedSuccessTimestamp() { + return 0; + } + + @Override + public long getLastOffloadedFailureTimestamp() { + return 0; + } + + @Override + public void asyncTerminate(AsyncCallbacks.TerminateCallback callback, Object ctx) { + callback.terminateComplete(getFirstPosition(), ctx); + } + + @Override + public CompletableFuture asyncMigrate() { + return CompletableFuture.completedFuture(getFirstPosition()); + } + + @Override + public Position terminate() { + return getFirstPosition(); + } + + @Override + public void close() { + } + + @Override + public void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx) { + callback.closeComplete(ctx); + } + + @Override + public ManagedLedgerMXBean getStats() { + return new MockManagedLedgerMXBean(); + } + + @Override + public void delete() throws InterruptedException, ManagedLedgerException { + } + + @Override + public void asyncDelete(AsyncCallbacks.DeleteLedgerCallback callback, Object ctx) { + callback.deleteLedgerComplete(ctx); + } + + @Override + public Position offloadPrefix(Position pos) { + return getFirstPosition(); + } + + @Override + public void asyncOffloadPrefix(Position pos, AsyncCallbacks.OffloadCallback callback, Object ctx) { + callback.offloadComplete(getFirstPosition(), ctx); + } + + @Override + public ManagedCursor getSlowestConsumer() { + return cursors.values().stream().findAny().orElse(null); + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean isMigrated() { + return false; + } + + @Override + public ManagedLedgerConfig getConfig() { + return new ManagedLedgerConfig(); + } + + @Override + public void setConfig(ManagedLedgerConfig config) { + } + + @Override + public synchronized Position getLastConfirmedEntry() { + return PositionFactory.create(ledgerId, entries.size() - 1); + } + + @Override + public void readyToCreateNewLedger() { + } + + @Override + public Map getProperties() { + return Map.of(); + } + + @Override + public void setProperty(String key, String value) throws InterruptedException, ManagedLedgerException { + } + + @Override + public void asyncSetProperty(String key, String value, AsyncCallbacks.UpdatePropertiesCallback callback, + Object ctx) { + callback.updatePropertiesComplete(Map.of(), ctx); + } + + @Override + public void deleteProperty(String key) { + } + + @Override + public void asyncDeleteProperty(String key, AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx) { + + } + + @Override + public void setProperties(Map properties) throws InterruptedException, ManagedLedgerException { + } + + @Override + public void asyncSetProperties(Map properties, AsyncCallbacks.UpdatePropertiesCallback callback, + Object ctx) { + callback.updatePropertiesComplete(Map.of(), ctx); + } + + @Override + public void trimConsumedLedgersInBackground(CompletableFuture promise) { + promise.complete(null); + } + + @Override + public void rollCurrentLedgerIfFull() { + } + + @Override + public CompletableFuture asyncFindPosition(Predicate predicate) { + return CompletableFuture.completedFuture(getFirstPosition()); + } + + @Override + public ManagedLedgerInterceptor getManagedLedgerInterceptor() { + return null; + } + + @Override + public CompletableFuture getLedgerInfo(long ledgerId) { + return CompletableFuture.completedFuture(null); + } + + @Override + public Optional getOptionalLedgerInfo(long ledgerId) { + return Optional.empty(); + } + + @Override + public CompletableFuture asyncTruncate() { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture getManagedLedgerInternalStats(boolean includeLedgerMetadata) { + return CompletableFuture.completedFuture(null); + } + + @Override + public boolean checkInactiveLedgerAndRollOver() { + return false; + } + + @Override + public void checkCursorsToCacheEntries() { + } + + @Override + public synchronized void asyncReadEntry(Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) { + if (position.getLedgerId() != ledgerId) { + callback.readEntryFailed(new ManagedLedgerException(position.getLedgerId() + " does not exist"), ctx); + return; + } + final var buf = entries.get((int) position.getEntryId()); + callback.readEntryComplete(EntryImpl.create(position, buf), ctx); + } + + @Override + public NavigableMap getLedgersInfo() { + return new TreeMap<>(); + } + + @Override + public Position getNextValidPosition(Position position) { + return PositionFactory.create(position.getLedgerId(), position.getEntryId() + 1); + } + + @Override + public Position getPreviousPosition(Position position) { + return PositionFactory.create(position.getLedgerId(), position.getEntryId() - 1); + } + + @Override + public long getEstimatedBacklogSize(Position position) { + return 0; + } + + @Override + public Position getPositionAfterN(Position startPosition, long n, PositionBound startRange) { + return PositionFactory.create(startPosition.getLedgerId(), startPosition.getEntryId() + n); + } + + @Override + public int getPendingAddEntriesCount() { + return 0; + } + + @Override + public long getCacheSize() { + return 0; + } + + @Override + public Position getFirstPosition() { + return PositionFactory.create(ledgerId, 0L); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerFactory.java new file mode 100644 index 0000000000000..955c7a99498ff --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerFactory.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; +import lombok.Setter; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean; +import org.apache.bookkeeper.mledger.ManagedLedgerInfo; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.ReadOnlyCursor; +import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.mockito.Mockito; + +public class MockManagedLedgerFactory implements ManagedLedgerFactory { + + private final Map ledgers = new ConcurrentHashMap<>(); + @Setter + private MetadataStore store; + + @Override + public ManagedLedger open(String name) { + return open(name, null); + } + + @Override + public ManagedLedger open(String name, ManagedLedgerConfig config) { + final var future = new CompletableFuture(); + asyncOpen(name, new AsyncCallbacks.OpenLedgerCallback() { + @Override + public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + future.complete(ledger); + } + + @Override + public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, null); + return future.join(); + } + + @Override + public void asyncOpen(String name, AsyncCallbacks.OpenLedgerCallback callback, Object ctx) { + asyncOpen(name, new ManagedLedgerConfig(), callback, null, ctx); + } + + @Override + public void asyncOpen(String name, ManagedLedgerConfig config, AsyncCallbacks.OpenLedgerCallback callback, + Supplier> mlOwnershipChecker, Object ctx) { + final var ledger = ledgers.computeIfAbsent(name, __ -> { + final String path = "/managed-ledgers/" + name; + store.put(path, new byte[0], Optional.empty()); + return new MockManagedLedger(name); + }); + callback.openLedgerComplete(ledger, ctx); + } + + @Override + public ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPosition, + ManagedLedgerConfig config) { + throw new RuntimeException("openReadOnlyCursor is not supported"); + } + + @Override + public void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config, + AsyncCallbacks.OpenReadOnlyCursorCallback callback, Object ctx) { + throw new RuntimeException("openReadOnlyCursor is not supported"); + } + + @Override + public void asyncOpenReadOnlyManagedLedger(String managedLedgerName, + AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback, + ManagedLedgerConfig config, Object ctx) { + throw new RuntimeException("openReadOnlyCursor is not supported"); + } + + @Override + public ManagedLedgerInfo getManagedLedgerInfo(String name) throws InterruptedException, ManagedLedgerException { + return new ManagedLedgerInfo(); + } + + @Override + public void asyncGetManagedLedgerInfo(String name, AsyncCallbacks.ManagedLedgerInfoCallback callback, Object ctx) { + callback.getInfoComplete(new ManagedLedgerInfo(), ctx); + } + + @Override + public void delete(String name) { + ledgers.remove(name); + } + + @Override + public void delete(String name, CompletableFuture mlConfigFuture) { + delete(name); + } + + @Override + public void asyncDelete(String name, AsyncCallbacks.DeleteLedgerCallback callback, Object ctx) { + delete(name); + callback.deleteLedgerComplete(ctx); + } + + @Override + public void asyncDelete(String name, CompletableFuture mlConfigFuture, + AsyncCallbacks.DeleteLedgerCallback callback, Object ctx) { + delete(name); + callback.deleteLedgerComplete(ctx); + } + + @Override + public void shutdown() throws InterruptedException, ManagedLedgerException { + } + + @Override + public CompletableFuture shutdownAsync() { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture asyncExists(String ledgerName) { + return CompletableFuture.completedFuture(ledgers.containsKey(ledgerName)); + } + + @Override + public EntryCacheManager getEntryCacheManager() { + return Mockito.mock(EntryCacheManager.class); + } + + @Override + public void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos) { + } + + @Override + public long getCacheEvictionTimeThreshold() { + return 0; + } + + @Override + public CompletableFuture> getManagedLedgerPropertiesAsync(String name) { + return CompletableFuture.completedFuture(Map.of()); + } + + @Override + public Map getManagedLedgers() { + return Map.of(); + } + + @Override + public ManagedLedgerFactoryMXBean getCacheStats() { + return Mockito.mock(ManagedLedgerFactoryMXBean.class); + } + + @Override + public void estimateUnloadedTopicBacklog(PersistentOfflineTopicStats offlineTopicStats, TopicName topicName, + boolean accurate, Object ctx) throws Exception { + } + + @Override + public ManagedLedgerFactoryConfig getConfig() { + return new ManagedLedgerFactoryConfig(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerMXBean.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerMXBean.java new file mode 100644 index 0000000000000..0c2f243a2e134 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerMXBean.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; +import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats; +import org.apache.bookkeeper.mledger.util.StatsBuckets; + +public class MockManagedLedgerMXBean extends ManagedLedgerMBeanImpl { + + public MockManagedLedgerMXBean() { + super(null); + } + + @Override + public String getName() { + return ""; + } + + @Override + public long getStoredMessagesSize() { + return 0; + } + + @Override + public long getStoredMessagesLogicalSize() { + return 0; + } + + @Override + public long getNumberOfMessagesInBacklog() { + return 0; + } + + @Override + public double getAddEntryMessagesRate() { + return 0; + } + + @Override + public double getAddEntryBytesRate() { + return 0; + } + + @Override + public long getAddEntryBytesTotal() { + return 0; + } + + @Override + public double getAddEntryWithReplicasBytesRate() { + return 0; + } + + @Override + public long getAddEntryWithReplicasBytesTotal() { + return 0; + } + + @Override + public double getReadEntriesRate() { + return 0; + } + + @Override + public double getReadEntriesBytesRate() { + return 0; + } + + @Override + public long getReadEntriesBytesTotal() { + return 0; + } + + @Override + public double getMarkDeleteRate() { + return 0; + } + + @Override + public long getMarkDeleteTotal() { + return 0; + } + + @Override + public long getAddEntrySucceed() { + return 0; + } + + @Override + public long getAddEntrySucceedTotal() { + return 0; + } + + @Override + public long getAddEntryErrors() { + return 0; + } + + @Override + public long getAddEntryErrorsTotal() { + return 0; + } + + @Override + public long getEntriesReadTotalCount() { + return 0; + } + + @Override + public long getReadEntriesSucceeded() { + return 0; + } + + @Override + public long getReadEntriesSucceededTotal() { + return 0; + } + + @Override + public long getReadEntriesErrors() { + return 0; + } + + @Override + public long getReadEntriesErrorsTotal() { + return 0; + } + + @Override + public double getReadEntriesOpsCacheMissesRate() { + return 0; + } + + @Override + public long getReadEntriesOpsCacheMissesTotal() { + return 0; + } + + @Override + public double getEntrySizeAverage() { + return 0; + } + + @Override + public long[] getEntrySizeBuckets() { + return new long[0]; + } + + @Override + public double getAddEntryLatencyAverageUsec() { + return 0; + } + + @Override + public long[] getAddEntryLatencyBuckets() { + return new long[0]; + } + + @Override + public long[] getLedgerSwitchLatencyBuckets() { + return new long[0]; + } + + @Override + public double getLedgerSwitchLatencyAverageUsec() { + return 0; + } + + @Override + public StatsBuckets getInternalAddEntryLatencyBuckets() { + return new StatsBuckets(500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000, 200_000, 1000_000); + } + + @Override + public StatsBuckets getInternalEntrySizeBuckets() { + return new StatsBuckets(128, 512, 1024, 2048, 4096, 16_384, 102_400, 1_048_576); + } + + @Override + public PendingBookieOpsStats getPendingBookieOpsStats() { + return new PendingBookieOpsStats(); + } + + @Override + public double getLedgerAddEntryLatencyAverageUsec() { + return 0; + } + + @Override + public long[] getLedgerAddEntryLatencyBuckets() { + return new long[0]; + } + + @Override + public StatsBuckets getInternalLedgerAddEntryLatencyBuckets() { + return new StatsBuckets(500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000, 200_000, 1000_000); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerStorage.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerStorage.java new file mode 100644 index 0000000000000..d31a57b8e9250 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerStorage.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import io.netty.channel.EventLoopGroup; +import io.opentelemetry.api.OpenTelemetry; +import java.io.IOException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.stats.NullStatsProvider; +import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.pulsar.broker.BookKeeperClientFactory; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.storage.ManagedLedgerStorage; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.mockito.Mockito; + +public class MockManagedLedgerStorage implements ManagedLedgerStorage { + + private final MockManagedLedgerFactory factory = new MockManagedLedgerFactory(); + + @Override + public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore, + BookKeeperClientFactory bookkeeperProvider, EventLoopGroup eventLoopGroup, + OpenTelemetry openTelemetry) throws Exception { + factory.setStore(metadataStore); + } + + @Override + public ManagedLedgerFactory getManagedLedgerFactory() { + return factory; + } + + @Override + public StatsProvider getStatsProvider() { + return new NullStatsProvider(); + } + + @Override + public BookKeeper getBookKeeperClient() { + return Mockito.mock(BookKeeper.class); + } + + @Override + public void close() throws IOException { + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockSchemaStorage.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockSchemaStorage.java new file mode 100644 index 0000000000000..1ee03524cb25c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockSchemaStorage.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.protocol.schema.SchemaStorage; +import org.apache.pulsar.common.protocol.schema.SchemaVersion; +import org.apache.pulsar.common.protocol.schema.StoredSchema; +import org.apache.pulsar.common.schema.LongSchemaVersion; + +public class MockSchemaStorage implements SchemaStorage { + + private final Map> schemasMap = new HashMap<>(); + + @Override + public synchronized CompletableFuture put(String key, byte[] value, byte[] hash) { + final var existingValue = schemasMap.get(key); + final LongSchemaVersion version; + if (existingValue == null) { + version = new LongSchemaVersion(0L); + final var schemas = new TreeMap(); + schemas.put(version.getVersion(), new StoredSchema(value, version)); + schemasMap.put(key, schemas); + } else { + final var lastVersion = existingValue.lastEntry().getKey(); + version = new LongSchemaVersion(lastVersion + 1); + existingValue.put(version.getVersion(), new StoredSchema(value, version)); + } + return CompletableFuture.completedFuture(version); + } + + @Override + public CompletableFuture put( + String key, Function>>, + CompletableFuture>> fn) { + return fn.apply(getAll(key)).thenCompose(pair -> { + if (pair != null) { + return put(key, pair.getLeft(), pair.getRight()); + } else { + // TODO: figure out why it would come here. With the default logic, NPE will happen. + return CompletableFuture.completedFuture(SchemaVersion.Empty); + } + }); + } + + @Override + public synchronized CompletableFuture get(String key, SchemaVersion version) { + final var schemas = schemasMap.get(key); + if (schemas == null) { + return CompletableFuture.completedFuture(null); + } + final LongSchemaVersion storedVersion; + if (version == SchemaVersion.Latest) { + storedVersion = (LongSchemaVersion) schemas.lastEntry().getValue().version; + } else { + storedVersion = (LongSchemaVersion) version; + } + return CompletableFuture.completedFuture(schemas.get(storedVersion.getVersion())); + } + + @Override + public synchronized CompletableFuture>> getAll(String key) { + return Optional.ofNullable(schemasMap.get(key)).map(schemas -> schemas.values().stream() + .map(CompletableFuture::completedFuture).toList() + ).map(CompletableFuture::completedFuture).orElse(CompletableFuture.completedFuture(List.of())); + } + + @Override + public CompletableFuture delete(String key, boolean forcefully) { + final var schemas = schemasMap.remove(key); + if (schemas != null) { + return CompletableFuture.completedFuture(new LongSchemaVersion(-1L)); + } else { + return CompletableFuture.completedFuture(null); + } + } + + @Override + public CompletableFuture delete(String key) { + return delete(key, true); + } + + @Override + public SchemaVersion versionFromBytes(byte[] version) { + final var buffer = ByteBuffer.wrap(version); + return new LongSchemaVersion(buffer.getLong()); + } + + @Override + public void start() throws Exception { + // No ops + } + + @Override + public void close() throws Exception { + // No ops + } +} From c05ed36bcebc257d51cb41ebd67091711a1b7402 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 27 Sep 2024 01:26:15 +0800 Subject: [PATCH 02/11] Recreate the node if it's deleted --- .../extensions/BrokerRegistry.java | 6 ++- .../extensions/BrokerRegistryImpl.java | 49 ++++++++++--------- .../channel/ServiceUnitStateChannelImpl.java | 1 + .../extensions/BrokerRegistryTest.java | 2 +- .../ExtensibleLoadManagerImplTest.java | 4 +- .../impl/MetadataStoreNodeDeletedTest.java} | 34 +++++++++++-- .../MockManagedCursor.java | 2 +- .../MockManagedLedger.java | 2 +- .../MockManagedLedgerFactory.java | 2 +- .../MockManagedLedgerMXBean.java | 2 +- .../MockManagedLedgerStorage.java | 2 +- .../MockSchemaStorage.java | 2 +- 12 files changed, 71 insertions(+), 37 deletions(-) rename pulsar-broker/src/test/java/org/apache/pulsar/{broker/loadbalance/extensions/MetadataStoreSessionExpiredTest.java => metadata/impl/MetadataStoreNodeDeletedTest.java} (65%) rename pulsar-broker/src/test/java/org/apache/pulsar/{broker/loadbalance/extensions => utils}/MockManagedCursor.java (99%) rename pulsar-broker/src/test/java/org/apache/pulsar/{broker/loadbalance/extensions => utils}/MockManagedLedger.java (99%) rename pulsar-broker/src/test/java/org/apache/pulsar/{broker/loadbalance/extensions => utils}/MockManagedLedgerFactory.java (99%) rename pulsar-broker/src/test/java/org/apache/pulsar/{broker/loadbalance/extensions => utils}/MockManagedLedgerMXBean.java (98%) rename pulsar-broker/src/test/java/org/apache/pulsar/{broker/loadbalance/extensions => utils}/MockManagedLedgerStorage.java (97%) rename pulsar-broker/src/test/java/org/apache/pulsar/{broker/loadbalance/extensions => utils}/MockSchemaStorage.java (98%) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java index 8133d4c482752..79dba9c63342e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java @@ -25,6 +25,8 @@ import java.util.function.BiConsumer; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.NotificationType; @@ -32,6 +34,8 @@ * Responsible for registering the current Broker lookup info to * the distributed store (e.g. Zookeeper) for broker discovery. */ +@InterfaceAudience.LimitedPrivate +@InterfaceStability.Unstable public interface BrokerRegistry extends AutoCloseable { /** @@ -47,7 +51,7 @@ public interface BrokerRegistry extends AutoCloseable { /** * Register local broker to metadata store. */ - void register() throws MetadataStoreException; + CompletableFuture registerAsync(); /** * Unregister the broker. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index f34d377990b68..b8cfa70513077 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -32,6 +32,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarServerException; @@ -69,10 +70,11 @@ protected enum State { Init, Started, Registered, + Unregistering, Closed } - private State state; + private final AtomicReference state = new AtomicReference<>(State.Init); public BrokerRegistryImpl(PulsarService pulsar) { this.pulsar = pulsar; @@ -94,44 +96,45 @@ public BrokerRegistryImpl(PulsarService pulsar) { System.currentTimeMillis(), pulsar.getBrokerVersion(), pulsar.getConfig().lookupProperties()); - this.state = State.Init; } @Override public synchronized void start() throws PulsarServerException { - if (this.state != State.Init) { - return; + if (!this.state.compareAndSet(State.Init, State.Started)) { + throw new PulsarServerException("Cannot start the broker registry in state " + state.get()); } pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification); try { - this.state = State.Started; - this.register(); - } catch (MetadataStoreException e) { - throw new PulsarServerException(e); + this.registerAsync().get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + throw PulsarServerException.from(e); } } @Override public boolean isStarted() { - return this.state == State.Started || this.state == State.Registered; + final var state = this.state.get(); + return state == State.Started || state == State.Registered; } @Override - public synchronized void register() throws MetadataStoreException { - if (this.state == State.Started) { - try { - brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral)) - .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); - this.state = State.Registered; - } catch (InterruptedException | ExecutionException | TimeoutException e) { - throw MetadataStoreException.unwrap(e); - } + public CompletableFuture registerAsync() { + final var state = this.state.get(); + if (state != State.Started && state != State.Registered) { + log.info("[{}] Skip registering self because the state is {}", getBrokerId(), state); + return CompletableFuture.completedFuture(null); } + log.info("[{}] Started registering self to {} (state: {})", getBrokerId(), brokerIdKeyPath, state); + return brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral)) + .thenAccept(__ -> { + this.state.set(State.Registered); + log.info("[{}] Finished registering self", getBrokerId()); + }); } @Override public synchronized void unregister() throws MetadataStoreException { - if (this.state == State.Registered) { + if (state.compareAndSet(State.Registered, State.Unregistering)) { try { brokerLookupDataMetadataCache.delete(brokerIdKeyPath) .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); @@ -144,7 +147,7 @@ public synchronized void unregister() throws MetadataStoreException { } catch (InterruptedException | TimeoutException e) { throw MetadataStoreException.unwrap(e); } finally { - this.state = State.Started; + state.set(State.Started); } } } @@ -191,7 +194,7 @@ public synchronized void addListener(BiConsumer listen @Override public synchronized void close() throws PulsarServerException { - if (this.state == State.Closed) { + if (this.state.get() == State.Closed) { return; } try { @@ -200,7 +203,7 @@ public synchronized void close() throws PulsarServerException { } catch (Exception ex) { log.error("Unexpected error when unregistering the broker registry", ex); } finally { - this.state = State.Closed; + this.state.set(State.Closed); } } @@ -238,7 +241,7 @@ protected static String keyPath(String brokerId) { } private void checkState() throws IllegalStateException { - if (this.state == State.Closed) { + if (this.state.get() == State.Closed) { throw new IllegalStateException("The registry already closed."); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index ddbc9eacac921..1bbf3ba69270a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1276,6 +1276,7 @@ private void handleBrokerDeletionEvent(String broker) { log.error("Failed to handle broker deletion event.", e); return; } + brokerRegistry.registerAsync(); MetadataState state = getMetadataState(); log.info("Handling broker:{} ownership cleanup based on metadata connection state:{}, event:{}, event_ts:{}:", broker, state, lastMetadataSessionEvent, lastMetadataSessionEventTimestamp); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java index 91ada90dda690..3d7d6ea2205a5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java @@ -332,7 +332,7 @@ public void testCloseRegister() throws Exception { assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Started); // Check state after re-register. - brokerRegistry.register(); + brokerRegistry.registerAsync().get(); assertEquals(getState(brokerRegistry), BrokerRegistryImpl.State.Registered); // Check state after close. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 4f6a006918318..7871e612c847a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -1211,7 +1211,7 @@ public void testDeployAndRollbackLoadManager() throws Exception { producer.send("t1"); // Test re-register broker and check the lookup result - loadManager4.getBrokerRegistry().register(); + loadManager4.getBrokerRegistry().registerAsync().get(); result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); assertNotNull(result); @@ -1423,7 +1423,7 @@ public void testLoadBalancerServiceUnitTableViewSyncer() throws Exception { producer.send("t1"); // Test re-register broker and check the lookup result - loadManager4.getBrokerRegistry().register(); + loadManager4.getBrokerRegistry().registerAsync().get(); result = pulsar.getAdminClient().lookups().lookupTopic(slaMonitorTopic); assertNotNull(result); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MetadataStoreSessionExpiredTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreNodeDeletedTest.java similarity index 65% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MetadataStoreSessionExpiredTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreNodeDeletedTest.java index 508ae930843b7..036c59890dc60 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MetadataStoreSessionExpiredTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreNodeDeletedTest.java @@ -16,22 +16,33 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.loadbalance.extensions; +package org.apache.pulsar.metadata.impl; +import java.time.Duration; import java.util.Collections; +import java.util.List; import java.util.Optional; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.LoadManager; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.NotificationType; +import org.apache.pulsar.utils.MockManagedLedgerStorage; +import org.apache.pulsar.utils.MockSchemaStorage; import org.apache.pulsar.broker.service.schema.SchemaStorageFactory; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.protocol.schema.SchemaStorage; +import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @Test(groups = "broker") -public class MetadataStoreSessionExpiredTest { +public class MetadataStoreNodeDeletedTest { private static final String clusterName = "test"; private PulsarService pulsar; @@ -54,8 +65,23 @@ protected void cleanup() throws Exception { @Test public void testLookupAfterSessionTimeout() throws Exception { - final var topic = "test-lookup-after-session-timeout"; - pulsar.getAdminClient().topics().createPartitionedTopic(topic, 1); + final var metadataStore = (LocalMemoryMetadataStore) pulsar.getLocalMetadataStore(); + final var children = metadataStore.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get(); + Assert.assertEquals(children, List.of(pulsar.getBrokerId())); + + // Simulate the case that the node was somehow deleted (e.g. by session timeout) + final var path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getBrokerId(); + metadataStore.delete(path, Optional.empty()); + metadataStore.receivedNotification(new Notification(NotificationType.Deleted, path)); + Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> { + final var newChildren = metadataStore.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join(); + Assert.assertEquals(newChildren, List.of(pulsar.getBrokerId())); + }); + + // If the node is deleted by unregister(), it should not recreate the path + ((ExtensibleLoadManagerWrapper) pulsar.getLoadManager().get()).get().getBrokerRegistry().unregister(); + Thread.sleep(3000); + Assert.assertTrue(metadataStore.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join().isEmpty()); } private ServiceConfiguration brokerConfig() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedCursor.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedCursor.java similarity index 99% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedCursor.java rename to pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedCursor.java index a91aecb3a7710..526fad470ca78 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedCursor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedCursor.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.loadbalance.extensions; +package org.apache.pulsar.utils; import com.google.common.collect.Range; import java.util.ArrayList; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedger.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedger.java similarity index 99% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedger.java rename to pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedger.java index 3df0d36bcc190..a832cfa75bb46 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedger.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedger.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.loadbalance.extensions; +package org.apache.pulsar.utils; import com.google.common.collect.Range; import io.netty.buffer.ByteBuf; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerFactory.java similarity index 99% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerFactory.java rename to pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerFactory.java index 955c7a99498ff..250c7c9bacf97 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerFactory.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerFactory.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.loadbalance.extensions; +package org.apache.pulsar.utils; import java.util.Map; import java.util.Optional; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerMXBean.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerMXBean.java similarity index 98% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerMXBean.java rename to pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerMXBean.java index 0c2f243a2e134..3134371db9259 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerMXBean.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerMXBean.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.loadbalance.extensions; +package org.apache.pulsar.utils; import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerStorage.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerStorage.java similarity index 97% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerStorage.java rename to pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerStorage.java index d31a57b8e9250..1a3d52d54bd0c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockManagedLedgerStorage.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerStorage.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.loadbalance.extensions; +package org.apache.pulsar.utils; import io.netty.channel.EventLoopGroup; import io.opentelemetry.api.OpenTelemetry; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockSchemaStorage.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockSchemaStorage.java similarity index 98% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockSchemaStorage.java rename to pulsar-broker/src/test/java/org/apache/pulsar/utils/MockSchemaStorage.java index 1ee03524cb25c..59253c5c20649 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/MockSchemaStorage.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockSchemaStorage.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.broker.loadbalance.extensions; +package org.apache.pulsar.utils; import java.nio.ByteBuffer; import java.util.HashMap; From 678ba31b0f27ade708d71556c20693e7da20df6d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 27 Sep 2024 01:34:06 +0800 Subject: [PATCH 03/11] Replace inmemory metadata store and mocked storage with ZK and BK --- .../impl/MetadataStoreNodeDeletedTest.java | 32 +- .../pulsar/utils/MockManagedCursor.java | 463 ------------------ .../pulsar/utils/MockManagedLedger.java | 458 ----------------- .../utils/MockManagedLedgerFactory.java | 191 -------- .../pulsar/utils/MockManagedLedgerMXBean.java | 215 -------- .../utils/MockManagedLedgerStorage.java | 63 --- .../pulsar/utils/MockSchemaStorage.java | 122 ----- 7 files changed, 11 insertions(+), 1533 deletions(-) delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedCursor.java delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedger.java delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerFactory.java delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerMXBean.java delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerStorage.java delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/utils/MockSchemaStorage.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreNodeDeletedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreNodeDeletedTest.java index 036c59890dc60..79c8678f007c3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreNodeDeletedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreNodeDeletedTest.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import org.apache.bookkeeper.util.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.LoadManager; @@ -29,12 +30,9 @@ import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper; import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.NotificationType; -import org.apache.pulsar.utils.MockManagedLedgerStorage; -import org.apache.pulsar.utils.MockSchemaStorage; -import org.apache.pulsar.broker.service.schema.SchemaStorageFactory; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.protocol.schema.SchemaStorage; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -45,10 +43,13 @@ public class MetadataStoreNodeDeletedTest { private static final String clusterName = "test"; + private final int zkPort = PortManager.nextFreePort(); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextFreePort); private PulsarService pulsar; @BeforeClass protected void setup() throws Exception { + bk.start(); pulsar = new PulsarService(brokerConfig()); pulsar.start(); final var admin = pulsar.getAdminClient(); @@ -60,12 +61,15 @@ protected void setup() throws Exception { @AfterClass(alwaysRun = true) protected void cleanup() throws Exception { - pulsar.close(); + if (pulsar != null) { + pulsar.close(); + } + bk.stop(); } @Test public void testLookupAfterSessionTimeout() throws Exception { - final var metadataStore = (LocalMemoryMetadataStore) pulsar.getLocalMetadataStore(); + final var metadataStore = (ZKMetadataStore) pulsar.getLocalMetadataStore(); final var children = metadataStore.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get(); Assert.assertEquals(children, List.of(pulsar.getBrokerId())); @@ -90,13 +94,7 @@ private ServiceConfiguration brokerConfig() { config.setAdvertisedAddress("localhost"); config.setBrokerServicePort(Optional.of(0)); config.setWebServicePort(Optional.of(0)); - config.setMetadataStoreUrl("memory:local"); - config.setSchemaRegistryStorageClassName(MockSchemaStorageFactory.class.getName()); - config.setManagedLedgerStorageClassName(MockManagedLedgerStorage.class.getName()); - - config.setSystemTopicEnabled(false); - config.setTopicLevelPoliciesEnabled(false); - + config.setMetadataStoreUrl("zk:127.0.0.1:" + bk.getZookeeperPort()); config.setManagedLedgerDefaultWriteQuorum(1); config.setManagedLedgerDefaultAckQuorum(1); config.setManagedLedgerDefaultEnsembleSize(1); @@ -106,12 +104,4 @@ private ServiceConfiguration brokerConfig() { config.setBrokerShutdownTimeoutMs(100); return config; } - - public static class MockSchemaStorageFactory implements SchemaStorageFactory { - - @Override - public SchemaStorage create(PulsarService pulsar) throws Exception { - return new MockSchemaStorage(); - } - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedCursor.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedCursor.java deleted file mode 100644 index 526fad470ca78..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedCursor.java +++ /dev/null @@ -1,463 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.utils; - -import com.google.common.collect.Range; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; -import lombok.RequiredArgsConstructor; -import org.apache.bookkeeper.mledger.AsyncCallbacks; -import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.ManagedCursor; -import org.apache.bookkeeper.mledger.ManagedCursorMXBean; -import org.apache.bookkeeper.mledger.ManagedLedger; -import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.PositionFactory; -import org.apache.bookkeeper.mledger.impl.EntryImpl; -import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; - -@RequiredArgsConstructor -public class MockManagedCursor implements ManagedCursor { - - private final String name; - private final MockManagedLedger managedLedger; - private long offset = 0L; - - @Override - public String getName() { - return name; - } - - @Override - public long getLastActive() { - return 0; - } - - @Override - public void updateLastActive() { - } - - @Override - public Map getProperties() { - return Map.of(); - } - - @Override - public Map getCursorProperties() { - return Map.of(); - } - - @Override - public CompletableFuture putCursorProperty(String key, String value) { - return null; - } - - @Override - public CompletableFuture setCursorProperties(Map cursorProperties) { - return null; - } - - @Override - public CompletableFuture removeCursorProperty(String key) { - return null; - } - - @Override - public boolean putProperty(String key, Long value) { - return false; - } - - @Override - public boolean removeProperty(String key) { - return false; - } - - @Override - public synchronized List readEntries(int numberOfEntriesToRead) { - synchronized (managedLedger) { - final var nextOffset = Math.min(offset + numberOfEntriesToRead, managedLedger.entries.size()); - final var entries = new ArrayList(); - for (long i = offset; i < nextOffset; i++) { - entries.add(EntryImpl.create(PositionFactory.create(managedLedger.ledgerId, i), - managedLedger.entries.get((int) i))); - } - offset = nextOffset; - return entries; - } - } - - @Override - public void asyncReadEntries(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback, Object ctx, - Position maxPosition) { - callback.readEntriesComplete(readEntries(numberOfEntriesToRead), ctx); - } - - @Override - public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, - AsyncCallbacks.ReadEntriesCallback callback, Object ctx, Position maxPosition) { - callback.readEntriesComplete(readEntries(numberOfEntriesToRead), ctx); - } - - @Override - public Entry getNthEntry(int n, IndividualDeletedEntries deletedEntries) { - throw new RuntimeException("getNthEntry is not supported"); - } - - @Override - public void asyncGetNthEntry(int n, IndividualDeletedEntries deletedEntries, - AsyncCallbacks.ReadEntryCallback callback, Object ctx) { - callback.readEntryFailed(new ManagedLedgerException("getNthEntry is not supported"), ctx); - } - - @Override - public List readEntriesOrWait(int numberOfEntriesToRead) { - return List.of(); - } - - @Override - public List readEntriesOrWait(int maxEntries, long maxSizeBytes) { - final var future = new CompletableFuture>(); - asyncReadEntriesOrWait(maxEntries, Long.MAX_VALUE, new AsyncCallbacks.ReadEntriesCallback() { - @Override - public void readEntriesComplete(List entries, Object ctx) { - future.complete(entries); - } - - @Override - public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); - } - }, null, PositionFactory.LATEST); - return future.join(); - } - - @Override - public void asyncReadEntriesOrWait(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback, - Object ctx, Position maxPosition) { - asyncReadEntriesOrWait(numberOfEntriesToRead, Long.MAX_VALUE, callback, ctx, maxPosition); - } - - @Override - public synchronized void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, - AsyncCallbacks.ReadEntriesCallback callback, - Object ctx, Position maxPosition) { - final var entries = readEntries(maxEntries); - if (entries.isEmpty()) { - CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS).execute(() -> - callback.readEntriesComplete(entries, ctx)); - } else { - callback.readEntriesComplete(entries, ctx); - } - } - - @Override - public boolean cancelPendingReadRequest() { - return false; - } - - @Override - public synchronized boolean hasMoreEntries() { - synchronized (managedLedger) { - return offset >= managedLedger.entries.size(); - } - } - - @Override - public long getNumberOfEntries() { - return managedLedger.getNumberOfEntries(); - } - - @Override - public long getNumberOfEntriesInBacklog(boolean isPrecise) { - return 0; - } - - @Override - public void markDelete(Position position) { - - } - - @Override - public void markDelete(Position position, Map properties) { - } - - @Override - public void asyncMarkDelete(Position position, AsyncCallbacks.MarkDeleteCallback callback, Object ctx) { - callback.markDeleteComplete(ctx); - } - - @Override - public void asyncMarkDelete(Position position, Map properties, - AsyncCallbacks.MarkDeleteCallback callback, Object ctx) { - callback.markDeleteComplete(ctx); - } - - @Override - public void delete(Position position) throws InterruptedException, ManagedLedgerException { - } - - @Override - public void asyncDelete(Position position, AsyncCallbacks.DeleteCallback callback, Object ctx) { - callback.deleteComplete(ctx); - } - - @Override - public void delete(Iterable positions) throws InterruptedException, ManagedLedgerException { - } - - @Override - public void asyncDelete(Iterable position, AsyncCallbacks.DeleteCallback callback, Object ctx) { - callback.deleteComplete(ctx); - } - - @Override - public synchronized Position getReadPosition() { - return PositionFactory.create(managedLedger.ledgerId, offset); - } - - @Override - public Position getMarkDeletedPosition() { - return getReadPosition(); - } - - @Override - public Position getPersistentMarkDeletedPosition() { - return getMarkDeletedPosition(); - } - - @Override - public synchronized void rewind() { - offset = 0L; - } - - @Override - public synchronized void seek(Position newReadPosition, boolean force) { - if (newReadPosition.getLedgerId() != managedLedger.ledgerId) { - throw new RuntimeException("Failed to seek " + newReadPosition); - } - offset = newReadPosition.getEntryId(); - } - - @Override - public void clearBacklog() { - } - - @Override - public void asyncClearBacklog(AsyncCallbacks.ClearBacklogCallback callback, Object ctx) { - callback.clearBacklogComplete(ctx); - } - - @Override - public void skipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries) { - } - - @Override - public void asyncSkipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries, - AsyncCallbacks.SkipEntriesCallback callback, Object ctx) { - callback.skipEntriesComplete(ctx); - } - - @Override - public Position findNewestMatching(Predicate condition) { - return getFirstPosition(); - } - - @Override - public Position findNewestMatching(FindPositionConstraint constraint, Predicate condition) { - return getFirstPosition(); - } - - @Override - public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, - AsyncCallbacks.FindEntryCallback callback, Object ctx) { - callback.findEntryComplete(getFirstPosition(), ctx); - } - - @Override - public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, - AsyncCallbacks.FindEntryCallback callback, Object ctx, - boolean isFindFromLedger) { - callback.findEntryComplete(getFirstPosition(), ctx); - } - - @Override - public void resetCursor(Position position) { - seek(position); - } - - @Override - public void asyncResetCursor(Position position, boolean forceReset, AsyncCallbacks.ResetCursorCallback callback) { - seek(position); - callback.resetComplete(null); - } - - @Override - public List replayEntries(Set positions) { - return List.of(); - } - - @Override - public Set asyncReplayEntries(Set positions, - AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { - callback.readEntriesComplete(List.of(), ctx); - return Set.of(); - } - - @Override - public Set asyncReplayEntries(Set positions, - AsyncCallbacks.ReadEntriesCallback callback, Object ctx, - boolean sortEntries) { - return asyncReplayEntries(positions, callback, ctx); - } - - @Override - public void close() { - } - - @Override - public void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx) { - callback.closeComplete(ctx); - } - - @Override - public Position getFirstPosition() { - return managedLedger.getFirstPosition(); - } - - @Override - public void setActive() { - } - - @Override - public void setInactive() { - } - - @Override - public void setAlwaysInactive() { - } - - @Override - public boolean isActive() { - return false; - } - - @Override - public boolean isDurable() { - return false; - } - - @Override - public long getNumberOfEntriesSinceFirstNotAckedMessage() { - return 0; - } - - @Override - public int getTotalNonContiguousDeletedMessagesRange() { - return 0; - } - - @Override - public int getNonContiguousDeletedMessagesRangeSerializedSize() { - return 0; - } - - @Override - public long getEstimatedSizeSinceMarkDeletePosition() { - return 0; - } - - @Override - public double getThrottleMarkDelete() { - return 0; - } - - @Override - public void setThrottleMarkDelete(double throttleMarkDelete) { - } - - @Override - public ManagedLedger getManagedLedger() { - return managedLedger; - } - - @Override - public Range getLastIndividualDeletedRange() { - throw new RuntimeException("getLastIndividualDeletedRange is not supported"); - } - - @Override - public void trimDeletedEntries(List entries) { - } - - @Override - public long[] getDeletedBatchIndexesAsLongArray(Position position) { - return new long[0]; - } - - @Override - public ManagedCursorMXBean getStats() { - return null; - } - - @Override - public boolean checkAndUpdateReadPositionChanged() { - return false; - } - - @Override - public boolean isClosed() { - return false; - } - - @Override - public ManagedLedgerInternalStats.CursorStats getCursorStats() { - return null; - } - - @Override - public boolean isMessageDeleted(Position position) { - return false; - } - - @Override - public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) throws ManagedLedgerException { - return null; - } - - @Override - public long[] getBatchPositionAckSet(Position position) { - return new long[0]; - } - - @Override - public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) { - return 0; - } - - @Override - public void updateReadStats(int readEntriesCount, long readEntriesSize) { - - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedger.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedger.java deleted file mode 100644 index a832cfa75bb46..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedger.java +++ /dev/null @@ -1,458 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.utils; - -import com.google.common.collect.Range; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Optional; -import java.util.TreeMap; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Predicate; -import lombok.RequiredArgsConstructor; -import org.apache.bookkeeper.mledger.AsyncCallbacks; -import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.ManagedCursor; -import org.apache.bookkeeper.mledger.ManagedLedger; -import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.bookkeeper.mledger.ManagedLedgerMXBean; -import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.PositionBound; -import org.apache.bookkeeper.mledger.PositionFactory; -import org.apache.bookkeeper.mledger.impl.EntryImpl; -import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; -import org.apache.bookkeeper.mledger.proto.MLDataFormats; -import org.apache.pulsar.common.api.proto.CommandSubscribe; -import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; - -@RequiredArgsConstructor -public class MockManagedLedger implements ManagedLedger { - - private static final AtomicLong ledgerIdGenerator = new AtomicLong(0L); - - final long ledgerId = ledgerIdGenerator.getAndIncrement(); - final List entries = new ArrayList<>(); - private final Map cursors = new ConcurrentHashMap<>(); - private final String name; - - @Override - public String getName() { - return name; - } - - @Override - public synchronized Position addEntry(byte[] data) { - final var buf = Unpooled.wrappedBuffer(data); - entries.add(buf); - return PositionFactory.create(ledgerId, entries.size() - 1); - } - - @Override - public Position addEntry(byte[] data, int numberOfMessages) { - return addEntry(data); - } - - @Override - public synchronized void asyncAddEntry(byte[] data, AsyncCallbacks.AddEntryCallback callback, Object ctx) { - final var position = addEntry(data); - callback.addComplete(addEntry(data), entries.get((int) position.getEntryId()), ctx); - } - - @Override - public Position addEntry(byte[] data, int offset, int length) { - return addEntry(data); - } - - @Override - public Position addEntry(byte[] data, int numberOfMessages, int offset, int length) { - return addEntry(data); - } - - @Override - public void asyncAddEntry(byte[] data, int offset, int length, AsyncCallbacks.AddEntryCallback callback, - Object ctx) { - asyncAddEntry(data, callback, ctx); - } - - @Override - public void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, - AsyncCallbacks.AddEntryCallback callback, Object ctx) { - asyncAddEntry(data, callback, ctx); - } - - @Override - public synchronized void asyncAddEntry(ByteBuf buffer, AsyncCallbacks.AddEntryCallback callback, Object ctx) { - final var buf = Unpooled.wrappedBuffer(buffer); - entries.add(buf); - final var position = PositionFactory.create(ledgerId, entries.size() - 1); - callback.addComplete(position, buf, ctx); - } - - @Override - public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AsyncCallbacks.AddEntryCallback callback, - Object ctx) { - asyncAddEntry(buffer, callback, ctx); - } - - @Override - public ManagedCursor openCursor(String name) { - return cursors.computeIfAbsent(name, __ -> new MockManagedCursor(name, this)); - } - - @Override - public ManagedCursor openCursor(String name, CommandSubscribe.InitialPosition initialPosition) { - return openCursor(name); - } - - @Override - public ManagedCursor openCursor(String name, CommandSubscribe.InitialPosition initialPosition, - Map properties, Map cursorProperties) { - return openCursor(name); - } - - @Override - public ManagedCursor newNonDurableCursor(Position startCursorPosition) { - return openCursor(name); - } - - @Override - public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) { - return openCursor(subscriptionName); - } - - @Override - public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName, - CommandSubscribe.InitialPosition initialPosition, - boolean isReadCompacted) { - return openCursor(subscriptionName); - } - - @Override - public void asyncDeleteCursor(String name, AsyncCallbacks.DeleteCursorCallback callback, Object ctx) { - cursors.remove(name); - callback.deleteCursorComplete(ctx); - } - - @Override - public void deleteCursor(String name) { - cursors.remove(name); - } - - @Override - public void removeWaitingCursor(ManagedCursor cursor) { - } - - @Override - public void asyncOpenCursor(String name, AsyncCallbacks.OpenCursorCallback callback, Object ctx) { - callback.openCursorComplete(openCursor(name), ctx); - } - - @Override - public void asyncOpenCursor(String name, CommandSubscribe.InitialPosition initialPosition, - AsyncCallbacks.OpenCursorCallback callback, Object ctx) { - callback.openCursorComplete(openCursor(name), ctx); - } - - @Override - public void asyncOpenCursor(String name, CommandSubscribe.InitialPosition initialPosition, - Map properties, Map cursorProperties, - AsyncCallbacks.OpenCursorCallback callback, Object ctx) { - callback.openCursorComplete(openCursor(name), ctx); - } - - @Override - public Iterable getCursors() { - return this.cursors.values(); - } - - @Override - public Iterable getActiveCursors() { - return getCursors(); - } - - @Override - public long getNumberOfEntries() { - return cursors.size(); - } - - @Override - public long getNumberOfEntries(Range range) { - return 0; - } - - @Override - public long getNumberOfActiveEntries() { - return 0; - } - - @Override - public long getTotalSize() { - return 0; - } - - @Override - public long getEstimatedBacklogSize() { - return 0; - } - - @Override - public CompletableFuture getEarliestMessagePublishTimeInBacklog() { - return CompletableFuture.completedFuture(System.currentTimeMillis()); - } - - @Override - public long getOffloadedSize() { - return 0; - } - - @Override - public long getLastOffloadedLedgerId() { - return 0; - } - - @Override - public long getLastOffloadedSuccessTimestamp() { - return 0; - } - - @Override - public long getLastOffloadedFailureTimestamp() { - return 0; - } - - @Override - public void asyncTerminate(AsyncCallbacks.TerminateCallback callback, Object ctx) { - callback.terminateComplete(getFirstPosition(), ctx); - } - - @Override - public CompletableFuture asyncMigrate() { - return CompletableFuture.completedFuture(getFirstPosition()); - } - - @Override - public Position terminate() { - return getFirstPosition(); - } - - @Override - public void close() { - } - - @Override - public void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx) { - callback.closeComplete(ctx); - } - - @Override - public ManagedLedgerMXBean getStats() { - return new MockManagedLedgerMXBean(); - } - - @Override - public void delete() throws InterruptedException, ManagedLedgerException { - } - - @Override - public void asyncDelete(AsyncCallbacks.DeleteLedgerCallback callback, Object ctx) { - callback.deleteLedgerComplete(ctx); - } - - @Override - public Position offloadPrefix(Position pos) { - return getFirstPosition(); - } - - @Override - public void asyncOffloadPrefix(Position pos, AsyncCallbacks.OffloadCallback callback, Object ctx) { - callback.offloadComplete(getFirstPosition(), ctx); - } - - @Override - public ManagedCursor getSlowestConsumer() { - return cursors.values().stream().findAny().orElse(null); - } - - @Override - public boolean isTerminated() { - return false; - } - - @Override - public boolean isMigrated() { - return false; - } - - @Override - public ManagedLedgerConfig getConfig() { - return new ManagedLedgerConfig(); - } - - @Override - public void setConfig(ManagedLedgerConfig config) { - } - - @Override - public synchronized Position getLastConfirmedEntry() { - return PositionFactory.create(ledgerId, entries.size() - 1); - } - - @Override - public void readyToCreateNewLedger() { - } - - @Override - public Map getProperties() { - return Map.of(); - } - - @Override - public void setProperty(String key, String value) throws InterruptedException, ManagedLedgerException { - } - - @Override - public void asyncSetProperty(String key, String value, AsyncCallbacks.UpdatePropertiesCallback callback, - Object ctx) { - callback.updatePropertiesComplete(Map.of(), ctx); - } - - @Override - public void deleteProperty(String key) { - } - - @Override - public void asyncDeleteProperty(String key, AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx) { - - } - - @Override - public void setProperties(Map properties) throws InterruptedException, ManagedLedgerException { - } - - @Override - public void asyncSetProperties(Map properties, AsyncCallbacks.UpdatePropertiesCallback callback, - Object ctx) { - callback.updatePropertiesComplete(Map.of(), ctx); - } - - @Override - public void trimConsumedLedgersInBackground(CompletableFuture promise) { - promise.complete(null); - } - - @Override - public void rollCurrentLedgerIfFull() { - } - - @Override - public CompletableFuture asyncFindPosition(Predicate predicate) { - return CompletableFuture.completedFuture(getFirstPosition()); - } - - @Override - public ManagedLedgerInterceptor getManagedLedgerInterceptor() { - return null; - } - - @Override - public CompletableFuture getLedgerInfo(long ledgerId) { - return CompletableFuture.completedFuture(null); - } - - @Override - public Optional getOptionalLedgerInfo(long ledgerId) { - return Optional.empty(); - } - - @Override - public CompletableFuture asyncTruncate() { - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture getManagedLedgerInternalStats(boolean includeLedgerMetadata) { - return CompletableFuture.completedFuture(null); - } - - @Override - public boolean checkInactiveLedgerAndRollOver() { - return false; - } - - @Override - public void checkCursorsToCacheEntries() { - } - - @Override - public synchronized void asyncReadEntry(Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) { - if (position.getLedgerId() != ledgerId) { - callback.readEntryFailed(new ManagedLedgerException(position.getLedgerId() + " does not exist"), ctx); - return; - } - final var buf = entries.get((int) position.getEntryId()); - callback.readEntryComplete(EntryImpl.create(position, buf), ctx); - } - - @Override - public NavigableMap getLedgersInfo() { - return new TreeMap<>(); - } - - @Override - public Position getNextValidPosition(Position position) { - return PositionFactory.create(position.getLedgerId(), position.getEntryId() + 1); - } - - @Override - public Position getPreviousPosition(Position position) { - return PositionFactory.create(position.getLedgerId(), position.getEntryId() - 1); - } - - @Override - public long getEstimatedBacklogSize(Position position) { - return 0; - } - - @Override - public Position getPositionAfterN(Position startPosition, long n, PositionBound startRange) { - return PositionFactory.create(startPosition.getLedgerId(), startPosition.getEntryId() + n); - } - - @Override - public int getPendingAddEntriesCount() { - return 0; - } - - @Override - public long getCacheSize() { - return 0; - } - - @Override - public Position getFirstPosition() { - return PositionFactory.create(ledgerId, 0L); - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerFactory.java deleted file mode 100644 index 250c7c9bacf97..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerFactory.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.utils; - -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Supplier; -import lombok.Setter; -import org.apache.bookkeeper.mledger.AsyncCallbacks; -import org.apache.bookkeeper.mledger.ManagedLedger; -import org.apache.bookkeeper.mledger.ManagedLedgerConfig; -import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.bookkeeper.mledger.ManagedLedgerFactory; -import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; -import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean; -import org.apache.bookkeeper.mledger.ManagedLedgerInfo; -import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.ReadOnlyCursor; -import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; -import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; -import org.apache.pulsar.metadata.api.MetadataStore; -import org.mockito.Mockito; - -public class MockManagedLedgerFactory implements ManagedLedgerFactory { - - private final Map ledgers = new ConcurrentHashMap<>(); - @Setter - private MetadataStore store; - - @Override - public ManagedLedger open(String name) { - return open(name, null); - } - - @Override - public ManagedLedger open(String name, ManagedLedgerConfig config) { - final var future = new CompletableFuture(); - asyncOpen(name, new AsyncCallbacks.OpenLedgerCallback() { - @Override - public void openLedgerComplete(ManagedLedger ledger, Object ctx) { - future.complete(ledger); - } - - @Override - public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { - future.completeExceptionally(exception); - } - }, null); - return future.join(); - } - - @Override - public void asyncOpen(String name, AsyncCallbacks.OpenLedgerCallback callback, Object ctx) { - asyncOpen(name, new ManagedLedgerConfig(), callback, null, ctx); - } - - @Override - public void asyncOpen(String name, ManagedLedgerConfig config, AsyncCallbacks.OpenLedgerCallback callback, - Supplier> mlOwnershipChecker, Object ctx) { - final var ledger = ledgers.computeIfAbsent(name, __ -> { - final String path = "/managed-ledgers/" + name; - store.put(path, new byte[0], Optional.empty()); - return new MockManagedLedger(name); - }); - callback.openLedgerComplete(ledger, ctx); - } - - @Override - public ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPosition, - ManagedLedgerConfig config) { - throw new RuntimeException("openReadOnlyCursor is not supported"); - } - - @Override - public void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config, - AsyncCallbacks.OpenReadOnlyCursorCallback callback, Object ctx) { - throw new RuntimeException("openReadOnlyCursor is not supported"); - } - - @Override - public void asyncOpenReadOnlyManagedLedger(String managedLedgerName, - AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback, - ManagedLedgerConfig config, Object ctx) { - throw new RuntimeException("openReadOnlyCursor is not supported"); - } - - @Override - public ManagedLedgerInfo getManagedLedgerInfo(String name) throws InterruptedException, ManagedLedgerException { - return new ManagedLedgerInfo(); - } - - @Override - public void asyncGetManagedLedgerInfo(String name, AsyncCallbacks.ManagedLedgerInfoCallback callback, Object ctx) { - callback.getInfoComplete(new ManagedLedgerInfo(), ctx); - } - - @Override - public void delete(String name) { - ledgers.remove(name); - } - - @Override - public void delete(String name, CompletableFuture mlConfigFuture) { - delete(name); - } - - @Override - public void asyncDelete(String name, AsyncCallbacks.DeleteLedgerCallback callback, Object ctx) { - delete(name); - callback.deleteLedgerComplete(ctx); - } - - @Override - public void asyncDelete(String name, CompletableFuture mlConfigFuture, - AsyncCallbacks.DeleteLedgerCallback callback, Object ctx) { - delete(name); - callback.deleteLedgerComplete(ctx); - } - - @Override - public void shutdown() throws InterruptedException, ManagedLedgerException { - } - - @Override - public CompletableFuture shutdownAsync() { - return CompletableFuture.completedFuture(null); - } - - @Override - public CompletableFuture asyncExists(String ledgerName) { - return CompletableFuture.completedFuture(ledgers.containsKey(ledgerName)); - } - - @Override - public EntryCacheManager getEntryCacheManager() { - return Mockito.mock(EntryCacheManager.class); - } - - @Override - public void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos) { - } - - @Override - public long getCacheEvictionTimeThreshold() { - return 0; - } - - @Override - public CompletableFuture> getManagedLedgerPropertiesAsync(String name) { - return CompletableFuture.completedFuture(Map.of()); - } - - @Override - public Map getManagedLedgers() { - return Map.of(); - } - - @Override - public ManagedLedgerFactoryMXBean getCacheStats() { - return Mockito.mock(ManagedLedgerFactoryMXBean.class); - } - - @Override - public void estimateUnloadedTopicBacklog(PersistentOfflineTopicStats offlineTopicStats, TopicName topicName, - boolean accurate, Object ctx) throws Exception { - } - - @Override - public ManagedLedgerFactoryConfig getConfig() { - return new ManagedLedgerFactoryConfig(); - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerMXBean.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerMXBean.java deleted file mode 100644 index 3134371db9259..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerMXBean.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.utils; - -import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; -import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats; -import org.apache.bookkeeper.mledger.util.StatsBuckets; - -public class MockManagedLedgerMXBean extends ManagedLedgerMBeanImpl { - - public MockManagedLedgerMXBean() { - super(null); - } - - @Override - public String getName() { - return ""; - } - - @Override - public long getStoredMessagesSize() { - return 0; - } - - @Override - public long getStoredMessagesLogicalSize() { - return 0; - } - - @Override - public long getNumberOfMessagesInBacklog() { - return 0; - } - - @Override - public double getAddEntryMessagesRate() { - return 0; - } - - @Override - public double getAddEntryBytesRate() { - return 0; - } - - @Override - public long getAddEntryBytesTotal() { - return 0; - } - - @Override - public double getAddEntryWithReplicasBytesRate() { - return 0; - } - - @Override - public long getAddEntryWithReplicasBytesTotal() { - return 0; - } - - @Override - public double getReadEntriesRate() { - return 0; - } - - @Override - public double getReadEntriesBytesRate() { - return 0; - } - - @Override - public long getReadEntriesBytesTotal() { - return 0; - } - - @Override - public double getMarkDeleteRate() { - return 0; - } - - @Override - public long getMarkDeleteTotal() { - return 0; - } - - @Override - public long getAddEntrySucceed() { - return 0; - } - - @Override - public long getAddEntrySucceedTotal() { - return 0; - } - - @Override - public long getAddEntryErrors() { - return 0; - } - - @Override - public long getAddEntryErrorsTotal() { - return 0; - } - - @Override - public long getEntriesReadTotalCount() { - return 0; - } - - @Override - public long getReadEntriesSucceeded() { - return 0; - } - - @Override - public long getReadEntriesSucceededTotal() { - return 0; - } - - @Override - public long getReadEntriesErrors() { - return 0; - } - - @Override - public long getReadEntriesErrorsTotal() { - return 0; - } - - @Override - public double getReadEntriesOpsCacheMissesRate() { - return 0; - } - - @Override - public long getReadEntriesOpsCacheMissesTotal() { - return 0; - } - - @Override - public double getEntrySizeAverage() { - return 0; - } - - @Override - public long[] getEntrySizeBuckets() { - return new long[0]; - } - - @Override - public double getAddEntryLatencyAverageUsec() { - return 0; - } - - @Override - public long[] getAddEntryLatencyBuckets() { - return new long[0]; - } - - @Override - public long[] getLedgerSwitchLatencyBuckets() { - return new long[0]; - } - - @Override - public double getLedgerSwitchLatencyAverageUsec() { - return 0; - } - - @Override - public StatsBuckets getInternalAddEntryLatencyBuckets() { - return new StatsBuckets(500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000, 200_000, 1000_000); - } - - @Override - public StatsBuckets getInternalEntrySizeBuckets() { - return new StatsBuckets(128, 512, 1024, 2048, 4096, 16_384, 102_400, 1_048_576); - } - - @Override - public PendingBookieOpsStats getPendingBookieOpsStats() { - return new PendingBookieOpsStats(); - } - - @Override - public double getLedgerAddEntryLatencyAverageUsec() { - return 0; - } - - @Override - public long[] getLedgerAddEntryLatencyBuckets() { - return new long[0]; - } - - @Override - public StatsBuckets getInternalLedgerAddEntryLatencyBuckets() { - return new StatsBuckets(500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000, 200_000, 1000_000); - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerStorage.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerStorage.java deleted file mode 100644 index 1a3d52d54bd0c..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockManagedLedgerStorage.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.utils; - -import io.netty.channel.EventLoopGroup; -import io.opentelemetry.api.OpenTelemetry; -import java.io.IOException; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.mledger.ManagedLedgerFactory; -import org.apache.bookkeeper.stats.NullStatsProvider; -import org.apache.bookkeeper.stats.StatsProvider; -import org.apache.pulsar.broker.BookKeeperClientFactory; -import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.storage.ManagedLedgerStorage; -import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; -import org.mockito.Mockito; - -public class MockManagedLedgerStorage implements ManagedLedgerStorage { - - private final MockManagedLedgerFactory factory = new MockManagedLedgerFactory(); - - @Override - public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore, - BookKeeperClientFactory bookkeeperProvider, EventLoopGroup eventLoopGroup, - OpenTelemetry openTelemetry) throws Exception { - factory.setStore(metadataStore); - } - - @Override - public ManagedLedgerFactory getManagedLedgerFactory() { - return factory; - } - - @Override - public StatsProvider getStatsProvider() { - return new NullStatsProvider(); - } - - @Override - public BookKeeper getBookKeeperClient() { - return Mockito.mock(BookKeeper.class); - } - - @Override - public void close() throws IOException { - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockSchemaStorage.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockSchemaStorage.java deleted file mode 100644 index 59253c5c20649..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/MockSchemaStorage.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.utils; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.TreeMap; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.common.protocol.schema.SchemaStorage; -import org.apache.pulsar.common.protocol.schema.SchemaVersion; -import org.apache.pulsar.common.protocol.schema.StoredSchema; -import org.apache.pulsar.common.schema.LongSchemaVersion; - -public class MockSchemaStorage implements SchemaStorage { - - private final Map> schemasMap = new HashMap<>(); - - @Override - public synchronized CompletableFuture put(String key, byte[] value, byte[] hash) { - final var existingValue = schemasMap.get(key); - final LongSchemaVersion version; - if (existingValue == null) { - version = new LongSchemaVersion(0L); - final var schemas = new TreeMap(); - schemas.put(version.getVersion(), new StoredSchema(value, version)); - schemasMap.put(key, schemas); - } else { - final var lastVersion = existingValue.lastEntry().getKey(); - version = new LongSchemaVersion(lastVersion + 1); - existingValue.put(version.getVersion(), new StoredSchema(value, version)); - } - return CompletableFuture.completedFuture(version); - } - - @Override - public CompletableFuture put( - String key, Function>>, - CompletableFuture>> fn) { - return fn.apply(getAll(key)).thenCompose(pair -> { - if (pair != null) { - return put(key, pair.getLeft(), pair.getRight()); - } else { - // TODO: figure out why it would come here. With the default logic, NPE will happen. - return CompletableFuture.completedFuture(SchemaVersion.Empty); - } - }); - } - - @Override - public synchronized CompletableFuture get(String key, SchemaVersion version) { - final var schemas = schemasMap.get(key); - if (schemas == null) { - return CompletableFuture.completedFuture(null); - } - final LongSchemaVersion storedVersion; - if (version == SchemaVersion.Latest) { - storedVersion = (LongSchemaVersion) schemas.lastEntry().getValue().version; - } else { - storedVersion = (LongSchemaVersion) version; - } - return CompletableFuture.completedFuture(schemas.get(storedVersion.getVersion())); - } - - @Override - public synchronized CompletableFuture>> getAll(String key) { - return Optional.ofNullable(schemasMap.get(key)).map(schemas -> schemas.values().stream() - .map(CompletableFuture::completedFuture).toList() - ).map(CompletableFuture::completedFuture).orElse(CompletableFuture.completedFuture(List.of())); - } - - @Override - public CompletableFuture delete(String key, boolean forcefully) { - final var schemas = schemasMap.remove(key); - if (schemas != null) { - return CompletableFuture.completedFuture(new LongSchemaVersion(-1L)); - } else { - return CompletableFuture.completedFuture(null); - } - } - - @Override - public CompletableFuture delete(String key) { - return delete(key, true); - } - - @Override - public SchemaVersion versionFromBytes(byte[] version) { - final var buffer = ByteBuffer.wrap(version); - return new LongSchemaVersion(buffer.getLong()); - } - - @Override - public void start() throws Exception { - // No ops - } - - @Override - public void close() throws Exception { - // No ops - } -} From 4e8078d57d378456673b3fcff43e0b7999496c02 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 27 Sep 2024 01:37:43 +0800 Subject: [PATCH 04/11] Move the test under the loadbalance package --- .../extensions/BrokerRegistryIntegrationTest.java} | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) rename pulsar-broker/src/test/java/org/apache/pulsar/{metadata/impl/MetadataStoreNodeDeletedTest.java => broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java} (87%) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreNodeDeletedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java similarity index 87% rename from pulsar-broker/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreNodeDeletedTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java index 79c8678f007c3..834c8e555c3a5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreNodeDeletedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.metadata.impl; +package org.apache.pulsar.broker.loadbalance.extensions; import java.time.Duration; import java.util.Collections; @@ -26,10 +26,6 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.LoadManager; -import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; -import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper; -import org.apache.pulsar.metadata.api.Notification; -import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; @@ -40,7 +36,7 @@ import org.testng.annotations.Test; @Test(groups = "broker") -public class MetadataStoreNodeDeletedTest { +public class BrokerRegistryIntegrationTest { private static final String clusterName = "test"; private final int zkPort = PortManager.nextFreePort(); @@ -68,15 +64,14 @@ protected void cleanup() throws Exception { } @Test - public void testLookupAfterSessionTimeout() throws Exception { - final var metadataStore = (ZKMetadataStore) pulsar.getLocalMetadataStore(); + public void testRecoverFromNodeDeletion() throws Exception { + final var metadataStore = pulsar.getLocalMetadataStore(); final var children = metadataStore.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get(); Assert.assertEquals(children, List.of(pulsar.getBrokerId())); // Simulate the case that the node was somehow deleted (e.g. by session timeout) final var path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getBrokerId(); metadataStore.delete(path, Optional.empty()); - metadataStore.receivedNotification(new Notification(NotificationType.Deleted, path)); Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> { final var newChildren = metadataStore.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join(); Assert.assertEquals(newChildren, List.of(pulsar.getBrokerId())); From b56293427e5b7cf01e88904f4dccbaead388e930 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 27 Sep 2024 01:47:34 +0800 Subject: [PATCH 05/11] Add more test for register again --- .../BrokerRegistryIntegrationTest.java | 40 +++++++++++++------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java index 834c8e555c3a5..c1b2152b42df5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.util.PortManager; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -35,6 +36,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +@Slf4j @Test(groups = "broker") public class BrokerRegistryIntegrationTest { @@ -42,6 +44,8 @@ public class BrokerRegistryIntegrationTest { private final int zkPort = PortManager.nextFreePort(); private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextFreePort); private PulsarService pulsar; + private BrokerRegistry brokerRegistry; + private String brokerMetadataPath; @BeforeClass protected void setup() throws Exception { @@ -53,6 +57,8 @@ protected void setup() throws Exception { admin.tenants().createTenant("public", TenantInfo.builder() .allowedClusters(Collections.singleton(clusterName)).build()); admin.namespaces().createNamespace("public/default"); + brokerRegistry = ((ExtensibleLoadManagerWrapper) pulsar.getLoadManager().get()).get().getBrokerRegistry(); + brokerMetadataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getBrokerId(); } @AfterClass(alwaysRun = true) @@ -65,22 +71,32 @@ protected void cleanup() throws Exception { @Test public void testRecoverFromNodeDeletion() throws Exception { - final var metadataStore = pulsar.getLocalMetadataStore(); - final var children = metadataStore.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get(); - Assert.assertEquals(children, List.of(pulsar.getBrokerId())); - // Simulate the case that the node was somehow deleted (e.g. by session timeout) - final var path = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + pulsar.getBrokerId(); - metadataStore.delete(path, Optional.empty()); - Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> { - final var newChildren = metadataStore.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join(); - Assert.assertEquals(newChildren, List.of(pulsar.getBrokerId())); - }); + Assert.assertEquals(brokerRegistry.getAvailableBrokersAsync().get(), List.of(pulsar.getBrokerId())); + pulsar.getLocalMetadataStore().delete(brokerMetadataPath, Optional.empty()); + Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals( + brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId()))); // If the node is deleted by unregister(), it should not recreate the path - ((ExtensibleLoadManagerWrapper) pulsar.getLoadManager().get()).get().getBrokerRegistry().unregister(); + brokerRegistry.unregister(); Thread.sleep(3000); - Assert.assertTrue(metadataStore.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join().isEmpty()); + Assert.assertTrue(brokerRegistry.getAvailableBrokersAsync().get().isEmpty()); + + // Restore the normal state + brokerRegistry.registerAsync().get(); + Assert.assertEquals(brokerRegistry.getAvailableBrokersAsync().get(), List.of(pulsar.getBrokerId())); + } + + @Test + public void testRegisterAgain() throws Exception { + final var metadataStore = pulsar.getLocalMetadataStore(); + final var oldResult = metadataStore.get(brokerMetadataPath).get().orElseThrow(); + log.info("Old result: {} {}", new String(oldResult.getValue()), oldResult.getStat().getVersion()); + brokerRegistry.registerAsync().get(); + final var newResult = metadataStore.get(brokerMetadataPath).get().orElseThrow(); + log.info("New result: {} {}", new String(newResult.getValue()), newResult.getStat().getVersion()); + Assert.assertTrue(newResult.getStat().getVersion() > oldResult.getStat().getVersion()); + Assert.assertEquals(newResult.getValue(), oldResult.getValue()); } private ServiceConfiguration brokerConfig() { From b7fb4afe9d87d4dcdd9571cbf511deb85ad6042f Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 27 Sep 2024 02:00:36 +0800 Subject: [PATCH 06/11] Move the registerAsync to handleBrokerRegistrationEvent --- .../extensions/channel/ServiceUnitStateChannelImpl.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 1bbf3ba69270a..67c035bcbeb61 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1234,6 +1234,9 @@ protected void handleBrokerRegistrationEvent(String broker, NotificationType typ handleBrokerCreationEvent(broker); } else if (type == NotificationType.Deleted) { log.info("BrokerRegistry detected the broker:{} registry has been deleted.", broker); + if (brokerRegistry.getBrokerId().equals(broker)) { + brokerRegistry.registerAsync(); + } handleBrokerDeletionEvent(broker); } } @@ -1276,7 +1279,6 @@ private void handleBrokerDeletionEvent(String broker) { log.error("Failed to handle broker deletion event.", e); return; } - brokerRegistry.registerAsync(); MetadataState state = getMetadataState(); log.info("Handling broker:{} ownership cleanup based on metadata connection state:{}, event:{}, event_ts:{}:", broker, state, lastMetadataSessionEvent, lastMetadataSessionEventTimestamp); From 78448f5400eb0fc74a3f218ea99b1fb488f094af Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 27 Sep 2024 09:32:55 +0800 Subject: [PATCH 07/11] Add comments for why to register again --- .../extensions/channel/ServiceUnitStateChannelImpl.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 67c035bcbeb61..6883a3a4bd106 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1234,6 +1234,8 @@ protected void handleBrokerRegistrationEvent(String broker, NotificationType typ handleBrokerCreationEvent(broker); } else if (type == NotificationType.Deleted) { log.info("BrokerRegistry detected the broker:{} registry has been deleted.", broker); + // The registered node is an ephemeral node that could be deleted when the metadata store client's session + // is expired. In this case, we should register again. if (brokerRegistry.getBrokerId().equals(broker)) { brokerRegistry.registerAsync(); } From 34b7f7e3a544211d38eb0480eb1f31ed1859979d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 27 Sep 2024 13:08:12 +0800 Subject: [PATCH 08/11] Move registerAsync into the BrokerRegistryImpl --- .../broker/loadbalance/extensions/BrokerRegistryImpl.java | 7 +++++++ .../extensions/channel/ServiceUnitStateChannelImpl.java | 5 ----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index b8cfa70513077..cc8225d083bb8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -82,6 +82,13 @@ public BrokerRegistryImpl(PulsarService pulsar) { this.brokerLookupDataMetadataCache = pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class); this.scheduler = pulsar.getLoadManagerExecutor(); this.listeners = new ArrayList<>(); + // The registered node is an ephemeral node that could be deleted when the metadata store client's session + // is expired. In this case, we should register again. + this.listeners.add((broker, notificationType) -> { + if (notificationType == NotificationType.Deleted && getBrokerId().equals(broker)) { + registerAsync(); + } + }); this.brokerIdKeyPath = keyPath(pulsar.getBrokerId()); this.brokerLookupData = new BrokerLookupData( pulsar.getWebServiceAddress(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 6883a3a4bd106..ddbc9eacac921 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -1234,11 +1234,6 @@ protected void handleBrokerRegistrationEvent(String broker, NotificationType typ handleBrokerCreationEvent(broker); } else if (type == NotificationType.Deleted) { log.info("BrokerRegistry detected the broker:{} registry has been deleted.", broker); - // The registered node is an ephemeral node that could be deleted when the metadata store client's session - // is expired. In this case, we should register again. - if (brokerRegistry.getBrokerId().equals(broker)) { - brokerRegistry.registerAsync(); - } handleBrokerDeletionEvent(broker); } } From a2203909748761ca18462104432b59682a6296b6 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 27 Sep 2024 13:10:50 +0800 Subject: [PATCH 09/11] Fix flakiness of BrokerRegistryIntegrationTest --- .../extensions/BrokerRegistryIntegrationTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java index c1b2152b42df5..8afbcaa44f743 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java @@ -72,7 +72,8 @@ protected void cleanup() throws Exception { @Test public void testRecoverFromNodeDeletion() throws Exception { // Simulate the case that the node was somehow deleted (e.g. by session timeout) - Assert.assertEquals(brokerRegistry.getAvailableBrokersAsync().get(), List.of(pulsar.getBrokerId())); + Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals( + brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId()))); pulsar.getLocalMetadataStore().delete(brokerMetadataPath, Optional.empty()); Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals( brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId()))); @@ -89,6 +90,8 @@ public void testRecoverFromNodeDeletion() throws Exception { @Test public void testRegisterAgain() throws Exception { + Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> Assert.assertEquals( + brokerRegistry.getAvailableBrokersAsync().join(), List.of(pulsar.getBrokerId()))); final var metadataStore = pulsar.getLocalMetadataStore(); final var oldResult = metadataStore.get(brokerMetadataPath).get().orElseThrow(); log.info("Old result: {} {}", new String(oldResult.getValue()), oldResult.getStat().getVersion()); From ef0a8ebd6e2fd6b81cd52d10b7b39805513efde1 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 27 Sep 2024 13:11:43 +0800 Subject: [PATCH 10/11] Fix flakiness of BrokerRegistryIntegrationTest --- .../extensions/BrokerRegistryIntegrationTest.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java index 8afbcaa44f743..162ea50829d40 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java @@ -96,10 +96,13 @@ public void testRegisterAgain() throws Exception { final var oldResult = metadataStore.get(brokerMetadataPath).get().orElseThrow(); log.info("Old result: {} {}", new String(oldResult.getValue()), oldResult.getStat().getVersion()); brokerRegistry.registerAsync().get(); - final var newResult = metadataStore.get(brokerMetadataPath).get().orElseThrow(); - log.info("New result: {} {}", new String(newResult.getValue()), newResult.getStat().getVersion()); - Assert.assertTrue(newResult.getStat().getVersion() > oldResult.getStat().getVersion()); - Assert.assertEquals(newResult.getValue(), oldResult.getValue()); + + Awaitility.await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> { + final var newResult = metadataStore.get(brokerMetadataPath).get().orElseThrow(); + log.info("New result: {} {}", new String(newResult.getValue()), newResult.getStat().getVersion()); + Assert.assertTrue(newResult.getStat().getVersion() > oldResult.getStat().getVersion()); + Assert.assertEquals(newResult.getValue(), oldResult.getValue()); + }); } private ServiceConfiguration brokerConfig() { From 5c720d8ed5b3ddbde9929d9de79b37597e2e20ba Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 27 Sep 2024 13:48:50 +0800 Subject: [PATCH 11/11] Fix failed BrokerRegistryTest --- .../broker/loadbalance/extensions/BrokerRegistryImpl.java | 3 ++- .../broker/loadbalance/extensions/BrokerRegistryTest.java | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index cc8225d083bb8..9fd0518a054cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -74,7 +74,8 @@ protected enum State { Closed } - private final AtomicReference state = new AtomicReference<>(State.Init); + @VisibleForTesting + final AtomicReference state = new AtomicReference<>(State.Init); public BrokerRegistryImpl(PulsarService pulsar) { this.pulsar = pulsar; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java index 3d7d6ea2205a5..28a2a18500f5f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java @@ -396,8 +396,8 @@ public void testKeyPath() { assertEquals(keyPath, LOADBALANCE_BROKERS_ROOT + "/brokerId"); } - public BrokerRegistryImpl.State getState(BrokerRegistryImpl brokerRegistry) { - return WhiteboxImpl.getInternalState(brokerRegistry, BrokerRegistryImpl.State.class); + private static BrokerRegistryImpl.State getState(BrokerRegistryImpl brokerRegistry) { + return brokerRegistry.state.get(); } }