diff --git a/conf/broker.conf b/conf/broker.conf index c51a5ac9c125c..ffe2cdb1da7b1 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1171,6 +1171,9 @@ loadBalancerEnabled=true # Percentage of change to trigger load report update loadBalancerReportUpdateThresholdPercentage=10 +# minimum interval to update load report +loadBalancerReportUpdateMinIntervalMillis=5000 + # maximum interval to update load report loadBalancerReportUpdateMaxIntervalMinutes=15 diff --git a/conf/standalone.conf b/conf/standalone.conf index f0c708a97c693..71b72cd6d6e75 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -832,6 +832,9 @@ loadBalancerEnabled=false # Percentage of change to trigger load report update loadBalancerReportUpdateThresholdPercentage=10 +# minimum interval to update load report +loadBalancerReportUpdateMinIntervalMillis=5000 + # maximum interval to update load report loadBalancerReportUpdateMaxIntervalMinutes=15 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 30ca29faad889..b8911289effaf 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2087,6 +2087,12 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_LOAD_BALANCER, doc = "maximum interval to update load report" ) + private int loadBalancerReportUpdateMinIntervalMillis = 5000; + @FieldContext( + category = CATEGORY_LOAD_BALANCER, + dynamic = true, + doc = "Min delay of load report to collect, in milli-seconds" + ) private int loadBalancerReportUpdateMaxIntervalMinutes = 15; @FieldContext( category = CATEGORY_LOAD_BALANCER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index afff44620566e..c70d08851c1f9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -89,7 +89,6 @@ import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask; import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask; import org.apache.pulsar.broker.loadbalance.LoadSheddingTask; -import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; import org.apache.pulsar.broker.lookup.v1.TopicLookup; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.protocol.ProtocolHandlers; @@ -1164,7 +1163,7 @@ protected void startLoadManagementService() throws PulsarServerException { if (config.isLoadBalancerEnabled()) { LOG.info("Starting load balancer"); if (this.loadReportTask == null) { - long loadReportMinInterval = LoadManagerShared.LOAD_REPORT_UPDATE_MINIMUM_INTERVAL; + long loadReportMinInterval = config.getLoadBalancerReportUpdateMinIntervalMillis(); this.loadReportTask = this.loadManagerExecutor.scheduleAtFixedRate( new LoadReportUpdaterTask(loadManager), loadReportMinInterval, loadReportMinInterval, TimeUnit.MILLISECONDS); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java index 8afb6a040076f..0f93ae7a254ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java @@ -76,9 +76,6 @@ protected Set initialValue() throws Exception { } }; - // update LoadReport at most every 5 seconds - public static final long LOAD_REPORT_UPDATE_MINIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5); - private static final String DEFAULT_DOMAIN = "default"; // Don't allow construction: static method namespace only. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 6e6f90555d7c5..d516953841084 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.loadbalance.impl; -import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.LOAD_REPORT_UPDATE_MINIMUM_INTERVAL; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheBuilder; @@ -1153,10 +1152,12 @@ public void setLoadReportForceUpdateFlag() { public void writeLoadReportOnZookeeper() throws Exception { // update average JVM heap usage to average value of the last 120 seconds long realtimeJvmHeapUsage = getRealtimeJvmHeapUsageMBytes(); + int minInterval = pulsar.getConfiguration().getLoadBalancerReportUpdateMinIntervalMillis(); if (this.avgJvmHeapUsageMBytes <= 0) { this.avgJvmHeapUsageMBytes = realtimeJvmHeapUsage; } else { - long weight = Math.max(1, TimeUnit.SECONDS.toMillis(120) / LOAD_REPORT_UPDATE_MINIMUM_INTERVAL); + + long weight = Math.max(1, TimeUnit.SECONDS.toMillis(120) / minInterval); this.avgJvmHeapUsageMBytes = ((weight - 1) * this.avgJvmHeapUsageMBytes + realtimeJvmHeapUsage) / weight; } @@ -1175,7 +1176,7 @@ public void writeLoadReportOnZookeeper() throws Exception { int maxUpdateIntervalInMinutes = pulsar.getConfiguration().getLoadBalancerReportUpdateMaxIntervalMinutes(); if (timeElapsedSinceLastReport > TimeUnit.MINUTES.toMillis(maxUpdateIntervalInMinutes)) { needUpdate = true; - } else if (timeElapsedSinceLastReport > LOAD_REPORT_UPDATE_MINIMUM_INTERVAL) { + } else if (timeElapsedSinceLastReport > minInterval) { // check number of bundles assigned, comparing with last LoadReport long oldBundleCount = lastLoadReport.getNumBundles(); long newBundleCount = pulsar.getBrokerService().getNumberOfNamespaceBundles(); @@ -1244,7 +1245,7 @@ public void writeLoadReportOnZookeeper() throws Exception { */ private boolean isLoadReportGenerationIntervalPassed() { long timeSinceLastGenMillis = System.currentTimeMillis() - lastLoadReport.getTimestamp(); - return timeSinceLastGenMillis > LOAD_REPORT_UPDATE_MINIMUM_INTERVAL; + return timeSinceLastGenMillis > pulsar.getConfiguration().getLoadBalancerReportUpdateMinIntervalMillis(); } // todo: changeme: this can be optimized, we don't have to iterate through everytime diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java index 8736b675272de..3c97439f8143c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java @@ -235,7 +235,8 @@ private void updateBundleData(final Map bundleStat } public double getMaxResourceUsage() { - return max(cpu.percentUsage(), memory.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(), + // does not consider memory because it is noisy by gc. + return max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(), bandwidthOut.percentUsage()) / 100; } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java index e1f7222133a88..69d4a7f4cd198 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerDataTest.java @@ -22,6 +22,8 @@ import org.testng.Assert; import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; + public class LocalBrokerDataTest { @Test @@ -33,4 +35,28 @@ public void testLocalBrokerDataDeserialization() { Assert.assertEquals(localBrokerData.getMemory().usage, 614.0d, 0.0001f); Assert.assertEquals(localBrokerData.getMemory().percentUsage(), ((float) localBrokerData.getMemory().usage) / ((float) localBrokerData.getMemory().limit) * 100, 0.0001f); } + + @Test + public void testMaxResourceUsage() { + LocalBrokerData data = new LocalBrokerData(); + data.setCpu(new ResourceUsage(1.0, 100.0)); + data.setMemory(new ResourceUsage(800.0, 200.0)); + data.setDirectMemory(new ResourceUsage(2.0, 100.0)); + data.setBandwidthIn(new ResourceUsage(3.0, 100.0)); + data.setBandwidthOut(new ResourceUsage(4.0, 100.0)); + + double epsilon = 0.00001; + double weight = 0.5; + // skips memory usage + assertEquals(data.getMaxResourceUsage(), 0.04, epsilon); + + assertEquals( + data.getMaxResourceUsageWithWeight( + weight, weight, weight, weight, weight), 2.0, epsilon); + + assertEquals( + data.getMaxResourceUsageWithWeightWithinLimit( + weight, weight, weight, weight, weight), 0.02, epsilon); + + } }