From f2241b29bb164ba1a3e722d0bd12ad2ab94d1f6c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 5 Apr 2019 10:52:40 +0200 Subject: [PATCH] Only use UpdateRequest#setIfSeqNo(...) and UpdateRequest#setIfPrimaryTerm(...) for updating watch status if all nodes are at least on 6.7.0. Otherwise fallback using UpdateRequest#version(...) Closes #40841 --- .../xpack/core/watcher/watch/Watch.java | 9 +++++++++ .../xpack/watcher/WatcherIndexingListener.java | 1 + .../watcher/execution/ExecutionService.java | 16 ++++++++++++---- .../actions/ack/TransportAckWatchAction.java | 1 + .../activate/TransportActivateWatchAction.java | 1 + .../actions/get/TransportGetWatchAction.java | 1 + .../watcher/execution/ExecutionServiceTests.java | 6 ++++++ 7 files changed, 31 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/Watch.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/Watch.java index e34b0a15e7133..c87fac13ae671 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/Watch.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/watcher/watch/Watch.java @@ -40,6 +40,7 @@ public class Watch implements ToXContentObject { private final long sourceSeqNo; private final long sourcePrimaryTerm; + private transient long version; public Watch(String id, Trigger trigger, ExecutableInput input, ExecutableCondition condition, @Nullable ExecutableTransform transform, @Nullable TimeValue throttlePeriod, List actions, @Nullable Map metadata, @@ -107,6 +108,14 @@ public long getSourcePrimaryTerm() { return sourcePrimaryTerm; } + public long version() { + return version; + } + + public void version(long version) { + this.version = version; + } + /** * Sets the state of this watch to in/active * diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java index bdbeb7e8c62fb..c34716a6e4d96 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java @@ -102,6 +102,7 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { try { Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON, operation.getIfSeqNo(), operation.getIfPrimaryTerm()); + watch.version(operation.version()); ShardAllocationConfiguration shardAllocationConfiguration = configuration.localShards.get(shardId); if (shardAllocationConfiguration == null) { logger.debug("no distributed watch execution info found for watch [{}] on shard [{}], got configuration for {}", diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index d90ccb415a504..63b34f5d83904 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkResponse; @@ -278,8 +279,10 @@ record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "W if (resp.isExists() == false) { throw new ResourceNotFoundException("watch [{}] does not exist", watchId); } - return parser.parseWithSecrets(watchId, true, resp.getSourceAsBytesRef(), ctx.executionTime(), XContentType.JSON, - resp.getSeqNo(), resp.getPrimaryTerm()); + Watch watch = parser.parseWithSecrets(watchId, true, resp.getSourceAsBytesRef(), ctx.executionTime(), + XContentType.JSON, resp.getSeqNo(), resp.getPrimaryTerm()); + watch.version(resp.getVersion()); + return watch; }); } catch (ResourceNotFoundException e) { String message = "unable to find watch for record [" + ctx.id() + "]"; @@ -350,8 +353,13 @@ public void updateWatchStatus(Watch watch) throws IOException { UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, watch.id()); updateRequest.doc(source); - updateRequest.setIfSeqNo(watch.getSourceSeqNo()); - updateRequest.setIfPrimaryTerm(watch.getSourcePrimaryTerm()); + boolean useSeqNoForCAS = clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0); + if (useSeqNoForCAS) { + updateRequest.setIfSeqNo(watch.getSourceSeqNo()); + updateRequest.setIfPrimaryTerm(watch.getSourcePrimaryTerm()); + } else { + updateRequest.version(watch.version()); + } try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) { client.update(updateRequest).actionGet(indexDefaultTimeout); } catch (DocumentMissingException e) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java index 352b83967d49f..ee9407698661a 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/ack/TransportAckWatchAction.java @@ -93,6 +93,7 @@ protected void masterOperation(AckWatchRequest request, ClusterState state, DateTime now = new DateTime(clock.millis(), UTC); Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(), now, XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm()); + watch.version(getResponse.getVersion()); watch.status().version(getResponse.getVersion()); String[] actionIds = request.getActionIds(); if (actionIds == null || actionIds.length == 0) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchAction.java index e8b169f3f28fb..538c20508382b 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/activate/TransportActivateWatchAction.java @@ -95,6 +95,7 @@ protected void masterOperation(ActivateWatchRequest request, ClusterState state, if (getResponse.isExists()) { Watch watch = parser.parseWithSecrets(request.getWatchId(), true, getResponse.getSourceAsBytesRef(), now, XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm()); + watch.version(getResponse.getVersion()); watch.status().version(getResponse.getVersion()); // if we are not yet running in distributed mode, only call triggerservice, if we are on the master node if (localExecute(request) == false && this.clusterService.state().nodes().isLocalNodeElectedMaster()) { diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/get/TransportGetWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/get/TransportGetWatchAction.java index 34ee72c411d3d..1b87019f86331 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/get/TransportGetWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/get/TransportGetWatchAction.java @@ -73,6 +73,7 @@ protected void masterOperation(GetWatchRequest request, ClusterState state, DateTime now = new DateTime(clock.millis(), UTC); Watch watch = parser.parseWithSecrets(request.getId(), true, getResponse.getSourceAsBytesRef(), now, XContentType.JSON, getResponse.getSeqNo(), getResponse.getPrimaryTerm()); + watch.version(getRequest.version()); watch.toXContent(builder, WatcherParams.builder() .hideSecrets(true) .includeStatus(false) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java index 7dd87be42cf10..c8b74d560def9 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecutionServiceTests.java @@ -14,7 +14,10 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; @@ -150,6 +153,9 @@ public void init() throws Exception { DiscoveryNode discoveryNode = new DiscoveryNode("node_1", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT); ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("cluster")) + .nodes(DiscoveryNodes.builder().add(discoveryNode).build()) + .build()); when(clusterService.localNode()).thenReturn(discoveryNode); executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, clock, parser,