Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for local cluster alias to SearchRequest #36997

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -318,16 +318,16 @@ public final void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt) {
String clusterAlias = shardIt.getClusterAlias();
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
String indexName = shardIt.shardId().getIndex().getName();
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
.toArray(new String[0]);
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias, routings);
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
final SearchShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId(), shardIt.getClusterAlias(),
shardIt.getOriginalIndices());
SearchShardTarget shardTarget = shardIt.newSearchShardTarget(nodeId);
onShardFailure(shardIndex, shardTarget, e);

if (totalOps.incrementAndGet() == expectedTotalOps) {
Expand Down Expand Up @@ -257,8 +256,8 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator
Runnable r = () -> {
final Thread thread = Thread.currentThread();
try {
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(
shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {
@Override
public void innerOnResponse(FirstResult result) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public String getNode() {
return node;
}

@Nullable
public String getClusterAlias() {
return clusterAlias;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;

private final String clusterAlias;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename to localClusterAlias ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure


private SearchType searchType = SearchType.DEFAULT;

private String[] indices = Strings.EMPTY_ARRAY;
Expand Down Expand Up @@ -92,6 +94,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;

public SearchRequest() {
this.clusterAlias = null;
}

/**
Expand All @@ -111,6 +114,7 @@ public SearchRequest(SearchRequest searchRequest) {
this.searchType = searchRequest.searchType;
this.source = searchRequest.source;
this.types = searchRequest.types;
this.clusterAlias = searchRequest.clusterAlias;
}

/**
Expand All @@ -125,13 +129,23 @@ public SearchRequest(String... indices) {
* Constructs a new search request against the provided indices with the given search source.
*/
public SearchRequest(String[] indices, SearchSourceBuilder source) {
this();
if (source == null) {
throw new IllegalArgumentException("source must not be null");
}
indices(indices);
this.source = source;
}

/**
* Creates a new search request by providing the alias of the cluster where it will be executed. Used when a {@link SearchRequest}
* is created and executed as part of a cross-cluster search request performing local reduction on each cluster.
* The coordinating CCS node provides the alias to prefix index names with in the returned search results.
*/
SearchRequest(String clusterAlias) {
this.clusterAlias = Objects.requireNonNull(clusterAlias, "cluster alias must not be null");
}

/**
* Constructs a new search request from reading the specified stream.
*
Expand All @@ -158,6 +172,12 @@ public SearchRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
allowPartialSearchResults = in.readOptionalBoolean();
}
//TODO update version after backport
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
clusterAlias = in.readOptionalString();
} else {
clusterAlias = null;
}
}

@Override
Expand All @@ -181,6 +201,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
out.writeOptionalBoolean(allowPartialSearchResults);
}
//TODO update version after backport
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeOptionalString(clusterAlias);
}
}

@Override
Expand Down Expand Up @@ -209,6 +233,16 @@ public ActionRequestValidationException validate() {
return validationException;
}

/**
* Returns the alias of the cluster that this search request is being executed on. A non-null value indicates that this search request
* is being executed as part of a locally reduced cross-cluster search request. The cluster alias is used to provide the cluster alias
* to prefix returned index names with.
*/
@Nullable
String getClusterAlias() {
return clusterAlias;
}

/**
* Sets the indices the search will be executed on.
*/
Expand Down Expand Up @@ -529,14 +563,15 @@ public boolean equals(Object o) {
Objects.equals(maxConcurrentShardRequests, that.maxConcurrentShardRequests) &&
Objects.equals(preFilterShardSize, that.preFilterShardSize) &&
Objects.equals(indicesOptions, that.indicesOptions) &&
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults);
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
Objects.equals(clusterAlias, that.clusterAlias);
}

@Override
public int hashCode() {
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
allowPartialSearchResults);
allowPartialSearchResults, clusterAlias);
}

@Override
Expand All @@ -554,6 +589,7 @@ public String toString() {
", batchedReduceSize=" + batchedReduceSize +
", preFilterShardSize=" + preFilterShardSize +
", allowPartialSearchResults=" + allowPartialSearchResults +
", clusterAlias=" + clusterAlias +
", source=" + source + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,34 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchShardTarget;

import java.util.List;

/**
* Extension of {@link PlainShardIterator} used in the search api, which also holds the {@link OriginalIndices}
* of the search request. Useful especially with cross cluster search, as each cluster has its own set of original indices.
* of the search request (useful especially with cross-cluster search, as each cluster has its own set of original indices) as well as
* the cluster alias.
* @see OriginalIndices
*/
public final class SearchShardIterator extends PlainShardIterator {

private final OriginalIndices originalIndices;
private String clusterAlias;
private final String clusterAlias;
private boolean skip = false;

/**
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
* this the a given <code>shardId</code>.
*
* @param clusterAlias the alias of the cluster where the shard is located
* @param shardId shard id of the group
* @param shards shards to iterate
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
*/
public SearchShardIterator(String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
super(shardId, shards);
this.originalIndices = originalIndices;
this.clusterAlias = clusterAlias;
Expand All @@ -56,10 +62,22 @@ public OriginalIndices getOriginalIndices() {
return originalIndices;
}

/**
* Returns the alias of the cluster where the shard is located.
*/
@Nullable
public String getClusterAlias() {
return clusterAlias;
}

/**
* Creates a new shard target from this iterator, pointing at the node identified by the provided identifier.
* @see SearchShardTarget
*/
SearchShardTarget newSearchShardTarget(String nodeId) {
return new SearchShardTarget(nodeId, shardId(), clusterAlias, originalIndices);
}

/**
* Reset the iterator and mark it as skippable
* @see #skip()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ public static ShardSearchFailure readShardSearchFailure(StreamInput in) throws I

@Override
public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
shardTarget = new SearchShardTarget(in);
shardTarget = in.readOptionalWriteable(SearchShardTarget::new);
if (shardTarget != null) {
index = shardTarget.getFullyQualifiedIndexName();
shardId = shardTarget.getShardId().getId();
}
Expand All @@ -110,12 +110,7 @@ public void readFrom(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (shardTarget == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
shardTarget.writeTo(out);
}
out.writeOptionalWriteable(shardTarget);
out.writeString(reason);
RestStatus.writeTo(out, status);
out.writeException(cause);
Expand Down Expand Up @@ -175,7 +170,7 @@ public static ShardSearchFailure fromXContent(XContentParser parser) throws IOEx
SearchShardTarget searchShardTarget = null;
if (nodeId != null) {
searchShardTarget = new SearchShardTarget(nodeId,
new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), clusterAlias, OriginalIndices.NONE);
new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), clusterAlias, OriginalIndices.NONE);
}
return new ShardSearchFailure(exception, searchShardTarget);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
Expand All @@ -60,6 +61,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;

import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
Expand Down Expand Up @@ -311,7 +313,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
remoteShardIterators);
searchRequest.getClusterAlias(), remoteShardIterators);

failIfOverShardCountLimit(clusterService, shardIterators.size());

Expand All @@ -338,19 +340,34 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
}

final DiscoveryNodes nodes = clusterState.nodes();
BiFunction<String, String, Transport.Connection> connectionLookup = (clusterName, nodeId) -> {
final DiscoveryNode discoveryNode = clusterName == null ? nodes.get(nodeId) : remoteConnections.apply(clusterName, nodeId);
BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getClusterAlias(),
nodes::get, remoteConnections, searchTransportService::getConnection);
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
}

static BiFunction<String, String, Transport.Connection> buildConnectionLookup(String requestClusterAlias,
Function<String, DiscoveryNode> localNodes,
BiFunction<String, String, DiscoveryNode> remoteNodes,
BiFunction<String, DiscoveryNode, Transport.Connection> nodeToConnection) {
return (clusterAlias, nodeId) -> {
final DiscoveryNode discoveryNode;
if (clusterAlias == null || requestClusterAlias != null) {
assert requestClusterAlias == null || requestClusterAlias.equals(clusterAlias);
discoveryNode = localNodes.apply(nodeId);
} else {
discoveryNode = remoteNodes.apply(clusterAlias, nodeId);
}
if (discoveryNode == null) {
throw new IllegalStateException("no node found for id: " + nodeId);
}
return searchTransportService.getConnection(clusterName, discoveryNode);
return nodeToConnection.apply(clusterAlias, discoveryNode);
};
boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
}

private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupShardsIterator<SearchShardIterator> shardIterators) {
private static boolean shouldPreFilterSearchShards(SearchRequest searchRequest,
GroupShardsIterator<SearchShardIterator> shardIterators) {
SearchSourceBuilder source = searchRequest.source();
return searchRequest.searchType() == QUERY_THEN_FETCH && // we can't do this for DFS it needs to fan out to all shards all the time
SearchService.canRewriteToMatchNone(source) &&
Expand All @@ -359,10 +376,11 @@ private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupSh

static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
OriginalIndices localIndices,
@Nullable String localClusterAlias,
List<SearchShardIterator> remoteShardIterators) {
List<SearchShardIterator> shards = new ArrayList<>(remoteShardIterators);
for (ShardIterator shardIterator : localShardsIterator) {
shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
shards.add(new SearchShardIterator(localClusterAlias, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
}
return new GroupShardsIterator<>(shards);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ static String buildScrollId(AtomicArray<? extends SearchPhaseResult> searchPhase
out.writeLong(searchPhaseResult.getRequestId());
SearchShardTarget searchShardTarget = searchPhaseResult.getSearchShardTarget();
if (searchShardTarget.getClusterAlias() != null) {
out.writeString(RemoteClusterAware.buildRemoteIndexName(searchShardTarget.getClusterAlias(),
searchShardTarget.getNodeId()));
out.writeString(
RemoteClusterAware.buildRemoteIndexName(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()));
} else {
out.writeString(searchShardTarget.getNodeId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ final class DefaultSearchContext extends SearchContext {
DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter, TimeValue timeout,
FetchPhase fetchPhase, String clusterAlias, Version minNodeVersion) {
FetchPhase fetchPhase, Version minNodeVersion) {
this.id = id;
this.request = request;
this.fetchPhase = fetchPhase;
Expand All @@ -179,7 +179,7 @@ final class DefaultSearchContext extends SearchContext {
this.timeout = timeout;
this.minNodeVersion = minNodeVersion;
queryShardContext = indexService.newQueryShardContext(request.shardId().id(), searcher.getIndexReader(), request::nowInMillis,
clusterAlias);
shardTarget.getClusterAlias());
queryShardContext.setTypes(request.types());
queryBoost = request.indexBoost();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,8 +668,7 @@ final SearchContext createContext(ShardSearchRequest request) throws IOException
return context;
}

public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout)
throws IOException {
public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException {
return createSearchContext(request, timeout, true, "search");
}

Expand All @@ -684,7 +683,7 @@ private DefaultSearchContext createSearchContext(ShardSearchRequest request, Tim

final DefaultSearchContext searchContext = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget,
engineSearcher, clusterService, indexService, indexShard, bigArrays, threadPool.estimatedTimeInMillisCounter(), timeout,
fetchPhase, request.getClusterAlias(), clusterService.state().nodes().getMinNodeVersion());
fetchPhase, clusterService.state().nodes().getMinNodeVersion());
boolean success = false;
try {
// we clone the query shard context here just for rewriting otherwise we
Expand Down
Loading