Cumulative 6.7 backport #40190

Closed · wants to merge 4 commits
32 changes: 18 additions & 14 deletions docs/reference/modules/cross-cluster-search.asciidoc
@@ -2,9 +2,11 @@
== {ccs-cap}

The _{ccs}_ feature allows any node to act as a federated client across
-multiple clusters. In contrast to the <<modules-tribe,tribe node>> feature, a {ccs} node won't
-join the remote cluster, instead it connects to a remote cluster in a light fashion in order to execute
-federated search requests.
+multiple clusters. In contrast to the <<modules-tribe,tribe node>> feature,
+a {ccs} node won't join the remote cluster, instead it connects to a remote
+cluster in a light fashion in order to execute federated search requests.
+For details on communication and compatibility between different clusters,
+see <<modules-remote-clusters>>.

[float]
=== Using {ccs}
@@ -43,7 +45,8 @@ PUT _cluster/settings
// TEST[s/127.0.0.1:9300/\${transport_host}/]

To search the `twitter` index on remote cluster `cluster_one` the index name
-must be prefixed with the cluster alias separated by a `:` character:
+must be prefixed with the alias of the remote cluster followed by the `:`
+character:

[source,js]
--------------------------------------------------
@@ -101,8 +104,8 @@ GET /cluster_one:twitter/_search
// TESTRESPONSE[s/"_score": 1/"_score": "$body.hits.hits.0._score"/]


-In contrast to the `tribe` feature cross cluster search can also search indices with the same name on different
-clusters:
+In contrast to the `tribe` feature cross cluster search can also search indices
+with the same name on different clusters:

[source,js]
--------------------------------------------------
@@ -118,10 +121,10 @@ GET /cluster_one:twitter,twitter/_search
// CONSOLE
// TEST[continued]

-Search results are disambiguated the same way as the indices are disambiguated in the request. Even if index names are
-identical these indices will be treated as different indices when results are merged. All results retrieved from a
-remote index
-will be prefixed with their remote cluster name:
+Search results are disambiguated the same way as the indices are disambiguated in the request.
+Indices with same names are treated as different indices when results are merged. All results
+retrieved from an index located in a remote cluster are prefixed with their corresponding
+cluster alias:

[source,js]
--------------------------------------------------
@@ -179,10 +182,11 @@ will be prefixed with their remote cluster name:
[float]
=== Skipping disconnected clusters

-By default all remote clusters that are searched via {ccs} need to be available when
-the search request is executed, otherwise the whole request fails and no search results are returned
-despite some of the clusters are available. Remote clusters can be made optional through the
-boolean `skip_unavailable` setting, set to `false` by default.
+By default, all remote clusters that are searched via {ccs} need to be
+available when the search request is executed. Otherwise, the whole request
+fails; even if some of the clusters are available, no search results are
+returned. You can use the boolean `skip_unavailable` setting to make remote
+clusters optional. By default, it is set to `false`.

[source,js]
--------------------------------
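The `skip_unavailable` example block above is truncated in this view. As a minimal sketch only (the alias `cluster_two` is hypothetical, and this assumes the `cluster.remote` settings namespace used by 6.7), making one remote cluster optional looks like:

[source,js]
--------------------------------
PUT _cluster/settings
{
  "persistent": {
    "cluster.remote.cluster_two.skip_unavailable": true
  }
}
--------------------------------

With this in place, a {ccs} request that includes `cluster_two` returns results from the reachable clusters instead of failing outright when `cluster_two` is down.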
54 changes: 43 additions & 11 deletions docs/reference/modules/remote-clusters.asciidoc
@@ -14,24 +14,55 @@ connections to a remote cluster. This functionality is used in
endif::[]

Remote cluster connections work by configuring a remote cluster and connecting
-only to a limited number of nodes in the remote cluster. Each remote cluster is
-referenced by a name and a list of seed nodes. When a remote cluster is
-registered, its cluster state is retrieved from one of the seed nodes so that by
-default up to three _gateway nodes_ are selected to be connected to as part of
-remote cluster requests. Remote cluster connections consist of uni-directional
-connections from the coordinating node to the previously selected remote nodes
-only. You can tag which nodes should be selected by using node attributes (see <<remote-cluster-settings>>).
+only to a limited number of nodes in that remote cluster. Each remote cluster
+is referenced by a name and a list of seed nodes. When a remote cluster is
+registered, its cluster state is retrieved from one of the seed nodes and up
+to three _gateway nodes_ are selected to be connected to as part of remote
+cluster requests. All the communication required between different clusters
+goes through the <<modules-transport,transport layer>>. Remote cluster
+connections consist of uni-directional connections from the coordinating
+node to the selected remote _gateway nodes_ only.

-Each node in a cluster that has remote clusters configured connects to one or
-more _gateway nodes_ and uses them to federate requests to the remote cluster.
+[float]
+[[gateway-nodes-selection]]
+=== Gateway nodes selection
+
+The _gateway nodes_ selection depends on the following criteria:
+
+- *version*: Remote nodes must be compatible with the cluster they are
+registered to. This is subject to the same rules as <<rolling-upgrades>>.
+Any node can communicate with any other node on the same major version (e.g.
+6.0 can talk to any 6.x node). Only nodes on the last minor version of a
+certain major version can communicate with nodes on the following major
+version (e.g. 6.7 can communicate with 7.0, as well as any 7.x node, while
+6.6 or earlier cannot talk to any 7.x node). Note that version compatibility
+is symmetric, meaning that if 6.7 can communicate with 7.0, 7.0 can also
+communicate with 6.7. The matrix below summarizes compatibility as described
+above.
+
+[cols="^,^,^,^,^,^"]
+|====
+| Compatibility | 5.0->5.5 | 5.6 | 6.0->6.6 | 6.7 | 7.x
+| 5.0->5.5 | Yes | Yes | No | No | No
+| 5.6 | Yes | Yes | Yes | Yes | No
+| 6.0->6.6 | No | Yes | Yes | Yes | No
+| 6.7 | No | Yes | Yes | Yes | Yes
+| 7.x | No | No | No | Yes | Yes
+|====
+
+- *role*: Dedicated master nodes never get selected.
+- *attributes*: You can tag which nodes should be selected
+(see <<remote-cluster-settings>>), though such tagged nodes still have
+to satisfy the two above requirements.

[float]
[[configuring-remote-clusters]]
=== Configuring remote clusters

You can configure remote clusters globally by using
<<cluster-update-settings,cluster settings>>, which you can update dynamically.
-Alternatively, you can configure them locally on individual nodes by using the `elasticsearch.yml` file.
+Alternatively, you can configure them locally on individual nodes by using the
+`elasticsearch.yml` file.

If you specify the settings in `elasticsearch.yml` files, only the nodes with
those settings can connect to the remote cluster. In other words, functionality
@@ -59,7 +90,8 @@ between local and remote indices.
For more information about the optional transport settings, see
<<modules-transport>>.

-If you use <<cluster-update-settings,cluster settings>>, the remote clusters are available on every node in the cluster. For example:
+If you use <<cluster-update-settings,cluster settings>>, the remote clusters
+are available on every node in the cluster. For example:

[source,js]
--------------------------------
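The configuration examples in this file are truncated above. As an illustrative sketch under stated assumptions (the alias `cluster_one`, the seed address, and the `gateway` attribute name are placeholders, not values taken from this PR), an equivalent static configuration in `elasticsearch.yml` that also tags gateway nodes as described in <<gateway-nodes-selection>> might look like:

[source,yaml]
--------------------------------
# Static remote-cluster configuration; only nodes carrying these
# settings can serve cross-cluster requests.
cluster.remote.cluster_one.seeds: 127.0.0.1:9300

# Prefer remote nodes that carry node.attr.gateway: true as gateway
# nodes. Tagged nodes must still satisfy the version and role criteria
# described under gateway nodes selection.
cluster.remote.node.attr: gateway
--------------------------------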
4 changes: 4 additions & 0 deletions docs/reference/modules/snapshots.asciidoc
@@ -28,6 +28,10 @@ back up a cluster is by using the snapshot and restore functionality.
[float]
=== Version compatibility

+IMPORTANT: Version compatibility refers to the underlying Lucene index
+compatibility. Follow the <<setup-upgrade,Upgrade documentation>>
+when migrating between versions.
+
A snapshot contains a copy of the on-disk data structures that make up an
index. This means that snapshots can only be restored to versions of
Elasticsearch that can read the indices:
SearchPhaseController.java
@@ -39,7 +39,6 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
-import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
@@ -62,8 +61,6 @@
import java.util.Map;
import java.util.function.Function;
import java.util.function.IntFunction;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;

public final class SearchPhaseController {

@@ -496,41 +493,15 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
}
final Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
-final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
-firstResult.pipelineAggregators(), reduceContext);
+final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
+InternalAggregations.reduce(aggregationsList, firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
return new ReducedQueryPhase(topDocsStats.totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
timedOut, terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
firstResult.sortValueFormats(), numReducePhases, size, from, false);
}

-/**
-* Performs an intermediate reduce phase on the aggregations. For instance with this reduce phase never prune information
-* that relevant for the final reduce step. For final reduce see {@link #reduceAggs(List, List, ReduceContext)}
-*/
-private InternalAggregations reduceAggsIncrementally(List<InternalAggregations> aggregationsList) {
-ReduceContext reduceContext = reduceContextFunction.apply(false);
-return aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
-null, reduceContext);
-}
-
-private static InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
-List<SiblingPipelineAggregator> pipelineAggregators, ReduceContext reduceContext) {
-InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
-if (pipelineAggregators != null) {
-List<InternalAggregation> newAggs = StreamSupport.stream(aggregations.spliterator(), false)
-.map((p) -> (InternalAggregation) p)
-.collect(Collectors.toList());
-for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
-InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext);
-newAggs.add(newAgg);
-}
-return new InternalAggregations(newAggs);
-}
-return aggregations;
-}

public static final class ReducedQueryPhase {
// the sum of all hits across all reduces shards
final long totalHits;
@@ -653,7 +624,8 @@ public void consumeResult(SearchPhaseResult result) {
private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (index == bufferSize) {
if (hasAggs) {
-InternalAggregations reducedAggs = controller.reduceAggsIncrementally(Arrays.asList(aggsBuffer));
+ReduceContext reduceContext = controller.reduceContextFunction.apply(false);
+InternalAggregations reducedAggs = InternalAggregations.reduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
}
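For orientation, here is a minimal, self-contained sketch of the buffered incremental-reduce pattern the hunk above touches: partial aggregations accumulate in a fixed buffer, and when it fills they are collapsed into slot 0 with a non-final reduce, which is what `reduceContextFunction.apply(false)` selects. The class below is a toy stand-in, not the Elasticsearch implementation.

```java
import java.util.Arrays;

// Toy model of SearchPhaseController's buffered aggregation consumer:
// shard-level partials land in a fixed-size buffer; a full buffer is
// collapsed into slot 0 by an intermediate (non-final) reduce.
final class IncrementalReducer {
    private final long[] buffer;
    private int index;
    private int numReducePhases;

    IncrementalReducer(int bufferSize) {
        this.buffer = new long[bufferSize];
    }

    // Stand-in for InternalAggregations.reduce(...) with isFinalReduce=false.
    private static long reduce(long[] partials, int length) {
        long sum = 0;
        for (int i = 0; i < length; i++) {
            sum += partials[i];
        }
        return sum;
    }

    synchronized void consume(long shardPartial) {
        if (index == buffer.length) {       // buffer full: intermediate reduce
            long reduced = reduce(buffer, buffer.length);
            Arrays.fill(buffer, 0L);
            buffer[0] = reduced;            // the reduced partial re-enters slot 0
            index = 1;
            numReducePhases++;
        }
        buffer[index++] = shardPartial;
    }

    long finalReduce() {                    // the isFinalReduce=true step
        numReducePhases++;
        return reduce(buffer, index);
    }

    public static void main(String[] args) {
        IncrementalReducer reducer = new IncrementalReducer(3);
        for (long partial : new long[] {1, 2, 3, 4, 5}) {
            reducer.consume(partial);
        }
        System.out.println(reducer.finalReduce()); // 15, after two reduce phases
    }
}
```

The point of the buffer is memory: the coordinating node never holds more than `bufferSize` partial aggregation trees at once, no matter how many shards respond.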
AggregatorFactories.java
@@ -161,8 +161,7 @@ public AggParseContext(String name) {
}
}

-public static final AggregatorFactories EMPTY = new AggregatorFactories(null, new AggregatorFactory<?>[0],
-new ArrayList<PipelineAggregationBuilder>());
+public static final AggregatorFactories EMPTY = new AggregatorFactories(null, new AggregatorFactory<?>[0], new ArrayList<>());

private AggregatorFactory<?> parent;
private AggregatorFactory<?>[] factories;
InternalAggregation.java
@@ -61,7 +61,7 @@ public ReduceContext(BigArrays bigArrays, ScriptService scriptService, IntConsum
/**
* Returns <code>true</code> iff the current reduce phase is the final reduce phase. This indicates if operations like
* pipeline aggregations should be applied or if specific features like {@code minDocCount} should be taken into account.
-* Operations that are potentially loosing information can only be applied during the final reduce phase.
+* Operations that are potentially losing information can only be applied during the final reduce phase.
*/
public boolean isFinalReduce() {
return isFinalReduce;
InternalAggregations.java
@@ -18,17 +18,22 @@
*/
package org.elasticsearch.search.aggregations;

+import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
+import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
+import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;

import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;

import static java.util.Collections.emptyMap;

@@ -48,23 +53,56 @@ public final class InternalAggregations extends Aggregations implements Streamab
}
};

+private List<SiblingPipelineAggregator> topLevelPipelineAggregators = Collections.emptyList();
+
private InternalAggregations() {
}

/**
-* Constructs a new addAggregation.
+* Constructs a new aggregation.
*/
public InternalAggregations(List<InternalAggregation> aggregations) {
super(aggregations);
}

/**
-* Reduces the given lists of addAggregation.
-*
-* @param aggregationsList A list of aggregation to reduce
-* @return The reduced addAggregation
+* Constructs a new aggregation providing its {@link InternalAggregation}s and {@link SiblingPipelineAggregator}s
*/
+public InternalAggregations(List<InternalAggregation> aggregations, List<SiblingPipelineAggregator> topLevelPipelineAggregators) {
+super(aggregations);
+this.topLevelPipelineAggregators = Objects.requireNonNull(topLevelPipelineAggregators);
+}
+
+/**
+* Returns the top-level pipeline aggregators.
+* Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they
+* become part of the list of {@link InternalAggregation}s.
+*/
-public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
+List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
+return topLevelPipelineAggregators;
+}
+
+/**
+* Reduces the given list of aggregations as well as the top-level pipeline aggregators extracted from the first
+* {@link InternalAggregations} object found in the list.
+* Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched.
+*/
+public static InternalAggregations reduce(List<InternalAggregations> aggregationsList,
+ReduceContext context) {
+if (aggregationsList.isEmpty()) {
+return null;
+}
+InternalAggregations first = aggregationsList.get(0);
+return reduce(aggregationsList, first.topLevelPipelineAggregators, context);
+}
+
+/**
+* Reduces the given list of aggregations as well as the provided top-level pipeline aggregators.
+* Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched.
+*/
+public static InternalAggregations reduce(List<InternalAggregations> aggregationsList,
+List<SiblingPipelineAggregator> topLevelPipelineAggregators,
+ReduceContext context) {
if (aggregationsList.isEmpty()) {
return null;
}
@@ -89,7 +127,15 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
InternalAggregation first = aggregations.get(0); // the list can't be empty as it's created on demand
reducedAggregations.add(first.reduce(aggregations, context));
}
-return new InternalAggregations(reducedAggregations);
+
+if (context.isFinalReduce()) {
+for (SiblingPipelineAggregator pipelineAggregator : topLevelPipelineAggregators) {
+InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(reducedAggregations), context);
+reducedAggregations.add(newAgg);
+}
+return new InternalAggregations(reducedAggregations);
+}
+return new InternalAggregations(reducedAggregations, topLevelPipelineAggregators);
}

public static InternalAggregations readAggregations(StreamInput in) throws IOException {
@@ -104,11 +150,20 @@ public void readFrom(StreamInput in) throws IOException {
if (aggregations.isEmpty()) {
aggregationsAsMap = emptyMap();
}
+if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
+this.topLevelPipelineAggregators = in.readList(
+stream -> (SiblingPipelineAggregator)in.readNamedWriteable(PipelineAggregator.class));
+} else {
+this.topLevelPipelineAggregators = Collections.emptyList();
+}
}

@Override
@SuppressWarnings("unchecked")
public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteableList((List<InternalAggregation>)aggregations);
+if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
+out.writeNamedWriteableList(topLevelPipelineAggregators);
+}
}
}
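A compact sketch of the contract the new `reduce` overloads encode: top-level (sibling) pipeline aggregators ride along unchanged through intermediate reduces and are folded into the aggregation list only when `isFinalReduce` is true. The types below are simplified stand-ins rather than the real Elasticsearch classes; the "pipeline" here just derives one extra value from the reduced list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Toy stand-in for InternalAggregations: reduced values plus the top-level
// pipeline aggregators still waiting for the final reduction.
final class ToyAggs {
    final List<Long> aggs;
    final List<Function<List<Long>, Long>> topLevelPipelines;

    ToyAggs(List<Long> aggs, List<Function<List<Long>, Long>> topLevelPipelines) {
        this.aggs = aggs;
        this.topLevelPipelines = topLevelPipelines;
    }

    // Mirrors the shape of InternalAggregations.reduce(aggregationsList,
    // topLevelPipelineAggregators, context): pipelines are applied only on
    // the final reduce, otherwise they are carried forward untouched.
    static ToyAggs reduce(List<ToyAggs> aggregationsList, boolean isFinalReduce) {
        if (aggregationsList.isEmpty()) {
            return null;
        }
        List<Function<List<Long>, Long>> pipelines = aggregationsList.get(0).topLevelPipelines;
        long sum = 0;
        for (ToyAggs partial : aggregationsList) {
            for (long value : partial.aggs) {
                sum += value;
            }
        }
        List<Long> reduced = new ArrayList<>();
        reduced.add(sum);
        if (isFinalReduce) {
            for (Function<List<Long>, Long> pipeline : pipelines) {
                reduced.add(pipeline.apply(reduced)); // pipeline output joins the aggs
            }
            return new ToyAggs(reduced, Collections.emptyList());
        }
        return new ToyAggs(reduced, pipelines);       // non-final: keep pipelines
    }

    public static void main(String[] args) {
        Function<List<Long>, Long> doubler = values -> 2 * values.get(0);
        ToyAggs shard1 = new ToyAggs(List.of(3L), List.of(doubler));
        ToyAggs shard2 = new ToyAggs(List.of(4L), List.of(doubler));
        ToyAggs partial = reduce(List.of(shard1, shard2), false); // pipelines untouched
        ToyAggs finalAggs = reduce(List.of(partial), true);       // pipeline applied
        System.out.println(finalAggs.aggs);                       // [7, 14]
    }
}
```

Carrying the pipelines on the object is also why the `Version.V_6_7_0` checks appear in `readFrom`/`writeTo` above: older nodes do not expect the extra list on the wire, so it is only serialized between 6.7+ nodes.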