Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to run SegRep integ tests using remote store settings #7361

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.SegmentReplicationIT;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
* The aim of this class is to run Segment Replication integ tests by enabling remote store specific settings.
* This makes sure that the constructs/flows that are being tested with Segment Replication, holds true after enabling
* remote store.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT {

private static final String REPOSITORY_NAME = "test-remore-store-repo";

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}
}
33 changes: 23 additions & 10 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@
import org.opensearch.index.store.Store.MetadataSnapshot;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.RemoteFsTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogFactory;
Expand Down Expand Up @@ -2234,6 +2236,9 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
}
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
final Engine newEngine = engineFactory.newReadWriteEngine(config);
onNewEngine(newEngine);
Expand Down Expand Up @@ -2520,10 +2525,10 @@ public void recoverFromStore(ActionListener<Boolean> listener) {
storeRecovery.recoverFromStore(this, listener);
}

public void restoreFromRemoteStore(Repository repository, ActionListener<Boolean> listener) {
public void restoreFromRemoteStore(ActionListener<Boolean> listener) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
storeRecovery.recoverFromRemoteStore(this, repository, listener);
storeRecovery.recoverFromRemoteStore(this, listener);
}

public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
Expand Down Expand Up @@ -3324,14 +3329,7 @@ public void startRecovery(
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
break;
case REMOTE_STORE:
final Repository remoteTranslogRepo;
final String remoteTranslogRepoName = indexSettings.getRemoteStoreTranslogRepository();
if (remoteTranslogRepoName != null) {
remoteTranslogRepo = repositoriesService.repository(remoteTranslogRepoName);
} else {
remoteTranslogRepo = null;
}
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(remoteTranslogRepo, l));
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(l));
break;
case PEER:
try {
Expand Down Expand Up @@ -4406,6 +4404,9 @@ public void close() throws IOException {
if (indexSettings.isRemoteStoreEnabled()) {
syncSegmentsFromRemoteSegmentStore(false);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
}
newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker)));
onNewEngine(newEngineReference.get());
}
Expand Down Expand Up @@ -4439,6 +4440,18 @@ public void close() throws IOException {
onSettingsChanged();
}

private void syncRemoteTranslogAndUpdateGlobalCheckpoint() throws IOException {
syncTranslogFilesFromRemoteTranslog();
loadGlobalCheckpointToReplicationTracker();
}

public void syncTranslogFilesFromRemoteTranslog() throws IOException {
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog());
}

/**
* Downloads segments from remote segment store. This method will download segments till
* last refresh checkpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,11 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.RemoteFsTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.FileTransferTracker;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -118,13 +114,13 @@ void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> liste
}
}

void recoverFromRemoteStore(final IndexShard indexShard, Repository repository, ActionListener<Boolean> listener) {
void recoverFromRemoteStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.REMOTE_STORE : "expected remote store recovery type but was: " + recoveryType;
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
logger.debug("starting recovery from remote store ...");
recoverFromRemoteStore(indexShard, repository);
recoverFromRemoteStore(indexShard);
return true;
});
} else {
Expand Down Expand Up @@ -441,7 +437,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
});
}

private void recoverFromRemoteStore(IndexShard indexShard, Repository repository) throws IndexShardRecoveryException {
private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException {
final Store remoteStore = indexShard.remoteStore();
if (remoteStore == null) {
throw new IndexShardRecoveryException(
Expand All @@ -462,8 +458,8 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository
if (store.directory().listAll().length == 0) {
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
}
if (repository != null) {
syncTranslogFilesFromRemoteTranslog(indexShard, repository);
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled()) {
indexShard.syncTranslogFilesFromRemoteTranslog();
} else {
bootstrap(indexShard, store);
}
Expand All @@ -482,19 +478,6 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository
}
}

private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Repository repository) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager(
blobStoreRepository,
indexShard.getThreadPool(),
shardId,
fileTransferTracker
);
RemoteFsTranslog.download(translogTransferManager, indexShard.shardPath().resolveTranslog());
}

/**
* Recovers the state of the shard from the store.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,8 @@ public Translog newTranslog(
primaryModeSupplier
);
}

public Repository getRepository() {
return repository;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -116,8 +117,20 @@ public RemoteFsTranslog(
}
}

public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
TranslogTransferManager translogTransferManager = buildTranslogTransferManager(
blobStoreRepository,
threadPool,
shardId,
fileTransferTracker
);
RemoteFsTranslog.download(translogTransferManager, location);
}

public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
if (translogMetadata != null) {
if (Files.notExists(location)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2873,7 +2873,7 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
target.markAsRecovering("remote_store", new RecoveryState(routing, localNode, null));
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
target.restoreFromRemoteStore(null, future);
target.restoreFromRemoteStore(future);
target.remoteStore().decRef();

assertTrue(future.actionGet());
Expand Down