Skip to content

Commit

Permalink
[improve][loadbalance] added loadBalancerReportUpdateMinIntervalMilli…
Browse files Browse the repository at this point in the history
…s and ignores memory usage in getMaxResourceUsage() (#17598)
  • Loading branch information
heesung-sn authored Sep 14, 2022
1 parent 52a380f commit de7c586
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 10 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,9 @@ loadBalancerEnabled=true
# Percentage of change to trigger load report update
loadBalancerReportUpdateThresholdPercentage=10

# minimum interval to update load report
loadBalancerReportUpdateMinIntervalMillis=5000

# maximum interval to update load report
loadBalancerReportUpdateMaxIntervalMinutes=15

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,9 @@ loadBalancerEnabled=false
# Percentage of change to trigger load report update
loadBalancerReportUpdateThresholdPercentage=10

# minimum interval to update load report
loadBalancerReportUpdateMinIntervalMillis=5000

# maximum interval to update load report
loadBalancerReportUpdateMaxIntervalMinutes=15

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2087,6 +2087,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_LOAD_BALANCER,
doc = "maximum interval to update load report"
)
private int loadBalancerReportUpdateMinIntervalMillis = 5000;
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
dynamic = true,
doc = "Min delay of load report to collect, in milli-seconds"
)
private int loadBalancerReportUpdateMaxIntervalMinutes = 15;
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadSheddingTask;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.lookup.v1.TopicLookup;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandlers;
Expand Down Expand Up @@ -1164,7 +1163,7 @@ protected void startLoadManagementService() throws PulsarServerException {
if (config.isLoadBalancerEnabled()) {
LOG.info("Starting load balancer");
if (this.loadReportTask == null) {
long loadReportMinInterval = LoadManagerShared.LOAD_REPORT_UPDATE_MINIMUM_INTERVAL;
long loadReportMinInterval = config.getLoadBalancerReportUpdateMinIntervalMillis();
this.loadReportTask = this.loadManagerExecutor.scheduleAtFixedRate(
new LoadReportUpdaterTask(loadManager), loadReportMinInterval, loadReportMinInterval,
TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ protected Set<String> initialValue() throws Exception {
}
};

// update LoadReport at most every 5 seconds
public static final long LOAD_REPORT_UPDATE_MINIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);

private static final String DEFAULT_DOMAIN = "default";

// Don't allow construction: static method namespace only.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;

import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.LOAD_REPORT_UPDATE_MINIMUM_INTERVAL;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
Expand Down Expand Up @@ -1153,10 +1152,12 @@ public void setLoadReportForceUpdateFlag() {
public void writeLoadReportOnZookeeper() throws Exception {
// update average JVM heap usage to average value of the last 120 seconds
long realtimeJvmHeapUsage = getRealtimeJvmHeapUsageMBytes();
int minInterval = pulsar.getConfiguration().getLoadBalancerReportUpdateMinIntervalMillis();
if (this.avgJvmHeapUsageMBytes <= 0) {
this.avgJvmHeapUsageMBytes = realtimeJvmHeapUsage;
} else {
long weight = Math.max(1, TimeUnit.SECONDS.toMillis(120) / LOAD_REPORT_UPDATE_MINIMUM_INTERVAL);

long weight = Math.max(1, TimeUnit.SECONDS.toMillis(120) / minInterval);
this.avgJvmHeapUsageMBytes = ((weight - 1) * this.avgJvmHeapUsageMBytes + realtimeJvmHeapUsage) / weight;
}

Expand All @@ -1175,7 +1176,7 @@ public void writeLoadReportOnZookeeper() throws Exception {
int maxUpdateIntervalInMinutes = pulsar.getConfiguration().getLoadBalancerReportUpdateMaxIntervalMinutes();
if (timeElapsedSinceLastReport > TimeUnit.MINUTES.toMillis(maxUpdateIntervalInMinutes)) {
needUpdate = true;
} else if (timeElapsedSinceLastReport > LOAD_REPORT_UPDATE_MINIMUM_INTERVAL) {
} else if (timeElapsedSinceLastReport > minInterval) {
// check number of bundles assigned, comparing with last LoadReport
long oldBundleCount = lastLoadReport.getNumBundles();
long newBundleCount = pulsar.getBrokerService().getNumberOfNamespaceBundles();
Expand Down Expand Up @@ -1244,7 +1245,7 @@ public void writeLoadReportOnZookeeper() throws Exception {
*/
private boolean isLoadReportGenerationIntervalPassed() {
long timeSinceLastGenMillis = System.currentTimeMillis() - lastLoadReport.getTimestamp();
return timeSinceLastGenMillis > LOAD_REPORT_UPDATE_MINIMUM_INTERVAL;
return timeSinceLastGenMillis > pulsar.getConfiguration().getLoadBalancerReportUpdateMinIntervalMillis();
}

// todo: changeme: this can be optimized, we don't have to iterate through everytime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ private void updateBundleData(final Map<String, NamespaceBundleStats> bundleStat
}

public double getMaxResourceUsage() {
return max(cpu.percentUsage(), memory.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
// does not consider memory because it is noisy by gc.
return max(cpu.percentUsage(), directMemory.percentUsage(), bandwidthIn.percentUsage(),
bandwidthOut.percentUsage()) / 100;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;

public class LocalBrokerDataTest {

@Test
Expand All @@ -33,4 +35,28 @@ public void testLocalBrokerDataDeserialization() {
Assert.assertEquals(localBrokerData.getMemory().usage, 614.0d, 0.0001f);
Assert.assertEquals(localBrokerData.getMemory().percentUsage(), ((float) localBrokerData.getMemory().usage) / ((float) localBrokerData.getMemory().limit) * 100, 0.0001f);
}

@Test
public void testMaxResourceUsage() {
LocalBrokerData data = new LocalBrokerData();
data.setCpu(new ResourceUsage(1.0, 100.0));
data.setMemory(new ResourceUsage(800.0, 200.0));
data.setDirectMemory(new ResourceUsage(2.0, 100.0));
data.setBandwidthIn(new ResourceUsage(3.0, 100.0));
data.setBandwidthOut(new ResourceUsage(4.0, 100.0));

double epsilon = 0.00001;
double weight = 0.5;
// skips memory usage
assertEquals(data.getMaxResourceUsage(), 0.04, epsilon);

assertEquals(
data.getMaxResourceUsageWithWeight(
weight, weight, weight, weight, weight), 2.0, epsilon);

assertEquals(
data.getMaxResourceUsageWithWeightWithinLimit(
weight, weight, weight, weight, weight), 0.02, epsilon);

}
}

0 comments on commit de7c586

Please sign in to comment.