Implement follower rate limiting for file restore #37449

Merged · 9 commits · Jan 17, 2019
@@ -7,6 +7,8 @@

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.XPackSettings;

@@ -35,6 +37,14 @@ private CcrSettings() {
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting(
"ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic);


/**
* Max bytes a follower node can recover per second.
*/
public static final Setting<ByteSizeValue> FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND =
Setting.byteSizeSetting("ccr.follower.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
Contributor: let's use the same setting for rate-limiting inbound as well as outbound CCR recovery traffic. This means removing the "follower" part from the setting name. Perhaps ccr.indices.recovery.max_bytes_per_sec.

Contributor: let's also add a bullet point to the meta issue to document the newly introduced settings.

Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* The settings defined by CCR.
*
@@ -44,6 +54,7 @@ static List<Setting<?>> getSettings() {
return Arrays.asList(
XPackSettings.CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING,
FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND,
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT);
}

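Note on consuming the setting above: because it is declared with Property.Dynamic, the review discussion further down suggests registering a settings update consumer so that changes made through the cluster settings API take effect without a node restart (the same pattern RecoverySettings uses for peer recovery). The following is a minimal sketch under that assumption; the class name and wiring are illustrative, not the PR's actual code.

// Illustrative sketch only, not part of the PR.
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.ccr.CcrSettings;

public final class CcrRestoreThrottleSketch {

    private final RateLimiter.SimpleRateLimiter rateLimiter;

    public CcrRestoreThrottleSketch(Settings settings, ClusterSettings clusterSettings) {
        // Seed the limiter from the node settings at startup (MB/sec).
        this.rateLimiter = new RateLimiter.SimpleRateLimiter(
            CcrSettings.FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND.get(settings).getMbFrac());
        // Property.Dynamic settings can be updated via the cluster settings API;
        // the registered consumer is invoked with each new value.
        clusterSettings.addSettingsUpdateConsumer(
            CcrSettings.FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND, this::setMaxBytesPerSec);
    }

    private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
        rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac());
    }

    public RateLimiter.SimpleRateLimiter rateLimiter() {
        return rateLimiter;
    }
}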
@@ -7,6 +7,7 @@
package org.elasticsearch.xpack.ccr.repository;

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@@ -24,6 +25,7 @@
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
@@ -66,6 +68,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.xpack.ccr.CcrSettings.FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND;

/**
* This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to
@@ -83,13 +88,18 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
private final Client client;
private final CcrLicenseChecker ccrLicenseChecker;

private final RateLimiter.SimpleRateLimiter rateLimiter;
private final CounterMetric throttledTime = new CounterMetric();
private final AtomicLong bytesSinceLastPause = new AtomicLong();

public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings) {
super(settings);
this.metadata = metadata;
assert metadata.name().startsWith(NAME_PREFIX) : "CcrRepository metadata.name() must start with: " + NAME_PREFIX;
this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1];
this.ccrLicenseChecker = ccrLicenseChecker;
this.client = client;
this.rateLimiter = new RateLimiter.SimpleRateLimiter(FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND.get(settings).getMbFrac());
Contributor: This setting is dynamic, so let's allow it to be adjusted at runtime by registering a settings update consumer (see RecoverySettings for how it's done). We can use the CcrSettings object to store the latest value, similar to the RecoverySettings object. You can create the CcrSettings instance in the Ccr class and do the registration with ClusterSettings in createComponents, which can be accessed from clusterService.getClusterSettings().

Contributor (author): I registered a listener for the setting change.

}

@Override
@@ -207,7 +217,7 @@ public long getSnapshotThrottleTimeInNanos() {

@Override
public long getRestoreThrottleTimeInNanos() {
return 0;
return throttledTime.count();
}

@Override
@@ -258,7 +268,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
// TODO: There should be some local timeout. And if the remote cluster returns an unknown session
// response, we should be able to retry by creating a new session.
String name = metadata.name();
try (RestoreSession restoreSession = RestoreSession.openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
try (RestoreSession restoreSession = openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
restoreSession.restoreFiles();
} catch (Exception e) {
throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
@@ -286,7 +296,16 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index
}
}

private static class RestoreSession extends FileRestoreContext implements Closeable {
private RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
RecoveryState recoveryState) {
String sessionUUID = UUIDs.randomBase64UUID();
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
response.getStoreFileMetaData());
}

private class RestoreSession extends FileRestoreContext implements Closeable {

private static final int BUFFER_SIZE = 1 << 16;

@@ -304,15 +323,6 @@ private static class RestoreSession extends FileRestoreContext implements Closea
this.sourceMetaData = sourceMetaData;
}

static RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
RecoveryState recoveryState) {
String sessionUUID = UUIDs.randomBase64UUID();
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
response.getStoreFileMetaData());
}

void restoreFiles() throws IOException {
ArrayList<BlobStoreIndexShardSnapshot.FileInfo> fileInfos = new ArrayList<>();
for (StoreFileMetaData fileMetaData : sourceMetaData) {
@@ -336,7 +346,7 @@ public void close() {
}
}

private static class RestoreFileInputStream extends InputStream {
Contributor: why remove the static here?

Contributor (author): In order to access the rate limiter field. We also need to add an AtomicLong field for bytes written that needs to be shared with the input streams.

private class RestoreFileInputStream extends InputStream {

private final Client remoteClient;
private final String sessionUUID;
@@ -366,6 +376,9 @@ public int read(byte[] bytes, int off, int len) throws IOException {
}

int bytesRequested = (int) Math.min(remainingBytes, len);

maybePause(bytesRequested);

String fileName = fileToRecover.name();
GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested);
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response =
@@ -389,5 +402,17 @@ public int read(byte[] bytes, int off, int len) throws IOException {

return bytesReceived;
}

private void maybePause(int bytesRequested) {
Contributor: just to make sure, this is the same code as in PeerRecoveryTargetService. Let's have a follow-up to refactor it into a class that is reusable, and use it everywhere (peer recovery source, target as well as snapshot + restore).

Contributor (author): I did have to extract a CombinedRateLimiter. Otherwise the different repositories for different remote clusters will not share the rate limiter. However, I added a meta issue to ensure that we use the same code base for this and peer recovery. So we can work out the issues in that follow-up?

Contributor: ok, sounds good.

long bytesSincePause = bytesSinceLastPause.addAndGet(bytesRequested);
if (bytesSincePause > rateLimiter.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytesSincePause);
long throttleTimeInNanos = rateLimiter.pause(bytesSincePause);
if (throttleTimeInNanos > 0) {
throttledTime.inc(throttleTimeInNanos);
}
}
}
}
}
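In the thread above the author mentions extracting a CombinedRateLimiter so that the repositories created for different remote clusters share one limiter instead of throttling independently. A minimal sketch of what such shared pause accounting could look like follows; the class and method names are assumptions for illustration, not the PR's final implementation. Each RestoreFileInputStream would then call maybePause(bytesRequested) before fetching a chunk and add any returned pause time to its throttle metric.

// Illustrative sketch only, not part of the PR.
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.store.RateLimiter;

public final class CombinedRateLimiterSketch {

    private final RateLimiter.SimpleRateLimiter rateLimiter;
    // Shared across all streams so the limit applies to their combined throughput.
    private final AtomicLong bytesSinceLastPause = new AtomicLong();

    public CombinedRateLimiterSketch(double mbPerSec) {
        this.rateLimiter = new RateLimiter.SimpleRateLimiter(mbPerSec);
    }

    // Records bytes read by any stream and pauses once enough bytes have accumulated.
    // Returns the nanoseconds spent paused (0 if no pause was needed).
    public long maybePause(int bytesRequested) {
        long bytesSincePause = bytesSinceLastPause.addAndGet(bytesRequested);
        if (bytesSincePause > rateLimiter.getMinPauseCheckBytes()) {
            // Reset the shared counter before pausing so concurrent streams keep accumulating.
            bytesSinceLastPause.addAndGet(-bytesSincePause);
            return rateLimiter.pause(bytesSincePause);
        }
        return 0;
    }

    // Applies a new limit, e.g. from a dynamic settings update.
    public void setMBPerSec(double mbPerSec) {
        rateLimiter.setMBPerSec(mbPerSec);
    }
}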
@@ -111,6 +111,10 @@ public abstract class CcrIntegTestCase extends ESTestCase {

@Before
public final void startClusters() throws Exception {
startClusters(Settings.EMPTY);
}

private void startClusters(Settings additionalSettings) throws Exception {
if (clusterGroup != null && reuseClusters()) {
clusterGroup.leaderCluster.ensureAtMostNumDataNodes(numberOfNodesPerCluster());
clusterGroup.followerCluster.ensureAtMostNumDataNodes(numberOfNodesPerCluster());
@@ -122,8 +126,8 @@ public final void startClusters() throws Exception {
TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class, getTestTransportPlugin());

InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null), 0, "leader", mockPlugins,
Function.identity());
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null, additionalSettings), 0,
"leader", mockPlugins, Function.identity());
leaderCluster.beforeTest(random(), 0.0D);
leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());
assertBusy(() -> {
Expand All @@ -133,8 +137,8 @@ public final void startClusters() throws Exception {

String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(address), 0, "follower",
mockPlugins, Function.identity());
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(address, additionalSettings), 0,
"follower", mockPlugins, Function.identity());
clusterGroup = new ClusterGroup(leaderCluster, followerCluster);

followerCluster.beforeTest(random(), 0.0D);
@@ -145,6 +149,11 @@
});
}

protected void restartClustersWithSettings(Settings settings) throws Exception {
stopClusters();
startClusters(settings);
}

/**
* Follower indices don't get all the settings from leader, for example 'index.unassigned.node_left.delayed_timeout'
* is not replicated and if tests kill nodes, we have to wait 60s by default...
@@ -180,7 +189,7 @@ public void afterTest() throws Exception {
}
}

private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress) {
private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress, Settings settings) {
Settings.Builder builder = Settings.builder();
builder.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE);
// Default the watermarks to absurdly low to prevent the tests
Expand All @@ -202,6 +211,7 @@ private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedA
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
// Let cluster state api return quickly in order to speed up auto follow tests:
builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
builder.put(settings);
if (configureRemoteClusterViaNodeSettings() && leaderSeedAddress != null) {
builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress);
}
@@ -21,6 +21,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@@ -38,6 +39,8 @@
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -235,6 +238,60 @@ public void testDocsAreRecovered() throws Exception {
thread.join();
}

public void testRateLimitingIsEmployed() throws Exception {
restartClustersWithSettings(Settings.builder().put(CcrSettings.FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND.getKey(),
Contributor: by making the setting truly dynamic, we won't need the restarts here, which will greatly speed up the tests. See SharedClusterSnapshotRestoreIT#testThrottling for how it's tested for snapshot/restore.

Contributor (author): I did the same setup and assertion as that test.

new ByteSizeValue(500)).build());
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
String leaderIndex = "index1";
String followerIndex = "index2";

final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen(leaderIndex);

final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);

List<CcrRepository> repositories = new ArrayList<>();
try {
for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) {
Repository repository = repositoriesService.repository(leaderClusterRepoName);
repositories.add((CcrRepository) repository);
}
} catch (RepositoryMissingException e) {
fail("need repository");
}

final int firstBatchNumDocs = 10;
logger.info("Indexing [{}] docs as first batch", firstBatchNumDocs);
for (int i = 0; i < firstBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
}

leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();

try {
Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName,
CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions,
"^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false,
false, true, settingsBuilder.build(), new String[0],
"restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]");

PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));

assertBusy(() -> assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0)));
} finally {
restartClustersWithSettings(Settings.EMPTY);
}
}
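Following the reviewer's suggestion above, once the limit is a fully dynamic cluster setting the test could set and clear it through the cluster settings API instead of restarting both clusters. A rough sketch of that approach, assuming a followerClient() helper analogous to the leaderClient() used above (the exact test wiring is an assumption, not this PR's code):

// Illustrative sketch only: apply the limit on the follower cluster at runtime ...
assertAcked(followerClient().admin().cluster().prepareUpdateSettings()
    .setTransientSettings(Settings.builder()
        .put(CcrSettings.FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND.getKey(), new ByteSizeValue(500)))
    .get());

// ... run the restore and the assertBusy throttle-time assertion as in testRateLimitingIsEmployed ...

// ... and in the finally block, clear the transient value rather than restarting the clusters.
assertAcked(followerClient().admin().cluster().prepareUpdateSettings()
    .setTransientSettings(Settings.builder()
        .putNull(CcrSettings.FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND.getKey()))
    .get());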

public void testFollowerMappingIsUpdated() throws IOException {
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
String leaderIndex = "index1";