Commit

Merge remote-tracking branch 'elastic/main' into resolve-cluster/cluster-info-only
quux00 committed Jan 21, 2025
2 parents d0f39ed + 6db7984 commit 17251d2
Showing 7 changed files with 130 additions and 38 deletions.
@@ -8,18 +8,25 @@ Learn how to combine different retrievers in these hands-on examples.
==== Add example data

To begin with, let's create the `retrievers_example` index and add some documents to it.
We will set `number_of_shards=1` for our examples to ensure consistent and reproducible ordering.

[source,console]
----
PUT retrievers_example
{
"settings": {
"number_of_shards": 1
},
"mappings": {
"properties": {
"vector": {
"type": "dense_vector",
"dims": 3,
"similarity": "l2_norm",
"index": true
"index": true,
"index_options": {
"type": "flat"
}
},
"text": {
"type": "text"
Expand Down Expand Up @@ -458,6 +465,9 @@ and index a couple of documents.
----
PUT retrievers_example_nested
{
"settings": {
"number_of_shards": 1
},
"mappings": {
"properties": {
"nested_field": {
@@ -470,7 +480,10 @@ PUT retrievers_example_nested
"type": "dense_vector",
"dims": 3,
"similarity": "l2_norm",
"index": true
"index": true,
"index_options": {
"type": "flat"
}
}
}
},
@@ -639,7 +652,7 @@ This would propagate the `inner_hits` defined for the `knn` query to the `rrf` retriever
"value": 3,
"relation": "eq"
},
"max_score": 0.44353113,
"max_score": 0.44444445,
"hits": [
{
"_index": "retrievers_example_nested",
@@ -648,7 +661,7 @@ This would propagate the `inner_hits` defined for the `knn` query to the `rrf` retriever
"field": "nested_field",
"offset": 2
},
"_score": 0.44353113,
"_score": 0.44444445,
"fields": {
"nested_field": [
{
@@ -666,7 +679,7 @@ This would propagate the `inner_hits` defined for the `knn` query to the `rrf` retriever
"field": "nested_field",
"offset": 1
},
"_score": 0.26567122,
"_score": 0.21301977,
"fields": {
"nested_field": [
{
@@ -684,7 +697,7 @@ This would propagate the `inner_hits` defined for the `knn` query to the `rrf` retriever
"field": "nested_field",
"offset": 0
},
"_score": 0.18478848,
"_score": 0.16889325,
"fields": {
"nested_field": [
{
@@ -716,7 +729,7 @@ This would propagate the `inner_hits` defined for the `knn` query to the `rrf` retriever
"value": 1,
"relation": "eq"
},
"max_score": 0.32002488,
"max_score": 0.31715825,
"hits": [
{
"_index": "retrievers_example_nested",
@@ -725,7 +738,7 @@ This would propagate the `inner_hits` defined for the `knn` query to the `rrf` retriever
"field": "nested_field",
"offset": 0
},
"_score": 0.32002488,
"_score": 0.31715825,
"fields": {
"nested_field": [
{
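The score updates above accompany the mapping changes at the top of this file: pinning `number_of_shards` to 1 and indexing the `dense_vector` fields with `"index_options": {"type": "flat"}` keeps ordering consistent and reproducible across runs. For orientation, here is a minimal sketch of the kind of nested-kNN request with `inner_hits` that these response snippets correspond to; the field names (`nested_field.nested_vector`, `nested_field.paragraph_id`), the second sub-retriever, and all parameter values are assumptions, not shown in this diff:

[source,console]
----
POST retrievers_example_nested/_search
{
  "retriever": {
    "rrf": {
      "retrievers": [
        {
          "standard": {
            "query": {
              "nested": {
                "path": "nested_field",
                "inner_hits": {
                  "name": "nested_vector",
                  "_source": false,
                  "fields": [ "nested_field.paragraph_id" ]
                },
                "query": {
                  "knn": {
                    "field": "nested_field.nested_vector",
                    "query_vector": [ 1, 0, 0.5 ],
                    "num_candidates": 10
                  }
                }
              }
            }
          }
        },
        {
          "standard": {
            "query": {
              "term": { "topic": "ai" }
            }
          }
        }
      ],
      "rank_window_size": 10,
      "rank_constant": 1
    }
  },
  "_source": false
}
----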
muted-tests.yml (3 changes: 0 additions & 3 deletions)
@@ -189,9 +189,6 @@ tests:
issue: https://github.com/elastic/elasticsearch/issues/118914
- class: org.elasticsearch.xpack.security.authc.ldap.ActiveDirectoryRunAsIT
issue: https://github.com/elastic/elasticsearch/issues/115727
- class: org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT
method: test {yaml=reference/search/search-your-data/retrievers-examples/line_98}
issue: https://github.com/elastic/elasticsearch/issues/119155
- class: org.elasticsearch.xpack.esql.action.EsqlNodeFailureIT
method: testFailureLoadingFields
issue: https://github.com/elastic/elasticsearch/issues/118000
@@ -10,7 +10,9 @@
package org.elasticsearch.index.mapper;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.termvectors.TermVectorsService;
import org.elasticsearch.test.index.IndexVersionUtils;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;

@@ -78,4 +80,39 @@ public void testUsingEnabledSettingThrows() {
ex.getMessage()
);
}

/**
* Disabling the _field_names field should still work for indices created before 8.0
*/
public void testUsingEnabledBefore8() throws Exception {
DocumentMapper docMapper = createDocumentMapper(
IndexVersionUtils.randomPreviousCompatibleVersion(random(), IndexVersions.V_8_0_0),
topMapping(b -> b.startObject("_field_names").field("enabled", false).endObject())
);

assertWarnings(FieldNamesFieldMapper.ENABLED_DEPRECATION_MESSAGE);
FieldNamesFieldMapper fieldNamesMapper = docMapper.metadataMapper(FieldNamesFieldMapper.class);
assertFalse(fieldNamesMapper.fieldType().isEnabled());

ParsedDocument doc = docMapper.parse(source(b -> b.field("field", "value")));
assertNull(doc.rootDoc().get("_field_names"));
}

/**
* Merging the "_field_names" enabled setting is forbidden in 8.0, but we still want to tests the behavior on pre-8 indices
*/
public void testMergingMappingsBefore8() throws Exception {
MapperService mapperService = createMapperService(
IndexVersionUtils.randomPreviousCompatibleVersion(random(), IndexVersions.V_8_0_0),
mapping(b -> {})
);

merge(mapperService, topMapping(b -> b.startObject("_field_names").field("enabled", false).endObject()));
assertFalse(mapperService.documentMapper().metadataMapper(FieldNamesFieldMapper.class).fieldType().isEnabled());
assertWarnings(FieldNamesFieldMapper.ENABLED_DEPRECATION_MESSAGE);

merge(mapperService, topMapping(b -> b.startObject("_field_names").field("enabled", true).endObject()));
assertTrue(mapperService.documentMapper().metadataMapper(FieldNamesFieldMapper.class).fieldType().isEnabled());
assertWarnings(FieldNamesFieldMapper.ENABLED_DEPRECATION_MESSAGE);
}
}
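For context, the legacy mapping shape that both tests exercise is sketched below; it is accepted (with a deprecation warning) only on indices created before 8.0 and is rejected outright on 8.x indices. The index name is illustrative:

[source,console]
----
PUT /pre_8_index
{
  "mappings": {
    "_field_names": {
      "enabled": false
    }
  }
}
----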
@@ -130,10 +130,10 @@
* the same number of rows that it was sent no matter how many documents match.
* </p>
*/
abstract class AbstractLookupService<R extends AbstractLookupService.Request, T extends AbstractLookupService.TransportRequest> {
public abstract class AbstractLookupService<R extends AbstractLookupService.Request, T extends AbstractLookupService.TransportRequest> {
private final String actionName;
private final ClusterService clusterService;
private final SearchService searchService;
private final CreateShardContext createShardContext;
private final TransportService transportService;
private final Executor executor;
private final BigArrays bigArrays;
@@ -152,7 +152,7 @@ abstract class AbstractLookupService<R extends AbstractLookupService.Request, T
AbstractLookupService(
String actionName,
ClusterService clusterService,
SearchService searchService,
CreateShardContext createShardContext,
TransportService transportService,
BigArrays bigArrays,
BlockFactory blockFactory,
@@ -161,7 +161,7 @@ abstract class AbstractLookupService<R extends AbstractLookupService.Request, T
) {
this.actionName = actionName;
this.clusterService = clusterService;
this.searchService = searchService;
this.createShardContext = createShardContext;
this.transportService = transportService;
this.executor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH);
this.bigArrays = bigArrays;
@@ -324,9 +324,8 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
final List<Releasable> releasables = new ArrayList<>(6);
boolean started = false;
try {
final ShardSearchRequest shardSearchRequest = new ShardSearchRequest(request.shardId, 0, AliasFilter.EMPTY);
final SearchContext searchContext = searchService.createSearchContext(shardSearchRequest, SearchService.NO_TIMEOUT);
releasables.add(searchContext);
LookupShardContext shardContext = createShardContext.create(request.shardId);
releasables.add(shardContext.release);
final LocalCircuitBreaker localBreaker = new LocalCircuitBreaker(
blockFactory.breaker(),
localBreakerSettings.overReservedBytes(),
@@ -364,8 +363,7 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
}
}
releasables.add(finishPages);
SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext();
QueryList queryList = queryList(request, searchExecutionContext, inputBlock, request.inputDataType);
QueryList queryList = queryList(request, shardContext.executionContext, inputBlock, request.inputDataType);
var warnings = Warnings.createWarnings(
DriverContext.WarningsMode.COLLECT,
request.source.source().getLineNumber(),
@@ -376,11 +374,11 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
driverContext.blockFactory(),
EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE,
queryList,
searchExecutionContext.getIndexReader(),
shardContext.context.searcher().getIndexReader(),
warnings
);
releasables.add(queryOperator);
var extractFieldsOperator = extractFieldsOperator(searchContext, driverContext, request.extractFields);
var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields);
releasables.add(extractFieldsOperator);

/*
@@ -403,7 +401,7 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
List.of(extractFieldsOperator, finishPages),
outputOperator,
Driver.DEFAULT_STATUS_INTERVAL,
Releasables.wrap(searchContext, localBreaker)
Releasables.wrap(shardContext.release, localBreaker)
);
task.addListener(() -> {
String reason = Objects.requireNonNullElse(task.getReasonCancelled(), "task was cancelled");
@@ -440,15 +438,10 @@ public void onFailure(Exception e) {
}

private static Operator extractFieldsOperator(
SearchContext searchContext,
EsPhysicalOperationProviders.ShardContext shardContext,
DriverContext driverContext,
List<NamedExpression> extractFields
) {
EsPhysicalOperationProviders.ShardContext shardContext = new EsPhysicalOperationProviders.DefaultShardContext(
0,
searchContext.getSearchExecutionContext(),
searchContext.request().getAliasFilter()
);
List<ValuesSourceReaderOperator.FieldInfo> fields = new ArrayList<>(extractFields.size());
for (NamedExpression extractField : extractFields) {
BlockLoader loader = shardContext.blockLoader(
@@ -472,7 +465,7 @@ private static Operator extractFieldsOperator(
return new ValuesSourceReaderOperator(
driverContext.blockFactory(),
fields,
List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.searcher().getIndexReader(), searchContext::newSourceLoader)),
List.of(new ValuesSourceReaderOperator.ShardContext(shardContext.searcher().getIndexReader(), shardContext::newSourceLoader)),
0
);
}
@@ -680,4 +673,42 @@ public boolean hasReferences() {
return refs.hasReferences();
}
}

/**
* Create a {@link LookupShardContext} for a locally allocated {@link ShardId}.
*/
public interface CreateShardContext {
LookupShardContext create(ShardId shardId) throws IOException;

static CreateShardContext fromSearchService(SearchService searchService) {
return shardId -> {
ShardSearchRequest shardSearchRequest = new ShardSearchRequest(shardId, 0, AliasFilter.EMPTY);
return LookupShardContext.fromSearchContext(
searchService.createSearchContext(shardSearchRequest, SearchService.NO_TIMEOUT)
);
};
}
}

/**
* {@link AbstractLookupService} uses this to power the queries and field loading that
* it needs to perform to actually do the lookup.
*/
public record LookupShardContext(
EsPhysicalOperationProviders.ShardContext context,
SearchExecutionContext executionContext,
Releasable release
) {
public static LookupShardContext fromSearchContext(SearchContext context) {
return new LookupShardContext(
new EsPhysicalOperationProviders.DefaultShardContext(
0,
context.getSearchExecutionContext(),
context.request().getAliasFilter()
),
context.getSearchExecutionContext(),
context
);
}
}
}
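The net effect of the refactor above: `AbstractLookupService` no longer reaches into `SearchService` directly, and callers hand it a `CreateShardContext` instead. Below is a minimal sketch of the two wiring options this enables, following the signatures in this diff; the `newTestSearchContext` helper is hypothetical:

[source,java]
----
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.xpack.esql.enrich.AbstractLookupService.CreateShardContext;
import org.elasticsearch.xpack.esql.enrich.AbstractLookupService.LookupShardContext;

class LookupWiringSketch {
    // Production path: route shard-context creation through the SearchService,
    // exactly as TransportEsqlQueryAction does later in this commit.
    CreateShardContext production(SearchService searchService) {
        return CreateShardContext.fromSearchService(searchService);
    }

    // Test path: build a LookupShardContext from any SearchContext, with no
    // SearchService involved.
    CreateShardContext forTests() {
        return shardId -> LookupShardContext.fromSearchContext(newTestSearchContext(shardId));
    }

    // Hypothetical helper: a test harness would construct a real SearchContext here.
    private SearchContext newTestSearchContext(ShardId shardId) {
        throw new UnsupportedOperationException("test-harness specific");
    }
}
----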
@@ -23,7 +23,6 @@
import org.elasticsearch.index.mapper.RangeType;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver;
@@ -48,15 +47,15 @@ public class EnrichLookupService extends AbstractLookupService<EnrichLookupServi

public EnrichLookupService(
ClusterService clusterService,
SearchService searchService,
CreateShardContext createShardContext,
TransportService transportService,
BigArrays bigArrays,
BlockFactory blockFactory
) {
super(
LOOKUP_ACTION_NAME,
clusterService,
searchService,
createShardContext,
transportService,
bigArrays,
blockFactory,
@@ -22,7 +22,6 @@
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
@@ -47,15 +46,15 @@ public class LookupFromIndexService extends AbstractLookupService<LookupFromInde

public LookupFromIndexService(
ClusterService clusterService,
SearchService searchService,
CreateShardContext createShardContext,
TransportService transportService,
BigArrays bigArrays,
BlockFactory blockFactory
) {
super(
LOOKUP_ACTION_NAME,
clusterService,
searchService,
createShardContext,
transportService,
bigArrays,
blockFactory,
@@ -44,6 +44,7 @@
import org.elasticsearch.xpack.esql.action.EsqlQueryTask;
import org.elasticsearch.xpack.esql.core.async.AsyncTaskManagementService;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.enrich.AbstractLookupService;
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
@@ -107,8 +108,23 @@ public TransportEsqlQueryAction(
exchangeService.registerTransportHandler(transportService);
this.exchangeService = exchangeService;
this.enrichPolicyResolver = new EnrichPolicyResolver(clusterService, transportService, planExecutor.indexResolver());
this.enrichLookupService = new EnrichLookupService(clusterService, searchService, transportService, bigArrays, blockFactory);
this.lookupFromIndexService = new LookupFromIndexService(clusterService, searchService, transportService, bigArrays, blockFactory);
AbstractLookupService.CreateShardContext lookupCreateShardContext = AbstractLookupService.CreateShardContext.fromSearchService(
searchService
);
this.enrichLookupService = new EnrichLookupService(
clusterService,
lookupCreateShardContext,
transportService,
bigArrays,
blockFactory
);
this.lookupFromIndexService = new LookupFromIndexService(
clusterService,
lookupCreateShardContext,
transportService,
bigArrays,
blockFactory
);
this.computeService = new ComputeService(
searchService,
transportService,
