Skip to content

Commit

Permalink
Enable creation of indices using Remote Translog (#5638)
Browse files Browse the repository at this point in the history
* Enable creation of indices using Remote Translog behind a setting and feature flag
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna authored Dec 28, 2022
1 parent ea1cc9d commit d388051
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ public Iterator<Setting<?>> settings() {
public static final String SETTING_REMOTE_STORE_REPOSITORY = "index.remote_store.repository";

public static final String SETTING_REMOTE_TRANSLOG_STORE_ENABLED = "index.remote_store.translog.enabled";

public static final String SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY = "index.remote_store.translog.repository";

/**
* Used to specify if the index data should be persisted in the remote store.
*/
Expand Down Expand Up @@ -405,6 +408,43 @@ public Iterator<Setting<?>> settings() {
Property.Final
);

public static final Setting<String> INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING = Setting.simpleString(
SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY,
new Setting.Validator<>() {

@Override
public void validate(final String value) {}

@Override
public void validate(final String value, final Map<Setting<?>, Object> settings) {
if (value == null || value.isEmpty()) {
throw new IllegalArgumentException(
"Setting " + INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey() + " should be provided with non-empty repository ID"
);
} else {
final Boolean isRemoteTranslogStoreEnabled = (Boolean) settings.get(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING);
if (isRemoteTranslogStoreEnabled == null || isRemoteTranslogStoreEnabled == false) {
throw new IllegalArgumentException(
"Settings "
+ INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.getKey()
+ " can only be set/enabled when "
+ INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING.getKey()
+ " is set to true"
);
}
}
}

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING);
return settings.iterator();
}
},
Property.IndexScope,
Property.Final
);

public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
FeatureFlags.REMOTE_STORE,
List.of(
IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING
),
FeatureFlags.SEARCHABLE_SNAPSHOT,
List.of(
Expand Down
6 changes: 4 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,8 @@ public IndexService newIndexService(
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -547,7 +548,8 @@ public IndexService newIndexService(
allowExpensiveQueries,
expressionResolver,
valuesSourceRegistry,
recoveryStateFactory
recoveryStateFactory,
repositoriesServiceSupplier
);
success = true;
return indexService;
Expand Down
19 changes: 16 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,17 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.mapper.MapperRegistry;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -173,6 +176,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final IndexNameExpressionResolver expressionResolver;
private final Supplier<Sort> indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
private final Supplier<RepositoriesService> repositoriesServiceSupplier;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -204,7 +208,8 @@ public IndexService(
BooleanSupplier allowExpensiveQueries,
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -276,6 +281,7 @@ public IndexService(
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -518,6 +524,14 @@ public synchronized IndexShard createShard(
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY);
}

TranslogFactory translogFactory = this.indexSettings.isRemoteTranslogStoreEnabled() && routing.primary()
? new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
this.indexSettings.getRemoteStoreTranslogRepository()
)
: new InternalTranslogFactory();

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
store = new Store(
shardId,
Expand Down Expand Up @@ -548,8 +562,7 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
// TODO Replace with remote translog factory in the follow up PR
this.indexSettings.isRemoteTranslogStoreEnabled() ? null : new InternalTranslogFactory(),
translogFactory,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
);
Expand Down
6 changes: 6 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ public final class IndexSettings {
private final ReplicationType replicationType;
private final boolean isRemoteStoreEnabled;
private final boolean isRemoteTranslogStoreEnabled;
private final String remoteStoreTranslogRepository;
private final String remoteStoreRepository;
// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
private volatile Settings settings;
Expand Down Expand Up @@ -745,6 +746,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
replicationType = ReplicationType.parseString(settings.get(IndexMetadata.SETTING_REPLICATION_TYPE));
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false);
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
Expand Down Expand Up @@ -1011,6 +1013,10 @@ public String getRemoteStoreRepository() {
return remoteStoreRepository;
}

public String getRemoteStoreTranslogRepository() {
return remoteStoreTranslogRepository;
}

/**
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
* index settings and the node settings where node settings are overwritten by index settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ protected void findAndProcessShardPath(
}

final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings);
if (indexSettings.isRemoteTranslogStoreEnabled()) {
// ToDo : Need to revisit corrupt shard recovery strategy for remote store enabled indices
throw new OpenSearchException("tool doesn't work for remote translog enabled indices");
}

final Index index = indexMetadata.getIndex();
final ShardId shId = new ShardId(index, shardId);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog;

import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
* Translog Factory for the remotefs translog {@link RemoteFsTranslog}
*
* @opensearch.internal
*/
public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory {

private final Repository repository;

private final ExecutorService executorService;

public RemoteBlobStoreInternalTranslogFactory(
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
String repositoryName
) {
Repository repository;
try {
repository = repositoriesServiceSupplier.get().repository(repositoryName);
} catch (RepositoryMissingException ex) {
throw new IllegalArgumentException("Repository should be created before creating index with remote_store enabled setting", ex);
}
this.repository = repository;
this.executorService = threadPool.executor(ThreadPool.Names.TRANSLOG_TRANSFER);
}

@Override
public Translog newTranslog(
TranslogConfig config,
String translogUUID,
TranslogDeletionPolicy deletionPolicy,
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer
) throws IOException {

assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = ((BlobStoreRepository) repository);
return new RemoteFsTranslog(
config,
translogUUID,
deletionPolicy,
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer,
blobStoreRepository,
executorService
);
}
}
13 changes: 10 additions & 3 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -269,6 +270,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final boolean nodeWriteDanglingIndicesInfo;
private final ValuesSourceRegistry valuesSourceRegistry;
private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory;
private final Supplier<RepositoriesService> repositoriesServiceSupplier;

@Override
protected void doStart() {
Expand Down Expand Up @@ -297,7 +299,8 @@ public IndicesService(
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.settings = settings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -386,6 +389,7 @@ protected void closeInternal() {
this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries);
this.remoteDirectoryFactory = remoteDirectoryFactory;
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
}

public IndicesService(
Expand All @@ -410,7 +414,8 @@ public IndicesService(
Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
ValuesSourceRegistry valuesSourceRegistry,
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.settings = settings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -499,6 +504,7 @@ protected void closeInternal() {
this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries);
this.remoteDirectoryFactory = remoteDirectoryFactory;
this.repositoriesServiceSupplier = repositoriesServiceSupplier;
}

private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask";
Expand Down Expand Up @@ -861,7 +867,8 @@ private synchronized IndexService createIndexService(
namedWriteableRegistry,
this::isIdFieldDataEnabled,
valuesSourceRegistry,
remoteDirectoryFactory
remoteDirectoryFactory,
repositoriesServiceSupplier
);
}

Expand Down
7 changes: 4 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,6 @@ protected Node(
);

final IndicesService indicesService;

if (FeatureFlags.isEnabled(FeatureFlags.EXTENSIONS)) {
indicesService = new IndicesService(
settings,
Expand All @@ -687,7 +686,8 @@ protected Node(
Map.copyOf(directoryFactories),
searchModule.getValuesSourceRegistry(),
recoveryStateFactories,
remoteDirectoryFactory
remoteDirectoryFactory,
repositoriesServiceReference::get
);
} else {
indicesService = new IndicesService(
Expand All @@ -711,7 +711,8 @@ protected Node(
Map.copyOf(directoryFactories),
searchModule.getValuesSourceRegistry(),
recoveryStateFactories,
remoteDirectoryFactory
remoteDirectoryFactory,
repositoriesServiceReference::get
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.lucene.search.similarities.BM25Similarity;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.SetOnce;
import org.apache.lucene.util.SetOnce.AlreadySetException;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -217,6 +218,8 @@ public void tearDown() throws Exception {
}

private IndexService newIndexService(IndexModule module) throws IOException {
final SetOnce<RepositoriesService> repositoriesServiceReference = new SetOnce<>();
repositoriesServiceReference.set(repositoriesService);
return module.newIndexService(
CREATE_INDEX,
nodeEnvironment,
Expand All @@ -234,7 +237,8 @@ private IndexService newIndexService(IndexModule module) throws IOException {
writableRegistry(),
() -> false,
null,
new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService)
new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService),
repositoriesServiceReference::get
);
}

Expand Down
Loading

0 comments on commit d388051

Please sign in to comment.