Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[fix][broker] Update init and shutdown time and other minor logic (ExtensibleLoadManagerImpl only) #22930

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ public static void createTenantIfAbsent(PulsarResources resources, String tenant
}
}

static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName,
String cluster, int bundleNumber) throws IOException {
public static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName namespaceName,
String cluster, int bundleNumber) throws IOException {
NamespaceResources namespaceResources = resources.getNamespaceResources();

if (!namespaceResources.namespaceExists(namespaceName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,9 @@ public CompletableFuture<Optional<BrokerLookupData>> getOwnershipWithLookupDataA

public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
Optional<String> destinationBroker,
boolean force) {
boolean force,
long timeout,
TimeUnit timeoutUnit) {
if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
Expand All @@ -691,7 +693,7 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
UnloadDecision unloadDecision =
new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
return unloadAsync(unloadDecision,
conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
timeout, timeoutUnit);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD;
private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100;
public static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000;
public static final long VERSION_ID_INIT = 1; // initial versionId
public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 3 * 60; // 3 mins
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately
Expand Down Expand Up @@ -298,7 +297,8 @@ public synchronized void start() throws PulsarServerException {
(pulsar.getPulsarResources(), SYSTEM_NAMESPACE.getTenant(), config.getClusterName());

PulsarClusterMetadataSetup.createNamespaceIfAbsent
(pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName());
(pulsar.getPulsarResources(), SYSTEM_NAMESPACE, config.getClusterName(),
config.getDefaultNumberOfNamespaceBundles());

ExtensibleLoadManagerImpl.createSystemTopic(pulsar, TOPIC);

Expand Down Expand Up @@ -1018,6 +1018,9 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean
if (ex != null) {
log.error("Failed to close topics under bundle:{} in {} ms",
bundle.toString(), unloadBundleTime, ex);
if (!disconnectClients) {
pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
}
} else {
log.info("Unloading bundle:{} with {} topics completed in {} ms",
bundle, unloadedTopics, unloadBundleTime);
Expand Down Expand Up @@ -1342,11 +1345,6 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
}
}
if (cleaned) {
try {
MILLISECONDS.sleep(OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS);
} catch (InterruptedException e) {
log.warn("Interrupted while gracefully waiting for the cleanup convergence.");
}
break;
} else {
try {
Expand All @@ -1357,9 +1355,23 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
}
}
}
log.info("Finished cleanup waiting for orphan broker:{}. Elapsed {} ms", brokerId,
System.currentTimeMillis() - started);
}

private synchronized void doCleanup(String broker) {
try {
if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS)
.isEmpty()) {
log.error("Found the channel owner is empty. Skip the inactive broker:{}'s orphan bundle cleanup",
broker);
return;
}
} catch (Exception e) {
log.error("Failed to find the channel owner. Skip the inactive broker:{}'s orphan bundle cleanup", broker);
return;
}

long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;

Expand All @@ -44,6 +43,7 @@
public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {

private static final long LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2;
private static final long INIT_TIMEOUT_IN_SECS = 5;

private volatile TableView<T> tableView;
private volatile long tableViewLastUpdateTimestamp;
Expand Down Expand Up @@ -123,10 +123,11 @@ public synchronized void start() throws LoadDataStoreException {
public synchronized void startTableView() throws LoadDataStoreException {
if (tableView == null) {
try {
tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).create();
tableView = client.newTableViewBuilder(Schema.JSON(clazz)).topic(topic).createAsync()
.get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
tableView.forEachAndListen((k, v) ->
tableViewLastUpdateTimestamp = System.currentTimeMillis());
} catch (PulsarClientException e) {
} catch (Exception e) {
tableView = null;
throw new LoadDataStoreException(e);
}
Expand All @@ -137,8 +138,9 @@ public synchronized void startTableView() throws LoadDataStoreException {
public synchronized void startProducer() throws LoadDataStoreException {
if (producer == null) {
try {
producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create();
} catch (PulsarClientException e) {
producer = client.newProducer(Schema.JSON(clazz)).topic(topic).createAsync()
.get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
} catch (Exception e) {
producer = null;
throw new LoadDataStoreException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
boolean closeWithoutWaitingClientDisconnect) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker, false);
.unloadNamespaceBundleAsync(bundle, destinationBroker, false, timeout, timeoutUnit);
}
// unload namespace bundle
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
Expand Down Expand Up @@ -1290,7 +1290,8 @@ public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBun
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
future = extensibleLoadManager.unloadNamespaceBundleAsync(
nsBundle, Optional.empty(), true);
nsBundle, Optional.empty(), true,
pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
} else {
future = ownershipCache.removeOwnership(nsBundle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -73,7 +72,7 @@ public void testStopBroker() throws PulsarServerException {
pulsar.close();
final var elapsedMs = System.currentTimeMillis() - beforeStop;
log.info("It spends {} ms to stop the broker ({} for protocol handler)", elapsedMs, handler.closeTimeMs);
Assert.assertTrue(elapsedMs < ServiceUnitStateChannelImpl.OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS
Assert.assertTrue(elapsedMs <
+ handler.closeTimeMs + shutdownTimeoutMs + 1000); // tolerate 1 more second for other processes
}

Expand Down
Loading