alloc: delay between retries for failed allocation
Previously, a failed allocation was retried in a tight loop that filled
up log files and made the cluster unstable. We solved this problem by
limiting the number of retries, but that solution requires manual
intervention whenever the environment changes. This PR aims to reduce
user intervention by increasing the number of retries and adding
exponential backoff delays between retries.

Closes elastic#24530
dnhatn committed Oct 23, 2017
1 parent d0104c2 commit 5e42f10
Showing 12 changed files with 429 additions and 33 deletions.
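
The retry delay introduced by this change is a capped exponential backoff with random jitter (see the new AllocationRetryBackoffPolicy below). Conceptually, after n failed allocations the next reroute is delayed by

    delay(n) = min(max_delay, base_delay * random[0, 2^n))

With the default base delay of 50ms, a shard that has failed allocation three times waits a random multiple of 50ms below 400ms before the next attempt.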
ShardStateAction.java
@@ -36,6 +36,9 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.routing.allocation.AllocationRetryBackoffPolicy;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedShard;
import org.elasticsearch.cluster.routing.allocation.StaleShard;
@@ -88,8 +91,13 @@ public ShardStateAction(Settings settings, ClusterService clusterService, Transp
this.clusterService = clusterService;
this.threadPool = threadPool;

-transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
-transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
+transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME,
+    new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
+AllocationRetryBackoffPolicy backoffPolicy = AllocationRetryBackoffPolicy.policyForSettings(settings);
+ShardFailedClusterStateTaskExecutor shardFailedExecutor =
+    new ShardFailedClusterStateTaskExecutor(allocationService, routingService, backoffPolicy, logger);
+transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardEntry::new, ThreadPool.Names.SAME,
+    new ShardFailedTransportHandler(clusterService, shardFailedExecutor, logger));
}

private void sendShardAction(final String actionName, final ClusterState currentState, final ShardEntry shardEntry, final Listener listener) {
@@ -251,11 +259,14 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
public static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardEntry> {
private final AllocationService allocationService;
private final RoutingService routingService;
+private final AllocationRetryBackoffPolicy backoffPolicy;
private final Logger logger;

-public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, Logger logger) {
+public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService,
+                                           AllocationRetryBackoffPolicy backoffPolicy, Logger logger) {
this.allocationService = allocationService;
this.routingService = routingService;
+this.backoffPolicy = backoffPolicy;
this.logger = logger;
}

@@ -341,14 +352,15 @@ ClusterState applyFailedShards(ClusterState currentState, List<FailedShard> fail

@Override
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
-int numberOfUnassignedShards = clusterChangedEvent.state().getRoutingNodes().unassigned().size();
-if (numberOfUnassignedShards > 0) {
-String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards);
-if (logger.isTraceEnabled()) {
-logger.trace("{}, scheduling a reroute", reason);
-}
-routingService.reroute(reason);
-}
+List<ShardRouting> unassigned = clusterChangedEvent.state().getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED);
+unassigned.stream()
+    .mapToInt(s -> s.unassignedInfo().getNumFailedAllocations())
+    .min()
+    .ifPresent(numberOfFailures -> {
+        String reason = String.format(Locale.ROOT, "Schedule rerouting after [%s] failures", numberOfFailures);
+        TimeValue delay = backoffPolicy.delayInterval(numberOfFailures);
+        routingService.scheduleReroute(reason, delay);
+    });
}
}
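
With the defaults of the exponential policy below (50ms base delay, 30m cap), a minimum of three failed allocations across the unassigned shards yields a delay that is a random multiple of 50ms below 400ms (50ms * 2^3). Taking the minimum failure count keeps the schedule driven by the shard that is closest to being retried; shards with more failures are simply retried on the same reroute pass, subject to the max-retry decider.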

RoutingService.java
@@ -30,8 +30,14 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.FutureUtils;
+import org.elasticsearch.threadpool.ThreadPool;

+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;

/**
* A {@link RoutingService} listens to cluster state. When this service
@@ -51,14 +57,17 @@ public class RoutingService extends AbstractLifecycleComponent {

private final ClusterService clusterService;
private final AllocationService allocationService;
+private final ThreadPool threadPool;

-private AtomicBoolean rerouting = new AtomicBoolean();
+private final AtomicBoolean rerouting = new AtomicBoolean();
+private final AtomicReference<ScheduledFuture> pendingTask = new AtomicReference<>();

@Inject
-public RoutingService(Settings settings, ClusterService clusterService, AllocationService allocationService) {
+public RoutingService(Settings settings, ClusterService clusterService, AllocationService allocationService, ThreadPool threadPool) {
super(settings);
this.clusterService = clusterService;
this.allocationService = allocationService;
+this.threadPool = threadPool;
}

@Override
@@ -74,8 +83,39 @@ protected void doClose() {
}

/**
- * Initiates a reroute.
+ * Schedules a one-shot reroute after the given delay. The schedule is
+ * skipped if another reroute is already pending to run at least as soon.
*/
+public void scheduleReroute(String reason, TimeValue delay) {
+    if (logger.isTraceEnabled()) {
+        logger.trace("Schedule reroute in [{}], reason [{}]", delay, reason);
+    }
+
+    ScheduledFuture newTask = null;
+    while (true) {
+        final ScheduledFuture existingTask = pendingTask.get();
+        final long existingDelayMS = (existingTask == null) ? Long.MAX_VALUE : existingTask.getDelay(TimeUnit.MILLISECONDS);
+
+        if (newTask == null && existingDelayMS > delay.millis()) {
+            newTask = threadPool.schedule(delay, ThreadPool.Names.SAME, () -> performReroute(reason));
+        }
+        if (newTask == null) {
+            return;
+        }
+        if (existingDelayMS > newTask.getDelay(TimeUnit.MILLISECONDS)) {
+            if (pendingTask.compareAndSet(existingTask, newTask)) {
+                if (existingTask != null) {
+                    FutureUtils.cancel(existingTask);
+                }
+                return;
+            }
+        } else {
+            FutureUtils.cancel(newTask);
+            return;
+        }
+    }
+}
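
A hypothetical call site (names assumed for illustration) would pair this with the backoff policy:

    // Sketch only: schedule a retry after the 4th consecutive allocation failure.
    TimeValue delay = backoffPolicy.delayInterval(4); // with defaults: a random multiple of 50ms below 800ms
    routingService.scheduleReroute("retry failed allocation", delay);

The CAS loop above keeps at most one pending task, always the soonest-firing one: a newcomer that would fire later is cancelled immediately, while one that would fire sooner replaces and cancels the existing task.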

public final void reroute(String reason) {
performReroute(reason);
}
Expand Down Expand Up @@ -119,6 +159,8 @@ public void onFailure(String source, Exception e) {
rerouting.set(false);
ClusterState state = clusterService.state();
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);
} finally {
pendingTask.set(null);
}
}
}
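
Clearing pendingTask in the finally block ensures that a completed (or failed) reroute cannot suppress the next scheduleReroute call with a stale task reference.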
AllocationRetryBackoffPolicy.java (new file)
@@ -0,0 +1,130 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Random;
import java.util.function.Function;

/**
 * A policy that controls how long to wait before retrying after a shard allocation has failed.
 */
public abstract class AllocationRetryBackoffPolicy {
public enum PolicyType {
NO_BACKOFF(5) {
@Override
AllocationRetryBackoffPolicy policyForSettings(Settings settings) {
return noBackOffPolicy();
}
},
EXPONENTIAL_BACKOFF(1000) {
@Override
AllocationRetryBackoffPolicy policyForSettings(Settings settings) {
return exponentialBackoffPolicy(settings);
}
};
private final int defaultMaxRetries;

abstract AllocationRetryBackoffPolicy policyForSettings(Settings settings);

PolicyType(int defaultMaxRetries) {
this.defaultMaxRetries = defaultMaxRetries;
}

public int getDefaultMaxRetries() {
return defaultMaxRetries;
}

public static PolicyType fromString(String policyName) {
if ("exponential_backoff".equals(policyName)) {
return EXPONENTIAL_BACKOFF;
} else if ("no_backoff".equals(policyName)) {
return NO_BACKOFF;
}
throw new IllegalStateException("No backoff policy name match for [" + policyName + "]");
}
}

public static final Setting<PolicyType> SETTING_ALLOCATION_RETRY_POLICY =
new Setting<>("cluster.allocation.retry.policy", "exponential_backoff", PolicyType::fromString, Setting.Property.NodeScope);

public static final Function<Settings, Integer> SETTING_ALLOCATION_DEFAULT_MAX_RETRIES =
settings -> SETTING_ALLOCATION_RETRY_POLICY.get(settings).defaultMaxRetries;

public static final Setting<TimeValue> SETTING_ALLOCATION_RETRY_EXPONENTIAL_BACKOFF_BASE_DELAY =
Setting.positiveTimeSetting("cluster.allocation.retry.exponential_backoff.base_delay",
TimeValue.timeValueMillis(50), Setting.Property.NodeScope);

public static final Setting<TimeValue> SETTING_ALLOCATION_RETRY_EXPONENTIAL_BACKOFF_MAX_DELAY =
Setting.positiveTimeSetting("cluster.allocation.retry.exponential_backoff.max_delay",
TimeValue.timeValueMinutes(30), Setting.Property.NodeScope);

/**
 * Determines the delay before retrying after a shard allocation has failed numOfFailures times.
 * This method may return a different value on each call.
*/
public abstract TimeValue delayInterval(int numOfFailures);

/**
* Constructs the allocation retry policy for the given settings.
*/
public static AllocationRetryBackoffPolicy policyForSettings(Settings settings) {
return SETTING_ALLOCATION_RETRY_POLICY.get(settings).policyForSettings(settings);
}

public static AllocationRetryBackoffPolicy exponentialBackoffPolicy(Settings settings) {
return new ExponentialBackOffPolicy(settings);
}

public static AllocationRetryBackoffPolicy noBackOffPolicy() {
return new NoBackoffPolicy();
}

static class ExponentialBackOffPolicy extends AllocationRetryBackoffPolicy {
private final Random random;
private final long delayUnitMS;
private final long maxDelayMS;

ExponentialBackOffPolicy(Settings settings) {
this.random = new Random(Randomness.get().nextInt());
this.delayUnitMS = SETTING_ALLOCATION_RETRY_EXPONENTIAL_BACKOFF_BASE_DELAY.get(settings).millis();
this.maxDelayMS = SETTING_ALLOCATION_RETRY_EXPONENTIAL_BACKOFF_MAX_DELAY.get(settings).millis();
}

@Override
public TimeValue delayInterval(int numOfFailures) {
assert numOfFailures >= 0;
int bound = numOfFailures > 30 ? Integer.MAX_VALUE : 1 << numOfFailures;
return TimeValue.timeValueMillis(Math.min(maxDelayMS, delayUnitMS * random.nextInt(bound)));
}
}

static class NoBackoffPolicy extends AllocationRetryBackoffPolicy {
@Override
public TimeValue delayInterval(int numOfFailures) {
return TimeValue.ZERO;
}
}
}
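
A minimal sketch of constructing the policy from node settings (the values shown are the defaults declared above):

    Settings settings = Settings.builder()
        .put("cluster.allocation.retry.policy", "exponential_backoff")
        .put("cluster.allocation.retry.exponential_backoff.base_delay", "50ms")
        .put("cluster.allocation.retry.exponential_backoff.max_delay", "30m")
        .build();
    AllocationRetryBackoffPolicy policy = AllocationRetryBackoffPolicy.policyForSettings(settings);
    TimeValue delay = policy.delayInterval(3); // a random multiple of 50ms below 400ms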
MaxRetryAllocationDecider.java
@@ -23,6 +23,7 @@
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.routing.allocation.AllocationRetryBackoffPolicy;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@@ -37,8 +38,8 @@
*/
public class MaxRetryAllocationDecider extends AllocationDecider {

public static final Setting<Integer> SETTING_ALLOCATION_MAX_RETRY = Setting.intSetting("index.allocation.max_retries", 5, 0,
Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<Integer> SETTING_ALLOCATION_MAX_RETRY = Setting.intSetting("index.allocation.max_retries",
AllocationRetryBackoffPolicy.SETTING_ALLOCATION_DEFAULT_MAX_RETRIES, 0, Setting.Property.Dynamic, Setting.Property.IndexScope);

public static final String NAME = "max_retry";

Setting.java
@@ -903,6 +903,10 @@ public static Setting<Integer> intSetting(String key, int defaultValue, int minV
return new Setting<>(key, (s) -> Integer.toString(defaultValue), (s) -> parseInt(s, minValue, key), properties);
}

public static Setting<Integer> intSetting(String key, Function<Settings, Integer> defaultValue, int minValue, Property... properties) {
return new Setting<>(key, defaultValue.andThen(n -> Integer.toString(n)), (s) -> parseInt(s, minValue, key), properties);
}

public static Setting<Integer> intSetting(String key, Setting<Integer> fallbackSetting, int minValue, Property... properties) {
return new Setting<>(key, fallbackSetting, (s) -> parseInt(s, minValue, key), properties);
}
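
This Function-based overload exists because a default can now depend on another setting (here, the chosen retry policy), so it has to be computed from the node Settings rather than fixed up front. The MaxRetryAllocationDecider change above is the call site:

    Setting<Integer> maxRetries = Setting.intSetting("index.allocation.max_retries",
        AllocationRetryBackoffPolicy.SETTING_ALLOCATION_DEFAULT_MAX_RETRIES, 0,
        Setting.Property.Dynamic, Setting.Property.IndexScope);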
