diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java index 7cacfdc972736..c32d3520e83cf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java @@ -18,14 +18,17 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.hamcrest.OpenSearchAssertions; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; import org.opensearch.transport.client.Client; import org.opensearch.transport.client.Requests; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -195,4 +198,87 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { .setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null)) .get(); } + + public void testMixedModeRelocation_FailInFinalize() throws Exception { + String docRepNode = internalCluster().startNode(); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + // create shard with 0 replica and 1 shard + client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); + ensureGreen("test"); + + AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); + asyncIndexingService.startIndexing(); + + refresh("test"); + + // add remote node in mixed mode cluster + setAddRemote(true); + String remoteNode = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + + AtomicBoolean failFinalize = new AtomicBoolean(true); + + MockTransportService remoteNodeTransportService = (MockTransportService) internalCluster().getInstance( + TransportService.class, + remoteNode + ); + + remoteNodeTransportService.addRequestHandlingBehavior( + PeerRecoveryTargetService.Actions.FINALIZE, + (handler, request, channel, task) -> { + if (failFinalize.get()) { + throw new IOException("Failing finalize"); + } else { + handler.messageReceived(request, channel, task); + } + } + ); + + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), "40s")) + .get(); + + // Change direction to remote store + updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + logger.info("--> relocating from {} to {} ", docRepNode, remoteNode); + client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setTimeout(TimeValue.timeValueSeconds(5)) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .execute() + .actionGet(); + + assertTrue(clusterHealthResponse.getRelocatingShards() == 1); + + ClusterHealthRequest healthRequest = Requests.clusterHealthRequest() + .waitForNoRelocatingShards(true) + .waitForNoInitializingShards(true); + ClusterHealthResponse actionGet = client().admin().cluster().health(healthRequest).actionGet(); + assertEquals(actionGet.getRelocatingShards(), 0); + assertEquals(docRepNode, primaryNodeName("test")); + + // now unblock it + logger.info("Unblocking the finalize recovery now"); + failFinalize.set(false); + + client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, docRepNode, remoteNode)).execute().actionGet(); + waitForRelocation(); + + asyncIndexingService.stopIndexing(); + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null)) + .get(); + } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/fs/FsBlobContainer.java index b6644ffd16bab..931841ae3de6e 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/fs/FsBlobContainer.java @@ -225,6 +225,7 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream } private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException { + Files.createDirectories(path); try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) { final int bufferSize = blobStore.bufferSizeInBytes(); org.opensearch.common.util.io.Streams.copy( diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f26e53967b873..df841dac4cf8e 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -5056,7 +5056,7 @@ public void deleteTranslogFilesFromRemoteTranslog() throws IOException { */ public void deleteRemoteStoreContents() throws IOException { deleteTranslogFilesFromRemoteTranslog(); - getRemoteDirectory().deleteStaleSegments(0); + getRemoteDirectory().delete(); } public void syncTranslogFilesFromRemoteTranslog() throws IOException { diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 941cf047347f7..46a90da2a18b6 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -1061,7 +1061,7 @@ private boolean deleteIfEmpty() throws IOException { return delete(); } - private boolean delete() { + public boolean delete() { try { remoteDataDirectory.delete(); remoteMetadataDirectory.delete();