Skip to content

Commit

Permalink
Core changes ported over from the earlier PR branch. No major changes…
Browse files Browse the repository at this point in the history
… to TransportResolveClusterAction
  • Loading branch information
quux00 committed Jan 9, 2025
1 parent 230acb8 commit b988813
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.admin.indices.resolve.ResolveClusterActionResponse;
import org.elasticsearch.action.admin.indices.resolve.ResolveClusterInfo;
import org.elasticsearch.action.admin.indices.resolve.TransportResolveClusterAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -405,6 +406,52 @@ public void testClusterResolveWithIndices() throws IOException {
}
}

// corresponds to the GET _resolve/cluster endpoint with no index expression specified
public void testClusterResolveWithNoIndexExpression() throws IOException {
Map<String, Object> testClusterInfo = setupThreeClusters(false);
boolean skipUnavailable1 = (Boolean) testClusterInfo.get("remote1.skip_unavailable");
boolean skipUnavailable2 = true;

{
String[] noIndexSpecified = new String[0];
boolean clusterInfoOnly = true;
boolean runningOnQueryingCluster = true;
ResolveClusterActionRequest request = new ResolveClusterActionRequest(
noIndexSpecified,
IndicesOptions.DEFAULT,
clusterInfoOnly,
runningOnQueryingCluster
);

ActionFuture<ResolveClusterActionResponse> future = client(LOCAL_CLUSTER).admin()
.indices()
.execute(TransportResolveClusterAction.TYPE, request);
ResolveClusterActionResponse response = future.actionGet(30, TimeUnit.SECONDS);
assertNotNull(response);

Map<String, ResolveClusterInfo> clusterInfo = response.getResolveClusterInfo();
assertEquals(2, clusterInfo.size());

// only remote clusters should be present (not local)
Set<String> expectedClusterNames = Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
assertThat(clusterInfo.keySet(), equalTo(expectedClusterNames));

ResolveClusterInfo remote1 = clusterInfo.get(REMOTE_CLUSTER_1);
assertThat(remote1.isConnected(), equalTo(true));
assertThat(remote1.getSkipUnavailable(), equalTo(skipUnavailable1));
assertThat(remote1.getMatchingIndices(), equalTo(null)); // should not be set
assertNotNull(remote1.getBuild().version());
assertNull(remote1.getError());

ResolveClusterInfo remote2 = clusterInfo.get(REMOTE_CLUSTER_2);
assertThat(remote2.isConnected(), equalTo(true));
assertThat(remote2.getSkipUnavailable(), equalTo(skipUnavailable2));
assertThat(remote2.getMatchingIndices(), equalTo(null)); // should not be set
assertNotNull(remote2.getBuild().version());
assertNull(remote2.getError());
}
}

public void testClusterResolveWithMatchingAliases() throws IOException {
Map<String, Object> testClusterInfo = setupThreeClusters(true);
String localAlias = (String) testClusterInfo.get("local.alias");
Expand Down Expand Up @@ -615,9 +662,49 @@ public void testClusterResolveDisconnectedAndErrorScenarios() throws Exception {
assertNotNull(local.getBuild().version());
assertNull(local.getError());
}

// cluster1 was stopped/disconnected, so it should return a connected:false response when querying with no index expression,
// corresponding to GET _resolve/cluster endpoint
{
String[] noIndexSpecified = new String[0];
boolean clusterInfoOnly = true;
boolean runningOnQueryingCluster = true;
ResolveClusterActionRequest request = new ResolveClusterActionRequest(
noIndexSpecified,
IndicesOptions.DEFAULT,
clusterInfoOnly,
runningOnQueryingCluster
);

ActionFuture<ResolveClusterActionResponse> future = client(LOCAL_CLUSTER).admin()
.indices()
.execute(TransportResolveClusterAction.TYPE, request);
ResolveClusterActionResponse response = future.actionGet(30, TimeUnit.SECONDS);
assertNotNull(response);

Map<String, ResolveClusterInfo> clusterInfo = response.getResolveClusterInfo();
assertEquals(2, clusterInfo.size());
// local cluster is not present when querying without an index expression
Set<String> expectedClusterNames = Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
assertThat(clusterInfo.keySet(), equalTo(expectedClusterNames));

ResolveClusterInfo remote1 = clusterInfo.get(REMOTE_CLUSTER_1);
assertThat(remote1.isConnected(), equalTo(false));
assertThat(remote1.getSkipUnavailable(), equalTo(skipUnavailable1));
assertNull(remote1.getMatchingIndices());
assertNull(remote1.getBuild());
assertNull(remote1.getError());

ResolveClusterInfo remote2 = clusterInfo.get(REMOTE_CLUSTER_2);
assertThat(remote2.isConnected(), equalTo(true));
assertThat(remote2.getSkipUnavailable(), equalTo(skipUnavailable2));
assertNull(remote2.getMatchingIndices()); // not present when no index expression specified
assertNotNull(remote2.getBuild().version());
assertNull(remote2.getError());
}
}

private Map<String, Object> setupThreeClusters(boolean useAlias) throws IOException {
private Map<String, Object> setupThreeClusters(boolean useAlias) {
String localAlias = randomAlphaOfLengthBetween(5, 25);
String remoteAlias1 = randomAlphaOfLengthBetween(5, 25);
String remoteAlias2 = randomAlphaOfLengthBetween(5, 25);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ static TransportVersion def(int id) {
public static final TransportVersion TRACK_INDEX_FAILED_DUE_TO_VERSION_CONFLICT_METRIC = def(8_820_00_0);
public static final TransportVersion REPLACE_FAILURE_STORE_OPTIONS_WITH_SELECTOR_SYNTAX = def(8_821_00_0);
public static final TransportVersion ELASTIC_INFERENCE_SERVICE_UNIFIED_CHAT_COMPLETIONS_INTEGRATION = def(8_822_00_0);
public static final TransportVersion RESOLVE_CLUSTER_NO_INDEX_EXPRESSION = def(8_823_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -53,15 +52,25 @@ public class ResolveClusterActionRequest extends ActionRequest implements Indice
private boolean localIndicesRequested = false;
private IndicesOptions indicesOptions;

// true if the user did not provide any index expression - they only want cluster level info, not index matching
private boolean clusterInfoOnly;
// Whether this request is being processed on the primary ("local") cluster being queried or on a remote.
// This is needed when clusterInfoOnly=true since we need to know whether to list out all possible remotes
// on a node. (We don't want cross-cluster chaining on remotes that might be configured with their own remotes.)
private boolean isQueryingCluster;

public ResolveClusterActionRequest(String[] names) {
this(names, DEFAULT_INDICES_OPTIONS);
this(names, DEFAULT_INDICES_OPTIONS, false, true);
assert names != null && names.length > 0 : "One or more index expressions must be included with this constructor";
}

@SuppressWarnings("this-escape")
public ResolveClusterActionRequest(String[] names, IndicesOptions indicesOptions) {
public ResolveClusterActionRequest(String[] names, IndicesOptions indicesOptions, boolean clusterInfoOnly, boolean queryingCluster) {
this.names = names;
this.localIndicesRequested = localIndicesPresent(names);
this.indicesOptions = indicesOptions;
this.clusterInfoOnly = clusterInfoOnly;
this.isQueryingCluster = queryingCluster;
}

@SuppressWarnings("this-escape")
Expand All @@ -73,6 +82,13 @@ public ResolveClusterActionRequest(StreamInput in) throws IOException {
this.names = in.readStringArray();
this.indicesOptions = IndicesOptions.readIndicesOptions(in);
this.localIndicesRequested = localIndicesPresent(names);
if (in.getTransportVersion().onOrAfter(TransportVersions.RESOLVE_CLUSTER_NO_INDEX_EXPRESSION)) {
this.clusterInfoOnly = in.readBoolean();
this.isQueryingCluster = in.readBoolean();
} else {
this.clusterInfoOnly = false;
this.isQueryingCluster = false;
}
}

@Override
Expand All @@ -83,9 +99,13 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeStringArray(names);
indicesOptions.writeIndicesOptions(out);
if (out.getTransportVersion().onOrAfter(TransportVersions.RESOLVE_CLUSTER_NO_INDEX_EXPRESSION)) {
out.writeBoolean(clusterInfoOnly);
out.writeBoolean(isQueryingCluster);
}
}

private String createVersionErrorMessage(TransportVersion versionFound) {
static String createVersionErrorMessage(TransportVersion versionFound) {
return Strings.format(
"%s %s but was %s",
TRANSPORT_VERSION_ERROR_MESSAGE_PREFIX,
Expand All @@ -96,11 +116,7 @@ private String createVersionErrorMessage(TransportVersion versionFound) {

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (names == null || names.length == 0) {
validationException = ValidateActions.addValidationError("no index expressions specified", validationException);
}
return validationException;
return null;
}

@Override
Expand All @@ -123,6 +139,14 @@ public String[] indices() {
return names;
}

public boolean clusterInfoOnly() {
return clusterInfoOnly;
}

public boolean queryingCluster() {
return isQueryingCluster;
}

public boolean isLocalIndicesRequested() {
return localIndicesRequested;
}
Expand Down Expand Up @@ -160,7 +184,11 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
return new CancellableTask(id, type, action, "", parentTaskId, headers) {
@Override
public String getDescription() {
return "resolve/cluster for " + Arrays.toString(indices());
if (indices().length == 0) {
return "resolve/cluster";
} else {
return "resolve/cluster for " + Arrays.toString(indices());
}
}
};
}
Expand All @@ -173,4 +201,18 @@ boolean localIndicesPresent(String[] indices) {
}
return false;
}

@Override
public String toString() {
return "ResolveClusterActionRequest{"
+ "indices="
+ Arrays.toString(names)
+ ", localIndicesRequested="
+ localIndicesRequested
+ ", clusterInfoOnly="
+ clusterInfoOnly
+ ", queryingCluster="
+ isQueryingCluster
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class ResolveClusterInfo implements Writeable {

private final boolean connected;
private final Boolean skipUnavailable; // remote clusters don't know their setting, so they put null and querying cluster fills in
private final Boolean matchingIndices; // null means 'unknown' when not connected
private final Boolean matchingIndices; // null means no index expression requested by user or remote cluster was not connected
private final Build build;
private final String error;

Expand All @@ -38,8 +38,14 @@ public ResolveClusterInfo(boolean connected, Boolean skipUnavailable, Boolean ma
this(connected, skipUnavailable, matchingIndices, build, null);
}

public ResolveClusterInfo(ResolveClusterInfo copyFrom, boolean skipUnavailable) {
this(copyFrom.isConnected(), skipUnavailable, copyFrom.getMatchingIndices(), copyFrom.getBuild(), copyFrom.getError());
public ResolveClusterInfo(ResolveClusterInfo copyFrom, boolean skipUnavailable, boolean clusterInfoOnly) {
this(
copyFrom.isConnected(),
skipUnavailable,
clusterInfoOnly ? null : copyFrom.getMatchingIndices(),
copyFrom.getBuild(),
clusterInfoOnly ? null : copyFrom.getError()
);
}

private ResolveClusterInfo(boolean connected, Boolean skipUnavailable, Boolean matchingIndices, Build build, String error) {
Expand All @@ -48,7 +54,6 @@ private ResolveClusterInfo(boolean connected, Boolean skipUnavailable, Boolean m
this.matchingIndices = matchingIndices;
this.build = build;
this.error = error;
assert error != null || matchingIndices != null || connected == false : "If matchingIndices is null, connected must be false";
}

public ResolveClusterInfo(StreamInput in) throws IOException {
Expand All @@ -67,12 +72,7 @@ public ResolveClusterInfo(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().before(TransportVersions.V_8_13_0)) {
throw new UnsupportedOperationException(
"ResolveClusterAction requires at least version "
+ TransportVersions.V_8_13_0.toReleaseVersion()
+ " but was "
+ out.getTransportVersion().toReleaseVersion()
);
throw new UnsupportedOperationException(ResolveClusterActionRequest.createVersionErrorMessage(out.getTransportVersion()));
}
out.writeBoolean(connected);
out.writeOptionalBoolean(skipUnavailable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ protected void doExecuteForked(Task task, ResolveClusterActionRequest request, A
searchCoordinationExecutor,
RemoteClusterService.DisconnectedStrategy.FAIL_IF_DISCONNECTED
);
var remoteRequest = new ResolveClusterActionRequest(originalIndices.indices(), request.indicesOptions());
var remoteRequest = new ResolveClusterActionRequest(
originalIndices.indices(),
request.indicesOptions(),
request.clusterInfoOnly(),
false
);
// allow cancellation requests to propagate to remote clusters
remoteRequest.setParentTask(clusterService.localNode().getId(), task.getId());

Expand All @@ -155,7 +160,7 @@ public void onResponse(ResolveClusterActionResponse response) {
}
ResolveClusterInfo info = response.getResolveClusterInfo().get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
if (info != null) {
clusterInfoMap.put(clusterAlias, new ResolveClusterInfo(info, skipUnavailable));
clusterInfoMap.put(clusterAlias, new ResolveClusterInfo(info, skipUnavailable, request.clusterInfoOnly()));
}
if (resolveClusterTask.isCancelled()) {
releaseResourcesOnCancel(clusterInfoMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,20 @@ public List<Route> routes() {

@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String[] indexExpressions = Strings.splitStringByCommaToArray(request.param("name"));
String[] indexExpressions;
boolean clusterInfoOnly;
if (request.hasParam("name")) {
indexExpressions = Strings.splitStringByCommaToArray(request.param("name"));
clusterInfoOnly = false;
} else {
indexExpressions = new String[0];
clusterInfoOnly = true;
}
ResolveClusterActionRequest resolveRequest = new ResolveClusterActionRequest(
indexExpressions,
IndicesOptions.fromRequest(request, ResolveIndexAction.Request.DEFAULT_INDICES_OPTIONS)
IndicesOptions.fromRequest(request, ResolveIndexAction.Request.DEFAULT_INDICES_OPTIONS),
clusterInfoOnly,
true
);
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
.indices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,24 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no
return remoteClusters.get(remoteCluster).isNodeConnected(node);
}

public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices) {
/**
* Group indices by cluster alias mapped to OriginalIndices for that cluster.
* @param indicesOptions IndicesOptions to clarify how the index expressions should be parsed/applied
* @param indices Multiple index expressions as string[].
* @param returnLocalAll whether to support the _all functionality needed by _search
* (See https://github.com/elastic/elasticsearch/pull/33899). If true, and no indices are specified,
* then a Map with one entry for the local cluster with an empty index array is returned.
* If false, an empty map is returned when when no indices are specified.
* @return Map keyed by cluster alias having OriginalIndices as the map value parsed from the String[] indices argument
*/
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices, boolean returnLocalAll) {
final Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
final Map<String, List<String>> groupedIndices = groupClusterIndices(getRemoteClusterNames(), indices);
if (groupedIndices.isEmpty()) {
// search on _all in the local cluster if neither local indices nor remote indices were specified
originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions));
if (returnLocalAll) {
// search on _all in the local cluster if neither local indices nor remote indices were specified
originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions));
}
} else {
for (Map.Entry<String, List<String>> entry : groupedIndices.entrySet()) {
String clusterAlias = entry.getKey();
Expand All @@ -199,6 +211,17 @@ public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions,
return originalIndicesMap;
}

/**
* If no indices are specified, then a Map with one entry for the local cluster with an empty index array is returned.
* For details see {@code groupIndices(IndicesOptions indicesOptions, String[] indices, boolean returnLocalAll)}
* @param indicesOptions IndicesOptions to clarify how the index expressions should be parsed/applied
* @param indices Multiple index expressions as string[].
* @return Map keyed by cluster alias having OriginalIndices as the map value parsed from the String[] indices argument
*/
public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices) {
return groupIndices(indicesOptions, indices, true);
}

/**
* Returns <code>true</code> iff the given cluster is configured as a remote cluster. Otherwise <code>false</code>
*/
Expand Down
Loading

0 comments on commit b988813

Please sign in to comment.