diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java index edbe042d55114..bdb794cbf0a76 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java @@ -94,7 +94,7 @@ final class PerThreadIDVersionAndSeqNoLookup { * using the same cache key. Otherwise we'd have to disable caching * entirely for these readers. */ - public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context) + public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderContext context) throws IOException { assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) : "context's reader is not the same as the reader class was initialized on."; @@ -108,7 +108,28 @@ public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context) if (versions.advanceExact(docID) == false) { throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field"); } - return new DocIdAndVersion(docID, versions.longValue(), context.reader(), context.docBase); + final long seqNo; + final long term; + if (loadSeqNo) { + NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME); + // remove the null check in 7.0 once we can't read indices with no seq# + if (seqNos != null && seqNos.advanceExact(docID)) { + seqNo = seqNos.longValue(); + } else { + seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + } + NumericDocValues terms = context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + if (terms != null && terms.advanceExact(docID)) { + term = terms.longValue(); + } else { + term = 0; + } + + } else { + seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO; + term = 0; + } + return new DocIdAndVersion(docID, versions.longValue(), seqNo, term, context.reader(), context.docBase); } else { return null; } @@ -150,6 +171,7 @@ DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOExcep final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME); for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) { final long seqNo; + // remove the null check in 7.0 once we can't read indices with no seq# if (seqNoDV != null && seqNoDV.advanceExact(docID)) { seqNo = seqNoDV.longValue(); } else { diff --git a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java index 7ebcb8998441d..611887342adca 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java @@ -31,8 +31,6 @@ import java.util.Objects; import java.util.concurrent.ConcurrentMap; -import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND; - /** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. 
*/ public final class VersionsAndSeqNoResolver { @@ -96,12 +94,16 @@ private VersionsAndSeqNoResolver() { public static class DocIdAndVersion { public final int docId; public final long version; + public final long seqNo; + public final long primaryTerm; public final LeafReader reader; public final int docBase; - public DocIdAndVersion(int docId, long version, LeafReader reader, int docBase) { + public DocIdAndVersion(int docId, long version, long seqNo, long primaryTerm, LeafReader reader, int docBase) { this.docId = docId; this.version = version; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; this.reader = reader; this.docBase = docBase; } @@ -129,7 +131,7 @@ public static class DocIdAndSeqNo { *
  • a doc ID and a version otherwise * */ - public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException { + public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term, boolean loadSeqNo) throws IOException { PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field()); List leaves = reader.leaves(); // iterate backwards to optimize for the frequently updated documents @@ -137,7 +139,7 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) for (int i = leaves.size() - 1; i >= 0; i--) { final LeafReaderContext leaf = leaves.get(i); PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord]; - DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf); + DocIdAndVersion result = lookup.lookupVersion(term.bytes(), loadSeqNo, leaf); if (result != null) { return result; } @@ -175,15 +177,4 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) thr } return latest; } - - /** - * Load the version for the uid from the reader, returning - */ - public static long loadVersion(IndexReader reader, Term term) throws IOException { - final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term); - return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version; - } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 0b1657132922c..2ae5ba36552f5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -606,7 +606,7 @@ protected final GetResult getFromSearcher(Get get, BiFunction= 0 : "ifPrimaryTermMatch [" + ifPrimaryTermMatch + "] must be non negative"; + assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNoMatch >=0 : + "ifSeqNoMatch [" + ifSeqNoMatch + "] must be non negative or unset"; + assert (origin == Origin.PRIMARY) || (ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTermMatch == 0) : + "cas operations are only allowed if origin is primary. 
get [" + origin + "]"; this.doc = doc; this.isRetry = isRetry; this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp; + this.ifSeqNoMatch = ifSeqNoMatch; + this.ifPrimaryTermMatch = ifPrimaryTermMatch; } public Index(Term uid, long primaryTerm, ParsedDocument doc) { @@ -1359,7 +1368,7 @@ public Index(Term uid, long primaryTerm, ParsedDocument doc) { Index(Term uid, long primaryTerm, ParsedDocument doc, long version) { this(uid, doc, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, VersionType.INTERNAL, - Origin.PRIMARY, System.nanoTime(), -1, false); + Origin.PRIMARY, System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0); } // TEST ONLY public ParsedDocument parsedDoc() { @@ -1419,28 +1428,44 @@ public boolean isRetry() { return isRetry; } + public long getIfSeqNoMatch() { + return ifSeqNoMatch; + } + + public long getIfPrimaryTermMatch() { + return ifPrimaryTermMatch; + } } public static class Delete extends Operation { private final String type; private final String id; + private final long ifSeqNoMatch; + private final long ifPrimaryTermMatch; public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, - Origin origin, long startTime) { + Origin origin, long startTime, long ifSeqNoMatch, long ifPrimaryTermMatch) { super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); + assert ifPrimaryTermMatch >= 0 : "ifPrimaryTermMatch [" + ifPrimaryTermMatch + "] must be non negative"; + assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNoMatch >=0 : + "ifSeqNoMatch [" + ifSeqNoMatch + "] must be non negative or unset"; + assert (origin == Origin.PRIMARY) || (ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTermMatch == 0) : + "cas operations are only allowed if origin is primary. 
get [" + origin + "]"; this.type = Objects.requireNonNull(type); this.id = Objects.requireNonNull(id); + this.ifSeqNoMatch = ifSeqNoMatch; + this.ifPrimaryTermMatch = ifPrimaryTermMatch; } public Delete(String type, String id, Term uid, long primaryTerm) { this(type, id, uid, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL, - Origin.PRIMARY, System.nanoTime()); + Origin.PRIMARY, System.nanoTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0); } public Delete(Delete template, VersionType versionType) { this(template.type(), template.id(), template.uid(), template.seqNo(), template.primaryTerm(), template.version(), - versionType, template.origin(), template.startTime()); + versionType, template.origin(), template.startTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0); } @Override @@ -1463,6 +1488,13 @@ public int estimatedSizeInBytes() { return (uid().field().length() + uid().text().length()) * 2 + 20; } + public long getIfSeqNoMatch() { + return ifSeqNoMatch; + } + + public long getIfPrimaryTermMatch() { + return ifPrimaryTermMatch; + } } public static class NoOp extends Operation { diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1c0d8fefc4091..685cb8b017b7b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -640,11 +640,12 @@ public GetResult get(Get get, BiFunction search Translog.Operation operation = translog.readOperation(versionValue.getLocation()); if (operation != null) { // in the case of a already pruned translog generation we might get null here - yet very unlikely - TranslogLeafReader reader = new TranslogLeafReader((Translog.Index) operation, engineConfig + final Translog.Index index = (Translog.Index) operation; + TranslogLeafReader reader = new TranslogLeafReader(index, engineConfig .getIndexSettings().getIndexVersionCreated()); return new GetResult(new Searcher("realtime_get", new IndexSearcher(reader), reader::close), - new VersionsAndSeqNoResolver.DocIdAndVersion(0, - ((Translog.Index) operation).version(), reader, 0)); + new VersionsAndSeqNoResolver.DocIdAndVersion(0, index.version(), index.seqNo(), index.primaryTerm(), + reader, 0)); } } catch (IOException e) { maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event @@ -721,14 +722,17 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) } /** resolves the current version of the document, returning null if not found */ - private VersionValue resolveDocVersion(final Operation op) throws IOException { + private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) throws IOException { assert incrementVersionLookup(); // used for asserting in tests VersionValue versionValue = getVersionFromMap(op.uid().bytes()); if (versionValue == null) { assert incrementIndexVersionLookup(); // used for asserting in tests - final long currentVersion = loadCurrentVersionFromIndex(op.uid()); - if (currentVersion != Versions.NOT_FOUND) { - versionValue = new IndexVersionValue(null, currentVersion, SequenceNumbers.UNASSIGNED_SEQ_NO, 0L); + final VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion; + try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) { + docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), op.uid(), loadSeqNo); + } + if 
(docIdAndVersion != null) { + versionValue = new IndexVersionValue(null, docIdAndVersion.version, docIdAndVersion.seqNo, docIdAndVersion.primaryTerm); } } else if (engineConfig.isEnableGcDeletes() && versionValue.isDelete() && (engineConfig.getThreadPool().relativeTimeInMillis() - ((DeleteVersionValue)versionValue).time) > getGcDeletesInMillis()) { @@ -741,7 +745,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnVersions(final Operation throws IOException { assert op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO : "op is resolved based on versions but have a seq#"; assert op.version() >= 0 : "versions should be non-negative. got " + op.version(); - final VersionValue versionValue = resolveDocVersion(op); + final VersionValue versionValue = resolveDocVersion(op, false); if (versionValue == null) { return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND; } else { @@ -1027,7 +1031,8 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc } else { versionMap.enforceSafeAccess(); // resolves incoming version - final VersionValue versionValue = resolveDocVersion(index); + final VersionValue versionValue = + resolveDocVersion(index, index.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO); final long currentVersion; final boolean currentNotFoundOrDeleted; if (versionValue == null) { @@ -1037,7 +1042,17 @@ protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOExc currentVersion = versionValue.version; currentNotFoundOrDeleted = versionValue.isDelete(); } - if (index.versionType().isVersionConflictForWrites( + if (index.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) { + final VersionConflictEngineException e = new VersionConflictEngineException(shardId, index.type(), index.id(), + index.getIfSeqNoMatch(), index.getIfPrimaryTermMatch(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0); + plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm()); + } else if (index.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && ( + versionValue.seqNo != index.getIfSeqNoMatch() || versionValue.term != index.getIfPrimaryTermMatch() + )) { + final VersionConflictEngineException e = new VersionConflictEngineException(shardId, index.type(), index.id(), + index.getIfSeqNoMatch(), index.getIfPrimaryTermMatch(), versionValue.seqNo, versionValue.term); + plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion, getPrimaryTerm()); + } else if (index.versionType().isVersionConflictForWrites( currentVersion, index.version(), currentNotFoundOrDeleted)) { final VersionConflictEngineException e = new VersionConflictEngineException(shardId, index, currentVersion, currentNotFoundOrDeleted); @@ -1078,7 +1093,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); addDocs(index.docs(), indexWriter); } - return new IndexResult(plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); + return new IndexResult(plan.versionForIndexing, index.primaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } catch (Exception ex) { if (indexWriter.getTragicException() == null) { /* There is no tragic event recorded so this must be a document failure. @@ -1094,7 +1109,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) * we return a `MATCH_ANY` version to indicate no document was index. 
The value is * not used anyway */ - return new IndexResult(ex, Versions.MATCH_ANY, getPrimaryTerm(), plan.seqNoForIndexing); + return new IndexResult(ex, Versions.MATCH_ANY, index.primaryTerm(), plan.seqNoForIndexing); } else { throw ex; } @@ -1368,7 +1383,7 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; // resolve operation from external to internal - final VersionValue versionValue = resolveDocVersion(delete); + final VersionValue versionValue = resolveDocVersion(delete, delete.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO); assert incrementVersionLookup(); final long currentVersion; final boolean currentlyDeleted; @@ -1380,7 +1395,17 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE currentlyDeleted = versionValue.isDelete(); } final DeletionStrategy plan; - if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) { + if (delete.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && versionValue == null) { + final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete.type(), delete.id(), + delete.getIfSeqNoMatch(), delete.getIfPrimaryTermMatch(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0); + plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted); + } else if (delete.getIfSeqNoMatch() != SequenceNumbers.UNASSIGNED_SEQ_NO && ( + versionValue.seqNo != delete.getIfSeqNoMatch() || versionValue.term != delete.getIfPrimaryTermMatch() + )) { + final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete.type(), delete.id(), + delete.getIfSeqNoMatch(), delete.getIfPrimaryTermMatch(), versionValue.seqNo, versionValue.term); + plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted); + } else if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) { final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted); plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, getPrimaryTerm(), currentlyDeleted); } else { @@ -1427,7 +1452,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws if (indexWriter.getTragicException() == null) { // there is no tragic event and such it must be a document level failure return new DeleteResult( - ex, plan.versionOfDeletion, getPrimaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false); + ex, plan.versionOfDeletion, delete.primaryTerm(), plan.seqNoOfDeletion, plan.currentlyDeleted == false); } else { throw ex; } @@ -2147,13 +2172,6 @@ protected final ReferenceManager getReferenceManager(SearcherScop } } - private long loadCurrentVersionFromIndex(Term uid) throws IOException { - assert incrementIndexVersionLookup(); - try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) { - return VersionsAndSeqNoResolver.loadVersion(searcher.reader(), uid); - } - } - private IndexWriter createWriter() throws IOException { try { final IndexWriterConfig iwc = getIndexWriterConfig(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/VersionConflictEngineException.java 
b/server/src/main/java/org/elasticsearch/index/engine/VersionConflictEngineException.java index f829e35af8912..357c9c107836e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/VersionConflictEngineException.java +++ b/server/src/main/java/org/elasticsearch/index/engine/VersionConflictEngineException.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.engine; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; @@ -30,6 +31,16 @@ public VersionConflictEngineException(ShardId shardId, Engine.Operation op, long this(shardId, op.type(), op.id(), op.versionType().explainConflictForWrites(currentVersion, op.version(), deleted)); } + public VersionConflictEngineException(ShardId shardId, String type, String id, + long compareAndWriteSeqNo, long compareAndWriteTerm, + long currentSeqNo, long currentTerm) { + this(shardId, type, id, "required seqNo [" + compareAndWriteSeqNo + "], primary term [" + compareAndWriteTerm +"]." + + (currentSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO ? + " but no document was found" : + " current document has seqNo [" + currentSeqNo + "] and primary term ["+ currentTerm + "]" + )); + } + public VersionConflictEngineException(ShardId shardId, String type, String id, String explanation) { this(shardId, null, type, id, explanation); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index bba3a8ee19bb6..48866166a4bfb 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -769,7 +769,8 @@ public static Engine.Index prepareIndex(DocumentMapperForType docMapper, Version } else { uid = new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(doc.type(), doc.id())); } - return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry); + return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry, + UNASSIGNED_SEQ_NO, 0); } private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException { @@ -864,7 +865,7 @@ private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long private static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, Engine.Operation.Origin origin) { long startTime = System.nanoTime(); - return new Engine.Delete(type, id, uid, seqNo, primaryTerm, version, versionType, origin, startTime); + return new Engine.Delete(type, id, uid, seqNo, primaryTerm, version, versionType, origin, startTime, UNASSIGNED_SEQ_NO, 0); } private Term extractUidForDelete(String type, String id) { diff --git a/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java b/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java index e1ca8379972af..28b02de80f63c 100644 --- a/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java +++ b/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionLookupTests.java @@ -58,19 +58,19 @@ public void testSimple() throws Exception { LeafReaderContext segment = reader.leaves().get(0); PerThreadIDVersionAndSeqNoLookup lookup = new 
PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME); // found doc - DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), segment); + DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), randomBoolean(), segment); assertNotNull(result); assertEquals(87, result.version); assertEquals(0, result.docId); // not found doc - assertNull(lookup.lookupVersion(new BytesRef("7"), segment)); + assertNull(lookup.lookupVersion(new BytesRef("7"), randomBoolean(), segment)); // deleted doc writer.deleteDocuments(new Term(IdFieldMapper.NAME, "6")); reader.close(); reader = DirectoryReader.open(writer); segment = reader.leaves().get(0); lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME); - assertNull(lookup.lookupVersion(new BytesRef("6"), segment)); + assertNull(lookup.lookupVersion(new BytesRef("6"), randomBoolean(), segment)); reader.close(); writer.close(); dir.close(); @@ -93,7 +93,7 @@ public void testTwoDocuments() throws Exception { LeafReaderContext segment = reader.leaves().get(0); PerThreadIDVersionAndSeqNoLookup lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME); // return the last doc when there are duplicates - DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), segment); + DocIdAndVersion result = lookup.lookupVersion(new BytesRef("6"), randomBoolean(), segment); assertNotNull(result); assertEquals(87, result.version); assertEquals(1, result.docId); @@ -103,7 +103,7 @@ public void testTwoDocuments() throws Exception { reader = DirectoryReader.open(writer); segment = reader.leaves().get(0); lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME); - result = lookup.lookupVersion(new BytesRef("6"), segment); + result = lookup.lookupVersion(new BytesRef("6"), randomBoolean(), segment); assertNotNull(result); assertEquals(87, result.version); assertEquals(1, result.docId); @@ -113,7 +113,7 @@ public void testTwoDocuments() throws Exception { reader = DirectoryReader.open(writer); segment = reader.leaves().get(0); lookup = new PerThreadIDVersionAndSeqNoLookup(segment.reader(), IdFieldMapper.NAME); - assertNull(lookup.lookupVersion(new BytesRef("6"), segment)); + assertNull(lookup.lookupVersion(new BytesRef("6"), randomBoolean(), segment)); reader.close(); writer.close(); dir.close(); diff --git a/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java b/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java index c5e66a3bf2ad5..d19db06caaef9 100644 --- a/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java +++ b/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java @@ -32,14 +32,12 @@ import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; -import org.hamcrest.MatcherAssert; import java.io.IOException; import java.util.ArrayList; import java.util.List; import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadDocIdAndVersion; -import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadVersion; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -63,15 +61,14 @@ public void testVersions() throws Exception { Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); DirectoryReader directoryReader = 
ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "_na_", 1)); - MatcherAssert.assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND)); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1"), randomBoolean()), nullValue()); Document doc = new Document(); doc.add(new Field(UidFieldMapper.NAME, "1", UidFieldMapper.Defaults.FIELD_TYPE)); doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 1)); writer.updateDocument(new Term(UidFieldMapper.NAME, "1"), doc); directoryReader = reopen(directoryReader); - assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(1L)); - assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(1L)); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(1L)); doc = new Document(); Field uid = new Field(UidFieldMapper.NAME, "1", UidFieldMapper.Defaults.FIELD_TYPE); @@ -80,8 +77,7 @@ public void testVersions() throws Exception { doc.add(version); writer.updateDocument(new Term(UidFieldMapper.NAME, "1"), doc); directoryReader = reopen(directoryReader); - assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(2L)); - assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(2L)); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(2L)); // test reuse of uid field doc = new Document(); @@ -91,13 +87,11 @@ public void testVersions() throws Exception { writer.updateDocument(new Term(UidFieldMapper.NAME, "1"), doc); directoryReader = reopen(directoryReader); - assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(3L)); - assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(3L)); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(3L)); writer.deleteDocuments(new Term(UidFieldMapper.NAME, "1")); directoryReader = reopen(directoryReader); - assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND)); - assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), nullValue()); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1"), randomBoolean()), nullValue()); directoryReader.close(); writer.close(); dir.close(); @@ -123,21 +117,18 @@ public void testNestedDocuments() throws IOException { writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs); DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "_na_", 1)); - assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(5L)); - assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(5L)); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(5L)); version.setLongValue(6L); writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs); version.setLongValue(7L); writer.updateDocuments(new Term(UidFieldMapper.NAME, "1"), docs); directoryReader = reopen(directoryReader); - assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(7L)); - 
assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")).version, equalTo(7L)); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(7L)); writer.deleteDocuments(new Term(UidFieldMapper.NAME, "1")); directoryReader = reopen(directoryReader); - assertThat(loadVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), equalTo(Versions.NOT_FOUND)); - assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1")), nullValue()); + assertThat(loadDocIdAndVersion(directoryReader, new Term(UidFieldMapper.NAME, "1"), randomBoolean()), nullValue()); directoryReader.close(); writer.close(); dir.close(); @@ -155,10 +146,10 @@ public void testCache() throws Exception { writer.addDocument(doc); DirectoryReader reader = DirectoryReader.open(writer); // should increase cache size by 1 - assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); + assertEquals(87, loadDocIdAndVersion(reader, new Term(UidFieldMapper.NAME, "6"), randomBoolean()).version); assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); // should be cache hit - assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); + assertEquals(87, loadDocIdAndVersion(reader, new Term(UidFieldMapper.NAME, "6"), randomBoolean()).version); assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); reader.close(); @@ -179,11 +170,11 @@ public void testCacheFilterReader() throws Exception { doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87)); writer.addDocument(doc); DirectoryReader reader = DirectoryReader.open(writer); - assertEquals(87, loadVersion(reader, new Term(UidFieldMapper.NAME, "6"))); + assertEquals(87, loadDocIdAndVersion(reader, new Term(UidFieldMapper.NAME, "6"), randomBoolean()).version); assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); // now wrap the reader DirectoryReader wrapped = ElasticsearchDirectoryReader.wrap(reader, new ShardId("bogus", "_na_", 5)); - assertEquals(87, loadVersion(wrapped, new Term(UidFieldMapper.NAME, "6"))); + assertEquals(87, loadDocIdAndVersion(wrapped, new Term(UidFieldMapper.NAME, "6"), randomBoolean()).version); // same size map: core cache key is shared assertEquals(size+1, VersionsAndSeqNoResolver.lookupStates.size()); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 4e9c52963169b..7daf179bea36f 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -80,6 +80,7 @@ import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.TriFunction; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -171,6 +172,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static 
org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.sameInstance; @@ -590,7 +592,7 @@ public void testSegmentsStatsIncludingFileSizes() throws Exception { public void testCommitStats() throws IOException { final AtomicLong maxSeqNo = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final AtomicLong localCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + final AtomicLong globalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO); try ( Store store = createStore(); InternalEngine engine = createEngine(store, createTempDir(), (maxSeq, localCP) -> new LocalCheckpointTracker( @@ -626,7 +628,7 @@ public long getCheckpoint() { rarely() || maxSeqNo.get() == SequenceNumbers.NO_OPS_PERFORMED ? SequenceNumbers.NO_OPS_PERFORMED : randomIntBetween(0, 1024)); globalCheckpoint.set(rarely() || localCheckpoint.get() == SequenceNumbers.NO_OPS_PERFORMED ? - SequenceNumbers.UNASSIGNED_SEQ_NO : randomIntBetween(0, (int) localCheckpoint.get())); + UNASSIGNED_SEQ_NO : randomIntBetween(0, (int) localCheckpoint.get())); final Engine.CommitId commitId = engine.flush(true, true); @@ -707,15 +709,13 @@ public void testTranslogMultipleOperationsSameDocument() throws IOException { for (int i = 0; i < ops; i++) { final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); if (randomBoolean()) { - final Engine.Index operation = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, - 0, i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), - -1, false); + final Engine.Index operation = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, + 0, i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); operations.add(operation); initialEngine.index(operation); } else { - final Engine.Delete operation = new Engine.Delete("test", "1", newUid(doc), - SequenceNumbers.UNASSIGNED_SEQ_NO, 0, i, VersionType.EXTERNAL, - Engine.Operation.Origin.PRIMARY, System.nanoTime()); + final Engine.Delete operation = new Engine.Delete("test", "1", newUid(doc), UNASSIGNED_SEQ_NO, 0, i, + VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), UNASSIGNED_SEQ_NO, 0); operations.add(operation); initialEngine.delete(operation); } @@ -1184,10 +1184,10 @@ public void testRenewSyncFlush() throws Exception { final ParsedDocument parsedDoc3 = testParsedDocument("3", null, testDocumentWithTextField(), B_1, null); if (forceMergeFlushes) { - engine.index(new Engine.Index(newUid(parsedDoc3), parsedDoc3, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, + engine.index(new Engine.Index(newUid(parsedDoc3), parsedDoc3, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime() - engine.engineConfig.getFlushMergesAfter().nanos(), - -1, false)); + -1, false, UNASSIGNED_SEQ_NO, 0)); } else { engine.index(indexForDoc(parsedDoc3)); } @@ -1254,7 +1254,7 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException { } if (randomBoolean()) { final String translogUUID = Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), - SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, primaryTerm.get()); + UNASSIGNED_SEQ_NO, shardId, primaryTerm.get()); store.associateIndexWithNewTranslog(translogUUID); } trimUnsafeCommits(config); @@ -1293,7 +1293,7 @@ public void testVersioningNewCreate() throws IOException { 
assertThat(indexResult.getVersion(), equalTo(1L)); create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(), - create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false, UNASSIGNED_SEQ_NO, 0); indexResult = replicaEngine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); } @@ -1307,7 +1307,7 @@ public void testReplicatedVersioningWithFlush() throws IOException { create = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), create.primaryTerm(), indexResult.getVersion(), - create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false, UNASSIGNED_SEQ_NO, 0); indexResult = replicaEngine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); assertTrue(indexResult.isCreated()); @@ -1326,7 +1326,7 @@ public void testReplicatedVersioningWithFlush() throws IOException { update = new Engine.Index(newUid(doc), doc, updateResult.getSeqNo(), update.primaryTerm(), updateResult.getVersion(), - update.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + update.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false, UNASSIGNED_SEQ_NO, 0); updateResult = replicaEngine.index(update); assertThat(updateResult.getVersion(), equalTo(2L)); assertFalse(updateResult.isCreated()); @@ -1383,7 +1383,7 @@ public void testVersioningNewIndex() throws IOException { assertThat(indexResult.getVersion(), equalTo(1L)); index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(), - index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false, UNASSIGNED_SEQ_NO, 0); indexResult = replicaEngine.index(index); assertThat(indexResult.getVersion(), equalTo(1L)); } @@ -1670,13 +1670,13 @@ public void run() { public void testVersioningCreateExistsException() throws IOException { ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); - Engine.Index create = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, - Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false); + Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(create); assertThat(indexResult.getVersion(), equalTo(1L)); - create = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, - VersionType.INTERNAL, PRIMARY, 0, -1, false); + create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, + VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0); indexResult = engine.index(create); assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.FAILURE)); assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); @@ -1744,11 +1744,12 @@ public void testConcurrentOutOfOrderDocsOnReplica() throws IOException, Interrup Document doc = testDocumentWithTextField(index.docs().get(0).get("value")); ParsedDocument parsedDocument = testParsedDocument(index.id(), index.routing(), doc, index.source(), null); return new 
Engine.Index(index.uid(), parsedDocument, newSeqNo, index.primaryTerm(), index.version(), - index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry()); + index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(), + UNASSIGNED_SEQ_NO, 0); } else { Engine.Delete delete = (Engine.Delete) operation; return new Engine.Delete(delete.type(), delete.id(), delete.uid(), newSeqNo, delete.primaryTerm(), - delete.version(), delete.versionType(), delete.origin(), delete.startTime()); + delete.version(), delete.versionType(), delete.origin(), delete.startTime(), UNASSIGNED_SEQ_NO, 0); } }; final List allOps = new ArrayList<>(); @@ -1820,33 +1821,65 @@ private int assertOpsOnPrimary(List ops, long currentOpVersion String lastFieldValue = null; int opsPerformed = 0; long lastOpVersion = currentOpVersion; + long lastOpSeqNo = UNASSIGNED_SEQ_NO; + long lastOpTerm = 0; + final AtomicLong currentTerm = new AtomicLong(1); BiFunction indexWithVersion = (version, index) -> new Engine.Index(index.uid(), index.parsedDoc(), - index.seqNo(), index.primaryTerm(), version, index.versionType(), index.origin(), index.startTime(), - index.getAutoGeneratedIdTimestamp(), index.isRetry()); + UNASSIGNED_SEQ_NO, currentTerm.get(), version, index.versionType(), index.origin(), index.startTime(), + index.getAutoGeneratedIdTimestamp(), index.isRetry(), UNASSIGNED_SEQ_NO, 0); BiFunction delWithVersion = (version, delete) -> new Engine.Delete(delete.type(), delete.id(), - delete.uid(), delete.seqNo(), delete.primaryTerm(), version, delete.versionType(), delete.origin(), delete.startTime()); + delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), version, delete.versionType(), delete.origin(), delete.startTime(), + UNASSIGNED_SEQ_NO, 0); + TriFunction indexWithSeq = (seqNo, term, index) -> new Engine.Index(index.uid(), + index.parsedDoc(), UNASSIGNED_SEQ_NO, currentTerm.get(), index.version(), index.versionType(), index.origin(), + index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(), seqNo, term); + TriFunction delWithSeq = (seqNo, term, delete) -> new Engine.Delete(delete.type(), + delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(), + delete.startTime(), seqNo, term); for (Engine.Operation op : ops) { final boolean versionConflict = rarely(); final boolean versionedOp = versionConflict || randomBoolean(); final long conflictingVersion = docDeleted || randomBoolean() ? lastOpVersion + (randomBoolean() ? 1 : -1) : Versions.MATCH_DELETED; + final long conflictingSeqNo = lastOpSeqNo == UNASSIGNED_SEQ_NO || randomBoolean() ? + lastOpSeqNo + 5 : // use 5 to go above 0 for magic numbers + lastOpSeqNo; + final long conflictingTerm = conflictingSeqNo == lastOpSeqNo || randomBoolean() ? lastOpTerm + 1 : lastOpTerm; + if (rarely()) { + currentTerm.incrementAndGet(); + } final long correctVersion = docDeleted && randomBoolean() ? Versions.MATCH_DELETED : lastOpVersion; logger.info("performing [{}]{}{}", op.operationType().name().charAt(0), versionConflict ? " (conflict " + conflictingVersion + ")" : "", - versionedOp ? " (versioned " + correctVersion + ")" : ""); + versionedOp ? 
" (versioned " + correctVersion + ", seqNo " + lastOpSeqNo + ", term " + lastOpTerm + " )" : ""); if (op instanceof Engine.Index) { final Engine.Index index = (Engine.Index) op; if (versionConflict) { // generate a conflict - Engine.IndexResult result = engine.index(indexWithVersion.apply(conflictingVersion, index)); + final Engine.IndexResult result; + if (randomBoolean()) { + result = engine.index(indexWithSeq.apply(conflictingSeqNo, conflictingTerm, index)); + } else { + result = engine.index(indexWithVersion.apply(conflictingVersion, index)); + } assertThat(result.isCreated(), equalTo(false)); assertThat(result.getVersion(), equalTo(lastOpVersion)); assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE)); assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class)); } else { - Engine.IndexResult result = engine.index(versionedOp ? indexWithVersion.apply(correctVersion, index) : index); + final Engine.IndexResult result; + if (versionedOp) { + // TODO: add support for non-existing docs + if (randomBoolean() && lastOpSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { + result = engine.index(indexWithSeq.apply(lastOpSeqNo, lastOpTerm, index)); + } else { + result = engine.index(indexWithVersion.apply(correctVersion, index)); + } + } else { + result = engine.index(index); + } assertThat(result.isCreated(), equalTo(docDeleted)); assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1))); assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); @@ -1854,25 +1887,41 @@ private int assertOpsOnPrimary(List ops, long currentOpVersion lastFieldValue = index.docs().get(0).get("value"); docDeleted = false; lastOpVersion = result.getVersion(); + lastOpSeqNo = result.getSeqNo(); + lastOpTerm = result.getTerm(); opsPerformed++; } } else { final Engine.Delete delete = (Engine.Delete) op; if (versionConflict) { // generate a conflict - Engine.DeleteResult result = engine.delete(delWithVersion.apply(conflictingVersion, delete)); + Engine.DeleteResult result; + if (randomBoolean()) { + result = engine.delete(delWithSeq.apply(conflictingSeqNo, conflictingTerm, delete)); + } else { + result = engine.delete(delWithVersion.apply(conflictingVersion, delete)); + } assertThat(result.isFound(), equalTo(docDeleted == false)); assertThat(result.getVersion(), equalTo(lastOpVersion)); assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE)); assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class)); } else { - Engine.DeleteResult result = engine.delete(versionedOp ? 
delWithVersion.apply(correctVersion, delete) : delete); + final Engine.DeleteResult result; + if (versionedOp && lastOpSeqNo != UNASSIGNED_SEQ_NO && randomBoolean()) { + result = engine.delete(delWithSeq.apply(lastOpSeqNo, lastOpTerm, delete)); + } else if (versionedOp) { + result = engine.delete(delWithVersion.apply(correctVersion, delete)); + } else { + result = engine.delete(delete); + } assertThat(result.isFound(), equalTo(docDeleted == false)); assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1))); assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); assertThat(result.getFailure(), nullValue()); docDeleted = true; lastOpVersion = result.getVersion(); + lastOpSeqNo = UNASSIGNED_SEQ_NO; + lastOpTerm = 0; opsPerformed++; } } @@ -2082,9 +2131,9 @@ class OpAndVersion { Engine.Index index = new Engine.Index(uidTerm, testParsedDocument("1", null, testDocument(), bytesArray(Strings.collectionToCommaDelimitedString(values)), null), - SequenceNumbers.UNASSIGNED_SEQ_NO, 2, + UNASSIGNED_SEQ_NO, 2, get.version(), VersionType.INTERNAL, - PRIMARY, System.currentTimeMillis(), -1, false); + PRIMARY, System.currentTimeMillis(), -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(index); if (indexResult.getResultType() == Engine.Result.Type.SUCCESS) { history.add(new OpAndVersion(indexResult.getVersion(), removed, added)); @@ -2231,8 +2280,8 @@ public void testSeqNoAndCheckpoints() throws IOException { // we have some docs indexed, so delete one of them id = randomFrom(indexedIds); final Engine.Delete delete = new Engine.Delete( - "test", id, newUid(id), SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm.get(), - rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0); + "test", id, newUid(id), UNASSIGNED_SEQ_NO, primaryTerm.get(), + rarely() ? 100 : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, 0, UNASSIGNED_SEQ_NO, 0); final Engine.DeleteResult result = initialEngine.delete(delete); if (result.getResultType() == Engine.Result.Type.SUCCESS) { assertThat(result.getSeqNo(), equalTo(primarySeqNo + 1)); @@ -2240,7 +2289,7 @@ public void testSeqNoAndCheckpoints() throws IOException { indexedIds.remove(id); primarySeqNo++; } else { - assertThat(result.getSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); + assertThat(result.getSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo(primarySeqNo)); } } else { @@ -2248,9 +2297,9 @@ public void testSeqNoAndCheckpoints() throws IOException { id = randomFrom(ids); ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); final Engine.Index index = new Engine.Index(newUid(doc), doc, - SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm.get(), + UNASSIGNED_SEQ_NO, primaryTerm.get(), rarely() ? 
100 : Versions.MATCH_ANY, VersionType.INTERNAL, - PRIMARY, 0, -1, false); + PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0); final Engine.IndexResult result = initialEngine.index(index); if (result.getResultType() == Engine.Result.Type.SUCCESS) { assertThat(result.getSeqNo(), equalTo(primarySeqNo + 1)); @@ -2258,7 +2307,7 @@ public void testSeqNoAndCheckpoints() throws IOException { indexedIds.add(id); primarySeqNo++; } else { - assertThat(result.getSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); + assertThat(result.getSeqNo(), equalTo(UNASSIGNED_SEQ_NO)); assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo(primarySeqNo)); } } @@ -2388,7 +2437,7 @@ public void testConcurrentWritesAndCommits() throws Exception { SequenceNumbers.NO_OPS_PERFORMED; long maxSeqNo = userData.containsKey(SequenceNumbers.MAX_SEQ_NO) ? Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)) : - SequenceNumbers.UNASSIGNED_SEQ_NO; + UNASSIGNED_SEQ_NO; // local checkpoint and max seq no shouldn't go backwards assertThat(localCheckpoint, greaterThanOrEqualTo(prevLocalCheckpoint)); assertThat(maxSeqNo, greaterThanOrEqualTo(prevMaxSeqNo)); @@ -2502,13 +2551,13 @@ public void testEnableGcDeletes() throws Exception { document.add(new TextField("value", "test1", Field.Store.YES)); ParsedDocument doc = testParsedDocument("1", null, document, B_2, null); - engine.index(new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, 1, + engine.index(new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 1, VersionType.EXTERNAL, - Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false)); + Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0)); // Delete document we just added: - engine.delete(new Engine.Delete("test", "1", newUid(doc), SequenceNumbers.UNASSIGNED_SEQ_NO, 0, - 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime())); + engine.delete(new Engine.Delete("test", "1", newUid(doc), UNASSIGNED_SEQ_NO, 0, + 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), UNASSIGNED_SEQ_NO, 0)); // Get should not find the document Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory); @@ -2522,8 +2571,8 @@ public void testEnableGcDeletes() throws Exception { } // Delete non-existent document - engine.delete(new Engine.Delete("test", "2", newUid("2"), SequenceNumbers.UNASSIGNED_SEQ_NO, 0, - 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime())); + engine.delete(new Engine.Delete("test", "2", newUid("2"), UNASSIGNED_SEQ_NO, 0, + 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), UNASSIGNED_SEQ_NO, 0)); // Get should not find the document (we never indexed uid=2): getResult = engine.get(new Engine.Get(true, false, "type", "2", newUid("2")), @@ -2531,8 +2580,8 @@ public void testEnableGcDeletes() throws Exception { assertThat(getResult.exists(), equalTo(false)); // Try to index uid=1 with a too-old version, should fail: - Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, 2, - VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false); + Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 2, + VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(index); assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.FAILURE)); 
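For reference, a compact illustrative sketch (not a verbatim excerpt from the patch) of the compare-and-set pattern these engine tests exercise. It assumes `engine`, `doc`, `newUid(doc)` and the `primaryTerm` counter are set up as in the surrounding test class, and follows the new Engine.Index argument order shown above, where the trailing pair is ifSeqNoMatch / ifPrimaryTermMatch:

    // plain write, no CAS constraint (UNASSIGNED_SEQ_NO / 0 in the trailing CAS slots)
    Engine.Index first = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, primaryTerm.get(),
        Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false,
        UNASSIGNED_SEQ_NO, 0);
    Engine.IndexResult firstResult = engine.index(first);

    // CAS write: applies only if the document still carries the seq#/term the caller observed
    Engine.Index casWrite = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, primaryTerm.get(),
        Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false,
        firstResult.getSeqNo(), firstResult.getTerm());
    Engine.IndexResult casResult = engine.index(casWrite);
    // a concurrent change would surface here as a FAILURE result carrying a VersionConflictEngineException

The same shape applies to Engine.Delete, whose constructor likewise gained the ifSeqNoMatch / ifPrimaryTermMatch trailing arguments.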
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); @@ -2542,8 +2591,8 @@ public void testEnableGcDeletes() throws Exception { assertThat(getResult.exists(), equalTo(false)); // Try to index uid=2 with a too-old version, should fail: - Engine.Index index1 = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, 2, - VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false); + Engine.Index index1 = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 2, + VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); indexResult = engine.index(index1); assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.FAILURE)); assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); @@ -2629,8 +2678,8 @@ public void testCurrentTranslogIDisCommitted() throws IOException { store.associateIndexWithNewTranslog(translogUUID); ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, - Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); try (InternalEngine engine = createEngine(config)) { engine.index(firstIndexRequest); @@ -2719,8 +2768,7 @@ public void testMissingTranslog() throws IOException { // expected } // when a new translog is created it should be ok - final String translogUUID = Translog.createEmptyTranslog(primaryTranslogDir, - SequenceNumbers.UNASSIGNED_SEQ_NO, shardId, primaryTerm); + final String translogUUID = Translog.createEmptyTranslog(primaryTranslogDir, UNASSIGNED_SEQ_NO, shardId, primaryTerm); store.associateIndexWithNewTranslog(translogUUID); EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null); engine = new InternalEngine(config); @@ -2733,10 +2781,9 @@ public void testTranslogReplayWithFailure() throws IOException { final int numDocs = randomIntBetween(1, 10); try (InternalEngine engine = createEngine(store, translogPath)) { for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), - new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, - Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, + Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0); Engine.IndexResult indexResult = engine.index(firstIndexRequest); assertThat(indexResult.getVersion(), equalTo(1L)); } @@ -2830,10 +2877,9 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s public void testSkipTranslogReplay() throws IOException { final int numDocs = randomIntBetween(1, 10); for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, - testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = 
new Engine.Index(newUid(doc), doc,
-                SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
+            ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
+            Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
+                Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
             Engine.IndexResult indexResult = engine.index(firstIndexRequest);
             assertThat(indexResult.getVersion(), equalTo(1L));
         }
@@ -2872,11 +2918,9 @@ public void testTranslogReplay() throws IOException {
         final LongSupplier inSyncGlobalCheckpointSupplier = () -> this.engine.getLocalCheckpoint();
         final int numDocs = randomIntBetween(1, 10);
         for (int i = 0; i < numDocs; i++) {
-            ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(),
-                new BytesArray("{}"), null);
-            Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc,
-                SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY,
-                System.nanoTime(), -1, false);
+            ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
+            Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
+                Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
             Engine.IndexResult indexResult = engine.index(firstIndexRequest);
             assertThat(indexResult.getVersion(), equalTo(1L));
         }
@@ -2908,10 +2952,9 @@ public void testTranslogReplay() throws IOException {
         final boolean flush = randomBoolean();
         int randomId = randomIntBetween(numDocs + 1, numDocs + 10);
-        ParsedDocument doc =
-            testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null);
-        Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
-            1, VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false);
+        ParsedDocument doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null);
+        Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 1,
+            VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
         Engine.IndexResult indexResult = engine.index(firstIndexRequest);
         assertThat(indexResult.getVersion(), equalTo(1L));
         if (flush) {
@@ -2920,8 +2963,8 @@ public void testTranslogReplay() throws IOException {
         }
         doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null);
-        Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, 2,
-            VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false);
+        Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 2,
+            VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
         Engine.IndexResult result = engine.index(idxRequest);
         engine.refresh("test");
         assertThat(result.getVersion(), equalTo(2L));
@@ -2954,10 +2997,9 @@ public void testTranslogReplay() throws IOException {
     public void testRecoverFromForeignTranslog() throws IOException {
         final int numDocs = randomIntBetween(1, 10);
         for (int i = 0; i < numDocs; i++) {
-            ParsedDocument doc =
-                testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
-            Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
-                Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
+            ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
+            Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
+                Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
             Engine.IndexResult index = engine.index(firstIndexRequest);
             assertThat(index.getVersion(), equalTo(1L));
         }
@@ -3286,7 +3328,7 @@ public void testDoubleDeliveryReplicaAppendingAndDeleteOnly() throws IOException
         Engine.Index retry = appendOnlyReplica(doc, true, 1, randomIntBetween(0, 5));
         Engine.Delete delete = new Engine.Delete(operation.type(), operation.id(), operation.uid(),
             Math.max(retry.seqNo(), operation.seqNo())+1, operation.primaryTerm(), operation.version()+1,
-            operation.versionType(), REPLICA, operation.startTime()+1);
+            operation.versionType(), REPLICA, operation.startTime()+1, UNASSIGNED_SEQ_NO, 0);
         // operations with a seq# equal or lower to the local checkpoint are not indexed to lucene
         // and the version lookup is skipped
         final boolean belowLckp = operation.seqNo() == 0 && retry.seqNo() == 0;
@@ -3445,19 +3487,20 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep
        boolean isRetry = false;
        long autoGeneratedIdTimestamp = 0;

-        Engine.Index index = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
-            Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+        Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
+            Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
        Engine.IndexResult indexResult = engine.index(index);
        assertThat(indexResult.getVersion(), equalTo(1L));

        index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(),
-            index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+            index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry,
+            UNASSIGNED_SEQ_NO, 0);
        indexResult = replicaEngine.index(index);
        assertThat(indexResult.getVersion(), equalTo(1L));

        isRetry = true;
-        index = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY,
-            VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+        index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
+            PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
        indexResult = engine.index(index);
        assertThat(indexResult.getVersion(), equalTo(1L));
        engine.refresh("test");
@@ -3467,7 +3510,8 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep
        }

        index = new Engine.Index(newUid(doc), doc, indexResult.getSeqNo(), index.primaryTerm(), indexResult.getVersion(),
-            index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+            index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry,
+            UNASSIGNED_SEQ_NO, 0);
        indexResult = replicaEngine.index(index);
        assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
        replicaEngine.refresh("test");
@@ -3484,20 +3528,20 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs()
        boolean isRetry = true;
        long autoGeneratedIdTimestamp = 0;

-        Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO,
-            0, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+        Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
+            Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
        Engine.IndexResult result = engine.index(firstIndexRequest);
        assertThat(result.getVersion(), equalTo(1L));

        Engine.Index firstIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), firstIndexRequest.primaryTerm(),
            result.getVersion(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(),
-            REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+            REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
        Engine.IndexResult indexReplicaResult = replicaEngine.index(firstIndexRequestReplica);
        assertThat(indexReplicaResult.getVersion(), equalTo(1L));

        isRetry = false;
-        Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO,
-            0, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+        Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
+            Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
        Engine.IndexResult indexResult = engine.index(secondIndexRequest);
        assertTrue(indexResult.isCreated());
        engine.refresh("test");
@@ -3508,7 +3552,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs()
        Engine.Index secondIndexRequestReplica = new Engine.Index(newUid(doc), doc, result.getSeqNo(), secondIndexRequest.primaryTerm(),
            result.getVersion(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(),
-            autoGeneratedIdTimestamp, isRetry);
+            autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
        replicaEngine.index(secondIndexRequestReplica);
        replicaEngine.refresh("test");
        try (Engine.Searcher searcher = replicaEngine.acquireSearcher("test")) {
@@ -3526,13 +3570,13 @@ public Engine.Index randomAppendOnly(ParsedDocument doc, boolean retry, final lo
    }

    public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp) {
-        return new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY,
-            VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry);
+        return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
+            Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry, UNASSIGNED_SEQ_NO, 0);
    }

    public Engine.Index appendOnlyReplica(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, final long seqNo) {
        return new Engine.Index(newUid(doc), doc, seqNo, 2, 1, VersionType.EXTERNAL,
-            Engine.Operation.Origin.REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, retry);
+            Engine.Operation.Origin.REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, retry, UNASSIGNED_SEQ_NO, 0);
    }

    public void testRetryConcurrently() throws InterruptedException, IOException {
@@ -3784,7 +3828,7 @@ public void testSequenceIDs() throws Exception {
        Tuple<Long, Long> seqID = getSequenceID(engine, new Engine.Get(false, false, "type", "2", newUid("1")));
        // Non-existent doc returns no seqnum and no primary term
-        assertThat(seqID.v1(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
+        assertThat(seqID.v1(), equalTo(UNASSIGNED_SEQ_NO));
        assertThat(seqID.v2(), equalTo(0L));
        // create a document
@@ -3815,9 +3859,9 @@ public void testSequenceIDs() throws Exception {
        document = testDocumentWithTextField();
        document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
        doc = testParsedDocument("1", null, document, B_1, null);
-        engine.index(new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 3,
+        engine.index(new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 3,
            Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY,
-            System.nanoTime(), -1, false));
+            System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0));
        engine.refresh("test");
        seqID = getSequenceID(engine, newGet(false, doc));
@@ -3845,10 +3889,11 @@ public void testLookupSeqNoByIdInLucene() throws Exception {
                final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
                if (isIndexing) {
                    operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
-                        i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true));
+                        i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true,
+                        UNASSIGNED_SEQ_NO, 0));
                } else {
                    operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
-                        i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
+                        i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), UNASSIGNED_SEQ_NO, 0));
                }
            }
            seqNo++;
@@ -3875,7 +3920,7 @@ public void testLookupSeqNoByIdInLucene() throws Exception {
                    equalTo(latestOps.get(id).operationType() == Engine.Operation.TYPE.INDEX));
            }
            assertThat(VersionsAndSeqNoResolver.loadDocIdAndVersion(
-                searcher.reader(), newUid("any-" + between(1, 10))), nullValue());
+                searcher.reader(), newUid("any-" + between(1, 10)), randomBoolean()), nullValue());
            Map<String, Long> liveOps = latestOps.entrySet().stream()
                .filter(e -> e.getValue().operationType() == Engine.Operation.TYPE.INDEX)
                .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().seqNo()));
@@ -4009,7 +4054,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio
        final AtomicLong sequenceNumber = new AtomicLong();
        final Engine.Operation.Origin origin = randomFrom(LOCAL_TRANSLOG_RECOVERY, PEER_RECOVERY, PRIMARY, REPLICA);
        final LongSupplier sequenceNumberSupplier =
-            origin == PRIMARY ? () -> SequenceNumbers.UNASSIGNED_SEQ_NO : sequenceNumber::getAndIncrement;
+            origin == PRIMARY ? () -> UNASSIGNED_SEQ_NO : sequenceNumber::getAndIncrement;
        final Supplier<ParsedDocument> doc = () -> {
            final Document document = testDocumentWithTextField();
            document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
@@ -4029,7 +4074,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio
                    origin,
                    System.nanoTime(),
                    IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
-                    false);
+                    false, UNASSIGNED_SEQ_NO, 0);
                operations.add(index);
            } else {
                final Engine.Delete delete = new Engine.Delete(
@@ -4041,7 +4086,7 @@ public void testOutOfOrderSequenceNumbersWithVersionConflict() throws IOExceptio
                    i,
                    VersionType.EXTERNAL,
                    origin,
-                    System.nanoTime());
+                    System.nanoTime(), UNASSIGNED_SEQ_NO, 0);
                operations.add(delete);
            }
        }
@@ -4290,7 +4335,7 @@ private Tuple getSequenceID(Engine engine, Engine.Get get) throws En
            DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), get.uid());
            if (docIdAndSeqNo == null) {
                primaryTerm = 0;
-                seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
+                seqNo = UNASSIGNED_SEQ_NO;
            } else {
                seqNo = docIdAndSeqNo.seqNo;
                NumericDocValues primaryTerms = docIdAndSeqNo.context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
@@ -4565,14 +4610,14 @@ public void testSeqNoGenerator() throws IOException {
            final Engine.Index index = new Engine.Index(
                new Term("_id", parsedDocument.id()),
                parsedDocument,
-                SequenceNumbers.UNASSIGNED_SEQ_NO,
+                UNASSIGNED_SEQ_NO,
                randomIntBetween(1, 8),
                Versions.MATCH_ANY,
                VersionType.INTERNAL,
                Engine.Operation.Origin.PRIMARY,
                System.currentTimeMillis(),
                System.currentTimeMillis(),
-                randomBoolean());
+                randomBoolean(), UNASSIGNED_SEQ_NO, 0);
            final Engine.IndexResult indexResult = e.index(index);
            assertThat(indexResult.getSeqNo(), equalTo(seqNo));
            assertThat(seqNoGenerator.get(), equalTo(seqNo + 1));
@@ -4581,12 +4626,12 @@ public void testSeqNoGenerator() throws IOException {
                type,
                id,
                new Term("_id", parsedDocument.id()),
-                SequenceNumbers.UNASSIGNED_SEQ_NO,
+                UNASSIGNED_SEQ_NO,
                randomIntBetween(1, 8),
                Versions.MATCH_ANY,
                VersionType.INTERNAL,
                Engine.Operation.Origin.PRIMARY,
-                System.currentTimeMillis());
+                System.currentTimeMillis(), UNASSIGNED_SEQ_NO, 0);
            final Engine.DeleteResult deleteResult = e.delete(delete);
            assertThat(deleteResult.getSeqNo(), equalTo(seqNo + 1));
            assertThat(seqNoGenerator.get(), equalTo(seqNo + 2));
@@ -4640,7 +4685,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
        final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
        // Keep only one safe commit as the oldest commit.
        final IndexCommit safeCommit = commits.get(0);
-        if (lastSyncedGlobalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
+        if (lastSyncedGlobalCheckpoint == UNASSIGNED_SEQ_NO) {
            // If the global checkpoint is still unassigned, we keep an empty(eg. initial) commit as a safe commit.
            assertThat(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
                equalTo(SequenceNumbers.NO_OPS_PERFORMED));
@@ -4907,29 +4952,27 @@ public void testStressUpdateSameDocWhileGettingIt() throws IOException, Interrup
            .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), TimeValue.timeValueMillis(1))).build();
        engine.engineConfig.getIndexSettings().updateIndexMetaData(indexMetaData);
        engine.onSettingsChanged();
-        ParsedDocument document =
-            testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null);
-        final Engine.Index doc = new Engine.Index(newUid(document), document, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
-            Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false);
+        ParsedDocument document = testParsedDocument(Integer.toString(0), null, testDocumentWithTextField(), SOURCE, null);
+        final Engine.Index doc = new Engine.Index(newUid(document), document, UNASSIGNED_SEQ_NO, 0,
+            Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false,
+            UNASSIGNED_SEQ_NO, 0);
        // first index an append only document and then delete it. such that we have it in the tombstones
        engine.index(doc);
        engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), primaryTerm.get()));
        // now index more append only docs and refresh so we re-enabel the optimization for unsafe version map
-        ParsedDocument document1 =
-            testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null);
-        engine.index(new Engine.Index(newUid(document1), document1, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
-            Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false));
+        ParsedDocument document1 = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null);
+        engine.index(new Engine.Index(newUid(document1), document1, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
+            Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false, UNASSIGNED_SEQ_NO, 0));
        engine.refresh("test");
-        ParsedDocument document2 =
-            testParsedDocument(Integer.toString(2), null, testDocumentWithTextField(), SOURCE, null);
-        engine.index(new Engine.Index(newUid(document2), document2, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
-            Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false));
+        ParsedDocument document2 = testParsedDocument(Integer.toString(2), null, testDocumentWithTextField(), SOURCE, null);
+        engine.index(new Engine.Index(newUid(document2), document2, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
+            Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false, UNASSIGNED_SEQ_NO, 0));
        engine.refresh("test");
-        ParsedDocument document3 =
-            testParsedDocument(Integer.toString(3), null, testDocumentWithTextField(), SOURCE, null);
-        final Engine.Index doc3 = new Engine.Index(newUid(document3), document3, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
-            Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false);
+        ParsedDocument document3 = testParsedDocument(Integer.toString(3), null, testDocumentWithTextField(), SOURCE, null);
+        final Engine.Index doc3 = new Engine.Index(newUid(document3), document3, UNASSIGNED_SEQ_NO, 0,
+            Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), 0, false,
+            UNASSIGNED_SEQ_NO, 0);
        engine.index(doc3);
        engine.engineConfig.setEnableGcDeletes(true);
        // once we are here the version map is unsafe again and we need to do a refresh inside the get calls to ensure we
@@ -5051,7 +5094,7 @@ public void testTrackMaxSeqNoOfNonAppendOnlyOperations() throws Exception {
                    engine.index(doc);
                } else {
                    engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), seqno, doc.primaryTerm(),
-                        doc.version(), doc.versionType(), doc.origin(), threadPool.relativeTimeInMillis()));
+                        doc.version(), doc.versionType(), doc.origin(), threadPool.relativeTimeInMillis(), UNASSIGNED_SEQ_NO, 0));
                }
                maxSeqNoOfNonAppendOnly = seqno;
            } else { // On primary - do not update max_seqno for non-append-only operations
@@ -5123,7 +5166,7 @@ public void testTrimUnsafeCommits() throws Exception {
            ParsedDocument doc = testParsedDocument(Long.toString(seqNos.get(i)), null, testDocument(),
                new BytesArray("{}"), null);
            Engine.Index index = new Engine.Index(newUid(doc), doc, seqNos.get(i), 0,
-                1, VersionType.EXTERNAL, REPLICA, System.nanoTime(), -1, false);
+                1, VersionType.EXTERNAL, REPLICA, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
            engine.index(index);
            if (randomBoolean()) {
                engine.flush();
@@ -5414,10 +5457,11 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
                final ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(between(1, 100)), null);
                if (randomBoolean()) {
                    operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
-                        i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true));
+                        i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), -1, true,
+                        UNASSIGNED_SEQ_NO, 0));
                } else if (randomBoolean()) {
                    operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
-                        i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis()));
+                        i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), UNASSIGNED_SEQ_NO, 0));
                } else {
                    operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA,
                        threadPool.relativeTimeInMillis(), "test-" + i));
diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java
index 3e248fcdbe2b5..89f89ae69a4ef 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java
@@ -53,7 +53,7 @@ public void testReadOnlyEngine() throws Exception {
                    }
                    ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
                    engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, VersionType.EXTERNAL,
-                        Engine.Operation.Origin.REPLICA, System.nanoTime(), -1, false));
+                        Engine.Operation.Origin.REPLICA, System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
                    if (get == null || rarely()) {
                        get = newGet(randomBoolean(), doc);
                    }
@@ -122,7 +122,7 @@ public void testFlushes() throws IOException {
                    }
                    ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
                    engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, VersionType.EXTERNAL
-                        , Engine.Operation.Origin.REPLICA, System.nanoTime(), -1, false));
+                        , Engine.Operation.Origin.REPLICA, System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
                    if (rarely()) {
                        engine.flush();
                    }
diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index ffdc21fcec4b9..efb1de3e2945c 100644
--- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -2834,7 +2834,7 @@ public void testTranslogOpSerialization() throws Exception {
            null);
        Engine.Index eIndex = new Engine.Index(newUid(doc), doc, randomSeqNum, randomPrimaryTerm,
-            1, VersionType.INTERNAL, Origin.PRIMARY, 0, 0, false);
+            1, VersionType.INTERNAL, Origin.PRIMARY, 0, 0, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
        Engine.IndexResult eIndexResult = new Engine.IndexResult(1, randomPrimaryTerm, randomSeqNum, true);
        Translog.Index index = new Translog.Index(eIndex, eIndexResult);
@@ -2845,7 +2845,7 @@ public void testTranslogOpSerialization() throws Exception {
        assertEquals(index, serializedIndex);
        Engine.Delete eDelete = new Engine.Delete(doc.type(), doc.id(), newUid(doc), randomSeqNum, randomPrimaryTerm,
-            2, VersionType.INTERNAL, Origin.PRIMARY, 0);
+            2, VersionType.INTERNAL, Origin.PRIMARY, 0, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
        Engine.DeleteResult eDeleteResult = new Engine.DeleteResult(2, randomPrimaryTerm, randomSeqNum, true);
        Translog.Delete delete = new Translog.Delete(eDelete, eDeleteResult);
diff --git a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
index a4d4d0a16293f..c6bc69f460407 100644
--- a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
@@ -52,6 +52,7 @@
 import org.elasticsearch.index.engine.InternalEngineTests;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.mapper.Uid;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.shard.ShardId;
@@ -254,7 +255,8 @@ public void testUnallocatedShardsDoesNotHang() throws InterruptedException {
    private void indexDoc(Engine engine, String id) throws IOException {
        final ParsedDocument doc = InternalEngineTests.createParsedDoc(id, null);
        final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), doc,
-            engine.getLocalCheckpoint() + 1, 1L, 1L, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, randomLong(), -1L, false));
+            engine.getLocalCheckpoint() + 1, 1L, 1L, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, randomLong(), -1L, false,
+            SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
        assertThat(indexResult.getFailure(), nullValue());
    }
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index b070bdee2e146..3deb064528fb5 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -634,12 +634,12 @@ protected Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long
                                              boolean isRetry) {
        return new Engine.Index(newUid(doc), doc, seqNo, primaryTerm.get(), version, VersionType.EXTERNAL,
            Engine.Operation.Origin.REPLICA, System.nanoTime(),
-            IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry);
+            IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
    }

    protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo, long startTime) {
        return new Engine.Delete("test", id, newUid(id), seqNo, primaryTerm.get(), version, VersionType.EXTERNAL,
-            Engine.Operation.Origin.REPLICA, startTime);
+            Engine.Operation.Origin.REPLICA, startTime, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
    }

    protected static void assertVisibleCount(InternalEngine engine, int numDocs) throws IOException {
        assertVisibleCount(engine, numDocs, true);
@@ -701,8 +701,8 @@ public static List generateSingleDocHistory(
                    version,
                    forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
                    forReplica ? REPLICA : PRIMARY,
-                    System.currentTimeMillis(), -1, false
-                );
+                    System.currentTimeMillis(), -1, false,
+                    SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
            } else {
                op = new Engine.Delete("test", docId, id,
                    forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
@@ -710,7 +710,7 @@ public static List generateSingleDocHistory(
                    version,
                    forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
                    forReplica ? REPLICA : PRIMARY,
-                    System.currentTimeMillis());
+                    System.currentTimeMillis(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
            }
            ops.add(op);
        }
@@ -735,11 +735,12 @@ public List generateHistoryOnReplica(int numOps, boolean allow
                switch (opType) {
                    case INDEX:
                        operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(),
-                            i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, startTime, -1, true));
+                            i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, startTime, -1, true,
+                            SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
                        break;
                    case DELETE:
                        operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(),
-                            i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, startTime));
+                            i, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, startTime, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
                        break;
                    case NO_OP:
                        operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA, startTime, "test-" + i));
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java
index df12010afc671..ea209d6a13081 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java
@@ -31,6 +31,7 @@
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.RootObjectMapper;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.index.translog.Translog;
@@ -131,7 +132,7 @@ private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.
                final Translog.Delete delete = (Translog.Delete) operation;
                final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
                    delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
-                    origin, System.nanoTime());
+                    origin, System.nanoTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
                return engineDelete;
            case NO_OP:
                final Translog.NoOp noOp = (Translog.NoOp) operation;
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
index 2a2b4a58e4d4d..f93e9b232c046 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
@@ -205,7 +205,8 @@ public void runDeleteTest(
                randomNonNegativeLong(),
                VersionType.EXTERNAL,
                origin,
-                System.currentTimeMillis());
+                System.currentTimeMillis(),
+                SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
            consumer.accept(followingEngine, delete);
        }
@@ -295,7 +296,8 @@ private Engine.Index indexForFollowing(String id, long seqNo, Engine.Operation.O
        final long version = randomBoolean() ? 1 : randomNonNegativeLong();
        final ParsedDocument parsedDocument = EngineTestCase.createParsedDoc(id, null);
        return new Engine.Index(EngineTestCase.newUid(parsedDocument), parsedDocument, seqNo, primaryTerm.get(), version,
-            VersionType.EXTERNAL, origin, System.currentTimeMillis(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, randomBoolean());
+            VersionType.EXTERNAL, origin, System.currentTimeMillis(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, randomBoolean(),
+            SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
    }

    private Engine.Index indexForPrimary(String id) {
@@ -314,11 +316,12 @@ private Engine.Result applyOperation(Engine engine, Engine.Operation op,
        if (op instanceof Engine.Index) {
            Engine.Index index = (Engine.Index) op;
            result = engine.index(new Engine.Index(index.uid(), index.parsedDoc(), index.seqNo(), primaryTerm, index.version(),
-                index.versionType(), origin, index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry()));
+                index.versionType(), origin, index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(),
+                SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
        } else if (op instanceof Engine.Delete) {
            Engine.Delete delete = (Engine.Delete) op;
            result = engine.delete(new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), primaryTerm,
-                delete.version(), delete.versionType(), origin, delete.startTime()));
+                delete.version(), delete.versionType(), origin, delete.startTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
        } else {
            Engine.NoOp noOp = (Engine.NoOp) op;
            result = engine.noOp(new Engine.NoOp(noOp.seqNo(), primaryTerm, origin, noOp.startTime(), noOp.reason()));
@@ -575,10 +578,12 @@ public void testProcessOnceOnPrimary() throws Exception {
            ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFunc.apply(docId, randomInt(3));
            if (randomBoolean()) {
                operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L,
-                    VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true));
+                    VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true,
+                    SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
            } else if (randomBoolean()) {
                operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), i, primaryTerm.get(), 1L,
-                    VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis()));
+                    VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(),
+                    SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
            } else {
                operations.add(new Engine.NoOp(i, primaryTerm.get(), Engine.Operation.Origin.PRIMARY,
                    threadPool.relativeTimeInMillis(), "test-" + i));
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java
index 4996ca01b4720..0d711ba99f366 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java
@@ -177,7 +177,7 @@ private int addDocuments(AtomicLong globalCheckpoint, InternalEngine engine) thr
            numDocsAdded++;
            ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
            engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA,
-                System.nanoTime(), -1, false));
+                System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
            if (rarely()) {
                engine.flush();
            }
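
The test-side changes above are mechanical: every Engine.Index and Engine.Delete constructor call gains the same two trailing arguments (SequenceNumbers.UNASSIGNED_SEQ_NO and a 0 term), and the loadDocIdAndVersion call in testLookupSeqNoByIdInLucene gains a third boolean argument (randomBoolean() in the test). The sketch below is illustrative only, not part of the patch: it reuses the EngineTestCase helpers seen throughout this diff (testParsedDocument, testDocument, newUid, engine), and the names ifSeqNo/ifPrimaryTerm for the trailing pair are an assumption inferred from the call sites rather than something this diff shows.

// Illustrative sketch (assumed names): the post-change call shape used by these tests.
ParsedDocument doc = testParsedDocument("1", null, testDocument(), new BytesArray("{}"), null);

Engine.Index index = new Engine.Index(newUid(doc), doc,
    SequenceNumbers.UNASSIGNED_SEQ_NO, 0,            // seqNo, primaryTerm of the operation itself
    Versions.MATCH_ANY, VersionType.INTERNAL,
    Engine.Operation.Origin.PRIMARY,
    System.nanoTime(), -1, false,                    // startTime, autoGeneratedIdTimestamp, isRetry
    SequenceNumbers.UNASSIGNED_SEQ_NO, 0);           // new trailing pair (assumed: ifSeqNo, ifPrimaryTerm)
engine.index(index);

Engine.Delete delete = new Engine.Delete(doc.type(), doc.id(), newUid(doc),
    SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
    Versions.MATCH_ANY, VersionType.INTERNAL,
    Engine.Operation.Origin.PRIMARY,
    System.nanoTime(),
    SequenceNumbers.UNASSIGNED_SEQ_NO, 0);           // same trailing pair appended
engine.delete(delete);

Every call site in this diff passes exactly UNASSIGNED_SEQ_NO and 0 for the new pair, which suggests these values are the default meaning "no additional seq#-based condition"; tests that want different behaviour would presumably pass real values there.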