diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysFalse.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysFalse.java new file mode 100644 index 0000000000000..72ed83f86df6d --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysFalse.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import java.util.Objects; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * A filter that always evaluates to {@code false}. + * + * @since 3.3.0 + */ +@Evolving +public final class AlwaysFalse extends Filter { + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + return true; + } + + @Override + public int hashCode() { + return Objects.hash(); + } + + @Override + public String toString() { return "FALSE"; } + + @Override + public NamedReference[] references() { return EMPTY_REFERENCE; } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysTrue.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysTrue.java new file mode 100644 index 0000000000000..b6d39c3f64a77 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/AlwaysTrue.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import java.util.Objects; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * A filter that always evaluates to {@code true}. + * + * @since 3.3.0 + */ +@Evolving +public final class AlwaysTrue extends Filter { + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + return true; + } + + @Override + public int hashCode() { + return Objects.hash(); + } + + @Override + public String toString() { return "TRUE"; } + + @Override + public NamedReference[] references() { return EMPTY_REFERENCE; } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/And.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/And.java new file mode 100644 index 0000000000000..e0b8b13acb158 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/And.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import org.apache.spark.annotation.Evolving; + +/** + * A filter that evaluates to {@code true} iff both {@code left} and {@code right} evaluate to + * {@code true}. + * + * @since 3.3.0 + */ +@Evolving +public final class And extends BinaryFilter { + + public And(Filter left, Filter right) { + super(left, right); + } + + @Override + public String toString() { + return String.format("(%s) AND (%s)", left.describe(), right.describe()); + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/BinaryComparison.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/BinaryComparison.java new file mode 100644 index 0000000000000..0ae6e5af3ca1a --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/BinaryComparison.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import java.util.Objects; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Literal; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * Base class for {@link EqualNullSafe}, {@link EqualTo}, {@link GreaterThan}, + * {@link GreaterThanOrEqual}, {@link LessThan}, {@link LessThanOrEqual} + * + * @since 3.3.0 + */ +@Evolving +abstract class BinaryComparison extends Filter { + protected final NamedReference column; + protected final Literal value; + + protected BinaryComparison(NamedReference column, Literal value) { + this.column = column; + this.value = value; + } + + public NamedReference column() { return column; } + public Literal value() { return value; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + BinaryComparison that = (BinaryComparison) o; + return Objects.equals(column, that.column) && Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(column, value); + } + + @Override + public NamedReference[] references() { return new NamedReference[] { column }; } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/BinaryFilter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/BinaryFilter.java new file mode 100644 index 0000000000000..ac4b9f281e9ca --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/BinaryFilter.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import java.util.Objects; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * Base class for {@link And}, {@link Or} + * + * @since 3.3.0 + */ +@Evolving +abstract class BinaryFilter extends Filter { + protected final Filter left; + protected final Filter right; + + protected BinaryFilter(Filter left, Filter right) { + this.left = left; + this.right = right; + } + + public Filter left() { return left; } + public Filter right() { return right; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + BinaryFilter and = (BinaryFilter) o; + return Objects.equals(left, and.left) && Objects.equals(right, and.right); + } + + @Override + public int hashCode() { + return Objects.hash(left, right); + } + + @Override + public NamedReference[] references() { + NamedReference[] refLeft = left.references(); + NamedReference[] refRight = right.references(); + NamedReference[] arr = new NamedReference[refLeft.length + refRight.length]; + System.arraycopy(refLeft, 0, arr, 0, refLeft.length); + System.arraycopy(refRight, 0, arr, refLeft.length, refRight.length); + return arr; + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/EqualNullSafe.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/EqualNullSafe.java new file mode 100644 index 0000000000000..34b529194e075 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/EqualNullSafe.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Literal; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * Performs equality comparison, similar to {@link EqualTo}. However, this differs from + * {@link EqualTo} in that it returns {@code true} (rather than NULL) if both inputs are NULL, + * and {@code false} (rather than NULL) if one of the input is NULL and the other is not NULL. + * + * @since 3.3.0 + */ +@Evolving +public final class EqualNullSafe extends BinaryComparison { + + public EqualNullSafe(NamedReference column, Literal value) { + super(column, value); + } + + @Override + public String toString() { return this.column.describe() + " <=> " + value.describe(); } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/EqualTo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/EqualTo.java new file mode 100644 index 0000000000000..b9c4fe053b83c --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/EqualTo.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Literal; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * A filter that evaluates to {@code true} iff the {@code column} evaluates to a value + * equal to {@code value}. + * + * @since 3.3.0 + */ +@Evolving +public final class EqualTo extends BinaryComparison { + + public EqualTo(NamedReference column, Literal value) { + super(column, value); + } + + @Override + public String toString() { return column.describe() + " = " + value.describe(); } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/Filter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/Filter.java new file mode 100644 index 0000000000000..852837496a103 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/Filter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Expression; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * Filter base class + * + * @since 3.3.0 + */ +@Evolving +public abstract class Filter implements Expression { + + protected static final NamedReference[] EMPTY_REFERENCE = new NamedReference[0]; + + /** + * Returns list of columns that are referenced by this filter. + */ + public abstract NamedReference[] references(); + + @Override + public String describe() { return this.toString(); } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/GreaterThan.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/GreaterThan.java new file mode 100644 index 0000000000000..a3374f359ea29 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/GreaterThan.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Literal; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * A filter that evaluates to {@code true} iff the {@code column} evaluates to a value + * greater than {@code value}. + * + * @since 3.3.0 + */ +@Evolving +public final class GreaterThan extends BinaryComparison { + + public GreaterThan(NamedReference column, Literal value) { + super(column, value); + } + + @Override + public String toString() { return column.describe() + " > " + value.describe(); } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/GreaterThanOrEqual.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/GreaterThanOrEqual.java new file mode 100644 index 0000000000000..4ee921014da41 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/GreaterThanOrEqual.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Literal; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * A filter that evaluates to {@code true} iff the {@code column} evaluates to a value + * greater than or equal to {@code value}. + * + * @since 3.3.0 + */ +@Evolving +public final class GreaterThanOrEqual extends BinaryComparison { + + public GreaterThanOrEqual(NamedReference column, Literal value) { + super(column, value); + } + + @Override + public String toString() { return column.describe() + " >= " + value.describe(); } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/In.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/In.java new file mode 100644 index 0000000000000..8d6490b8984fd --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/In.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import java.util.Arrays; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Literal; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * A filter that evaluates to {@code true} iff the {@code column} evaluates to one of the + * {@code values} in the array. + * + * @since 3.3.0 + */ +@Evolving +public final class In extends Filter { + static final int MAX_LEN_TO_PRINT = 50; + private final NamedReference column; + private final Literal[] values; + + public In(NamedReference column, Literal[] values) { + this.column = column; + this.values = values; + } + + public NamedReference column() { return column; } + public Literal[] values() { return values; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + In in = (In) o; + return Objects.equals(column, in.column) && values.length == in.values.length + && Arrays.asList(values).containsAll(Arrays.asList(in.values)); + } + + @Override + public int hashCode() { + int result = Objects.hash(column); + result = 31 * result + Arrays.hashCode(values); + return result; + } + + @Override + public String toString() { + String res = Arrays.stream(values).limit((MAX_LEN_TO_PRINT)).map(Literal::describe) + .collect(Collectors.joining(", ")); + if (values.length > MAX_LEN_TO_PRINT) { + res += "..."; + } + return column.describe() + " IN (" + res + ")"; + } + + @Override + public NamedReference[] references() { return new NamedReference[] { column }; } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/IsNotNull.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/IsNotNull.java new file mode 100644 index 0000000000000..2cf000e99878e --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/IsNotNull.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import java.util.Objects; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * A filter that evaluates to {@code true} iff the {@code column} evaluates to a non-null value. + * + * @since 3.3.0 + */ +@Evolving +public final class IsNotNull extends Filter { + private final NamedReference column; + + public IsNotNull(NamedReference column) { + this.column = column; + } + + public NamedReference column() { return column; } + + @Override + public String toString() { return column.describe() + " IS NOT NULL"; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + IsNotNull isNotNull = (IsNotNull) o; + return Objects.equals(column, isNotNull.column); + } + + @Override + public int hashCode() { + return Objects.hash(column); + } + + @Override + public NamedReference[] references() { return new NamedReference[] { column }; } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/IsNull.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/IsNull.java new file mode 100644 index 0000000000000..1cd497c02242e --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/IsNull.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import java.util.Objects; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * A filter that evaluates to {@code true} iff the {@code column} evaluates to null. + * + * @since 3.3.0 + */ +@Evolving +public final class IsNull extends Filter { + private final NamedReference column; + + public IsNull(NamedReference column) { + this.column = column; + } + + public NamedReference column() { return column; } + + @Override + public String toString() { return column.describe() + " IS NULL"; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + IsNull isNull = (IsNull) o; + return Objects.equals(column, isNull.column); + } + + @Override + public int hashCode() { + return Objects.hash(column); + } + + @Override + public NamedReference[] references() { return new NamedReference[] { column }; } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/LessThan.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/LessThan.java new file mode 100644 index 0000000000000..9fa5cfb87f527 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/LessThan.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Literal; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * A filter that evaluates to {@code true} iff the {@code column} evaluates to a value + * less than {@code value}. + * + * @since 3.3.0 + */ +@Evolving +public final class LessThan extends BinaryComparison { + + public LessThan(NamedReference column, Literal value) { + super(column, value); + } + + @Override + public String toString() { return column.describe() + " < " + value.describe(); } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/LessThanOrEqual.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/LessThanOrEqual.java new file mode 100644 index 0000000000000..a41b3c8045d5a --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/LessThanOrEqual.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.Literal; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * A filter that evaluates to {@code true} iff the {@code column} evaluates to a value + * less than or equal to {@code value}. + * + * @since 3.3.0 + */ +@Evolving +public final class LessThanOrEqual extends BinaryComparison { + + public LessThanOrEqual(NamedReference column, Literal value) { + super(column, value); + } + + @Override + public String toString() { return column.describe() + " <= " + value.describe(); } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/Not.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/Not.java new file mode 100644 index 0000000000000..69746f59ee933 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/Not.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import java.util.Objects; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.NamedReference; + +/** + * A filter that evaluates to {@code true} iff {@code child} is evaluated to {@code false}. + * + * @since 3.3.0 + */ +@Evolving +public final class Not extends Filter { + private final Filter child; + + public Not(Filter child) { this.child = child; } + + public Filter child() { return child; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Not not = (Not) o; + return Objects.equals(child, not.child); + } + + @Override + public int hashCode() { + return Objects.hash(child); + } + + @Override + public String toString() { return "NOT (" + child.describe() + ")"; } + + @Override + public NamedReference[] references() { return child.references(); } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/Or.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/Or.java new file mode 100644 index 0000000000000..baa33d849feef --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/Or.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import org.apache.spark.annotation.Evolving; + +/** + * A filter that evaluates to {@code true} iff at least one of {@code left} or {@code right} + * evaluates to {@code true}. + * + * @since 3.3.0 + */ +@Evolving +public final class Or extends BinaryFilter { + + public Or(Filter left, Filter right) { + super(left, right); + } + + @Override + public String toString() { + return String.format("(%s) OR (%s)", left.describe(), right.describe()); + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/StringContains.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/StringContains.java new file mode 100644 index 0000000000000..9a01e4d574888 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/StringContains.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A filter that evaluates to {@code true} iff the {@code column} evaluates to + * a string that contains {@code value}. + * + * @since 3.3.0 + */ +@Evolving +public final class StringContains extends StringPredicate { + + public StringContains(NamedReference column, UTF8String value) { + super(column, value); + } + + @Override + public String toString() { return "STRING_CONTAINS(" + column.describe() + ", " + value + ")"; } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/StringEndsWith.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/StringEndsWith.java new file mode 100644 index 0000000000000..11b8317ba4895 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/StringEndsWith.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A filter that evaluates to {@code true} iff the {@code column} evaluates to + * a string that ends with {@code value}. + * + * @since 3.3.0 + */ +@Evolving +public final class StringEndsWith extends StringPredicate { + + public StringEndsWith(NamedReference column, UTF8String value) { + super(column, value); + } + + @Override + public String toString() { return "STRING_ENDS_WITH(" + column.describe() + ", " + value + ")"; } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/StringPredicate.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/StringPredicate.java new file mode 100644 index 0000000000000..ffe5d5dba45b3 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/StringPredicate.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import java.util.Objects; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * Base class for {@link StringContains}, {@link StringStartsWith}, + * {@link StringEndsWith} + * + * @since 3.3.0 + */ +@Evolving +abstract class StringPredicate extends Filter { + protected final NamedReference column; + protected final UTF8String value; + + protected StringPredicate(NamedReference column, UTF8String value) { + this.column = column; + this.value = value; + } + + public NamedReference column() { return column; } + public UTF8String value() { return value; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StringPredicate that = (StringPredicate) o; + return Objects.equals(column, that.column) && Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(column, value); + } + + @Override + public NamedReference[] references() { return new NamedReference[] { column }; } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/StringStartsWith.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/StringStartsWith.java new file mode 100644 index 0000000000000..38a5de1921cdc --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/filter/StringStartsWith.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.expressions.filter; + +import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A filter that evaluates to {@code true} iff the {@code column} evaluates to + * a string that starts with {@code value}. + * + * @since 3.3.0 + */ +@Evolving +public final class StringStartsWith extends StringPredicate { + + public StringStartsWith(NamedReference column, UTF8String value) { + super(column, value); + } + + @Override + public String toString() { + return "STRING_STARTS_WITH(" + column.describe() + ", " + value + ")"; + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2FiltersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2FiltersSuite.scala new file mode 100644 index 0000000000000..b457211b7f89f --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2FiltersSuite.scala @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.connector.expressions.{FieldReference, Literal, LiteralValue} +import org.apache.spark.sql.connector.expressions.filter._ +import org.apache.spark.sql.execution.datasources.v2.FiltersV2Suite.ref +import org.apache.spark.sql.types.IntegerType +import org.apache.spark.unsafe.types.UTF8String + +class FiltersV2Suite extends SparkFunSuite { + + test("nested columns") { + val filter1 = new EqualTo(ref("a", "B"), LiteralValue(1, IntegerType)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a.B")) + assert(filter1.describe.equals("a.B = 1")) + + val filter2 = new EqualTo(ref("a", "b.c"), LiteralValue(1, IntegerType)) + assert(filter2.references.map(_.describe()).toSeq == Seq("a.`b.c`")) + assert(filter2.describe.equals("a.`b.c` = 1")) + + val filter3 = new EqualTo(ref("`a`.b", "c"), LiteralValue(1, IntegerType)) + assert(filter3.references.map(_.describe()).toSeq == Seq("```a``.b`.c")) + assert(filter3.describe.equals("```a``.b`.c = 1")) + } + + test("AlwaysTrue") { + val filter1 = new AlwaysTrue + val filter2 = new AlwaysTrue + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).length == 0) + assert(filter1.describe.equals("TRUE")) + } + + test("AlwaysFalse") { + val filter1 = new AlwaysFalse + val filter2 = new AlwaysFalse + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).length == 0) + assert(filter1.describe.equals("FALSE")) + } + + test("EqualTo") { + val filter1 = new EqualTo(ref("a"), LiteralValue(1, IntegerType)) + val filter2 = new EqualTo(ref("a"), LiteralValue(1, IntegerType)) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a")) + assert(filter1.describe.equals("a = 1")) + } + + test("EqualNullSafe") { + val filter1 = new EqualNullSafe(ref("a"), LiteralValue(1, IntegerType)) + val filter2 = new EqualNullSafe(ref("a"), LiteralValue(1, IntegerType)) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a")) + assert(filter1.describe.equals("a <=> 1")) + } + + test("GreaterThan") { + val filter1 = new GreaterThan(ref("a"), LiteralValue(1, IntegerType)) + val filter2 = new GreaterThan(ref("a"), LiteralValue(1, IntegerType)) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a")) + assert(filter1.describe.equals("a > 1")) + } + + test("GreaterThanOrEqual") { + val filter1 = new GreaterThanOrEqual(ref("a"), LiteralValue(1, IntegerType)) + val filter2 = new GreaterThanOrEqual(ref("a"), LiteralValue(1, IntegerType)) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a")) + assert(filter1.describe.equals("a >= 1")) + } + + test("LessThan") { + val filter1 = new LessThan(ref("a"), LiteralValue(1, IntegerType)) + val filter2 = new LessThan(ref("a"), LiteralValue(1, IntegerType)) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a")) + assert(filter1.describe.equals("a < 1")) + } + + test("LessThanOrEqual") { + val filter1 = new LessThanOrEqual(ref("a"), LiteralValue(1, IntegerType)) + val filter2 = new LessThanOrEqual(ref("a"), LiteralValue(1, IntegerType)) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a")) + assert(filter1.describe.equals("a <= 1")) + } + + test("In") { + val filter1 = new In(ref("a"), + Array(LiteralValue(1, IntegerType), LiteralValue(2, IntegerType), + LiteralValue(3, IntegerType), LiteralValue(4, IntegerType))) + val filter2 = new In(ref("a"), + Array(LiteralValue(4, IntegerType), LiteralValue(2, IntegerType), + LiteralValue(3, IntegerType), LiteralValue(1, IntegerType))) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a")) + assert(filter1.describe.equals("a IN (1, 2, 3, 4)")) + val values: Array[Literal[_]] = new Array[Literal[_]](1000) + for (i <- 0 until 1000) { + values(i) = LiteralValue(i, IntegerType) + } + val filter3 = new In(ref("a"), values) + var expected = "a IN (" + for (i <- 0 until 50) { + expected += i + ", " + } + expected = expected.dropRight(2) // remove the last ", " + expected += "...)" + assert(filter3.describe.equals(expected)) + } + + test("IsNull") { + val filter1 = new IsNull(ref("a")) + val filter2 = new IsNull(ref("a")) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a")) + assert(filter1.describe.equals("a IS NULL")) + } + + test("IsNotNull") { + val filter1 = new IsNotNull(ref("a")) + val filter2 = new IsNotNull(ref("a")) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a")) + assert(filter1.describe.equals("a IS NOT NULL")) + } + + test("Not") { + val filter1 = new Not(new LessThan(ref("a"), LiteralValue(1, IntegerType))) + val filter2 = new Not(new LessThan(ref("a"), LiteralValue(1, IntegerType))) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a")) + assert(filter1.describe.equals("NOT (a < 1)")) + } + + test("And") { + val filter1 = new And(new EqualTo(ref("a"), LiteralValue(1, IntegerType)), + new EqualTo(ref("b"), LiteralValue(1, IntegerType))) + val filter2 = new And(new EqualTo(ref("a"), LiteralValue(1, IntegerType)), + new EqualTo(ref("b"), LiteralValue(1, IntegerType))) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a", "b")) + assert(filter1.describe.equals("(a = 1) AND (b = 1)")) + } + + test("Or") { + val filter1 = new Or(new EqualTo(ref("a"), LiteralValue(1, IntegerType)), + new EqualTo(ref("b"), LiteralValue(1, IntegerType))) + val filter2 = new Or(new EqualTo(ref("a"), LiteralValue(1, IntegerType)), + new EqualTo(ref("b"), LiteralValue(1, IntegerType))) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a", "b")) + assert(filter1.describe.equals("(a = 1) OR (b = 1)")) + } + + test("StringStartsWith") { + val filter1 = new StringStartsWith(ref("a"), UTF8String.fromString("str")) + val filter2 = new StringStartsWith(ref("a"), UTF8String.fromString("str")) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a")) + assert(filter1.describe.equals("STRING_STARTS_WITH(a, str)")) + } + + test("StringEndsWith") { + val filter1 = new StringEndsWith(ref("a"), UTF8String.fromString("str")) + val filter2 = new StringEndsWith(ref("a"), UTF8String.fromString("str")) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a")) + assert(filter1.describe.equals("STRING_ENDS_WITH(a, str)")) + } + + test("StringContains") { + val filter1 = new StringContains(ref("a"), UTF8String.fromString("str")) + val filter2 = new StringContains(ref("a"), UTF8String.fromString("str")) + assert(filter1.equals(filter2)) + assert(filter1.references.map(_.describe()).toSeq == Seq("a")) + assert(filter1.describe.equals("STRING_CONTAINS(a, str)")) + } +} + +object FiltersV2Suite { + private[sql] def ref(parts: String*): FieldReference = { + new FieldReference(parts) + } +}