Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[multistage] adding support for range predicate #9445

Merged
merged 7 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.calcite.rel.rules;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.tools.RelBuilderFactory;


/**
 * Planner rule that rewrites {@code SEARCH} calls inside a {@link LogicalFilter} condition by delegating to
 * {@link RexUtil#expandSearch}.
 *
 * <p>The rule only matches when the condition still contains a {@code SEARCH} call, so a filter whose condition has
 * already been expanded is not matched again.
 */
public class PinotFilterExpandSearchRule extends RelOptRule {
  public static final PinotFilterExpandSearchRule INSTANCE =
      new PinotFilterExpandSearchRule(PinotRuleUtils.PINOT_REL_FACTORY);

  /**
   * Creates the rule.
   *
   * @param factory builder factory handed to the {@link RelOptRule} base class
   */
  public PinotFilterExpandSearchRule(RelBuilderFactory factory) {
    super(operand(LogicalFilter.class, any()), factory, null);
  }

  /**
   * Returns {@code true} when the matched filter's condition contains at least one {@code SEARCH} call.
   */
  @Override
  @SuppressWarnings("rawtypes")
  public boolean matches(RelOptRuleCall call) {
    if (call.rels.length < 1) {
      return false;
    }
    if (call.rel(0) instanceof Filter) {
      Filter filter = call.rel(0);
      return containsRangeSearch(filter.getCondition());
    }
    return false;
  }

  /**
   * Replaces the matched filter with an equivalent one whose condition has every {@code SEARCH} call expanded.
   */
  @Override
  public void onMatch(RelOptRuleCall call) {
    Filter filter = call.rel(0);
    RexNode newCondition = RexUtil.expandSearch(filter.getCluster().getRexBuilder(), null, filter.getCondition());
    // Use copy() rather than LogicalFilter.create(input, condition): the two-argument create() builds the filter
    // with an empty variablesSet, which would drop correlation variables carried by the original filter.
    call.transformTo(filter.copy(filter.getTraitSet(), filter.getInput(), newCondition));
  }

  /**
   * Checks whether the expression tree rooted at {@code condition} contains a {@code SEARCH} call.
   *
   * <p>The check descends through the operands of every {@link RexCall}, not only {@code AND}/{@code OR}, because
   * {@link RexUtil#expandSearch} rewrites {@code SEARCH} calls wherever they are nested (for example under
   * {@code NOT} or inside a {@code CASE}); restricting detection to {@code AND}/{@code OR} would leave those
   * nested calls unexpanded.
   */
  private boolean containsRangeSearch(RexNode condition) {
    switch (condition.getKind()) {
      case SEARCH:
        return true;
      default:
        if (condition instanceof RexCall) {
          for (RexNode operand : ((RexCall) condition).getOperands()) {
            if (containsRangeSearch(operand)) {
              return true;
            }
          }
        }
        return false;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ private PinotQueryRuleSets() {
PruneEmptyRules.UNION_INSTANCE,

// Pinot specific rules
PinotFilterExpandSearchRule.INSTANCE,
PinotJoinExchangeNodeInsertRule.INSTANCE,
PinotAggregateExchangeNodeInsertRule.INSTANCE,
PinotSortExchangeNodeInsertRule.INSTANCE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
Expand All @@ -45,6 +46,7 @@
import org.apache.pinot.query.planner.stage.SortNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.planner.stage.ValueNode;


/**
Expand Down Expand Up @@ -77,11 +79,17 @@ public static StageNode toStageNode(RelNode node, int currentStageId) {
return convertLogicalAggregate((LogicalAggregate) node, currentStageId);
} else if (node instanceof LogicalSort) {
return convertLogicalSort((LogicalSort) node, currentStageId);
} else if (node instanceof LogicalValues) {
return convertLogicalValues((LogicalValues) node, currentStageId);
} else {
throw new UnsupportedOperationException("Unsupported logical plan node: " + node);
throw new UnsupportedOperationException("Unsupported logical plan node: " + node);
}
}

/**
 * Converts a Calcite {@link LogicalValues} node into a {@link ValueNode} for the given stage, using the node's row
 * type (converted via {@code toDataSchema}) as the data schema and its literal tuples as the row content.
 */
private static StageNode convertLogicalValues(LogicalValues node, int currentStageId) {
return new ValueNode(currentStageId, toDataSchema(node.getRowType()), node.tuples);
}

private static StageNode convertLogicalSort(LogicalSort node, int currentStageId) {
int fetch = node.fetch == null ? 0 : ((RexLiteral) node.fetch).getValueAs(Integer.class);
int offset = node.offset == null ? 0 : ((RexLiteral) node.offset).getValueAs(Integer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ private static AbstractStageNode newNodeInstance(String nodeName, int stageId) {
return new MailboxSendNode(stageId);
case "MailboxReceiveNode":
return new MailboxReceiveNode(stageId);
case "ValueNode":
return new ValueNode(stageId);
default:
throw new IllegalArgumentException("Unknown node name: " + nodeName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.planner.stage;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.rex.RexLiteral;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.serde.ProtoProperties;


public class ValueNode extends AbstractStageNode {
@ProtoProperties
private List<List<RexExpression>> _literalRows;

/**
 * Creates a value node for the given stage without populating the literal rows.
 *
 * @param stageId id of the stage this node belongs to
 */
public ValueNode(int stageId) {
super(stageId);
}

public ValueNode(int currentStageId, DataSchema dataSchema,
Copy link
Contributor

@siddharthteotia siddharthteotia Sep 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General suggestion -- for every new Operator / StageNode introduced, I think we should add a corresponding query compilation test to validate the plan, expressions etc. In the current engine, we have done that for quite some time in CalciteSqlCompilerTest.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes for this one. ValueNode is a special node generated by Calcite when the query evaluates to a constant (regardless of the underlying table).

The value node replaces the table scan as the root.

ImmutableList<ImmutableList<RexLiteral>> literalTuples) {
super(currentStageId, dataSchema);
_literalRows = new ArrayList<>();
for (List<RexLiteral> literalTuple : literalTuples) {
List<RexExpression> literalRow = new ArrayList<>();
for (RexLiteral literal : literalTuple) {
literalRow.add(RexExpression.toRexExpression(literal));
}
_literalRows.add(literalRow);
}
}

/**
 * Returns the literal rows held by this node. The internal list is returned directly, not a copy.
 */
public List<List<RexExpression>> getLiteralRows() {
return _literalRows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,6 @@ private Object[][] provideQueriesWithException() {
new Object[]{"SELECT a.col1, SUM(a.col3) FROM a", "'a.col1' is not being grouped"},
// empty IN clause fails compilation
new Object[]{"SELECT a.col1 FROM a WHERE a.col1 IN ()", "Encountered \"\" at line"},
// range filter queries are not supported right now
new Object[]{"SELECT a.col1 FROM a WHERE a.col1 > 'x' AND a.col1 < 'y'", "Range is not implemented yet"}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@
import org.apache.pinot.query.planner.stage.ProjectNode;
import org.apache.pinot.query.planner.stage.SortNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.ValueNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.AggregateOperator;
import org.apache.pinot.query.runtime.operator.FilterOperator;
import org.apache.pinot.query.runtime.operator.HashJoinOperator;
import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.operator.SortOperator;
Expand Down Expand Up @@ -144,6 +146,8 @@ private BaseOperator<TransferableBlock> getOperator(long requestId, StageNode st
return new SortOperator(getOperator(requestId, sortNode.getInputs().get(0), metadataMap),
sortNode.getCollationKeys(), sortNode.getCollationDirections(), sortNode.getFetch(), sortNode.getOffset(),
sortNode.getDataSchema());
} else if (stageNode instanceof ValueNode) {
return new LiteralValueOperator(stageNode.getDataSchema(), ((ValueNode) stageNode).getLiteralRows());
} else {
throw new UnsupportedOperationException(
String.format("Stage node type %s is not supported!", stageNode.getClass().getSimpleName()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.runtime.operator;

import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.common.datablock.BaseDataBlock;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;


/**
 * Operator that serves a fixed set of literal rows and has no upstream operator.
 *
 * <p>The literal rows are materialized into a single {@link TransferableBlock} when the operator is constructed.
 * The first call to {@code getNextBlock()} hands out that block; every later call yields an end-of-stream block
 * built from the same data schema.
 */
public class LiteralValueOperator extends BaseOperator<TransferableBlock> {
  private static final String EXPLAIN_NAME = "LITERAL_VALUE_PROVIDER";

  private final DataSchema _dataSchema;
  private final TransferableBlock _literalBlock;
  private boolean _literalBlockServed;

  /**
   * @param dataSchema schema of the rows produced by this operator
   * @param rexLiteralRows rows of literal expressions; each element is cast to {@link RexExpression.Literal}
   */
  public LiteralValueOperator(DataSchema dataSchema, List<List<RexExpression>> rexLiteralRows) {
    _dataSchema = dataSchema;
    _literalBlock = buildLiteralBlock(rexLiteralRows);
    _literalBlockServed = false;
  }

  @Override
  public List<Operator> getChildOperators() {
    // WorkerExecutor doesn't use getChildOperators, returns null here.
    return null;
  }

  @Nullable
  @Override
  public String toExplainString() {
    return EXPLAIN_NAME;
  }

  @Override
  protected TransferableBlock getNextBlock() {
    if (_literalBlockServed) {
      // The literal block was already handed out; signal the end of the stream.
      return TransferableBlockUtils.getEndOfStreamTransferableBlock(_dataSchema);
    }
    _literalBlockServed = true;
    return _literalBlock;
  }

  /**
   * Extracts the value of every literal, one {@code Object[]} per input row sized by the data schema, and wraps the
   * rows in a row-typed block.
   */
  private TransferableBlock buildLiteralBlock(List<List<RexExpression>> literalRows) {
    List<Object[]> rows = new ArrayList<>();
    for (List<RexExpression> literalRow : literalRows) {
      Object[] values = new Object[_dataSchema.size()];
      for (int col = 0; col < values.length; col++) {
        RexExpression.Literal literal = (RexExpression.Literal) literalRow.get(col);
        values[col] = literal.getValue();
      }
      rows.add(values);
    }
    return new TransferableBlock(rows, _dataSchema, BaseDataBlock.Type.ROW);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ private Object[][] provideTestSql() {
new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 "
+ " WHERE a.col1 IN ('foo') AND b.col2 NOT IN ('')"},

// Range conditions with continuous and non-continuous range.
new Object[]{"SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 = b.col1 "
+ " WHERE a.col3 IN (1, 2, 3) OR (a.col3 > 10 AND a.col3 < 50)"},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also check the case where someone tries an impossible range condition or conditions with overlaps? I was running into an issue with those edge-cases in my PR. Examples:

(a.col3 > 10 AND a.col3 < 20) AND (a.col3 > 30 AND a.col3 < 40) # Impossible range
(a.col3 > 10 AND a.col3 < 20) AND (a.col3 > 15 AND a.col3 < 25) # Overlapping range

Copy link
Contributor Author

@walterddr walterddr Sep 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was planning to split that into a separate PR, but sure, let me add it to this one.

Copy link
Contributor

@siddharthteotia siddharthteotia Sep 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is col IN (<single value>) getting rewritten to col = <val> ?

FWIW, in the current engine we do this rewrite, but it is not really worth it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, a single-value IN will be rewritten.


new Object[]{"SELECT col1, SUM(col3) FROM a WHERE a.col3 BETWEEN 23 AND 36 "
+ " GROUP BY col1 HAVING SUM(col3) > 10.0 AND MIN(col3) <> 123 AND MAX(col3) BETWEEN 10 AND 20"},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current engine, non-literals (function expressions) on the RHS get rewritten by moving them to the LHS. Do we support that here?

WHERE a > b + 20 -> WHERE a - b > 20

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This is done not by the range predicate but by the compilation on the server.


new Object[]{"SELECT col1, SUM(col3) FROM a WHERE (col3 > 0 AND col3 < 45) AND (col3 > 15 AND col3 < 50) "
+ " GROUP BY col1 HAVING (SUM(col3) > 10 AND SUM(col3) < 20) AND (SUM(col3) > 30 AND SUM(col3) < 40)"},

// Projection pushdown
new Object[]{"SELECT a.col1, a.col3 + a.col3 FROM a WHERE a.col3 >= 0 AND a.col2 = 'alice'"},

Expand Down