Skip to content

Commit

Permalink
Add node weight changes to balance round summaries (elastic#122195)
Browse files Browse the repository at this point in the history
The node weight changes between two balancer rounds are summarized by
saving the old DesiredBalance's weights per node along with a weights
diff to reach the new DesiredBalance's weights per node. This supports
combining multiple summaries by using the oldest summary's base node
weights and summing the diffs across all summaries to reach a combined
node weight diffs.
  • Loading branch information
DiannaHohensee authored Feb 26, 2025
1 parent feb3a60 commit b1e6908
Show file tree
Hide file tree
Showing 8 changed files with 545 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -83,6 +85,74 @@ public AllocationBalancingRoundSummaryService(ThreadPool threadPool, ClusterSett
});
}

/**
* Summarizes the work required to move from an old to new desired balance shard allocation.
*/
public static BalancingRoundSummary createBalancerRoundSummary(DesiredBalance oldDesiredBalance, DesiredBalance newDesiredBalance) {
return new BalancingRoundSummary(
createWeightsSummary(oldDesiredBalance, newDesiredBalance),
DesiredBalance.shardMovements(oldDesiredBalance, newDesiredBalance)
);
}

/**
* Creates a summary of the node weight changes from {@code oldDesiredBalance} to {@code newDesiredBalance}.
* See {@link BalancingRoundSummary.NodesWeightsChanges} for content details.
*/
private static Map<String, BalancingRoundSummary.NodesWeightsChanges> createWeightsSummary(
DesiredBalance oldDesiredBalance,
DesiredBalance newDesiredBalance
) {
var oldWeightsPerNode = oldDesiredBalance.weightsPerNode();
var newWeightsPerNode = newDesiredBalance.weightsPerNode();

Map<String, BalancingRoundSummary.NodesWeightsChanges> nodeNameToWeightInfo = new HashMap<>(oldWeightsPerNode.size());
for (var nodeAndWeights : oldWeightsPerNode.entrySet()) {
var discoveryNode = nodeAndWeights.getKey();
var oldNodeWeightStats = nodeAndWeights.getValue();

// The node may no longer exists in the new DesiredBalance. If so, the new weights for that node are effectively zero. New
// weights of zero will result in correctly negative weight diffs for the removed node.
var newNodeWeightStats = newWeightsPerNode.getOrDefault(discoveryNode, DesiredBalanceMetrics.NodeWeightStats.ZERO);

nodeNameToWeightInfo.put(
discoveryNode.getName(),
new BalancingRoundSummary.NodesWeightsChanges(
oldNodeWeightStats,
BalancingRoundSummary.NodeWeightsDiff.create(oldNodeWeightStats, newNodeWeightStats)
)
);
}

// There may be a new node in the new DesiredBalance that was not in the old DesiredBalance. So we'll need to iterate the nodes in
// the new DesiredBalance to check.
for (var nodeAndWeights : newWeightsPerNode.entrySet()) {
var discoveryNode = nodeAndWeights.getKey();
if (nodeNameToWeightInfo.containsKey(discoveryNode.getName()) == false) {
// This node is new in the new DesiredBalance, there was no entry added during iteration of the nodes in the old
// DesiredBalance. So we'll make a new entry with a base of zero value weights and a weights diff of the new node's weights.
nodeNameToWeightInfo.put(
discoveryNode.getName(),
new BalancingRoundSummary.NodesWeightsChanges(
DesiredBalanceMetrics.NodeWeightStats.ZERO,
BalancingRoundSummary.NodeWeightsDiff.create(DesiredBalanceMetrics.NodeWeightStats.ZERO, nodeAndWeights.getValue())
)
);
}
}

return nodeNameToWeightInfo;
}

/**
* Creates and saves a balancer round summary for the work to move from {@code oldDesiredBalance} to {@code newDesiredBalance}. If
* balancer round summaries are not enabled in the cluster (see {@link #ENABLE_BALANCER_ROUND_SUMMARIES_SETTING}), then the summary is
* immediately discarded.
*/
public void addBalancerRoundSummary(DesiredBalance oldDesiredBalance, DesiredBalance newDesiredBalance) {
addBalancerRoundSummary(createBalancerRoundSummary(oldDesiredBalance, newDesiredBalance));
}

/**
* Adds the summary of a balancing round. If summaries are enabled, this will eventually be reported (logging, etc.). If balancer round
* summaries are not enabled in the cluster, then the summary is immediately discarded (so as not to fill up a data structure that will
Expand Down Expand Up @@ -110,7 +180,7 @@ private void reportSummariesAndThenReschedule() {
*/
private void drainAndReportSummaries() {
var combinedSummaries = drainSummaries();
if (combinedSummaries == CombinedBalancingRoundSummary.EMPTY_RESULTS) {
if (combinedSummaries == BalancingRoundSummary.CombinedBalancingRoundSummary.EMPTY_RESULTS) {
return;
}

Expand All @@ -120,14 +190,15 @@ private void drainAndReportSummaries() {
/**
* Returns a combined summary of all unreported allocation round summaries: may summarize a single balancer round, multiple, or none.
*
* @return {@link CombinedBalancingRoundSummary#EMPTY_RESULTS} if there are no balancing round summaries waiting to be reported.
* @return {@link BalancingRoundSummary.CombinedBalancingRoundSummary#EMPTY_RESULTS} if there are no balancing round summaries waiting
* to be reported.
*/
private CombinedBalancingRoundSummary drainSummaries() {
private BalancingRoundSummary.CombinedBalancingRoundSummary drainSummaries() {
ArrayList<BalancingRoundSummary> batchOfSummaries = new ArrayList<>();
while (summaries.isEmpty() == false) {
batchOfSummaries.add(summaries.poll());
}
return CombinedBalancingRoundSummary.combine(batchOfSummaries);
return BalancingRoundSummary.CombinedBalancingRoundSummary.combine(batchOfSummaries);
}

/**
Expand Down Expand Up @@ -186,7 +257,9 @@ private void rescheduleReporting() {
}
}

// @VisibleForTesting
/**
* Checks that the number of entries in {@link #summaries} matches the given {@code numberOfSummaries}.
*/
protected void verifyNumberOfSummaries(int numberOfSummaries) {
assert numberOfSummaries == summaries.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public void allocate(RoutingAllocation allocation) {
balancer.moveShards();
balancer.balance();

// Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
collectAndRecordNodeWeightStats(balancer, weightFunction, allocation);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,149 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Summarizes the impact to the cluster as a result of a rebalancing round.
*
* @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one.
* @param nodeNameToWeightChanges The shard balance weight changes for each node (by name), comparing a previous DesiredBalance shard
* allocation to a new DesiredBalance allocation.
* @param numberOfShardsToMove The number of shard moves required to move from the previous desired balance to the new one. Does not include
* new (index creation) or removed (index deletion) shard assignements.
*/
public record BalancingRoundSummary(long numberOfShardsToMove) {
public record BalancingRoundSummary(Map<String, NodesWeightsChanges> nodeNameToWeightChanges, long numberOfShardsToMove) {

/**
* Represents the change in weights for a node going from an old DesiredBalance to a new DesiredBalance
* Saves the node weights of an old DesiredBalance, along with a diff against a newer DesiredBalance.
*
* @param baseWeights The starting {@link DesiredBalanceMetrics.NodeWeightStats} of a previous DesiredBalance.
* @param weightsDiff The difference between the {@code baseWeights} and a new DesiredBalance.
*/
record NodesWeightsChanges(DesiredBalanceMetrics.NodeWeightStats baseWeights, NodeWeightsDiff weightsDiff) {}

/**
* Represents the change of shard balance weights for a node, comparing an older DesiredBalance with the latest DesiredBalance.
*
* @param shardCountDiff How many more, or less, shards are assigned to the node in the latest DesiredBalance.
* @param diskUsageInBytesDiff How much more, or less, disk is used by shards assigned to the node in the latest DesiredBalance.
* @param writeLoadDiff How much more, or less, write load is estimated for shards assigned to the node in the latest DesiredBalance.
* @param totalWeightDiff How much more, or less, the total weight is of shards assigned to the node in the latest DesiredBalance.
*/
record NodeWeightsDiff(long shardCountDiff, double diskUsageInBytesDiff, double writeLoadDiff, double totalWeightDiff) {

/**
* Creates a diff where the {@code base} weights will be subtracted from the {@code next} weights, to show the changes made to reach
* the {@code next} weights.
*
* @param base has the original weights
* @param next has the new weights
* @return The diff of ({@code next} - {@code base})
*/
public static NodeWeightsDiff create(DesiredBalanceMetrics.NodeWeightStats base, DesiredBalanceMetrics.NodeWeightStats next) {
return new NodeWeightsDiff(
next.shardCount() - base.shardCount(),
next.diskUsageInBytes() - base.diskUsageInBytes(),
next.writeLoad() - base.writeLoad(),
next.nodeWeight() - base.nodeWeight()
);
}

/**
* Creates a new {@link NodeWeightsDiff} summing this instance's values with {@code otherDiff}'s values.
*/
public NodeWeightsDiff combine(NodeWeightsDiff otherDiff) {
return new NodeWeightsDiff(
this.shardCountDiff + otherDiff.shardCountDiff,
this.diskUsageInBytesDiff + otherDiff.diskUsageInBytesDiff,
this.writeLoadDiff + otherDiff.writeLoadDiff,
this.totalWeightDiff + otherDiff.totalWeightDiff
);
}
}

@Override
public String toString() {
return "BalancingRoundSummary{" + "numberOfShardsToMove=" + numberOfShardsToMove + '}';
return "BalancingRoundSummary{"
+ "nodeNameToWeightChanges"
+ nodeNameToWeightChanges
+ ", numberOfShardsToMove="
+ numberOfShardsToMove
+ '}';
}

/**
* Holds combined {@link BalancingRoundSummary} results. Essentially holds a list of the balancing events and the summed up changes
* across all those events: what allocation work was done across some period of time.
* TODO: WIP ES-10341
*
* Note that each balancing round summary is the difference between, at the time, latest desired balance and the previous desired
* balance. Each summary represents a step towards the next desired balance, which is based on presuming the previous desired balance is
* reached. So combining them is roughly the difference between the first summary's previous desired balance and the last summary's
* latest desired balance.
*
* @param numberOfBalancingRounds How many balancing round summaries are combined in this report.
* @param nodeNameToWeightChanges
* @param numberOfShardMoves The sum of shard moves for each balancing round being combined into a single summary.
*/
public record CombinedBalancingRoundSummary(
int numberOfBalancingRounds,
Map<String, NodesWeightsChanges> nodeNameToWeightChanges,
long numberOfShardMoves
) {

public static final CombinedBalancingRoundSummary EMPTY_RESULTS = new CombinedBalancingRoundSummary(0, new HashMap<>(), 0);

/**
* Merges multiple {@link BalancingRoundSummary} summaries into a single {@link CombinedBalancingRoundSummary}.
*/
public static CombinedBalancingRoundSummary combine(List<BalancingRoundSummary> summaries) {
if (summaries.isEmpty()) {
return EMPTY_RESULTS;
}

// We will loop through the summaries and sum the weight diffs for each node entry.
Map<String, NodesWeightsChanges> combinedNodeNameToWeightChanges = new HashMap<>();

// Number of shards moves are simply summed across summaries. Each new balancing round is built upon the last one, so it is
// possible that a shard is reassigned back to a node before it even moves away, and that will still be counted as 2 moves here.
long numberOfShardMoves = 0;

// Total number of summaries that are being combined.
int numSummaries = 0;

var iterator = summaries.iterator();
while (iterator.hasNext()) {
var summary = iterator.next();

// We'll build the weight changes by keeping the node weight base from the first summary in which a node appears and then
// summing the weight diffs in each summary to get total weight diffs across summaries.
for (var nodeNameAndWeights : summary.nodeNameToWeightChanges.entrySet()) {
var combined = combinedNodeNameToWeightChanges.get(nodeNameAndWeights.getKey());
if (combined == null) {
// Either this is the first summary, and combinedNodeNameToWeightChanges hasn't been initialized yet for this node;
// or a later balancing round had a new node. Either way, initialize the node entry with the weight changes from the
// first summary in which it appears.
combinedNodeNameToWeightChanges.put(nodeNameAndWeights.getKey(), nodeNameAndWeights.getValue());
} else {
// We have at least two summaries containing this node, so let's combine them.
var newCombinedChanges = new NodesWeightsChanges(
combined.baseWeights,
combined.weightsDiff.combine(nodeNameAndWeights.getValue().weightsDiff())
);
combinedNodeNameToWeightChanges.put(nodeNameAndWeights.getKey(), newCombinedChanges);
}
}

++numSummaries;
numberOfShardMoves += summary.numberOfShardsToMove();
}

return new CombinedBalancingRoundSummary(numSummaries, combinedNodeNameToWeightChanges, numberOfShardMoves);
}

}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public class DesiredBalanceMetrics {
*/
public record AllocationStats(long unassignedShards, long totalAllocations, long undesiredAllocationsExcludingShuttingDownNodes) {}

public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) {}
public record NodeWeightStats(long shardCount, double diskUsageInBytes, double writeLoad, double nodeWeight) {
public static final NodeWeightStats ZERO = new NodeWeightStats(0, 0, 0, 0);
}

// Reconciliation metrics.
/** See {@link #unassignedShards} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) {
}

if (currentDesiredBalanceRef.compareAndSet(oldDesiredBalance, newDesiredBalance)) {
balancerRoundSummaryService.addBalancerRoundSummary(calculateBalancingRoundSummary(oldDesiredBalance, newDesiredBalance));
balancerRoundSummaryService.addBalancerRoundSummary(oldDesiredBalance, newDesiredBalance);
if (logger.isTraceEnabled()) {
var diff = DesiredBalance.hasChanges(oldDesiredBalance, newDesiredBalance)
? "Diff: " + DesiredBalance.humanReadableDiff(oldDesiredBalance, newDesiredBalance)
Expand All @@ -339,13 +339,6 @@ private void setCurrentDesiredBalance(DesiredBalance newDesiredBalance) {
}
}

/**
* Summarizes the work required to move from an old to new desired balance shard allocation.
*/
private BalancingRoundSummary calculateBalancingRoundSummary(DesiredBalance oldDesiredBalance, DesiredBalance newDesiredBalance) {
return new BalancingRoundSummary(DesiredBalance.shardMovements(oldDesiredBalance, newDesiredBalance));
}

/**
* Submits the desired balance to be reconciled (applies the desired changes to the routing table) and creates and publishes a new
* cluster state. The data nodes will receive and apply the new cluster state to start/move/remove shards.
Expand Down
Loading

0 comments on commit b1e6908

Please sign in to comment.