Skip to content

Commit

Permalink
Add sequence numbers based optimistic concurrency control support to …
Browse files Browse the repository at this point in the history
…Engine (#36467)

This commit add support to engine operations for resolving and verifying the sequence number and
primary term of the last modification to a document before performing an operation. This is
infrastructure to move our (optimistic concurrency control)[http://en.wikipedia.org/wiki/Optimistic_concurrency_control] API to use sequence numbers instead of internal versioning.

Relates #36148
Relates #10708
  • Loading branch information
bleskes committed Dec 13, 2018
1 parent 9fe8e32 commit 4da3a24
Show file tree
Hide file tree
Showing 16 changed files with 340 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
* using the same cache key. Otherwise we'd have to disable caching
* entirely for these readers.
*/
public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context)
public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderContext context)
throws IOException {
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
"context's reader is not the same as the reader class was initialized on.";
Expand All @@ -108,7 +108,28 @@ public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context)
if (versions.advanceExact(docID) == false) {
throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field");
}
return new DocIdAndVersion(docID, versions.longValue(), context.reader(), context.docBase);
final long seqNo;
final long term;
if (loadSeqNo) {
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
// remove the null check in 7.0 once we can't read indices with no seq#
if (seqNos != null && seqNos.advanceExact(docID)) {
seqNo = seqNos.longValue();
} else {
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
NumericDocValues terms = context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
if (terms != null && terms.advanceExact(docID)) {
term = terms.longValue();
} else {
term = 0;
}

} else {
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
term = 0;
}
return new DocIdAndVersion(docID, versions.longValue(), seqNo, term, context.reader(), context.docBase);
} else {
return null;
}
Expand Down Expand Up @@ -150,6 +171,7 @@ DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOExcep
final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
final long seqNo;
// remove the null check in 7.0 once we can't read indices with no seq#
if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
seqNo = seqNoDV.longValue();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;

import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND;

/** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. */
public final class VersionsAndSeqNoResolver {

Expand Down Expand Up @@ -96,12 +94,16 @@ private VersionsAndSeqNoResolver() {
public static class DocIdAndVersion {
public final int docId;
public final long version;
public final long seqNo;
public final long primaryTerm;
public final LeafReader reader;
public final int docBase;

public DocIdAndVersion(int docId, long version, LeafReader reader, int docBase) {
public DocIdAndVersion(int docId, long version, long seqNo, long primaryTerm, LeafReader reader, int docBase) {
this.docId = docId;
this.version = version;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.reader = reader;
this.docBase = docBase;
}
Expand Down Expand Up @@ -129,15 +131,15 @@ public static class DocIdAndSeqNo {
* <li>a doc ID and a version otherwise
* </ul>
*/
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term, boolean loadSeqNo) throws IOException {
PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
List<LeafReaderContext> leaves = reader.leaves();
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
final LeafReaderContext leaf = leaves.get(i);
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf);
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), loadSeqNo, leaf);
if (result != null) {
return result;
}
Expand Down Expand Up @@ -175,15 +177,4 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) thr
}
return latest;
}

/**
* Load the version for the uid from the reader, returning<ul>
* <li>{@link Versions#NOT_FOUND} if no matching doc exists,
* <li>the version associated with the provided uid otherwise
* </ul>
*/
public static long loadVersion(IndexReader reader, Term term) throws IOException {
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
}
}
44 changes: 38 additions & 6 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ protected final GetResult getFromSearcher(Get get, BiFunction<String, SearcherSc
final Searcher searcher = searcherFactory.apply("get", scope);
final DocIdAndVersion docIdAndVersion;
try {
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid());
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid(), true);
} catch (Exception e) {
Releasables.closeWhileHandlingException(searcher);
//TODO: A better exception goes here
Expand Down Expand Up @@ -1344,13 +1344,22 @@ public static class Index extends Operation {
private final ParsedDocument doc;
private final long autoGeneratedIdTimestamp;
private final boolean isRetry;
private final long ifSeqNoMatch;
private final long ifPrimaryTermMatch;

public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
long startTime, long autoGeneratedIdTimestamp, boolean isRetry) {
long startTime, long autoGeneratedIdTimestamp, boolean isRetry, long ifSeqNoMatch, long ifPrimaryTermMatch) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert ifPrimaryTermMatch >= 0 : "ifPrimaryTermMatch [" + ifPrimaryTermMatch + "] must be non negative";
assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNoMatch >=0 :
"ifSeqNoMatch [" + ifSeqNoMatch + "] must be non negative or unset";
assert (origin == Origin.PRIMARY) || (ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTermMatch == 0) :
"cas operations are only allowed if origin is primary. get [" + origin + "]";
this.doc = doc;
this.isRetry = isRetry;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
this.ifSeqNoMatch = ifSeqNoMatch;
this.ifPrimaryTermMatch = ifPrimaryTermMatch;
}

public Index(Term uid, long primaryTerm, ParsedDocument doc) {
Expand All @@ -1359,7 +1368,7 @@ public Index(Term uid, long primaryTerm, ParsedDocument doc) {

Index(Term uid, long primaryTerm, ParsedDocument doc, long version) {
this(uid, doc, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime(), -1, false);
Origin.PRIMARY, System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
} // TEST ONLY

public ParsedDocument parsedDoc() {
Expand Down Expand Up @@ -1419,28 +1428,44 @@ public boolean isRetry() {
return isRetry;
}

public long getIfSeqNoMatch() {
return ifSeqNoMatch;
}

public long getIfPrimaryTermMatch() {
return ifPrimaryTermMatch;
}
}

public static class Delete extends Operation {

private final String type;
private final String id;
private final long ifSeqNoMatch;
private final long ifPrimaryTermMatch;

public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
Origin origin, long startTime) {
Origin origin, long startTime, long ifSeqNoMatch, long ifPrimaryTermMatch) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
assert ifPrimaryTermMatch >= 0 : "ifPrimaryTermMatch [" + ifPrimaryTermMatch + "] must be non negative";
assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNoMatch >=0 :
"ifSeqNoMatch [" + ifSeqNoMatch + "] must be non negative or unset";
assert (origin == Origin.PRIMARY) || (ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTermMatch == 0) :
"cas operations are only allowed if origin is primary. get [" + origin + "]";
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
this.ifSeqNoMatch = ifSeqNoMatch;
this.ifPrimaryTermMatch = ifPrimaryTermMatch;
}

public Delete(String type, String id, Term uid, long primaryTerm) {
this(type, id, uid, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime());
Origin.PRIMARY, System.nanoTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
}

public Delete(Delete template, VersionType versionType) {
this(template.type(), template.id(), template.uid(), template.seqNo(), template.primaryTerm(), template.version(),
versionType, template.origin(), template.startTime());
versionType, template.origin(), template.startTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
}

@Override
Expand All @@ -1463,6 +1488,13 @@ public int estimatedSizeInBytes() {
return (uid().field().length() + uid().text().length()) * 2 + 20;
}

public long getIfSeqNoMatch() {
return ifSeqNoMatch;
}

public long getIfPrimaryTermMatch() {
return ifPrimaryTermMatch;
}
}

public static class NoOp extends Operation {
Expand Down
Loading

0 comments on commit 4da3a24

Please sign in to comment.