Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Ignore multivalued key columns in lookup index on JOIN #120726

Merged
merged 4 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/plugin/esql/compute/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@
exports org.elasticsearch.compute.operator.mvdedupe;
exports org.elasticsearch.compute.aggregation.table;
exports org.elasticsearch.compute.data.sort;
exports org.elasticsearch.compute.querydsl.query;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.geo.GeoEncodingUtils;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.geo.ShapeRelation;
Expand All @@ -20,6 +23,8 @@
import org.elasticsearch.compute.data.FloatBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.compute.querydsl.query.SingleValueMatchQuery;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.geometry.Geometry;
import org.elasticsearch.geometry.Point;
Expand All @@ -30,6 +35,7 @@
import org.elasticsearch.index.mapper.RangeFieldMapper;
import org.elasticsearch.index.query.SearchExecutionContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;
Expand All @@ -38,10 +44,14 @@
* Generates a list of Lucene queries based on the input block.
*/
public abstract class QueryList {
protected final SearchExecutionContext searchExecutionContext;
protected final MappedFieldType field;
protected final Block block;
protected final boolean onlySingleValues;

protected QueryList(Block block, boolean onlySingleValues) {
protected QueryList(MappedFieldType field, SearchExecutionContext searchExecutionContext, Block block, boolean onlySingleValues) {
this.searchExecutionContext = searchExecutionContext;
this.field = field;
this.block = block;
this.onlySingleValues = onlySingleValues;
}
Expand All @@ -59,11 +69,52 @@ int getPositionCount() {
*/
public abstract QueryList onlySingleValues();

final Query getQuery(int position) {
final int valueCount = block.getValueCount(position);
if (onlySingleValues && valueCount != 1) {
return null;
}
final int firstValueIndex = block.getFirstValueIndex(position);

Query query = doGetQuery(position, firstValueIndex, valueCount);

if (onlySingleValues) {
query = wrapSingleValueQuery(query);
}

return query;
}

/**
* Returns the query at the given position.
*/
@Nullable
abstract Query getQuery(int position);
abstract Query doGetQuery(int position, int firstValueIndex, int valueCount);

private Query wrapSingleValueQuery(Query query) {
SingleValueMatchQuery singleValueQuery = new SingleValueMatchQuery(
searchExecutionContext.getForField(field, MappedFieldType.FielddataOperation.SEARCH),
// Not emitting warnings for multivalued fields not matching
Warnings.NOOP_WARNINGS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should just ignore this behavior. Today we do emit a warning if there's an equality operation involving a multi-value field, ie row x=[1,2,3],y=3 | where x == y. The joining key matching is basically doing the same thing, no?

Copy link
Contributor Author

@ivancea ivancea Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic is the same, indeed.
I have doubts over the warnings here, as I thought that it would be "clear enough" in JOIN.
That said, yeah, we could use that argument for everything else, so I'll better add warnings. I'll customize them, as SingleValueMatchQuery sends a generic single-value function encountered multi-value. I'll add something more specific for JOIN

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some investigation and things, I have some local changes with this:

  1. Warnings are being correctly generated, nice!
  2. They are sent to coordinator correctly
  3. BUT the AsyncOperator (parent of the Lookup operator) doesn't merge them with the main ThreadLocal

It's not trivial to solve (Or at least, is a quite problematic topic). After commenting it a bit with Nik, we'll better leave the Warnings part for another PR.
If there's time to make it for the FF, that's fine. Otherwise, should be fine too (?). I'll keep working on it anyway.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving a comment around warnings that should be addressed as a follow-up: we should limit the amount of warnings an operator throws for cases to avoid flooding the system with objects that end up being dropped due to the limit being reached - either by a single operator, or by multiple operators, either before or during the warnings being emitted.
This is similar to the logger usage where we're invoking methods without doing any args transformation in order to perform the check the logging statement first.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do limit these warnings in other operators and think we mostly just stick to prior art here. I think it's fine to juggle them in a follow up. Honestly, I think we should rip out the thread local handling of warnings from ESQL..... It's too sneaky. But I don't know if we'll have time for that for a while.

);

Query rewrite = singleValueQuery;
try {
rewrite = singleValueQuery.rewrite(searchExecutionContext.searcher());
if (rewrite instanceof MatchAllDocsQuery) {
// nothing to filter
return query;
}
} catch (IOException e) {
// ignore
// TODO: Should we do something with the exception?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe wrap it into a runtime exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rethrowing it as unchecked

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap it, yeah. ESQL has kind of a rocky relationship with IOException. Pretty much everything should be allowed to throw it but we started wrapping it long ago and never stopped.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW this can throw if there's an IO error blowing up on the disk with lucene. It's not likely and probably terminal for the query. But we should bubble it up.

}

BooleanQuery.Builder builder = new BooleanQuery.Builder();
builder.add(query, BooleanClause.Occur.FILTER);
builder.add(rewrite, BooleanClause.Occur.FILTER);
return builder.build();
}

/**
* Returns a list of term queries for the given field and the input block
Expand Down Expand Up @@ -146,8 +197,6 @@ public static QueryList geoShapeQueryList(MappedFieldType field, SearchExecution
}

private static class TermQueryList extends QueryList {
private final MappedFieldType field;
private final SearchExecutionContext searchExecutionContext;
private final IntFunction<Object> blockValueReader;

private TermQueryList(
Expand All @@ -157,9 +206,7 @@ private TermQueryList(
boolean onlySingleValues,
IntFunction<Object> blockValueReader
) {
super(block, onlySingleValues);
this.field = field;
this.searchExecutionContext = searchExecutionContext;
super(field, searchExecutionContext, block, onlySingleValues);
this.blockValueReader = blockValueReader;
}

Expand All @@ -169,19 +216,14 @@ public TermQueryList onlySingleValues() {
}

@Override
Query getQuery(int position) {
final int count = block.getValueCount(position);
if (onlySingleValues && count != 1) {
return null;
}
final int first = block.getFirstValueIndex(position);
return switch (count) {
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those params are oddly specific, but at least we won't have to get them twice...

return switch (valueCount) {
case 0 -> null;
case 1 -> field.termQuery(blockValueReader.apply(first), searchExecutionContext);
case 1 -> field.termQuery(blockValueReader.apply(firstValueIndex), searchExecutionContext);
default -> {
final List<Object> terms = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
final Object value = blockValueReader.apply(first + i);
final List<Object> terms = new ArrayList<>(valueCount);
for (int i = 0; i < valueCount; i++) {
final Object value = blockValueReader.apply(firstValueIndex + i);
terms.add(value);
}
yield field.termsQuery(terms, searchExecutionContext);
Expand All @@ -192,8 +234,6 @@ Query getQuery(int position) {

private static class GeoShapeQueryList extends QueryList {
private final BytesRef scratch = new BytesRef();
private final MappedFieldType field;
private final SearchExecutionContext searchExecutionContext;
private final IntFunction<Geometry> blockValueReader;
private final IntFunction<Query> shapeQuery;

Expand All @@ -203,10 +243,8 @@ private GeoShapeQueryList(
Block block,
boolean onlySingleValues
) {
super(block, onlySingleValues);
super(field, searchExecutionContext, block, onlySingleValues);

this.field = field;
this.searchExecutionContext = searchExecutionContext;
this.blockValueReader = blockToGeometry(block);
this.shapeQuery = shapeQuery();
}
Expand All @@ -217,15 +255,10 @@ public GeoShapeQueryList onlySingleValues() {
}

@Override
Query getQuery(int position) {
final int count = block.getValueCount(position);
if (onlySingleValues && count != 1) {
return null;
}
final int first = block.getFirstValueIndex(position);
return switch (count) {
Query doGetQuery(int position, int firstValueIndex, int valueCount) {
return switch (valueCount) {
case 0 -> null;
case 1 -> shapeQuery.apply(first);
case 1 -> shapeQuery.apply(firstValueIndex);
// TODO: support multiple values
default -> throw new IllegalArgumentException("can't read multiple Geometry values from a single position");
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

package org.elasticsearch.xpack.esql.querydsl.query;
package org.elasticsearch.compute.querydsl.query;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReaderContext;
Expand Down Expand Up @@ -39,7 +39,7 @@
/**
* Finds all fields with a single-value. If a field has a multi-value, it emits a {@link Warnings}.
*/
final class SingleValueMatchQuery extends Query {
public final class SingleValueMatchQuery extends Query {

/**
* Choose a big enough value so this approximation never drives the iteration.
Expand All @@ -52,7 +52,7 @@ final class SingleValueMatchQuery extends Query {
private final IndexFieldData<?> fieldData;
private final Warnings warnings;

SingleValueMatchQuery(IndexFieldData<?> fieldData, Warnings warnings) {
public SingleValueMatchQuery(IndexFieldData<?> fieldData, Warnings warnings) {
this.fieldData = fieldData;
this.warnings = warnings;
}
Expand Down
Loading