From 8c0ab4eab391c7e121ac92ef2b6267bf0cd9929c Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 12 Feb 2025 17:23:23 -0600 Subject: [PATCH] Making reindex data streams actions cancellable (#122438) --- .../action/CreateIndexFromSourceAction.java | 13 +++++++++++++ .../migrate/action/ReindexDataStreamAction.java | 14 ++++++++++++++ .../action/ReindexDataStreamIndexAction.java | 14 ++++++++++++++ 3 files changed, 41 insertions(+) diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceAction.java index 14e5e8cccd910..5ab009decd381 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceAction.java @@ -15,6 +15,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContent; @@ -191,5 +194,15 @@ public String[] indices() { public IndicesOptions indicesOptions() { return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers); + } + + @Override + public String getDescription() { + return "creating index " + destIndex + " from " + sourceIndex; + } } } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java index faf8982b79bf0..5ebd2040fbcb1 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java @@ -16,6 +16,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ConstructingObjectParser; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContent; @@ -24,6 +27,7 @@ import java.io.IOException; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.function.Predicate; @@ -144,5 +148,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.endObject(); return builder; } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers); + } + + @Override + public String getDescription() { + return "reindexing data stream " + sourceDataStream; + } } } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexAction.java index 2e3fd1b76ed32..dec3cf2901fcc 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexAction.java @@ -14,8 +14,12 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.util.Map; import java.util.Objects; public class ReindexDataStreamIndexAction extends ActionType { @@ -78,6 +82,16 @@ public String[] indices() { public IndicesOptions indicesOptions() { return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers); + } + + @Override + public String getDescription() { + return "reindexing data stream index " + sourceIndex; + } } public static class Response extends ActionResponse {