diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index dcc0ea33e6131..d32b933b558f0 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -297,6 +297,9 @@ public Iterator> settings() { public static final String SETTING_REMOTE_STORE_REPOSITORY = "index.remote_store.repository"; public static final String SETTING_REMOTE_TRANSLOG_STORE_ENABLED = "index.remote_store.translog.enabled"; + + public static final String SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY = "index.remote_store.translog.repository"; + /** * Used to specify if the index data should be persisted in the remote store. */ @@ -405,6 +408,43 @@ public Iterator> settings() { Property.Final ); + public static final Setting INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING = Setting.simpleString( + SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, + new Setting.Validator<>() { + + @Override + public void validate(final String value) {} + + @Override + public void validate(final String value, final Map, Object> settings) { + if (value == null || value.isEmpty()) { + throw new IllegalArgumentException( + "Setting " + INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey() + " should be provided with non-empty repository ID" + ); + } else { + final Boolean isRemoteTranslogStoreEnabled = (Boolean) settings.get(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING); + if (isRemoteTranslogStoreEnabled == null || isRemoteTranslogStoreEnabled == false) { + throw new IllegalArgumentException( + "Settings " + + INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey() + + " can only be set/enabled when " + + INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING.getKey() + + " is set to true" + ); + } + } + } + + @Override + public Iterator> settings() { + final List> settings = Collections.singletonList(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING); + return settings.iterator(); + } + }, + Property.IndexScope, + Property.Final + ); + public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas"; public static final Setting INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING; diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 079fc38415328..1efce2eba8867 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -225,8 +225,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings { FeatureFlags.REMOTE_STORE, List.of( IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING, + IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING, IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING, - IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING + IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING ), FeatureFlags.SEARCHABLE_SNAPSHOT, List.of( diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 9f7e3e9fb5eee..69543577f48b4 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -492,7 +492,8 @@ public IndexService newIndexService( NamedWriteableRegistry namedWriteableRegistry, BooleanSupplier idFieldDataEnabled, ValuesSourceRegistry valuesSourceRegistry, - IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory + IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, + Supplier repositoriesServiceSupplier ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -547,7 +548,8 @@ public IndexService newIndexService( allowExpensiveQueries, expressionResolver, valuesSourceRegistry, - recoveryStateFactory + recoveryStateFactory, + repositoriesServiceSupplier ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 92f957633db84..36237b56987e6 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -89,7 +89,9 @@ import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.Store; import org.opensearch.index.translog.InternalTranslogFactory; +import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.TranslogFactory; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -97,6 +99,7 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.threadpool.ThreadPool; @@ -173,6 +176,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final IndexNameExpressionResolver expressionResolver; private final Supplier indexSortSupplier; private final ValuesSourceRegistry valuesSourceRegistry; + private final Supplier repositoriesServiceSupplier; public IndexService( IndexSettings indexSettings, @@ -204,7 +208,8 @@ public IndexService( BooleanSupplier allowExpensiveQueries, IndexNameExpressionResolver expressionResolver, ValuesSourceRegistry valuesSourceRegistry, - IndexStorePlugin.RecoveryStateFactory recoveryStateFactory + IndexStorePlugin.RecoveryStateFactory recoveryStateFactory, + Supplier repositoriesServiceSupplier ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -276,6 +281,7 @@ public IndexService( this.trimTranslogTask = new AsyncTrimTranslogTask(this); this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this); + this.repositoriesServiceSupplier = repositoriesServiceSupplier; updateFsyncTaskIfNecessary(); } @@ -518,6 +524,14 @@ public synchronized IndexShard createShard( remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY); } + TranslogFactory translogFactory = this.indexSettings.isRemoteTranslogStoreEnabled() && routing.primary() + ? new RemoteBlobStoreInternalTranslogFactory( + repositoriesServiceSupplier, + threadPool, + this.indexSettings.getRemoteStoreTranslogRepository() + ) + : new InternalTranslogFactory(); + Directory directory = directoryFactory.newDirectory(this.indexSettings, path); store = new Store( shardId, @@ -548,8 +562,7 @@ public synchronized IndexShard createShard( () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, circuitBreakerService, - // TODO Replace with remote translog factory in the follow up PR - this.indexSettings.isRemoteTranslogStoreEnabled() ? null : new InternalTranslogFactory(), + translogFactory, this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null, remoteStore ); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 7648f0a192ce7..be7e63a5c9068 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -583,6 +583,7 @@ public final class IndexSettings { private final ReplicationType replicationType; private final boolean isRemoteStoreEnabled; private final boolean isRemoteTranslogStoreEnabled; + private final String remoteStoreTranslogRepository; private final String remoteStoreRepository; // volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock private volatile Settings settings; @@ -745,6 +746,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE)); isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false); isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false); + remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY); remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY); this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings); this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings); @@ -1011,6 +1013,10 @@ public String getRemoteStoreRepository() { return remoteStoreRepository; } + public String getRemoteStoreTranslogRepository() { + return remoteStoreTranslogRepository; + } + /** * Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the * index settings and the node settings where node settings are overwritten by index settings. diff --git a/server/src/main/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommand.java b/server/src/main/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommand.java index c7e380f842fa0..ca679d457c0dc 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommand.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommand.java @@ -191,6 +191,11 @@ protected void findAndProcessShardPath( } final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); + if (indexSettings.isRemoteTranslogStoreEnabled()) { + // ToDo : Need to revisit corrupt shard recovery strategy for remote store enabled indices + throw new OpenSearchException("tool doesn't work for remote translog enabled indices"); + } + final Index index = indexMetadata.getIndex(); final ShardId shId = new ShardId(index, shardId); diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java new file mode 100644 index 0000000000000..0d9e01aef4891 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.RepositoryMissingException; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.function.LongConsumer; +import java.util.function.LongSupplier; +import java.util.function.Supplier; + +/** + * Translog Factory for the remotefs translog {@link RemoteFsTranslog} + * + * @opensearch.internal + */ +public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory { + + private final Repository repository; + + private final ExecutorService executorService; + + public RemoteBlobStoreInternalTranslogFactory( + Supplier repositoriesServiceSupplier, + ThreadPool threadPool, + String repositoryName + ) { + Repository repository; + try { + repository = repositoriesServiceSupplier.get().repository(repositoryName); + } catch (RepositoryMissingException ex) { + throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", ex); + } + this.repository = repository; + this.executorService = threadPool.executor(ThreadPool.Names.TRANSLOG_TRANSFER); + } + + @Override + public Translog newTranslog( + TranslogConfig config, + String translogUUID, + TranslogDeletionPolicy deletionPolicy, + LongSupplier globalCheckpointSupplier, + LongSupplier primaryTermSupplier, + LongConsumer persistedSequenceNumberConsumer + ) throws IOException { + + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + BlobStoreRepository blobStoreRepository = ((BlobStoreRepository) repository); + return new RemoteFsTranslog( + config, + translogUUID, + deletionPolicy, + globalCheckpointSupplier, + primaryTermSupplier, + persistedSequenceNumberConsumer, + blobStoreRepository, + executorService + ); + } +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 204bf4204511e..2946411fc9238 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -180,6 +180,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; +import java.util.function.Supplier; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -269,6 +270,7 @@ public class IndicesService extends AbstractLifecycleComponent private final boolean nodeWriteDanglingIndicesInfo; private final ValuesSourceRegistry valuesSourceRegistry; private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory; + private final Supplier repositoriesServiceSupplier; @Override protected void doStart() { @@ -297,7 +299,8 @@ public IndicesService( Map directoryFactories, ValuesSourceRegistry valuesSourceRegistry, Map recoveryStateFactories, - IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory + IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, + Supplier repositoriesServiceSupplier ) { this.settings = settings; this.threadPool = threadPool; @@ -386,6 +389,7 @@ protected void closeInternal() { this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings()); clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries); this.remoteDirectoryFactory = remoteDirectoryFactory; + this.repositoriesServiceSupplier = repositoriesServiceSupplier; } public IndicesService( @@ -410,7 +414,8 @@ public IndicesService( Map directoryFactories, ValuesSourceRegistry valuesSourceRegistry, Map recoveryStateFactories, - IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory + IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, + Supplier repositoriesServiceSupplier ) { this.settings = settings; this.threadPool = threadPool; @@ -499,6 +504,7 @@ protected void closeInternal() { this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings()); clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries); this.remoteDirectoryFactory = remoteDirectoryFactory; + this.repositoriesServiceSupplier = repositoriesServiceSupplier; } private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask"; @@ -861,7 +867,8 @@ private synchronized IndexService createIndexService( namedWriteableRegistry, this::isIdFieldDataEnabled, valuesSourceRegistry, - remoteDirectoryFactory + remoteDirectoryFactory, + repositoriesServiceSupplier ); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 46270230ccf27..ed3256d499520 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -663,7 +663,6 @@ protected Node( ); final IndicesService indicesService; - if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { indicesService = new IndicesService( settings, @@ -687,7 +686,8 @@ protected Node( Map.copyOf(directoryFactories), searchModule.getValuesSourceRegistry(), recoveryStateFactories, - remoteDirectoryFactory + remoteDirectoryFactory, + repositoriesServiceReference::get ); } else { indicesService = new IndicesService( @@ -711,7 +711,8 @@ protected Node( Map.copyOf(directoryFactories), searchModule.getValuesSourceRegistry(), recoveryStateFactories, - remoteDirectoryFactory + remoteDirectoryFactory, + repositoriesServiceReference::get ); } diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index 6bfdd9ae16773..429c2126d9a00 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -44,6 +44,7 @@ import org.apache.lucene.search.similarities.BM25Similarity; import org.apache.lucene.search.similarities.Similarity; import org.apache.lucene.store.Directory; +import org.apache.lucene.util.SetOnce; import org.apache.lucene.util.SetOnce.AlreadySetException; import org.opensearch.Version; import org.opensearch.cluster.metadata.IndexMetadata; @@ -217,6 +218,8 @@ public void tearDown() throws Exception { } private IndexService newIndexService(IndexModule module) throws IOException { + final SetOnce repositoriesServiceReference = new SetOnce<>(); + repositoriesServiceReference.set(repositoriesService); return module.newIndexService( CREATE_INDEX, nodeEnvironment, @@ -234,7 +237,8 @@ private IndexService newIndexService(IndexModule module) throws IOException { writableRegistry(), () -> false, null, - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService) + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), + repositoriesServiceReference::get ); } diff --git a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index 9ea262ab2263d..34087c7fa8df9 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -943,4 +943,56 @@ public void testSetRemoteRepositoryFailsWhenEmptyString() { ); assertEquals("Setting index.remote_store.repository should be provided with non-empty repository ID", iae.getMessage()); } + + public void testRemoteTranslogRepoDefaultSetting() { + IndexMetadata metadata = newIndexMeta( + "index", + Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build() + ); + IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertNull(settings.getRemoteStoreRepository()); + } + + public void testRemoteTranslogExplicitSetting() { + IndexMetadata metadata = newIndexMeta( + "index", + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "tlog-store") + .build() + ); + IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); + assertNull(settings.getRemoteStoreRepository()); + assertEquals("tlog-store", settings.getRemoteStoreTranslogRepository()); + } + + public void testSetRemoteTranslogRepositoryFailsWhenRemoteTranslogIsNotEnabled() { + Settings indexSettings = Settings.builder() + .put("index.replication.type", ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "repo1") + .build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.get(indexSettings) + ); + assertEquals( + "Settings index.remote_store.translog.repository can only be set/enabled when index.remote_store.translog.enabled is set to true", + iae.getMessage() + ); + } + + public void testSetRemoteTranslogRepositoryFailsWhenEmptyString() { + Settings indexSettings = Settings.builder() + .put("index.replication.type", ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "") + .build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.get(indexSettings) + ); + assertEquals("Setting index.remote_store.translog.repository should be provided with non-empty repository ID", iae.getMessage()); + } } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 663c325db12c2..5732fc5bfa270 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1797,6 +1797,8 @@ public void onFailure(final Exception e) { ); final BigArrays bigArrays = new BigArrays(new PageCacheRecycler(settings), null, "test"); final MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); + final SetOnce repositoriesServiceReference = new SetOnce<>(); + repositoriesServiceReference.set(repositoriesService); if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) { indicesService = new IndicesService( settings, @@ -1831,7 +1833,8 @@ public void onFailure(final Exception e) { emptyMap(), null, emptyMap(), - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService) + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), + repositoriesServiceReference::get ); } else { indicesService = new IndicesService( @@ -1866,10 +1869,10 @@ public void onFailure(final Exception e) { emptyMap(), null, emptyMap(), - new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService) + new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), + repositoriesServiceReference::get ); } - final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService( settings,