Skip to content

Commit

Permalink
Returns primary_term for Get/MultiGet requests
Browse files Browse the repository at this point in the history
Currently, we are using _version for CAS operations. However, using _version
alone does not guarantee the uniqueness during a network partition.  To remedy
the issue, we recommend using a combination of _version and _primary_term for
CAS operations.

This is the first step towards that goal. In this change, we return
_primary_term for GET and MGET requests.

Relates elastic#26493
  • Loading branch information
dnhatn committed Oct 30, 2017
1 parent a566942 commit cb44da8
Show file tree
Hide file tree
Showing 18 changed files with 235 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequ
final Tuple<XContentType, Map<String, Object>> sourceAndContent =
XContentHelper.convertToMap(indexSourceAsBytes, true, updateIndexRequest.getContentType());
updateResponse.setGetResult(UpdateHelper.extractGetResult(updateRequest, concreteIndex,
indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
indexResponse.getVersion(), primary.getPrimaryTerm(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
}
// set translated request as replica request
replicaRequest = new BulkItemRequest(bulkReqId, updateIndexRequest);
Expand All @@ -311,11 +311,11 @@ static BulkItemResultHolder processUpdateResponse(final UpdateRequest updateRequ
result.getSeqNo(), primary.getPrimaryTerm(), result.getVersion(), ((Engine.DeleteResult) result).isFound());

updateResponse = new UpdateResponse(deleteResponse.getShardInfo(), deleteResponse.getShardId(),
deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), deleteResponse.getPrimaryTerm(),
deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(), primary.getPrimaryTerm(),
deleteResponse.getVersion(), deleteResponse.getResult());

final GetResult getResult = UpdateHelper.extractGetResult(updateRequest, concreteIndex, deleteResponse.getVersion(),
translate.updatedSourceAsMap(), translate.updateSourceContentType(), null);
primary.getPrimaryTerm(), translate.updatedSourceAsMap(), translate.updateSourceContentType(), null);

updateResponse.setGetResult(getResult);
// set translated request as replica request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ public long getVersion() {
return getResult.getVersion();
}

/**
* The current primary term of the cluster.
*/
public long getPrimaryTerm() {
return getResult.getPrimaryTerm();
}

/**
* The source of the document if exists.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
(request.fields() != null && request.fields().length > 0)) {
Tuple<XContentType, Map<String, Object>> sourceAndContent =
XContentHelper.convertToMap(upsertSourceBytes, true, upsertRequest.getContentType());
update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), response.getPrimaryTerm(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
} else {
update.setGetResult(null);
}
Expand All @@ -201,7 +201,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
bulkAction.execute(toSingleItemBulkRequest(indexRequest), wrapBulkResponse(
ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), response.getPrimaryTerm(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))
Expand All @@ -212,7 +212,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
bulkAction.execute(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse(
ActionListener.<DeleteResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), response.getPrimaryTerm(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
}, exception -> handleUpdateFailureWithRetry(listener, request, exception, retryCount)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu
if (detectNoop && noop) {
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(),
getResult.getVersion(), DocWriteResponse.Result.NOOP);
update.setGetResult(extractGetResult(request, request.index(), getResult.getVersion(), updatedSourceAsMap,
updateSourceContentType, getResult.internalSourceRef()));
update.setGetResult(extractGetResult(request, request.index(), getResult.getVersion(), getResult.getPrimaryTerm(),
updatedSourceAsMap, updateSourceContentType, getResult.internalSourceRef()));
return new Result(update, DocWriteResponse.Result.NOOP, updatedSourceAsMap, updateSourceContentType);
} else {
final IndexRequest finalIndexRequest = Requests.indexRequest(request.index())
Expand Down Expand Up @@ -289,8 +289,8 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes
// If it was neither an INDEX or DELETE operation, treat it as a noop
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(),
getResult.getVersion(), DocWriteResponse.Result.NOOP);
update.setGetResult(extractGetResult(request, request.index(), getResult.getVersion(), updatedSourceAsMap,
updateSourceContentType, getResult.internalSourceRef()));
update.setGetResult(extractGetResult(request, request.index(), getResult.getVersion(), getResult.getPrimaryTerm(),
updatedSourceAsMap, updateSourceContentType, getResult.internalSourceRef()));
return new Result(update, DocWriteResponse.Result.NOOP, updatedSourceAsMap, updateSourceContentType);

}
Expand All @@ -314,7 +314,7 @@ private Map<String, Object> executeScript(Script script, Map<String, Object> ctx
* Applies {@link UpdateRequest#fetchSource()} to the _source of the updated document to be returned in a update response.
* For BWC this function also extracts the {@link UpdateRequest#fields()} from the updated document to be returned in a update response
*/
public static GetResult extractGetResult(final UpdateRequest request, String concreteIndex, long version,
public static GetResult extractGetResult(final UpdateRequest request, String concreteIndex, long version, long primaryTerm,
final Map<String, Object> source, XContentType sourceContentType,
@Nullable final BytesReference sourceAsBytes) {
if ((request.fields() == null || request.fields().length == 0) &&
Expand Down Expand Up @@ -365,7 +365,7 @@ public static GetResult extractGetResult(final UpdateRequest request, String con
}

// TODO when using delete/none, we can still return the source as bytes by generating it (using the sourceContentType)
return new GetResult(concreteIndex, request.type(), request.id(), version, true,
return new GetResult(concreteIndex, request.type(), request.id(), version, primaryTerm, true,
sourceRequested ? sourceFilteredAsBytes : null, fields);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public UpdateResponse build() {
}
if (getResult != null) {
update.setGetResult(new GetResult(update.getIndex(), update.getType(), update.getId(), update.getVersion(),
getResult.isExists(),getResult.internalSourceRef(), getResult.getFields()));
update.getPrimaryTerm(), getResult.isExists(),getResult.internalSourceRef(), getResult.getFields()));
}
update.setForcedRefresh(forcedRefresh);
return update;
Expand Down
26 changes: 24 additions & 2 deletions core/src/main/java/org/elasticsearch/index/get/GetResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index.get;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.document.DocumentField;
Expand Down Expand Up @@ -51,13 +52,15 @@ public class GetResult implements Streamable, Iterable<DocumentField>, ToXConten
public static final String _TYPE = "_type";
public static final String _ID = "_id";
private static final String _VERSION = "_version";
private static final String _PRIMARY_TERM = "_primary_term";
private static final String FOUND = "found";
private static final String FIELDS = "fields";

private String index;
private String type;
private String id;
private long version;
private long primaryTerm = -1L;
private boolean exists;
private Map<String, DocumentField> fields;
private Map<String, Object> sourceAsMap;
Expand All @@ -67,12 +70,13 @@ public class GetResult implements Streamable, Iterable<DocumentField>, ToXConten
GetResult() {
}

public GetResult(String index, String type, String id, long version, boolean exists, BytesReference source,
public GetResult(String index, String type, String id, long version, long primaryTerm, boolean exists, BytesReference source,
Map<String, DocumentField> fields) {
this.index = index;
this.type = type;
this.id = id;
this.version = version;
this.primaryTerm = primaryTerm;
this.exists = exists;
this.source = source;
this.fields = fields;
Expand Down Expand Up @@ -116,6 +120,13 @@ public long getVersion() {
return version;
}

/**
* The current primary term of the cluster.
*/
public long getPrimaryTerm() {
return primaryTerm;
}

/**
* The source of the document if exists.
*/
Expand Down Expand Up @@ -258,6 +269,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (version != -1) {
builder.field(_VERSION, version);
}
builder.field(_PRIMARY_TERM, primaryTerm);
toXContentEmbedded(builder, params);
} else {
builder.field(FOUND, false);
Expand All @@ -273,6 +285,7 @@ public static GetResult fromXContentEmbedded(XContentParser parser) throws IOExc
String currentFieldName = parser.currentName();
String index = null, type = null, id = null;
long version = -1;
long primaryTerm = -1;
Boolean found = null;
BytesReference source = null;
Map<String, DocumentField> fields = new HashMap<>();
Expand All @@ -288,6 +301,8 @@ public static GetResult fromXContentEmbedded(XContentParser parser) throws IOExc
id = parser.text();
} else if (_VERSION.equals(currentFieldName)) {
version = parser.longValue();
} else if (_PRIMARY_TERM.equals(currentFieldName)) {
primaryTerm = parser.longValue();
} else if (FOUND.equals(currentFieldName)) {
found = parser.booleanValue();
} else {
Expand All @@ -313,7 +328,7 @@ public static GetResult fromXContentEmbedded(XContentParser parser) throws IOExc
parser.skipChildren(); // skip potential inner arrays for forward compatibility
}
}
return new GetResult(index, type, id, version, found, source, fields);
return new GetResult(index, type, id, version, primaryTerm, found, source, fields);
}

public static GetResult fromXContent(XContentParser parser) throws IOException {
Expand Down Expand Up @@ -352,6 +367,9 @@ public void readFrom(StreamInput in) throws IOException {
}
}
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
primaryTerm = in.readVLong();
}
}

@Override
Expand All @@ -372,6 +390,9 @@ public void writeTo(StreamOutput out) throws IOException {
}
}
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeVLong(primaryTerm);
}
}

@Override
Expand All @@ -384,6 +405,7 @@ public boolean equals(Object o) {
}
GetResult getResult = (GetResult) o;
return version == getResult.version &&
primaryTerm == getResult.primaryTerm &&
exists == getResult.exists &&
Objects.equals(index, getResult.index) &&
Objects.equals(type, getResult.type) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public GetResult get(String type, String id, String[] gFields, boolean realtime,
*/
public GetResult get(Engine.GetResult engineGetResult, String id, String type, String[] fields, FetchSourceContext fetchSourceContext) {
if (!engineGetResult.exists()) {
return new GetResult(shardId.getIndexName(), type, id, -1, false, null, null);
return new GetResult(shardId.getIndexName(), type, id, -1, indexShard.getPrimaryTerm(), false, null, null);
}

currentMetric.inc();
Expand Down Expand Up @@ -162,7 +162,7 @@ private GetResult innerGet(String type, String id, String[] gFields, boolean rea
}

if (get == null || get.exists() == false) {
return new GetResult(shardId.getIndexName(), type, id, -1, false, null, null);
return new GetResult(shardId.getIndexName(), type, id, -1, indexShard.getPrimaryTerm(), false, null, null);
}

try {
Expand Down Expand Up @@ -233,7 +233,7 @@ private GetResult innerGetLoadFromStoredFields(String type, String id, String[]
}
}

return new GetResult(shardId.getIndexName(), type, id, get.version(), get.exists(), source, fields);
return new GetResult(shardId.getIndexName(), type, id, get.version(), indexShard.getPrimaryTerm(), get.exists(), source, fields);
}

private static FieldsVisitor buildFieldsVisitors(String[] fields, FetchSourceContext fetchSourceContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,26 +91,26 @@ private void doFromXContentTestWithRandomFields(boolean addRandomFields) throws

public void testToXContent() {
{
GetResponse getResponse = new GetResponse(new GetResult("index", "type", "id", 1, true, new BytesArray("{ \"field1\" : " +
GetResponse getResponse = new GetResponse(new GetResult("index", "type", "id", 1, 4,true, new BytesArray("{ \"field1\" : " +
"\"value1\", \"field2\":\"value2\"}"), Collections.singletonMap("field1", new DocumentField("field1",
Collections.singletonList("value1")))));
String output = Strings.toString(getResponse);
assertEquals("{\"_index\":\"index\",\"_type\":\"type\",\"_id\":\"id\",\"_version\":1,\"found\":true,\"_source\":{ \"field1\" " +
": \"value1\", \"field2\":\"value2\"},\"fields\":{\"field1\":[\"value1\"]}}", output);
assertEquals("{\"_index\":\"index\",\"_type\":\"type\",\"_id\":\"id\",\"_version\":1,\"_primary_term\":4,\"found\":true," +
"\"_source\":{ \"field1\" : \"value1\", \"field2\":\"value2\"},\"fields\":{\"field1\":[\"value1\"]}}", output);
}
{
GetResponse getResponse = new GetResponse(new GetResult("index", "type", "id", 1, false, null, null));
GetResponse getResponse = new GetResponse(new GetResult("index", "type", "id", 1, 4, false, null, null));
String output = Strings.toString(getResponse);
assertEquals("{\"_index\":\"index\",\"_type\":\"type\",\"_id\":\"id\",\"found\":false}", output);
}
}

public void testToString() {
GetResponse getResponse = new GetResponse(
new GetResult("index", "type", "id", 1, true, new BytesArray("{ \"field1\" : " + "\"value1\", \"field2\":\"value2\"}"),
new GetResult("index", "type", "id", 1, 101, true, new BytesArray("{ \"field1\" : " + "\"value1\", \"field2\":\"value2\"}"),
Collections.singletonMap("field1", new DocumentField("field1", Collections.singletonList("value1")))));
assertEquals("{\"_index\":\"index\",\"_type\":\"type\",\"_id\":\"id\",\"_version\":1,\"found\":true,\"_source\":{ \"field1\" "
+ ": \"value1\", \"field2\":\"value2\"},\"fields\":{\"field1\":[\"value1\"]}}", getResponse.toString());
assertEquals("{\"_index\":\"index\",\"_type\":\"type\",\"_id\":\"id\",\"_version\":1,\"_primary_term\":101,\"found\":true," +
"\"_source\":{ \"field1\" : \"value1\", \"field2\":\"value2\"},\"fields\":{\"field1\":[\"value1\"]}}", getResponse.toString());
}

public void testEqualsAndHashcode() {
Expand All @@ -119,7 +119,8 @@ public void testEqualsAndHashcode() {
}

public void testFromXContentThrowsParsingException() throws IOException {
GetResponse getResponse = new GetResponse(new GetResult(null, null, null, randomIntBetween(1, 5), randomBoolean(), null, null));
GetResponse getResponse = new GetResponse(new GetResult(null, null, null, randomIntBetween(1, 5), randomNonNegativeLong(),
randomBoolean(), null, null));

XContentType xContentType = randomFrom(XContentType.values());
BytesReference originalBytes = toShuffledXContent(getResponse, xContentType, ToXContent.EMPTY_PARAMS, randomBoolean());
Expand Down
Loading

0 comments on commit cb44da8

Please sign in to comment.