Avoid overshooting watermarks during relocation.
kovrus committed Sep 25, 2019
1 parent 970e50a commit 367c034
Showing 7 changed files with 427 additions and 163 deletions.
8 changes: 8 additions & 0 deletions blackbox/docs/appendices/release-notes/unreleased.rst
@@ -160,6 +160,14 @@ Changes
Fixes
=====

- Fixed a bug in the disk threshold decider logic that failed to account for
  newly relocating shards (``STARTED`` to ``RELOCATING``) when deciding how to
  allocate or relocate shards with respect to the
  :ref:`cluster.routing.allocation.disk.watermark.low
  <cluster.routing.allocation.disk.watermark.low>` and
  :ref:`cluster.routing.allocation.disk.watermark.high
  <cluster.routing.allocation.disk.watermark.high>` settings.

- Fixed a regression that prevented shards from being reallocated when a node
  exceeds :ref:`cluster.routing.allocation.disk.watermark.high
  <cluster.routing.allocation.disk.watermark.high>`.
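
The following is a minimal, self-contained sketch of the accounting described by the first fix above, using hypothetical stand-in types rather than the real CrateDB/Elasticsearch classes: before a node's disk usage is compared against a watermark, the expected sizes of shards already relocating onto that node are added to the projected usage, so an incoming relocation cannot silently push the node past the threshold.

import java.util.List;

final class WatermarkSketch {

    static final class IncomingShard {
        final long expectedSizeBytes;

        IncomingShard(long expectedSizeBytes) {
            this.expectedSizeBytes = expectedSizeBytes;
        }
    }

    // True if the node stays below the watermark (a maximum used-space ratio)
    // once shards already relocating onto it are counted as used space.
    static boolean belowWatermark(long usedBytes, long totalBytes, double watermarkUsedRatio,
                                  List<IncomingShard> incomingRelocations) {
        long projectedUsed = usedBytes;
        for (IncomingShard shard : incomingRelocations) {
            projectedUsed += shard.expectedSizeBytes;
        }
        return (double) projectedUsed / totalBytes < watermarkUsedRatio;
    }

    public static void main(String[] args) {
        long gib = 1024L * 1024 * 1024;
        // 100 GiB disk, 80 GiB used, high watermark at 90% -> 10 GiB of headroom.
        System.out.println(belowWatermark(80 * gib, 100 * gib, 0.90, List.of()));
        // A 15 GiB shard already relocating onto the node overshoots the watermark,
        // so this second check prints false and the relocation must be rejected.
        System.out.println(belowWatermark(80 * gib, 100 * gib, 0.90,
                                          List.of(new IncomingShard(15 * gib))));
    }
}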
InternalClusterInfoService.java
@@ -48,9 +48,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -68,7 +67,7 @@
*/
public class InternalClusterInfoService implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener {

private static final Logger logger = LogManager.getLogger(InternalClusterInfoService.class);
private static final Logger LOGGER = LogManager.getLogger(InternalClusterInfoService.class);

public static final Setting<TimeValue> INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING =
Setting.timeSetting("cluster.info.update.interval", TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(10),
@@ -79,8 +78,8 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode

private volatile TimeValue updateFrequency;

private volatile ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsages;
private volatile ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsages;
volatile ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsages;
volatile ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsages;
private volatile ImmutableOpenMap<ShardRouting, String> shardRoutingToDataPath;
private volatile ImmutableOpenMap<String, Long> shardSizes;
private volatile boolean isMaster = false;
@@ -89,7 +88,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final NodeClient client;
private final List<Consumer<ClusterInfo>> listeners = Collections.synchronizedList(new ArrayList<>(1));
private final List<Consumer<ClusterInfo>> listeners = new CopyOnWriteArrayList<>();

public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
@@ -124,8 +123,8 @@ void setUpdateFrequency(TimeValue updateFrequency) {
@Override
public void onMaster() {
this.isMaster = true;
if (logger.isTraceEnabled()) {
logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
}

// Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running
@@ -137,8 +136,8 @@ public void onMaster() {
threadPool.executor(executorName()).execute(this::maybeRefresh);
}
} catch (EsRejectedExecutionException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Couldn't schedule cluster info update task - node might be shutting down", ex);
}
}
}
@@ -169,17 +168,17 @@ public void clusterChanged(ClusterChangedEvent event) {
}

if (this.isMaster && dataNodeAdded && event.state().getNodes().getDataNodes().size() > 1) {
if (logger.isDebugEnabled()) {
logger.debug("data node was added, retrieving new cluster info");
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("data node was added, retrieving new cluster info");
}
threadPool.executor(executorName()).execute(() -> maybeRefresh());
threadPool.executor(executorName()).execute(this::maybeRefresh);
}

if (this.isMaster && event.nodesRemoved()) {
for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
if (removedNode.isDataNode()) {
if (logger.isTraceEnabled()) {
logger.trace("Removing node from cluster info: {}", removedNode.getId());
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Removing node from cluster info: {}", removedNode.getId());
}
if (leastAvailableSpaceUsages.containsKey(removedNode.getId())) {
ImmutableOpenMap.Builder<String, DiskUsage> newMaxUsages = ImmutableOpenMap.builder(leastAvailableSpaceUsages);
@@ -211,25 +210,25 @@ public ClusterInfo getClusterInfo() {
public class SubmitReschedulingClusterInfoUpdatedJob implements Runnable {
@Override
public void run() {
if (logger.isTraceEnabled()) {
logger.trace("Submitting new rescheduling cluster info update job");
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Submitting new rescheduling cluster info update job");
}
try {
threadPool.executor(executorName()).execute(() -> {
try {
maybeRefresh();
} finally { //schedule again after we refreshed
if (isMaster) {
if (logger.isTraceEnabled()) {
logger.trace("Scheduling next run for updating cluster info in: {}", updateFrequency.toString());
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Scheduling next run for updating cluster info in: {}", updateFrequency.toString());
}
threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), this);
}
}
});
} catch (EsRejectedExecutionException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Couldn't re-schedule cluster info update task - node might be shutting down", ex);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Couldn't re-schedule cluster info update task - node might be shutting down", ex);
}
}
}
@@ -252,7 +251,7 @@ protected CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse
* Retrieve the latest indices stats, calling the listener when complete
* @return a latch that can be used to wait for the indices stats to complete if desired
*/
protected CountDownLatch updateIndicesStats(final ActionListener<IndicesStatsResponse> listener) {
private CountDownLatch updateIndicesStats(final ActionListener<IndicesStatsResponse> listener) {
final CountDownLatch latch = new CountDownLatch(1);
final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.clear();
@@ -267,8 +266,8 @@ private void maybeRefresh() {
if (enabled) {
refresh();
} else {
if (logger.isTraceEnabled()) {
logger.trace("Skipping ClusterInfoUpdatedJob since it is disabled");
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Skipping ClusterInfoUpdatedJob since it is disabled");
}
}
}
@@ -277,30 +276,34 @@ private void maybeRefresh() {
* Refreshes the ClusterInfo in a blocking fashion
*/
public final ClusterInfo refresh() {
if (logger.isTraceEnabled()) {
logger.trace("Performing ClusterInfoUpdateJob");
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Performing ClusterInfoUpdateJob");
}
final CountDownLatch nodeLatch = updateNodeStats(new ActionListener<>() {
@Override
public void onResponse(NodesStatsResponse nodeStatses) {
ImmutableOpenMap.Builder<String, DiskUsage> newLeastAvaiableUsages = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiskUsage> newMostAvaiableUsages = ImmutableOpenMap.builder();
fillDiskUsagePerNode(logger, nodeStatses.getNodes(), newLeastAvaiableUsages, newMostAvaiableUsages);
leastAvailableSpaceUsages = newLeastAvaiableUsages.build();
mostAvailableSpaceUsages = newMostAvaiableUsages.build();
public void onResponse(NodesStatsResponse nodesStatsResponse) {
ImmutableOpenMap.Builder<String, DiskUsage> leastAvailableUsagesBuilder = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<String, DiskUsage> mostAvailableUsagesBuilder = ImmutableOpenMap.builder();
fillDiskUsagePerNode(
LOGGER,
nodesStatsResponse.getNodes(),
leastAvailableUsagesBuilder,
mostAvailableUsagesBuilder);
leastAvailableSpaceUsages = leastAvailableUsagesBuilder.build();
mostAvailableSpaceUsages = mostAvailableUsagesBuilder.build();
}

@Override
public void onFailure(Exception e) {
if (e instanceof ReceiveTimeoutTransportException) {
logger.error("NodeStatsAction timed out for ClusterInfoUpdateJob", e);
LOGGER.error("NodeStatsAction timed out for ClusterInfoUpdateJob", e);
} else {
if (e instanceof ClusterBlockException) {
if (logger.isTraceEnabled()) {
logger.trace("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e);
}
} else {
logger.warn("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e);
LOGGER.warn("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e);
}
// we empty the usages list, to be safe - we don't know what's going on.
leastAvailableSpaceUsages = ImmutableOpenMap.of();
@@ -315,22 +318,22 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
ShardStats[] stats = indicesStatsResponse.getShards();
ImmutableOpenMap.Builder<String, Long> newShardSizes = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath = ImmutableOpenMap.builder();
buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath, clusterService.state());
buildShardLevelInfo(LOGGER, stats, newShardSizes, newShardRoutingToDataPath, clusterService.state());
shardSizes = newShardSizes.build();
shardRoutingToDataPath = newShardRoutingToDataPath.build();
}

@Override
public void onFailure(Exception e) {
if (e instanceof ReceiveTimeoutTransportException) {
logger.error("IndicesStatsAction timed out for ClusterInfoUpdateJob", e);
LOGGER.error("IndicesStatsAction timed out for ClusterInfoUpdateJob", e);
} else {
if (e instanceof ClusterBlockException) {
if (logger.isTraceEnabled()) {
logger.trace("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e);
}
} else {
logger.warn("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e);
LOGGER.warn("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e);
}
// we empty the usages list, to be safe - we don't know what's going on.
shardSizes = ImmutableOpenMap.of();
@@ -343,24 +346,24 @@ public void onFailure(Exception e) {
nodeLatch.await(fetchTimeout.getMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore interrupt status
logger.warn("Failed to update node information for ClusterInfoUpdateJob within {} timeout", fetchTimeout);
LOGGER.warn("Failed to update node information for ClusterInfoUpdateJob within {} timeout", fetchTimeout);
}

try {
indicesLatch.await(fetchTimeout.getMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore interrupt status
logger.warn("Failed to update shard information for ClusterInfoUpdateJob within {} timeout", fetchTimeout);
LOGGER.warn("Failed to update shard information for ClusterInfoUpdateJob within {} timeout", fetchTimeout);
}
ClusterInfo clusterInfo = getClusterInfo();
boolean anyListeners = false;
for (final Consumer<ClusterInfo> listener : listeners) {
anyListeners = true;
try {
logger.trace("notifying [{}] of new cluster info", listener);
LOGGER.trace("notifying [{}] of new cluster info", listener);
listener.accept(clusterInfo);
} catch (Exception e) {
logger.info(new ParameterizedMessage("failed to notify [{}] of new cluster info", listener), e);
LOGGER.info(new ParameterizedMessage("failed to notify [{}] of new cluster info", listener), e);
}
}
assert anyListeners : "expected to notify at least one listener";
@@ -385,10 +388,10 @@ static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpen
}
}

private static void fillDiskUsagePerNode(Logger logger,
List<NodeStats> nodeStatsArray,
ImmutableOpenMap.Builder<String, DiskUsage> newLeastAvaiableUsages,
ImmutableOpenMap.Builder<String, DiskUsage> newMostAvaiableUsages) {
static void fillDiskUsagePerNode(Logger logger,
List<NodeStats> nodeStatsArray,
ImmutableOpenMap.Builder<String, DiskUsage> newLeastAvaiableUsages,
ImmutableOpenMap.Builder<String, DiskUsage> newMostAvaiableUsages) {
boolean traceEnabled = logger.isTraceEnabled();
for (NodeStats nodeStats : nodeStatsArray) {
if (nodeStats.getFs() == null) {
Expand All @@ -400,7 +403,7 @@ private static void fillDiskUsagePerNode(Logger logger,
if (leastAvailablePath == null) {
assert mostAvailablePath == null;
mostAvailablePath = leastAvailablePath = info;
} else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()){
} else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()) {
leastAvailablePath = info;
} else if (mostAvailablePath.getAvailable().getBytes() < info.getAvailable().getBytes()) {
mostAvailablePath = info;
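One change in the service above swaps the listener list from Collections.synchronizedList to a CopyOnWriteArrayList. A small standalone sketch of the reasoning (not the service's actual code): listeners are added rarely but iterated on every refresh, and a copy-on-write list lets the notification loop walk an immutable snapshot without locking and without risking a ConcurrentModificationException if a listener is registered concurrently.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class ListenerListSketch {

    // Mutated rarely (addListener), iterated often (notifyAllListeners) -> copy-on-write fits.
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) {
        listeners.add(listener); // copies the backing array; cheap when additions are rare
    }

    void notifyAllListeners(String info) {
        // Iteration uses the snapshot taken when it started, so concurrent add()
        // calls can never throw ConcurrentModificationException here.
        for (Consumer<String> listener : listeners) {
            try {
                listener.accept(info);
            } catch (Exception e) {
                // keep notifying the remaining listeners, mirroring the pattern in refresh()
                System.err.println("listener failed: " + e);
            }
        }
    }

    public static void main(String[] args) {
        ListenerListSketch sketch = new ListenerListSketch();
        sketch.addListener(info -> System.out.println("got cluster info: " + info));
        sketch.notifyAllListeners("refresh #1");
    }
}
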
@@ -90,20 +90,39 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio
boolean subtractShardsMovingAway, String dataPath) {
ClusterInfo clusterInfo = allocation.clusterInfo();
long totalSize = 0;
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) {
String actualPath = clusterInfo.getDataPath(routing);
if (dataPath.equals(actualPath)) {
if (routing.initializing() && routing.relocatingNodeId() != null) {
totalSize += getExpectedShardSize(routing, allocation, 0);
} else if (subtractShardsMovingAway && routing.relocating()) {

for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) {
if (routing.relocatingNodeId() == null) {
// in practice the only initializing-but-not-relocating shards with a nonzero expected shard size will be ones created
// by a resize (shrink/split/clone) operation which we expect to happen using hard links, so they shouldn't be taking
// any additional space and can be ignored here
continue;
}

final String actualPath = clusterInfo.getDataPath(routing);
// if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least
// free space
if (actualPath == null || actualPath.equals(dataPath)) {
totalSize += getExpectedShardSize(routing, allocation, 0);
}
}

if (subtractShardsMovingAway) {
for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING)) {
String actualPath = clusterInfo.getDataPath(routing);
if (actualPath == null) {
// we might know the path of this shard from before when it was relocating
actualPath = clusterInfo.getDataPath(routing.cancelRelocation());
}
if (dataPath.equals(actualPath)) {
totalSize -= getExpectedShardSize(routing, allocation, 0);
}
}
}

return totalSize;
}


@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
ClusterInfo clusterInfo = allocation.clusterInfo();
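Below is a simplified restatement of the new sizeOfRelocatingShards logic above, with hypothetical stand-in types instead of the real routing classes (and omitting the cancelRelocation() fallback for an outgoing shard whose path is unknown): incoming relocation targets are added to the total, an unknown destination path is conservatively charged to the path being checked, and shards relocating away are subtracted only when the caller asks for it.

import java.util.List;

final class RelocationSizeSketch {

    enum State { INITIALIZING, RELOCATING, STARTED }

    static final class Shard {
        final State state;
        final boolean isRelocationTarget; // INITIALIZING because of a relocation, not e.g. a new index
        final String dataPath;            // null when the path is not yet known
        final long expectedSizeBytes;

        Shard(State state, boolean isRelocationTarget, String dataPath, long expectedSizeBytes) {
            this.state = state;
            this.isRelocationTarget = isRelocationTarget;
            this.dataPath = dataPath;
            this.expectedSizeBytes = expectedSizeBytes;
        }
    }

    static long sizeOfRelocatingShards(List<Shard> shardsOnNode, String dataPath,
                                       boolean subtractShardsMovingAway) {
        long totalSize = 0;
        for (Shard shard : shardsOnNode) {
            if (shard.state == State.INITIALIZING && shard.isRelocationTarget) {
                // Unknown target path: conservatively charge it to the path being checked.
                if (shard.dataPath == null || shard.dataPath.equals(dataPath)) {
                    totalSize += shard.expectedSizeBytes;
                }
            } else if (subtractShardsMovingAway && shard.state == State.RELOCATING
                       && dataPath.equals(shard.dataPath)) {
                totalSize -= shard.expectedSizeBytes;
            }
        }
        return totalSize;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024;
        List<Shard> shards = List.of(
            new Shard(State.INITIALIZING, true, null, 500 * mib),      // incoming, path unknown
            new Shard(State.RELOCATING, false, "/data/0", 200 * mib)); // moving away from /data/0
        System.out.println(sizeOfRelocatingShards(shards, "/data/0", true));  // 300 MiB net
        System.out.println(sizeOfRelocatingShards(shards, "/data/0", false)); // 500 MiB
    }
}
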
@@ -26,7 +26,6 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -955,4 +954,20 @@ public void logShardStates(ClusterState state) {
rn.shardsWithState(RELOCATING),
rn.shardsWithState(STARTED));
}

/**
* ClusterInfo that always reports /dev/null for the shards' data paths.
*/
static class DevNullClusterInfo extends ClusterInfo {
DevNullClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage,
ImmutableOpenMap<String, Long> shardSizes) {
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null);
}

@Override
public String getDataPath(ShardRouting shardRouting) {
return "/dev/null";
}
}
}
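
The relocated DevNullClusterInfo helper above is typically handed to allocation tests in place of a real ClusterInfo. A hedged usage sketch follows, written as a fragment inside the test class (DevNullClusterInfo is a nested helper there); it assumes the usual DiskUsage(nodeId, nodeName, path, totalBytes, freeBytes) constructor and the ImmutableOpenMap builder API, and the node IDs, shard identifier, and sizes are illustrative only.

// Hedged sketch, inside the test class above.
ImmutableOpenMap.Builder<String, DiskUsage> usages = ImmutableOpenMap.builder();
usages.fPut("node1", new DiskUsage("node1", "node1", "/dev/null", 100, 80)); // 20% used
usages.fPut("node2", new DiskUsage("node2", "node2", "/dev/null", 100, 10)); // 90% used
ImmutableOpenMap<String, DiskUsage> diskUsages = usages.build();

ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
shardSizes.fPut("[test][0][p]", 10L); // expected size of shard [test][0], primary copy

// One data path per node here, so the same map serves as both "least" and "most" available.
ClusterInfo clusterInfo = new DevNullClusterInfo(diskUsages, diskUsages, shardSizes.build());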