Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster access to INITIALIZING/RELOCATING shards #47817

Merged
merged 9 commits into from
Oct 31, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.shard.ShardId;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.*;

/**
* A {@link RoutingNode} represents a cluster node associated with a single {@link DiscoveryNode} including all shards
Expand All @@ -41,6 +37,10 @@ public class RoutingNode implements Iterable<ShardRouting> {

private final LinkedHashMap<ShardId, ShardRouting> shards; // LinkedHashMap to preserve order

private final LinkedHashMap<ShardId, ShardRouting> initializingShards;

private final LinkedHashMap<ShardId, ShardRouting> relocatingShards;

public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
this(nodeId, node, buildShardRoutingMap(shards));
}
Expand All @@ -49,6 +49,15 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
this.nodeId = nodeId;
this.node = node;
this.shards = shards;
this.relocatingShards = new LinkedHashMap<>();
this.initializingShards = new LinkedHashMap<>();
for (ShardRouting shardRouting : shards.values()) {
if (shardRouting.initializing()) {
initializingShards.put(shardRouting.shardId(), shardRouting);
} else if(shardRouting.relocating()) {
relocatingShards.put(shardRouting.shardId(), shardRouting);
}
}
}

private static LinkedHashMap<ShardId, ShardRouting> buildShardRoutingMap(ShardRouting... shardRoutings) {
Expand Down Expand Up @@ -104,6 +113,11 @@ void add(ShardRouting shard) {
+ "] where it already exists. current [" + shards.get(shard.shardId()) + "]. new [" + shard + "]");
}
shards.put(shard.shardId(), shard);
if (shard.initializing()) {
initializingShards.put(shard.shardId(), shard);
} else if(shard.relocating()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whitespace nit:

Suggested change
} else if(shard.relocating()) {
} else if (shard.relocating()) {

relocatingShards.put(shard.shardId(), shard);
}
}

void update(ShardRouting oldShard, ShardRouting newShard) {
Expand All @@ -114,11 +128,29 @@ void update(ShardRouting oldShard, ShardRouting newShard) {
}
ShardRouting previousValue = shards.put(newShard.shardId(), newShard);
assert previousValue == oldShard : "expected shard " + previousValue + " but was " + oldShard;

if (initializingShards.containsKey(oldShard.shardId())) {
initializingShards.remove(oldShard.shardId());
} else if (relocatingShards.containsKey(oldShard.shardId())) {
relocatingShards.remove(oldShard.shardId());
}

if (newShard.initializing()) {
initializingShards.put(newShard.shardId(), newShard);
} else if (newShard.relocating()) {
relocatingShards.put(newShard.shardId(), newShard);
}
}

void remove(ShardRouting shard) {
ShardRouting previousValue = shards.remove(shard.shardId());
assert previousValue == shard : "expected shard " + previousValue + " but was " + shard;

if (initializingShards.containsKey(shard.shardId())) {
initializingShards.remove(shard.shardId());
} else if (relocatingShards.containsKey(shard.shardId())) {
relocatingShards.remove(shard.shardId());
}
}

/**
Expand All @@ -127,6 +159,14 @@ void remove(ShardRouting shard) {
* @return number of shards
*/
public int numberOfShardsWithState(ShardRoutingState... states) {
if (states.length == 1) {
if(states[0] == ShardRoutingState.INITIALIZING) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whitespace nit:

Suggested change
if(states[0] == ShardRoutingState.INITIALIZING) {
if (states[0] == ShardRoutingState.INITIALIZING) {

return initializingShards.size();
} else if (states[0] == ShardRoutingState.RELOCATING) {
return relocatingShards.size();
}
}

int count = 0;
for (ShardRouting shardEntry : this) {
for (ShardRoutingState state : states) {
Expand All @@ -144,6 +184,14 @@ public int numberOfShardsWithState(ShardRoutingState... states) {
* @return List of shards
*/
public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
if (states.length == 1) {
if(states[0] == ShardRoutingState.INITIALIZING) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whitespace nit:

Suggested change
if(states[0] == ShardRoutingState.INITIALIZING) {
if (states[0] == ShardRoutingState.INITIALIZING) {

return new ArrayList<>(initializingShards.values());
} else if (states[0] == ShardRoutingState.RELOCATING) {
return new ArrayList<>(relocatingShards.values());
}
}

List<ShardRouting> shards = new ArrayList<>();
for (ShardRouting shardEntry : this) {
for (ShardRoutingState state : states) {
Expand All @@ -164,6 +212,26 @@ public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
public List<ShardRouting> shardsWithState(String index, ShardRoutingState... states) {
List<ShardRouting> shards = new ArrayList<>();

if (states.length == 1) {
if(states[0] == ShardRoutingState.INITIALIZING) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whitespace nit:

Suggested change
if(states[0] == ShardRoutingState.INITIALIZING) {
if (states[0] == ShardRoutingState.INITIALIZING) {

for (ShardRouting shardEntry : initializingShards.values()) {
if (!shardEntry.getIndexName().equals(index)) {
continue;
}
shards.add(shardEntry);
}
return shards;
} else if (states[0] == ShardRoutingState.RELOCATING) {
for (ShardRouting shardEntry : relocatingShards.values()) {
if (!shardEntry.getIndexName().equals(index)) {
continue;
}
shards.add(shardEntry);
}
return shards;
}
}

for (ShardRouting shardEntry : this) {
if (!shardEntry.getIndexName().equals(index)) {
continue;
Expand All @@ -181,14 +249,7 @@ public List<ShardRouting> shardsWithState(String index, ShardRoutingState... sta
* The number of shards on this node that will not be eventually relocated.
*/
public int numberOfOwningShards() {
int count = 0;
for (ShardRouting shardEntry : this) {
if (shardEntry.state() != ShardRoutingState.RELOCATING) {
count++;
}
}

return count;
return shards.size() - relocatingShards.size();
}

public String prettyPrint() {
Expand Down