Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add read-block to source index during data stream reindex #122887

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.migrate.MigratePlugin;
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
Expand All @@ -68,12 +69,22 @@

public class ReindexDatastreamIndexTransportActionIT extends ESIntegTestCase {

private String sourceIndex;

@Before
private void setup() throws Exception {
sourceIndex = null;
deletePipeline(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME);
assertBusy(() -> { assertTrue(getPipelines(MigrateTemplateRegistry.REINDEX_DATA_STREAM_PIPELINE_NAME).isFound()); });
}

@After
private void cleanup() {
if (sourceIndex != null) {
cleanupMetadataBlocks(sourceIndex);
}
}

private static final String MAPPING = """
{
"_doc":{
Expand Down Expand Up @@ -110,7 +121,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
""";

public void testTimestamp0AddedIfMissing() {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));

// add doc without timestamp
Expand All @@ -135,7 +146,7 @@ public void testTimestamp0AddedIfMissing() {

public void testTimestampNotAddedIfExists() {

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));

// add doc with timestamp
Expand Down Expand Up @@ -185,7 +196,7 @@ public void testCustomReindexPipeline() {

safeGet(clusterAdmin().execute(PutPipelineTransportAction.TYPE, putRequest));

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));

// add doc with timestamp
Expand All @@ -212,7 +223,7 @@ public void testCustomReindexPipeline() {

public void testDestIndexDeletedIfExists() throws Exception {
// empty source index
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));

// dest index with docs
Expand All @@ -231,7 +242,7 @@ public void testDestIndexDeletedIfExists() throws Exception {
}

public void testDestIndexNameSet_noDotPrefix() throws Exception {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));

// call reindex
Expand All @@ -243,9 +254,8 @@ public void testDestIndexNameSet_noDotPrefix() throws Exception {
assertEquals(expectedDestIndexName, response.getDestIndex());
}

public void testDestIndexNameSet_withDotPrefix() throws Exception {

var sourceIndex = "." + randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
public void testDestIndexNameSet_withDotPrefix() {
sourceIndex = "." + randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));

// call reindex
Expand All @@ -257,10 +267,10 @@ public void testDestIndexNameSet_withDotPrefix() throws Exception {
assertEquals(expectedDestIndexName, response.getDestIndex());
}

public void testDestIndexContainsDocs() throws Exception {
public void testDestIndexContainsDocs() {
// source index with docs
var numDocs = randomIntBetween(1, 100);
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
indexDocs(sourceIndex, numDocs);

Expand All @@ -274,19 +284,19 @@ public void testDestIndexContainsDocs() throws Exception {
assertHitCount(prepareSearch(response.getDestIndex()).setSize(0), numDocs);
}

public void testSetSourceToBlockWrites() throws Exception {
var settings = randomBoolean() ? Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build() : Settings.EMPTY;
public void testSetSourceToReadOnly() throws Exception {
var settings = randomBoolean() ? Settings.builder().put(IndexMetadata.SETTING_READ_ONLY, true).build() : Settings.EMPTY;

// empty source index
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));

// call reindex
safeGet(client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)));

// Assert that source index is now read-only but not verified read-only
GetSettingsResponse getSettingsResponse = safeGet(admin().indices().getSettings(new GetSettingsRequest().indices(sourceIndex)));
assertTrue(parseBoolean(getSettingsResponse.getSetting(sourceIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));
assertTrue(parseBoolean(getSettingsResponse.getSetting(sourceIndex, IndexMetadata.SETTING_READ_ONLY)));
assertFalse(
parseBoolean(getSettingsResponse.getSetting(sourceIndex, MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING.getKey()))
);
Expand All @@ -309,7 +319,7 @@ public void testSettingsAddedBeforeReindex() {
// start with a static setting
var numShards = randomIntBetween(1, 10);
var staticSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build();
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, staticSettings)));

// update with a dynamic setting
Expand All @@ -334,7 +344,7 @@ public void testSettingsAddedBeforeReindex() {
}

public void testMappingsAddedToDestIndex() {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex).mapping(MAPPING)));

// call reindex
Expand All @@ -355,7 +365,7 @@ public void testMappingsAddedToDestIndex() {
}

public void testFailIfMetadataBlockSet() {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_METADATA, true).build();
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));

Expand All @@ -364,12 +374,10 @@ public void testFailIfMetadataBlockSet() {
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
);
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));

cleanupMetadataBlocks(sourceIndex);
}

public void testFailIfReadBlockSet() {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_READ, true).build();
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));

Expand All @@ -378,12 +386,10 @@ public void testFailIfReadBlockSet() {
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
);
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));

cleanupMetadataBlocks(sourceIndex);
}

public void testReadOnlyBlocksNotAddedBack() {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var settings = Settings.builder()
.put(IndexMetadata.SETTING_READ_ONLY, randomBoolean())
.put(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE, randomBoolean())
Expand All @@ -401,7 +407,6 @@ public void testReadOnlyBlocksNotAddedBack() {
assertFalse(parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)));
assertFalse(parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));

cleanupMetadataBlocks(sourceIndex);
cleanupMetadataBlocks(destIndex);
}

Expand All @@ -414,7 +419,7 @@ public void testUpdateSettingsDefaultsRestored() {
indicesAdmin().execute(TransportDeleteIndexTemplateAction.TYPE, new DeleteIndexTemplateRequest("random_index_template"))
);

var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
assertAcked(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));

// call reindex
Expand Down Expand Up @@ -453,7 +458,7 @@ public void testSettingsAndMappingsFromTemplate() throws IOException {
request.indexTemplate(template);
safeGet(client().execute(TransportPutComposableIndexTemplateAction.TYPE, request));

var sourceIndex = "logs-" + randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
sourceIndex = "logs-" + randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));

{
Expand Down Expand Up @@ -588,9 +593,8 @@ private static void cleanupMetadataBlocks(String index) {
var settings = Settings.builder()
.putNull(IndexMetadata.SETTING_READ_ONLY)
.putNull(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)
.putNull(IndexMetadata.SETTING_BLOCKS_METADATA)
.build();
safeGet(indicesAdmin().updateSettings(new UpdateSettingsRequest(settings, index)));
.putNull(IndexMetadata.SETTING_BLOCKS_METADATA);
updateIndexSettings(settings, index);
}

private static void indexDocs(String index, int numDocs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;

public class ReindexDataStreamIndexTransportAction extends HandledTransportAction<
ReindexDataStreamIndexAction.Request,
Expand Down Expand Up @@ -156,7 +156,7 @@ protected void doExecute(
return;
}
final boolean wasClosed = isClosed(sourceIndex);
SubscribableListener.<AcknowledgedResponse>newForked(l -> setBlockWrites(sourceIndexName, l, taskId))
SubscribableListener.<AcknowledgedResponse>newForked(l -> setReadOnly(sourceIndexName, l, taskId))
.<OpenIndexResponse>andThen(l -> openIndexIfClosed(sourceIndexName, wasClosed, l, taskId))
.<BroadcastResponse>andThen(l -> refresh(sourceIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
Expand Down Expand Up @@ -201,9 +201,9 @@ private static boolean isClosed(IndexMetadata indexMetadata) {
return indexMetadata.getState().equals(IndexMetadata.State.CLOSE);
}

private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
logger.debug("Setting write block on source index [{}]", sourceIndexName);
addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() {
private void setReadOnly(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
logger.debug("Setting read-only on source index [{}]", sourceIndexName);
addBlockToIndex(READ_ONLY, sourceIndexName, new ActionListener<>() {
@Override
public void onResponse(AddIndexBlockResponse response) {
if (response.isAcknowledged()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.support.CountDownActionListener;
Expand All @@ -23,8 +25,10 @@
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -216,7 +220,9 @@ private void maybeProcessNextIndex(
l -> client.execute(ReindexDataStreamIndexAction.INSTANCE, reindexDataStreamIndexRequest, l)
)
.<AcknowledgedResponse>andThen(
(l, result) -> updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), l, parentTaskId)
(l, result) -> removeReadOnlyBlock(index.getName(), parentTaskId, l.delegateFailure((delegate, response) -> {
updateDataStream(sourceDataStream, index.getName(), result.getDestIndex(), delegate, parentTaskId);
}))
)
.<AcknowledgedResponse>andThen(l -> deleteIndex(index.getName(), parentTaskId, l))
.addListener(ActionListener.wrap(unused -> {
Expand Down Expand Up @@ -246,6 +252,15 @@ private void updateDataStream(
client.execute(ModifyDataStreamsAction.INSTANCE, modifyDataStreamRequest, listener);
}

private void removeReadOnlyBlock(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {
var updateSettingsRequest = new UpdateSettingsRequest(
Settings.builder().putNull(IndexMetadata.SETTING_READ_ONLY).build(),
indexName
);
updateSettingsRequest.setParentTask(parentTaskId);
client.execute(TransportUpdateSettingsAction.TYPE, updateSettingsRequest, listener);
}

private void deleteIndex(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
deleteIndexRequest.setParentTask(parentTaskId);
Expand Down