Copy retention leases when trim unsafe commits #37995

Merged (7 commits) on Feb 12, 2019
14 changes: 11 additions & 3 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -1521,7 +1521,8 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long
if (existingCommits.isEmpty()) {
throw new IllegalArgumentException("No index found to trim");
}
- final String translogUUID = existingCommits.get(existingCommits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);
+ final IndexCommit lastIndexCommitCommit = existingCommits.get(existingCommits.size() - 1);
+ final String translogUUID = lastIndexCommitCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY);
final IndexCommit startingIndexCommit;
// We may not have a safe commit if an index was created before v6.2; and if there is a snapshotted commit whose translog
// is not retained but whose max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
@@ -1546,7 +1547,14 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long
+ startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY) + "] is not equal to last commit's translog uuid ["
+ translogUUID + "]");
}
- if (startingIndexCommit.equals(existingCommits.get(existingCommits.size() - 1)) == false) {
+ if (startingIndexCommit.equals(lastIndexCommitCommit) == false) {
+     /*
+      * Unlike other commit tags, the retention-leases tag is not restored when an engine is
+      * recovered from translog. We need to manually copy it from the last commit to the safe commit;
+      * otherwise we might lose the latest committed retention leases when re-opening an engine.
+      */
+     final Map<String, String> userData = new HashMap<>(startingIndexCommit.getUserData());
+     userData.put(Engine.RETENTION_LEASES, lastIndexCommitCommit.getUserData().getOrDefault(Engine.RETENTION_LEASES, ""));
try (IndexWriter writer = newAppendingIndexWriter(directory, startingIndexCommit)) {
// this achieves two things:
// - by committing a new commit based on the starting commit, it makes sure the starting commit will be opened
@@ -1557,7 +1565,7 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long

// The new commit will use segment files from the starting commit but userData from the last commit by default.
// Thus, we need to manually set the userData from the starting commit to the new commit.
- writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet());
+ writer.setLiveCommitData(userData.entrySet());
writer.commit();
}
}
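For context on the mechanism the Store.java change relies on: every Lucene commit carries a user-data map of string keys and values, an IndexWriter opened in append mode on an older commit bases its next commit on that commit's segments, and setLiveCommitData lets the caller override the commit tags before committing. The following is a minimal, self-contained sketch of the copy pattern in plain Lucene; the RETENTION_LEASES_KEY constant and the writer configuration are illustrative stand-ins for Engine.RETENTION_LEASES and Store#newAppendingIndexWriter, whose exact settings may differ.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.Directory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

final class CommitUserDataCopy {

    // Illustrative stand-in for Engine.RETENTION_LEASES.
    static final String RETENTION_LEASES_KEY = "retention_leases";

    /**
     * Re-commits safeCommit while carrying over the retention-leases tag from
     * lastCommit, mirroring the pattern used in trimUnsafeCommits above.
     */
    static void recommitWithRetentionLeases(final Directory directory,
                                            final IndexCommit safeCommit,
                                            final IndexCommit lastCommit) throws IOException {
        // Start from the safe commit's own user data ...
        final Map<String, String> userData = new HashMap<>(safeCommit.getUserData());
        // ... but take the retention-leases tag from the last commit, because
        // replaying the translog does not rebuild it.
        userData.put(RETENTION_LEASES_KEY,
            lastCommit.getUserData().getOrDefault(RETENTION_LEASES_KEY, ""));

        final IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer())
            .setIndexCommit(safeCommit)                      // open on the older (safe) commit
            .setOpenMode(IndexWriterConfig.OpenMode.APPEND)  // append to the existing index
            .setMergePolicy(NoMergePolicy.INSTANCE);         // do not rewrite segments here
        try (IndexWriter writer = new IndexWriter(directory, config)) {
            writer.setLiveCommitData(userData.entrySet());   // override the commit tags
            writer.commit();                                 // new commit: safe segments + merged user data
        }
    }
}

The net effect matches the diff: the new commit keeps the starting (safe) commit's segments and user data, except that the retention-leases entry reflects the most recent commit, so re-opening the engine cannot lose committed leases.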
@@ -20,32 +20,42 @@
package org.elasticsearch.index.shard;

import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeaseStats;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
@@ -264,6 +274,76 @@ public void testRetentionLeaseStats() throws IOException {
}
}

public void testRecoverFromStoreReserveRetentionLeases() throws Exception {
final AtomicBoolean throwDuringRecoverFromTranslog = new AtomicBoolean();
final IndexShard shard = newStartedShard(false, Settings.builder().put("index.soft_deletes.enabled", true).build(),
config -> new InternalEngine(config) {
@Override
public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner,
long recoverUpToSeqNo) throws IOException {
if (throwDuringRecoverFromTranslog.get()) {
throw new RuntimeException("crashed before recover from translog is completed");
}
return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
}
});
final List<RetentionLease> leases = new ArrayList<>();
long version = randomLongBetween(0, 100);
long primaryTerm = randomLongBetween(1, 100);
final int iterations = randomIntBetween(1, 10);
for (int i = 0; i < iterations; i++) {
if (randomBoolean()) {
indexDoc(shard, "_doc", Integer.toString(i));
} else {
leases.add(new RetentionLease(Integer.toString(i), randomNonNegativeLong(),
randomLongBetween(Integer.MAX_VALUE, Long.MAX_VALUE), "test"));
}
if (randomBoolean()) {
if (randomBoolean()) {
version += randomLongBetween(1, 100);
primaryTerm += randomLongBetween(0, 100);
shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases));
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
}
}
if (randomBoolean()) {
shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint()), "test");
flushShard(shard);
}
}
version += randomLongBetween(1, 100);
primaryTerm += randomLongBetween(0, 100);
shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases));
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
closeShard(shard, false);

final IndexShard failedShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(),
shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING,
RecoverySource.ExistingStoreRecoverySource.INSTANCE));
final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
failedShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null));
throwDuringRecoverFromTranslog.set(true);
expectThrows(IndexShardRecoveryException.class, failedShard::recoverFromStore);
closeShards(failedShard);

final IndexShard newShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(),
shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING,
RecoverySource.ExistingStoreRecoverySource.INSTANCE));
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
throwDuringRecoverFromTranslog.set(false);
assertTrue(newShard.recoverFromStore());
final RetentionLeases retentionLeases = newShard.getRetentionLeases();
assertThat(retentionLeases.version(), equalTo(version));
assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm));
if (leases.isEmpty()) {
assertThat(retentionLeases.leases(), empty());
} else {
assertThat(retentionLeases.leases(), containsInAnyOrder(leases.toArray(new RetentionLease[0])));
}
closeShards(newShard);
}

private void assertRetentionLeases(
final IndexShard indexShard,
final int size,
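The failure injection in the new test is a general pattern worth noting: an AtomicBoolean gates an overridden recovery step, so the first recovery attempt simulates a crash while a later attempt against the same on-disk state succeeds. A stripped-down, framework-free sketch of the same idea (all names here are illustrative, not Elasticsearch API):

import java.util.concurrent.atomic.AtomicBoolean;

// Stripped-down fault-injection pattern: a shared flag turns a normally
// successful step into a simulated crash, letting a test exercise both the
// failed-recovery and retry-recovery paths against the same persisted state.
final class CrashInjectingRecovery {

    private final AtomicBoolean throwDuringRecovery = new AtomicBoolean();

    void armCrash(final boolean crash) {
        throwDuringRecovery.set(crash);
    }

    void recover() {
        if (throwDuringRecovery.get()) {
            // Simulates a node dying before recovery completes; on-disk state
            // (commits, translog, retention leases) is left untouched.
            throw new RuntimeException("crashed before recovery completed");
        }
        // ... normal recovery work would run here ...
    }

    public static void main(String[] args) {
        final CrashInjectingRecovery recovery = new CrashInjectingRecovery();
        recovery.armCrash(true);
        try {
            recovery.recover();   // first attempt: simulated crash
        } catch (RuntimeException expected) {
            System.out.println("first recovery failed as expected: " + expected.getMessage());
        }
        recovery.armCrash(false);
        recovery.recover();       // retry succeeds; state must still be intact
        System.out.println("second recovery succeeded");
    }
}

Because both attempts run against the same persisted state, the assertions after the successful retry (the retention-lease version, primary term, and lease checks above) verify that nothing was lost across the simulated crash.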