Skip to content

Commit

Permalink
[ML] Snapshot ml configs before migrating (#36645)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Dec 17, 2018
1 parent 3449283 commit 4808c65
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -26,75 +24,57 @@
import org.elasticsearch.xpack.ml.notifications.Auditor;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

public class MlAssignmentNotifier implements ClusterStateListener, LocalNodeMasterListener {

public class MlAssignmentNotifier implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(MlAssignmentNotifier.class);

private final Auditor auditor;
private final ClusterService clusterService;
private final MlConfigMigrator mlConfigMigrator;
private final ThreadPool threadPool;
private final AtomicBoolean enabled = new AtomicBoolean(false);

MlAssignmentNotifier(Settings settings, Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) {
this.auditor = auditor;
this.clusterService = clusterService;
this.mlConfigMigrator = new MlConfigMigrator(settings, client, clusterService);
this.threadPool = threadPool;
clusterService.addLocalNodeMasterListener(this);
clusterService.addListener(this);
}

MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, MlConfigMigrator mlConfigMigrator, ClusterService clusterService) {
this.auditor = auditor;
this.clusterService = clusterService;
this.mlConfigMigrator = mlConfigMigrator;
this.threadPool = threadPool;
clusterService.addLocalNodeMasterListener(this);
clusterService.addListener(this);
}

/**
 * Flips the {@code enabled} flag from {@code false} to {@code true} and, only
 * when this call performed that transition, adds this instance as a listener
 * on the cluster service. If the flag was already {@code true} nothing happens.
 */
@Override
public void onMaster() {
    boolean switchedOn = enabled.compareAndSet(false, true);
    if (switchedOn == false) {
        // flag was already set, so there is nothing to add
        return;
    }
    clusterService.addListener(this);
}

/**
 * Flips the {@code enabled} flag from {@code true} to {@code false} and, only
 * when this call performed that transition, removes this instance from the
 * cluster service's listeners. If the flag was already {@code false} nothing
 * happens.
 */
@Override
public void offMaster() {
    boolean switchedOff = enabled.compareAndSet(true, false);
    if (switchedOff == false) {
        // flag was already cleared, so there is nothing to remove
        return;
    }
    clusterService.removeListener(this);
}

@Override
public String executorName() {
private String executorName() {
return ThreadPool.Names.GENERIC;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (enabled.get() == false) {
return;
}
if (event.metaDataChanged() == false) {

if (event.localNodeMaster() == false) {
return;
}
PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

mlConfigMigrator.migrateConfigsWithoutTasks(event.state(), ActionListener.wrap(
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state())),
response -> threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event)),
e -> {
logger.error("error migrating ml configurations", e);
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(current, previous, event.state()));
threadPool.executor(executorName()).execute(() -> auditChangesToMlTasks(event));
}
));
}

private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, PersistentTasksCustomMetaData previous,
ClusterState state) {
private void auditChangesToMlTasks(ClusterChangedEvent event) {

if (event.metaDataChanged() == false) {
return;
}

PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

if (Objects.equals(previous, current)) {
return;
Expand All @@ -112,7 +92,7 @@ private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, Persis
if (currentAssignment.getExecutorNode() == null) {
auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]");
} else {
DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode());
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
auditor.info(jobId, "Opening job on node [" + node.toString() + "]");
}
} else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) {
Expand All @@ -126,7 +106,7 @@ private void auditChangesToMlTasks(PersistentTasksCustomMetaData current, Persis
auditor.warning(jobId, msg);
}
} else {
DiscoveryNode node = state.nodes().get(currentAssignment.getExecutorNode());
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
if (jobId != null) {
auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -31,12 +35,14 @@
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -90,12 +96,14 @@ public class MlConfigMigrator {
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;

private final AtomicBoolean migrationInProgress;
private final AtomicBoolean firstTime;

/**
 * Creates a migrator with no migration in progress and the first-time flag set.
 *
 * @param settings       passed through to the {@link MlConfigMigrationEligibilityCheck}
 * @param client         client used by this migrator; must not be {@code null}
 * @param clusterService cluster service used by this migrator and by the
 *                       eligibility check; must not be {@code null}
 * @throws NullPointerException if {@code client} or {@code clusterService} is {@code null}
 */
public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) {
    // Validate in the same order as before so the same argument is reported first.
    Objects.requireNonNull(client);
    Objects.requireNonNull(clusterService);

    this.client = client;
    this.clusterService = clusterService;
    this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);

    // Initial flag values: nothing is running yet, and this is the first attempt.
    this.migrationInProgress = new AtomicBoolean(false);
    this.firstTime = new AtomicBoolean(true);
}

/**
Expand Down Expand Up @@ -145,8 +153,23 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
}
);

if (firstTime.get()) {
snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap(
response -> {
firstTime.set(false);
migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress);
},
unMarkMigrationInProgress::onFailure
));
return;
}

migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress);
}

private void migrate(JobsAndDatafeeds jobsAndDatafeedsToMigrate, ActionListener<Boolean> listener) {
if (jobsAndDatafeedsToMigrate.totalCount() == 0) {
unMarkMigrationInProgress.onResponse(Boolean.FALSE);
listener.onResponse(Boolean.FALSE);
return;
}

Expand All @@ -157,9 +180,9 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.jobs);
List<String> successfulDatafeedWrites =
filterFailedDatafeedConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.datafeedConfigs);
removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, unMarkMigrationInProgress);
removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, listener);
},
unMarkMigrationInProgress::onFailure
listener::onFailure
));
}

Expand Down Expand Up @@ -299,6 +322,45 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To
return indexRequest;
}


/**
 * Writes a snapshot of the given {@link MlMetadata} to the job state index
 * under the fixed document id {@code ml-config}.
 *
 * <p>The document is indexed with {@code OpType.CREATE}, so an existing
 * snapshot is never overwritten. If the document already exists the index
 * request fails with a version conflict; that is treated as "snapshot
 * already taken" and reported as success rather than as a failure, so a
 * pre-existing snapshot cannot block the caller from proceeding.
 *
 * <p>Public for testing.
 *
 * @param mlMetadata the metadata to snapshot; if it holds no jobs and no
 *                   datafeeds nothing is written
 * @param listener   receives {@code true} when there was nothing to snapshot
 *                   or a snapshot already exists, otherwise whether the index
 *                   result was {@code CREATED}; receives the failure if the
 *                   metadata cannot be serialised or indexing fails for any
 *                   reason other than a version conflict
 */
public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener<Boolean> listener) {

    if (mlMetadata.getJobs().isEmpty() && mlMetadata.getDatafeeds().isEmpty()) {
        // nothing to snapshot
        listener.onResponse(Boolean.TRUE);
        return;
    }

    logger.debug("taking a snapshot of mlmetadata");
    String documentId = "ml-config";
    IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(),
            ElasticsearchMappings.DOC_TYPE, documentId)
            .setOpType(DocWriteRequest.OpType.CREATE);

    ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"));
    try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
        builder.startObject();
        mlMetadata.toXContent(builder, params);
        builder.endObject();

        indexRequest.setSource(builder);
    } catch (IOException e) {
        logger.error("failed to serialise mlmetadata", e);
        listener.onFailure(e);
        return;
    }

    executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest.request(),
            ActionListener.<IndexResponse>wrap(
                    indexResponse -> listener.onResponse(indexResponse.getResult() == DocWriteResponse.Result.CREATED),
                    e -> {
                        // Fully qualified names are used so no new import is required.
                        // Unwrap first: the conflict may arrive wrapped in a transport exception.
                        Throwable cause = org.elasticsearch.ExceptionsHelper.unwrapCause(e);
                        if (cause instanceof org.elasticsearch.index.engine.VersionConflictEngineException) {
                            // the snapshot document already exists; it must not be overwritten
                            // and its presence is not an error
                            logger.debug("mlmetadata snapshot already exists");
                            listener.onResponse(Boolean.TRUE);
                        } else {
                            listener.onFailure(e);
                        }
                    }),
            client::index
    );
}


public static Job updateJobForMigration(Job job) {
Job.Builder builder = new Job.Builder(job);
Map<String, Object> custom = job.getCustomSettings() == null ? new HashMap<>() : new HashMap<>(job.getCustomSettings());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
Expand Down Expand Up @@ -69,34 +68,39 @@ private void setupMocks() {

public void testClusterChanged_info() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
notifier.onMaster();

DiscoveryNode node =
new DiscoveryNode("node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT);
ClusterState previous = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
new PersistentTasksCustomMetaData(0L, Collections.emptyMap())))
.build();

PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", null, tasksBuilder);
addJobTask("job_id", "_node_id", null, tasksBuilder);
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build();
ClusterState state = ClusterState.builder(new ClusterName("_name"))
ClusterState newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
.nodes(DiscoveryNodes.builder().add(node))
// set local node master
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verify(auditor, times(1)).info(eq("job_id"), any());
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(state), any());
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any());

notifier.offMaster();
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
// no longer master
newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verifyNoMoreInteractions(auditor);
}

public void testClusterChanged_warning() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
notifier.onMaster();

ClusterState previous = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
Expand All @@ -106,21 +110,31 @@ public void testClusterChanged_warning() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", null, null, tasksBuilder);
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build();
ClusterState state = ClusterState.builder(new ClusterName("_name"))
ClusterState newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
// set local node master
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verify(auditor, times(1)).warning(eq("job_id"), any());
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(state), any());
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any());

// no longer master
newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)))
.build();

notifier.offMaster();
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verifyNoMoreInteractions(auditor);
}

public void testClusterChanged_noPersistentTaskChanges() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
notifier.onMaster();

PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", null, null, tasksBuilder);
Expand All @@ -129,14 +143,26 @@ public void testClusterChanged_noPersistentTaskChanges() {
.metaData(metaData)
.build();

ClusterState current = ClusterState.builder(new ClusterName("_name"))
ClusterState newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
// set local node master
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.build();

notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous));
verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any());
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(any(), any());
verifyNoMoreInteractions(auditor);

notifier.offMaster();
verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any());
// no longer master
newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verifyNoMoreInteractions(configMigrator);
}
}
Loading

0 comments on commit 4808c65

Please sign in to comment.