Skip to content

Commit

Permalink
Ensure no snapshotted commit when close engine (#38663)
Browse files Browse the repository at this point in the history
With this change, we can automatically detect an implementation 
that acquires an index commit but fails to release.
  • Loading branch information
dnhatn authored Feb 12, 2019
1 parent 96f68c8 commit 9b83da9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2509,6 +2509,7 @@ public void testConcurrentWritesAndCommits() throws Exception {
prevLocalCheckpoint = localCheckpoint;
prevMaxSeqNo = maxSeqNo;
}
IOUtils.close(commits);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {

// check retention leases have been committed on the primary
final RetentionLeases primaryCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
primary.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES));
primary.commitStats().getUserData().get(Engine.RETENTION_LEASES));
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primaryCommittedRetentionLeases)));

// check current retention leases have been synced to all replicas
Expand All @@ -121,7 +121,7 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception {

// check retention leases have been committed on the replica
final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
replica.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES));
replica.commitStats().getUserData().get(Engine.RETENTION_LEASES));
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases)));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand Down Expand Up @@ -157,6 +158,7 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -1195,6 +1197,7 @@ public void beforeIndexDeletion() throws Exception {
//check that shards that have same sync id also contain same number of documents
assertSameSyncIdSameDocs();
assertOpenTranslogReferences();
assertNoSnapshottedIndexCommit();
}

private void assertSameSyncIdSameDocs() {
Expand Down Expand Up @@ -1265,6 +1268,28 @@ private void assertOpenTranslogReferences() throws Exception {
});
}

private void assertNoSnapshottedIndexCommit() throws Exception {
assertBusy(() -> {
final Collection<NodeAndClient> nodesAndClients = nodes.values();
for (NodeAndClient nodeAndClient : nodesAndClients) {
IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
try {
Engine engine = IndexShardTestCase.getEngine(indexShard);
if (engine instanceof InternalEngine) {
assertFalse(indexShard.routingEntry().toString() + " has unreleased snapshotted index commits",
EngineTestCase.hasSnapshottedCommits(engine));
}
} catch (AlreadyClosedException ignored) {

}
}
}
}
});
}

/**
* Asserts that the document history in Lucene index is consistent with Translog's on every index shard of the cluster.
* This assertion might be expensive, thus we prefer not to execute on every test but only interesting tests.
Expand Down

0 comments on commit 9b83da9

Please sign in to comment.