From 8ff4e2e5aa60d47fc6362cbf283539549fdb354b Mon Sep 17 00:00:00 2001 From: David Roberts Date: Thu, 29 Nov 2018 17:17:34 +0000 Subject: [PATCH 01/10] Periodically try to reassign unassigned persistent tasks Previously persistent task assignment was checked in the following situations: - Persistent tasks are changed - A node joins or leaves the cluster - The routing table is changed - Custom metadata in the cluster state is changed - A new master node is elected However, there could be situations when a persistent task that could not be assigned to a node could become assignable due to some other change, such as memory usage on the nodes. This change adds a timed recheck of persistent task assignment to account for such situations. The timer is suspended while checks triggered by cluster state changes are in-flight to avoid adding burden to an already busy cluster. Closes #35792 --- docs/reference/modules/cluster/misc.asciidoc | 14 +- .../common/settings/ClusterSettings.java | 2 + .../java/org/elasticsearch/node/Node.java | 2 +- .../PersistentTasksClusterService.java | 130 +++++++++++++--- .../PersistentTasksClusterServiceTests.java | 139 +++++++++++++++++- .../PersistentTasksDecidersTestCase.java | 5 +- 6 files changed, 262 insertions(+), 30 deletions(-) diff --git a/docs/reference/modules/cluster/misc.asciidoc b/docs/reference/modules/cluster/misc.asciidoc index fcf3491fb25ec..c96b4c666b42a 100644 --- a/docs/reference/modules/cluster/misc.asciidoc +++ b/docs/reference/modules/cluster/misc.asciidoc @@ -130,10 +130,10 @@ Plugins can create a kind of tasks called persistent tasks. Those tasks are usually long-live tasks and are stored in the cluster state, allowing the tasks to be revived after a full cluster restart. 
-Every time a persistent task is created, the master nodes takes care of +Every time a persistent task is created, the master node takes care of assigning the task to a node of the cluster, and the assigned node will then pick up the task and execute it locally. The process of assigning persistent -tasks to nodes is controlled by the following property, which can be updated +tasks to nodes is controlled by the following properties, which can be updated dynamically: `cluster.persistent_tasks.allocation.enable`:: @@ -148,3 +148,13 @@ This setting does not affect the persistent tasks that are already being execute Only newly created persistent tasks, or tasks that must be reassigned (after a node left the cluster, for example), are impacted by this setting. -- + +`cluster.persistent_tasks.allocation.recheck_interval`:: + + The master node will automatically check whether persistent tasks need to + be assigned when the cluster state changes significantly. However, there + may be other factors, such as memory usage, that affect whether persistent + tasks can be assigned to nodes but do not cause the cluster state to change. + This setting controls how often assignment checks are performed to react to + these factors. The default is 30 seconds. The minimum permitted value is 10 + seconds. 
diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 974c77210b533..597fd01e55075 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -80,6 +80,7 @@ import org.elasticsearch.monitor.os.OsService; import org.elasticsearch.monitor.process.ProcessService; import org.elasticsearch.node.Node; +import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.decider.EnableAssignmentDecider; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.fs.FsRepository; @@ -443,6 +444,7 @@ public void apply(Settings value, Settings current, Settings previous) { Node.BREAKER_TYPE_KEY, OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, IndexGraveyard.SETTING_MAX_TOMBSTONES, + PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING ))); diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index f3433dfa1ba1a..283d6b6883fad 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -499,7 +499,7 @@ protected Node( final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(tasksExecutors); final PersistentTasksClusterService persistentTasksClusterService = - new PersistentTasksClusterService(settings, registry, clusterService); + new PersistentTasksClusterService(settings, registry, clusterService, threadPool); final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); modules.add(b -> { diff --git 
a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 2adeb04e4eec0..4592f904f75b5 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -31,30 +31,54 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.decider.AssignmentDecision; import org.elasticsearch.persistent.decider.EnableAssignmentDecider; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Objects; +import java.util.concurrent.Future; /** * Component that runs only on the master node and is responsible for assigning running tasks to nodes */ public class PersistentTasksClusterService implements ClusterStateListener { + public static final Setting CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING = + Setting.timeSetting("cluster.persistent_tasks.allocation.recheck_interval", TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10), Setting.Property.Dynamic, Setting.Property.NodeScope); + private static final Logger logger = LogManager.getLogger(PersistentTasksClusterService.class); private final ClusterService clusterService; private final PersistentTasksExecutorRegistry registry; private final EnableAssignmentDecider decider; + 
private final ThreadPool threadPool; + private final PeriodicRechecker periodicRechecker; + private volatile TimeValue recheckInterval; - public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService) { + public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService, + ThreadPool threadPool) { this.clusterService = clusterService; clusterService.addListener(this); + clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, + this::setRecheckInterval); this.registry = registry; this.decider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings()); + this.recheckInterval = CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings); + this.threadPool = threadPool; + this.periodicRechecker = new PeriodicRechecker(); + } + + void setRecheckInterval(TimeValue recheckInterval) { + this.recheckInterval = recheckInterval; + periodicRechecker.rescheduleIfScheduled(); } /** @@ -241,24 +265,36 @@ private Assignment createAssignment(final @Override public void clusterChanged(ClusterChangedEvent event) { + periodicRechecker.cancel(); if (event.localNodeMaster()) { if (shouldReassignPersistentTasks(event)) { - logger.trace("checking task reassignment for cluster state {}", event.state().getVersion()); - clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - return reassignTasks(currentState); - } - - @Override - public void onFailure(String source, Exception e) { - logger.warn("failed to reassign persistent tasks", e); - } - }); + reassignPersistentTasks(event.state().getVersion()); + } else { + periodicRechecker.schedule(); } } } + /** + * Submit a cluster state update to reassign any persistent tasks that need reassigning + */ + private void 
reassignPersistentTasks(long currentStateVersion) { + logger.trace("checking task reassignment for cluster state {}", currentStateVersion); + clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + ClusterState newState = reassignTasks(currentState); + periodicRechecker.schedule(); + return newState; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn("failed to reassign persistent tasks", e); + } + }); + } + /** * Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following * situations: a node left or is added, the routing table changed, the master node changed, the metadata changed or the @@ -278,12 +314,21 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { || event.metaDataChanged() || masterChanged) { - for (PersistentTask task : tasks.tasks()) { - if (needsReassignment(task.getAssignment(), event.state().nodes())) { - Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state()); - if (Objects.equals(assignment, task.getAssignment()) == false) { - return true; - } + return anyTaskNeedsReassignment(tasks, event.state()); + } + return false; + } + + /** + * Returns true if any persistent task provided requires reassignment, + * i.e. is not assigned or is assigned to a non-existing node. 
+ */ + private boolean anyTaskNeedsReassignment(final PersistentTasksCustomMetaData tasks, final ClusterState state) { + for (PersistentTask task : tasks.tasks()) { + if (needsReassignment(task.getAssignment(), state.nodes())) { + Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), state); + if (Objects.equals(assignment, task.getAssignment()) == false) { + return true; } } } @@ -347,4 +392,51 @@ private static ClusterState update(ClusterState currentState, PersistentTasksCus return currentState; } } + + /** + * Class to periodically try to reassign unassigned persistent tasks. + */ + private class PeriodicRechecker implements Runnable { + + private volatile Future nextRun; + + void schedule() { + try { + synchronized (this) { + FutureUtils.cancel(nextRun); + nextRun = threadPool.schedule(recheckInterval, ThreadPool.Names.GENERIC, this); + } + } catch (EsRejectedExecutionException e) { + logger.debug("could not schedule periodic persistent task assignment check", e); + } + } + + synchronized void cancel() { + FutureUtils.cancel(nextRun); + nextRun = null; + } + + synchronized void rescheduleIfScheduled() { + if (nextRun != null) { + schedule(); + } + } + + @Override + public void run() { + synchronized (this) { + nextRun = null; + } + if (clusterService.localNode().isMasterNode()) { + logger.trace("periodic persistent task assignment check running"); + ClusterState state = clusterService.state(); + final PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + if (tasks != null && anyTaskNeedsReassignment(tasks, state)) { + reassignPersistentTasks(state.getVersion()); + } else { + schedule(); + } + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index 3eec748808ecf..d0974598e1be4 100644 --- 
a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -32,6 +33,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; @@ -51,6 +53,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import static java.util.Collections.emptyMap; @@ -63,6 +66,11 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class PersistentTasksClusterServiceTests extends ESTestCase { @@ -71,6 +79,8 @@ public class PersistentTasksClusterServiceTests extends ESTestCase { /** Needed by {@link PersistentTasksClusterService} **/ private ClusterService clusterService; + private volatile boolean nonClusterStateCondition; + @BeforeClass public static void setUpThreadPool() { 
threadPool = new TestThreadPool(PersistentTasksClusterServiceTests.class.getSimpleName()); @@ -83,7 +93,7 @@ public void setUp() throws Exception { } @AfterClass - public static void tearDownThreadPool() throws Exception { + public static void tearDownThreadPool() { terminate(threadPool); } @@ -177,7 +187,7 @@ public void testReassignConsidersClusterStateUpdates() { addTestNodes(nodes, randomIntBetween(1, 10)); int numberOfTasks = randomIntBetween(2, 40); for (int i = 0; i < numberOfTasks; i++) { - addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exits"); + addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exists"); } MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); @@ -186,7 +196,42 @@ public void testReassignConsidersClusterStateUpdates() { PersistentTasksCustomMetaData tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); assertThat(tasksInProgress, notNullValue()); + } + + public void testNonClusterStateConditionAssignment() { + ClusterState clusterState = initialState(); + ClusterState.Builder builder = ClusterState.builder(clusterState); + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder( + clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE)); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); + addTestNodes(nodes, randomIntBetween(1, 3)); + addTask(tasks, "assign_based_on_non_cluster_state_condition", null); + MetaData.Builder metaData = MetaData.builder(clusterState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); + clusterState = builder.metaData(metaData).nodes(nodes).build(); + + nonClusterStateCondition = false; + ClusterState newClusterState = reassign(clusterState); + + PersistentTasksCustomMetaData tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + 
assertThat(tasksInProgress, notNullValue()); + for (PersistentTask task : tasksInProgress.tasks()) { + assertThat(task.getExecutorNode(), nullValue()); + assertThat(task.isAssigned(), equalTo(false)); + assertThat(task.getAssignment().getExplanation(), equalTo("non-cluster state condition prevents assignment")); + } + assertThat(tasksInProgress.tasks().size(), equalTo(1)); + nonClusterStateCondition = true; + ClusterState finalClusterState = reassign(newClusterState); + + tasksInProgress = finalClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasksInProgress, notNullValue()); + for (PersistentTask task : tasksInProgress.tasks()) { + assertThat(task.getExecutorNode(), notNullValue()); + assertThat(task.isAssigned(), equalTo(true)); + assertThat(task.getAssignment().getExplanation(), equalTo("test assignment")); + } + assertThat(tasksInProgress.tasks().size(), equalTo(1)); } public void testReassignTasks() { @@ -201,14 +246,14 @@ public void testReassignTasks() { switch (randomInt(2)) { case 0: // add an unassigned task that should get assigned because it's assigned to a non-existing node or unassigned - addTask(tasks, "assign_me", randomBoolean() ? null : "no_longer_exits"); + addTask(tasks, "assign_me", randomBoolean() ? null : "no_longer_exists"); break; case 1: // add a task assigned to non-existing node that should not get assigned - addTask(tasks, "dont_assign_me", randomBoolean() ? null : "no_longer_exits"); + addTask(tasks, "dont_assign_me", randomBoolean() ? null : "no_longer_exists"); break; case 2: - addTask(tasks, "assign_one", randomBoolean() ? null : "no_longer_exits"); + addTask(tasks, "assign_one", randomBoolean() ? 
null : "no_longer_exists"); break; } @@ -368,6 +413,72 @@ public void testNeedsReassignment() { assertFalse(needsReassignment(new Assignment("_node_1", "assigned"), nodes)); } + public void testPeriodicRecheck() throws Exception { + ClusterState initialState = initialState(); + ClusterState.Builder builder = ClusterState.builder(initialState); + PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder( + initialState.metaData().custom(PersistentTasksCustomMetaData.TYPE)); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(initialState.nodes()); + addTestNodes(nodes, randomIntBetween(1, 3)); + addTask(tasks, "assign_based_on_non_cluster_state_condition", null); + MetaData.Builder metaData = MetaData.builder(initialState.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); + ClusterState clusterState = builder.metaData(metaData).nodes(nodes).build(); + + nonClusterStateCondition = false; + + ClusterService recheckTestClusterService = createRecheckTestClusterService(clusterState); + PersistentTasksClusterService service = createService(recheckTestClusterService, + (params, currentState) -> assignBasedOnNonClusterStateCondition(currentState.nodes())); + + ClusterChangedEvent event = new ClusterChangedEvent("test", clusterState, initialState); + service.clusterChanged(event); + ClusterState newClusterState = recheckTestClusterService.state(); + + { + PersistentTasksCustomMetaData tasksInProgress = newClusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasksInProgress, notNullValue()); + for (PersistentTask task : tasksInProgress.tasks()) { + assertThat(task.getExecutorNode(), nullValue()); + assertThat(task.isAssigned(), equalTo(false)); + assertThat(task.getAssignment().getExplanation(), equalTo("non-cluster state condition prevents assignment")); + } + assertThat(tasksInProgress.tasks().size(), equalTo(1)); + } + + nonClusterStateCondition = true; + 
service.setRecheckInterval(TimeValue.timeValueMillis(1)); + + assertBusy(() -> { + PersistentTasksCustomMetaData tasksInProgress = + recheckTestClusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + assertThat(tasksInProgress, notNullValue()); + for (PersistentTask task : tasksInProgress.tasks()) { + assertThat(task.getExecutorNode(), notNullValue()); + assertThat(task.isAssigned(), equalTo(true)); + assertThat(task.getAssignment().getExplanation(), equalTo("test assignment")); + } + assertThat(tasksInProgress.tasks().size(), equalTo(1)); + }); + } + + private ClusterService createRecheckTestClusterService(ClusterState initialState) { + AtomicReference state = new AtomicReference<>(initialState); + ClusterService recheckTestClusterService = mock(ClusterService.class); + when(recheckTestClusterService.getClusterSettings()).thenReturn(clusterService.getClusterSettings()); + doAnswer(invocationOnMock -> state.get().getNodes().getLocalNode()).when(recheckTestClusterService).localNode(); + doAnswer(invocationOnMock -> state.get()).when(recheckTestClusterService).state(); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocationOnMock.getArguments()[1]; + ClusterState before = state.get(); + ClusterState after = task.execute(before); + state.set(after); + return null; + }).when(recheckTestClusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class)); + + return recheckTestClusterService; + } + private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) { for (int i = 0; i < nonLocalNodesCount; i++) { nodes.add(new DiscoveryNode("other_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT)); @@ -387,6 +498,8 @@ private ClusterState reassign(ClusterState clusterState) { return null; case "assign_one": return assignOnlyOneTaskAtATime(currentState); + case "assign_based_on_non_cluster_state_condition": + return 
assignBasedOnNonClusterStateCondition(currentState.nodes()); default: fail("unknown param " + testParams.getTestParam()); } @@ -408,6 +521,14 @@ private Assignment assignOnlyOneTaskAtATime(ClusterState clusterState) { } } + private Assignment assignBasedOnNonClusterStateCondition(DiscoveryNodes nodes) { + if (nonClusterStateCondition) { + return randomNodeAssignment(nodes); + } else { + return new Assignment(null, "non-cluster state condition prevents assignment"); + } + } + private Assignment randomNodeAssignment(DiscoveryNodes nodes) { if (nodes.getNodes().isEmpty()) { return NO_NODE_FOUND; @@ -623,6 +744,7 @@ private ClusterState initialState() { nodes.masterNodeId("this_node"); return ClusterState.builder(ClusterName.DEFAULT) + .nodes(nodes) .metaData(metaData) .routingTable(routingTable.build()) .build(); @@ -640,6 +762,11 @@ private void changeRoutingTable(MetaData.Builder metaData, RoutingTable.Builder /** Creates a PersistentTasksClusterService with a single PersistentTasksExecutor implemented by a BiFunction **/ private

PersistentTasksClusterService createService(final BiFunction fn) { + return createService(clusterService, fn); + } + + private

PersistentTasksClusterService createService(ClusterService clusterService, + final BiFunction fn) { PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry( singleton(new PersistentTasksExecutor

(TestPersistentTasksExecutor.NAME, null) { @Override @@ -652,6 +779,6 @@ protected void nodeOperation(AllocatedPersistentTask task, P params, PersistentT throw new UnsupportedOperationException(); } })); - return new PersistentTasksClusterService(Settings.EMPTY, registry, clusterService); + return new PersistentTasksClusterService(Settings.EMPTY, registry, clusterService, threadPool); } } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java index 7f2dada7c4cb7..2007c350b555b 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java @@ -69,11 +69,12 @@ protected void nodeOperation(AllocatedPersistentTask task, Params params, Persis }; } }; - persistentTasksClusterService = new PersistentTasksClusterService(clusterService.getSettings(), registry, clusterService); + persistentTasksClusterService = new PersistentTasksClusterService(clusterService.getSettings(), registry, clusterService, + threadPool); } @AfterClass - public static void tearDownThreadPool() throws Exception { + public static void tearDownThreadPool() { terminate(threadPool); } From 1acf74b72df671c877a4be8432807f54e526d117 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 5 Dec 2018 13:05:27 +0000 Subject: [PATCH 02/10] Address some review comments (The one that is still to be addressed is the integration test) --- .../util/concurrent/AbstractAsyncTask.java | 179 ++++++++++++++++++ .../org/elasticsearch/index/IndexService.java | 95 +--------- .../java/org/elasticsearch/node/Node.java | 1 + .../PersistentTasksClusterService.java | 119 +++++++----- .../concurrent/AbstractAsyncTaskTests.java | 152 +++++++++++++++ .../index/IndexServiceTests.java | 1 - .../PersistentTasksClusterServiceTests.java | 1 + 7 files changed, 408 insertions(+), 
140 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java create mode 100644 server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java new file mode 100644 index 0000000000000..c6d5e86805f6d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java @@ -0,0 +1,179 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common.util.concurrent; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.Closeable; +import java.util.Objects; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A base class for tasks that need to repeat. 
+ */ +public abstract class AbstractAsyncTask implements Runnable, Closeable { + + private final Logger logger; + private final ThreadPool threadPool; + private final AtomicBoolean closed = new AtomicBoolean(false); + private volatile ScheduledFuture scheduledFuture; + private volatile boolean isScheduledOrRunning; + private volatile Exception lastThrownException; + private volatile TimeValue interval; + + protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, TimeValue interval) { + this.logger = logger; + this.threadPool = threadPool; + this.interval = interval; + } + + /** + * Change the interval between runs. + * If a future run is scheduled then this will reschedule it. + * @param interval The new interval between runs. + */ + public synchronized void setInterval(TimeValue interval) { + this.interval = interval; + if (scheduledFuture != null) { + rescheduleIfNecessary(); + } + } + + public TimeValue getInterval() { + return interval; + } + + /** + * Test any external conditions that determine whether the task + * should be scheduled. This method does *not* need to test if + * the task is closed, as being closed automatically prevents + * scheduling. + * @return Should the task be scheduled to run? + */ + protected abstract boolean mustReschedule(); + + /** + * Schedule the task to run after the configured interval if it + * is not closed and any further conditions imposed by derived + * classes are met. Any previously scheduled invocation is + * cancelled. 
+ */ + public synchronized void rescheduleIfNecessary() { + if (isClosed()) { + return; + } + if (scheduledFuture != null) { + FutureUtils.cancel(scheduledFuture); + } + if (interval.millis() > 0 && mustReschedule()) { + if (logger.isTraceEnabled()) { + logger.trace("scheduling {} every {}", toString(), interval); + } + scheduledFuture = threadPool.schedule(interval, getThreadPool(), this); + isScheduledOrRunning = true; + } else { + logger.trace("scheduled {} disabled", toString()); + scheduledFuture = null; + isScheduledOrRunning = false; + } + } + + public boolean isScheduled() { + // Currently running counts as scheduled to avoid an oscillating return value + // from this method when a task is repeatedly running and rescheduling itself. + return isScheduledOrRunning; + } + + /** + * Cancel any scheduled run, but do not prevent subsequent restarts. + */ + public synchronized void cancel() { + FutureUtils.cancel(scheduledFuture); + scheduledFuture = null; + isScheduledOrRunning = false; + } + + /** + * Cancel any scheduled run + */ + @Override + public synchronized void close() { + if (closed.compareAndSet(false, true)) { + cancel(); + } + } + + public boolean isClosed() { + return this.closed.get(); + } + + @Override + public final void run() { + synchronized (this) { + scheduledFuture = null; + } + try { + runInternal(); + } catch (Exception ex) { + if (lastThrownException == null || sameException(lastThrownException, ex) == false) { + // prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs + logger.warn( + () -> new ParameterizedMessage( + "failed to run task {} - suppressing re-occurring exceptions unless the exception changes", + toString()), + ex); + lastThrownException = ex; + } + } finally { + rescheduleIfNecessary(); + } + } + + private static boolean sameException(Exception left, Exception right) { + if (left.getClass() == right.getClass()) { + if (Objects.equals(left.getMessage(), 
right.getMessage())) { + StackTraceElement[] stackTraceLeft = left.getStackTrace(); + StackTraceElement[] stackTraceRight = right.getStackTrace(); + if (stackTraceLeft.length == stackTraceRight.length) { + for (int i = 0; i < stackTraceLeft.length; i++) { + if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) { + return false; + } + } + return true; + } + } + } + return false; + } + + protected abstract void runInternal(); + + /** + * Use the same threadpool by default. + * Derived classes can change this if required. + */ + protected String getThreadPool() { + return ThreadPool.Names.SAME; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index c34a5228b7f6b..d4bbcb99091ff 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -38,8 +38,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractAsyncTask; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; @@ -87,7 +87,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -805,100 +804,18 @@ private void maybeSyncGlobalCheckpoints() { } } - abstract static class BaseAsyncTask implements Runnable, Closeable { + abstract static class BaseAsyncTask extends AbstractAsyncTask { protected final IndexService indexService; - protected final ThreadPool 
threadPool; - private final TimeValue interval; - private ScheduledFuture scheduledFuture; - private final AtomicBoolean closed = new AtomicBoolean(false); - private volatile Exception lastThrownException; BaseAsyncTask(IndexService indexService, TimeValue interval) { + super(indexService.logger, indexService.threadPool, interval); this.indexService = indexService; - this.threadPool = indexService.getThreadPool(); - this.interval = interval; - onTaskCompletion(); + rescheduleIfNecessary(); } - boolean mustReschedule() { + protected boolean mustReschedule() { // don't re-schedule if its closed or if we don't have a single shard here..., we are done - return indexService.closed.get() == false - && closed.get() == false && interval.millis() > 0; - } - - private synchronized void onTaskCompletion() { - if (mustReschedule()) { - if (indexService.logger.isTraceEnabled()) { - indexService.logger.trace("scheduling {} every {}", toString(), interval); - } - this.scheduledFuture = threadPool.schedule(interval, getThreadPool(), BaseAsyncTask.this); - } else { - indexService.logger.trace("scheduled {} disabled", toString()); - this.scheduledFuture = null; - } - } - - boolean isScheduled() { - return scheduledFuture != null; - } - - @Override - public final void run() { - try { - runInternal(); - } catch (Exception ex) { - if (lastThrownException == null || sameException(lastThrownException, ex) == false) { - // prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs - indexService.logger.warn( - () -> new ParameterizedMessage( - "failed to run task {} - suppressing re-occurring exceptions unless the exception changes", - toString()), - ex); - lastThrownException = ex; - } - } finally { - onTaskCompletion(); - } - } - - private static boolean sameException(Exception left, Exception right) { - if (left.getClass() == right.getClass()) { - if (Objects.equals(left.getMessage(), right.getMessage())) { - StackTraceElement[] 
stackTraceLeft = left.getStackTrace(); - StackTraceElement[] stackTraceRight = right.getStackTrace(); - if (stackTraceLeft.length == stackTraceRight.length) { - for (int i = 0; i < stackTraceLeft.length; i++) { - if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) { - return false; - } - } - return true; - } - } - } - return false; - } - - protected abstract void runInternal(); - - protected String getThreadPool() { - return ThreadPool.Names.SAME; - } - - @Override - public synchronized void close() { - if (closed.compareAndSet(false, true)) { - FutureUtils.cancel(scheduledFuture); - scheduledFuture = null; - } - } - - TimeValue getInterval() { - return interval; - } - - boolean isClosed() { - return this.closed.get(); + return indexService.closed.get() == false; } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 283d6b6883fad..8ca9b556429d1 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -500,6 +500,7 @@ protected Node( final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(tasksExecutors); final PersistentTasksClusterService persistentTasksClusterService = new PersistentTasksClusterService(settings, registry, clusterService, threadPool); + resourcesToClose.add(persistentTasksClusterService); final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); modules.add(b -> { diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 4592f904f75b5..729f2eb35e038 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -34,21 +34,20 @@ import 
org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.common.util.concurrent.AbstractAsyncTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.decider.AssignmentDecision; import org.elasticsearch.persistent.decider.EnableAssignmentDecider; import org.elasticsearch.threadpool.ThreadPool; +import java.io.Closeable; import java.util.Objects; -import java.util.concurrent.Future; /** * Component that runs only on the master node and is responsible for assigning running tasks to nodes */ -public class PersistentTasksClusterService implements ClusterStateListener { +public class PersistentTasksClusterService implements ClusterStateListener, Closeable { public static final Setting CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING = Setting.timeSetting("cluster.persistent_tasks.allocation.recheck_interval", TimeValue.timeValueSeconds(30), @@ -66,19 +65,24 @@ public class PersistentTasksClusterService implements ClusterStateListener { public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService, ThreadPool threadPool) { this.clusterService = clusterService; - clusterService.addListener(this); - clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, - this::setRecheckInterval); this.registry = registry; this.decider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings()); this.recheckInterval = CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings); this.threadPool = threadPool; this.periodicRechecker = new PeriodicRechecker(); 
+ clusterService.addListener(this); + clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, + this::setRecheckInterval); } void setRecheckInterval(TimeValue recheckInterval) { this.recheckInterval = recheckInterval; - periodicRechecker.rescheduleIfScheduled(); + periodicRechecker.setInterval(recheckInterval); + } + + @Override + public void close() { + periodicRechecker.close(); } /** @@ -179,7 +183,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS public void removePersistentTask(String id, ActionListener> listener) { clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() { @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); if (tasksInProgress.hasTask(id)) { return update(currentState, tasksInProgress.removeTask(id)); @@ -267,11 +271,15 @@ private Assignment createAssignment(final public void clusterChanged(ClusterChangedEvent event) { periodicRechecker.cancel(); if (event.localNodeMaster()) { - if (shouldReassignPersistentTasks(event)) { - reassignPersistentTasks(event.state().getVersion()); - } else { - periodicRechecker.schedule(); - } + assignIfNecessaryAndScheduleRecheck(event.state(), shouldReassignPersistentTasks(event)); + } + } + + private void assignIfNecessaryAndScheduleRecheck(final ClusterState state, final boolean assignmentNecessary) { + if (assignmentNecessary) { + reassignPersistentTasks(state.getVersion()); + } else { + scheduleRecheckIfUnassignedTasks(state); } } @@ -283,18 +291,28 @@ private void reassignPersistentTasks(long currentStateVersion) { clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - ClusterState newState = 
reassignTasks(currentState); - periodicRechecker.schedule(); - return newState; + return reassignTasks(currentState); } @Override public void onFailure(String source, Exception e) { logger.warn("failed to reassign persistent tasks", e); } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + scheduleRecheckIfUnassignedTasks(newState); + } }); } + private void scheduleRecheckIfUnassignedTasks(ClusterState state) { + final PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + if (tasks != null && isAnyTaskUnassigned(tasks, state)) { + periodicRechecker.rescheduleIfNecessary(); + } + } + /** * Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following * situations: a node left or is added, the routing table changed, the master node changed, the metadata changed or the @@ -320,8 +338,8 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { } /** - * Returns true if any persistent task provided requires reassignment, - * i.e. is not assigned or is assigned to a non-existing node. + * Returns true if any persistent task provided is unassigned + * and would be assigned if reassignment were run now. */ private boolean anyTaskNeedsReassignment(final PersistentTasksCustomMetaData tasks, final ClusterState state) { for (PersistentTask task : tasks.tasks()) { @@ -335,6 +353,19 @@ private boolean anyTaskNeedsReassignment(final PersistentTasksCustomMetaData tas return false; } + /** + * Returns true if any persistent task provided is unassigned, + * i.e. is not assigned or is assigned to a non-existing node. 
+ */ + private boolean isAnyTaskUnassigned(final PersistentTasksCustomMetaData tasks, final ClusterState state) { + for (PersistentTask task : tasks.tasks()) { + if (needsReassignment(task.getAssignment(), state.nodes())) { + return true; + } + } + return false; + } + /** * Evaluates the cluster state and tries to assign tasks to nodes. * @@ -396,47 +427,35 @@ private static ClusterState update(ClusterState currentState, PersistentTasksCus /** * Class to periodically try to reassign unassigned persistent tasks. */ - private class PeriodicRechecker implements Runnable { - - private volatile Future nextRun; - - void schedule() { - try { - synchronized (this) { - FutureUtils.cancel(nextRun); - nextRun = threadPool.schedule(recheckInterval, ThreadPool.Names.GENERIC, this); - } - } catch (EsRejectedExecutionException e) { - logger.debug("could not schedule periodic persistent task assignment check", e); - } - } + private class PeriodicRechecker extends AbstractAsyncTask { - synchronized void cancel() { - FutureUtils.cancel(nextRun); - nextRun = null; + PeriodicRechecker() { + super(logger, threadPool, recheckInterval); } - synchronized void rescheduleIfScheduled() { - if (nextRun != null) { - schedule(); - } + @Override + protected boolean mustReschedule() { + return true; } @Override - public void run() { - synchronized (this) { - nextRun = null; - } + public void runInternal() { if (clusterService.localNode().isMasterNode()) { logger.trace("periodic persistent task assignment check running"); - ClusterState state = clusterService.state(); + final ClusterState state = clusterService.state(); final PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (tasks != null && anyTaskNeedsReassignment(tasks, state)) { - reassignPersistentTasks(state.getVersion()); - } else { - schedule(); - } + assignIfNecessaryAndScheduleRecheck(state, tasks != null && anyTaskNeedsReassignment(tasks, state)); } } + + @Override + protected 
String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + + @Override + public String toString() { + return "persistent_task_recheck"; + } } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java new file mode 100644 index 0000000000000..f9b5e29b42e5b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java @@ -0,0 +1,152 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class AbstractAsyncTaskTests extends ESTestCase { + + private static ThreadPool threadPool; + + @BeforeClass + public static void setUpThreadPool() { + threadPool = new TestThreadPool(AbstractAsyncTaskTests.class.getSimpleName()); + } + + @AfterClass + public static void tearDownThreadPool() { + terminate(threadPool); + } + + public void testRepeat() throws InterruptedException { + + final AtomicReference latch1 = new AtomicReference<>(new CountDownLatch(1)); + final AtomicReference latch2 = new AtomicReference<>(new CountDownLatch(1)); + final AtomicInteger count = new AtomicInteger(); + AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1)) { + + @Override + protected boolean mustReschedule() { + return true; + } + + @Override + protected void runInternal() { + assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]")); + final CountDownLatch l1 = latch1.get(); + final CountDownLatch l2 = latch2.get(); + count.incrementAndGet(); + l1.countDown(); + try { + l2.await(); + } catch (InterruptedException e) { + fail("interrupted"); + } + if (randomBoolean()) { + throw new RuntimeException("foo"); + } + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + }; + + assertFalse(task.isScheduled()); + task.rescheduleIfNecessary(); + assertTrue(task.isScheduled()); + latch1.get().await(); + latch1.set(new CountDownLatch(1)); + assertEquals(1, 
count.get()); + // here we need to swap first before we let it go otherwise threads might be very fast and run that task twice due to + // random exception and the schedule interval is 1ms + latch2.getAndSet(new CountDownLatch(1)).countDown(); + latch1.get().await(); + assertEquals(2, count.get()); + assertTrue(task.isScheduled()); + task.close(); + assertTrue(task.isClosed()); + assertFalse(task.isScheduled()); + latch2.get().countDown(); + assertEquals(2, count.get()); + } + + public void testCloseWithNoRun() { + + AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMinutes(10)) { + + @Override + protected boolean mustReschedule() { + return true; + } + + @Override + protected void runInternal() { + } + }; + + assertFalse(task.isScheduled()); + task.rescheduleIfNecessary(); + assertTrue(task.isScheduled()); + task.close(); + assertTrue(task.isClosed()); + assertFalse(task.isScheduled()); + } + + public void testChangeInterval() throws Exception { + + final CountDownLatch latch = new CountDownLatch(2); + + AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueHours(1)) { + + @Override + protected boolean mustReschedule() { + return latch.getCount() > 0; + } + + @Override + protected void runInternal() { + latch.countDown(); + } + }; + + assertFalse(task.isScheduled()); + task.rescheduleIfNecessary(); + assertTrue(task.isScheduled()); + task.setInterval(TimeValue.timeValueMillis(1)); + assertTrue(task.isScheduled()); + // This should only take 2 milliseconds in ideal conditions, but allow 10 seconds in case of VM stalls + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertBusy(() -> assertFalse(task.isScheduled())); + task.close(); + assertFalse(task.isScheduled()); + assertTrue(task.isClosed()); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 52513ce7a8b77..a47d4db2a2579 100644 --- 
a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -136,7 +136,6 @@ public void testRefreshTaskIsUpdated() throws IOException { assertNotSame(refreshTask, indexService.getRefreshTask()); assertTrue(refreshTask.isClosed()); assertFalse(refreshTask.isScheduled()); - assertFalse(indexService.getRefreshTask().mustReschedule()); // set it to 100ms client().admin().indices().prepareUpdateSettings("test") diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index d0974598e1be4..12cf66a194d6c 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -473,6 +473,7 @@ private ClusterService createRecheckTestClusterService(ClusterState initialState ClusterState before = state.get(); ClusterState after = task.execute(before); state.set(after); + task.clusterStateProcessed("test", before, after); return null; }).when(recheckTestClusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class)); From 74cff93d4afc8502aade1385f38ff6418e546229 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 5 Dec 2018 16:24:53 +0000 Subject: [PATCH 03/10] Adding an integration test --- .../util/concurrent/AbstractAsyncTask.java | 9 +++- .../org/elasticsearch/index/IndexService.java | 2 +- .../PersistentTasksClusterService.java | 2 +- .../concurrent/AbstractAsyncTaskTests.java | 52 +++++++++++++++++-- .../persistent/PersistentTasksExecutorIT.java | 45 +++++++++++++++- .../persistent/TestPersistentTasksPlugin.java | 10 +++- 6 files changed, 109 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java 
b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java index c6d5e86805f6d..e06e1a41907db 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java @@ -36,15 +36,17 @@ public abstract class AbstractAsyncTask implements Runnable, Closeable { private final Logger logger; private final ThreadPool threadPool; private final AtomicBoolean closed = new AtomicBoolean(false); + private final boolean autoReschedule; private volatile ScheduledFuture scheduledFuture; private volatile boolean isScheduledOrRunning; private volatile Exception lastThrownException; private volatile TimeValue interval; - protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, TimeValue interval) { + protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, TimeValue interval, boolean autoReschedule) { this.logger = logger; this.threadPool = threadPool; this.interval = interval; + this.autoReschedule = autoReschedule; } /** @@ -131,6 +133,7 @@ public boolean isClosed() { public final void run() { synchronized (this) { scheduledFuture = null; + isScheduledOrRunning = autoReschedule; } try { runInternal(); @@ -145,7 +148,9 @@ public final void run() { lastThrownException = ex; } } finally { - rescheduleIfNecessary(); + if (autoReschedule) { + rescheduleIfNecessary(); + } } } diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index d4bbcb99091ff..54bf5fa1aa18e 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -808,7 +808,7 @@ abstract static class BaseAsyncTask extends AbstractAsyncTask { protected final IndexService indexService; BaseAsyncTask(IndexService indexService, TimeValue interval) { - super(indexService.logger, 
indexService.threadPool, interval); + super(indexService.logger, indexService.threadPool, interval, true); this.indexService = indexService; rescheduleIfNecessary(); } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 729f2eb35e038..10d667b99cb35 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -430,7 +430,7 @@ private static ClusterState update(ClusterState currentState, PersistentTasksCus private class PeriodicRechecker extends AbstractAsyncTask { PeriodicRechecker() { - super(logger, threadPool, recheckInterval); + super(logger, threadPool, recheckInterval, false); } @Override diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java index f9b5e29b42e5b..a02eba1b742ce 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java @@ -44,12 +44,12 @@ public static void tearDownThreadPool() { terminate(threadPool); } - public void testRepeat() throws InterruptedException { + public void testAutoRepeat() throws InterruptedException { final AtomicReference latch1 = new AtomicReference<>(new CountDownLatch(1)); final AtomicReference latch2 = new AtomicReference<>(new CountDownLatch(1)); final AtomicInteger count = new AtomicInteger(); - AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1)) { + AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1), true) { @Override protected boolean mustReschedule() { @@ -98,9 +98,53 @@ protected String 
getThreadPool() { assertEquals(2, count.get()); } + public void testManualRepeat() throws InterruptedException { + + final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); + final AtomicInteger count = new AtomicInteger(); + AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1), false) { + + @Override + protected boolean mustReschedule() { + return true; + } + + @Override + protected void runInternal() { + assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]")); + count.incrementAndGet(); + latch.get().countDown(); + if (randomBoolean()) { + throw new RuntimeException("foo"); + } + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.GENERIC; + } + }; + + assertFalse(task.isScheduled()); + task.rescheduleIfNecessary(); + latch.get().await(); + assertEquals(1, count.get()); + assertFalse(task.isScheduled()); + latch.set(new CountDownLatch(1)); + assertFalse(latch.get().await(100, TimeUnit.MILLISECONDS)); + assertEquals(1, count.get()); + task.rescheduleIfNecessary(); + latch.get().await(); + assertEquals(2, count.get()); + assertFalse(task.isScheduled()); + assertFalse(task.isClosed()); + task.close(); + assertTrue(task.isClosed()); + } + public void testCloseWithNoRun() { - AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMinutes(10)) { + AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMinutes(10), true) { @Override protected boolean mustReschedule() { @@ -124,7 +168,7 @@ public void testChangeInterval() throws Exception { final CountDownLatch latch = new CountDownLatch(2); - AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueHours(1)) { + AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueHours(1), true) { @Override protected boolean mustReschedule() { diff --git 
a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java index e746ff71627cd..943cbfd6f88c8 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder; import org.junit.After; +import org.junit.Before; import java.util.Collection; import java.util.Collections; @@ -45,6 +46,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2) @@ -64,6 +66,11 @@ protected boolean ignoreExternalCluster() { return true; } + @Before + public void resetNonClusterStateCondition() { + TestPersistentTasksExecutor.setNonClusterStateCondition(true); + } + @After public void cleanup() throws Exception { assertNoRunningTasks(); @@ -173,6 +180,42 @@ public void testPersistentActionWithNoAvailableNode() throws Exception { assertEquals(removeFuture.get().getId(), taskId); } + public void testPersistentActionWithNonClusterStateCondition() throws Exception { + PersistentTasksClusterService persistentTasksClusterService = + internalCluster().getInstance(PersistentTasksClusterService.class, internalCluster().getMasterName()); + // Speed up rechecks to a rate that is quicker than what settings would allow + persistentTasksClusterService.setRecheckInterval(TimeValue.timeValueMillis(10)); + + TestPersistentTasksExecutor.setNonClusterStateCondition(false); + + PersistentTasksService 
persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); + PlainActionFuture> future = new PlainActionFuture<>(); + TestParams testParams = new TestParams("Blah"); + persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future); + String taskId = future.get().getId(); + + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks(), + empty()); + + TestPersistentTasksExecutor.setNonClusterStateCondition(true); + + assertBusy(() -> { + // Wait for the task to start + assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks() + .size(), equalTo(1)); + }); + TaskInfo taskInfo = client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") + .get().getTasks().get(0); + + // Verifying that the task can now be assigned + assertThat(taskInfo.getTaskId().getNodeId(), notNullValue()); + + // Remove the persistent task + PlainActionFuture> removeFuture = new PlainActionFuture<>(); + persistentTasksService.sendRemoveRequest(taskId, removeFuture); + assertEquals(removeFuture.get().getId(), taskId); + } + public void testPersistentActionStatusUpdate() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PlainActionFuture> future = new PlainActionFuture<>(); @@ -277,8 +320,6 @@ private void stopOrCancelTask(TaskId taskId) { assertThat(client().admin().cluster().prepareCancelTasks().setTaskId(taskId) .get().getTasks().size(), equalTo(1)); } - - } private void assertNoRunningTasks() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 0faf226ff0500..fcc11e7c2f1df 100644 --- 
a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -298,13 +298,22 @@ public static class TestPersistentTasksExecutor extends PersistentTasksExecutor< public static final String NAME = "cluster:admin/persistent/test"; private final ClusterService clusterService; + private static volatile boolean nonClusterStateCondition = true; + public TestPersistentTasksExecutor(ClusterService clusterService) { super(NAME, ThreadPool.Names.GENERIC); this.clusterService = clusterService; } + public static void setNonClusterStateCondition(boolean nonClusterStateCondition) { + TestPersistentTasksExecutor.nonClusterStateCondition = nonClusterStateCondition; + } + @Override public Assignment getAssignment(TestParams params, ClusterState clusterState) { + if (nonClusterStateCondition == false) { + return new Assignment(null, "non cluster state condition prevents assignment"); + } if (params == null || params.getExecutorNodeAttr() == null) { return super.getAssignment(params, clusterState); } else { @@ -315,7 +324,6 @@ public Assignment getAssignment(TestParams params, ClusterState clusterState) { } else { return NO_NODE_FOUND; } - } } From 28464ac55c183c1c425d5088b33fa356c1580ba2 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 10 Dec 2018 14:40:39 +0000 Subject: [PATCH 04/10] Address further code review comments --- .../PersistentTasksClusterService.java | 42 +++++++++---------- .../concurrent/AbstractAsyncTaskTests.java | 21 ++++++---- .../PersistentTasksClusterServiceTests.java | 8 ++-- .../integration/BasicDistributedJobsIT.java | 4 +- .../integration/MlDistributedFailureIT.java | 4 +- 5 files changed, 42 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 10d667b99cb35..ef329656fff91 
100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -60,23 +60,20 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos private final EnableAssignmentDecider decider; private final ThreadPool threadPool; private final PeriodicRechecker periodicRechecker; - private volatile TimeValue recheckInterval; public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService, ThreadPool threadPool) { this.clusterService = clusterService; this.registry = registry; this.decider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings()); - this.recheckInterval = CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings); this.threadPool = threadPool; - this.periodicRechecker = new PeriodicRechecker(); + this.periodicRechecker = new PeriodicRechecker(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings)); clusterService.addListener(this); clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, this::setRecheckInterval); } void setRecheckInterval(TimeValue recheckInterval) { - this.recheckInterval = recheckInterval; periodicRechecker.setInterval(recheckInterval); } @@ -269,17 +266,14 @@ private Assignment createAssignment(final @Override public void clusterChanged(ClusterChangedEvent event) { - periodicRechecker.cancel(); if (event.localNodeMaster()) { - assignIfNecessaryAndScheduleRecheck(event.state(), shouldReassignPersistentTasks(event)); - } - } - - private void assignIfNecessaryAndScheduleRecheck(final ClusterState state, final boolean assignmentNecessary) { - if (assignmentNecessary) { - reassignPersistentTasks(state.getVersion()); - } else { - scheduleRecheckIfUnassignedTasks(state); + if (shouldReassignPersistentTasks(event)) { + // We want to avoid a 
periodic check duplicating this work + periodicRechecker.cancel(); + reassignPersistentTasks(event.state().getVersion()); + } else { + scheduleRecheckIfUnassignedTasks(event.state()); + } } } @@ -332,7 +326,7 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { || event.metaDataChanged() || masterChanged) { - return anyTaskNeedsReassignment(tasks, event.state()); + return anyTaskReassignmentRequired(tasks, event.state()); } return false; } @@ -341,9 +335,9 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { * Returns true if any persistent task provided is unassigned * and would be assigned if reassignment were run now. */ - private boolean anyTaskNeedsReassignment(final PersistentTasksCustomMetaData tasks, final ClusterState state) { + private boolean anyTaskReassignmentRequired(final PersistentTasksCustomMetaData tasks, final ClusterState state) { for (PersistentTask task : tasks.tasks()) { - if (needsReassignment(task.getAssignment(), state.nodes())) { + if (isAssignedToValidNode(task.getAssignment(), state.nodes())) { Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), state); if (Objects.equals(assignment, task.getAssignment()) == false) { return true; @@ -359,7 +353,7 @@ private boolean anyTaskNeedsReassignment(final PersistentTasksCustomMetaData tas */ private boolean isAnyTaskUnassigned(final PersistentTasksCustomMetaData tasks, final ClusterState state) { for (PersistentTask task : tasks.tasks()) { - if (needsReassignment(task.getAssignment(), state.nodes())) { + if (isAssignedToValidNode(task.getAssignment(), state.nodes())) { return true; } } @@ -382,7 +376,7 @@ ClusterState reassignTasks(final ClusterState currentState) { // We need to check if removed nodes were running any of the tasks and reassign them for (PersistentTask task : tasks.tasks()) { - if (needsReassignment(task.getAssignment(), nodes)) { + if (isAssignedToValidNode(task.getAssignment(), nodes)) { Assignment 
assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState); if (Objects.equals(assignment, task.getAssignment()) == false) { logger.trace("reassigning task {} from node {} to node {}", task.getId(), @@ -406,7 +400,7 @@ static boolean persistentTasksChanged(final ClusterChangedEvent event) { } /** Returns true if the task is not assigned or is assigned to a non-existing node */ - public static boolean needsReassignment(final Assignment assignment, final DiscoveryNodes nodes) { + public static boolean isAssignedToValidNode(final Assignment assignment, final DiscoveryNodes nodes) { return (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false); } @@ -429,7 +423,7 @@ private static ClusterState update(ClusterState currentState, PersistentTasksCus */ private class PeriodicRechecker extends AbstractAsyncTask { - PeriodicRechecker() { + PeriodicRechecker(TimeValue recheckInterval) { super(logger, threadPool, recheckInterval, false); } @@ -444,7 +438,11 @@ public void runInternal() { logger.trace("periodic persistent task assignment check running"); final ClusterState state = clusterService.state(); final PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assignIfNecessaryAndScheduleRecheck(state, tasks != null && anyTaskNeedsReassignment(tasks, state)); + if (tasks != null && anyTaskReassignmentRequired(tasks, state)) { + reassignPersistentTasks(state.getVersion()); + } else { + scheduleRecheckIfUnassignedTasks(state); + } } } diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java index a02eba1b742ce..9008c3316f29e 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java @@ -26,7 
+26,9 @@ import org.junit.BeforeClass; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -98,9 +100,9 @@ protected String getThreadPool() { assertEquals(2, count.get()); } - public void testManualRepeat() throws InterruptedException { + public void testManualRepeat() throws Exception { - final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); + final CyclicBarrier barrier = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence final AtomicInteger count = new AtomicInteger(); AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1), false) { @@ -113,7 +115,11 @@ protected boolean mustReschedule() { protected void runInternal() { assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]")); count.incrementAndGet(); - latch.get().countDown(); + try { + barrier.await(10, TimeUnit.SECONDS); // should happen very quickly + } catch (Exception e) { + fail("barrier not passed in reasonable time"); + } if (randomBoolean()) { throw new RuntimeException("foo"); } @@ -127,14 +133,15 @@ protected String getThreadPool() { assertFalse(task.isScheduled()); task.rescheduleIfNecessary(); - latch.get().await(); + barrier.await(10, TimeUnit.SECONDS); // should happen very quickly assertEquals(1, count.get()); assertFalse(task.isScheduled()); - latch.set(new CountDownLatch(1)); - assertFalse(latch.get().await(100, TimeUnit.MILLISECONDS)); + barrier.reset(); + expectThrows(TimeoutException.class, () -> barrier.await(100, TimeUnit.MILLISECONDS)); assertEquals(1, count.get()); + barrier.reset(); task.rescheduleIfNecessary(); - latch.get().await(); + barrier.await(10, TimeUnit.SECONDS); // should happen very quickly assertEquals(2, count.get()); 
assertFalse(task.isScheduled()); assertFalse(task.isClosed()); diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index 12cf66a194d6c..0067b66d74534 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -58,7 +58,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singleton; -import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment; +import static org.elasticsearch.persistent.PersistentTasksClusterService.isAssignedToValidNode; import static org.elasticsearch.persistent.PersistentTasksClusterService.persistentTasksChanged; import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; @@ -408,9 +408,9 @@ public void testNeedsReassignment() { .add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT)) .build(); - assertTrue(needsReassignment(new Assignment(null, "unassigned"), nodes)); - assertTrue(needsReassignment(new Assignment("_node_left", "assigned to a node that left"), nodes)); - assertFalse(needsReassignment(new Assignment("_node_1", "assigned"), nodes)); + assertTrue(isAssignedToValidNode(new Assignment(null, "unassigned"), nodes)); + assertTrue(isAssignedToValidNode(new Assignment("_node_left", "assigned to a node that left"), nodes)); + assertFalse(isAssignedToValidNode(new Assignment("_node_1", "assigned"), nodes)); } public void testPeriodicRecheck() throws Exception { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java 
index 0ef76131bd652..e112f49623a97 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -50,7 +50,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment; +import static org.elasticsearch.persistent.PersistentTasksClusterService.isAssignedToValidNode; import static org.hamcrest.Matchers.hasEntry; public class BasicDistributedJobsIT extends BaseMlIntegTestCase { @@ -395,7 +395,7 @@ private void assertJobTask(String jobId, JobState expectedState, boolean hasExec if (hasExecutorNode) { assertNotNull(task.getExecutorNode()); - assertFalse(needsReassignment(task.getAssignment(), clusterState.nodes())); + assertFalse(isAssignedToValidNode(task.getAssignment(), clusterState.nodes())); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true")); assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20")); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index 8befaf23d6d52..72dfa47d55363 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -42,7 +42,7 @@ import java.util.Collections; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment; +import static org.elasticsearch.persistent.PersistentTasksClusterService.isAssignedToValidNode; public class 
MlDistributedFailureIT extends BaseMlIntegTestCase { @@ -212,7 +212,7 @@ private void run(String jobId, CheckedRunnable disrupt) throws Except assertNotNull(tasks); assertEquals("Expected 2 tasks, but got [" + tasks.taskMap() + "]", 2, tasks.taskMap().size()); for (PersistentTask task : tasks.tasks()) { - assertFalse(needsReassignment(task.getAssignment(), clusterState.nodes())); + assertFalse(isAssignedToValidNode(task.getAssignment(), clusterState.nodes())); } GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(jobId); From 5e63d179cc24f2c12d4b240237a07de5b7f5e5c2 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 11 Dec 2018 12:07:52 +0000 Subject: [PATCH 05/10] Address further review comments --- .../PersistentTasksClusterService.java | 52 +++++++------------ .../PersistentTasksClusterServiceTests.java | 26 ++++++---- .../integration/BasicDistributedJobsIT.java | 4 +- .../integration/MlDistributedFailureIT.java | 4 +- 4 files changed, 40 insertions(+), 46 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index ef329656fff91..5d574d730f69c 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -270,9 +270,8 @@ public void clusterChanged(ClusterChangedEvent event) { if (shouldReassignPersistentTasks(event)) { // We want to avoid a periodic check duplicating this work periodicRechecker.cancel(); - reassignPersistentTasks(event.state().getVersion()); - } else { - scheduleRecheckIfUnassignedTasks(event.state()); + logger.trace("checking task reassignment for cluster state {}", event.state().getVersion()); + reassignPersistentTasks(); } } } @@ -280,8 +279,7 @@ public void clusterChanged(ClusterChangedEvent event) { /** * Submit a cluster state update to 
reassign any persistent tasks that need reassigning */ - private void reassignPersistentTasks(long currentStateVersion) { - logger.trace("checking task reassignment for cluster state {}", currentStateVersion); + private void reassignPersistentTasks() { clusterService.submitStateUpdateTask("reassign persistent tasks", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -291,22 +289,20 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Exception e) { logger.warn("failed to reassign persistent tasks", e); + // There must be a task that's worth rechecking because there was one + // that caused this method to be called and the method failed to assign it + periodicRechecker.rescheduleIfNecessary(); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - scheduleRecheckIfUnassignedTasks(newState); + if (isAnyTaskUnassigned(newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE))) { + periodicRechecker.rescheduleIfNecessary(); + } } }); } - private void scheduleRecheckIfUnassignedTasks(ClusterState state) { - final PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (tasks != null && isAnyTaskUnassigned(tasks, state)) { - periodicRechecker.rescheduleIfNecessary(); - } - } - /** * Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following * situations: a node left or is added, the routing table changed, the master node changed, the metadata changed or the @@ -332,12 +328,11 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { } /** - * Returns true if any persistent task provided is unassigned - * and would be assigned if reassignment were run now. + * Returns true if any persistent task would change assignment if reassignment were run now. 
*/ private boolean anyTaskReassignmentRequired(final PersistentTasksCustomMetaData tasks, final ClusterState state) { for (PersistentTask task : tasks.tasks()) { - if (isAssignedToValidNode(task.getAssignment(), state.nodes())) { + if (requiresAssignment(task.getAssignment(), state.nodes())) { Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), state); if (Objects.equals(assignment, task.getAssignment()) == false) { return true; @@ -348,16 +343,10 @@ private boolean anyTaskReassignmentRequired(final PersistentTasksCustomMetaData } /** - * Returns true if any persistent task provided is unassigned, - * i.e. is not assigned or is assigned to a non-existing node. + * Returns true if any persistent task is unassigned. */ - private boolean isAnyTaskUnassigned(final PersistentTasksCustomMetaData tasks, final ClusterState state) { - for (PersistentTask task : tasks.tasks()) { - if (isAssignedToValidNode(task.getAssignment(), state.nodes())) { - return true; - } - } - return false; + private boolean isAnyTaskUnassigned(final PersistentTasksCustomMetaData tasks) { + return tasks != null && tasks.tasks().stream().anyMatch(task -> task.getAssignment().isAssigned() == false); } /** @@ -376,7 +365,7 @@ ClusterState reassignTasks(final ClusterState currentState) { // We need to check if removed nodes were running any of the tasks and reassign them for (PersistentTask task : tasks.tasks()) { - if (isAssignedToValidNode(task.getAssignment(), nodes)) { + if (requiresAssignment(task.getAssignment(), nodes)) { Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState); if (Objects.equals(assignment, task.getAssignment()) == false) { logger.trace("reassigning task {} from node {} to node {}", task.getId(), @@ -400,7 +389,7 @@ static boolean persistentTasksChanged(final ClusterChangedEvent event) { } /** Returns true if the task is not assigned or is assigned to a non-existing node */ - public static boolean 
isAssignedToValidNode(final Assignment assignment, final DiscoveryNodes nodes) { + public static boolean requiresAssignment(final Assignment assignment, final DiscoveryNodes nodes) { return (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false); } @@ -435,13 +424,10 @@ protected boolean mustReschedule() { @Override public void runInternal() { if (clusterService.localNode().isMasterNode()) { - logger.trace("periodic persistent task assignment check running"); final ClusterState state = clusterService.state(); - final PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - if (tasks != null && anyTaskReassignmentRequired(tasks, state)) { - reassignPersistentTasks(state.getVersion()); - } else { - scheduleRecheckIfUnassignedTasks(state); + logger.trace("periodic persistent task assignment check running for cluster state {}", state.getVersion()); + if (isAnyTaskUnassigned(state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE))) { + reassignPersistentTasks(); } } } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index 0067b66d74534..061d9f84af6db 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -53,12 +53,13 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import static java.util.Collections.emptyMap; import static java.util.Collections.singleton; -import static org.elasticsearch.persistent.PersistentTasksClusterService.isAssignedToValidNode; +import static 
org.elasticsearch.persistent.PersistentTasksClusterService.requiresAssignment; import static org.elasticsearch.persistent.PersistentTasksClusterService.persistentTasksChanged; import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; @@ -408,9 +409,9 @@ public void testNeedsReassignment() { .add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT)) .build(); - assertTrue(isAssignedToValidNode(new Assignment(null, "unassigned"), nodes)); - assertTrue(isAssignedToValidNode(new Assignment("_node_left", "assigned to a node that left"), nodes)); - assertFalse(isAssignedToValidNode(new Assignment("_node_1", "assigned"), nodes)); + assertTrue(requiresAssignment(new Assignment(null, "unassigned"), nodes)); + assertTrue(requiresAssignment(new Assignment("_node_left", "assigned to a node that left"), nodes)); + assertFalse(requiresAssignment(new Assignment("_node_1", "assigned"), nodes)); } public void testPeriodicRecheck() throws Exception { @@ -426,7 +427,8 @@ public void testPeriodicRecheck() throws Exception { nonClusterStateCondition = false; - ClusterService recheckTestClusterService = createRecheckTestClusterService(clusterState); + boolean shouldSimulateFailure = randomBoolean(); + ClusterService recheckTestClusterService = createRecheckTestClusterService(clusterState, shouldSimulateFailure); PersistentTasksClusterService service = createService(recheckTestClusterService, (params, currentState) -> assignBasedOnNonClusterStateCondition(currentState.nodes())); @@ -440,7 +442,8 @@ public void testPeriodicRecheck() throws Exception { for (PersistentTask task : tasksInProgress.tasks()) { assertThat(task.getExecutorNode(), nullValue()); assertThat(task.isAssigned(), equalTo(false)); - assertThat(task.getAssignment().getExplanation(), equalTo("non-cluster state condition prevents assignment")); + assertThat(task.getAssignment().getExplanation(), 
equalTo(shouldSimulateFailure ? + "explanation: assign_based_on_non_cluster_state_condition" : "non-cluster state condition prevents assignment")); } assertThat(tasksInProgress.tasks().size(), equalTo(1)); } @@ -461,7 +464,8 @@ public void testPeriodicRecheck() throws Exception { }); } - private ClusterService createRecheckTestClusterService(ClusterState initialState) { + private ClusterService createRecheckTestClusterService(ClusterState initialState, boolean shouldSimulateFailure) { + AtomicBoolean testFailureNextTime = new AtomicBoolean(shouldSimulateFailure); AtomicReference state = new AtomicReference<>(initialState); ClusterService recheckTestClusterService = mock(ClusterService.class); when(recheckTestClusterService.getClusterSettings()).thenReturn(clusterService.getClusterSettings()); @@ -472,8 +476,12 @@ private ClusterService createRecheckTestClusterService(ClusterState initialState ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocationOnMock.getArguments()[1]; ClusterState before = state.get(); ClusterState after = task.execute(before); - state.set(after); - task.clusterStateProcessed("test", before, after); + if (testFailureNextTime.compareAndSet(true, false)) { + task.onFailure("testing failure", new RuntimeException("foo")); + } else { + state.set(after); + task.clusterStateProcessed("test", before, after); + } return null; }).when(recheckTestClusterService).submitStateUpdateTask(anyString(), any(ClusterStateUpdateTask.class)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index e112f49623a97..5ab45471c8e15 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -50,7 +50,7 @@ import 
java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.persistent.PersistentTasksClusterService.isAssignedToValidNode; +import static org.elasticsearch.persistent.PersistentTasksClusterService.requiresAssignment; import static org.hamcrest.Matchers.hasEntry; public class BasicDistributedJobsIT extends BaseMlIntegTestCase { @@ -395,7 +395,7 @@ private void assertJobTask(String jobId, JobState expectedState, boolean hasExec if (hasExecutorNode) { assertNotNull(task.getExecutorNode()); - assertFalse(isAssignedToValidNode(task.getAssignment(), clusterState.nodes())); + assertFalse(requiresAssignment(task.getAssignment(), clusterState.nodes())); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true")); assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20")); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index 72dfa47d55363..d2b3a1cacae2d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -42,7 +42,7 @@ import java.util.Collections; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.persistent.PersistentTasksClusterService.isAssignedToValidNode; +import static org.elasticsearch.persistent.PersistentTasksClusterService.requiresAssignment; public class MlDistributedFailureIT extends BaseMlIntegTestCase { @@ -212,7 +212,7 @@ private void run(String jobId, CheckedRunnable disrupt) throws Except assertNotNull(tasks); assertEquals("Expected 2 tasks, but got [" + tasks.taskMap() + "]", 2, tasks.taskMap().size()); for (PersistentTask 
task : tasks.tasks()) { - assertFalse(isAssignedToValidNode(task.getAssignment(), clusterState.nodes())); + assertFalse(requiresAssignment(task.getAssignment(), clusterState.nodes())); } GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(jobId); From 824ffab3d95a6dc413b6dfcb1d27224feb6942c0 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 11 Dec 2018 12:16:01 +0000 Subject: [PATCH 06/10] Revert some redundant changes to reduce PR size --- .../PersistentTasksClusterService.java | 24 +++++++------------ .../PersistentTasksClusterServiceTests.java | 8 +++---- .../integration/BasicDistributedJobsIT.java | 4 ++-- .../integration/MlDistributedFailureIT.java | 4 ++-- 4 files changed, 16 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 5d574d730f69c..102b6c78b6ddd 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -322,20 +322,12 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) { || event.metaDataChanged() || masterChanged) { - return anyTaskReassignmentRequired(tasks, event.state()); - } - return false; - } - - /** - * Returns true if any persistent task would change assignment if reassignment were run now. 
- */ - private boolean anyTaskReassignmentRequired(final PersistentTasksCustomMetaData tasks, final ClusterState state) { - for (PersistentTask task : tasks.tasks()) { - if (requiresAssignment(task.getAssignment(), state.nodes())) { - Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), state); - if (Objects.equals(assignment, task.getAssignment()) == false) { - return true; + for (PersistentTask task : tasks.tasks()) { + if (needsReassignment(task.getAssignment(), event.state().nodes())) { + Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state()); + if (Objects.equals(assignment, task.getAssignment()) == false) { + return true; + } } } } @@ -365,7 +357,7 @@ ClusterState reassignTasks(final ClusterState currentState) { // We need to check if removed nodes were running any of the tasks and reassign them for (PersistentTask task : tasks.tasks()) { - if (requiresAssignment(task.getAssignment(), nodes)) { + if (needsReassignment(task.getAssignment(), nodes)) { Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState); if (Objects.equals(assignment, task.getAssignment()) == false) { logger.trace("reassigning task {} from node {} to node {}", task.getId(), @@ -389,7 +381,7 @@ static boolean persistentTasksChanged(final ClusterChangedEvent event) { } /** Returns true if the task is not assigned or is assigned to a non-existing node */ - public static boolean requiresAssignment(final Assignment assignment, final DiscoveryNodes nodes) { + public static boolean needsReassignment(final Assignment assignment, final DiscoveryNodes nodes) { return (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false); } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index 061d9f84af6db..ebf77d1e80360 100644 --- 
a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -59,7 +59,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singleton; -import static org.elasticsearch.persistent.PersistentTasksClusterService.requiresAssignment; +import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment; import static org.elasticsearch.persistent.PersistentTasksClusterService.persistentTasksChanged; import static org.elasticsearch.persistent.PersistentTasksExecutor.NO_NODE_FOUND; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; @@ -409,9 +409,9 @@ public void testNeedsReassignment() { .add(new DiscoveryNode("_node_2", buildNewFakeTransportAddress(), Version.CURRENT)) .build(); - assertTrue(requiresAssignment(new Assignment(null, "unassigned"), nodes)); - assertTrue(requiresAssignment(new Assignment("_node_left", "assigned to a node that left"), nodes)); - assertFalse(requiresAssignment(new Assignment("_node_1", "assigned"), nodes)); + assertTrue(needsReassignment(new Assignment(null, "unassigned"), nodes)); + assertTrue(needsReassignment(new Assignment("_node_left", "assigned to a node that left"), nodes)); + assertFalse(needsReassignment(new Assignment("_node_1", "assigned"), nodes)); } public void testPeriodicRecheck() throws Exception { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 5ab45471c8e15..0ef76131bd652 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -50,7 +50,7 @@ import java.util.concurrent.CopyOnWriteArrayList; 
import java.util.concurrent.TimeUnit; -import static org.elasticsearch.persistent.PersistentTasksClusterService.requiresAssignment; +import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment; import static org.hamcrest.Matchers.hasEntry; public class BasicDistributedJobsIT extends BaseMlIntegTestCase { @@ -395,7 +395,7 @@ private void assertJobTask(String jobId, JobState expectedState, boolean hasExec if (hasExecutorNode) { assertNotNull(task.getExecutorNode()); - assertFalse(requiresAssignment(task.getAssignment(), clusterState.nodes())); + assertFalse(needsReassignment(task.getAssignment(), clusterState.nodes())); DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true")); assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20")); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index d2b3a1cacae2d..8befaf23d6d52 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -42,7 +42,7 @@ import java.util.Collections; import java.util.concurrent.TimeUnit; -import static org.elasticsearch.persistent.PersistentTasksClusterService.requiresAssignment; +import static org.elasticsearch.persistent.PersistentTasksClusterService.needsReassignment; public class MlDistributedFailureIT extends BaseMlIntegTestCase { @@ -212,7 +212,7 @@ private void run(String jobId, CheckedRunnable disrupt) throws Except assertNotNull(tasks); assertEquals("Expected 2 tasks, but got [" + tasks.taskMap() + "]", 2, tasks.taskMap().size()); for (PersistentTask task : tasks.tasks()) { - 
assertFalse(requiresAssignment(task.getAssignment(), clusterState.nodes())); + assertFalse(needsReassignment(task.getAssignment(), clusterState.nodes())); } GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(jobId); From 610f25e19aba89e0ccd65566ac453f2c26fa3e46 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 11 Dec 2018 17:52:38 +0000 Subject: [PATCH 07/10] Schedule recheck if unassignable tasks exist after cluster state change --- .../persistent/PersistentTasksClusterService.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 102b6c78b6ddd..61ed4a48aee2b 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -272,6 +272,11 @@ public void clusterChanged(ClusterChangedEvent event) { periodicRechecker.cancel(); logger.trace("checking task reassignment for cluster state {}", event.state().getVersion()); reassignPersistentTasks(); + } else if (periodicRechecker.isScheduled() == false && + isAnyTaskUnassigned(event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE))) { + // If a task is unassigned but would still be unassigned following this cluster state event then schedule a + // recheck just in case it would be assignable after a change in some condition external to the cluster state + periodicRechecker.rescheduleIfNecessary(); } } } From 9260ab12914dffe40d3d0f1f092b5ee39be7d167 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 12 Dec 2018 11:38:11 +0000 Subject: [PATCH 08/10] Address some more review comments --- .../PersistentTasksClusterService.java | 5 --- .../concurrent/AbstractAsyncTaskTests.java | 39 ++++++++----------- .../persistent/PersistentTasksExecutorIT.java | 2 +- 3 files changed, 17 
insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 61ed4a48aee2b..9dd45b62ecae1 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -429,11 +429,6 @@ public void runInternal() { } } - @Override - protected String getThreadPool() { - return ThreadPool.Names.GENERIC; - } - @Override public String toString() { return "persistent_task_recheck"; diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java index 9008c3316f29e..69704256d9f1d 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java @@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; public class AbstractAsyncTaskTests extends ESTestCase { @@ -46,10 +45,10 @@ public static void tearDownThreadPool() { terminate(threadPool); } - public void testAutoRepeat() throws InterruptedException { + public void testAutoRepeat() throws Exception { - final AtomicReference latch1 = new AtomicReference<>(new CountDownLatch(1)); - final AtomicReference latch2 = new AtomicReference<>(new CountDownLatch(1)); + boolean shouldRunThrowException = randomBoolean(); + final CyclicBarrier barrier = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence final AtomicInteger count = new AtomicInteger(); AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, 
TimeValue.timeValueMillis(1), true) { @@ -61,16 +60,13 @@ protected boolean mustReschedule() { @Override protected void runInternal() { assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]")); - final CountDownLatch l1 = latch1.get(); - final CountDownLatch l2 = latch2.get(); count.incrementAndGet(); - l1.countDown(); try { - l2.await(); - } catch (InterruptedException e) { + barrier.await(); + } catch (Exception e) { fail("interrupted"); } - if (randomBoolean()) { + if (shouldRunThrowException) { throw new RuntimeException("foo"); } } @@ -84,24 +80,21 @@ protected String getThreadPool() { assertFalse(task.isScheduled()); task.rescheduleIfNecessary(); assertTrue(task.isScheduled()); - latch1.get().await(); - latch1.set(new CountDownLatch(1)); + barrier.await(); assertEquals(1, count.get()); - // here we need to swap first before we let it go otherwise threads might be very fast and run that task twice due to - // random exception and the schedule interval is 1ms - latch2.getAndSet(new CountDownLatch(1)).countDown(); - latch1.get().await(); + barrier.reset(); + barrier.await(); assertEquals(2, count.get()); assertTrue(task.isScheduled()); task.close(); assertTrue(task.isClosed()); assertFalse(task.isScheduled()); - latch2.get().countDown(); assertEquals(2, count.get()); } public void testManualRepeat() throws Exception { + boolean shouldRunThrowException = randomBoolean(); final CyclicBarrier barrier = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence final AtomicInteger count = new AtomicInteger(); AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1), false) { @@ -116,11 +109,11 @@ protected void runInternal() { assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]")); count.incrementAndGet(); try { - barrier.await(10, TimeUnit.SECONDS); // should happen very quickly + barrier.await(); } catch (Exception e) { 
- fail("barrier not passed in reasonable time"); + fail("interrupted"); } - if (randomBoolean()) { + if (shouldRunThrowException) { throw new RuntimeException("foo"); } } @@ -133,15 +126,15 @@ protected String getThreadPool() { assertFalse(task.isScheduled()); task.rescheduleIfNecessary(); - barrier.await(10, TimeUnit.SECONDS); // should happen very quickly + barrier.await(); assertEquals(1, count.get()); assertFalse(task.isScheduled()); barrier.reset(); - expectThrows(TimeoutException.class, () -> barrier.await(100, TimeUnit.MILLISECONDS)); + expectThrows(TimeoutException.class, () -> barrier.await(10, TimeUnit.MILLISECONDS)); assertEquals(1, count.get()); barrier.reset(); task.rescheduleIfNecessary(); - barrier.await(10, TimeUnit.SECONDS); // should happen very quickly + barrier.await(); assertEquals(2, count.get()); assertFalse(task.isScheduled()); assertFalse(task.isClosed()); diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java index 943cbfd6f88c8..8f6393986da9d 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java @@ -184,7 +184,7 @@ public void testPersistentActionWithNonClusterStateCondition() throws Exception PersistentTasksClusterService persistentTasksClusterService = internalCluster().getInstance(PersistentTasksClusterService.class, internalCluster().getMasterName()); // Speed up rechecks to a rate that is quicker than what settings would allow - persistentTasksClusterService.setRecheckInterval(TimeValue.timeValueMillis(10)); + persistentTasksClusterService.setRecheckInterval(TimeValue.timeValueMillis(1)); TestPersistentTasksExecutor.setNonClusterStateCondition(false); From bcb1dafa4206a4a38b30e3fced3bdb88614f753f Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 12 Dec 2018 13:18:34 +0000 Subject: 
[PATCH 09/10] Schedule recheck if initial assignment is null on creation --- .../persistent/PersistentTasksClusterService.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 9dd45b62ecae1..27aac8661bafa 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -116,7 +116,11 @@ public void onFailure(String source, Exception e) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { PersistentTasksCustomMetaData tasks = newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); if (tasks != null) { - listener.onResponse(tasks.getTask(taskId)); + PersistentTask task = tasks.getTask(taskId); + listener.onResponse(task); + if (task != null && task.isAssigned() == false && periodicRechecker.isScheduled() == false) { + periodicRechecker.rescheduleIfNecessary(); + } } else { listener.onResponse(null); } @@ -272,11 +276,6 @@ public void clusterChanged(ClusterChangedEvent event) { periodicRechecker.cancel(); logger.trace("checking task reassignment for cluster state {}", event.state().getVersion()); reassignPersistentTasks(); - } else if (periodicRechecker.isScheduled() == false && - isAnyTaskUnassigned(event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE))) { - // If a task is unassigned but would still be unassigned following this cluster state event then schedule a - // recheck just in case it would be assignable after a change in some condition external to the cluster state - periodicRechecker.rescheduleIfNecessary(); } } } From 0907000350ab169a767d5c516d0d6f727f3be2d1 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 12 Dec 2018 13:41:28 +0000 Subject: [PATCH 
10/10] Fix auto repeat test --- .../concurrent/AbstractAsyncTaskTests.java | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java index 69704256d9f1d..3a1cab90f0d8f 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTaskTests.java @@ -48,7 +48,8 @@ public static void tearDownThreadPool() { public void testAutoRepeat() throws Exception { boolean shouldRunThrowException = randomBoolean(); - final CyclicBarrier barrier = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence + final CyclicBarrier barrier1 = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence + final CyclicBarrier barrier2 = new CyclicBarrier(2); // 1 for runInternal plus 1 for the test sequence final AtomicInteger count = new AtomicInteger(); AbstractAsyncTask task = new AbstractAsyncTask(logger, threadPool, TimeValue.timeValueMillis(1), true) { @@ -60,9 +61,14 @@ protected boolean mustReschedule() { @Override protected void runInternal() { assertTrue("generic threadpool is configured", Thread.currentThread().getName().contains("[generic]")); + try { + barrier1.await(); + } catch (Exception e) { + fail("interrupted"); + } count.incrementAndGet(); try { - barrier.await(); + barrier2.await(); } catch (Exception e) { fail("interrupted"); } @@ -80,13 +86,17 @@ protected String getThreadPool() { assertFalse(task.isScheduled()); task.rescheduleIfNecessary(); assertTrue(task.isScheduled()); - barrier.await(); + barrier1.await(); + assertTrue(task.isScheduled()); + barrier2.await(); assertEquals(1, count.get()); - barrier.reset(); - barrier.await(); - assertEquals(2, count.get()); + barrier1.reset(); + barrier2.reset(); + 
barrier1.await(); assertTrue(task.isScheduled()); task.close(); + barrier2.await(); + assertEquals(2, count.get()); assertTrue(task.isClosed()); assertFalse(task.isScheduled()); assertEquals(2, count.get());