diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java index 2a850ce9c14dc..4dc3873c5859d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlAssignmentNotifier.java @@ -10,9 +10,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -26,75 +24,57 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; -public class MlAssignmentNotifier implements ClusterStateListener, LocalNodeMasterListener { +public class MlAssignmentNotifier implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(MlAssignmentNotifier.class); private final Auditor auditor; - private final ClusterService clusterService; private final MlConfigMigrator mlConfigMigrator; private final ThreadPool threadPool; - private final AtomicBoolean enabled = new AtomicBoolean(false); MlAssignmentNotifier(Settings settings, Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) { this.auditor = auditor; - this.clusterService = clusterService; this.mlConfigMigrator = new MlConfigMigrator(settings, client, clusterService); this.threadPool = threadPool; - clusterService.addLocalNodeMasterListener(this); + clusterService.addListener(this); } MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, MlConfigMigrator mlConfigMigrator, ClusterService clusterService) { this.auditor = auditor; - this.clusterService = clusterService; this.mlConfigMigrator = mlConfigMigrator; this.threadPool = threadPool; - clusterService.addLocalNodeMasterListener(this); + clusterService.addListener(this); } - @Override - public void onMaster() { - if (enabled.compareAndSet(false, true)) { - clusterService.addListener(this); - } - } - - @Override - public void offMaster() { - if (enabled.compareAndSet(true, false)) { - clusterService.removeListener(this); - } - } - - @Override - public String executorName() { + private String executorName() { return ThreadPool.Names.GENERIC; } @Override public void clusterChanged(ClusterChangedEvent event) { - if (enabled.get() == false) { - return; - } - if (event.metaDataChanged() == false) { + + if (event.localNodeMaster() == false) { return; } - PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap( - response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())), + response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)), e -> { logger.error("error migrating ml configurations", e); - threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())); + threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)); } )); } - private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, PersistentTasksCustomMetaData previous, - ClusterState state) { + private void auditChangesToMlTasks(ClusterChangedEvent event) { + + if (event.metaDataChanged() == false) { + return; + } + + PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); if (Objects.equals(previous, current)) { return; @@ -112,7 +92,7 @@ private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, Persis if (currentAssignment.getExecutorNode() == null) { auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]"); } else { - DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode()); + DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode()); auditor.info(jobId, "Opening job on node [" + node.toString() + "]"); } } else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) { @@ -126,7 +106,7 @@ private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, Persis auditor.warning(jobId, msg); } } else { - DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode()); + DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode()); if (jobId != null) { auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]"); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index e400fe0df4b93..e17c23da0686e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -9,10 +9,14 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -31,12 +35,14 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; +import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -90,12 +96,14 @@ public class MlConfigMigrator { private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck; private final AtomicBoolean migrationInProgress; + private final AtomicBoolean firstTime; public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) { this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService); this.migrationInProgress = new AtomicBoolean(false); + this.firstTime = new AtomicBoolean(true); } /** @@ -145,8 +153,23 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener } ); + if (firstTime.get()) { + snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap( + response -> { + firstTime.set(false); + migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress); + }, + unMarkMigrationInProgress::onFailure + )); + return; + } + + migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress); + } + + private void migrate(JobsAndDatafeeds jobsAndDatafeedsToMigrate, ActionListener listener) { if (jobsAndDatafeedsToMigrate.totalCount() == 0) { - unMarkMigrationInProgress.onResponse(Boolean.FALSE); + listener.onResponse(Boolean.FALSE); return; } @@ -157,9 +180,9 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener List successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.jobs); List successfulDatafeedWrites = filterFailedDatafeedConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.datafeedConfigs); - removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, unMarkMigrationInProgress); + removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, listener); }, - unMarkMigrationInProgress::onFailure + listener::onFailure )); } @@ -299,6 +322,45 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To return indexRequest; } + + // public for testing + public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener listener) { + + if (mlMetadata.getJobs().isEmpty() && mlMetadata.getDatafeeds().isEmpty()) { + listener.onResponse(Boolean.TRUE); + return; + } + + logger.debug("taking a snapshot of mlmetadata"); + String documentId = "ml-config"; + IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(), + ElasticsearchMappings.DOC_TYPE, documentId) + .setOpType(DocWriteRequest.OpType.CREATE); + + ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")); + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + builder.startObject(); + mlMetadata.toXContent(builder, params); + builder.endObject(); + + indexRequest.setSource(builder); + } catch (IOException e) { + logger.error("failed to serialise mlmetadata", e); + listener.onFailure(e); + return; + } + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(), + ActionListener.wrap( + indexResponse -> { + listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED); + }, + listener::onFailure), + client::index + ); + } + + public static Job updateJobForMigration(Job job) { Job.Builder builder = new Job.Builder(job); Map custom = job.getCustomSettings() == null ? new HashMap<>() : new HashMap<>(job.getCustomSettings()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java index b77ed582709ca..5c8c253794794 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlAssignmentNotifierTests.java @@ -31,7 +31,6 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -69,34 +68,39 @@ private void setupMocks() { public void testClusterChanged_info() { MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService); - notifier.onMaster(); - DiscoveryNode node = - new DiscoveryNode("node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT); ClusterState previous = ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(0L, Collections.emptyMap()))) .build(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); - addJobTask("job_id", "node_id", null, tasksBuilder); + addJobTask("job_id", "_node_id", null, tasksBuilder); MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build(); - ClusterState state = ClusterState.builder(new ClusterName("_name")) + ClusterState newState = ClusterState.builder(new ClusterName("_name")) .metaData(metaData) - .nodes(DiscoveryNodes.builder().add(node)) + // set local node master + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) .build(); - notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous)); + notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verify(auditor, times(1)).info(eq("job_id"), any()); - verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(state), any()); + verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any()); - notifier.offMaster(); - notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous)); + // no longer master + newState = ClusterState.builder(new ClusterName("_name")) + .metaData(metaData) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))) + .build(); + notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verifyNoMoreInteractions(auditor); } public void testClusterChanged_warning() { MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService); - notifier.onMaster(); ClusterState previous = ClusterState.builder(new ClusterName("_name")) .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, @@ -106,21 +110,31 @@ public void testClusterChanged_warning() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", null, null, tasksBuilder); MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build(); - ClusterState state = ClusterState.builder(new ClusterName("_name")) + ClusterState newState = ClusterState.builder(new ClusterName("_name")) .metaData(metaData) + // set local node master + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) .build(); - notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous)); + notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verify(auditor, times(1)).warning(eq("job_id"), any()); - verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(state), any()); + verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any()); + + // no longer master + newState = ClusterState.builder(new ClusterName("_name")) + .metaData(metaData) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))) + .build(); - notifier.offMaster(); - notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous)); + notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); verifyNoMoreInteractions(auditor); } public void testClusterChanged_noPersistentTaskChanges() { MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService); - notifier.onMaster(); PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask("job_id", null, null, tasksBuilder); @@ -129,14 +143,26 @@ public void testClusterChanged_noPersistentTaskChanges() { .metaData(metaData) .build(); - ClusterState current = ClusterState.builder(new ClusterName("_name")) + ClusterState newState = ClusterState.builder(new ClusterName("_name")) .metaData(metaData) + // set local node master + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) .build(); - notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous)); - verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any()); + notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); + verify(configMigrator, times(1)).migrateConfigsWithoutTasks(any(), any()); + verifyNoMoreInteractions(auditor); - notifier.offMaster(); - verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any()); + // no longer master + newState = ClusterState.builder(new ClusterName("_name")) + .metaData(metaData) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))) + .build(); + notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous)); + verifyNoMoreInteractions(configMigrator); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index b81805fb3fbdf..1dc06e0e2aef6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.integration; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -13,9 +14,16 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck; import org.elasticsearch.xpack.ml.MlConfigMigrator; import org.elasticsearch.xpack.ml.MlSingleNodeTestCase; @@ -23,6 +31,8 @@ import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; import org.junit.Before; +import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -98,7 +108,7 @@ public void testWriteConfigToIndex() throws InterruptedException { assertNull(alreadyMigratedJob.getCustomSettings()); } - public void testMigrateConfigs() throws InterruptedException { + public void testMigrateConfigs() throws InterruptedException, IOException { // and jobs and datafeeds clusterstate MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); @@ -124,11 +134,13 @@ public void testMigrateConfigs() throws InterruptedException { // do the migration MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); + // the first time this is called mlmetadata will be snap-shotted blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertTrue(responseHolder.get()); + assertSnapshot(mlMetadata.build()); // check the jobs have been migrated AtomicReference> jobsHolder = new AtomicReference<>(); @@ -171,9 +183,9 @@ public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws Int mlMetadata.putDatafeed(builder.build(), Collections.emptyMap()); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) - .metaData(MetaData.builder() - .putCustom(MlMetadata.TYPE, mlMetadata.build())) - .build(); + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); AtomicReference exceptionHolder = new AtomicReference<>(); AtomicReference responseHolder = new AtomicReference<>(); @@ -181,7 +193,7 @@ public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws Int // do the migration MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(settings, client(), clusterService); blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), - responseHolder, exceptionHolder); + responseHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertFalse(responseHolder.get()); @@ -190,7 +202,7 @@ public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws Int AtomicReference> jobsHolder = new AtomicReference<>(); JobConfigProvider jobConfigProvider = new JobConfigProvider(client()); blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener), - jobsHolder, exceptionHolder); + jobsHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertThat(jobsHolder.get().isEmpty(), is(true)); @@ -198,11 +210,25 @@ public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws Int DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client(), xContentRegistry()); AtomicReference> datafeedsHolder = new AtomicReference<>(); blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", true, actionListener), - datafeedsHolder, exceptionHolder); + datafeedsHolder, exceptionHolder); assertNull(exceptionHolder.get()); assertThat(datafeedsHolder.get().isEmpty(), is(true)); } + + public void assertSnapshot(MlMetadata expectedMlMetadata) throws IOException { + GetResponse getResponse = client() + .prepareGet(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, "ml-config").get(); + + assertTrue(getResponse.isExists()); + + try (InputStream stream = getResponse.getSourceAsBytesRef().streamInput(); + XContentParser parser = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { + MlMetadata recoveredMeta = MlMetadata.LENIENT_PARSER.apply(parser, null).build(); + assertEquals(expectedMlMetadata, recoveredMeta); + } + } }