Implement follower rate limiting for file restore #37449

Merged: 9 commits, Jan 17, 2019
org/elasticsearch/common/util/CombinedRateLimiter.java (new file)
@@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.util;

import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.common.unit.ByteSizeValue;

import java.util.concurrent.atomic.AtomicLong;

/**
* A rate limiter designed for multiple concurrent users.
*/
public class CombinedRateLimiter {

    // TODO: This rate limiter has some concurrency issues between the two maybePause operations

    private final AtomicLong bytesSinceLastPause = new AtomicLong();
    private final RateLimiter.SimpleRateLimiter rateLimiter;
    private volatile boolean rateLimit;

    public CombinedRateLimiter(ByteSizeValue maxBytesPerSec) {
        rateLimit = maxBytesPerSec.getBytes() > 0;
        rateLimiter = new RateLimiter.SimpleRateLimiter(maxBytesPerSec.getMbFrac());
    }

    public long maybePause(int bytes) {
        if (rateLimit) {
            long bytesSincePause = bytesSinceLastPause.addAndGet(bytes);
            if (bytesSincePause > rateLimiter.getMinPauseCheckBytes()) {
                // Time to pause
                bytesSinceLastPause.addAndGet(-bytesSincePause);
                return Math.max(rateLimiter.pause(bytesSincePause), 0);
            }
        }
        return 0;
    }

    public void setMBPerSec(ByteSizeValue maxBytesPerSec) {
        rateLimit = maxBytesPerSec.getBytes() > 0;
        rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac());
    }
}
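Note: CombinedRateLimiter is shared by every concurrent caller that streams file chunks during a restore. Each caller reports how many bytes it is about to transfer, and maybePause returns the nanoseconds it actually slept so the caller can record throttle time. A minimal usage sketch under that reading; ThrottledCopyExample and copyChunk are made-up names, and the 40 MB value only mirrors the new setting's default:

import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CombinedRateLimiter;

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only, not part of this PR: how a concurrent transfer loop
// might use CombinedRateLimiter and accumulate the time spent throttled.
class ThrottledCopyExample {

    private final CombinedRateLimiter limiter = new CombinedRateLimiter(new ByteSizeValue(40, ByteSizeUnit.MB));
    private final AtomicLong totalThrottledNanos = new AtomicLong();

    void copyChunk(byte[] chunk) {
        // Report the bytes about to move; the limiter only sleeps once enough bytes
        // have accumulated to pass its minimum pause-check threshold, and it returns
        // the nanoseconds it actually paused (0 when it did not pause).
        long nanosPaused = limiter.maybePause(chunk.length);
        totalThrottledNanos.addAndGet(nanosPaused);
        // ... write or send the chunk here ...
    }

    long throttledNanos() {
        return totalThrottledNanos.get();
    }
}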
Ccr.java
@@ -117,6 +117,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
private final Settings settings;
private final CcrLicenseChecker ccrLicenseChecker;
private final SetOnce<CcrRestoreSourceService> restoreSourceService = new SetOnce<>();
private final SetOnce<CcrSettings> ccrSettings = new SetOnce<>();
private Client client;

/**
@@ -159,6 +160,8 @@ public Collection<Object> createComponents(

CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService();
this.restoreSourceService.set(restoreSourceService);
CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings());
this.ccrSettings.set(ccrSettings);
return Arrays.asList(
ccrLicenseChecker,
restoreSourceService,
@@ -291,7 +294,8 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {

@Override
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
Repository.Factory repositoryFactory = (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings);
Repository.Factory repositoryFactory =
(metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get());
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
}

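For context on the wiring above: the plugin keeps the CcrSettings instance in a Lucene SetOnce, so the repository factory lambda registered in getInternalRepositories can read a value that is only assigned once createComponents has run. A stripped-down sketch of that holder pattern; the class and method names are illustrative, not from the PR:

import org.apache.lucene.util.SetOnce;

import java.util.function.Supplier;

// Illustrative sketch of the SetOnce holder pattern used for ccrSettings above.
class SetOnceHolderExample {

    private final SetOnce<String> value = new SetOnce<>();

    Supplier<String> captureEarly() {
        // A reader can be handed out before the value exists ...
        return value::get;
    }

    void initialiseLater(String v) {
        // ... and set() may be called exactly once; a second call throws AlreadySetException.
        value.set(v);
    }
}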
CcrSettings.java
@@ -5,9 +5,14 @@
*/
package org.elasticsearch.xpack.ccr;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CombinedRateLimiter;
import org.elasticsearch.xpack.core.XPackSettings;

import java.util.Arrays;
@@ -18,11 +23,6 @@
*/
public final class CcrSettings {

// prevent construction
private CcrSettings() {

}

/**
* Index setting for a following index.
*/
@@ -35,6 +35,14 @@ private CcrSettings() {
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting(
"ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic);


/**
* Max bytes a node can recover per second.
*/
public static final Setting<ByteSizeValue> RECOVERY_MAX_BYTES_PER_SECOND =
Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* The settings defined by CCR.
*
@@ -44,7 +52,23 @@ static List<Setting<?>> getSettings() {
return Arrays.asList(
XPackSettings.CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING,
RECOVERY_MAX_BYTES_PER_SECOND,
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT);
}

private final CombinedRateLimiter ccrRateLimiter;

public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
}

private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
ccrRateLimiter.setMBPerSec(maxBytesPerSec);
}

public CombinedRateLimiter getRateLimiter() {
return ccrRateLimiter;
}

}
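Because ccr.indices.recovery.max_bytes_per_sec is registered as a dynamic node-scope setting, the limit can be changed on a running follower cluster and the update consumer above pushes the new value into the shared CombinedRateLimiter. A hedged sketch of adjusting it at runtime, mirroring what the integration test further down does; the helper class and any concrete value such as "20mb" are just examples:

import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

// Sketch: change the follower-side restore rate limit on a live cluster.
// "client" is assumed to be connected to the follower cluster.
class AdjustCcrRecoveryLimit {

    static void setLimit(Client client, String maxBytesPerSec) {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        request.persistentSettings(Settings.builder()
            .put("ccr.indices.recovery.max_bytes_per_sec", maxBytesPerSec));
        // The registered settings-update consumer calls CcrSettings#setMaxBytesPerSec,
        // which forwards the new value to the rate limiter.
        client.admin().cluster().updateSettings(request).actionGet();
    }
}

For example, setLimit(followerClient, "20mb") would tighten the limit, and passing the setting's 40mb default would restore it.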
CcrRepository.java
@@ -24,8 +24,10 @@
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CombinedRateLimiter;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineException;
@@ -49,6 +51,7 @@
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.CcrRequests;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest;
@@ -66,6 +69,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongConsumer;

/**
* This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to
@@ -79,12 +83,17 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
private static final SnapshotId SNAPSHOT_ID = new SnapshotId(LATEST, LATEST);

private final RepositoryMetaData metadata;
private final CcrSettings ccrSettings;
private final String remoteClusterAlias;
private final Client client;
private final CcrLicenseChecker ccrLicenseChecker;

public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings) {
private final CounterMetric throttledTime = new CounterMetric();

public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings,
CcrSettings ccrSettings) {
this.metadata = metadata;
this.ccrSettings = ccrSettings;
assert metadata.name().startsWith(NAME_PREFIX) : "CcrRepository metadata.name() must start with: " + NAME_PREFIX;
this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1];
this.ccrLicenseChecker = ccrLicenseChecker;
@@ -206,7 +215,7 @@ public long getSnapshotThrottleTimeInNanos() {

@Override
public long getRestoreThrottleTimeInNanos() {
return 0;
return throttledTime.count();
}

@Override
@@ -257,7 +266,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
// TODO: There should be some local timeout. And if the remote cluster returns an unknown session
// response, we should be able to retry by creating a new session.
String name = metadata.name();
try (RestoreSession restoreSession = RestoreSession.openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
try (RestoreSession restoreSession = openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
restoreSession.restoreFiles();
} catch (Exception e) {
throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
@@ -285,6 +294,15 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index
}
}

private RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
RecoveryState recoveryState) {
String sessionUUID = UUIDs.randomBase64UUID();
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
response.getStoreFileMetaData(), ccrSettings.getRateLimiter(), throttledTime::inc);
}

private static class RestoreSession extends FileRestoreContext implements Closeable {

private static final int BUFFER_SIZE = 1 << 16;
@@ -293,23 +311,19 @@ private static class RestoreSession extends FileRestoreContext implements Closea
private final String sessionUUID;
private final DiscoveryNode node;
private final Store.MetadataSnapshot sourceMetaData;
private final CombinedRateLimiter rateLimiter;
private final LongConsumer throttleListener;

RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData) {
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CombinedRateLimiter rateLimiter,
LongConsumer throttleListener) {
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
this.remoteClient = remoteClient;
this.sessionUUID = sessionUUID;
this.node = node;
this.sourceMetaData = sourceMetaData;
}

static RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
RecoveryState recoveryState) {
String sessionUUID = UUIDs.randomBase64UUID();
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
response.getStoreFileMetaData());
this.rateLimiter = rateLimiter;
this.throttleListener = throttleListener;
}

void restoreFiles() throws IOException {
@@ -324,7 +338,7 @@

@Override
protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata());
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), rateLimiter, throttleListener);
}

@Override
@@ -341,14 +355,19 @@ private static class RestoreFileInputStream extends InputStream {
private final String sessionUUID;
private final DiscoveryNode node;
private final StoreFileMetaData fileToRecover;
private final CombinedRateLimiter rateLimiter;
private final LongConsumer throttleListener;

private long pos = 0;

private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover) {
private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover,
CombinedRateLimiter rateLimiter, LongConsumer throttleListener) {
this.remoteClient = remoteClient;
this.sessionUUID = sessionUUID;
this.node = node;
this.fileToRecover = fileToRecover;
this.rateLimiter = rateLimiter;
this.throttleListener = throttleListener;
}


@@ -365,6 +384,10 @@ public int read(byte[] bytes, int off, int len) throws IOException {
}

int bytesRequested = (int) Math.min(remainingBytes, len);

long nanosPaused = rateLimiter.maybePause(bytesRequested);
throttleListener.accept(nanosPaused);

String fileName = fileToRecover.name();
GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested);
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response =
@@ -388,5 +411,6 @@ public int read(byte[] bytes, int off, int len) throws IOException {

return bytesReceived;
}

}
}
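The throttling applied in RestoreFileInputStream above can be read as a generic decorator: count the bytes about to be requested, let the shared limiter decide whether to sleep, and report any pause to a listener so getRestoreThrottleTimeInNanos can surface it. A standalone sketch of that pattern; the ThrottledInputStream class is illustrative, not part of the PR, and only throttles the bulk read:

import org.elasticsearch.common.util.CombinedRateLimiter;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.LongConsumer;

// Illustrative sketch: the rate-limiting pattern from RestoreFileInputStream
// expressed as a plain InputStream decorator.
class ThrottledInputStream extends FilterInputStream {

    private final CombinedRateLimiter rateLimiter;
    private final LongConsumer throttleListener;

    ThrottledInputStream(InputStream in, CombinedRateLimiter rateLimiter, LongConsumer throttleListener) {
        super(in);
        this.rateLimiter = rateLimiter;
        this.throttleListener = throttleListener;
    }

    @Override
    public int read(byte[] bytes, int off, int len) throws IOException {
        // Ask the shared limiter whether this request exceeds the configured rate;
        // if it paused, report the pause so the caller can count throttled time.
        long nanosPaused = rateLimiter.maybePause(len);
        throttleListener.accept(nanosPaused);
        return super.read(bytes, off, len);
    }
}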
CcrRepositoryIT.java
@@ -21,6 +21,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@@ -38,6 +39,8 @@
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -235,6 +238,60 @@ public void testDocsAreRecovered() throws Exception {
thread.join();
}

public void testRateLimitingIsEmployed() throws Exception {
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10K"));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());

String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
String leaderIndex = "index1";
String followerIndex = "index2";

final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen(leaderIndex);

final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);

List<CcrRepository> repositories = new ArrayList<>();

for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) {
Repository repository = repositoriesService.repository(leaderClusterRepoName);
repositories.add((CcrRepository) repository);
}

logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}

leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();

Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName,
CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions,
"^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false,
false, true, settingsBuilder.build(), new String[0],
"restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]");

PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
future.actionGet();

assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0));

settingsRequest = new ClusterUpdateSettingsRequest();
ByteSizeValue defaultValue = CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getDefault(Settings.EMPTY);
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), defaultValue));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
}

public void testFollowerMappingIsUpdated() throws IOException {
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
String leaderIndex = "index1";