
Implement follower rate limiting for file restore #37449

Merged · 9 commits · Jan 17, 2019

Conversation

Tim-Brooks (Contributor)

This is related to #35975. This commit implements rate limiting on the follower side using the `RateLimitingInputStream`.

@Tim-Brooks added the >non-issue, v7.0.0, :Distributed Indexing/CCR (Issues around the Cross Cluster State Replication features), and v6.7.0 labels on Jan 15, 2019
@elasticmachine (Collaborator)

Pinging @elastic/es-distributed

@Tim-Brooks (Contributor, Author)

This implements rate limiting on the follower side. However, it is mostly a POC for us to talk about, because I'm not sure this is what we want. I implemented it the same way we implement it in `BlobStoreRepository`: essentially there is a global `SimpleRateLimiter` which uses the nanos since the last pause, together with the bytes passed in since the last pause, to calculate a pause. However, the bytes written since the last pause are tracked locally in each input stream, which means that when concurrent restores are happening, one input stream pausing impacts when the other input streams pause (because the rate limiter will think that a pause just happened).

We could:

  1. Make rate limiting local to a recovery, so each input stream would have its own rate limiter. But I assume we want to consider the total bytes being read when limiting reads? I assume we definitely want that for writes on the leader side.

  2. Make the rate limiting concurrent. We kind of have an example of this in the normal recovery process, but I'm not sure it is actually working the way we intend.

On the read side:

        @Override
        public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
            try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
                final RecoveryTarget recoveryTarget = recoveryRef.target();
                final RecoveryState.Index indexState = recoveryTarget.state().getIndex();
                if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
                    indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
                }

                RateLimiter rateLimiter = recoverySettings.rateLimiter();
                if (rateLimiter != null) {
                    long bytes = bytesSinceLastPause.addAndGet(request.content().length());
                    if (bytes > rateLimiter.getMinPauseCheckBytes()) {
                        // Time to pause
                        bytesSinceLastPause.addAndGet(-bytes);
                        long throttleTimeInNanos = rateLimiter.pause(bytes);
                        indexState.addTargetThrottling(throttleTimeInNanos);
                        recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
                    }
                }
                final ActionListener<TransportResponse> listener =
                    new HandledTransportAction.ChannelActionListener<>(channel, Actions.FILE_CHUNK, request);
                recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(), request.lastChunk(),
                    request.totalTranslogOps(),
                    ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
            }
        }

In particular:

    long bytes = bytesSinceLastPause.addAndGet(request.content().length());
    ...
    bytesSinceLastPause.addAndGet(-bytes);

Each individual operation is atomic, but the combination is not. If two messages are received at the same time, bytesSinceLastPause can go negative.

So I'm not sure whether I should be following that strategy. We do the same thing when writing the normal recovery chunks, so maybe that is what we want? It just seems to me that when concurrent restores are happening, this allows more bytes to be sent than we intended.
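For illustration, one way to make the accumulate-then-claim step atomic is sketched below. It assumes the Lucene RateLimiter API used in the snippet above and is only an illustration of a race-free variant, not code from this PR.

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.lucene.store.RateLimiter;

    class ThrottleAccounting {

        private final AtomicLong bytesSinceLastPause = new AtomicLong();

        // Accumulate the requested bytes and, if the threshold is crossed, atomically
        // claim the accumulated amount before pausing. The compareAndSet only succeeds
        // if no other thread touched the counter in the meantime, so concurrent readers
        // cannot drive the counter negative.
        long maybePause(int bytesRequested, RateLimiter rateLimiter) throws IOException {
            long current = bytesSinceLastPause.addAndGet(bytesRequested);
            while (current > rateLimiter.getMinPauseCheckBytes()) {
                if (bytesSinceLastPause.compareAndSet(current, 0)) {
                    return rateLimiter.pause(current);
                }
                current = bytesSinceLastPause.get();
            }
            return 0;
        }
    }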

@ywelsch (Contributor) left a comment

Let's use the same approach, where the rate limiter works in a concurrent setting, for peer recovery, recovery from remote, and snapshot restore. If there are problems with it (like the one you pointed out), we can fix them for all of peer recovery / remote recovery / snapshot restore in a follow-up. We will also need a setting here to make the rate limiter configurable for CCR.

@@ -336,7 +342,7 @@ public void close() {
}
}

private static class RestoreFileInputStream extends InputStream {
Contributor

why remove the static here?

Tim-Brooks (Contributor, Author)

In order to access the rate limiter field of the enclosing class. We also need to add an `AtomicLong` field for bytes written that is shared with the input streams (see the sketch below).
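To illustrate the point (the class and field names below are assumptions for the sketch, not the actual CcrRepository code): dropping `static` lets every restore stream opened by one repository instance read the same shared counter, and by extension the same rate limiter.

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.concurrent.atomic.AtomicLong;

    class RepositorySketch {

        // Shared across all streams opened by this repository instance.
        private final AtomicLong bytesSinceLastPause = new AtomicLong();

        // Non-static inner class: it can read the enclosing instance's fields.
        private class ThrottledInputStream extends InputStream {

            private final InputStream delegate;

            ThrottledInputStream(InputStream delegate) {
                this.delegate = delegate;
            }

            @Override
            public int read() throws IOException {
                int b = delegate.read();
                if (b != -1) {
                    bytesSinceLastPause.incrementAndGet(); // shared accounting
                }
                return b;
            }
        }
    }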

@Tim-Brooks (Contributor, Author)

Let's use the same approach, where the rate limiter works in a concurrent setting, for peer recovery, recovery from remote, and snapshot restore.

I replicated this logic in the repository. It might be possible to wrap rate limiter and atomic long in a ConcurrentRateLimiter? I could use it in the blobstore repository if you want? Although, that would change the behavior of the blobstore repository.

Also, I added a test, although this is inherently kind of hard to test. The test starts a cluster with a low rate limit, indexes some documents, and then checks that rate limiting is applied. As the test starts and stops multiple clusters, it takes about 10 seconds to run. It also depends on timing (if writes were very delayed for some reason, the rate limiting might not kick in). Thoughts on the best testing approach?

@Tim-Brooks Tim-Brooks requested a review from ywelsch January 15, 2019 19:45
* Max bytes a follower node can recover per second.
*/
public static final Setting<ByteSizeValue> FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND =
Setting.byteSizeSetting("ccr.follower.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
Contributor

Let's use the same setting for rate-limiting inbound as well as outbound CCR recovery traffic. This means removing the "follower" part from the setting name. Perhaps `ccr.indices.recovery.max_bytes_per_sec`.

Contributor

Let's also add a bullet point to the meta issue to document the newly introduced settings.

public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings) {
super(settings);
this.metadata = metadata;
assert metadata.name().startsWith(NAME_PREFIX) : "CcrRepository metadata.name() must start with: " + NAME_PREFIX;
this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1];
this.ccrLicenseChecker = ccrLicenseChecker;
this.client = client;
this.rateLimiter = new RateLimiter.SimpleRateLimiter(FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND.get(settings).getMbFrac());
Contributor

This setting is dynamic, so let's allow adjusting it at runtime by registering a settings update consumer (see RecoverySettings for how it's done). We can use the CcrSettings object to store the latest value, similar to the RecoverySettings object. You can create the CcrSettings instance in the Ccr class and do the registration with ClusterSettings in createComponents; the ClusterSettings can be accessed from clusterService.getClusterSettings().
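A minimal sketch of that wiring, using the renamed setting suggested above; the class shape, constant name, and default are assumptions rather than the code that was merged.

    import org.elasticsearch.common.settings.ClusterSettings;
    import org.elasticsearch.common.settings.Setting;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;

    public final class CcrSettingsSketch {

        // Dynamic so the limit can be changed at runtime through the cluster settings API.
        public static final Setting<ByteSizeValue> RECOVERY_MAX_BYTES_PER_SECOND =
            Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec",
                new ByteSizeValue(40, ByteSizeUnit.MB),
                Setting.Property.Dynamic, Setting.Property.NodeScope);

        private volatile ByteSizeValue maxBytesPerSec;

        public CcrSettingsSketch(Settings settings, ClusterSettings clusterSettings) {
            this.maxBytesPerSec = RECOVERY_MAX_BYTES_PER_SECOND.get(settings);
            // Re-read the value whenever the setting is updated dynamically, so the
            // rate limiter can pick up the new limit without a node restart.
            clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
        }

        private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
            this.maxBytesPerSec = maxBytesPerSec;
        }

        public ByteSizeValue getMaxBytesPerSec() {
            return maxBytesPerSec;
        }
    }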

Tim-Brooks (Contributor, Author)

I registered a listener for the setting change.

@@ -235,6 +238,60 @@ public void testDocsAreRecovered() throws Exception {
thread.join();
}

public void testRateLimitingIsEmployed() throws Exception {
restartClustersWithSettings(Settings.builder().put(CcrSettings.FOLLOWER_RECOVERY_MAX_BYTES_READ_PER_SECOND.getKey(),
Contributor

By making the setting truly dynamic, we won't need the restarts here, which will greatly speed up the tests. See SharedClusterSnapshotRestoreIT#testThrottling for how it's tested for snapshot/restore.

Tim-Brooks (Contributor, Author)

I did the same setup and assertion as that test.
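For readers unfamiliar with that test, the rough shape is sketched below. It is assumed to live in the existing CCR integration test class; the helper methods (followerClient(), getFollowerCluster()), the repository name, and the setting key are assumptions, and static imports are omitted.

    public void testRateLimitingIsEmployed() throws Exception {
        // Lower the limit dynamically instead of restarting the clusters.
        ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
        settingsRequest.persistentSettings(Settings.builder()
            .put("ccr.indices.recovery.max_bytes_per_sec", "10k").build());
        followerClient().admin().cluster().updateSettings(settingsRequest).actionGet();

        // ... index documents on the leader and start following, which triggers the restore ...

        // Assert that at least one repository instance on the follower recorded restore throttling.
        long totalThrottleNanos = 0;
        for (RepositoriesService service : getFollowerCluster().getInstances(RepositoriesService.class)) {
            totalThrottleNanos += service.repository("_ccr_leader_cluster").getRestoreThrottleTimeInNanos();
        }
        assertTrue("expected restore throttling to be applied", totalThrottleNanos > 0L);
    }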

@@ -389,5 +402,17 @@ public int read(byte[] bytes, int off, int len) throws IOException {

return bytesReceived;
}

private void maybePause(int bytesRequested) {
Contributor

Just to make sure: this is the same code as in PeerRecoveryTargetService. Let's have a follow-up to refactor it into a reusable class, and use it everywhere (peer recovery source and target, as well as snapshot and restore).

Tim-Brooks (Contributor, Author)

I did have to extract a `CombinedRateLimiter`; otherwise the repositories for different remote clusters would not share the rate limiter. However, I added a meta issue to ensure that we use the same code for this and peer recovery. So we can work out the issues in that follow-up?
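For a rough idea of the extraction, a shared limiter could look something like the sketch below, which wraps a Lucene SimpleRateLimiter and an AtomicLong behind a single pause check. This is an illustration of the idea, not necessarily the `CombinedRateLimiter` that was merged.

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.lucene.store.RateLimiter;

    public class SharedRateLimiterSketch {

        private final AtomicLong bytesSinceLastPause = new AtomicLong();
        private final RateLimiter.SimpleRateLimiter rateLimiter;

        public SharedRateLimiterSketch(double mbPerSec) {
            this.rateLimiter = new RateLimiter.SimpleRateLimiter(mbPerSec);
        }

        // Accounts for the requested bytes and pauses once the shared threshold is crossed.
        // Returns the time spent throttling, clamped so it is never negative.
        public long maybePause(int bytesRequested) throws IOException {
            long bytes = bytesSinceLastPause.addAndGet(bytesRequested);
            if (bytes > rateLimiter.getMinPauseCheckBytes()) {
                bytesSinceLastPause.addAndGet(-bytes);
                return Math.max(0, rateLimiter.pause(bytes));
            }
            return 0;
        }

        // Lets the dynamic setting consumer adjust the rate at runtime.
        public void setMBPerSec(double mbPerSec) {
            rateLimiter.setMBPerSec(mbPerSec);
        }
    }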

Contributor

ok, sounds good.

@ywelsch (Contributor) commented Jan 15, 2019

I replicated this logic in the repository. It might be possible to wrap rate limiter and atomic long in a ConcurrentRateLimiter? I could use it in the blobstore repository if you want? Although, that would change the behavior of the blobstore repository.

Let's do this in a follow-up. Also, we should do a little more testing at that point to make sure it behaves correctly under concurrency. I'm OK with changing the behavior of the blobstore repository if it fixes broken concurrent behavior.

As the test starts and stops multiple clusters, it takes about 10 seconds to run.

should be fixed by making this setting dynamically updateable, which will not require cluster restarts

@Tim-Brooks (Contributor, Author)

Let's do this in a follow-up. Also, we should do a little more testing at that point to make sure it behaves correctly under concurrency.

I added a meta task to combine the different code usages.

should be fixed by making this setting dynamically updateable, which will not require cluster restarts

Done.

@Tim-Brooks Tim-Brooks requested a review from ywelsch January 16, 2019 02:12
@ywelsch (Contributor) left a comment

I've left some minor comments

    if (bytesSincePause > rateLimiter.getMinPauseCheckBytes()) {
        // Time to pause
        bytesSinceLastPause.addAndGet(-bytesSincePause);
        return rateLimiter.pause(bytesSincePause);
Contributor

should we have this always return a non-negative number?


@Tim-Brooks Tim-Brooks requested a review from ywelsch January 17, 2019 04:33
@Tim-Brooks (Contributor, Author)

Made your changes @ywelsch

@ywelsch (Contributor) left a comment

LGTM

@Tim-Brooks (Contributor, Author)

Jenkins run Gradle build tests 1
Jenkins run Gradle build tests 2

@Tim-Brooks Tim-Brooks merged commit b6f06a4 into elastic:master Jan 17, 2019
Tim-Brooks added a commit to Tim-Brooks/elasticsearch that referenced this pull request Jan 17, 2019
Commit elastic#37535 removed an internal restore request in favor of the
RestoreSnapshotRequest. Commit elastic#37449 added a new test that used the
internal restore request. This commit modifies the new test to use the
RestoreSnapshotRequest.
Tim-Brooks added a commit that referenced this pull request Jan 21, 2019
This is related to #35975. This commit implements rate limiting on the
follower side using a new class `CombinedRateLimiter`.
@Tim-Brooks Tim-Brooks deleted the follower_rate_limiting branch December 18, 2019 14:47
Labels: :Distributed Indexing/CCR (Issues around the Cross Cluster State Replication features), >non-issue, v6.7.0, v7.0.0-beta1