Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IGNORE NULLS option to FIRST_VALUE and LAST_VALUE window functions #14264

Merged
merged 6 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.collections;

import java.util.AbstractList;


/**
* An immutable list like the one returned by {@link java.util.Collections#nCopies(int, Object)}, but with two values
* (that are not interleaved) instead of a single one. Useful for avoiding unnecessary allocations.
*/
public class DualValueList<E> extends AbstractList<E> {

private final E _firstValue;
private final E _secondValue;
private final int _firstCount;
private final int _totalSize;

public DualValueList(E firstValue, int firstCount, E secondValue, int secondCount) {
_firstValue = firstValue;
_firstCount = firstCount;
_secondValue = secondValue;
_totalSize = firstCount + secondCount;
}

@Override
public E get(int index) {
if (index < 0 || index >= _totalSize) {
throw new IndexOutOfBoundsException(index);
}
return index < _firstCount ? _firstValue : _secondValue;
}

@Override
public int size() {
return _totalSize;
}
}
1 change: 1 addition & 0 deletions pinot-common/src/main/proto/expressions.proto
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ message FunctionCall {
string functionName = 2;
repeated Expression functionOperands = 3;
bool isDistinct = 4;
bool ignoreNulls = 5;
}

message Expression {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,31 @@ public void testAggregateServerReturnFinalResult(boolean useMultiStageQueryEngin
assertTrue(response.get("resultTable").get("rows").get(0).get(0).isNull());
}

@Test
public void testWindowFunctionIgnoreNulls()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we divide this test method into two different ones?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow your suggestion - there's a single query here.

throws Exception {
// Window functions are only supported in the multi-stage query engine
setUseMultiStageQueryEngine(true);
String sqlQuery =
"SELECT salary, LAST_VALUE(salary) IGNORE NULLS OVER (ORDER BY DaysSinceEpoch) AS gapfilledSalary from "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it work with bounded preceding/following? I remember running into some exception when trying it out

Copy link
Collaborator Author

@yashmayya yashmayya Oct 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it works with bounded preceding / following as well.

I remember running into some exception when trying it out

Was it something like this error during query planning:

Caused by: java.lang.RuntimeException: Failed to convert query to relational expression:
...
Caused by: java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:

Interestingly, it looks like Calcite throws this error if the window function's input column is not nullable in the table schema (i.e., if enableColumnBasedNullHandling is false or the column has "notNull": true) and IGNORE NULLS or RESPECT NULLS option is used. While the error message is not the most clear, I don't think this is an actual bug a major issue because it doesn't really make sense to use those null handling related options if the window function's input column is not nullable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we want to document it. Users might enable table level nullability (v1 engine nullability) and expect v2 engine to pick it up

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good point, I'll make sure to add a note about this to the documentation. I plan to raise one consolidated documentation PR with changes from #14273 and here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should delegate this into something we document. We should also report the issue in Calcite Jira/email list and/or create a PR to fix it ourselfs. Same happened with the reserved keyword PR.

Copy link
Collaborator Author

@yashmayya yashmayya Oct 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, actually this does look like a legitimate bug in Calcite on taking a second look. The issue is that the inferred return type for the window function in the parsed SqlNode is INTEGER NOT NULL when IGNORE NULLS option is used (assuming the column input to FIRST_VALUE / LAST_VALUE is INTEGER) and the converted return type is INTEGER (nullable) because offset based window frame bounds means that the result can be null when the window frame is out of bounds - which can't be the case when using RANGE window frames or ROWS window frames with UNBOUNDED PRECEDING / UNBOUNDED FOLLOWING / CURRENT ROW. When IGNORE NULLS option is not provided, the inferred return type for the window function in the parsed SqlNode is also INTEGER (nullable) which is why the issue doesn't occur there. Same when IGNORE NULLS option is used but input column is nullable.

Interestingly, the same error and issue also occurs when the RESPECT NULLS option is explicitly provided.

I'll create a bug tracking Jira in the Calcite project and link it here.

Edit: https://issues.apache.org/jira/browse/CALCITE-6648

+ "mytable";
JsonNode response = postQuery(sqlQuery);
assertNoError(response);

// Check if the LAST_VALUE window function with IGNORE NULLS has effectively gap-filled the salary values
Integer lastSalary = null;
JsonNode rows = response.get("resultTable").get("rows");
for (int i = 0; i < rows.size(); i++) {
JsonNode row = rows.get(i);
if (!row.get(0).isNull()) {
assertEquals(row.get(0).asInt(), row.get(1).asInt());
lastSalary = row.get(0).asInt();
} else {
assertEquals(lastSalary, row.get(1).numberValue());
}
}
}

@Override
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING, "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.SqlSyntax;
import org.apache.calcite.sql.fun.SqlLeadLagAggFunction;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
Expand Down Expand Up @@ -134,10 +135,6 @@ public static PinotOperatorTable instance() {
SqlStdOperatorTable.MODE,
SqlStdOperatorTable.MIN,
SqlStdOperatorTable.MAX,
SqlStdOperatorTable.LAST_VALUE,
SqlStdOperatorTable.FIRST_VALUE,
SqlStdOperatorTable.LEAD,
SqlStdOperatorTable.LAG,
SqlStdOperatorTable.AVG,
SqlStdOperatorTable.STDDEV_POP,
SqlStdOperatorTable.COVAR_POP,
Expand All @@ -152,7 +149,17 @@ public static PinotOperatorTable instance() {
SqlStdOperatorTable.RANK,
SqlStdOperatorTable.ROW_NUMBER,

// WINDOW Functions (non-aggregate)
SqlStdOperatorTable.LAST_VALUE,
SqlStdOperatorTable.FIRST_VALUE,
// TODO: Replace these with SqlStdOperatorTable.LEAD and SqlStdOperatorTable.LAG when the function implementations
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean IGNORE NULLS are simply ignored?
I'd suggest using the standard operator, and throw exception when IGNORE NULLS is specified but cannot be supported to make the behavior more explicit

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean IGNORE NULLS are simply ignored?

Nope, using IGNORE NULLS with LAG / LEAD will lead to a clear error like this during query planning - From line 1, column 43 to line 1, column 60: Cannot specify IGNORE NULLS or RESPECT NULLS following 'LAG'. This is because the custom operators we defined return false for allowsNullTreatment which means this Calcite validation will fail - https://github.com/apache/calcite/blob/ef1a83f659e8771c65c2541b92d2ef9cc2a05bea/core/src/main/java/org/apache/calcite/sql/SqlNullTreatmentOperator.java#L69-L74.

I'd initially gone with simply throwing a runtime exception in Pinot's LagValueWindowFunction / LeadValueWindowFunction runtime operators, but the alternative chosen here suggested by @gortiz (defining our own custom SqlAggFunctions) is much better because we fail fast during query planning instead of query execution and the error is also clear.

// are updated to support the IGNORE NULLS option.
PinotLeadWindowFunction.INSTANCE,
PinotLagWindowFunction.INSTANCE,

// SPECIAL OPERATORS
SqlStdOperatorTable.IGNORE_NULLS,
SqlStdOperatorTable.RESPECT_NULLS,
SqlStdOperatorTable.BETWEEN,
SqlStdOperatorTable.SYMMETRIC_BETWEEN,
SqlStdOperatorTable.NOT_BETWEEN,
Expand Down Expand Up @@ -372,4 +379,30 @@ public void lookupOperatorOverloads(SqlIdentifier opName, @Nullable SqlFunctionC
public List<SqlOperator> getOperatorList() {
return _operatorList;
}

private static class PinotLeadWindowFunction extends SqlLeadLagAggFunction {
static final SqlOperator INSTANCE = new PinotLeadWindowFunction();

public PinotLeadWindowFunction() {
super(SqlKind.LEAD);
}

@Override
public boolean allowsNullTreatment() {
return false;
}
}

private static class PinotLagWindowFunction extends SqlLeadLagAggFunction {
static final SqlOperator INSTANCE = new PinotLagWindowFunction();

public PinotLagWindowFunction() {
super(SqlKind.LAG);
}

@Override
public boolean allowsNullTreatment() {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public Void visitWindow(WindowNode node, Void context) {
RelDataType relDataType = funCall.getDataType().toType(_builder.getTypeFactory());
Window.RexWinAggCall winCall = new Window.RexWinAggCall(aggFunction, relDataType, operands, aggCalls.size(),
// same as the one used in LogicalWindow.create
funCall.isDistinct(), false);
funCall.isDistinct(), funCall.isIgnoreNulls());
aggCalls.add(winCall);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,20 @@ class FunctionCall implements RexExpression {
private final List<RexExpression> _functionOperands;
// whether the function is a distinct function.
private final boolean _isDistinct;
// whether the function should ignore nulls (relevant to certain window functions like LAST_VALUE).
private final boolean _ignoreNulls;

public FunctionCall(ColumnDataType dataType, String functionName, List<RexExpression> functionOperands) {
this(dataType, functionName, functionOperands, false);
this(dataType, functionName, functionOperands, false, false);
}

public FunctionCall(ColumnDataType dataType, String functionName, List<RexExpression> functionOperands,
boolean isDistinct) {
boolean isDistinct, boolean ignoreNulls) {
_dataType = dataType;
_functionName = functionName;
_functionOperands = functionOperands;
_isDistinct = isDistinct;
_ignoreNulls = ignoreNulls;
}

public ColumnDataType getDataType() {
Expand All @@ -140,6 +143,10 @@ public boolean isDistinct() {
return _isDistinct;
}

public boolean isIgnoreNulls() {
return _ignoreNulls;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,12 @@ private static List<RexExpression> toFunctionOperands(RexInputRef rexInputRef, S
public static RexExpression.FunctionCall fromAggregateCall(AggregateCall aggregateCall) {
return new RexExpression.FunctionCall(RelToPlanNodeConverter.convertToColumnDataType(aggregateCall.type),
getFunctionName(aggregateCall.getAggregation()), fromRexNodes(aggregateCall.rexList),
aggregateCall.isDistinct());
aggregateCall.isDistinct(), false);
}

public static RexExpression.FunctionCall fromWindowAggregateCall(Window.RexWinAggCall winAggCall) {
return new RexExpression.FunctionCall(RelToPlanNodeConverter.convertToColumnDataType(winAggCall.type),
getFunctionName(winAggCall.op), fromRexNodes(winAggCall.operands), winAggCall.distinct);
getFunctionName(winAggCall.op), fromRexNodes(winAggCall.operands), winAggCall.distinct, winAggCall.ignoreNulls);
}

public static Integer getValueAsInt(@Nullable RexNode in) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static RexExpression.FunctionCall convertFunctionCall(Expressions.Functio
operands.add(convertExpression(protoOperand));
}
return new RexExpression.FunctionCall(convertColumnDataType(functionCall.getDataType()),
functionCall.getFunctionName(), operands, functionCall.getIsDistinct());
functionCall.getFunctionName(), operands, functionCall.getIsDistinct(), functionCall.getIgnoreNulls());
}

public static RexExpression.Literal convertLiteral(Expressions.Literal literal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,13 @@ public static Expressions.FunctionCall convertFunctionCall(RexExpression.Functio
for (RexExpression operand : operands) {
protoOperands.add(convertExpression(operand));
}
return Expressions.FunctionCall.newBuilder().setDataType(convertColumnDataType(functionCall.getDataType()))
.setFunctionName(functionCall.getFunctionName()).addAllFunctionOperands(protoOperands)
.setIsDistinct(functionCall.isDistinct()).build();
return Expressions.FunctionCall.newBuilder()
.setDataType(convertColumnDataType(functionCall.getDataType()))
.setFunctionName(functionCall.getFunctionName())
.addAllFunctionOperands(protoOperands)
.setIsDistinct(functionCall.isDistinct())
.setIgnoreNulls(functionCall.isIgnoreNulls())
.build();
}

public static Expressions.Literal convertLiteral(RexExpression.Literal literal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private List<Object> processRowsWindow(List<Object[]> rows) {
List<Object> result = new ArrayList<>(numRows);
for (int i = 0; i < numRows; i++) {
if (lowerBound >= numRows) {
// Fill the remaining rows with null
// Fill the remaining rows with null since all subsequent windows will be out of bounds
for (int j = i; j < numRows; j++) {
result.add(null);
}
Expand Down
Loading
Loading