Skip to content

Commit

Permalink
Only use UpdateRequest#setIfSeqNo(...) and UpdateRequest#setIfPrimary…
Browse files Browse the repository at this point in the history
…Term(...) for updating watch status if all nodes are at least on 6.7.0.

Otherwise fallback using UpdateRequest#version(...)

Closes elastic#40841
  • Loading branch information
martijnvg committed Apr 5, 2019
1 parent bd2ec25 commit f2241b2
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class Watch implements ToXContentObject {

private final long sourceSeqNo;
private final long sourcePrimaryTerm;
private transient long version;

public Watch(String id, Trigger trigger, ExecutableInput input, ExecutableCondition condition, @Nullable ExecutableTransform transform,
@Nullable TimeValue throttlePeriod, List<ActionWrapper> actions, @Nullable Map<String, Object> metadata,
Expand Down Expand Up @@ -107,6 +108,14 @@ public long getSourcePrimaryTerm() {
return sourcePrimaryTerm;
}

public long version() {
return version;
}

public void version(long version) {
this.version = version;
}

/**
* Sets the state of this watch to in/active
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
try {
Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON,
operation.getIfSeqNo(), operation.getIfPrimaryTerm());
watch.version(operation.version());
ShardAllocationConfiguration shardAllocationConfiguration = configuration.localShards.get(shardId);
if (shardAllocationConfiguration == null) {
logger.debug("no distributed watch execution info found for watch [{}] on shard [{}], got configuration for {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
Expand Down Expand Up @@ -278,8 +279,10 @@ record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "W
if (resp.isExists() == false) {
throw new ResourceNotFoundException("watch [{}] does not exist", watchId);
}
return parser.parseWithSecrets(watchId, true, resp.getSourceAsBytesRef(), ctx.executionTime(), XContentType.JSON,
resp.getSeqNo(), resp.getPrimaryTerm());
Watch watch = parser.parseWithSecrets(watchId, true, resp.getSourceAsBytesRef(), ctx.executionTime(),
XContentType.JSON, resp.getSeqNo(), resp.getPrimaryTerm());
watch.version(resp.getVersion());
return watch;
});
} catch (ResourceNotFoundException e) {
String message = "unable to find watch for record [" + ctx.id() + "]";
Expand Down Expand Up @@ -350,8 +353,13 @@ public void updateWatchStatus(Watch watch) throws IOException {

UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, watch.id());
updateRequest.doc(source);
updateRequest.setIfSeqNo(watch.getSourceSeqNo());
updateRequest.setIfPrimaryTerm(watch.getSourcePrimaryTerm());
boolean useSeqNoForCAS = clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0);
if (useSeqNoForCAS) {
updateRequest.setIfSeqNo(watch.getSourceSeqNo());
updateRequest.setIfPrimaryTerm(watch.getSourcePrimaryTerm());
} else {
updateRequest.version(watch.version());
}
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
client.update(updateRequest).actionGet(indexDefaultTimeout);
} catch (DocumentMissingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ protected void masterOperation(AckWatchRequest request, ClusterState state,
DateTime now = new DateTime(clock.millis(), UTC);
Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(),
now, XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm());
watch.version(getResponse.getVersion());
watch.status().version(getResponse.getVersion());
String[] actionIds = request.getActionIds();
if (actionIds == null || actionIds.length == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ protected void masterOperation(ActivateWatchRequest request, ClusterState state,
if (getResponse.isExists()) {
Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(), now,
XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm());
watch.version(getResponse.getVersion());
watch.status().version(getResponse.getVersion());
// if we are not yet running in distributed mode, only call triggerservice, if we are on the master node
if (localExecute(request) == false && this.clusterService.state().nodes().isLocalNodeElectedMaster()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ protected void masterOperation(GetWatchRequest request, ClusterState state,
DateTime now = new DateTime(clock.millis(), UTC);
Watch watch = parser.parseWithSecrets(request.getId(), true, getResponse.getSourceAsBytesRef(), now,
XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm());
watch.version(getRequest.version());
watch.toXContent(builder, WatcherParams.builder()
.hideSecrets(true)
.includeStatus(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -150,6 +153,9 @@ public void init() throws Exception {
DiscoveryNode discoveryNode = new DiscoveryNode("node_1", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("cluster"))
.nodes(DiscoveryNodes.builder().add(discoveryNode).build())
.build());
when(clusterService.localNode()).thenReturn(discoveryNode);

executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, clock, parser,
Expand Down

0 comments on commit f2241b2

Please sign in to comment.