Skip to content

Commit

Permalink
Experimental changes no review required
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed May 3, 2024
1 parent ed33488 commit 5d19b85
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.index.remote.RemoteStorePathStrategyResolver.PATH_TYPE_NODE_ATTR_KEY;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -215,8 +216,9 @@ public void testRestoreOperationsShallowCopyEnabled() throws Exception {
* on snapshot restore.
*/
public void testRemoteStoreCustomDataOnIndexCreationAndRestore() {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
Settings pathTypeSetting = Settings.builder().put(PATH_TYPE_NODE_ATTR_KEY, true).build();
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(pathTypeSetting);
internalCluster().startDataOnlyNode(pathTypeSetting);
String indexName1 = "testindex1";
String indexName2 = "testindex2";
String snapshotRepoName = "test-restore-snapshot-repo";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ExecutionException;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.index.remote.RemoteStorePathStrategyResolver.PATH_TYPE_NODE_ATTR_KEY;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

Expand All @@ -33,7 +34,11 @@ public class RemoteStoreUploadIndexPathIT extends RemoteStoreBaseIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put(PATH_TYPE_NODE_ATTR_KEY, true)
.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public MetadataCreateIndexService(
createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true);
Supplier<Version> minNodeVersionSupplier = () -> clusterService.state().nodes().getMinNodeVersion();
remoteStorePathStrategyResolver = isRemoteDataAttributePresent(settings)
? new RemoteStorePathStrategyResolver(remoteStoreSettings, minNodeVersionSupplier)
? new RemoteStorePathStrategyResolver(remoteStoreSettings, clusterService)
: null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@

package org.opensearch.index.remote;

import org.opensearch.Version;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.Node;

import java.util.function.Supplier;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

/**
* Determines the {@link RemoteStorePathStrategy} at the time of index metadata creation.
Expand All @@ -24,21 +28,42 @@
@ExperimentalApi
public class RemoteStorePathStrategyResolver {

public final static String PATH_TYPE_ATTRIBUTE_KEY = "optimised_remote_store_index_path_enabled";

public final static String PATH_TYPE_NODE_ATTR_KEY = Node.NODE_ATTRIBUTES.getKey() + PATH_TYPE_ATTRIBUTE_KEY;

private final RemoteStoreSettings remoteStoreSettings;
private final Supplier<Version> minNodeVersionSupplier;
private final ClusterService clusterService;

public RemoteStorePathStrategyResolver(RemoteStoreSettings remoteStoreSettings, Supplier<Version> minNodeVersionSupplier) {
this.remoteStoreSettings = remoteStoreSettings;
this.minNodeVersionSupplier = minNodeVersionSupplier;
public RemoteStorePathStrategyResolver(RemoteStoreSettings remoteStoreSettings, ClusterService clusterService) {
this.remoteStoreSettings = Objects.requireNonNull(remoteStoreSettings);
this.clusterService = Objects.requireNonNull(clusterService);
}

public RemoteStorePathStrategy get() {
PathType pathType;
PathHashAlgorithm pathHashAlgorithm;
// Min node version check ensures that we are enabling the new prefix type only when all the nodes understand it.
pathType = Version.V_2_14_0.compareTo(minNodeVersionSupplier.get()) <= 0 ? remoteStoreSettings.getPathType() : PathType.FIXED;
pathType = isPathTypeEnabled() ? remoteStoreSettings.getPathType() : PathType.FIXED;
// If the path type is fixed, hash algorithm is not applicable.
pathHashAlgorithm = pathType == PathType.FIXED ? null : remoteStoreSettings.getPathHashAlgorithm();
return new RemoteStorePathStrategy(pathType, pathHashAlgorithm);
}

private boolean isPathTypeEnabled() {
Map<String, DiscoveryNode> nodesMap = Collections.unmodifiableMap(clusterService.state().nodes().getNodes());

if (nodesMap.isEmpty()) {
return false;
}

for (String node : nodesMap.keySet()) {
DiscoveryNode nodeDiscovery = nodesMap.get(node);
Map<String, String> nodeAttributes = nodeDiscovery.getAttributes();
if (!nodeAttributes.containsKey(PATH_TYPE_ATTRIBUTE_KEY)) {
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategyResolver;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.DefaultRemoteStoreSettings;
import org.opensearch.indices.IndexCreationException;
Expand Down Expand Up @@ -1736,8 +1737,13 @@ private IndexMetadata testRemoteCustomData(boolean remoteStoreEnabled, PathType
Metadata metadata = Metadata.builder()
.transientSettings(Settings.builder().put(Metadata.DEFAULT_REPLICA_COUNT_SETTING.getKey(), 1).build())
.build();
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
DiscoveryNode discoveryNode = mock(DiscoveryNode.class);
when(discoveryNode.getAttributes()).thenReturn(Map.of(RemoteStorePathStrategyResolver.PATH_TYPE_ATTRIBUTE_KEY, "true"));
when(discoveryNodes.getNodes()).thenReturn(Map.of("node-1", discoveryNode));
ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.nodes(discoveryNodes)
.build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
when(clusterService.getSettings()).thenReturn(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,52 @@

package org.opensearch.index.remote;

import org.opensearch.Version;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.opensearch.index.remote.RemoteStorePathStrategyResolver.PATH_TYPE_ATTRIBUTE_KEY;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RemoteStorePathStrategyResolverTests extends OpenSearchTestCase {

private ClusterService clusterService;
private final Map<String, DiscoveryNode> nodesMap = new HashMap<>();

private final AtomicLong nodeCounter = new AtomicLong();

@Before
public void setup() {
clusterService = mock(ClusterService.class);
ClusterState clusterState = mock(ClusterState.class);
when(clusterService.state()).thenReturn(clusterState);
DiscoveryNodes discoveryNodes = mock(DiscoveryNodes.class);
when(clusterState.nodes()).thenReturn(discoveryNodes);
when(discoveryNodes.getNodes()).thenReturn(nodesMap);
}

public void testGetMinVersionOlder() {
Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(PathType.values())).build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings);
RemoteStorePathStrategyResolver resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_13_0);
RemoteStorePathStrategyResolver resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, clusterService);
addNode(false);
assertEquals(PathType.FIXED, resolver.get().getType());
assertNull(resolver.get().getHashAlgorithm());
}
Expand All @@ -35,7 +63,8 @@ public void testGetMinVersionNewer() {
Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), pathType).build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings);
RemoteStorePathStrategyResolver resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0);
RemoteStorePathStrategyResolver resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, clusterService);
addNode(true);
assertEquals(pathType, resolver.get().getType());
if (pathType.requiresHashAlgorithm()) {
assertNotNull(resolver.get().getHashAlgorithm());
Expand All @@ -49,7 +78,8 @@ public void testGetStrategy() {
Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.FIXED).build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings);
RemoteStorePathStrategyResolver resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0);
addNode(true);
RemoteStorePathStrategyResolver resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, clusterService);
assertEquals(PathType.FIXED, resolver.get().getType());

// FIXED type with hash algorithm
Expand All @@ -59,22 +89,22 @@ public void testGetStrategy() {
.build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings);
resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0);
resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, clusterService);
assertEquals(PathType.FIXED, resolver.get().getType());

// HASHED_PREFIX type with FNV_1A_COMPOSITE
settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX).build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings);
resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0);
resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, clusterService);
assertEquals(PathType.HASHED_PREFIX, resolver.get().getType());
assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.get().getHashAlgorithm());

// HASHED_PREFIX type with FNV_1A_COMPOSITE
settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), PathType.HASHED_PREFIX).build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings);
resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0);
resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, clusterService);
assertEquals(PathType.HASHED_PREFIX, resolver.get().getType());
assertEquals(PathHashAlgorithm.FNV_1A_COMPOSITE_1, resolver.get().getHashAlgorithm());

Expand All @@ -85,7 +115,7 @@ public void testGetStrategy() {
.build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings);
resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0);
resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, clusterService);
assertEquals(PathType.HASHED_PREFIX, resolver.get().getType());
assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.get().getHashAlgorithm());

Expand All @@ -96,7 +126,7 @@ public void testGetStrategy() {
.build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings);
resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0);
resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, clusterService);
assertEquals(PathType.HASHED_PREFIX, resolver.get().getType());
assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.get().getHashAlgorithm());
}
Expand All @@ -107,7 +137,8 @@ public void testGetStrategyWithDynamicUpdate() {
Settings settings = Settings.builder().build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings);
RemoteStorePathStrategyResolver resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, () -> Version.V_2_14_0);
addNode(true);
RemoteStorePathStrategyResolver resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, clusterService);
assertEquals(PathType.FIXED, resolver.get().getType());
assertNull(resolver.get().getHashAlgorithm());

Expand Down Expand Up @@ -145,4 +176,25 @@ public void testGetStrategyWithDynamicUpdate() {
assertEquals(PathType.HASHED_INFIX, resolver.get().getType());
assertEquals(PathHashAlgorithm.FNV_1A_BASE64, resolver.get().getHashAlgorithm());
}

public void testGetMinVersionOlderWithoutNodes() {
Settings settings = Settings.builder().put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(PathType.values())).build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
RemoteStoreSettings remoteStoreSettings = new RemoteStoreSettings(settings, clusterSettings);
RemoteStorePathStrategyResolver resolver = new RemoteStorePathStrategyResolver(remoteStoreSettings, clusterService);
assertEquals(PathType.FIXED, resolver.get().getType());
assertNull(resolver.get().getHashAlgorithm());
}

private void addNode(boolean hashPathTypeAttr) {
DiscoveryNode discoveryNode = mock(DiscoveryNode.class);
Map<String, String> nodeAttrs;
if (hashPathTypeAttr) {
nodeAttrs = Map.of(PATH_TYPE_ATTRIBUTE_KEY, "true");
} else {
nodeAttrs = Collections.emptyMap();
}
when(discoveryNode.getAttributes()).thenReturn(nodeAttrs);
nodesMap.put("node-" + nodeCounter.getAndIncrement(), discoveryNode);
}
}

0 comments on commit 5d19b85

Please sign in to comment.