From 2baf9eb7c1d65b7c490de8dadaae34077100d0a3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 14 Dec 2021 11:36:17 -0700 Subject: [PATCH 01/14] Update GpuIf to support expressions with side effects Signed-off-by: Andy Grove --- .../src/main/python/conditionals_test.py | 17 ++ .../com/nvidia/spark/rapids/GpuCast.scala | 4 + .../nvidia/spark/rapids/GpuExpressions.scala | 7 + .../spark/rapids/GpuUserDefinedFunction.scala | 2 + .../spark/rapids/conditionalExpressions.scala | 204 +++++++++++++++--- .../apache/spark/sql/rapids/arithmetic.scala | 2 + .../spark/sql/rapids/mathExpressions.scala | 4 + .../spark/rapids/ConditionalsSuite.scala | 15 ++ 8 files changed, 223 insertions(+), 32 deletions(-) create mode 100644 tests/src/test/scala/com/nvidia/spark/rapids/ConditionalsSuite.scala diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index 06e43f0f54c..c637a1510fa 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -182,3 +182,20 @@ def test_ifnull(data_gen): 'ifnull({}, b)'.format(s1), 'ifnull({}, b)'.format(null_lit), 'ifnull(a, {})'.format(null_lit))) + +@pytest.mark.parametrize('data_gen', int_n_long_gens, ids=idfn) +def test_conditional_with_side_effects_col_col(data_gen): + gen = IntegerGen().with_special_case(2147483647) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : two_col_df(spark, data_gen, gen).selectExpr( + 'IF(b < 2147483647, b + 1, b)'), + conf = {'spark.sql.ansi.enabled':True}) + +@pytest.mark.parametrize('data_gen', int_n_long_gens, ids=idfn) +def test_conditional_with_side_effects_col_scalar(data_gen): + gen = IntegerGen().with_special_case(2147483647) + assert_gpu_and_cpu_are_equal_collect( + lambda spark : two_col_df(spark, data_gen, gen).selectExpr( + 'IF(b < 2147483647, b + 1, 2147483647)', + 'IF(b >= 2147483646, 2147483647, b + 1)'), + conf = {'spark.sql.ansi.enabled':True}) \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 87289552899..7de127803d3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -1451,6 +1451,10 @@ case class GpuCast( import GpuCast._ + // when ansi mode is enabled, some cast expressions can throw exceptions on invalid inputs + //TODO this might not be true for all possible casts? + override def hasSideEffects: Boolean = ansiMode + override def toString: String = if (ansiMode) { s"ansi_cast($child as ${dataType.simpleString})" } else { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala index 7732f76bbc4..afc120242f8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuExpressions.scala @@ -160,6 +160,13 @@ trait GpuExpression extends Expression with Arm { */ def convertToAst(numFirstTableColumns: Int): ast.AstExpression = throw new IllegalStateException(s"Cannot convert ${this.getClass.getSimpleName} to AST") + + /** Could evaluating this expression cause side-effects, such as throwing an exception? */ + def hasSideEffects: Boolean = + children.exists { + case c: GpuExpression => c.hasSideEffects + case _ => false // This path should never really happen + } } abstract class GpuLeafExpression extends GpuExpression with ShimExpression { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala index f76af64a27e..1678df89cca 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala @@ -41,6 +41,8 @@ trait GpuUserDefinedFunction extends GpuExpression /** True if the UDF is deterministic */ val udfDeterministic: Boolean + override def hasSideEffects: Boolean = true + override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic) private[this] val nvtxRangeName = s"UDF: $name" diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index d108c64bf3b..69ce9162967 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -16,43 +16,19 @@ package com.nvidia.spark.rapids +import ai.rapids.cudf.{ColumnVector, NullPolicy, ScanAggregation, ScanType, Table, UnaryOp} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.shims.v2.ShimExpression import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion} import org.apache.spark.sql.catalyst.expressions.{ComplexTypeMergingExpression, Expression} -import org.apache.spark.sql.types.{BooleanType, DataType} +import org.apache.spark.sql.types.{BooleanType, DataType, DataTypes} import org.apache.spark.sql.vectorized.ColumnarBatch trait GpuConditionalExpression extends ComplexTypeMergingExpression with GpuExpression - with ShimExpression { + with ShimExpression { - protected def computeIfElse( - batch: ColumnarBatch, - predExpr: Expression, - trueExpr: Expression, - falseValue: Any): GpuColumnVector = { - withResourceIfAllowed(falseValue) { falseRet => - withResource(GpuExpressionsUtils.columnarEvalToColumn(predExpr, batch)) { pred => - withResourceIfAllowed(trueExpr.columnarEval(batch)) { trueRet => - val finalRet = (trueRet, falseRet) match { - case (t: GpuColumnVector, f: GpuColumnVector) => - pred.getBase.ifElse(t.getBase, f.getBase) - case (t: GpuScalar, f: GpuColumnVector) => - pred.getBase.ifElse(t.getBase, f.getBase) - case (t: GpuColumnVector, f: GpuScalar) => - pred.getBase.ifElse(t.getBase, f.getBase) - case (t: GpuScalar, f: GpuScalar) => - pred.getBase.ifElse(t.getBase, f.getBase) - case (t, f) => - throw new IllegalStateException(s"Unexpected inputs" + - s" ($t: ${t.getClass}, $f: ${f.getClass})") - } - GpuColumnVector.from(finalRet, dataType) - } - } - } - } + //TODO move common code back here once CASE WHEN is implemented } @@ -67,6 +43,7 @@ case class GpuIf( } override def children: Seq[Expression] = predicateExpr :: trueExpr :: falseExpr :: Nil + override def nullable: Boolean = trueExpr.nullable || falseExpr.nullable override def checkInputDataTypes(): TypeCheckResult = { @@ -82,8 +59,143 @@ case class GpuIf( } } - override def columnarEval(batch: ColumnarBatch): Any = computeIfElse(batch, predicateExpr, - trueExpr, falseExpr.columnarEval(batch)) + override def columnarEval(batch: ColumnarBatch): Any = { + + val gpuTrueExpr = trueExpr.asInstanceOf[GpuExpression] + val gpuFalseExpr = falseExpr.asInstanceOf[GpuExpression] + val colTypes = GpuColumnVector.extractTypes(batch) + + withResource(GpuExpressionsUtils.columnarEvalToColumn(predicateExpr, batch)) { pred => + //TODO are these checks for valid good enough? check against + // previous work in cast expressions + // also need to compare to work in https://github.com/NVIDIA/spark-rapids/pull/4329 + if (!pred.hasNull && pred.getBase.all().getBoolean) { + trueExpr.columnarEval(batch) + } else if (!pred.hasNull && !pred.getBase.any().getBoolean) { + falseExpr.columnarEval(batch) + } else if (gpuTrueExpr.hasSideEffects || gpuFalseExpr.hasSideEffects) { + withResource(GpuColumnVector.from(batch)) { tbl => + withResource(pred.getBase.unaryOp(UnaryOp.NOT)) { inverted => + val trueBatch = filterBatch(tbl, pred.getBase, colTypes) + val falseBatch = filterBatch(tbl, inverted, colTypes) + withResourceIfAllowed(gpuTrueExpr.columnarEval(trueBatch)) { tt => + withResourceIfAllowed(gpuFalseExpr.columnarEval(falseBatch)) { ff => + val finalRet = (tt, ff) match { + case (t: GpuColumnVector, f: GpuColumnVector) => + withResource(GpuColumnVector.from(new Table(t.getBase), + Array(trueExpr.dataType))) { trueTable => + withResource(GpuColumnVector.from(new Table(f.getBase), + Array(falseExpr.dataType))) { falseTable => + withResource(gather(pred.getBase, trueTable)) { trueValues => + withResource(gather(inverted, falseTable)) { falseValues => + pred.getBase.ifElse( + trueValues.getColumn(0), + falseValues.getColumn(0)) + } + } + } + } + case (t: GpuScalar, f: GpuColumnVector) => + withResource(GpuColumnVector.from(new Table(f.getBase), + Array(falseExpr.dataType))) { falseTable => + withResource(gather(inverted, falseTable)) { falseValues => + pred.getBase.ifElse( + t.getBase, + falseValues.getColumn(0)) + } + } + case (t: GpuColumnVector, f: GpuScalar) => + withResource(GpuColumnVector.from(new Table(t.getBase), + Array(trueExpr.dataType))) { trueTable => + withResource(gather(pred.getBase, trueTable)) { trueValues => + pred.getBase.ifElse( + trueValues.getColumn(0), + f.getBase) + } + } + case (_: GpuScalar, _: GpuScalar) => + throw new IllegalStateException( + "scalar expressions can never have side effects") + } + GpuColumnVector.from(finalRet, dataType) + } + } + } + } + } else { + // simple approach (original GpuIf code) + withResourceIfAllowed(trueExpr.columnarEval(batch)) { trueRet => + withResourceIfAllowed(falseExpr.columnarEval(batch)) { falseRet => + val finalRet = (trueRet, falseRet) match { + case (t: GpuColumnVector, f: GpuColumnVector) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t: GpuScalar, f: GpuColumnVector) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t: GpuColumnVector, f: GpuScalar) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t: GpuScalar, f: GpuScalar) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t, f) => + throw new IllegalStateException(s"Unexpected inputs" + + s" ($t: ${t.getClass}, $f: ${f.getClass})") + } + GpuColumnVector.from(finalRet, dataType) + } + } + } + } + } + + private def filterBatch( + tbl: Table, + pred: ColumnVector, + colTypes: Array[DataType]): ColumnarBatch = { + + withResource(tbl.filter(pred)) { filteredData => + GpuColumnVector.from(filteredData, colTypes) + } + } + + private def boolToInt(cv: ColumnVector): ColumnVector = { + withResource(GpuScalar.from(1, DataTypes.IntegerType)) { one => + withResource(GpuScalar.from(0, DataTypes.IntegerType)) { zero => + cv.ifElse(one, zero) + } + } + } + + private def gather(predicate: ColumnVector, batch: ColumnarBatch): Table = { + withResource(boolToInt(predicate)) { boolsAsInts => + + // use prefixSum (EXCLUSIVE!) to create gather map + // + //TODO explain this well since it is not obvious + // + // example: [0, 0, 1, 0, 1] => [0, 0, 0, 1, 1] + + // + + withResource(boolsAsInts.scan( + ScanAggregation.sum(), + ScanType.EXCLUSIVE, + NullPolicy.INCLUDE)) { gatherMap => + + // set unreferenced values to null (do as part two) + // -MAX_INT = out of bounds replace with null + // + // example: [0, 0, 1, 1, 2] => [0, 0, 1, 0, 2] + + val gatherMap2 = withResource(GpuScalar.from(Int.MinValue, + DataTypes.IntegerType)) { outOfBoundsFlag => + predicate.ifElse(gatherMap, outOfBoundsFlag) + } + + withResource(GpuColumnVector.from(batch)) { table => + table.gather(gatherMap2) + } + } + } + } override def toString: String = s"if ($predicateExpr) $trueExpr else $falseExpr" @@ -92,8 +204,8 @@ case class GpuIf( case class GpuCaseWhen( - branches: Seq[(Expression, Expression)], - elseValue: Option[Expression] = None) extends GpuConditionalExpression with Serializable { + branches: Seq[(Expression, Expression)], + elseValue: Option[Expression] = None) extends GpuConditionalExpression with Serializable { override def children: Seq[Expression] = branches.flatMap(b => b._1 :: b._2 :: Nil) ++ elseValue @@ -139,6 +251,34 @@ case class GpuCaseWhen( } } + // original code from GpuIf + def computeIfElse( + batch: ColumnarBatch, + predExpr: Expression, + trueExpr: Expression, + falseValue: Any): GpuColumnVector = { + withResourceIfAllowed(falseValue) { falseRet => + withResource(GpuExpressionsUtils.columnarEvalToColumn(predExpr, batch)) { pred => + withResourceIfAllowed(trueExpr.columnarEval(batch)) { trueRet => + val finalRet = (trueRet, falseRet) match { + case (t: GpuColumnVector, f: GpuColumnVector) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t: GpuScalar, f: GpuColumnVector) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t: GpuColumnVector, f: GpuScalar) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t: GpuScalar, f: GpuScalar) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t, f) => + throw new IllegalStateException(s"Unexpected inputs" + + s" ($t: ${t.getClass}, $f: ${f.getClass})") + } + GpuColumnVector.from(finalRet, dataType) + } + } + } + } + override def toString: String = { val cases = branches.map { case (c, v) => s" WHEN $c THEN $v" }.mkString val elseCase = elseValue.map(" ELSE " + _).getOrElse("") diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala index 86de80c5703..d9555bae37b 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala @@ -142,6 +142,8 @@ case class GpuAbs(child: Expression, failOnError: Boolean) extends CudfUnaryExpr abstract class CudfBinaryArithmetic extends CudfBinaryOperator with NullIntolerant { override def dataType: DataType = left.dataType + // arithmetic operations can overflow and throw exceptions in ANSI mode + override def hasSideEffects: Boolean = SQLConf.get.ansiEnabled } object GpuAdd extends Arm { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala index 0e617da6d7b..fe32d415981 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/mathExpressions.scala @@ -171,6 +171,8 @@ case class GpuCeil(child: Expression) extends CudfUnaryMathExpression("CEIL") { case _ => LongType } + override def hasSideEffects: Boolean = true + override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(DoubleType, DecimalType, LongType)) @@ -245,6 +247,8 @@ case class GpuFloor(child: Expression) extends CudfUnaryMathExpression("FLOOR") case _ => LongType } + override def hasSideEffects: Boolean = true + override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(DoubleType, DecimalType, LongType)) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ConditionalsSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ConditionalsSuite.scala new file mode 100644 index 00000000000..0c0e84bbea2 --- /dev/null +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ConditionalsSuite.scala @@ -0,0 +1,15 @@ +package com.nvidia.spark.rapids + +class ConditionalsSuite extends SparkQueryCompareTestSuite { + + test("foo") { + + + + + + + + } + +} From b236f39bda75ea37d6e7d00a1af7951fe087e91f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 14 Dec 2021 13:03:32 -0700 Subject: [PATCH 02/14] prep for review Signed-off-by: Andy Grove --- .../spark/rapids/conditionalExpressions.scala | 181 +++++++++--------- .../spark/rapids/ConditionalsSuite.scala | 15 -- 2 files changed, 89 insertions(+), 107 deletions(-) delete mode 100644 tests/src/test/scala/com/nvidia/spark/rapids/ConditionalsSuite.scala diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index 69ce9162967..dfe7f5e7759 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -28,7 +28,32 @@ import org.apache.spark.sql.vectorized.ColumnarBatch trait GpuConditionalExpression extends ComplexTypeMergingExpression with GpuExpression with ShimExpression { - //TODO move common code back here once CASE WHEN is implemented + def computeIfElse( + batch: ColumnarBatch, + predExpr: Expression, + trueExpr: Expression, + falseValue: Any): GpuColumnVector = { + withResourceIfAllowed(falseValue) { falseRet => + withResource(GpuExpressionsUtils.columnarEvalToColumn(predExpr, batch)) { pred => + withResourceIfAllowed(trueExpr.columnarEval(batch)) { trueRet => + val finalRet = (trueRet, falseRet) match { + case (t: GpuColumnVector, f: GpuColumnVector) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t: GpuScalar, f: GpuColumnVector) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t: GpuColumnVector, f: GpuScalar) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t: GpuScalar, f: GpuScalar) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t, f) => + throw new IllegalStateException(s"Unexpected inputs" + + s" ($t: ${t.getClass}, $f: ${f.getClass})") + } + GpuColumnVector.from(finalRet, dataType) + } + } + } + } } @@ -66,58 +91,58 @@ case class GpuIf( val colTypes = GpuColumnVector.extractTypes(batch) withResource(GpuExpressionsUtils.columnarEvalToColumn(predicateExpr, batch)) { pred => - //TODO are these checks for valid good enough? check against - // previous work in cast expressions - // also need to compare to work in https://github.com/NVIDIA/spark-rapids/pull/4329 - if (!pred.hasNull && pred.getBase.all().getBoolean) { + val hasNull = pred.hasNull + if (!hasNull && pred.getBase.all().getBoolean) { trueExpr.columnarEval(batch) - } else if (!pred.hasNull && !pred.getBase.any().getBoolean) { + } else if (!hasNull && !pred.getBase.any().getBoolean) { falseExpr.columnarEval(batch) } else if (gpuTrueExpr.hasSideEffects || gpuFalseExpr.hasSideEffects) { withResource(GpuColumnVector.from(batch)) { tbl => withResource(pred.getBase.unaryOp(UnaryOp.NOT)) { inverted => - val trueBatch = filterBatch(tbl, pred.getBase, colTypes) - val falseBatch = filterBatch(tbl, inverted, colTypes) - withResourceIfAllowed(gpuTrueExpr.columnarEval(trueBatch)) { tt => - withResourceIfAllowed(gpuFalseExpr.columnarEval(falseBatch)) { ff => - val finalRet = (tt, ff) match { - case (t: GpuColumnVector, f: GpuColumnVector) => - withResource(GpuColumnVector.from(new Table(t.getBase), - Array(trueExpr.dataType))) { trueTable => - withResource(GpuColumnVector.from(new Table(f.getBase), - Array(falseExpr.dataType))) { falseTable => - withResource(gather(pred.getBase, trueTable)) { trueValues => + withResource(filterBatch(tbl, pred.getBase, colTypes)) { trueBatch => + withResource(filterBatch(tbl, inverted, colTypes)) { falseBatch => + withResourceIfAllowed(gpuTrueExpr.columnarEval(trueBatch)) { tt => + withResourceIfAllowed(gpuFalseExpr.columnarEval(falseBatch)) { ff => + val finalRet = (tt, ff) match { + case (t: GpuColumnVector, f: GpuColumnVector) => + withResource(GpuColumnVector.from(new Table(t.getBase), + Array(trueExpr.dataType))) { trueTable => + withResource(GpuColumnVector.from(new Table(f.getBase), + Array(falseExpr.dataType))) { falseTable => + withResource(gather(pred.getBase, trueTable)) { trueValues => + withResource(gather(inverted, falseTable)) { falseValues => + pred.getBase.ifElse( + trueValues.getColumn(0), + falseValues.getColumn(0)) + } + } + } + } + case (t: GpuScalar, f: GpuColumnVector) => + withResource(GpuColumnVector.from(new Table(f.getBase), + Array(falseExpr.dataType))) { falseTable => withResource(gather(inverted, falseTable)) { falseValues => pred.getBase.ifElse( - trueValues.getColumn(0), + t.getBase, falseValues.getColumn(0)) } } - } - } - case (t: GpuScalar, f: GpuColumnVector) => - withResource(GpuColumnVector.from(new Table(f.getBase), - Array(falseExpr.dataType))) { falseTable => - withResource(gather(inverted, falseTable)) { falseValues => - pred.getBase.ifElse( - t.getBase, - falseValues.getColumn(0)) - } - } - case (t: GpuColumnVector, f: GpuScalar) => - withResource(GpuColumnVector.from(new Table(t.getBase), - Array(trueExpr.dataType))) { trueTable => - withResource(gather(pred.getBase, trueTable)) { trueValues => - pred.getBase.ifElse( - trueValues.getColumn(0), - f.getBase) - } + case (t: GpuColumnVector, f: GpuScalar) => + withResource(GpuColumnVector.from(new Table(t.getBase), + Array(trueExpr.dataType))) { trueTable => + withResource(gather(pred.getBase, trueTable)) { trueValues => + pred.getBase.ifElse( + trueValues.getColumn(0), + f.getBase) + } + } + case (_: GpuScalar, _: GpuScalar) => + throw new IllegalStateException( + "scalar expressions can never have side effects") } - case (_: GpuScalar, _: GpuScalar) => - throw new IllegalStateException( - "scalar expressions can never have side effects") + GpuColumnVector.from(finalRet, dataType) + } } - GpuColumnVector.from(finalRet, dataType) } } } @@ -150,7 +175,6 @@ case class GpuIf( tbl: Table, pred: ColumnVector, colTypes: Array[DataType]): ColumnarBatch = { - withResource(tbl.filter(pred)) { filteredData => GpuColumnVector.from(filteredData, colTypes) } @@ -165,33 +189,34 @@ case class GpuIf( } private def gather(predicate: ColumnVector, batch: ColumnarBatch): Table = { - withResource(boolToInt(predicate)) { boolsAsInts => - - // use prefixSum (EXCLUSIVE!) to create gather map - // - //TODO explain this well since it is not obvious - // - // example: [0, 0, 1, 0, 1] => [0, 0, 0, 1, 1] - - // + // convert the predicate boolean column to numeric where 1 = true + // amd 0 = false and then use `scan` with `sum` to convert to + // indices. + // + // For example, if the predicate evaluates to [F, F, T, F, T] then this + // gets translated first to [0, 0, 1, 0, 1] and then the scan operation + // will perform an exclusive sum on these values and + // produce [0, 0, 0, 1, 1]. Combining this with the original + // predicate boolean array results in the two T values mapping to + // indices 0 and 1, respectively. + withResource(boolToInt(predicate)) { boolsAsInts => withResource(boolsAsInts.scan( ScanAggregation.sum(), ScanType.EXCLUSIVE, - NullPolicy.INCLUDE)) { gatherMap => - - // set unreferenced values to null (do as part two) - // -MAX_INT = out of bounds replace with null - // - // example: [0, 0, 1, 1, 2] => [0, 0, 1, 0, 2] + NullPolicy.INCLUDE)) { prefixSumExclusive => - val gatherMap2 = withResource(GpuScalar.from(Int.MinValue, - DataTypes.IntegerType)) { outOfBoundsFlag => - predicate.ifElse(gatherMap, outOfBoundsFlag) + // for the entries in the gather map that do not represent valid + // values to be gathered, we change the value to -MAX_INT which + // will be treated as null values in the gather algorithm + val gatherMap = withResource(GpuScalar.from(Int.MinValue, DataTypes.IntegerType)) { + outOfBoundsFlag => predicate.ifElse(prefixSumExclusive, outOfBoundsFlag) } - withResource(GpuColumnVector.from(batch)) { table => - table.gather(gatherMap2) + withResource(gatherMap) { _ => + withResource(GpuColumnVector.from(batch)) { table => + table.gather(gatherMap) + } } } } @@ -204,8 +229,8 @@ case class GpuIf( case class GpuCaseWhen( - branches: Seq[(Expression, Expression)], - elseValue: Option[Expression] = None) extends GpuConditionalExpression with Serializable { + branches: Seq[(Expression, Expression)], + elseValue: Option[Expression] = None) extends GpuConditionalExpression with Serializable { override def children: Seq[Expression] = branches.flatMap(b => b._1 :: b._2 :: Nil) ++ elseValue @@ -251,34 +276,6 @@ case class GpuCaseWhen( } } - // original code from GpuIf - def computeIfElse( - batch: ColumnarBatch, - predExpr: Expression, - trueExpr: Expression, - falseValue: Any): GpuColumnVector = { - withResourceIfAllowed(falseValue) { falseRet => - withResource(GpuExpressionsUtils.columnarEvalToColumn(predExpr, batch)) { pred => - withResourceIfAllowed(trueExpr.columnarEval(batch)) { trueRet => - val finalRet = (trueRet, falseRet) match { - case (t: GpuColumnVector, f: GpuColumnVector) => - pred.getBase.ifElse(t.getBase, f.getBase) - case (t: GpuScalar, f: GpuColumnVector) => - pred.getBase.ifElse(t.getBase, f.getBase) - case (t: GpuColumnVector, f: GpuScalar) => - pred.getBase.ifElse(t.getBase, f.getBase) - case (t: GpuScalar, f: GpuScalar) => - pred.getBase.ifElse(t.getBase, f.getBase) - case (t, f) => - throw new IllegalStateException(s"Unexpected inputs" + - s" ($t: ${t.getClass}, $f: ${f.getClass})") - } - GpuColumnVector.from(finalRet, dataType) - } - } - } - } - override def toString: String = { val cases = branches.map { case (c, v) => s" WHEN $c THEN $v" }.mkString val elseCase = elseValue.map(" ELSE " + _).getOrElse("") diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ConditionalsSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ConditionalsSuite.scala deleted file mode 100644 index 0c0e84bbea2..00000000000 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ConditionalsSuite.scala +++ /dev/null @@ -1,15 +0,0 @@ -package com.nvidia.spark.rapids - -class ConditionalsSuite extends SparkQueryCompareTestSuite { - - test("foo") { - - - - - - - - } - -} From c1943f0e53686f8422e9b0e674145dd1ea722ff6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 14 Dec 2021 13:46:22 -0700 Subject: [PATCH 03/14] improve checks for all true/false Signed-off-by: Andy Grove --- .../com/nvidia/spark/rapids/GpuCast.scala | 2 +- .../spark/rapids/conditionalExpressions.scala | 30 +++++++++++++++++-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 7de127803d3..11575789153 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -1452,7 +1452,7 @@ case class GpuCast( import GpuCast._ // when ansi mode is enabled, some cast expressions can throw exceptions on invalid inputs - //TODO this might not be true for all possible casts? + //TODO not all casts can throw exceptions so this needs to be more specific override def hasSideEffects: Boolean = ansiMode override def toString: String = if (ansiMode) { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index dfe7f5e7759..550da115c2a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -55,6 +55,31 @@ trait GpuConditionalExpression extends ComplexTypeMergingExpression with GpuExpr } } + protected def isAllTrue(col: GpuColumnVector): Boolean = { + assert(BooleanType == col.dataType()) + if (col.getRowCount == 0) { + return true + } + if (col.hasNull) { + return false + } + withResource(col.getBase.all()) { allTrue => + // Guaranteed there is at least one row and no nulls so result must be valid + allTrue.getBoolean + } + } + + protected def isAllFalse(col: GpuColumnVector): Boolean = { + assert(BooleanType == col.dataType()) + if (col.getRowCount == col.numNulls()) { + // all nulls, and null values are false values here + return true + } + withResource(col.getBase.any()) { anyTrue => + // null values are considered false values in this context + !anyTrue.isValid || !anyTrue.getBoolean + } + } } case class GpuIf( @@ -91,10 +116,9 @@ case class GpuIf( val colTypes = GpuColumnVector.extractTypes(batch) withResource(GpuExpressionsUtils.columnarEvalToColumn(predicateExpr, batch)) { pred => - val hasNull = pred.hasNull - if (!hasNull && pred.getBase.all().getBoolean) { + if (isAllTrue(pred)) { trueExpr.columnarEval(batch) - } else if (!hasNull && !pred.getBase.any().getBoolean) { + } else if (isAllFalse(pred)) { falseExpr.columnarEval(batch) } else if (gpuTrueExpr.hasSideEffects || gpuFalseExpr.hasSideEffects) { withResource(GpuColumnVector.from(batch)) { tbl => From 5620ca53b64c7124f3a5aeda97d973c256c0ba7e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 14 Dec 2021 14:12:32 -0700 Subject: [PATCH 04/14] refactor for readability and add documentation --- .../spark/rapids/conditionalExpressions.scala | 119 ++++++++++-------- 1 file changed, 68 insertions(+), 51 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index 550da115c2a..c961cda188f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -121,58 +121,8 @@ case class GpuIf( } else if (isAllFalse(pred)) { falseExpr.columnarEval(batch) } else if (gpuTrueExpr.hasSideEffects || gpuFalseExpr.hasSideEffects) { - withResource(GpuColumnVector.from(batch)) { tbl => - withResource(pred.getBase.unaryOp(UnaryOp.NOT)) { inverted => - withResource(filterBatch(tbl, pred.getBase, colTypes)) { trueBatch => - withResource(filterBatch(tbl, inverted, colTypes)) { falseBatch => - withResourceIfAllowed(gpuTrueExpr.columnarEval(trueBatch)) { tt => - withResourceIfAllowed(gpuFalseExpr.columnarEval(falseBatch)) { ff => - val finalRet = (tt, ff) match { - case (t: GpuColumnVector, f: GpuColumnVector) => - withResource(GpuColumnVector.from(new Table(t.getBase), - Array(trueExpr.dataType))) { trueTable => - withResource(GpuColumnVector.from(new Table(f.getBase), - Array(falseExpr.dataType))) { falseTable => - withResource(gather(pred.getBase, trueTable)) { trueValues => - withResource(gather(inverted, falseTable)) { falseValues => - pred.getBase.ifElse( - trueValues.getColumn(0), - falseValues.getColumn(0)) - } - } - } - } - case (t: GpuScalar, f: GpuColumnVector) => - withResource(GpuColumnVector.from(new Table(f.getBase), - Array(falseExpr.dataType))) { falseTable => - withResource(gather(inverted, falseTable)) { falseValues => - pred.getBase.ifElse( - t.getBase, - falseValues.getColumn(0)) - } - } - case (t: GpuColumnVector, f: GpuScalar) => - withResource(GpuColumnVector.from(new Table(t.getBase), - Array(trueExpr.dataType))) { trueTable => - withResource(gather(pred.getBase, trueTable)) { trueValues => - pred.getBase.ifElse( - trueValues.getColumn(0), - f.getBase) - } - } - case (_: GpuScalar, _: GpuScalar) => - throw new IllegalStateException( - "scalar expressions can never have side effects") - } - GpuColumnVector.from(finalRet, dataType) - } - } - } - } - } - } + conditionalWithSideEffects(batch, pred, gpuTrueExpr, gpuFalseExpr, colTypes) } else { - // simple approach (original GpuIf code) withResourceIfAllowed(trueExpr.columnarEval(batch)) { trueRet => withResourceIfAllowed(falseExpr.columnarEval(batch)) { falseRet => val finalRet = (trueRet, falseRet) match { @@ -195,6 +145,73 @@ case class GpuIf( } } + /** + * When computing conditional expressions on the CPU, the true and false + * expressions are evaluated lazily, meaning that the true expression is + * only evaluated for rows where the predicate is true, and the false + * expression is only evaluated for rows where the predicate is false. + * This is important in the case where the expressions can have + * side-effects, such as throwing exceptions for invalid inputs. + * + * This method performs lazy evaluation on the GPU by first filtering the + * input batch into two batches - one for rows where the predicate is true + * and one for rows where the predicate is false. The expressions are + * evaluated against these batches and then the results are combined + * back into a single batch using the gather algorithm. + */ + private def conditionalWithSideEffects( + batch: ColumnarBatch, + pred: GpuColumnVector, + gpuTrueExpr: GpuExpression, + gpuFalseExpr: GpuExpression, + colTypes: Array[DataType]): GpuColumnVector = { + + withResource(GpuColumnVector.from(batch)) { tbl => + withResource(pred.getBase.unaryOp(UnaryOp.NOT)) { inverted => + withResource(filterBatch(tbl, pred.getBase, colTypes)) { trueBatch => + withResource(filterBatch(tbl, inverted, colTypes)) { falseBatch => + withResourceIfAllowed(gpuTrueExpr.columnarEval(trueBatch)) { tt => + withResourceIfAllowed(gpuFalseExpr.columnarEval(falseBatch)) { ff => + val finalRet = (tt, ff) match { + case (t: GpuColumnVector, f: GpuColumnVector) => + withResource(GpuColumnVector.from(new Table(t.getBase), + Array(trueExpr.dataType))) { trueTable => + withResource(GpuColumnVector.from(new Table(f.getBase), + Array(falseExpr.dataType))) { falseTable => + withResource(gather(pred.getBase, trueTable)) { trueValues => + withResource(gather(inverted, falseTable)) { falseValues => + pred.getBase.ifElse(trueValues.getColumn(0), falseValues.getColumn(0)) + } + } + } + } + case (t: GpuScalar, f: GpuColumnVector) => + withResource(GpuColumnVector.from(new Table(f.getBase), + Array(falseExpr.dataType))) { falseTable => + withResource(gather(inverted, falseTable)) { falseValues => + pred.getBase.ifElse(t.getBase, falseValues.getColumn(0)) + } + } + case (t: GpuColumnVector, f: GpuScalar) => + withResource(GpuColumnVector.from(new Table(t.getBase), + Array(trueExpr.dataType))) { trueTable => + withResource(gather(pred.getBase, trueTable)) { trueValues => + pred.getBase.ifElse(trueValues.getColumn(0), f.getBase) + } + } + case (_: GpuScalar, _: GpuScalar) => + throw new IllegalStateException( + "scalar expressions can never have side effects") + } + GpuColumnVector.from(finalRet, dataType) + } + } + } + } + } + } + } + private def filterBatch( tbl: Table, pred: ColumnVector, From f5f6f12ae82d2b72d6df5e94a1330d23ead78300 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 14 Dec 2021 15:22:51 -0700 Subject: [PATCH 05/14] improve GpuCast side-effect check Signed-off-by: Andy Grove --- .../scala/com/nvidia/spark/rapids/GpuCast.scala | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 11575789153..4935fe809b6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -1452,8 +1452,21 @@ case class GpuCast( import GpuCast._ // when ansi mode is enabled, some cast expressions can throw exceptions on invalid inputs - //TODO not all casts can throw exceptions so this needs to be more specific - override def hasSideEffects: Boolean = ansiMode + override def hasSideEffects: Boolean = { + (child.dataType, dataType) match { + case (StringType, _) if ansiMode => true + case (TimestampType, ByteType | ShortType | IntegerType) if ansiMode => true + case (_: DecimalType, LongType) if ansiMode => true + case (LongType | _: DecimalType, IntegerType) if ansiMode => true + case (LongType | IntegerType | _: DecimalType, ShortType) if ansiMode => true + case (LongType | IntegerType | ShortType | _: DecimalType, ByteType) if ansiMode => true + case (FloatType | DoubleType, ByteType) if ansiMode => true + case (FloatType | DoubleType, ShortType) if ansiMode => true + case (FloatType | DoubleType, IntegerType) if ansiMode => true + case (FloatType | DoubleType, LongType) if ansiMode => true + case _ => false + } + } override def toString: String = if (ansiMode) { s"ansi_cast($child as ${dataType.simpleString})" From 7d097ed547b482b477ca3afd1ba8937940aa99c0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 14 Dec 2021 15:40:32 -0700 Subject: [PATCH 06/14] release resources earlier --- .../spark/rapids/conditionalExpressions.scala | 70 ++++++++++--------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index c961cda188f..7b76b3e4292 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -168,44 +168,48 @@ case class GpuIf( withResource(GpuColumnVector.from(batch)) { tbl => withResource(pred.getBase.unaryOp(UnaryOp.NOT)) { inverted => - withResource(filterBatch(tbl, pred.getBase, colTypes)) { trueBatch => - withResource(filterBatch(tbl, inverted, colTypes)) { falseBatch => - withResourceIfAllowed(gpuTrueExpr.columnarEval(trueBatch)) { tt => - withResourceIfAllowed(gpuFalseExpr.columnarEval(falseBatch)) { ff => - val finalRet = (tt, ff) match { - case (t: GpuColumnVector, f: GpuColumnVector) => - withResource(GpuColumnVector.from(new Table(t.getBase), - Array(trueExpr.dataType))) { trueTable => - withResource(GpuColumnVector.from(new Table(f.getBase), - Array(falseExpr.dataType))) { falseTable => - withResource(gather(pred.getBase, trueTable)) { trueValues => - withResource(gather(inverted, falseTable)) { falseValues => - pred.getBase.ifElse(trueValues.getColumn(0), falseValues.getColumn(0)) - } - } - } - } - case (t: GpuScalar, f: GpuColumnVector) => - withResource(GpuColumnVector.from(new Table(f.getBase), - Array(falseExpr.dataType))) { falseTable => + // evaluate true expression against true batch + val tt = withResource(filterBatch(tbl, pred.getBase, colTypes)) { trueBatch => + gpuTrueExpr.columnarEval(trueBatch) + } + withResourceIfAllowed(tt) { _ => + // evaluate false expression against false batch + val ff = withResource(filterBatch(tbl, inverted, colTypes)) { falseBatch => + gpuFalseExpr.columnarEval(falseBatch) + } + withResourceIfAllowed(ff) { _ => + val finalRet = (tt, ff) match { + case (t: GpuColumnVector, f: GpuColumnVector) => + withResource(GpuColumnVector.from(new Table(t.getBase), + Array(trueExpr.dataType))) { trueTable => + withResource(GpuColumnVector.from(new Table(f.getBase), + Array(falseExpr.dataType))) { falseTable => + withResource(gather(pred.getBase, trueTable)) { trueValues => withResource(gather(inverted, falseTable)) { falseValues => - pred.getBase.ifElse(t.getBase, falseValues.getColumn(0)) - } - } - case (t: GpuColumnVector, f: GpuScalar) => - withResource(GpuColumnVector.from(new Table(t.getBase), - Array(trueExpr.dataType))) { trueTable => - withResource(gather(pred.getBase, trueTable)) { trueValues => - pred.getBase.ifElse(trueValues.getColumn(0), f.getBase) + pred.getBase.ifElse(trueValues.getColumn(0), falseValues.getColumn(0)) } } - case (_: GpuScalar, _: GpuScalar) => - throw new IllegalStateException( - "scalar expressions can never have side effects") + } + } + case (t: GpuScalar, f: GpuColumnVector) => + withResource(GpuColumnVector.from(new Table(f.getBase), + Array(falseExpr.dataType))) { falseTable => + withResource(gather(inverted, falseTable)) { falseValues => + pred.getBase.ifElse(t.getBase, falseValues.getColumn(0)) + } } - GpuColumnVector.from(finalRet, dataType) - } + case (t: GpuColumnVector, f: GpuScalar) => + withResource(GpuColumnVector.from(new Table(t.getBase), + Array(trueExpr.dataType))) { trueTable => + withResource(gather(pred.getBase, trueTable)) { trueValues => + pred.getBase.ifElse(trueValues.getColumn(0), f.getBase) + } + } + case (_: GpuScalar, _: GpuScalar) => + throw new IllegalStateException( + "scalar expressions can never have side effects") } + GpuColumnVector.from(finalRet, dataType) } } } From b6b865f6be5e14ff8832c1567905680461b8a4d0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 14 Dec 2021 15:42:07 -0700 Subject: [PATCH 07/14] remove isValid check --- .../scala/com/nvidia/spark/rapids/conditionalExpressions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index 7b76b3e4292..395ebc4941e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -77,7 +77,7 @@ trait GpuConditionalExpression extends ComplexTypeMergingExpression with GpuExpr } withResource(col.getBase.any()) { anyTrue => // null values are considered false values in this context - !anyTrue.isValid || !anyTrue.getBoolean + !anyTrue.getBoolean } } } From 96567bbb76300b47fb8684e737cfcd53928578fa Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 15 Dec 2021 08:23:12 -0700 Subject: [PATCH 08/14] Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala Co-authored-by: Liangcai Li --- .../scala/com/nvidia/spark/rapids/conditionalExpressions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index 395ebc4941e..cc18da513f0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -117,7 +117,7 @@ case class GpuIf( withResource(GpuExpressionsUtils.columnarEvalToColumn(predicateExpr, batch)) { pred => if (isAllTrue(pred)) { - trueExpr.columnarEval(batch) + GpuExpressionsUtils.columnarEvalToColumn(trueExpr, batch) } else if (isAllFalse(pred)) { falseExpr.columnarEval(batch) } else if (gpuTrueExpr.hasSideEffects || gpuFalseExpr.hasSideEffects) { From 0c706f4760b70c8c1d0ce0a52ed866f25a4cb18c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 15 Dec 2021 08:23:17 -0700 Subject: [PATCH 09/14] Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala Co-authored-by: Liangcai Li --- .../scala/com/nvidia/spark/rapids/conditionalExpressions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index cc18da513f0..ccd67e7f31f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -119,7 +119,7 @@ case class GpuIf( if (isAllTrue(pred)) { GpuExpressionsUtils.columnarEvalToColumn(trueExpr, batch) } else if (isAllFalse(pred)) { - falseExpr.columnarEval(batch) + GpuExpressionsUtils.columnarEvalToColumn(falseExpr, batch) } else if (gpuTrueExpr.hasSideEffects || gpuFalseExpr.hasSideEffects) { conditionalWithSideEffects(batch, pred, gpuTrueExpr, gpuFalseExpr, colTypes) } else { From ee833391e9afad914c1fbcd3b5c2dac7f8b4ec09 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 15 Dec 2021 11:02:29 -0700 Subject: [PATCH 10/14] fix resource leak and add test for CAST Signed-off-by: Andy Grove --- .../src/main/python/conditionals_test.py | 14 +++++++++++++- .../spark/rapids/conditionalExpressions.scala | 18 ++++++++++-------- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index c637a1510fa..02d580e3fb5 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -20,6 +20,9 @@ from pyspark.sql.types import * import pyspark.sql.functions as f +def mk_str_gen(pattern): + return StringGen(pattern).with_special_case('').with_special_pattern('.{0,10}') + all_gens = all_gen + [NullGen()] all_nested_gens = array_gens_sample + struct_gens_sample + map_gens_sample all_nested_gens_nonempty_struct = array_gens_sample + nonempty_struct_gens_sample @@ -198,4 +201,13 @@ def test_conditional_with_side_effects_col_scalar(data_gen): lambda spark : two_col_df(spark, data_gen, gen).selectExpr( 'IF(b < 2147483647, b + 1, 2147483647)', 'IF(b >= 2147483646, 2147483647, b + 1)'), - conf = {'spark.sql.ansi.enabled':True}) \ No newline at end of file + conf = {'spark.sql.ansi.enabled':True}) + +@pytest.mark.parametrize('data_gen', int_n_long_gens, ids=idfn) +def test_conditional_with_side_effects_cast(data_gen): + gen = mk_str_gen('[0-9]{1,20}') + assert_gpu_and_cpu_are_equal_collect( + lambda spark : two_col_df(spark, data_gen, gen).selectExpr( + 'IF(a RLIKE "^[0-9]{1,5}$", CAST(a AS INT), 0)'), + conf = {'spark.sql.ansi.enabled':True, + 'spark.rapids.sql.expression.RLike': True}) \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index ccd67e7f31f..46718d6c646 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -166,6 +166,12 @@ case class GpuIf( gpuFalseExpr: GpuExpression, colTypes: Array[DataType]): GpuColumnVector = { + def batchFromCv(t: GpuColumnVector, dt: DataType): ColumnarBatch = { + withResource(new Table(t.getBase)) { tbl => + GpuColumnVector.from(tbl, Array(dt)) + } + } + withResource(GpuColumnVector.from(batch)) { tbl => withResource(pred.getBase.unaryOp(UnaryOp.NOT)) { inverted => // evaluate true expression against true batch @@ -180,10 +186,8 @@ case class GpuIf( withResourceIfAllowed(ff) { _ => val finalRet = (tt, ff) match { case (t: GpuColumnVector, f: GpuColumnVector) => - withResource(GpuColumnVector.from(new Table(t.getBase), - Array(trueExpr.dataType))) { trueTable => - withResource(GpuColumnVector.from(new Table(f.getBase), - Array(falseExpr.dataType))) { falseTable => + withResource(batchFromCv(t, trueExpr.dataType)) { trueTable => + withResource(batchFromCv(f, falseExpr.dataType)) { falseTable => withResource(gather(pred.getBase, trueTable)) { trueValues => withResource(gather(inverted, falseTable)) { falseValues => pred.getBase.ifElse(trueValues.getColumn(0), falseValues.getColumn(0)) @@ -192,15 +196,13 @@ case class GpuIf( } } case (t: GpuScalar, f: GpuColumnVector) => - withResource(GpuColumnVector.from(new Table(f.getBase), - Array(falseExpr.dataType))) { falseTable => + withResource(batchFromCv(f, falseExpr.dataType)) { falseTable => withResource(gather(inverted, falseTable)) { falseValues => pred.getBase.ifElse(t.getBase, falseValues.getColumn(0)) } } case (t: GpuColumnVector, f: GpuScalar) => - withResource(GpuColumnVector.from(new Table(t.getBase), - Array(trueExpr.dataType))) { trueTable => + withResource(batchFromCv(t, trueExpr.dataType)) { trueTable => withResource(gather(pred.getBase, trueTable)) { trueValues => pred.getBase.ifElse(trueValues.getColumn(0), f.getBase) } From fcdc5416fc465e894dba3cd44385a66378800153 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 15 Dec 2021 11:05:37 -0700 Subject: [PATCH 11/14] revert add blank line --- .../scala/com/nvidia/spark/rapids/conditionalExpressions.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index 46718d6c646..94047129b01 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -93,7 +93,6 @@ case class GpuIf( } override def children: Seq[Expression] = predicateExpr :: trueExpr :: falseExpr :: Nil - override def nullable: Boolean = trueExpr.nullable || falseExpr.nullable override def checkInputDataTypes(): TypeCheckResult = { From 54488eb5923aa00d98b9cc003e71bb1fc39a460d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 15 Dec 2021 17:15:20 -0700 Subject: [PATCH 12/14] Update sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala Co-authored-by: Jason Lowe --- .../scala/com/nvidia/spark/rapids/conditionalExpressions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index 94047129b01..7315af6f304 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -255,7 +255,7 @@ case class GpuIf( // for the entries in the gather map that do not represent valid // values to be gathered, we change the value to -MAX_INT which // will be treated as null values in the gather algorithm - val gatherMap = withResource(GpuScalar.from(Int.MinValue, DataTypes.IntegerType)) { + val gatherMap = withResource(Scalar.fromInt(Int.MinValue)) { outOfBoundsFlag => predicate.ifElse(prefixSumExclusive, outOfBoundsFlag) } From 7b87af6d998fcb03ab3a5bf57e9028c310ed9746 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 15 Dec 2021 17:25:08 -0700 Subject: [PATCH 13/14] partially address PR review feedback Signed-off-by: Andy Grove --- .../spark/rapids/GpuUserDefinedFunction.scala | 1 + .../spark/rapids/conditionalExpressions.scala | 44 +++++++------------ 2 files changed, 16 insertions(+), 29 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala index 1678df89cca..0dc5c75f743 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala @@ -105,6 +105,7 @@ trait GpuRowBasedUserDefinedFunction extends GpuExpression private[this] lazy val outputType = dataType.catalogString override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic) + override def hasSideEffects: Boolean = true override def columnarEval(batch: ColumnarBatch): Any = { val cpuUDFStart = System.nanoTime diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index 94047129b01..254735dcab0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch trait GpuConditionalExpression extends ComplexTypeMergingExpression with GpuExpression with ShimExpression { - def computeIfElse( + protected def computeIfElse( batch: ColumnarBatch, predExpr: Expression, trueExpr: Expression, @@ -112,7 +112,6 @@ case class GpuIf( val gpuTrueExpr = trueExpr.asInstanceOf[GpuExpression] val gpuFalseExpr = falseExpr.asInstanceOf[GpuExpression] - val colTypes = GpuColumnVector.extractTypes(batch) withResource(GpuExpressionsUtils.columnarEvalToColumn(predicateExpr, batch)) { pred => if (isAllTrue(pred)) { @@ -120,7 +119,7 @@ case class GpuIf( } else if (isAllFalse(pred)) { GpuExpressionsUtils.columnarEvalToColumn(falseExpr, batch) } else if (gpuTrueExpr.hasSideEffects || gpuFalseExpr.hasSideEffects) { - conditionalWithSideEffects(batch, pred, gpuTrueExpr, gpuFalseExpr, colTypes) + conditionalWithSideEffects(batch, pred, gpuTrueExpr, gpuFalseExpr) } else { withResourceIfAllowed(trueExpr.columnarEval(batch)) { trueRet => withResourceIfAllowed(falseExpr.columnarEval(batch)) { falseRet => @@ -162,14 +161,9 @@ case class GpuIf( batch: ColumnarBatch, pred: GpuColumnVector, gpuTrueExpr: GpuExpression, - gpuFalseExpr: GpuExpression, - colTypes: Array[DataType]): GpuColumnVector = { + gpuFalseExpr: GpuExpression): GpuColumnVector = { - def batchFromCv(t: GpuColumnVector, dt: DataType): ColumnarBatch = { - withResource(new Table(t.getBase)) { tbl => - GpuColumnVector.from(tbl, Array(dt)) - } - } + val colTypes = GpuColumnVector.extractTypes(batch) withResource(GpuColumnVector.from(batch)) { tbl => withResource(pred.getBase.unaryOp(UnaryOp.NOT)) { inverted => @@ -185,26 +179,18 @@ case class GpuIf( withResourceIfAllowed(ff) { _ => val finalRet = (tt, ff) match { case (t: GpuColumnVector, f: GpuColumnVector) => - withResource(batchFromCv(t, trueExpr.dataType)) { trueTable => - withResource(batchFromCv(f, falseExpr.dataType)) { falseTable => - withResource(gather(pred.getBase, trueTable)) { trueValues => - withResource(gather(inverted, falseTable)) { falseValues => - pred.getBase.ifElse(trueValues.getColumn(0), falseValues.getColumn(0)) - } - } + withResource(gather(pred.getBase, t)) { trueValues => + withResource(gather(inverted, f)) { falseValues => + pred.getBase.ifElse(trueValues.getColumn(0), falseValues.getColumn(0)) } } case (t: GpuScalar, f: GpuColumnVector) => - withResource(batchFromCv(f, falseExpr.dataType)) { falseTable => - withResource(gather(inverted, falseTable)) { falseValues => - pred.getBase.ifElse(t.getBase, falseValues.getColumn(0)) - } + withResource(gather(inverted, f)) { falseValues => + pred.getBase.ifElse(t.getBase, falseValues.getColumn(0)) } case (t: GpuColumnVector, f: GpuScalar) => - withResource(batchFromCv(t, trueExpr.dataType)) { trueTable => - withResource(gather(pred.getBase, trueTable)) { trueValues => - pred.getBase.ifElse(trueValues.getColumn(0), f.getBase) - } + withResource(gather(pred.getBase, t)) { trueValues => + pred.getBase.ifElse(trueValues.getColumn(0), f.getBase) } case (_: GpuScalar, _: GpuScalar) => throw new IllegalStateException( @@ -234,7 +220,7 @@ case class GpuIf( } } - private def gather(predicate: ColumnVector, batch: ColumnarBatch): Table = { + private def gather(predicate: ColumnVector, t: GpuColumnVector): Table = { // convert the predicate boolean column to numeric where 1 = true // amd 0 = false and then use `scan` with `sum` to convert to // indices. @@ -259,9 +245,9 @@ case class GpuIf( outOfBoundsFlag => predicate.ifElse(prefixSumExclusive, outOfBoundsFlag) } - withResource(gatherMap) { _ => - withResource(GpuColumnVector.from(batch)) { table => - table.gather(gatherMap) + withResource(new Table(t.getBase)) { tbl => + withResource(gatherMap) { _ => + tbl.gather(gatherMap) } } } From bee0643d4606ca011388432788bd1ec1596fc41b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 16 Dec 2021 08:48:02 -0700 Subject: [PATCH 14/14] Change gather signature to return ColumnVector. Also add missing import. Signed-off-by: Andy Grove --- .../spark/rapids/conditionalExpressions.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index 357eb245912..5b1c1172d5a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.{ColumnVector, NullPolicy, ScanAggregation, ScanType, Table, UnaryOp} +import ai.rapids.cudf.{ColumnVector, NullPolicy, Scalar, ScanAggregation, ScanType, Table, UnaryOp} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.shims.v2.ShimExpression @@ -181,16 +181,16 @@ case class GpuIf( case (t: GpuColumnVector, f: GpuColumnVector) => withResource(gather(pred.getBase, t)) { trueValues => withResource(gather(inverted, f)) { falseValues => - pred.getBase.ifElse(trueValues.getColumn(0), falseValues.getColumn(0)) + pred.getBase.ifElse(trueValues, falseValues) } } case (t: GpuScalar, f: GpuColumnVector) => withResource(gather(inverted, f)) { falseValues => - pred.getBase.ifElse(t.getBase, falseValues.getColumn(0)) + pred.getBase.ifElse(t.getBase, falseValues) } case (t: GpuColumnVector, f: GpuScalar) => withResource(gather(pred.getBase, t)) { trueValues => - pred.getBase.ifElse(trueValues.getColumn(0), f.getBase) + pred.getBase.ifElse(trueValues, f.getBase) } case (_: GpuScalar, _: GpuScalar) => throw new IllegalStateException( @@ -220,7 +220,7 @@ case class GpuIf( } } - private def gather(predicate: ColumnVector, t: GpuColumnVector): Table = { + private def gather(predicate: ColumnVector, t: GpuColumnVector): ColumnVector = { // convert the predicate boolean column to numeric where 1 = true // amd 0 = false and then use `scan` with `sum` to convert to // indices. @@ -247,7 +247,9 @@ case class GpuIf( withResource(new Table(t.getBase)) { tbl => withResource(gatherMap) { _ => - tbl.gather(gatherMap) + withResource(tbl.gather(gatherMap)) { gatherTbl => + gatherTbl.getColumn(0).incRefCount() + } } } }