From 983353a1466f29ef0b1102a31dda153f26013d82 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Tue, 4 Mar 2025 15:37:59 +0800 Subject: [PATCH 1/5] [BugFix] Fix text field push down Signed-off-by: Heng Qian --- .../calcite/remote/CalciteSortCommandIT.java | 12 +-- .../calcite/standalone/CalcitePPLBasicIT.java | 21 +++++ .../occupation_index_mapping.json | 2 +- .../state_country_index_mapping.json | 2 +- .../opensearch/request/PredicateAnalyzer.java | 76 ++++++++++++++----- .../scan/CalciteOpenSearchIndexScan.java | 6 +- 6 files changed, 90 insertions(+), 29 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSortCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSortCommandIT.java index 78ed38073a..1867f7be6c 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSortCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSortCommandIT.java @@ -9,12 +9,6 @@ import org.junit.Ignore; import org.opensearch.sql.ppl.SortCommandIT; -/** - * TODO there seems a bug in Calcite planner with sort. Fix {@link - * org.opensearch.sql.calcite.standalone.CalcitePPLSortIT} first. then enable this IT and remove - * this java doc. - */ -@Ignore public class CalciteSortCommandIT extends SortCommandIT { @Override public void init() throws IOException { @@ -22,4 +16,10 @@ public void init() throws IOException { disallowCalciteFallback(); super.init(); } + + // TODO: Unsupported conversion for OpenSearch Data type: IP, addressed by issue: + // https://github.com/opensearch-project/sql/issues/3322 + @Ignore + @Override + public void testSortIpField() throws IOException {} } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLBasicIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLBasicIT.java index 5f892073a5..c5e37f5e49 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLBasicIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLBasicIT.java @@ -83,6 +83,27 @@ public void testFilterQuery3() { verifyDataRows(actual, rows("hello", 20), rows("world", 30)); } + @Test + public void testFilterOnTextField() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | where gender = 'F' | fields firstname, lastname", TEST_INDEX_BANK)); + verifySchema(actual, schema("firstname", "string"), schema("lastname", "string")); + verifyDataRows( + actual, rows("Nanette", "Bates"), rows("Virginia", "Ayala"), rows("Dillard", "Mcpherson")); + } + + @Test + public void testFilterOnTextFieldWithKeywordSubField() { + JSONObject actual = + executeQuery( + String.format( + "source=%s | where state = 'VA' | fields firstname, lastname", TEST_INDEX_BANK)); + verifySchema(actual, schema("firstname", "string"), schema("lastname", "string")); + verifyDataRows(actual, rows("Nanette", "Bates")); + } + @Test public void testFilterQueryWithOr() { JSONObject actual = diff --git a/integ-test/src/test/resources/indexDefinitions/occupation_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/occupation_index_mapping.json index 52470c0f55..cfbd6fc777 100644 --- a/integ-test/src/test/resources/indexDefinitions/occupation_index_mapping.json +++ b/integ-test/src/test/resources/indexDefinitions/occupation_index_mapping.json @@ -5,7 +5,7 @@ "type": "keyword" }, "occupation": { - "type": "keyword" + "type": "text" }, "country": { "type": "text" diff --git a/integ-test/src/test/resources/indexDefinitions/state_country_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/state_country_index_mapping.json index 03840f3677..da614cf525 100644 --- a/integ-test/src/test/resources/indexDefinitions/state_country_index_mapping.json +++ b/integ-test/src/test/resources/indexDefinitions/state_country_index_mapping.json @@ -17,7 +17,7 @@ } }, "country": { - "type": "keyword" + "type": "text" }, "year": { "type": "integer" diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java index 907476676a..49cb39a101 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java @@ -48,6 +48,7 @@ import java.util.Locale; import java.util.Map; import java.util.Set; +import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; @@ -64,6 +65,9 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.RangeQueryBuilder; import org.opensearch.sql.calcite.plan.OpenSearchConstants; +import org.opensearch.sql.opensearch.data.type.OpenSearchDataType; +import org.opensearch.sql.opensearch.data.type.OpenSearchDataType.MappingType; +import org.opensearch.sql.opensearch.data.type.OpenSearchTextType; /** * Query predicate analyzer. Uses visitor pattern to traverse existing expression and convert it to @@ -92,8 +96,8 @@ public static final class PredicateAnalyzerException extends RuntimeException { } /** - * Exception that is thrown when a {@link org.apache.calcite.rel.RelNode} expression cannot be - * processed (or converted into an OpenSearch query). + * Exception that is thrown when a {@link RelNode} expression cannot be processed (or converted + * into an OpenSearch query). */ public static class ExpressionNotAnalyzableException extends Exception { ExpressionNotAnalyzableException(String message, Throwable cause) { @@ -112,15 +116,19 @@ private PredicateAnalyzer() {} * filters. * * @param expression expression to analyze + * @param schema current schema of scan operator + * @param typeMapping mapping of OpenSearch field name to OpenSearchDataType * @return search query which can be used to query OS cluster * @throws ExpressionNotAnalyzableException when expression can't processed by this analyzer */ - public static QueryBuilder analyze(RexNode expression, List schema) + public static QueryBuilder analyze( + RexNode expression, List schema, Map typeMapping) throws ExpressionNotAnalyzableException { requireNonNull(expression, "expression"); try { // visits expression tree - QueryExpression queryExpression = (QueryExpression) expression.accept(new Visitor(schema)); + QueryExpression queryExpression = + (QueryExpression) expression.accept(new Visitor(schema, typeMapping)); if (queryExpression != null && queryExpression.isPartial()) { throw new UnsupportedOperationException( @@ -137,15 +145,17 @@ public static QueryBuilder analyze(RexNode expression, List schema) private static class Visitor extends RexVisitorImpl { List schema; + Map typeMapping; - private Visitor(List schema) { + private Visitor(List schema, Map typeMapping) { super(true); this.schema = schema; + this.typeMapping = typeMapping; } @Override public Expression visitInputRef(RexInputRef inputRef) { - return new NamedFieldExpression(inputRef, schema); + return new NamedFieldExpression(inputRef, schema, typeMapping); } @Override @@ -246,7 +256,7 @@ public Expression visitCall(RexCall call) { SqlSyntax syntax = call.getOperator().getSyntax(); if (!supportedRexCall(call)) { - String message = String.format(Locale.ROOT, "Unsupported call: [%s]", call); + String message = format(Locale.ROOT, "Unsupported call: [%s]", call); throw new PredicateAnalyzerException(message); } @@ -262,7 +272,7 @@ public Expression visitCall(RexCall call) { case CAST -> toCastExpression(call); case LIKE, CONTAINS -> binary(call); default -> { - String message = String.format(Locale.ROOT, "Unsupported call: [%s]", call); + String message = format(Locale.ROOT, "Unsupported call: [%s]", call); throw new PredicateAnalyzerException(message); } }; @@ -291,7 +301,7 @@ private static String convertQueryString(List fields, Expression que for (Expression expr : fields) { if (expr instanceof NamedFieldExpression) { NamedFieldExpression field = (NamedFieldExpression) expr; - String fieldIndexString = String.format(Locale.ROOT, "$%d", index++); + String fieldIndexString = format(Locale.ROOT, "$%d", index++); fieldMap.put(fieldIndexString, field.getReference()); } } @@ -307,7 +317,7 @@ private QueryExpression prefix(RexCall call) { call.getKind() == SqlKind.NOT, "Expected %s got %s", SqlKind.NOT, call.getKind()); if (call.getOperands().size() != 1) { - String message = String.format(Locale.ROOT, "Unsupported NOT operator: [%s]", call); + String message = format(Locale.ROOT, "Unsupported NOT operator: [%s]", call); throw new PredicateAnalyzerException(message); } @@ -318,7 +328,7 @@ private QueryExpression prefix(RexCall call) { private QueryExpression postfix(RexCall call) { checkArgument(call.getKind() == SqlKind.IS_NULL || call.getKind() == SqlKind.IS_NOT_NULL); if (call.getOperands().size() != 1) { - String message = String.format(Locale.ROOT, "Unsupported operator: [%s]", call); + String message = format(Locale.ROOT, "Unsupported operator: [%s]", call); throw new PredicateAnalyzerException(message); } Expression a = call.getOperands().get(0).accept(this); @@ -407,7 +417,7 @@ private QueryExpression binary(RexCall call) { default: break; } - String message = String.format(Locale.ROOT, "Unable to handle call: [%s]", call); + String message = format(Locale.ROOT, "Unable to handle call: [%s]", call); throw new PredicateAnalyzerException(message); } @@ -438,8 +448,7 @@ private QueryExpression andOr(RexCall call) { if (firstError != null) { throw firstError; } else { - final String message = - String.format(Locale.ROOT, "Unable to handle call: [%s]", call); + final String message = format(Locale.ROOT, "Unable to handle call: [%s]", call); throw new PredicateAnalyzerException(message); } } @@ -447,7 +456,7 @@ private QueryExpression andOr(RexCall call) { case AND: return CompoundQueryExpression.and(partial, expressions); default: - String message = String.format(Locale.ROOT, "Unable to handle call: [%s]", call); + String message = format(Locale.ROOT, "Unable to handle call: [%s]", call); throw new PredicateAnalyzerException(message); } } @@ -506,7 +515,7 @@ private static SwapResult swap(Expression left, Expression right) { if (literal == null || terminal == null) { String message = - String.format( + format( Locale.ROOT, "Unexpected combination of expressions [left: %s] [right: %s]", left, @@ -610,7 +619,7 @@ public static QueryExpression create(TerminalExpression expression) { if (expression instanceof NamedFieldExpression) { return new SimpleQueryExpression((NamedFieldExpression) expression); } else { - String message = String.format(Locale.ROOT, "Unsupported expression: [%s]", expression); + String message = format(Locale.ROOT, "Unsupported expression: [%s]", expression); throw new PredicateAnalyzerException(message); } } @@ -832,8 +841,6 @@ public QueryExpression equals(LiteralExpression literal) { .must(addFormatIfNecessary(literal, rangeQuery(getFieldReference()).gte(value))) .must(addFormatIfNecessary(literal, rangeQuery(getFieldReference()).lte(value))); } else { - // TODO: equal(textFieldType, "value") should not rewrite as termQuery, - // it should be addressed by issue: https://github.com/opensearch-project/sql/issues/3334 builder = termQuery(getFieldReference(), value); } return this; @@ -962,29 +969,58 @@ static boolean isCastExpression(Expression exp) { static final class NamedFieldExpression implements TerminalExpression { private final String name; + private final OpenSearchDataType type; private NamedFieldExpression() { this.name = null; + this.type = null; } - private NamedFieldExpression(RexInputRef ref, List schema) { + private NamedFieldExpression( + RexInputRef ref, List schema, Map typeMapping) { this.name = (ref == null || ref.getIndex() >= schema.size()) ? null : schema.get(ref.getIndex()); + this.type = typeMapping.get(name); } private NamedFieldExpression(RexLiteral literal) { this.name = literal == null ? null : RexLiteral.stringValue(literal); + this.type = null; } String getRootName() { return name; } + OpenSearchDataType getOpenSearchDataType() { + return type; + } + + boolean isTextType() { + return type != null && type.getMappingType() == OpenSearchDataType.MappingType.Text; + } + + String toKeywordSubField() { + if (isTextType()) { + OpenSearchTextType textType = (OpenSearchTextType) type; + // Find the first subfield with type keyword, return null if non-exist. + return textType.getFields().entrySet().stream() + .filter(e -> e.getValue().getMappingType() == MappingType.Keyword) + .findFirst() + .map(e -> name + "." + e.getKey()) + .orElse(null); + } + return null; + } + boolean isMetaField() { return OpenSearchConstants.METADATAFIELD_TYPE_MAP.containsKey(getRootName()); } String getReference() { + if (isTextType()) { + return toKeywordSubField(); + } return getRootName(); } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java index cda92c55f2..9a0f1c3a86 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteOpenSearchIndexScan.java @@ -9,6 +9,7 @@ import java.util.ArrayDeque; import java.util.List; +import java.util.Map; import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; import org.apache.calcite.adapter.enumerable.PhysType; import org.apache.calcite.adapter.enumerable.PhysTypeImpl; @@ -35,6 +36,7 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.sql.calcite.plan.OpenSearchTableScan; import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.opensearch.data.type.OpenSearchDataType; import org.opensearch.sql.opensearch.planner.physical.OpenSearchIndexRules; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.request.PredicateAnalyzer; @@ -153,7 +155,9 @@ public CalciteOpenSearchIndexScan pushDownFilter(Filter filter) { try { CalciteOpenSearchIndexScan newScan = this.copyWithNewSchema(filter.getRowType()); List schema = this.getRowType().getFieldNames(); - QueryBuilder filterBuilder = PredicateAnalyzer.analyze(filter.getCondition(), schema); + Map typeMapping = this.osIndex.getFieldOpenSearchTypes(); + QueryBuilder filterBuilder = + PredicateAnalyzer.analyze(filter.getCondition(), schema, typeMapping); newScan.pushDownContext.add( PushDownAction.of( PushDownType.FILTER, From 178ee437e551e744cd39a48205168d90729fe7fb Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Tue, 4 Mar 2025 15:50:47 +0800 Subject: [PATCH 2/5] Ignore CalciteSortCommandIT.testSortWithNullValue Signed-off-by: Heng Qian --- .../opensearch/sql/calcite/remote/CalciteSortCommandIT.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSortCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSortCommandIT.java index 1867f7be6c..f5c51ed04e 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSortCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSortCommandIT.java @@ -22,4 +22,10 @@ public void init() throws IOException { @Ignore @Override public void testSortIpField() throws IOException {} + + // TODO: Fix incorrect results for NULL values, addressed by issue: + // https://github.com/opensearch-project/sql/issues/3375 + @Ignore + @Override + public void testSortWithNullValue() throws IOException {} } From 03a24aa97bb3addac0e64a2998bedcdf280a8601 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Tue, 4 Mar 2025 16:15:22 +0800 Subject: [PATCH 3/5] Refine code: only get keyword subfield for termQuery builder Signed-off-by: Heng Qian --- .../opensearch/request/PredicateAnalyzer.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java index 49cb39a101..f5ce603b5f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java @@ -778,6 +778,10 @@ private String getFieldReference() { return rel.getReference(); } + private String getFieldReferenceForTermQuery() { + return rel.getReferenceForTermQuery(); + } + private SimpleQueryExpression(NamedFieldExpression rel) { this.rel = rel; } @@ -841,7 +845,7 @@ public QueryExpression equals(LiteralExpression literal) { .must(addFormatIfNecessary(literal, rangeQuery(getFieldReference()).gte(value))) .must(addFormatIfNecessary(literal, rangeQuery(getFieldReference()).lte(value))); } else { - builder = termQuery(getFieldReference(), value); + builder = termQuery(getFieldReferenceForTermQuery(), value); } return this; } @@ -859,7 +863,7 @@ public QueryExpression notEquals(LiteralExpression literal) { boolQuery() // NOT LIKE should return false when field is NULL .must(existsQuery(getFieldReference())) - .mustNot(termQuery(getFieldReference(), value)); + .mustNot(termQuery(getFieldReferenceForTermQuery(), value)); } return this; } @@ -899,21 +903,21 @@ public QueryExpression queryString(String query) { @Override public QueryExpression isTrue() { - builder = termQuery(getFieldReference(), true); + builder = termQuery(getFieldReferenceForTermQuery(), true); return this; } @Override public QueryExpression in(LiteralExpression literal) { Collection collection = (Collection) literal.value(); - builder = termsQuery(getFieldReference(), collection); + builder = termsQuery(getFieldReferenceForTermQuery(), collection); return this; } @Override public QueryExpression notIn(LiteralExpression literal) { Collection collection = (Collection) literal.value(); - builder = boolQuery().mustNot(termsQuery(getFieldReference(), collection)); + builder = boolQuery().mustNot(termsQuery(getFieldReferenceForTermQuery(), collection)); return this; } } @@ -1018,6 +1022,10 @@ boolean isMetaField() { } String getReference() { + return getRootName(); + } + + String getReferenceForTermQuery() { if (isTextType()) { return toKeywordSubField(); } From 87b85964513f02042a4cb8aa4029396fae61e212 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Tue, 4 Mar 2025 16:22:39 +0800 Subject: [PATCH 4/5] Refine code Signed-off-by: Heng Qian --- .../opensearch/sql/opensearch/request/PredicateAnalyzer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java index f5ce603b5f..73c1ffe241 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java @@ -1005,7 +1005,7 @@ boolean isTextType() { } String toKeywordSubField() { - if (isTextType()) { + if (type instanceof OpenSearchTextType) { OpenSearchTextType textType = (OpenSearchTextType) type; // Find the first subfield with type keyword, return null if non-exist. return textType.getFields().entrySet().stream() From a54d20c558f4a59b9688d53e16b06c8f89b72f93 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Tue, 4 Mar 2025 16:41:18 +0800 Subject: [PATCH 5/5] remove ignore tests in CalcitePPLInSubqueryIT Signed-off-by: Heng Qian --- .../sql/calcite/standalone/CalcitePPLInSubqueryIT.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLInSubqueryIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLInSubqueryIT.java index 4acac3f3ed..13a332ebd2 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLInSubqueryIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/standalone/CalcitePPLInSubqueryIT.java @@ -30,8 +30,7 @@ public void init() throws IOException { loadIndex(Index.OCCUPATION); } - // TODO https://github.com/opensearch-project/sql/issues/3373 - @Ignore + @Test public void testSelfInSubquery() { JSONObject result = executeQuery( @@ -349,8 +348,7 @@ public void failWhenNumOfColumnsNotMatchOutputOfSubquery() { TEST_INDEX_WORKER, TEST_INDEX_WORK_INFORMATION))); } - // TODO https://github.com/opensearch-project/sql/issues/3373 - @Ignore + @Test public void testInSubqueryWithTableAlias() { JSONObject result = executeQuery(