Skip to content

Commit

Permalink
Handle delete document level failures (#46100)
Browse files Browse the repository at this point in the history
Today we assume that document failures cannot occur for deletes. This
assumption is bogus, as they can fail for a variety of reasons such as
the Lucene index having reached the document limit. Because of this
assumption, we were asserting that such a document-level failure would
never happen. When this bogus assertion is violated, we fail the node, a
catastrophe. Instead, we need to treat this as a fatal engine exception.
  • Loading branch information
jasontedor committed Aug 29, 2019
1 parent 43a22da commit 7df8dcf
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -1464,14 +1465,21 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws
}
return new DeleteResult(
plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
// there is no tragic event and such it must be a document level failure
return new DeleteResult(
ex, plan.versionOfDeletion, delete.primaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false);
} else {
throw ex;
} catch (final Exception ex) {
/*
* Document level failures when deleting are unexpected, we likely hit something fatal such as the Lucene index being corrupt,
* or the Lucene document limit. We have already issued a sequence number here so this is fatal, fail the engine.
*/
if (ex instanceof AlreadyClosedException == false && indexWriter.getTragicException() == null) {
final String reason = String.format(
Locale.ROOT,
"delete id[%s] origin [%s] seq#[%d] failed at the document level",
delete.id(),
delete.origin(),
delete.seqNo());
failEngine(reason, ex);
}
throw ex;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -3318,8 +3319,8 @@ public void testHandleDocumentFailure() throws Exception {
AtomicReference<ThrowingIndexWriter> throwingIndexWriter = new AtomicReference<>();
try (InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
(directory, iwc) -> {
throwingIndexWriter.set(new ThrowingIndexWriter(directory, iwc));
return throwingIndexWriter.get();
throwingIndexWriter.set(new ThrowingIndexWriter(directory, iwc));
return throwingIndexWriter.get();
})
) {
// test document failure while indexing
Expand All @@ -3343,22 +3344,6 @@ public void testHandleDocumentFailure() throws Exception {
assertNotNull(indexResult.getTranslogLocation());
engine.index(indexForDoc(doc2));

// test failure while deleting
// all these simulated exceptions are not fatal to the IW so we treat them as document failures
final Engine.DeleteResult deleteResult;
if (randomBoolean()) {
throwingIndexWriter.get().setThrowFailure(() -> new IOException("simulated"));
deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1), primaryTerm.get()));
assertThat(deleteResult.getFailure(), instanceOf(IOException.class));
} else {
throwingIndexWriter.get().setThrowFailure(() -> new IllegalArgumentException("simulated max token length"));
deleteResult = engine.delete(new Engine.Delete("test", "1", newUid(doc1), primaryTerm.get()));
assertThat(deleteResult.getFailure(),
instanceOf(IllegalArgumentException.class));
}
assertThat(deleteResult.getVersion(), equalTo(2L));
assertThat(deleteResult.getSeqNo(), equalTo(3L));

// test non document level failure is thrown
if (randomBoolean()) {
// simulate close by corruption
Expand Down Expand Up @@ -5815,4 +5800,48 @@ public long addDocument(Iterable<? extends IndexableField> doc) throws IOExcepti
}
}

/**
 * Runs the shared delete-failure scenario with soft deletes enabled, issuing one
 * successful delete of the document before the failing delete is attempted.
 */
public void testDeleteFailureSoftDeletesEnabledDocAlreadyDeleted() throws IOException {
    // the hook deletes the document up front, so the failing delete hits an already-deleted doc
    runTestDeleteFailure(true, (engine, op) -> engine.delete(op));
}

/**
 * Runs the shared delete-failure scenario with soft deletes enabled and no
 * extra work performed before the failing delete.
 */
public void testDeleteFailureSoftDeletesEnabled() throws IOException {
    final boolean softDeletes = true;
    // no-op hook: the document is still live when the failing delete is issued
    runTestDeleteFailure(softDeletes, (ignoredEngine, ignoredOp) -> {});
}

/**
 * Runs the shared delete-failure scenario with soft deletes disabled and no
 * extra work performed before the failing delete.
 */
public void testDeleteFailureSoftDeletesDisabled() throws IOException {
    final boolean softDeletes = false;
    // no-op hook: the document is still live when the failing delete is issued
    runTestDeleteFailure(softDeletes, (ignoredEngine, ignoredOp) -> {});
}

/**
 * Shared scenario for the delete-failure tests.
 *
 * <p>Builds index settings with the soft-deletes flag set as requested, opens a fresh store and an
 * engine whose index writer is a {@code ThrowingIndexWriter}, indexes document {@code "0"}, hands the
 * engine and a delete operation for that document to {@code consumer}, and then arms the writer to
 * throw {@code IllegalArgumentException("fatal")}. The subsequent delete is expected to propagate that
 * exception, leave the engine closed, and record an exception of the same type and message as the
 * engine failure.
 *
 * @param softDeletesEnabled value written to {@code IndexSettings.INDEX_SOFT_DELETES_SETTING}
 * @param consumer           hook invoked with the engine and the delete operation before the writer
 *                           is armed to fail
 * @throws IOException if closing the existing engine, creating the store/engine, or the hook fails
 */
private void runTestDeleteFailure(
    final boolean softDeletesEnabled,
    final CheckedBiConsumer<InternalEngine, Engine.Delete, IOException> consumer) throws IOException {
    // close the engine already in scope (presumably the one set up by the test fixture); a dedicated one is built below
    engine.close();

    final Settings softDeleteSettings = Settings.builder()
        .put(defaultSettings.getSettings())
        .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), softDeletesEnabled)
        .build();
    final IndexMetaData indexMetaData =
        IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(softDeleteSettings).build();
    final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);

    // captures the writer created by the factory so that a failure can be injected later
    final AtomicReference<ThrowingIndexWriter> writerRef = new AtomicReference<>();
    try (Store store = createStore();
         InternalEngine failingEngine = createEngine(
             (directory, writerConfig) -> {
                 final ThrowingIndexWriter writer = new ThrowingIndexWriter(directory, writerConfig);
                 writerRef.set(writer);
                 return writer;
             },
             null,
             null,
             config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) {
        final String docId = "0";
        failingEngine.index(
            new Engine.Index(newUid(docId), primaryTerm.get(), InternalEngineTests.createParsedDoc(docId, null)));

        final Engine.Delete deleteOp = new Engine.Delete("_doc", docId, newUid(docId), primaryTerm.get());
        consumer.accept(failingEngine, deleteOp);

        // from here on the writer throws, so the next delete fails at the document level
        writerRef.get().setThrowFailure(() -> new IllegalArgumentException("fatal"));
        final IllegalArgumentException thrown =
            expectThrows(IllegalArgumentException.class, () -> failingEngine.delete(deleteOp));
        assertThat(thrown.getMessage(), equalTo("fatal"));

        // the engine must have been failed, not merely have reported the error
        assertTrue(failingEngine.isClosed.get());
        final Throwable engineFailure = failingEngine.failedEngine.get();
        assertThat(engineFailure, not(nullValue()));
        assertThat(engineFailure, instanceOf(IllegalArgumentException.class));
        assertThat(engineFailure.getMessage(), equalTo("fatal"));
    }
}

}

0 comments on commit 7df8dcf

Please sign in to comment.