Skip to content

Commit

Permalink
Add read-block to source index during data stream reindex (#122887) (#…
Browse files Browse the repository at this point in the history
…123186)

When reindexing a data stream, we currently add a write block to the source indices so that new documents cannot be added to the index while it is being reindexed. A write block still allows the index to be deleted and for the metadata to be updated. It is possible that ILM could delete a backing index or update a backing index's lifecycle metadata while it is being reindexed. To avoid this, this PR sets a read-only block on the source index. This block must be removed before source index can be deleted after it is replaced with the destination index.

(cherry picked from commit 2cc86b3)

# Conflicts:
#	x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDatastreamIndexTransportActionIT.java
#	x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java
  • Loading branch information
parkertimmins authored Feb 21, 2025
1 parent 73e9a8f commit 8f4478e
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.migrate.action;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
Expand All @@ -18,7 +17,6 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.delete.TransportDeleteIndexTemplateAction;
Expand All @@ -36,11 +34,9 @@
import org.elasticsearch.action.ingest.PutPipelineTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.compress.CompressedXContent;
Expand Down Expand Up @@ -97,7 +93,6 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

Expand Down Expand Up @@ -304,8 +299,7 @@ public void testDestIndexNameSet_noDotPrefix() throws Exception {
assertEquals(expectedDestIndexName, response.getDestIndex());
}

public void testDestIndexNameSet_withDotPrefix() throws Exception {

public void testDestIndexNameSet_withDotPrefix() {
var sourceIndex = "." + randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));

Expand All @@ -318,13 +312,19 @@ public void testDestIndexNameSet_withDotPrefix() throws Exception {
assertEquals(expectedDestIndexName, response.getDestIndex());
}

public void testDestIndexContainsDocs() throws Exception {
public void testDestIndexContainsDocs() {
// source index with docs
var numDocs = randomIntBetween(1, 100);
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex)));
indexDocs(sourceIndex, numDocs);

var settings = Settings.builder()
.put(IndexMetadata.SETTING_BLOCKS_METADATA, randomBoolean())
.put(IndexMetadata.SETTING_READ_ONLY, randomBoolean())
.build();
safeGet(indicesAdmin().updateSettings(new UpdateSettingsRequest(settings, sourceIndex)));

// call reindex
var response = safeGet(
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
Expand All @@ -335,29 +335,6 @@ public void testDestIndexContainsDocs() throws Exception {
assertHitCount(prepareSearch(response.getDestIndex()).setSize(0), numDocs);
}

public void testSetSourceToBlockWrites() throws Exception {
var settings = randomBoolean() ? Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true).build() : Settings.EMPTY;

// empty source index
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));

// call reindex
safeGet(client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex)));

// Assert that source index is now read-only but not verified read-only
GetSettingsResponse getSettingsResponse = safeGet(admin().indices().getSettings(new GetSettingsRequest().indices(sourceIndex)));
assertTrue(parseBoolean(getSettingsResponse.getSetting(sourceIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));
assertFalse(
parseBoolean(getSettingsResponse.getSetting(sourceIndex, MetadataIndexStateService.VERIFIED_READ_ONLY_SETTING.getKey()))
);

// assert that write to source fails
var indexReq = new IndexRequest(sourceIndex).source(jsonBuilder().startObject().field("field", "1").endObject());
expectThrows(ClusterBlockException.class, client().index(indexReq));
assertHitCount(prepareSearch(sourceIndex).setSize(0), 0);
}

public void testMissingSourceIndex() {
var nonExistentSourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
expectThrows(
Expand Down Expand Up @@ -413,34 +390,6 @@ public void testMappingsAddedToDestIndex() {
assertEquals("text", XContentMapValues.extractValue("properties.foo1.type", destMappings));
}

public void testFailIfMetadataBlockSet() {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_METADATA, true).build();
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));

ElasticsearchException e = expectThrows(
ElasticsearchException.class,
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
);
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));

cleanupMetadataBlocks(sourceIndex);
}

public void testFailIfReadBlockSet() {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var settings = Settings.builder().put(IndexMetadata.SETTING_BLOCKS_READ, true).build();
safeGet(indicesAdmin().create(new CreateIndexRequest(sourceIndex, settings)));

ElasticsearchException e = expectThrows(
ElasticsearchException.class,
client().execute(ReindexDataStreamIndexAction.INSTANCE, new ReindexDataStreamIndexAction.Request(sourceIndex))
);
assertTrue(e.getMessage().contains("Cannot reindex index") || e.getCause().getMessage().equals("Cannot reindex index"));

cleanupMetadataBlocks(sourceIndex);
}

public void testReadOnlyBlocksNotAddedBack() {
var sourceIndex = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
var settings = Settings.builder()
Expand All @@ -460,7 +409,6 @@ public void testReadOnlyBlocksNotAddedBack() {
assertFalse(parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)));
assertFalse(parseBoolean(settingsResponse.getSetting(destIndex, IndexMetadata.SETTING_BLOCKS_WRITE)));

cleanupMetadataBlocks(sourceIndex);
cleanupMetadataBlocks(destIndex);
}

Expand Down Expand Up @@ -807,9 +755,8 @@ private static void cleanupMetadataBlocks(String index) {
var settings = Settings.builder()
.putNull(IndexMetadata.SETTING_READ_ONLY)
.putNull(IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE)
.putNull(IndexMetadata.SETTING_BLOCKS_METADATA)
.build();
safeGet(indicesAdmin().updateSettings(new UpdateSettingsRequest(settings, index)));
.putNull(IndexMetadata.SETTING_BLOCKS_METADATA);
updateIndexSettings(settings, index);
}

private static void indexDocs(String index, int numDocs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -59,11 +60,13 @@
import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;

import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.METADATA;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;

public class ReindexDataStreamIndexTransportAction extends HandledTransportAction<
ReindexDataStreamIndexAction.Request,
Expand Down Expand Up @@ -149,20 +152,12 @@ protected void doExecute(
);
}

if (settingsBefore.getAsBoolean(IndexMetadata.SETTING_BLOCKS_READ, false)) {
var errorMessage = String.format(Locale.ROOT, "Cannot reindex index [%s] which has a read block.", destIndexName);
listener.onFailure(new ElasticsearchException(errorMessage));
return;
}
if (settingsBefore.getAsBoolean(IndexMetadata.SETTING_BLOCKS_METADATA, false)) {
var errorMessage = String.format(Locale.ROOT, "Cannot reindex index [%s] which has a metadata block.", destIndexName);
listener.onFailure(new ElasticsearchException(errorMessage));
return;
}
final boolean wasClosed = isClosed(sourceIndex);
SubscribableListener.<FreezeResponse>newForked(l -> unfreezeIfFrozen(sourceIndexName, sourceIndex, l, taskId))
.<AcknowledgedResponse>andThen(l -> setBlockWrites(sourceIndexName, l, taskId))

SubscribableListener.<AcknowledgedResponse>newForked(l -> removeMetadataBlocks(sourceIndexName, taskId, l))
.<FreezeResponse>andThen(l -> unfreezeIfFrozen(sourceIndexName, sourceIndex, l, taskId))
.<OpenIndexResponse>andThen(l -> openIndexIfClosed(sourceIndexName, wasClosed, l, taskId))
.<AcknowledgedResponse>andThen(l -> setReadOnly(sourceIndexName, l, taskId))
.<BroadcastResponse>andThen(l -> refresh(sourceIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> deleteDestIfExists(destIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> createIndex(sourceIndex, destIndexName, l, taskId))
Expand All @@ -171,6 +166,7 @@ protected void doExecute(
.<AcknowledgedResponse>andThen(l -> copyIndexMetadataToDest(sourceIndexName, destIndexName, l, taskId))
.<AcknowledgedResponse>andThen(l -> sanityCheck(sourceIndexName, destIndexName, l, taskId))
.<CloseIndexResponse>andThen(l -> closeIndexIfWasClosed(destIndexName, wasClosed, l, taskId))
.<AcknowledgedResponse>andThen(l -> removeAPIBlocks(sourceIndexName, taskId, l, READ_ONLY))
.andThenApply(ignored -> new ReindexDataStreamIndexAction.Response(destIndexName))
.addListener(listener);
}
Expand Down Expand Up @@ -222,9 +218,9 @@ private void unfreezeIfFrozen(
}
}

private void setBlockWrites(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
logger.debug("Setting write block on source index [{}]", sourceIndexName);
addBlockToIndex(WRITE, sourceIndexName, new ActionListener<>() {
private void setReadOnly(String sourceIndexName, ActionListener<AcknowledgedResponse> listener, TaskId parentTaskId) {
logger.debug("Setting read-only on source index [{}]", sourceIndexName);
addBlockToIndex(READ_ONLY, sourceIndexName, new ActionListener<>() {
@Override
public void onResponse(AddIndexBlockResponse response) {
if (response.isAcknowledged()) {
Expand Down Expand Up @@ -420,6 +416,29 @@ private void addBlockToIndex(
client.admin().indices().execute(TransportAddIndexBlockAction.TYPE, addIndexBlockRequest, listener);
}

/**
* All metadata blocks need to be removed at the start for the following reasons:
* 1) If the source index has a metadata only block, the read-only block can't be added.
* 2) If the source index is read-only and closed, it can't be opened.
*/
private void removeMetadataBlocks(String indexName, TaskId parentTaskId, ActionListener<AcknowledgedResponse> listener) {
logger.debug("Removing metadata blocks from index [{}]", indexName);
removeAPIBlocks(indexName, parentTaskId, listener, METADATA, READ_ONLY);
}

private void removeAPIBlocks(
String indexName,
TaskId parentTaskId,
ActionListener<AcknowledgedResponse> listener,
IndexMetadata.APIBlock... blocks
) {
Settings.Builder settings = Settings.builder();
Arrays.stream(blocks).forEach(b -> settings.putNull(b.settingName()));
var updateSettingsRequest = new UpdateSettingsRequest(settings.build(), indexName);
updateSettingsRequest.setParentTask(parentTaskId);
client.execute(TransportUpdateSettingsAction.TYPE, updateSettingsRequest, listener);
}

private void getIndexDocCount(String index, TaskId parentTaskId, ActionListener<Long> listener) {
SearchRequest countRequest = new SearchRequest(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0).trackTotalHits(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,9 @@ private void upgradeDataStream(String dataStreamName, int numRolloversOnOldClust
if (randomBoolean()) {
closeIndex(oldIndexName);
}
if (randomBoolean()) {
assertOK(client().performRequest(new Request("PUT", oldIndexName + "/_block/read_only")));
}
}
Request reindexRequest = new Request("POST", "/_migration/reindex");
reindexRequest.setJsonEntity(Strings.format("""
Expand Down

0 comments on commit 8f4478e

Please sign in to comment.