diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 543118a172f95..968b77edce040 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -36,6 +36,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.AllocationRetryBackoffPolicy; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedShard; import org.elasticsearch.cluster.routing.allocation.StaleShard; @@ -88,8 +91,13 @@ public ShardStateAction(Settings settings, ClusterService clusterService, Transp this.clusterService = clusterService; this.threadPool = threadPool; - transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger)); - transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger)); + transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, + new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger)); + AllocationRetryBackoffPolicy backoffPolicy = AllocationRetryBackoffPolicy.policyForSettings(settings); + 
ShardFailedClusterStateTaskExecutor shardFailedExecutor = + new ShardFailedClusterStateTaskExecutor(allocationService, routingService, backoffPolicy, logger); + transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, + new ShardFailedTransportHandler(clusterService, shardFailedExecutor, logger)); } private void sendShardAction(final String actionName, final ClusterState currentState, final ShardEntry shardEntry, final Listener listener) { @@ -251,11 +259,14 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardEntry> { private final AllocationService allocationService; private final RoutingService routingService; + private final AllocationRetryBackoffPolicy backoffPolicy; private final Logger logger; - public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, Logger logger) { + public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, + AllocationRetryBackoffPolicy backoffPolicy, Logger logger) { this.allocationService = allocationService; this.routingService = routingService; + this.backoffPolicy = backoffPolicy; this.logger = logger; } @@ -341,14 +352,15 @@ ClusterState applyFailedShards(ClusterState currentState, List<FailedShard> fail @Override public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { - int numberOfUnassignedShards = clusterChangedEvent.state().getRoutingNodes().unassigned().size(); - if (numberOfUnassignedShards > 0) { - String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards); - if (logger.isTraceEnabled()) { - logger.trace("{}, scheduling a reroute", reason); - } - routingService.reroute(reason); - } + List<ShardRouting> unassigned = clusterChangedEvent.state().getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED); 
+ unassigned.stream() + .mapToInt(s -> s.unassignedInfo().getNumFailedAllocations()) + .min() + .ifPresent(numberOfFailures -> { + String reason = String.format(Locale.ROOT, "Schedule rerouting after [%s] failures", numberOfFailures); + TimeValue delay = backoffPolicy.delayInterval(numberOfFailures); + routingService.scheduleReroute(reason, delay); + }); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 1c3d629a72fea..130532ce25791 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -30,8 +30,14 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.threadpool.ThreadPool; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** * A {@link RoutingService} listens to clusters state. 
When this service @@ -51,14 +57,17 @@ public class RoutingService extends AbstractLifecycleComponent { private final ClusterService clusterService; private final AllocationService allocationService; + private final ThreadPool threadPool; - private AtomicBoolean rerouting = new AtomicBoolean(); + private final AtomicBoolean rerouting = new AtomicBoolean(); + private final AtomicReference<ScheduledFuture<?>> pendingTask = new AtomicReference<>(); @Inject - public RoutingService(Settings settings, ClusterService clusterService, AllocationService allocationService) { + public RoutingService(Settings settings, ClusterService clusterService, AllocationService allocationService, ThreadPool threadPool) { super(settings); this.clusterService = clusterService; this.allocationService = allocationService; + this.threadPool = threadPool; } @Override @@ -74,8 +83,39 @@ protected void doClose() { } /** - * Initiates a reroute. + * Schedules a one-shot reroute after the given delay, keeping only the soonest pending request. + * The scheduled reroute may be skipped if another reroute is already in progress. */ + public void scheduleReroute(String reason, TimeValue delay) { + if (logger.isTraceEnabled()) { + logger.trace("Schedule reroute in [{}], reason [{}]", delay, reason); + } + + ScheduledFuture<?> newTask = null; + while (true) { + final ScheduledFuture<?> existingTask = pendingTask.get(); + final long existingDelayMS = (existingTask == null) ? 
Long.MAX_VALUE : existingTask.getDelay(TimeUnit.MILLISECONDS); + + if (newTask == null && existingDelayMS > delay.millis()) { + newTask = threadPool.schedule(delay, ThreadPool.Names.SAME, () -> performReroute(reason)); + } + if (newTask == null) { + return; + } + if (existingDelayMS > newTask.getDelay(TimeUnit.MILLISECONDS)) { + if (pendingTask.compareAndSet(existingTask, newTask) == true) { + if (existingTask != null) { + FutureUtils.cancel(existingTask); + } + return; + } + } else { + FutureUtils.cancel(newTask); + return; + } + } + } + public final void reroute(String reason) { performReroute(reason); } @@ -119,6 +159,8 @@ public void onFailure(String source, Exception e) { rerouting.set(false); ClusterState state = clusterService.state(); logger.warn((Supplier) () -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e); + } finally { + pendingTask.set(null); } } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationRetryBackoffPolicy.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationRetryBackoffPolicy.java new file mode 100644 index 0000000000000..226ab8c46a8fb --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationRetryBackoffPolicy.java @@ -0,0 +1,130 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.Random; +import java.util.function.Function; + +/** + * + * A policy that controls when to retry allocation after a shard allocation has failed. + */ +public abstract class AllocationRetryBackoffPolicy { + public enum PolicyType { + NO_BACKOFF(5) { + @Override + AllocationRetryBackoffPolicy policyForSettings(Settings settings) { + return noBackOffPolicy(); + } + }, + EXPONENTIAL_BACKOFF(1000) { + @Override + AllocationRetryBackoffPolicy policyForSettings(Settings settings) { + return exponentialBackoffPolicy(settings); + } + }; + private final int defaultMaxRetries; + + abstract AllocationRetryBackoffPolicy policyForSettings(Settings settings); + + PolicyType(int defaultMaxRetries) { + this.defaultMaxRetries = defaultMaxRetries; + } + + public int getDefaultMaxRetries() { + return defaultMaxRetries; + } + + public static PolicyType fromString(String policyName) { + if ("exponential_backoff".equals(policyName)) { + return EXPONENTIAL_BACKOFF; + } else if ("no_backoff".equals(policyName)) { + return NO_BACKOFF; + } + throw new IllegalArgumentException("No backoff policy name match for [" + policyName + "]"); + } + } + + public static final Setting<PolicyType> SETTING_ALLOCATION_RETRY_POLICY = + new Setting<>("cluster.allocation.retry.policy", "exponential_backoff", PolicyType::fromString, 
Setting.Property.NodeScope); + + public static final Function<Settings, Integer> SETTING_ALLOCATION_DEFAULT_MAX_RETRIES = + settings -> SETTING_ALLOCATION_RETRY_POLICY.get(settings).defaultMaxRetries; + + public static final Setting<TimeValue> SETTING_ALLOCATION_RETRY_EXPONENTIAL_BACKOFF_BASE_DELAY = + Setting.positiveTimeSetting("cluster.allocation.retry.exponential_backoff.base_delay", + TimeValue.timeValueMillis(50), Setting.Property.NodeScope); + + public static final Setting<TimeValue> SETTING_ALLOCATION_RETRY_EXPONENTIAL_BACKOFF_MAX_DELAY = + Setting.positiveTimeSetting("cluster.allocation.retry.exponential_backoff.max_delay", + TimeValue.timeValueMinutes(30), Setting.Property.NodeScope); + + /** + * Determines the delay before the next attempt after a shard allocation has failed {@code numOfFailures} times. + * This method may return a different value on each call. + */ + public abstract TimeValue delayInterval(int numOfFailures); + + /** + * Constructs the allocation retry policy for the given settings. + */ + public static AllocationRetryBackoffPolicy policyForSettings(Settings settings) { + return SETTING_ALLOCATION_RETRY_POLICY.get(settings).policyForSettings(settings); + } + + public static AllocationRetryBackoffPolicy exponentialBackoffPolicy(Settings settings) { + return new ExponentialBackOffPolicy(settings); + } + + public static AllocationRetryBackoffPolicy noBackOffPolicy() { + return new NoBackoffPolicy(); + } + + static class ExponentialBackOffPolicy extends AllocationRetryBackoffPolicy { + private final Random random; + private final long delayUnitMS; + private final long maxDelayMS; + + ExponentialBackOffPolicy(Settings settings) { + this.random = new Random(Randomness.get().nextInt()); + this.delayUnitMS = SETTING_ALLOCATION_RETRY_EXPONENTIAL_BACKOFF_BASE_DELAY.get(settings).millis(); + this.maxDelayMS = SETTING_ALLOCATION_RETRY_EXPONENTIAL_BACKOFF_MAX_DELAY.get(settings).millis(); + } + + @Override + public TimeValue delayInterval(int numOfFailures) { + assert numOfFailures >= 0; + int bound = 
numOfFailures > 30 ? Integer.MAX_VALUE : 1 << numOfFailures; + return TimeValue.timeValueMillis(Math.min(maxDelayMS, delayUnitMS * random.nextInt(bound))); + } + } + + static class NoBackoffPolicy extends AllocationRetryBackoffPolicy { + @Override + public TimeValue delayInterval(int numOfFailures) { + return TimeValue.ZERO; + } + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java index c3817b429bbf3..f5c5fe87911ad 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/MaxRetryAllocationDecider.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.AllocationRetryBackoffPolicy; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -37,8 +38,8 @@ */ public class MaxRetryAllocationDecider extends AllocationDecider { - public static final Setting SETTING_ALLOCATION_MAX_RETRY = Setting.intSetting("index.allocation.max_retries", 5, 0, - Setting.Property.Dynamic, Setting.Property.IndexScope); + public static final Setting SETTING_ALLOCATION_MAX_RETRY = Setting.intSetting("index.allocation.max_retries", + AllocationRetryBackoffPolicy.SETTING_ALLOCATION_DEFAULT_MAX_RETRIES, 0, Setting.Property.Dynamic, Setting.Property.IndexScope); public static final String NAME = "max_retry"; diff --git a/core/src/main/java/org/elasticsearch/common/settings/Setting.java b/core/src/main/java/org/elasticsearch/common/settings/Setting.java index f35df27e3b338..a9bd65ac11de5 100644 
--- a/core/src/main/java/org/elasticsearch/common/settings/Setting.java +++ b/core/src/main/java/org/elasticsearch/common/settings/Setting.java @@ -903,6 +903,10 @@ public static Setting<Integer> intSetting(String key, int defaultValue, int minV return new Setting<>(key, (s) -> Integer.toString(defaultValue), (s) -> parseInt(s, minValue, key), properties); } + public static Setting<Integer> intSetting(String key, Function<Settings, Integer> defaultValue, int minValue, Property... properties) { + return new Setting<>(key, defaultValue.andThen(n -> Integer.toString(n)), (s) -> parseInt(s, minValue, key), properties); + } + public static Setting<Integer> intSetting(String key, Setting<Integer> fallbackSetting, int minValue, Property... properties) { return new Setting<>(key, fallbackSetting, (s) -> parseInt(s, minValue, key), properties); } diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java index 794f71786ca58..827b686ac72cc 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskExecutor; @@ -30,20 +31,29 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingNodes; +import 
org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.AllocationRetryBackoffPolicy; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedShard; import org.elasticsearch.cluster.routing.allocation.StaleShard; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; import org.junit.Before; import java.util.ArrayList; @@ -54,8 +64,10 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.hasSize; public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCase { @@ -65,7 +77,10 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa private MetaData metaData; private RoutingTable routingTable; private ClusterState clusterState; + private CapturingRetryBackoffPolicy retryBackoffPolicy; + private TestRoutingService routingService; private ShardStateAction.ShardFailedClusterStateTaskExecutor executor; + 
private ThreadPool threadPool; @Before public void setUp() throws Exception { @@ -83,7 +98,19 @@ public void setUp() throws Exception { .addAsNew(metaData.index(INDEX)) .build(); clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build(); - executor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger); + retryBackoffPolicy = new CapturingRetryBackoffPolicy(Settings.EMPTY); + threadPool = new TestThreadPool(getClass().getName()); + routingService = new TestRoutingService(Settings.EMPTY, allocationService, threadPool); + executor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, routingService, retryBackoffPolicy, logger); + } + + @After + public void shutDownThreadPool() throws Exception { + // the test framework invokes tearDown() itself; this hook only stops the pool and waits for its threads + if (threadPool != null) { + terminate(threadPool); + } + threadPool = null; } public void testEmptyTaskListProducesSameClusterState() throws Exception { @@ -114,7 +141,7 @@ public void testTriviallySuccessfulTasksBatchedWithFailingTasks() throws Excepti ClusterState currentState = createClusterStateWithStartedShards(reason); List<ShardStateAction.ShardEntry> failingTasks = createExistingShards(currentState, reason); List<ShardStateAction.ShardEntry> nonExistentTasks = createNonExistentShards(currentState, reason); - ShardStateAction.ShardFailedClusterStateTaskExecutor failingExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger) { + ShardStateAction.ShardFailedClusterStateTaskExecutor failingExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, AllocationRetryBackoffPolicy.noBackOffPolicy(), logger) { @Override ClusterState applyFailedShards(ClusterState currentState, List<FailedShard> failedShards, List<StaleShard> staleShards) { throw new RuntimeException("simulated applyFailedShards failure"); @@ -150,6 +177,32 @@ public void testIllegalShardFailureRequests() throws Exception { 
assertTaskResults(taskResultMap, result, currentState, false); } + public void testScheduleRetryWithBackoffDelay() throws Exception { + String reason = "test retry failed shard with backoff delay"; + ClusterState startedClusterState = createClusterStateWithStartedShards(reason); + int numOfFailures = randomIntBetween(1, 100); + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "Analyzer not found", + new UnsupportedOperationException(""), numOfFailures, + System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT); + ShardRouting shardRouting = TestShardRouting.newShardRouting(INDEX, 0, null, "node-0", false, UNASSIGNED, unassignedInfo); + IndexRoutingTable routingTable = IndexRoutingTable.builder(clusterState.metaData().index(INDEX).getIndex()) + .addShard(shardRouting) + .build(); + ClusterState unassignedClusterState = ClusterState.builder(startedClusterState) + .routingTable(RoutingTable.builder().add(routingTable).build()) + .build(); + ClusterChangedEvent event = new ClusterChangedEvent("failed shard", unassignedClusterState, startedClusterState); + executor.clusterStatePublished(event); + + assertThat(retryBackoffPolicy.generatedDelays, hasSize(1)); + assertThat(routingService.schedules, hasSize(1)); + + Tuple backoffDelay = retryBackoffPolicy.generatedDelays.get(0); + TimeValue reroutingDelay = routingService.schedules.get(0); + assertThat(backoffDelay.v1(), equalTo(numOfFailures)); + assertThat(reroutingDelay, equalTo(backoffDelay.v2())); + } + private ClusterState createClusterStateWithStartedShards(String reason) { int numberOfNodes = 1 + numberOfReplicas; DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); @@ -278,4 +331,33 @@ private static List toTasks(ClusterState currentSta new CorruptIndexException("simulated", indexUUID))) .collect(Collectors.toList()); } + + static class CapturingRetryBackoffPolicy extends AllocationRetryBackoffPolicy { + final 
AllocationRetryBackoffPolicy delegate; + final List<Tuple<Integer, TimeValue>> generatedDelays = new ArrayList<>(10); + + CapturingRetryBackoffPolicy(Settings settings) { + this.delegate = AllocationRetryBackoffPolicy.policyForSettings(settings); + } + + @Override + public TimeValue delayInterval(int numOfFailures) { + TimeValue delay = delegate.delayInterval(numOfFailures); + generatedDelays.add(Tuple.tuple(numOfFailures, delay)); + return delay; + } + } + + static class TestRoutingService extends RoutingService { + final List<TimeValue> schedules = new ArrayList<>(); + + TestRoutingService(Settings settings, AllocationService allocationService, ThreadPool threadPool) { + super(settings, null, allocationService, threadPool); + } + + @Override + public void scheduleReroute(String reason, TimeValue delay) { + schedules.add(delay); + } + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java index 09b957069a624..8fe2755be5931 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java @@ -19,45 +19,98 @@ package org.elasticsearch.cluster.routing; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; import org.junit.Before; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static 
org.hamcrest.Matchers.equalTo; public class RoutingServiceTests extends ESAllocationTestCase { private TestRoutingService routingService; + private ThreadPool threadPool; + + @After + public void stopThreadPool() throws Exception { + // the test framework invokes tearDown() itself; this hook only stops the pool and waits for its threads + if (threadPool != null) { + terminate(threadPool); + threadPool = null; + } + } @Before public void createRoutingService() { - routingService = new TestRoutingService(); + if (threadPool == null) { + threadPool = new TestThreadPool(getTestClass().getName()); + } + this.routingService = new TestRoutingService(threadPool); } public void testReroute() { - assertThat(routingService.hasReroutedAndClear(), equalTo(false)); + assertThat(routingService.getSchedulesAndClear(), empty()); routingService.reroute("test"); - assertThat(routingService.hasReroutedAndClear(), equalTo(true)); + assertThat(routingService.getSchedulesAndClear(), contains("test")); + } + + public void testScheduleReroute() throws Exception { + assertThat(routingService.getSchedulesAndClear(), empty()); + routingService.scheduleReroute("Schedule rerouting", TimeValue.timeValueMillis(20)); + assertBusy(() -> assertThat(routingService.getSchedulesAndClear(), contains("Schedule rerouting")), + 1, TimeUnit.SECONDS); + } + + public void testScheduleMultipleTimesCollapseToSingle() throws Exception { + assertThat(routingService.getSchedulesAndClear(), empty()); + IntStream.rangeClosed(0, randomIntBetween(5, 10)).forEach(n -> + routingService.scheduleReroute("Schedule-" + n, TimeValue.timeValueMillis(randomIntBetween(200, 300)))); + + routingService.scheduleReroute("master", TimeValue.timeValueMillis(randomInt(10))); + boolean rerouteMoreThanOnce = awaitBusy(() -> routingService.schedules.size() > 1, 2, TimeUnit.SECONDS); + assertThat(rerouteMoreThanOnce, equalTo(false)); + } + + public void testScheduleOverridePreviousSchedule() throws Exception { + assertThat(routingService.getSchedulesAndClear(), empty()); + routingService.scheduleReroute("1000ms", 
TimeValue.timeValueMillis(1000)); + routingService.scheduleReroute("108ms", TimeValue.timeValueMillis(108)); + routingService.scheduleReroute("500ms", TimeValue.timeValueMillis(500)); + routingService.scheduleReroute("100ms", TimeValue.timeValueMillis(100)); + assertBusy(() -> assertTrue(routingService.schedules.contains("100ms"))); + assertBusy(() -> assertFalse(routingService.schedules.contains("108ms"))); } private class TestRoutingService extends RoutingService { - private AtomicBoolean rerouted = new AtomicBoolean(); + final BlockingQueue schedules = new LinkedBlockingQueue<>(); - TestRoutingService() { - super(Settings.EMPTY, null, null); + TestRoutingService(ThreadPool threadPool) { + super(Settings.EMPTY, null, null, threadPool); } - public boolean hasReroutedAndClear() { - return rerouted.getAndSet(false); + List getSchedulesAndClear() { + List result = new ArrayList<>(); + schedules.drainTo(result); + return result; } @Override protected void performReroute(String reason) { logger.info("--> performing fake reroute [{}]", reason); - rerouted.set(true); + schedules.add(reason); } } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationRetryBackoffPolicyTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationRetryBackoffPolicyTests.java new file mode 100644 index 0000000000000..5039c1ca14962 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationRetryBackoffPolicyTests.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class AllocationRetryBackoffPolicyTests extends ESTestCase { + + public void testNoBackOff() throws Exception { + Settings noBackOffSettings = Settings.builder().put("cluster.allocation.retry.policy", "no_backoff").build(); + AllocationRetryBackoffPolicy backoffPolicy = AllocationRetryBackoffPolicy.policyForSettings(noBackOffSettings); + assertThat(backoffPolicy.delayInterval(randomIntBetween(0, 1000)), equalTo(TimeValue.ZERO)); + } + + public void testExponentialBackoff() throws Exception { + Settings defaultSettings = Settings.EMPTY; + TimeValue baseInterval = AllocationRetryBackoffPolicy.SETTING_ALLOCATION_RETRY_EXPONENTIAL_BACKOFF_BASE_DELAY.get(defaultSettings); + TimeValue maxInterval = AllocationRetryBackoffPolicy.SETTING_ALLOCATION_RETRY_EXPONENTIAL_BACKOFF_MAX_DELAY.get(defaultSettings); + assertThat(baseInterval, greaterThan(TimeValue.ZERO)); + assertThat(maxInterval, greaterThanOrEqualTo(baseInterval)); + AllocationRetryBackoffPolicy backoffPolicy = AllocationRetryBackoffPolicy.exponentialBackoffPolicy(defaultSettings); + int numOfFailures = randomIntBetween(10, 30); + TimeValue longestDelay = TimeValue.ZERO; + 
for (int n = 0; n < numOfFailures; n++) { + TimeValue delayInterval = backoffPolicy.delayInterval(n); + assertThat(delayInterval, lessThanOrEqualTo(maxInterval)); + if (longestDelay.compareTo(delayInterval) < 0) { + longestDelay = delayInterval; + } + long times = delayInterval.millis() / baseInterval.millis(); + assertThat((double) times, lessThanOrEqualTo(Math.pow(2, n))); + } + assertThat(longestDelay, greaterThan(baseInterval)); + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java index 616eff4381caf..b7056cc7238aa 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/InSyncAllocationIdTests.java @@ -58,7 +58,8 @@ public class InSyncAllocationIdTests extends ESAllocationTestCase { @Before public void setupAllocationService() { allocation = createAllocationService(); - failedClusterStateTaskExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, logger); + failedClusterStateTaskExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, + AllocationRetryBackoffPolicy.noBackOffPolicy(), logger); } public void testInSyncAllocationIdsUpdated() { @@ -164,7 +165,8 @@ public void testDeadNodesBeforeReplicaFailed() throws Exception { logger.info("fail replica (for which there is no shard routing in the CS anymore)"); assertNull(clusterState.getRoutingNodes().getByAllocationId(replicaShard.shardId(), replicaShard.allocationId().getId())); ShardStateAction.ShardFailedClusterStateTaskExecutor failedClusterStateTaskExecutor = - new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, logger); + new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocation, null, + AllocationRetryBackoffPolicy.noBackOffPolicy(), logger); 
long primaryTerm = clusterState.metaData().index("test").primaryTerm(0); clusterState = failedClusterStateTaskExecutor.execute(clusterState, Arrays.asList( new ShardEntry(shardRoutingTable.shardId(), replicaShard.allocationId().getId(), primaryTerm, "dummy", null)) diff --git a/core/src/test/java/org/elasticsearch/common/settings/SettingTests.java b/core/src/test/java/org/elasticsearch/common/settings/SettingTests.java index 65d51e126c9f6..68c836114013f 100644 --- a/core/src/test/java/org/elasticsearch/common/settings/SettingTests.java +++ b/core/src/test/java/org/elasticsearch/common/settings/SettingTests.java @@ -644,6 +644,15 @@ public void testMinMaxInt() { assertEquals(1, integerSetting.get(Settings.EMPTY).intValue()); } + public void testDefaultFunctionInt() throws Exception { + Setting bar = Setting.intSetting("bar", 100); + Setting foo = Setting.intSetting("foo", bar::get, 0); + assertThat(foo.get(Settings.EMPTY), equalTo(100)); + assertThat(foo.get(Settings.builder().put("foo", 1).build()), equalTo(1)); + assertThat(foo.get(Settings.builder().put("bar", 2).build()), equalTo(2)); + assertThat(foo.get(Settings.builder().put("foo",10).put("bar", 20).build()), equalTo(10)); + } + /** * Only one single scope can be added to any setting */ diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 61bc09fb0f1bb..83410802a012d 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -57,6 +57,7 @@ import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.AllocationRetryBackoffPolicy; import org.elasticsearch.cluster.routing.allocation.AllocationService; 
import org.elasticsearch.cluster.routing.allocation.FailedShard; import org.elasticsearch.cluster.routing.allocation.RandomAllocationDeciderTests; @@ -125,7 +126,8 @@ public ClusterStateChanges(NamedXContentRegistry xContentRegistry, ThreadPool th new RandomAllocationDeciderTests.RandomAllocationDecider(getRandom())))), new TestGatewayAllocator(), new BalancedShardsAllocator(settings), EmptyClusterInfoService.INSTANCE); - shardFailedClusterStateTaskExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger); + shardFailedClusterStateTaskExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, + AllocationRetryBackoffPolicy.noBackOffPolicy(), logger); shardStartedClusterStateTaskExecutor = new ShardStateAction.ShardStartedClusterStateTaskExecutor(allocationService, logger); ActionFilters actionFilters = new ActionFilters(Collections.emptySet()); IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver(settings); diff --git a/docs/reference/cluster/reroute.asciidoc b/docs/reference/cluster/reroute.asciidoc index aba51f4893f24..f3163c0b72a8b 100644 --- a/docs/reference/cluster/reroute.asciidoc +++ b/docs/reference/cluster/reroute.asciidoc @@ -112,7 +112,7 @@ are available: === Retry failed shards The cluster will attempt to allocate a shard a maximum of -`index.allocation.max_retries` times in a row (defaults to `5`), before giving +`index.allocation.max_retries` times in a row (defaults to `1000`), waiting a randomized, exponentially growing backoff delay between retries, before giving up and leaving the shard unallocated. This scenario can be caused by structural problems such as having an analyzer which refers to a stopwords file which doesn't exist on all nodes.