Track max shard size for all copies #50638

Closed
65 changes: 45 additions & 20 deletions server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
@@ -21,6 +21,7 @@

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -29,19 +30,26 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.Map;
import java.util.stream.StreamSupport;

/**
* ClusterInfo is an object representing a map of nodes to {@link DiskUsage}
* and a map of shard ids to shard sizes, see
* <code>InternalClusterInfoService.shardIdentifierFromRouting(String)</code>
* for the key used in the shardSizes map
* and a map of shard ids to shard sizes.
*/
public class ClusterInfo implements ToXContentFragment, Writeable {

/**
* Only needed to support shard sizes keyed by String as in {@link Version#V_7_0_0} - TODO remove this in v9.
*/
private static final String BWC_SHARD_ID_UUID = "_bwc_shard_id_uuid_";

private final ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage;
private final ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage;
final ImmutableOpenMap<String, Long> shardSizes;
final ImmutableOpenMap<ShardId, Long> shardSizes;
public static final ClusterInfo EMPTY = new ClusterInfo();
final ImmutableOpenMap<ShardRouting, String> routingToDataPath;

@@ -56,28 +64,39 @@ protected ClusterInfo() {
* @param mostAvailableSpaceUsage a node id to disk usage mapping for the path that has the most available space on the node.
* @param shardSizes a shardkey to size in bytes mapping per shard.
* @param routingToDataPath the shard routing to datapath mapping
* @see #shardIdentifierFromRouting
*/
public ClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage, ImmutableOpenMap<String, Long> shardSizes,
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage, ImmutableOpenMap<ShardId, Long> shardSizes,
ImmutableOpenMap<ShardRouting, String> routingToDataPath) {
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
this.shardSizes = shardSizes;
this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
this.routingToDataPath = routingToDataPath;
}

private static ShardId readShardId(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
return new ShardId(in);
} else {
// earlier versions used a string of the form "[indexname][0][p]" or "[indexname][0][r]" to distinguish primaries from replicas
// but the only time we use a serialized ClusterInfo is for allocation explanation, not for allocation calculations, so we can
// use a placeholder with a fake UUID and shard number and use the index name field for the original string:
final String shardKey = in.readString();
return new ShardId(shardKey, BWC_SHARD_ID_UUID, 0);
}
}

public ClusterInfo(StreamInput in) throws IOException {
Map<String, DiskUsage> leastMap = in.readMap(StreamInput::readString, DiskUsage::new);
Map<String, DiskUsage> mostMap = in.readMap(StreamInput::readString, DiskUsage::new);
Map<String, Long> sizeMap = in.readMap(StreamInput::readString, StreamInput::readLong);
Map<ShardId, Long> sizeMap = in.readMap(ClusterInfo::readShardId, StreamInput::readLong);
Map<ShardRouting, String> routingMap = in.readMap(ShardRouting::new, StreamInput::readString);

ImmutableOpenMap.Builder<String, DiskUsage> leastBuilder = ImmutableOpenMap.builder();
this.leastAvailableSpaceUsage = leastBuilder.putAll(leastMap).build();
ImmutableOpenMap.Builder<String, DiskUsage> mostBuilder = ImmutableOpenMap.builder();
this.mostAvailableSpaceUsage = mostBuilder.putAll(mostMap).build();
ImmutableOpenMap.Builder<String, Long> sizeBuilder = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardId, Long> sizeBuilder = ImmutableOpenMap.builder();
this.shardSizes = sizeBuilder.putAll(sizeMap).build();
ImmutableOpenMap.Builder<ShardRouting, String> routingBuilder = ImmutableOpenMap.builder();
this.routingToDataPath = routingBuilder.putAll(routingMap).build();
@@ -96,8 +115,12 @@ public void writeTo(StreamOutput out) throws IOException {
c.value.writeTo(out);
}
out.writeVInt(this.shardSizes.size());
for (ObjectObjectCursor<String, Long> c : this.shardSizes) {
out.writeString(c.key);
for (ObjectObjectCursor<ShardId, Long> c : this.shardSizes) {
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
c.key.writeTo(out);
} else {
out.writeString(c.key.toString());
}
if (c.value == null) {
out.writeLong(-1);
} else {
@@ -133,8 +156,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject(); // end "nodes"
builder.startObject("shard_sizes"); {
for (ObjectObjectCursor<String, Long> c : this.shardSizes) {
builder.humanReadableField(c.key + "_bytes", c.key, new ByteSizeValue(c.value));
for (ObjectObjectCursor<ShardId, Long> c : this.shardSizes) {
final String shardKey;
if (BWC_SHARD_ID_UUID.equals(c.key.getIndex().getUUID())) {
shardKey = c.key.getIndexName();
} else {
shardKey = c.key.toString();
}
builder.humanReadableField(shardKey + "_bytes", shardKey, new ByteSizeValue(c.value));
Contributor Author: This is arguably a breaking change since we will no longer include the [p] or [r] suffix in the shard sizes reported by the allocation explain output. We could fake these out for BWC, reporting the size of each shard twice, and allowing users to opt in to the future behaviour here with a system property. TBD.

Contributor: The allocation explain API is mostly meant for human consumption. As long as the field names are preserved in the response output, things should be ok.

Contributor Author: As it stands, this PR changes the field names in the response output, because it drops the [p] or [r] marker from shardKey. Even if we were to track all copies then I think we'd also need to change these field names.

Contributor: I see. I think breaking this is fine.
}
}
builder.endObject(); // end "shard_sizes"
@@ -165,7 +194,10 @@ public ImmutableOpenMap<String, DiskUsage> getNodeMostAvailableDiskUsages() {
 * Returns the shard size for the given shard routing or <code>null</code> if that metric is not available.
*/
public Long getShardSize(ShardRouting shardRouting) {
return shardSizes.get(shardIdentifierFromRouting(shardRouting));
// this ClusterInfo instance was never serialized and sent over the wire, so it does not have any fake shard IDs.
assert StreamSupport.stream(shardSizes.keys().spliterator(), false)
.noneMatch(c -> BWC_SHARD_ID_UUID.equals(c.value.getIndex().getUUID())) : shardSizes;
return shardSizes.get(shardRouting.shardId());
}

/**
@@ -183,11 +215,4 @@ public long getShardSize(ShardRouting shardRouting, long defaultValue) {
return shardSize == null ? defaultValue : shardSize;
}

/**
* Method that incorporates the ShardId for the shard into a string that
* includes a 'p' or 'r' depending on whether the shard is a primary.
*/
static String shardIdentifierFromRouting(ShardRouting shardRouting) {
return shardRouting.shardId().toString() + "[" + (shardRouting.primary() ? "p" : "r") + "]";
}
}
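
To summarise the wire-compatibility handling in this file: readShardId wraps a legacy String key from a pre-8.0 node in a placeholder ShardId whose index UUID is the BWC_SHARD_ID_UUID sentinel, and toXContent unwraps it again for display. Below is a self-contained sketch of that idea using plain Java stand-ins; FakeShardId, fromLegacyKey, and display are illustrative names, not Elasticsearch APIs:

// Illustrative stand-in for the BWC placeholder trick; not the Elasticsearch wire code (Java 16+ for records).
public class BwcShardKeySketch {
    static final String BWC_SHARD_ID_UUID = "_bwc_shard_id_uuid_"; // sentinel UUID

    // minimal stand-in for org.elasticsearch.index.shard.ShardId
    record FakeShardId(String indexName, String indexUuid, int shardNumber) {
        String display() {
            // mirrors the toXContent logic: a placeholder prints the original legacy key,
            // a real ShardId prints "[index][shard]"
            return BWC_SHARD_ID_UUID.equals(indexUuid)
                    ? indexName
                    : "[" + indexName + "][" + shardNumber + "]";
        }
    }

    // pre-8.0 senders provide keys like "[my-index][0][p]"; keep the whole string in the
    // index-name slot and fake the rest, since these values are only shown to humans
    static FakeShardId fromLegacyKey(String legacyKey) {
        return new FakeShardId(legacyKey, BWC_SHARD_ID_UUID, 0);
    }

    public static void main(String[] args) {
        System.out.println(fromLegacyKey("[my-index][0][p]").display());           // [my-index][0][p]
        System.out.println(new FakeShardId("my-index", "real-uuid", 0).display()); // [my-index][0]
    }
}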
server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java
@@ -43,6 +43,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
@@ -80,7 +81,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
private volatile ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsages;
private volatile ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsages;
private volatile ImmutableOpenMap<ShardRouting, String> shardRoutingToDataPath;
private volatile ImmutableOpenMap<String, Long> shardSizes;
private volatile ImmutableOpenMap<ShardId, Long> shardSizes;
private volatile boolean isMaster = false;
private volatile boolean enabled;
private volatile TimeValue fetchTimeout;
@@ -320,7 +321,7 @@ public void onFailure(Exception e) {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
ShardStats[] stats = indicesStatsResponse.getShards();
ImmutableOpenMap.Builder<String, Long> newShardSizes = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardId, Long> newShardSizes = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath = ImmutableOpenMap.builder();
buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath);
shardSizes = newShardSizes.build();
@@ -381,16 +382,17 @@ public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
listeners.add(clusterInfoConsumer);
}

static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath) {
for (ShardStats s : stats) {
newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
long size = s.getStats().getStore().sizeInBytes();
String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting());
if (logger.isTraceEnabled()) {
logger.trace("shard: {} size: {}", sid, size);
}
newShardSizes.put(sid, size);
static void buildShardLevelInfo(Logger logger, ShardStats[] shardStatses,
ImmutableOpenMap.Builder<ShardId, Long> shardSizes,
ImmutableOpenMap.Builder<ShardRouting, String> shardRoutingToDataPath) {
for (ShardStats shardStats : shardStatses) {
shardRoutingToDataPath.put(shardStats.getShardRouting(), shardStats.getDataPath());
final ShardId shardId = shardStats.getShardRouting().shardId();
final Long oldShardSize = shardSizes.get(shardId);
final long newShardSize = shardStats.getStats().getStore().sizeInBytes();
logger.trace("shard: {} size: {}", shardStats.getShardRouting(), newShardSize);
final long maxShardSize = oldShardSize == null ? newShardSize : Math.max(oldShardSize, newShardSize);
shardSizes.put(shardId, maxShardSize);
}
}
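
This loop is the core of the change: sizes are now keyed by ShardId rather than by a per-copy string, so when several copies of the same shard report different store sizes the builder keeps the maximum. A standalone sketch of that merge, using a plain HashMap and made-up shard keys instead of ShardStats and ImmutableOpenMap:

// Standalone sketch of "track max shard size for all copies"; names and data are made up.
import java.util.HashMap;
import java.util.Map;

public class MaxShardSizeSketch {
    public static void main(String[] args) {
        // each entry is one shard copy reporting its store size in bytes
        String[] shardKeys = {"[idx][0]", "[idx][0]", "[idx][1]"}; // primary + replica of shard 0, plus shard 1
        long[] sizesInBytes = {100L, 1000L, 50L};

        Map<String, Long> maxShardSizes = new HashMap<>();
        for (int i = 0; i < shardKeys.length; i++) {
            // merge() keeps the larger of the previously recorded size and the new report
            maxShardSizes.merge(shardKeys[i], sizesInBytes[i], Math::max);
        }

        System.out.println(maxShardSizes); // e.g. {[idx][0]=1000, [idx][1]=50} (map order not guaranteed)
    }
}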

@@ -38,6 +38,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.ActionPlugin;
@@ -130,7 +131,7 @@ public void testClusterInfoServiceCollectsInformation() throws Exception {
assertNotNull("info should not be null", info);
ImmutableOpenMap<String, DiskUsage> leastUsages = info.getNodeLeastAvailableDiskUsages();
ImmutableOpenMap<String, DiskUsage> mostUsages = info.getNodeMostAvailableDiskUsages();
ImmutableOpenMap<String, Long> shardSizes = info.shardSizes;
ImmutableOpenMap<ShardId, Long> shardSizes = info.shardSizes;
assertNotNull(leastUsages);
assertNotNull(shardSizes);
assertThat("some usages are populated", leastUsages.values().size(), Matchers.equalTo(2));
@@ -209,7 +210,7 @@ public void testClusterInfoServiceInformationClearOnError() {
assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThanOrEqualTo(1));
assertThat(info.getNodeMostAvailableDiskUsages().size(), greaterThanOrEqualTo(1));
// indices is guaranteed to time out on the latch, not updating anything.
assertThat(info.shardSizes.size(), greaterThan(1));
assertThat(info.shardSizes.size(), greaterThan(0));

// now we cause an exception
timeout.set(false);
@@ -21,6 +21,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.index.shard.ShardId;
@@ -56,13 +57,12 @@ private static ImmutableOpenMap<String, DiskUsage> randomDiskUsage() {
return builder.build();
}

private static ImmutableOpenMap<String, Long> randomShardSizes() {
private static ImmutableOpenMap<ShardId, Long> randomShardSizes() {
int numEntries = randomIntBetween(0, 128);
ImmutableOpenMap.Builder<String, Long> builder = ImmutableOpenMap.builder(numEntries);
ImmutableOpenMap.Builder<ShardId, Long> builder = ImmutableOpenMap.builder(numEntries);
for (int i = 0; i < numEntries; i++) {
String key = randomAlphaOfLength(32);
long shardSize = randomIntBetween(0, Integer.MAX_VALUE);
builder.put(key, shardSize);
builder.put(new ShardId(randomAlphaOfLength(10), UUIDs.randomBase64UUID(random()), between(0, 100)), shardSize);
}
return builder.build();
}
@@ -116,15 +116,15 @@ public void testFillShardLevelInfo() {
new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, test_0.shardId()), commonStats0 , null, null, null),
new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, test_1.shardId()), commonStats1 , null, null, null)
};
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardId, Long> shardSizes = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardRouting, String> routingToPath = ImmutableOpenMap.builder();
ClusterState state = ClusterState.builder(new ClusterName("blarg")).version(0).build();
InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath);
assertEquals(2, shardSizes.size());
assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_0)));
assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_1)));
assertEquals(100L, shardSizes.get(ClusterInfo.shardIdentifierFromRouting(test_0)).longValue());
assertEquals(1000L, shardSizes.get(ClusterInfo.shardIdentifierFromRouting(test_1)).longValue());
assertTrue(shardSizes.containsKey(test_0.shardId()));
assertTrue(shardSizes.containsKey(test_1.shardId()));
assertEquals(100L, shardSizes.get(test_0.shardId()).longValue());
assertEquals(1000L, shardSizes.get(test_1.shardId()).longValue());

assertEquals(2, routingToPath.size());
assertTrue(routingToPath.containsKey(test_0));