From f74df92dcf3073d2eff1fe1500d018f5892850ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Mon, 20 Jan 2025 18:51:12 +0100 Subject: [PATCH 1/4] Skip multivalues on JOIN operator --- .../operator/lookup/EnrichQuerySourceOperator.java | 11 ++++++++--- .../lookup/EnrichQuerySourceOperatorTests.java | 11 +++++++++-- .../xpack/esql/enrich/AbstractLookupService.java | 5 +++++ .../xpack/esql/enrich/EnrichLookupService.java | 1 + .../xpack/esql/enrich/LookupFromIndexService.java | 1 + 5 files changed, 24 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java index 0cd34d2ad4066..bf6786931a6a1 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java @@ -41,6 +41,7 @@ public final class EnrichQuerySourceOperator extends SourceOperator { private final IndexSearcher searcher; private final Warnings warnings; private final int maxPageSize; + private final boolean skipMultiValuesMatching; // using smaller pages enables quick cancellation and reduces sorting costs public static final int DEFAULT_MAX_PAGE_SIZE = 256; @@ -48,12 +49,14 @@ public final class EnrichQuerySourceOperator extends SourceOperator { public EnrichQuerySourceOperator( BlockFactory blockFactory, int maxPageSize, + boolean skipMultiValuesMatching, QueryList queryList, IndexReader indexReader, Warnings warnings ) { this.blockFactory = blockFactory; this.maxPageSize = maxPageSize; + this.skipMultiValuesMatching = skipMultiValuesMatching; this.queryList = queryList; this.indexReader = indexReader; this.searcher = new IndexSearcher(indexReader); @@ -154,9 +157,11 @@ Page buildPage(int positions, IntVector.Builder positionsBuilder, IntVector.Buil private Query nextQuery() { ++queryPosition; while (isFinished() == false) { - Query query = queryList.getQuery(queryPosition); - if (query != null) { - return query; + if (skipMultiValuesMatching == false || queryList.block.getValueCount(queryPosition) == 1) { + Query query = queryList.getQuery(queryPosition); + if (query != null) { + return query; + } } ++queryPosition; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperatorTests.java index 2af52b6bab5a8..0816ef7a3b277 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperatorTests.java @@ -123,7 +123,7 @@ public void testQueries() throws Exception { // 4 -> [a1] -> [3] // 5 -> [] -> [] var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich"); - EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, 128, queryList, reader, warnings); + EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, 128, false, queryList, reader, warnings); Page p0 = queryOperator.getOutput(); assertNotNull(p0); assertThat(p0.getPositionCount(), equalTo(6)); @@ -191,7 +191,14 @@ public void testRandomMatchQueries() throws Exception { var queryList = QueryList.rawTermQueryList(uidField, mock(SearchExecutionContext.class), inputTerms); int maxPageSize = between(1, 256); var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich"); - EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, maxPageSize, queryList, reader, warnings); + EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator( + blockFactory, + maxPageSize, + false, + queryList, + reader, + warnings + ); Map> actualPositions = new HashMap<>(); while (queryOperator.isFinished() == false) { Page page = queryOperator.getOutput(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index a486d574ddd84..c1841264a1df7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -148,6 +148,8 @@ abstract class AbstractLookupService readRequest ) { this.actionName = actionName; @@ -167,6 +170,7 @@ abstract class AbstractLookupService var queryOperator = new EnrichQuerySourceOperator( driverContext.blockFactory(), EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE, + skipMultiValues, queryList, searchExecutionContext.getIndexReader(), warnings diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java index e3d962fa9231b..cf56d8185066e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java @@ -61,6 +61,7 @@ public EnrichLookupService( bigArrays, blockFactory, true, + false, TransportRequest::readFrom ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java index ad65394fdfbde..f5a6770937ab3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java @@ -60,6 +60,7 @@ public LookupFromIndexService( bigArrays, blockFactory, false, + true, TransportRequest::readFrom ); } From a4955982a9c59c1cde3aa6de88d75d058dda3600 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Tue, 21 Jan 2025 12:04:17 +0100 Subject: [PATCH 2/4] Added extra test on skip multi-values --- .../EnrichQuerySourceOperatorTests.java | 331 +++++++++++------- .../xpack/esql/planner/PlannerUtils.java | 2 +- 2 files changed, 200 insertions(+), 133 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperatorTests.java index 0816ef7a3b277..eebc7f5118dfd 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperatorTests.java @@ -42,11 +42,14 @@ import org.junit.After; import org.junit.Before; +import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -70,156 +73,220 @@ public void allBreakersEmpty() throws Exception { } public void testQueries() throws Exception { - MockDirectoryWrapper dir = newMockDirectory(); - IndexWriterConfig iwc = new IndexWriterConfig(); - iwc.setMergePolicy(NoMergePolicy.INSTANCE); - IndexWriter writer = new IndexWriter(dir, iwc); - List> terms = List.of( - List.of("a2"), - List.of("a1", "c1", "b2"), - List.of("a2"), - List.of("a3"), - List.of("b2", "b1", "a1") - ); - for (List ts : terms) { - Document doc = new Document(); - for (String t : ts) { - doc.add(new StringField("uid", t, Field.Store.NO)); + try ( + var directoryData = makeDirectoryWith( + List.of(List.of("a2"), List.of("a1", "c1", "b2"), List.of("a2"), List.of("a3"), List.of("b2", "b1", "a1")) + ) + ) { + final BytesRefBlock inputTerms; + try (BytesRefBlock.Builder termBuilder = blockFactory.newBytesRefBlockBuilder(6)) { + termBuilder.appendBytesRef(new BytesRef("b2")) + .beginPositionEntry() + .appendBytesRef(new BytesRef("c1")) + .appendBytesRef(new BytesRef("a2")) + .endPositionEntry() + .appendBytesRef(new BytesRef("z2")) + .appendNull() + .appendBytesRef(new BytesRef("a3")) + .appendNull(); + inputTerms = termBuilder.build(); } - writer.addDocument(doc); - } - writer.commit(); - DirectoryReader reader = DirectoryReader.open(writer); - writer.close(); - - final BytesRefBlock inputTerms; - try (BytesRefBlock.Builder termBuilder = blockFactory.newBytesRefBlockBuilder(6)) { - termBuilder.appendBytesRef(new BytesRef("b2")) - .beginPositionEntry() - .appendBytesRef(new BytesRef("c1")) - .appendBytesRef(new BytesRef("a2")) - .endPositionEntry() - .appendBytesRef(new BytesRef("z2")) - .appendNull() - .appendBytesRef(new BytesRef("a3")) - .appendNull(); - inputTerms = termBuilder.build(); + MappedFieldType uidField = new KeywordFieldMapper.KeywordFieldType("uid"); + QueryList queryList = QueryList.rawTermQueryList(uidField, mock(SearchExecutionContext.class), inputTerms); + assertThat(queryList.getPositionCount(), equalTo(6)); + assertThat(queryList.getQuery(0), equalTo(new TermQuery(new Term("uid", new BytesRef("b2"))))); + assertThat(queryList.getQuery(1), equalTo(new TermInSetQuery("uid", List.of(new BytesRef("c1"), new BytesRef("a2"))))); + assertThat(queryList.getQuery(2), equalTo(new TermQuery(new Term("uid", new BytesRef("z2"))))); + assertNull(queryList.getQuery(3)); + assertThat(queryList.getQuery(4), equalTo(new TermQuery(new Term("uid", new BytesRef("a3"))))); + assertNull(queryList.getQuery(5)); + // pos -> terms -> docs + // ----------------------------- + // 0 -> [b2] -> [1, 4] + // 1 -> [c1, a2] -> [1, 0, 2] + // 2 -> [z2] -> [] + // 3 -> [] -> [] + // 4 -> [a1] -> [3] + // 5 -> [] -> [] + var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich"); + EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator( + blockFactory, + 128, + false, + queryList, + directoryData.reader, + warnings + ); + Page page = queryOperator.getOutput(); + assertNotNull(page); + assertThat(page.getPositionCount(), equalTo(6)); + IntVector docs = getDocVector(page, 0); + assertThat(docs.getInt(0), equalTo(1)); + assertThat(docs.getInt(1), equalTo(4)); + assertThat(docs.getInt(2), equalTo(0)); + assertThat(docs.getInt(3), equalTo(1)); + assertThat(docs.getInt(4), equalTo(2)); + assertThat(docs.getInt(5), equalTo(3)); + + Block positions = page.getBlock(1); + assertThat(BlockUtils.toJavaObject(positions, 0), equalTo(0)); + assertThat(BlockUtils.toJavaObject(positions, 1), equalTo(0)); + assertThat(BlockUtils.toJavaObject(positions, 2), equalTo(1)); + assertThat(BlockUtils.toJavaObject(positions, 3), equalTo(1)); + assertThat(BlockUtils.toJavaObject(positions, 4), equalTo(1)); + assertThat(BlockUtils.toJavaObject(positions, 5), equalTo(4)); + page.releaseBlocks(); + assertTrue(queryOperator.isFinished()); + IOUtils.close(inputTerms); } - MappedFieldType uidField = new KeywordFieldMapper.KeywordFieldType("uid"); - QueryList queryList = QueryList.rawTermQueryList(uidField, mock(SearchExecutionContext.class), inputTerms); - assertThat(queryList.getPositionCount(), equalTo(6)); - assertThat(queryList.getQuery(0), equalTo(new TermQuery(new Term("uid", new BytesRef("b2"))))); - assertThat(queryList.getQuery(1), equalTo(new TermInSetQuery("uid", List.of(new BytesRef("c1"), new BytesRef("a2"))))); - assertThat(queryList.getQuery(2), equalTo(new TermQuery(new Term("uid", new BytesRef("z2"))))); - assertNull(queryList.getQuery(3)); - assertThat(queryList.getQuery(4), equalTo(new TermQuery(new Term("uid", new BytesRef("a3"))))); - assertNull(queryList.getQuery(5)); - // pos -> terms -> docs - // ----------------------------- - // 0 -> [b2] -> [1, 4] - // 1 -> [c1, a2] -> [1, 0, 2] - // 2 -> [z2] -> [] - // 3 -> [] -> [] - // 4 -> [a1] -> [3] - // 5 -> [] -> [] - var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich"); - EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, 128, false, queryList, reader, warnings); - Page p0 = queryOperator.getOutput(); - assertNotNull(p0); - assertThat(p0.getPositionCount(), equalTo(6)); - IntVector docs = getDocVector(p0, 0); - assertThat(docs.getInt(0), equalTo(1)); - assertThat(docs.getInt(1), equalTo(4)); - assertThat(docs.getInt(2), equalTo(0)); - assertThat(docs.getInt(3), equalTo(1)); - assertThat(docs.getInt(4), equalTo(2)); - assertThat(docs.getInt(5), equalTo(3)); - - Block positions = p0.getBlock(1); - assertThat(BlockUtils.toJavaObject(positions, 0), equalTo(0)); - assertThat(BlockUtils.toJavaObject(positions, 1), equalTo(0)); - assertThat(BlockUtils.toJavaObject(positions, 2), equalTo(1)); - assertThat(BlockUtils.toJavaObject(positions, 3), equalTo(1)); - assertThat(BlockUtils.toJavaObject(positions, 4), equalTo(1)); - assertThat(BlockUtils.toJavaObject(positions, 5), equalTo(4)); - p0.releaseBlocks(); - assertTrue(queryOperator.isFinished()); - IOUtils.close(reader, dir, inputTerms); } public void testRandomMatchQueries() throws Exception { - MockDirectoryWrapper dir = newMockDirectory(); - IndexWriterConfig iwc = new IndexWriterConfig(); - iwc.setMergePolicy(NoMergePolicy.INSTANCE); - IndexWriter writer = new IndexWriter(dir, iwc); int numTerms = randomIntBetween(10, 1000); - Map terms = new HashMap<>(); - for (int i = 0; i < numTerms; i++) { - Document doc = new Document(); - String term = "term-" + i; - terms.put(term, i); - doc.add(new StringField("uid", term, Field.Store.NO)); - writer.addDocument(doc); - } - writer.forceMerge(1); - writer.commit(); - DirectoryReader reader = DirectoryReader.open(writer); - writer.close(); - - Map> expectedPositions = new HashMap<>(); - int numPositions = randomIntBetween(1, 1000); - final BytesRefBlock inputTerms; - try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(numPositions)) { - for (int i = 0; i < numPositions; i++) { - if (randomBoolean()) { - String term = randomFrom(terms.keySet()); - builder.appendBytesRef(new BytesRef(term)); - Integer position = terms.get(term); - expectedPositions.put(i, Set.of(position)); - } else { + List> termsList = IntStream.range(0, numTerms).mapToObj(i -> List.of("term-" + i)).toList(); + Map terms = IntStream.range(0, numTerms).boxed().collect(Collectors.toMap(i -> "term-" + i, i -> i)); + + try (var directoryData = makeDirectoryWith(termsList)) { + Map> expectedPositions = new HashMap<>(); + int numPositions = randomIntBetween(1, 1000); + final BytesRefBlock inputTerms; + try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(numPositions)) { + for (int i = 0; i < numPositions; i++) { if (randomBoolean()) { - builder.appendNull(); - } else { - String term = "other-" + randomIntBetween(1, 100); + String term = randomFrom(terms.keySet()); builder.appendBytesRef(new BytesRef(term)); + Integer position = terms.get(term); + expectedPositions.put(i, Set.of(position)); + } else { + if (randomBoolean()) { + builder.appendNull(); + } else { + String term = "other-" + randomIntBetween(1, 100); + builder.appendBytesRef(new BytesRef(term)); + } } } + inputTerms = builder.build(); } - inputTerms = builder.build(); - } - MappedFieldType uidField = new KeywordFieldMapper.KeywordFieldType("uid"); - var queryList = QueryList.rawTermQueryList(uidField, mock(SearchExecutionContext.class), inputTerms); - int maxPageSize = between(1, 256); - var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich"); - EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator( - blockFactory, - maxPageSize, - false, - queryList, - reader, - warnings - ); - Map> actualPositions = new HashMap<>(); - while (queryOperator.isFinished() == false) { - Page page = queryOperator.getOutput(); - if (page != null) { - IntVector docs = getDocVector(page, 0); - IntBlock positions = page.getBlock(1); - assertThat(positions.getPositionCount(), lessThanOrEqualTo(maxPageSize)); - for (int i = 0; i < page.getPositionCount(); i++) { - int doc = docs.getInt(i); - int position = positions.getInt(i); - actualPositions.computeIfAbsent(position, k -> new HashSet<>()).add(doc); + var queryList = QueryList.rawTermQueryList(directoryData.field, mock(SearchExecutionContext.class), inputTerms); + int maxPageSize = between(1, 256); + var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich"); + EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator( + blockFactory, + maxPageSize, + false, + queryList, + directoryData.reader, + warnings + ); + Map> actualPositions = new HashMap<>(); + while (queryOperator.isFinished() == false) { + Page page = queryOperator.getOutput(); + if (page != null) { + IntVector docs = getDocVector(page, 0); + IntBlock positions = page.getBlock(1); + assertThat(positions.getPositionCount(), lessThanOrEqualTo(maxPageSize)); + for (int i = 0; i < page.getPositionCount(); i++) { + int doc = docs.getInt(i); + int position = positions.getInt(i); + actualPositions.computeIfAbsent(position, k -> new HashSet<>()).add(doc); + } + page.releaseBlocks(); } - page.releaseBlocks(); } + assertThat(actualPositions, equalTo(expectedPositions)); + IOUtils.close(inputTerms); + } + } + + public void testQueries_SkipMultiValue() throws Exception { + try ( + var directoryData = makeDirectoryWith( + List.of(List.of("a2"), List.of("a1", "c1", "b2"), List.of("a2"), List.of("a3"), List.of("b2", "b1", "a1")) + ) + ) { + final BytesRefBlock inputTerms; + try (BytesRefBlock.Builder termBuilder = blockFactory.newBytesRefBlockBuilder(6)) { + termBuilder.appendBytesRef(new BytesRef("b2")) + .beginPositionEntry() + .appendBytesRef(new BytesRef("c1")) + .appendBytesRef(new BytesRef("a2")) + .endPositionEntry() + .appendBytesRef(new BytesRef("z2")) + .appendNull() + .appendBytesRef(new BytesRef("a3")) + .beginPositionEntry() + .appendBytesRef(new BytesRef("a3")) + .appendBytesRef(new BytesRef("a2")) + .appendBytesRef(new BytesRef("z2")) + .appendBytesRef(new BytesRef("xx")) + .endPositionEntry(); + inputTerms = termBuilder.build(); + } + QueryList queryList = QueryList.rawTermQueryList(directoryData.field, mock(SearchExecutionContext.class), inputTerms); + // pos -> terms -> docs + // ----------------------------- + // 0 -> [b2] -> [1, 4] + // 1 -> [c1, a2] -> [] + // 2 -> [z2] -> [] + // 3 -> [] -> [] + // 4 -> [a1] -> [3] + // 5 -> [a3, a2, z2, xx] -> [] + var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test lookup"); + EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator( + blockFactory, + 128, + true, + queryList, + directoryData.reader, + warnings + ); + Page page = queryOperator.getOutput(); + assertNotNull(page); + assertThat(page.getPositionCount(), equalTo(3)); + IntVector docs = getDocVector(page, 0); + assertThat(docs.getInt(0), equalTo(1)); + assertThat(docs.getInt(1), equalTo(4)); + assertThat(docs.getInt(2), equalTo(3)); + + Block positions = page.getBlock(1); + assertThat(BlockUtils.toJavaObject(positions, 0), equalTo(0)); + assertThat(BlockUtils.toJavaObject(positions, 1), equalTo(0)); + assertThat(BlockUtils.toJavaObject(positions, 2), equalTo(4)); + page.releaseBlocks(); + assertTrue(queryOperator.isFinished()); + IOUtils.close(inputTerms); } - assertThat(actualPositions, equalTo(expectedPositions)); - IOUtils.close(reader, dir, inputTerms); } private static IntVector getDocVector(Page page, int blockIndex) { DocBlock doc = page.getBlock(blockIndex); return doc.asVector().docs(); } + + private record DirectoryData(DirectoryReader reader, MockDirectoryWrapper dir, MappedFieldType field) implements AutoCloseable { + @Override + public void close() throws IOException { + IOUtils.close(reader, dir); + } + } + + private static DirectoryData makeDirectoryWith(List> terms) throws IOException { + MockDirectoryWrapper dir = newMockDirectory(); + IndexWriterConfig iwc = new IndexWriterConfig(); + iwc.setMergePolicy(NoMergePolicy.INSTANCE); + try (IndexWriter writer = new IndexWriter(dir, iwc)) { + for (var termList : terms) { + Document doc = new Document(); + for (String term : termList) { + doc.add(new StringField("uid", term, Field.Store.NO)); + } + writer.addDocument(doc); + } + writer.forceMerge(1); + writer.commit(); + + return new DirectoryData(DirectoryReader.open(writer), dir, new KeywordFieldMapper.KeywordFieldType("uid")); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index af5146f7b6926..e8347f201d9a1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -146,7 +146,7 @@ private static void forEachFromRelation(PhysicalPlan plan, Consumer } /** - * Similar to {@link Node#forEachUp(Consumer)}, but with a custom callback to get the node children. + * Similar to {@link Node#forEachUp(Class, Consumer)}, but with a custom callback to get the node children. */ private static , E extends T> void forEachUpWithChildren( T node, From 9b84c75c19063abfe851d4e7d335072fab96cd66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Tue, 21 Jan 2025 13:34:59 +0100 Subject: [PATCH 3/4] Updated MV tests and added capability --- .../src/main/resources/lookup-join.csv-spec | 27 +++++++++++++++---- .../xpack/esql/action/EsqlCapabilities.java | 5 ++++ 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index c8203042a23de..1e2c4d91759e6 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -416,7 +416,7 @@ emp_no:integer | language_code:integer | language_name:keyword 10003 | null | null ; -mvJoinKeyOnTheDataNode +mvJoinKeyOnTheLookupIndex required_capability: join_lookup_v11 FROM employees @@ -435,8 +435,28 @@ emp_no:integer | language_code:integer | language_name:keyword 10007 | 7 | Mv-Lang2 ; +mvJoinKeyOnFrom +required_capability: join_lookup_v11 + +FROM employees +| WHERE emp_no < 10006 +| EVAL language_code = salary_change.int +| LOOKUP JOIN languages_lookup ON language_code +| SORT emp_no +| KEEP emp_no, language_code, language_name +; + +emp_no:integer | language_code:integer | language_name:keyword +10001 | 1 | English +10002 | [-7, 11] | null +10003 | [12, 14] | null +10004 | [0, 1, 3, 13] | null +10005 | [-2, 13] | null +; + mvJoinKeyFromRow required_capability: join_lookup_v11 +required_capability: join_lookup_skip_mv ROW language_code = [4, 5, 6, 7] | LOOKUP JOIN languages_lookup_non_unique_key ON language_code @@ -445,10 +465,7 @@ ROW language_code = [4, 5, 6, 7] ; language_code:integer | language_name:keyword | country:text -[4, 5, 6, 7] | Mv-Lang | Mv-Land -[4, 5, 6, 7] | Mv-Lang2 | Mv-Land2 -[4, 5, 6, 7] | Quenya | null -[4, 5, 6, 7] | null | Atlantis +[4, 5, 6, 7] | null | null ; mvJoinKeyFromRowExpanded diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index fb5433d7662af..b275155d175c2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -690,6 +690,11 @@ public enum Cap { */ JOIN_LOOKUP_V11(Build.current().isSnapshot()), + /** + * LOOKUP JOIN without MV matching (https://github.com/elastic/elasticsearch/issues/118780) + */ + JOIN_LOOKUP_SKIP_MV(JOIN_LOOKUP_V11.isEnabled()), + /** * Fix for https://github.com/elastic/elasticsearch/issues/117054 */ From 22b2cb919487b50c0e703c1817d47dd280063f1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Wed, 22 Jan 2025 13:10:34 +0100 Subject: [PATCH 4/4] Simplify approach, moving logic to QueryList --- .../lookup/EnrichQuerySourceOperator.java | 11 ++--- .../compute/operator/lookup/QueryList.java | 49 +++++++++++++++---- .../EnrichQuerySourceOperatorTests.java | 8 ++- .../esql/enrich/AbstractLookupService.java | 5 -- .../esql/enrich/EnrichLookupService.java | 1 - .../esql/enrich/LookupFromIndexService.java | 3 +- 6 files changed, 47 insertions(+), 30 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java index bf6786931a6a1..0cd34d2ad4066 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java @@ -41,7 +41,6 @@ public final class EnrichQuerySourceOperator extends SourceOperator { private final IndexSearcher searcher; private final Warnings warnings; private final int maxPageSize; - private final boolean skipMultiValuesMatching; // using smaller pages enables quick cancellation and reduces sorting costs public static final int DEFAULT_MAX_PAGE_SIZE = 256; @@ -49,14 +48,12 @@ public final class EnrichQuerySourceOperator extends SourceOperator { public EnrichQuerySourceOperator( BlockFactory blockFactory, int maxPageSize, - boolean skipMultiValuesMatching, QueryList queryList, IndexReader indexReader, Warnings warnings ) { this.blockFactory = blockFactory; this.maxPageSize = maxPageSize; - this.skipMultiValuesMatching = skipMultiValuesMatching; this.queryList = queryList; this.indexReader = indexReader; this.searcher = new IndexSearcher(indexReader); @@ -157,11 +154,9 @@ Page buildPage(int positions, IntVector.Builder positionsBuilder, IntVector.Buil private Query nextQuery() { ++queryPosition; while (isFinished() == false) { - if (skipMultiValuesMatching == false || queryList.block.getValueCount(queryPosition) == 1) { - Query query = queryList.getQuery(queryPosition); - if (query != null) { - return query; - } + Query query = queryList.getQuery(queryPosition); + if (query != null) { + return query; } ++queryPosition; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java index 5428863436535..1e0d19fac5b51 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/QueryList.java @@ -39,9 +39,11 @@ */ public abstract class QueryList { protected final Block block; + protected final boolean onlySingleValues; - protected QueryList(Block block) { + protected QueryList(Block block, boolean onlySingleValues) { this.block = block; + this.onlySingleValues = onlySingleValues; } /** @@ -51,6 +53,12 @@ int getPositionCount() { return block.getPositionCount(); } + /** + * Returns a copy of this query list that only returns queries for single-valued positions. + * That is, it returns `null` queries for either multivalued or null positions. + */ + public abstract QueryList onlySingleValues(); + /** * Returns the query at the given position. */ @@ -93,7 +101,7 @@ public static QueryList rawTermQueryList(MappedFieldType field, SearchExecutionC case COMPOSITE -> throw new IllegalArgumentException("can't read values from [composite] block"); case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]"); }; - return new TermQueryList(field, searchExecutionContext, block, blockToJavaObject); + return new TermQueryList(field, searchExecutionContext, block, false, blockToJavaObject); } /** @@ -103,7 +111,7 @@ public static QueryList rawTermQueryList(MappedFieldType field, SearchExecutionC public static QueryList ipTermQueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, BytesRefBlock block) { BytesRef scratch = new BytesRef(); byte[] ipBytes = new byte[InetAddressPoint.BYTES]; - return new TermQueryList(field, searchExecutionContext, block, offset -> { + return new TermQueryList(field, searchExecutionContext, block, false, offset -> { final var bytes = block.getBytesRef(offset, scratch); if (ipBytes.length != bytes.length) { // Lucene only support 16-byte IP addresses, even IPv4 is encoded in 16 bytes @@ -123,6 +131,7 @@ public static QueryList dateTermQueryList(MappedFieldType field, SearchExecution field, searchExecutionContext, block, + false, field instanceof RangeFieldMapper.RangeFieldType rangeFieldType ? offset -> rangeFieldType.dateTimeFormatter().formatMillis(block.getLong(offset)) : block::getLong @@ -133,7 +142,7 @@ public static QueryList dateTermQueryList(MappedFieldType field, SearchExecution * Returns a list of geo_shape queries for the given field and the input block. */ public static QueryList geoShapeQueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, Block block) { - return new GeoShapeQueryList(field, searchExecutionContext, block); + return new GeoShapeQueryList(field, searchExecutionContext, block, false); } private static class TermQueryList extends QueryList { @@ -145,18 +154,27 @@ private TermQueryList( MappedFieldType field, SearchExecutionContext searchExecutionContext, Block block, + boolean onlySingleValues, IntFunction blockValueReader ) { - super(block); + super(block, onlySingleValues); this.field = field; this.searchExecutionContext = searchExecutionContext; this.blockValueReader = blockValueReader; } + @Override + public TermQueryList onlySingleValues() { + return new TermQueryList(field, searchExecutionContext, block, true, blockValueReader); + } + @Override Query getQuery(int position) { - final int first = block.getFirstValueIndex(position); final int count = block.getValueCount(position); + if (onlySingleValues && count != 1) { + return null; + } + final int first = block.getFirstValueIndex(position); return switch (count) { case 0 -> null; case 1 -> field.termQuery(blockValueReader.apply(first), searchExecutionContext); @@ -179,8 +197,13 @@ private static class GeoShapeQueryList extends QueryList { private final IntFunction blockValueReader; private final IntFunction shapeQuery; - private GeoShapeQueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, Block block) { - super(block); + private GeoShapeQueryList( + MappedFieldType field, + SearchExecutionContext searchExecutionContext, + Block block, + boolean onlySingleValues + ) { + super(block, onlySingleValues); this.field = field; this.searchExecutionContext = searchExecutionContext; @@ -188,10 +211,18 @@ private GeoShapeQueryList(MappedFieldType field, SearchExecutionContext searchEx this.shapeQuery = shapeQuery(); } + @Override + public GeoShapeQueryList onlySingleValues() { + return new GeoShapeQueryList(field, searchExecutionContext, block, true); + } + @Override Query getQuery(int position) { - final int first = block.getFirstValueIndex(position); final int count = block.getValueCount(position); + if (onlySingleValues && count != 1) { + return null; + } + final int first = block.getFirstValueIndex(position); return switch (count) { case 0 -> null; case 1 -> shapeQuery.apply(first); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperatorTests.java index eebc7f5118dfd..894843e7e4ec7 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperatorTests.java @@ -112,7 +112,6 @@ public void testQueries() throws Exception { EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator( blockFactory, 128, - false, queryList, directoryData.reader, warnings @@ -174,7 +173,6 @@ public void testRandomMatchQueries() throws Exception { EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator( blockFactory, maxPageSize, - false, queryList, directoryData.reader, warnings @@ -199,7 +197,7 @@ public void testRandomMatchQueries() throws Exception { } } - public void testQueries_SkipMultiValue() throws Exception { + public void testQueries_OnlySingleValues() throws Exception { try ( var directoryData = makeDirectoryWith( List.of(List.of("a2"), List.of("a1", "c1", "b2"), List.of("a2"), List.of("a3"), List.of("b2", "b1", "a1")) @@ -223,7 +221,8 @@ public void testQueries_SkipMultiValue() throws Exception { .endPositionEntry(); inputTerms = termBuilder.build(); } - QueryList queryList = QueryList.rawTermQueryList(directoryData.field, mock(SearchExecutionContext.class), inputTerms); + QueryList queryList = QueryList.rawTermQueryList(directoryData.field, mock(SearchExecutionContext.class), inputTerms) + .onlySingleValues(); // pos -> terms -> docs // ----------------------------- // 0 -> [b2] -> [1, 4] @@ -236,7 +235,6 @@ public void testQueries_SkipMultiValue() throws Exception { EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator( blockFactory, 128, - true, queryList, directoryData.reader, warnings diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index c1841264a1df7..a486d574ddd84 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -148,8 +148,6 @@ abstract class AbstractLookupService readRequest ) { this.actionName = actionName; @@ -170,7 +167,6 @@ abstract class AbstractLookupService var queryOperator = new EnrichQuerySourceOperator( driverContext.blockFactory(), EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE, - skipMultiValues, queryList, searchExecutionContext.getIndexReader(), warnings diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java index cf56d8185066e..e3d962fa9231b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java @@ -61,7 +61,6 @@ public EnrichLookupService( bigArrays, blockFactory, true, - false, TransportRequest::readFrom ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java index f5a6770937ab3..5d3853fac20d2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java @@ -60,7 +60,6 @@ public LookupFromIndexService( bigArrays, blockFactory, false, - true, TransportRequest::readFrom ); } @@ -83,7 +82,7 @@ protected TransportRequest transportRequest(LookupFromIndexService.Request reque protected QueryList queryList(TransportRequest request, SearchExecutionContext context, Block inputBlock, DataType inputDataType) { MappedFieldType fieldType = context.getFieldType(request.matchField); validateTypes(request.inputDataType, fieldType); - return termQueryList(fieldType, context, inputBlock, inputDataType); + return termQueryList(fieldType, context, inputBlock, inputDataType).onlySingleValues(); } @Override