diff --git a/blackbox/docs/appendices/release-notes/unreleased.rst b/blackbox/docs/appendices/release-notes/unreleased.rst index 58e4a518539c..8b0db4a34f86 100644 --- a/blackbox/docs/appendices/release-notes/unreleased.rst +++ b/blackbox/docs/appendices/release-notes/unreleased.rst @@ -160,6 +160,14 @@ Changes Fixes ===== +- Fixed a bug in the disk threshold decider logic that failed to account for + newly relocating shards (``STARTED`` to ``RELOCATING``) when deciding how to + allocate or relocate shards with respect to the + :ref:`cluster.routing.allocation.disk.watermark.low + ` and + :ref:`cluster.routing.allocation.disk.watermark.high + ` settings. + - Fixed regression that prevented shards from reallocation when a node passes over :ref:`cluster.routing.allocation.disk.watermark.high `. diff --git a/es/es-server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/es/es-server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 52542e56818f..6dc302c74465 100644 --- a/es/es-server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/es/es-server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -48,9 +48,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ReceiveTimeoutTransportException; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -68,7 +67,7 @@ */ public class InternalClusterInfoService implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener { - private static final Logger logger = LogManager.getLogger(InternalClusterInfoService.class); + private static final Logger LOGGER = LogManager.getLogger(InternalClusterInfoService.class); public static final Setting INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING = 
Setting.timeSetting("cluster.info.update.interval", TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(10), @@ -79,8 +78,8 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode private volatile TimeValue updateFrequency; - private volatile ImmutableOpenMap leastAvailableSpaceUsages; - private volatile ImmutableOpenMap mostAvailableSpaceUsages; + volatile ImmutableOpenMap leastAvailableSpaceUsages; + volatile ImmutableOpenMap mostAvailableSpaceUsages; private volatile ImmutableOpenMap shardRoutingToDataPath; private volatile ImmutableOpenMap shardSizes; private volatile boolean isMaster = false; @@ -89,7 +88,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode private final ClusterService clusterService; private final ThreadPool threadPool; private final NodeClient client; - private final List> listeners = Collections.synchronizedList(new ArrayList<>(1)); + private final List> listeners = new CopyOnWriteArrayList<>(); public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { this.leastAvailableSpaceUsages = ImmutableOpenMap.of(); @@ -124,8 +123,8 @@ void setUpdateFrequency(TimeValue updateFrequency) { @Override public void onMaster() { this.isMaster = true; - if (logger.isTraceEnabled()) { - logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob"); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("I have been elected master, scheduling a ClusterInfoUpdateJob"); } // Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running @@ -137,8 +136,8 @@ public void onMaster() { threadPool.executor(executorName()).execute(this::maybeRefresh); } } catch (EsRejectedExecutionException ex) { - if (logger.isDebugEnabled()) { - logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Couldn't schedule 
cluster info update task - node might be shutting down", ex); } } } @@ -169,17 +168,17 @@ public void clusterChanged(ClusterChangedEvent event) { } if (this.isMaster && dataNodeAdded && event.state().getNodes().getDataNodes().size() > 1) { - if (logger.isDebugEnabled()) { - logger.debug("data node was added, retrieving new cluster info"); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("data node was added, retrieving new cluster info"); } - threadPool.executor(executorName()).execute(() -> maybeRefresh()); + threadPool.executor(executorName()).execute(this::maybeRefresh); } if (this.isMaster && event.nodesRemoved()) { for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) { if (removedNode.isDataNode()) { - if (logger.isTraceEnabled()) { - logger.trace("Removing node from cluster info: {}", removedNode.getId()); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Removing node from cluster info: {}", removedNode.getId()); } if (leastAvailableSpaceUsages.containsKey(removedNode.getId())) { ImmutableOpenMap.Builder newMaxUsages = ImmutableOpenMap.builder(leastAvailableSpaceUsages); @@ -211,8 +210,8 @@ public ClusterInfo getClusterInfo() { public class SubmitReschedulingClusterInfoUpdatedJob implements Runnable { @Override public void run() { - if (logger.isTraceEnabled()) { - logger.trace("Submitting new rescheduling cluster info update job"); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Submitting new rescheduling cluster info update job"); } try { threadPool.executor(executorName()).execute(() -> { @@ -220,16 +219,16 @@ public void run() { maybeRefresh(); } finally { //schedule again after we refreshed if (isMaster) { - if (logger.isTraceEnabled()) { - logger.trace("Scheduling next run for updating cluster info in: {}", updateFrequency.toString()); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Scheduling next run for updating cluster info in: {}", updateFrequency.toString()); } threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), 
this); } } }); } catch (EsRejectedExecutionException ex) { - if (logger.isDebugEnabled()) { - logger.debug("Couldn't re-schedule cluster info update task - node might be shutting down", ex); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Couldn't re-schedule cluster info update task - node might be shutting down", ex); } } } @@ -252,7 +251,7 @@ protected CountDownLatch updateNodeStats(final ActionListener listener) { + private CountDownLatch updateIndicesStats(final ActionListener listener) { final CountDownLatch latch = new CountDownLatch(1); final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); indicesStatsRequest.clear(); @@ -267,8 +266,8 @@ private void maybeRefresh() { if (enabled) { refresh(); } else { - if (logger.isTraceEnabled()) { - logger.trace("Skipping ClusterInfoUpdatedJob since it is disabled"); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Skipping ClusterInfoUpdatedJob since it is disabled"); } } } @@ -277,30 +276,34 @@ private void maybeRefresh() { * Refreshes the ClusterInfo in a blocking fashion */ public final ClusterInfo refresh() { - if (logger.isTraceEnabled()) { - logger.trace("Performing ClusterInfoUpdateJob"); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Performing ClusterInfoUpdateJob"); } final CountDownLatch nodeLatch = updateNodeStats(new ActionListener<>() { @Override - public void onResponse(NodesStatsResponse nodeStatses) { - ImmutableOpenMap.Builder newLeastAvaiableUsages = ImmutableOpenMap.builder(); - ImmutableOpenMap.Builder newMostAvaiableUsages = ImmutableOpenMap.builder(); - fillDiskUsagePerNode(logger, nodeStatses.getNodes(), newLeastAvaiableUsages, newMostAvaiableUsages); - leastAvailableSpaceUsages = newLeastAvaiableUsages.build(); - mostAvailableSpaceUsages = newMostAvaiableUsages.build(); + public void onResponse(NodesStatsResponse nodesStatsResponse) { + ImmutableOpenMap.Builder leastAvailableUsagesBuilder = ImmutableOpenMap.builder(); + ImmutableOpenMap.Builder mostAvailableUsagesBuilder 
= ImmutableOpenMap.builder(); + fillDiskUsagePerNode( + LOGGER, + nodesStatsResponse.getNodes(), + leastAvailableUsagesBuilder, + mostAvailableUsagesBuilder); + leastAvailableSpaceUsages = leastAvailableUsagesBuilder.build(); + mostAvailableSpaceUsages = mostAvailableUsagesBuilder.build(); } @Override public void onFailure(Exception e) { if (e instanceof ReceiveTimeoutTransportException) { - logger.error("NodeStatsAction timed out for ClusterInfoUpdateJob", e); + LOGGER.error("NodeStatsAction timed out for ClusterInfoUpdateJob", e); } else { if (e instanceof ClusterBlockException) { - if (logger.isTraceEnabled()) { - logger.trace("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e); } } else { - logger.warn("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e); + LOGGER.warn("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e); } // we empty the usages list, to be safe - we don't know what's going on. 
leastAvailableSpaceUsages = ImmutableOpenMap.of(); @@ -315,7 +318,7 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) { ShardStats[] stats = indicesStatsResponse.getShards(); ImmutableOpenMap.Builder newShardSizes = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder newShardRoutingToDataPath = ImmutableOpenMap.builder(); - buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath, clusterService.state()); + buildShardLevelInfo(LOGGER, stats, newShardSizes, newShardRoutingToDataPath, clusterService.state()); shardSizes = newShardSizes.build(); shardRoutingToDataPath = newShardRoutingToDataPath.build(); } @@ -323,14 +326,14 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) { @Override public void onFailure(Exception e) { if (e instanceof ReceiveTimeoutTransportException) { - logger.error("IndicesStatsAction timed out for ClusterInfoUpdateJob", e); + LOGGER.error("IndicesStatsAction timed out for ClusterInfoUpdateJob", e); } else { if (e instanceof ClusterBlockException) { - if (logger.isTraceEnabled()) { - logger.trace("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e); } } else { - logger.warn("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e); + LOGGER.warn("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e); } // we empty the usages list, to be safe - we don't know what's going on. 
shardSizes = ImmutableOpenMap.of(); @@ -343,24 +346,24 @@ public void onFailure(Exception e) { nodeLatch.await(fetchTimeout.getMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // restore interrupt status - logger.warn("Failed to update node information for ClusterInfoUpdateJob within {} timeout", fetchTimeout); + LOGGER.warn("Failed to update node information for ClusterInfoUpdateJob within {} timeout", fetchTimeout); } try { indicesLatch.await(fetchTimeout.getMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // restore interrupt status - logger.warn("Failed to update shard information for ClusterInfoUpdateJob within {} timeout", fetchTimeout); + LOGGER.warn("Failed to update shard information for ClusterInfoUpdateJob within {} timeout", fetchTimeout); } ClusterInfo clusterInfo = getClusterInfo(); boolean anyListeners = false; for (final Consumer listener : listeners) { anyListeners = true; try { - logger.trace("notifying [{}] of new cluster info", listener); + LOGGER.trace("notifying [{}] of new cluster info", listener); listener.accept(clusterInfo); } catch (Exception e) { - logger.info(new ParameterizedMessage("failed to notify [{}] of new cluster info", listener), e); + LOGGER.info(new ParameterizedMessage("failed to notify [{}] of new cluster info", listener), e); } } assert anyListeners : "expected to notify at least one listener"; @@ -385,10 +388,10 @@ static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpen } } - private static void fillDiskUsagePerNode(Logger logger, - List nodeStatsArray, - ImmutableOpenMap.Builder newLeastAvaiableUsages, - ImmutableOpenMap.Builder newMostAvaiableUsages) { + static void fillDiskUsagePerNode(Logger logger, + List nodeStatsArray, + ImmutableOpenMap.Builder newLeastAvaiableUsages, + ImmutableOpenMap.Builder newMostAvaiableUsages) { boolean traceEnabled = logger.isTraceEnabled(); for 
(NodeStats nodeStats : nodeStatsArray) { if (nodeStats.getFs() == null) { @@ -400,7 +403,7 @@ private static void fillDiskUsagePerNode(Logger logger, if (leastAvailablePath == null) { assert mostAvailablePath == null; mostAvailablePath = leastAvailablePath = info; - } else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()){ + } else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()) { leastAvailablePath = info; } else if (mostAvailablePath.getAvailable().getBytes() < info.getAvailable().getBytes()) { mostAvailablePath = info; diff --git a/es/es-server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/es/es-server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 5f1560af27a9..700ee8d737e5 100644 --- a/es/es-server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/es/es-server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -90,20 +90,39 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio boolean subtractShardsMovingAway, String dataPath) { ClusterInfo clusterInfo = allocation.clusterInfo(); long totalSize = 0; - for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING, ShardRoutingState.INITIALIZING)) { - String actualPath = clusterInfo.getDataPath(routing); - if (dataPath.equals(actualPath)) { - if (routing.initializing() && routing.relocatingNodeId() != null) { - totalSize += getExpectedShardSize(routing, allocation, 0); - } else if (subtractShardsMovingAway && routing.relocating()) { + + for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) { + if (routing.relocatingNodeId() == null) { + // in practice the only initializing-but-not-relocating shards with a nonzero expected shard size will be ones created + // by a resize 
(shrink/split/clone) operation which we expect to happen using hard links, so they shouldn't be taking + // any additional space and can be ignored here + continue; + } + + final String actualPath = clusterInfo.getDataPath(routing); + // if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least + // free space + if (actualPath == null || actualPath.equals(dataPath)) { + totalSize += getExpectedShardSize(routing, allocation, 0); + } + } + + if (subtractShardsMovingAway) { + for (ShardRouting routing : node.shardsWithState(ShardRoutingState.RELOCATING)) { + String actualPath = clusterInfo.getDataPath(routing); + if (actualPath == null) { + // we might know the path of this shard from before when it was relocating + actualPath = clusterInfo.getDataPath(routing.cancelRelocation()); + } + if (dataPath.equals(actualPath)) { totalSize -= getExpectedShardSize(routing, allocation, 0); } } } + return totalSize; } - @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { ClusterInfo clusterInfo = allocation.clusterInfo(); diff --git a/es/es-server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/es/es-server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index d153886f4ac6..47cf96d79ef6 100644 --- a/es/es-server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/es/es-server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.ESAllocationTestCase; -import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo; import org.elasticsearch.cluster.metadata.IndexMetaData; import 
org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -955,4 +954,20 @@ public void logShardStates(ClusterState state) { rn.shardsWithState(RELOCATING), rn.shardsWithState(STARTED)); } + + /** + * ClusterInfo that always reports /dev/null for the shards' data paths. + */ + static class DevNullClusterInfo extends ClusterInfo { + DevNullClusterInfo(ImmutableOpenMap leastAvailableSpaceUsage, + ImmutableOpenMap mostAvailableSpaceUsage, + ImmutableOpenMap shardSizes) { + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null); + } + + @Override + public String getDataPath(ShardRouting shardRouting) { + return "/dev/null"; + } + } } diff --git a/es/es-server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/es/es-server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index 3fd51b1adc98..16ef9622cac8 100644 --- a/es/es-server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/es/es-server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -28,7 +28,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.ESAllocationTestCase; -import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo; +import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDeciderTests.DevNullClusterInfo; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; diff --git a/es/es-testing/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/es/es-testing/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index b6167ec3b771..9d406716bf23 100644 --- 
a/es/es-testing/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/es/es-testing/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -16,125 +16,116 @@ * specific language governing permissions and limitations * under the License. */ -package org.elasticsearch.cluster; -import java.util.Arrays; -import java.util.Collections; -import java.util.concurrent.CountDownLatch; +package org.elasticsearch.cluster; -import org.elasticsearch.Version; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; -/** - * Fake ClusterInfoService class that allows updating the nodes stats disk - * usage with fake values - */ public class MockInternalClusterInfoService extends InternalClusterInfoService { + private static final Logger LOGGER = 
LogManager.getLogger(MockInternalClusterInfoService.class); + /** * This is a marker plugin used to trigger MockNode to use this mock info service. */ public static class TestPlugin extends Plugin {} - private final ClusterName clusterName; - private volatile NodeStats[] stats = new NodeStats[3]; + @Nullable // if no fakery should take place + public volatile Function shardSizeFunction; - /** - * Create a fake NodeStats for the given node and usage - */ - private static NodeStats makeStats(String nodeName, DiskUsage usage) { - FsInfo.Path[] paths = new FsInfo.Path[1]; - FsInfo.Path path = new FsInfo.Path("/dev/null", null, - usage.getTotalBytes(), usage.getFreeBytes(), usage.getFreeBytes()); - paths[0] = path; - FsInfo fsInfo = new FsInfo(System.currentTimeMillis(), null, paths); - return new NodeStats( - new DiscoveryNode( - nodeName, - ESTestCase.buildNewFakeTransportAddress(), - emptyMap(), - emptySet(), - Version.CURRENT), - System.currentTimeMillis(), - fsInfo); - } + @Nullable // if no fakery should take place + public volatile BiFunction diskUsageFunction; public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { super(settings, clusterService, threadPool, client); - this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings); - stats[0] = makeStats("node_t1", new DiskUsage("node_t1", "n1", "/dev/null", 100, 100)); - stats[1] = makeStats("node_t2", new DiskUsage("node_t2", "n2", "/dev/null", 100, 100)); - stats[2] = makeStats("node_t3", new DiskUsage("node_t3", "n3", "/dev/null", 100, 100)); - } - - public void setN1Usage(String nodeName, DiskUsage newUsage) { - stats[0] = makeStats(nodeName, newUsage); - } - - public void setN2Usage(String nodeName, DiskUsage newUsage) { - stats[1] = makeStats(nodeName, newUsage); - } - - public void setN3Usage(String nodeName, DiskUsage newUsage) { - stats[2] = makeStats(nodeName, newUsage); } @Override - public CountDownLatch 
updateNodeStats(final ActionListener listener) { - NodesStatsResponse response = new NodesStatsResponse( - clusterName, - Arrays.asList(stats), - Collections.emptyList()); - listener.onResponse(response); - return new CountDownLatch(0); + public ClusterInfo getClusterInfo() { + final ClusterInfo clusterInfo = super.getClusterInfo(); + return new SizeFakingClusterInfo(clusterInfo); } @Override - public CountDownLatch updateIndicesStats(final ActionListener listener) { - // Not used, so noop - return new CountDownLatch(0); + protected CountDownLatch updateNodeStats(ActionListener listener) { + return super.updateNodeStats(new ActionListener<>() { + @Override + public void onResponse(NodesStatsResponse nodesStatsResponse) { + ImmutableOpenMap.Builder leastAvailableUsagesBuilder = ImmutableOpenMap.builder(); + ImmutableOpenMap.Builder mostAvailableUsagesBuilder = ImmutableOpenMap.builder(); + fillDiskUsagePerNode( + LOGGER, + adjustNodesStats(nodesStatsResponse.getNodes()), + leastAvailableUsagesBuilder, + mostAvailableUsagesBuilder + ); + leastAvailableSpaceUsages = leastAvailableUsagesBuilder.build(); + mostAvailableSpaceUsages = mostAvailableUsagesBuilder.build(); + } + + @Override + public void onFailure(Exception e) { + } + }); } - @Override - public ClusterInfo getClusterInfo() { - ClusterInfo clusterInfo = super.getClusterInfo(); - return new DevNullClusterInfo(clusterInfo.getNodeLeastAvailableDiskUsages(), - clusterInfo.getNodeMostAvailableDiskUsages(), clusterInfo.shardSizes); + private List adjustNodesStats(List nodesStats) { + BiFunction diskUsageFunction = this.diskUsageFunction; + return nodesStats.stream().map(nodeStats -> { + final DiscoveryNode discoveryNode = nodeStats.getNode(); + final FsInfo oldFsInfo = nodeStats.getFs(); + return new NodeStats( + discoveryNode, + nodeStats.getTimestamp(), + new FsInfo( + oldFsInfo.getTimestamp(), + oldFsInfo.getIoStats(), + StreamSupport.stream(oldFsInfo.spliterator(), false) + .map(fsInfoPath -> 
diskUsageFunction.apply(discoveryNode, fsInfoPath)) + .toArray(FsInfo.Path[]::new) + )); + }).collect(Collectors.toList()); } - /** - * ClusterInfo that always points to DevNull. - */ - public static class DevNullClusterInfo extends ClusterInfo { - public DevNullClusterInfo(ImmutableOpenMap leastAvailableSpaceUsage, - ImmutableOpenMap mostAvailableSpaceUsage, - ImmutableOpenMap shardSizes) { - super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null); + class SizeFakingClusterInfo extends ClusterInfo { + SizeFakingClusterInfo(ClusterInfo delegate) { + super(delegate.getNodeLeastAvailableDiskUsages(), delegate.getNodeMostAvailableDiskUsages(), + delegate.shardSizes, delegate.routingToDataPath); } @Override - public String getDataPath(ShardRouting shardRouting) { - return "/dev/null"; + public Long getShardSize(ShardRouting shardRouting) { + final Function shardSizeFunction = MockInternalClusterInfoService.this.shardSizeFunction; + if (shardSizeFunction == null) { + return super.getShardSize(shardRouting); + } + + return shardSizeFunction.apply(shardRouting); } } diff --git a/sql/src/test/java/io/crate/integrationtests/DiskUsagesITest.java b/sql/src/test/java/io/crate/integrationtests/DiskUsagesITest.java index 5d9109a2e716..2d48904b13ea 100644 --- a/sql/src/test/java/io/crate/integrationtests/DiskUsagesITest.java +++ b/sql/src/test/java/io/crate/integrationtests/DiskUsagesITest.java @@ -24,25 +24,30 @@ import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.DiskUsage; import org.elasticsearch.cluster.MockInternalClusterInfoService; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.Environment; +import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.plugins.Plugin; import 
org.elasticsearch.test.ESIntegTestCase; +import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.StreamSupport; import static java.util.stream.Collectors.toList; import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING; -import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class DiskUsagesITest extends SQLTransportIntegrationTest { @@ -55,6 +60,7 @@ protected Collection> nodePlugins() { return plugins; } + @Test public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception { for (int i = 0; i < 3; i++) { // ensure that each node has a single data path @@ -63,67 +69,289 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception { .put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()) .put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")); } + List nodeIds = getNodeIds(); - List nodeIds = StreamSupport.stream(clusterState().getRoutingNodes().spliterator(), false) - .map(RoutingNode::nodeId) - .collect(toList()); - - // Start with all nodes at 50% usage - var clusterInfoService = (MockInternalClusterInfoService) - internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName()); + var clusterInfoService = getMockInternalClusterInfoService(); clusterInfoService.setUpdateFrequency(TimeValue.timeValueMillis(200)); clusterInfoService.onMaster(); - clusterInfoService.setN1Usage(nodeIds.get(0), new DiskUsage(nodeIds.get(0), "n1", "/dev/null", 100, 50)); - clusterInfoService.setN2Usage(nodeIds.get(1), new 
DiskUsage(nodeIds.get(1), "n2", "/dev/null", 100, 50)); - clusterInfoService.setN3Usage(nodeIds.get(2), new DiskUsage(nodeIds.get(2), "n3", "/dev/null", 100, 50)); + + // prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk + clusterInfoService.shardSizeFunction = shardRouting -> 0L; + // start with all nodes below the watermark + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> + setDiskUsage(fsInfoPath, 100, between(10, 100)); execute("SET GLOBAL TRANSIENT" + " cluster.routing.allocation.disk.watermark.low='80%'," + " cluster.routing.allocation.disk.watermark.high='90%'," + " cluster.routing.allocation.disk.watermark.flood_stage='100%'"); - // Create a table with 10 shards so we can check allocation for it execute("CREATE TABLE t (id INT PRIMARY KEY) " + "CLUSTERED INTO 10 SHARDS " + "WITH (number_of_replicas = 0)"); ensureGreen(); - - assertBusy(() -> { - HashMap shardCountByNodeId = getShardCountByNodeId(); + { + var shardCountByNodeId = getShardCountByNodeId(); assertThat("node0 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(0)), greaterThanOrEqualTo(3)); assertThat("node1 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(1)), greaterThanOrEqualTo(3)); assertThat("node2 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(2)), greaterThanOrEqualTo(3)); - }); + } // move node3 above high watermark - clusterInfoService.setN1Usage(nodeIds.get(0), new DiskUsage(nodeIds.get(0), "n1", "_na_", 100, 50)); - clusterInfoService.setN2Usage(nodeIds.get(1), new DiskUsage(nodeIds.get(1), "n2", "_na_", 100, 50)); - clusterInfoService.setN3Usage(nodeIds.get(2), new DiskUsage(nodeIds.get(2), "n3", "_na_", 100, 0)); + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage( + fsInfoPath, 100, + discoveryNode.getId().equals(nodeIds.get(2)) ? 
between(0, 9) : between(10, 100)); assertBusy(() -> { - HashMap shardCountByNodeId = getShardCountByNodeId(); - assertThat("node0 has 5 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(5)); - assertThat("node1 has 5 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(5)); - assertThat("node2 has 0 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(0)); + var shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has 5 shards", shardCountByNodeId.get(nodeIds.get(0)), is(5)); + assertThat("node1 has 5 shards", shardCountByNodeId.get(nodeIds.get(1)), is(5)); + assertThat("node2 has 0 shards", shardCountByNodeId.get(nodeIds.get(2)), is(0)); }); // move all nodes below watermark again - clusterInfoService.setN1Usage(nodeIds.get(0), new DiskUsage(nodeIds.get(0), "n1", "_na_", 100, 50)); - clusterInfoService.setN2Usage(nodeIds.get(1), new DiskUsage(nodeIds.get(1), "n2", "_na_", 100, 50)); - clusterInfoService.setN3Usage(nodeIds.get(2), new DiskUsage(nodeIds.get(2), "n3", "_na_", 100, 50)); + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> + setDiskUsage(fsInfoPath, 100, between(10, 100)); assertBusy(() -> { - HashMap shardCountByNodeId = getShardCountByNodeId(); + var shardCountByNodeId = getShardCountByNodeId(); assertThat("node0 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(0)), greaterThanOrEqualTo(3)); assertThat("node1 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(1)), greaterThanOrEqualTo(3)); assertThat("node2 has at least 3 shards", shardCountByNodeId.get(nodeIds.get(2)), greaterThanOrEqualTo(3)); }); } - static private ClusterState clusterState() { + @Test + public void testOnlyMovesEnoughShardsToDropBelowHighWatermark() throws Exception { + for (int i = 0; i < 3; i++) { + // ensure that each node has a single data path + internalCluster().startNode( + Settings.builder() + .put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()) + 
.put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")); + } + List nodeIds = getNodeIds(); + + var clusterInfoService = getMockInternalClusterInfoService(); + AtomicReference masterAppliedClusterState = new AtomicReference<>(); + internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> { + masterAppliedClusterState.set(event.state()); + clusterInfoService.refresh(); // so subsequent reroute sees disk usage according to the current state + }); + + // shards are 1 byte large + clusterInfoService.shardSizeFunction = shardRouting -> 1L; + // start with all nodes below the watermark + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> + setDiskUsage(fsInfoPath, 1000L, 1000L); + + execute("SET GLOBAL TRANSIENT" + + " cluster.routing.allocation.disk.watermark.low='90%'," + + " cluster.routing.allocation.disk.watermark.high='90%'," + + " cluster.routing.allocation.disk.watermark.flood_stage='100%'"); + + execute("CREATE TABLE t (id INT PRIMARY KEY) " + + "CLUSTERED INTO 6 SHARDS " + + "WITH (number_of_replicas = 0)"); + ensureGreen(); + + var shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has 2 shards", shardCountByNodeId.get(nodeIds.get(0)), is(2)); + assertThat("node1 has 2 shards", shardCountByNodeId.get(nodeIds.get(1)), is(2)); + assertThat("node2 has 2 shards", shardCountByNodeId.get(nodeIds.get(2)), is(2)); + + // disable rebalancing, or else we might move too many shards away and then rebalance them back again + execute("SET GLOBAL TRANSIENT cluster.routing.rebalance.enable='none'"); + + // node2 suddenly has 99 bytes free, less than 10%, but moving one shard is enough to bring it up to 100 bytes free: + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage( + fsInfoPath, 1000L, + discoveryNode.getId().equals(nodeIds.get(2)) + ? 
101L - masterAppliedClusterState.get().getRoutingNodes().node(nodeIds.get(2)).numberOfOwningShards() + : 1000L); + + clusterInfoService.refresh(); + + logger.info("waiting for shards to relocate off node [{}]", nodeIds.get(2)); + + // must wait for relocation to start + assertBusy(() -> assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(2)), is(1))); + + // ensure that relocations finished without moving any more shards + ensureGreen(); + assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(2)), is(1)); + } + + @Test + public void testDoesNotExceedLowWatermarkWhenRebalancing() { + for (int i = 0; i < 3; i++) { + // ensure that each node has a single data path + internalCluster().startNode( + Settings.builder() + .put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()) + .put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")); + } + List nodeIds = getNodeIds(); + + var clusterInfoService = getMockInternalClusterInfoService(); + + AtomicReference masterAppliedClusterState = new AtomicReference<>(); + internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> { + assertThat(event.state().getRoutingNodes().node(nodeIds.get(2)).size(), lessThanOrEqualTo(1)); + masterAppliedClusterState.set(event.state()); + clusterInfoService.refresh(); // so a subsequent reroute sees disk usage according to the current state + }); + + // shards are 1 byte large + clusterInfoService.shardSizeFunction = shardRouting -> 1L; + // node 2 only has space for one shard + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> setDiskUsage( + fsInfoPath, 1000L, + discoveryNode.getId().equals(nodeIds.get(2)) + ? 
150L - masterAppliedClusterState.get().getRoutingNodes().node(nodeIds.get(2)).numberOfOwningShards() + : 1000L); + + execute("SET GLOBAL TRANSIENT" + + " cluster.routing.allocation.disk.watermark.low='85%'," + + " cluster.routing.allocation.disk.watermark.high='100%'," + + " cluster.routing.allocation.disk.watermark.flood_stage='100%'"); + + execute("CREATE TABLE t (id INT PRIMARY KEY) " + + "CLUSTERED INTO 6 SHARDS " + + "WITH (" + + " number_of_replicas = 0, \"routing.allocation.exclude._id\" = ?)", + new Object[]{nodeIds.get(2)}); + ensureGreen(); + + var shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has 3 shards", shardCountByNodeId.get(nodeIds.get(0)), is(3)); + assertThat("node1 has 3 shards", shardCountByNodeId.get(nodeIds.get(1)), is(3)); + assertThat("node2 has 0 shards", shardCountByNodeId.get(nodeIds.get(2)), is(0)); + + execute("ALTER TABLE t RESET (\"routing.allocation.exclude._id\")"); + + logger.info("waiting for shards to relocate onto node [{}]", nodeIds.get(2)); + + ensureGreen(); + assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(2)), is(1)); + } + + @Test + public void testMovesShardsOffSpecificDataPathAboveWatermark() throws Exception { + // start one node with two data paths + String pathOverWatermark = createTempDir().toString(); + Settings.Builder twoPathSettings = Settings.builder() + .put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms"); + if (randomBoolean()) { + twoPathSettings.putList( + Environment.PATH_DATA_SETTING.getKey(), + createTempDir().toString(), + pathOverWatermark); + } else { + twoPathSettings.putList( + Environment.PATH_DATA_SETTING.getKey(), + pathOverWatermark, + createTempDir().toString()); + } + internalCluster().startNode(twoPathSettings); + + execute("SELECT id FROM sys.nodes"); + assertThat(response.rows().length, is(1)); + String nodeIdWithTwoPaths = (String) response.rows()[0][0]; + + // other two nodes have one data path each + 
internalCluster().startNode( + Settings.builder() + .put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()) + .put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")); + internalCluster().startNode( + Settings.builder() + .put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()) + .put(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "1ms")); + List nodeIds = getNodeIds(); + + var clusterInfoService = getMockInternalClusterInfoService(); + // prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk + clusterInfoService.shardSizeFunction = shardRouting -> 0L; + // start with all paths below the watermark + clusterInfoService.diskUsageFunction = (discoveryNode, fsInfoPath) -> + setDiskUsage(fsInfoPath, 100, between(10, 100)); + + execute("SET GLOBAL TRANSIENT" + + " cluster.routing.allocation.disk.watermark.low='90%'," + + " cluster.routing.allocation.disk.watermark.high='90%'," + + " cluster.routing.allocation.disk.watermark.flood_stage='100%'"); + + execute("CREATE TABLE doc.t (id INT PRIMARY KEY) " + + "CLUSTERED INTO 6 SHARDS " + + "WITH (number_of_replicas = 0)"); + ensureGreen(); + + var shardCountByNodeId = getShardCountByNodeId(); + assertThat("node0 has 2 shards", shardCountByNodeId.get(nodeIds.get(0)), is(2)); + assertThat("node1 has 2 shards", shardCountByNodeId.get(nodeIds.get(1)), is(2)); + assertThat("node2 has 2 shards", shardCountByNodeId.get(nodeIds.get(2)), is(2)); + + // there should be one shard on a bad path on node0 + execute( + "SELECT path " + + "FROM sys.shards " + + "WHERE table_name = 't' " + + " AND schema_name = 'doc'" + + " AND node['id'] = ?", + new Object[]{nodeIdWithTwoPaths}); + assertThat( + Arrays.stream(response.rows()) + .map(row -> (String) row[0]) + .filter(path -> path != null && path.startsWith(pathOverWatermark)) + .count(), + is(1L)); + + // one of the paths on node0 suddenly exceeds the high watermark + clusterInfoService.diskUsageFunction = 
(discoveryNode, fsInfoPath) -> setDiskUsage( + fsInfoPath, 100L, + fsInfoPath.getPath().startsWith(pathOverWatermark) ? between(0, 9) : between(10, 100)); + + // disable rebalancing, or else we might move shards back + // onto the over-full path since we're not faking that + execute("SET GLOBAL TRANSIENT cluster.routing.rebalance.enable='none'"); + + clusterInfoService.refresh(); + + logger.info("waiting for shards to relocate off path [{}]", pathOverWatermark); + assertBusy(() -> { + execute( + "SELECT path " + + "FROM sys.shards " + + "WHERE table_name = 't'" + + " AND schema_name = 'doc'"); + assertThat( + Arrays.stream(response.rows()) + .map(row -> (String) row[0]) + .filter(path -> path != null && path.startsWith(pathOverWatermark)) + .count(), + is(0L)); + }); + } + + private static ClusterState clusterState() { return client().admin().cluster().prepareState().get().getState(); } + private static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) { + return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes); + } + + private static List getNodeIds() { + return StreamSupport.stream(clusterState().getRoutingNodes().spliterator(), false) + .map(RoutingNode::nodeId) + .collect(toList()); + } + + private MockInternalClusterInfoService getMockInternalClusterInfoService() { + return (MockInternalClusterInfoService) internalCluster().getCurrentMasterNodeInstance(ClusterInfoService.class); + } + private HashMap getShardCountByNodeId() { HashMap shardCountByNodeId = new HashMap<>(); var clusterState = clusterState();