From e39b33bb8a656d6a3bd929b3c0381b4843f57e41 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Wed, 8 Dec 2021 08:27:36 +0800 Subject: [PATCH 01/17] A small optimization for conditional expressions Signed-off-by: Firestarman --- .../spark/rapids/conditionalExpressions.scala | 100 +++++++++++++----- 1 file changed, 75 insertions(+), 25 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index d108c64bf3b..da48f86d1b3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -29,31 +29,44 @@ trait GpuConditionalExpression extends ComplexTypeMergingExpression with GpuExpr protected def computeIfElse( batch: ColumnarBatch, - predExpr: Expression, + pred: GpuColumnVector, trueExpr: Expression, falseValue: Any): GpuColumnVector = { withResourceIfAllowed(falseValue) { falseRet => - withResource(GpuExpressionsUtils.columnarEvalToColumn(predExpr, batch)) { pred => - withResourceIfAllowed(trueExpr.columnarEval(batch)) { trueRet => - val finalRet = (trueRet, falseRet) match { - case (t: GpuColumnVector, f: GpuColumnVector) => - pred.getBase.ifElse(t.getBase, f.getBase) - case (t: GpuScalar, f: GpuColumnVector) => - pred.getBase.ifElse(t.getBase, f.getBase) - case (t: GpuColumnVector, f: GpuScalar) => - pred.getBase.ifElse(t.getBase, f.getBase) - case (t: GpuScalar, f: GpuScalar) => - pred.getBase.ifElse(t.getBase, f.getBase) - case (t, f) => - throw new IllegalStateException(s"Unexpected inputs" + - s" ($t: ${t.getClass}, $f: ${f.getClass})") - } - GpuColumnVector.from(finalRet, dataType) + withResourceIfAllowed(trueExpr.columnarEval(batch)) { trueRet => + val finalRet = (trueRet, falseRet) match { + case (t: GpuColumnVector, f: GpuColumnVector) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t: GpuScalar, f: GpuColumnVector) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t: GpuColumnVector, f: GpuScalar) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t: GpuScalar, f: GpuScalar) => + pred.getBase.ifElse(t.getBase, f.getBase) + case (t, f) => + throw new IllegalStateException(s"Unexpected inputs" + + s" ($t: ${t.getClass}, $f: ${f.getClass})") } + GpuColumnVector.from(finalRet, dataType) } } } + protected def isAllTrue(col: GpuColumnVector): Boolean = { + assert(BooleanType == col.dataType()) + withResource(col.getBase.all()) { allTrue => + // null is treated as false in Spark, but skipped by 'all()' method. + allTrue.isValid && allTrue.getBoolean && !col.hasNull + } + } + + protected def isAllFalse(col: GpuColumnVector): Boolean = { + assert(BooleanType == col.dataType()) + withResource(col.getBase.any()) { anyTrue => + anyTrue.isValid && !anyTrue.getBoolean + } + } + } case class GpuIf( @@ -82,8 +95,19 @@ case class GpuIf( } } - override def columnarEval(batch: ColumnarBatch): Any = computeIfElse(batch, predicateExpr, - trueExpr, falseExpr.columnarEval(batch)) + override def columnarEval(batch: ColumnarBatch): Any = { + withResource(GpuExpressionsUtils.columnarEvalToColumn(predicateExpr, batch)) { pred => + if (isAllTrue(pred)) { + // All are true + trueExpr.columnarEval(batch) + } else if (isAllFalse(pred)) { + // All are false + falseExpr.columnarEval(batch) + } else { + computeIfElse(batch, pred, trueExpr, falseExpr.columnarEval(batch)) + } + } + } override def toString: String = s"if ($predicateExpr) $trueExpr else $falseExpr" @@ -129,13 +153,39 @@ case class GpuCaseWhen( } } + @transient + private[this] lazy val trueExpressions = branches.map(_._2) + override def columnarEval(batch: ColumnarBatch): Any = { - // `elseRet` will be closed in `computeIfElse`. - val elseRet = elseValue - .map(_.columnarEval(batch)) - .getOrElse(GpuScalar(null, branches.last._2.dataType)) - branches.foldRight[Any](elseRet) { case ((predicateExpr, trueExpr), falseRet) => - computeIfElse(batch, predicateExpr, trueExpr, falseRet) + val size = branches.size + val predications = new Array[GpuColumnVector](size) + var isAllPredsFalse = true + var i = 0 + + withResource(predications) { preds => + while (i < size) { + // If any predication is the first all-true, then evaluate its true expression + // and return the result. + preds(i) = GpuExpressionsUtils.columnarEvalToColumn(branches(i)._1, batch) + val p = preds(i) + if (isAllPredsFalse && isAllTrue(p)) { + return trueExpressions(i).columnarEval(batch) + } + isAllPredsFalse = isAllPredsFalse && isAllFalse(p) + i += 1 + } + + val elseRet = elseValue + .map(_.columnarEval(batch)) + .getOrElse(GpuScalar(null, branches.last._2.dataType)) + if (isAllPredsFalse) { + // No predication has a true, so return the else value. + elseRet + } else { + preds.zip(trueExpressions).foldRight[Any](elseRet) { case ((p, trueExpr), falseRet) => + computeIfElse(batch, p, trueExpr, falseRet) + } + } } } From 95237f9dcd99a4fe4de8b00d383117be82494950 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Wed, 8 Dec 2021 17:27:32 +0800 Subject: [PATCH 02/17] Add a config for this opt Signed-off-by: Firestarman --- docs/configs.md | 1 + .../src/main/python/conditionals_test.py | 23 +++++++--- .../nvidia/spark/rapids/GpuOverrides.scala | 4 +- .../com/nvidia/spark/rapids/RapidsConf.scala | 14 ++++++ .../spark/rapids/conditionalExpressions.scala | 46 ++++++++++++++++--- 5 files changed, 73 insertions(+), 15 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index f0d6db7c4e2..8bcdb1d29b5 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -110,6 +110,7 @@ Name | Description | Default Value spark.rapids.sql.join.leftSemi.enabled|When set to true left semi joins are enabled on the GPU|true spark.rapids.sql.join.rightOuter.enabled|When set to true right outer joins are enabled on the GPU|true spark.rapids.sql.metrics.level|GPU plans can produce a lot more metrics than CPU plans do. In very large queries this can sometimes result in going over the max result size limit for the driver. Supported values include DEBUG which will enable all metrics supported and typically only needs to be enabled when debugging the plugin. MODERATE which should output enough metrics to understand how long each part of the query is taking and how much data is going to each part of the query. ESSENTIAL which disables most metrics except those Apache Spark CPU plans will also report or their equivalents.|MODERATE +spark.rapids.sql.opt.allTrueFalse.enabled|When set to true, checks whether the predictions on a batch are all true or false, then accordingly evaluate only the true or false expression. This is for the GPU `If` and `CaseWhen`, and can improve the performance by avoiding the evaluation of the true or false expression when it is an expensive operation. e.g. row-based UDF. It is disabled by default because it does work when the input batch is mixed with true and false predictions, and may affect the performance negatively because it involves some additional computations. This config might be removed in the future when the implementation of GPU `If` and `CaseWhen` evolves|false spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647 spark.rapids.sql.reader.batchSizeRows|Soft limit on the maximum number of rows the reader will read per batch. The orc and parquet readers will read row groups until this limit is met or exceeded. The limit is respected by the csv reader.|2147483647 diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index 06e43f0f54c..927dc30ee5f 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -40,11 +40,15 @@ if_nested_gens = if_array_gens_sample + if_struct_gens_sample @pytest.mark.parametrize('data_gen', all_gens + if_nested_gens + decimal_128_gens_no_neg, ids=idfn) -def test_if_else(data_gen): +@pytest.mark.parametrize('all_truefalse_opt', [True, False]) +def test_if_else(data_gen, all_truefalse_opt): (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) null_lit = get_null_lit_string(data_gen.data_type) + row_num = 2048 + if all_truefalse_opt: + row_num = 30 # Small row number can get all-true or all-false batch easily. assert_gpu_and_cpu_are_equal_collect( - lambda spark : three_col_df(spark, boolean_gen, data_gen, data_gen).selectExpr( + lambda spark : three_col_df(spark, boolean_gen, data_gen, data_gen, row_num).selectExpr( 'IF(TRUE, b, c)', 'IF(TRUE, {}, {})'.format(s1, null_lit), 'IF(FALSE, {}, {})'.format(s1, null_lit), @@ -53,7 +57,8 @@ def test_if_else(data_gen): 'IF(a, b, {})'.format(s2), 'IF(a, {}, {})'.format(s1, s2), 'IF(a, b, {})'.format(null_lit), - 'IF(a, {}, c)'.format(null_lit))) + 'IF(a, {}, c)'.format(null_lit)), + conf={'spark.rapids.sql.opt.allTrueFalse.enabled': all_truefalse_opt}) # Maps scalars are not really supported by Spark from python without jumping through a lot of hoops # so for now we are going to skip them @@ -67,7 +72,13 @@ def test_if_else_map(data_gen): @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed @pytest.mark.parametrize('data_gen', all_gens + all_nested_gens + decimal_128_gens, ids=idfn) -def test_case_when(data_gen): +@pytest.mark.parametrize('all_truefalse_opt', [True, False]) +def test_case_when(data_gen, all_truefalse_opt): + row_num = 2048 + if all_truefalse_opt: + row_num = 30 # Small row number can get all-true or all-false batch easily. + opt_conf = {'spark.rapids.sql.opt.allTrueFalse.enabled': all_truefalse_opt} + opt_conf.update(allow_negative_scale_of_decimal_conf) num_cmps = 20 s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) # we want lots of false @@ -87,7 +98,7 @@ def test_case_when(data_gen): # (scalar, column) # in sequence. assert_gpu_and_cpu_are_equal_collect( - lambda spark : gen_df(spark, gen).select(command, + lambda spark : gen_df(spark, gen, row_num).select(command, f.when(f.col('_b0'), s1), f.when(f.col('_b0'), f.col('_c0')).otherwise(f.col('_c1')), f.when(f.col('_b0'), s1).otherwise(f.col('_c0')), @@ -95,7 +106,7 @@ def test_case_when(data_gen): f.when(f.col('_b0'), s1).when(f.lit(True), f.col('_c0')), f.when(f.col('_b0'), f.lit(None).cast(data_type)).otherwise(f.col('_c0')), f.when(f.lit(False), f.col('_c0'))), - conf = allow_negative_scale_of_decimal_conf) + conf = opt_conf) @pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn) def test_nanvl(data_gen): diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index d51fdb531db..a3c149caa68 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2045,7 +2045,7 @@ object GpuOverrides extends Logging { } else { None } - GpuCaseWhen(branches, elseValue) + GpuCaseWhen(branches, elseValue, conf.isAllTrueFalseOptEnabled) } }), expr[If]( @@ -2066,7 +2066,7 @@ object GpuOverrides extends Logging { (a, conf, p, r) => new ExprMeta[If](a, conf, p, r) { override def convertToGpu(): GpuExpression = { val Seq(boolExpr, trueExpr, falseExpr) = childExprs.map(_.convertToGpu()) - GpuIf(boolExpr, trueExpr, falseExpr) + GpuIf(boolExpr, trueExpr, falseExpr, conf.isAllTrueFalseOptEnabled) } }), expr[Pow]( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 6783f9b5ddf..798926a80de 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -705,6 +705,18 @@ object RapidsConf { .booleanConf .createWithDefault(false) + val ENABLE_ALL_TRUE_FALSE_OPTIMIZATION = conf("spark.rapids.sql.opt.allTrueFalse.enabled") + .doc("When set to true, checks whether the predictions on a batch are all true or false, " + + "then accordingly evaluate only the true or false expression. This is for the GPU `If` " + + "and `CaseWhen`, and can improve the performance by avoiding the evaluation of the true " + + "or false expression when it is an expensive operation. e.g. row-based UDF. It is " + + "disabled by default because it does work when the input batch is mixed with true and " + + "false predictions, and may affect the performance negatively because it involves some " + + "additional computations. This config might be removed in the future when the " + + "implementation of GPU `If` and `CaseWhen` evolves") + .booleanConf + .createWithDefault(false) + object ParquetReaderType extends Enumeration { val AUTO, COALESCING, MULTITHREADED, PERFILE = Value } @@ -1739,6 +1751,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isCpuBasedUDFEnabled: Boolean = get(ENABLE_CPU_BASED_UDF) + lazy val isAllTrueFalseOptEnabled: Boolean = get(ENABLE_ALL_TRUE_FALSE_OPTIMIZATION) + lazy val isFastSampleEnabled: Boolean = get(ENABLE_FAST_SAMPLE) private val optimizerDefaults = Map( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index da48f86d1b3..ca02f54ed16 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -72,7 +72,8 @@ trait GpuConditionalExpression extends ComplexTypeMergingExpression with GpuExpr case class GpuIf( predicateExpr: Expression, trueExpr: Expression, - falseExpr: Expression) extends GpuConditionalExpression { + falseExpr: Expression, + optAllTrueFalse: Boolean = false) extends GpuConditionalExpression { @transient override lazy val inputTypesForMerging: Seq[DataType] = { @@ -95,8 +96,9 @@ case class GpuIf( } } - override def columnarEval(batch: ColumnarBatch): Any = { - withResource(GpuExpressionsUtils.columnarEvalToColumn(predicateExpr, batch)) { pred => + @transient + private[this] lazy val func = if (optAllTrueFalse) { + (pred: GpuColumnVector, batch: ColumnarBatch) => { if (isAllTrue(pred)) { // All are true trueExpr.columnarEval(batch) @@ -107,6 +109,16 @@ case class GpuIf( computeIfElse(batch, pred, trueExpr, falseExpr.columnarEval(batch)) } } + } else { + (pred: GpuColumnVector, batch: ColumnarBatch) => { + computeIfElse(batch, pred, trueExpr, falseExpr.columnarEval(batch)) + } + } + + override def columnarEval(batch: ColumnarBatch): Any = { + withResource(GpuExpressionsUtils.columnarEvalToColumn(predicateExpr, batch)) { pred => + func(pred, batch) + } } override def toString: String = s"if ($predicateExpr) $trueExpr else $falseExpr" @@ -117,7 +129,8 @@ case class GpuIf( case class GpuCaseWhen( branches: Seq[(Expression, Expression)], - elseValue: Option[Expression] = None) extends GpuConditionalExpression with Serializable { + elseValue: Option[Expression] = None, + optAllTrueFalse: Boolean) extends GpuConditionalExpression with Serializable { override def children: Seq[Expression] = branches.flatMap(b => b._1 :: b._2 :: Nil) ++ elseValue @@ -156,13 +169,13 @@ case class GpuCaseWhen( @transient private[this] lazy val trueExpressions = branches.map(_._2) - override def columnarEval(batch: ColumnarBatch): Any = { + private def computeWithTrueFalseOpt(batch: ColumnarBatch): Any = { val size = branches.size - val predications = new Array[GpuColumnVector](size) + val predictions = new Array[GpuColumnVector](size) var isAllPredsFalse = true var i = 0 - withResource(predications) { preds => + withResource(predictions) { preds => while (i < size) { // If any predication is the first all-true, then evaluate its true expression // and return the result. @@ -189,6 +202,25 @@ case class GpuCaseWhen( } } + @transient + private[this] lazy val func = if (optAllTrueFalse) { + (batch: ColumnarBatch) => computeWithTrueFalseOpt(batch) + } else { + (batch: ColumnarBatch) => { + // `elseRet` will be closed in `computeIfElse`. + val elseRet = elseValue + .map(_.columnarEval(batch)) + .getOrElse(GpuScalar(null, branches.last._2.dataType)) + branches.foldRight[Any](elseRet) { case ((predicateExpr, trueExpr), falseRet) => + withResource(GpuExpressionsUtils.columnarEvalToColumn(predicateExpr, batch)) { pred => + computeIfElse(batch, pred, trueExpr, falseRet) + } + } + } + } + + override def columnarEval(batch: ColumnarBatch): Any = func(batch) + override def toString: String = { val cases = branches.map { case (c, v) => s" WHEN $c THEN $v" }.mkString val elseCase = elseValue.map(" ELSE " + _).getOrElse("") From c25986b1163eb398d16638c8123f8115c9dbcf40 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Thu, 9 Dec 2021 09:49:50 +0800 Subject: [PATCH 03/17] Address the comments Signed-off-by: Firestarman --- docs/configs.md | 1 - .../src/main/python/conditionals_test.py | 21 +++----- .../nvidia/spark/rapids/GpuOverrides.scala | 4 +- .../com/nvidia/spark/rapids/RapidsConf.scala | 14 ----- .../spark/rapids/conditionalExpressions.scala | 52 +++++-------------- 5 files changed, 21 insertions(+), 71 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 8bcdb1d29b5..f0d6db7c4e2 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -110,7 +110,6 @@ Name | Description | Default Value spark.rapids.sql.join.leftSemi.enabled|When set to true left semi joins are enabled on the GPU|true spark.rapids.sql.join.rightOuter.enabled|When set to true right outer joins are enabled on the GPU|true spark.rapids.sql.metrics.level|GPU plans can produce a lot more metrics than CPU plans do. In very large queries this can sometimes result in going over the max result size limit for the driver. Supported values include DEBUG which will enable all metrics supported and typically only needs to be enabled when debugging the plugin. MODERATE which should output enough metrics to understand how long each part of the query is taking and how much data is going to each part of the query. ESSENTIAL which disables most metrics except those Apache Spark CPU plans will also report or their equivalents.|MODERATE -spark.rapids.sql.opt.allTrueFalse.enabled|When set to true, checks whether the predictions on a batch are all true or false, then accordingly evaluate only the true or false expression. This is for the GPU `If` and `CaseWhen`, and can improve the performance by avoiding the evaluation of the true or false expression when it is an expensive operation. e.g. row-based UDF. It is disabled by default because it does work when the input batch is mixed with true and false predictions, and may affect the performance negatively because it involves some additional computations. This config might be removed in the future when the implementation of GPU `If` and `CaseWhen` evolves|false spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647 spark.rapids.sql.reader.batchSizeRows|Soft limit on the maximum number of rows the reader will read per batch. The orc and parquet readers will read row groups until this limit is met or exceeded. The limit is respected by the csv reader.|2147483647 diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index 927dc30ee5f..8963d5977e3 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -40,13 +40,10 @@ if_nested_gens = if_array_gens_sample + if_struct_gens_sample @pytest.mark.parametrize('data_gen', all_gens + if_nested_gens + decimal_128_gens_no_neg, ids=idfn) -@pytest.mark.parametrize('all_truefalse_opt', [True, False]) -def test_if_else(data_gen, all_truefalse_opt): +@pytest.mark.parametrize('row_num', [2048, 30]) # Small row number can get all-true or all-false batch easily. +def test_if_else(data_gen, row_num): (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) null_lit = get_null_lit_string(data_gen.data_type) - row_num = 2048 - if all_truefalse_opt: - row_num = 30 # Small row number can get all-true or all-false batch easily. assert_gpu_and_cpu_are_equal_collect( lambda spark : three_col_df(spark, boolean_gen, data_gen, data_gen, row_num).selectExpr( 'IF(TRUE, b, c)', @@ -57,8 +54,7 @@ def test_if_else(data_gen, all_truefalse_opt): 'IF(a, b, {})'.format(s2), 'IF(a, {}, {})'.format(s1, s2), 'IF(a, b, {})'.format(null_lit), - 'IF(a, {}, c)'.format(null_lit)), - conf={'spark.rapids.sql.opt.allTrueFalse.enabled': all_truefalse_opt}) + 'IF(a, {}, c)'.format(null_lit))) # Maps scalars are not really supported by Spark from python without jumping through a lot of hoops # so for now we are going to skip them @@ -72,13 +68,8 @@ def test_if_else_map(data_gen): @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed @pytest.mark.parametrize('data_gen', all_gens + all_nested_gens + decimal_128_gens, ids=idfn) -@pytest.mark.parametrize('all_truefalse_opt', [True, False]) -def test_case_when(data_gen, all_truefalse_opt): - row_num = 2048 - if all_truefalse_opt: - row_num = 30 # Small row number can get all-true or all-false batch easily. - opt_conf = {'spark.rapids.sql.opt.allTrueFalse.enabled': all_truefalse_opt} - opt_conf.update(allow_negative_scale_of_decimal_conf) +@pytest.mark.parametrize('row_num', [2048, 30]) # Small row number can get all-true or all-false batch easily. +def test_case_when(data_gen, row_num): num_cmps = 20 s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) # we want lots of false @@ -106,7 +97,7 @@ def test_case_when(data_gen, all_truefalse_opt): f.when(f.col('_b0'), s1).when(f.lit(True), f.col('_c0')), f.when(f.col('_b0'), f.lit(None).cast(data_type)).otherwise(f.col('_c0')), f.when(f.lit(False), f.col('_c0'))), - conf = opt_conf) + conf = allow_negative_scale_of_decimal_conf) @pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn) def test_nanvl(data_gen): diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index a3c149caa68..d51fdb531db 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2045,7 +2045,7 @@ object GpuOverrides extends Logging { } else { None } - GpuCaseWhen(branches, elseValue, conf.isAllTrueFalseOptEnabled) + GpuCaseWhen(branches, elseValue) } }), expr[If]( @@ -2066,7 +2066,7 @@ object GpuOverrides extends Logging { (a, conf, p, r) => new ExprMeta[If](a, conf, p, r) { override def convertToGpu(): GpuExpression = { val Seq(boolExpr, trueExpr, falseExpr) = childExprs.map(_.convertToGpu()) - GpuIf(boolExpr, trueExpr, falseExpr, conf.isAllTrueFalseOptEnabled) + GpuIf(boolExpr, trueExpr, falseExpr) } }), expr[Pow]( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 798926a80de..6783f9b5ddf 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -705,18 +705,6 @@ object RapidsConf { .booleanConf .createWithDefault(false) - val ENABLE_ALL_TRUE_FALSE_OPTIMIZATION = conf("spark.rapids.sql.opt.allTrueFalse.enabled") - .doc("When set to true, checks whether the predictions on a batch are all true or false, " + - "then accordingly evaluate only the true or false expression. This is for the GPU `If` " + - "and `CaseWhen`, and can improve the performance by avoiding the evaluation of the true " + - "or false expression when it is an expensive operation. e.g. row-based UDF. It is " + - "disabled by default because it does work when the input batch is mixed with true and " + - "false predictions, and may affect the performance negatively because it involves some " + - "additional computations. This config might be removed in the future when the " + - "implementation of GPU `If` and `CaseWhen` evolves") - .booleanConf - .createWithDefault(false) - object ParquetReaderType extends Enumeration { val AUTO, COALESCING, MULTITHREADED, PERFILE = Value } @@ -1751,8 +1739,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isCpuBasedUDFEnabled: Boolean = get(ENABLE_CPU_BASED_UDF) - lazy val isAllTrueFalseOptEnabled: Boolean = get(ENABLE_ALL_TRUE_FALSE_OPTIMIZATION) - lazy val isFastSampleEnabled: Boolean = get(ENABLE_FAST_SAMPLE) private val optimizerDefaults = Map( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index ca02f54ed16..4fbc79c946b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -54,9 +54,15 @@ trait GpuConditionalExpression extends ComplexTypeMergingExpression with GpuExpr protected def isAllTrue(col: GpuColumnVector): Boolean = { assert(BooleanType == col.dataType()) + if (col.getRowCount == 0) { + return true + } + if (col.hasNull) { + return false + } withResource(col.getBase.all()) { allTrue => - // null is treated as false in Spark, but skipped by 'all()' method. - allTrue.isValid && allTrue.getBoolean && !col.hasNull + // Guaranteed there is at least one row and no nulls so result must be valid + allTrue.getBoolean } } @@ -72,8 +78,7 @@ trait GpuConditionalExpression extends ComplexTypeMergingExpression with GpuExpr case class GpuIf( predicateExpr: Expression, trueExpr: Expression, - falseExpr: Expression, - optAllTrueFalse: Boolean = false) extends GpuConditionalExpression { + falseExpr: Expression) extends GpuConditionalExpression { @transient override lazy val inputTypesForMerging: Seq[DataType] = { @@ -96,9 +101,8 @@ case class GpuIf( } } - @transient - private[this] lazy val func = if (optAllTrueFalse) { - (pred: GpuColumnVector, batch: ColumnarBatch) => { + override def columnarEval(batch: ColumnarBatch): Any = { + withResource(GpuExpressionsUtils.columnarEvalToColumn(predicateExpr, batch)) { pred => if (isAllTrue(pred)) { // All are true trueExpr.columnarEval(batch) @@ -109,16 +113,6 @@ case class GpuIf( computeIfElse(batch, pred, trueExpr, falseExpr.columnarEval(batch)) } } - } else { - (pred: GpuColumnVector, batch: ColumnarBatch) => { - computeIfElse(batch, pred, trueExpr, falseExpr.columnarEval(batch)) - } - } - - override def columnarEval(batch: ColumnarBatch): Any = { - withResource(GpuExpressionsUtils.columnarEvalToColumn(predicateExpr, batch)) { pred => - func(pred, batch) - } } override def toString: String = s"if ($predicateExpr) $trueExpr else $falseExpr" @@ -129,8 +123,7 @@ case class GpuIf( case class GpuCaseWhen( branches: Seq[(Expression, Expression)], - elseValue: Option[Expression] = None, - optAllTrueFalse: Boolean) extends GpuConditionalExpression with Serializable { + elseValue: Option[Expression] = None) extends GpuConditionalExpression with Serializable { override def children: Seq[Expression] = branches.flatMap(b => b._1 :: b._2 :: Nil) ++ elseValue @@ -169,7 +162,7 @@ case class GpuCaseWhen( @transient private[this] lazy val trueExpressions = branches.map(_._2) - private def computeWithTrueFalseOpt(batch: ColumnarBatch): Any = { + override def columnarEval(batch:ColumnarBatch): Any = { val size = branches.size val predictions = new Array[GpuColumnVector](size) var isAllPredsFalse = true @@ -202,25 +195,6 @@ case class GpuCaseWhen( } } - @transient - private[this] lazy val func = if (optAllTrueFalse) { - (batch: ColumnarBatch) => computeWithTrueFalseOpt(batch) - } else { - (batch: ColumnarBatch) => { - // `elseRet` will be closed in `computeIfElse`. - val elseRet = elseValue - .map(_.columnarEval(batch)) - .getOrElse(GpuScalar(null, branches.last._2.dataType)) - branches.foldRight[Any](elseRet) { case ((predicateExpr, trueExpr), falseRet) => - withResource(GpuExpressionsUtils.columnarEvalToColumn(predicateExpr, batch)) { pred => - computeIfElse(batch, pred, trueExpr, falseRet) - } - } - } - } - - override def columnarEval(batch: ColumnarBatch): Any = func(batch) - override def toString: String = { val cases = branches.map { case (c, v) => s" WHEN $c THEN $v" }.mkString val elseCase = elseValue.map(" ELSE " + _).getOrElse("") From cb963cb7738aecc1f02d7fa8557d4e1e717f3760 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Fri, 10 Dec 2021 09:01:30 +0800 Subject: [PATCH 04/17] Address comments Signed-off-by: Firestarman --- .../src/main/python/conditionals_test.py | 20 ++++++++------ integration_tests/src/main/python/data_gen.py | 9 +++++-- .../spark/rapids/conditionalExpressions.scala | 26 +++++++++---------- 3 files changed, 31 insertions(+), 24 deletions(-) diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index 8963d5977e3..03a8157fe89 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -40,12 +40,13 @@ if_nested_gens = if_array_gens_sample + if_struct_gens_sample @pytest.mark.parametrize('data_gen', all_gens + if_nested_gens + decimal_128_gens_no_neg, ids=idfn) -@pytest.mark.parametrize('row_num', [2048, 30]) # Small row number can get all-true or all-false batch easily. -def test_if_else(data_gen, row_num): +@pytest.mark.parametrize('const_bool', [True, False, None]) +def test_if_else(data_gen, const_bool): (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) null_lit = get_null_lit_string(data_gen.data_type) + bool_gen = BooleanGen(const_value=const_bool) assert_gpu_and_cpu_are_equal_collect( - lambda spark : three_col_df(spark, boolean_gen, data_gen, data_gen, row_num).selectExpr( + lambda spark : three_col_df(spark, bool_gen, data_gen, data_gen).selectExpr( 'IF(TRUE, b, c)', 'IF(TRUE, {}, {})'.format(s1, null_lit), 'IF(FALSE, {}, {})'.format(s1, null_lit), @@ -68,12 +69,15 @@ def test_if_else_map(data_gen): @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed @pytest.mark.parametrize('data_gen', all_gens + all_nested_gens + decimal_128_gens, ids=idfn) -@pytest.mark.parametrize('row_num', [2048, 30]) # Small row number can get all-true or all-false batch easily. -def test_case_when(data_gen, row_num): +@pytest.mark.parametrize('const_bool', [True, False, None]) +def test_case_when(data_gen, const_bool): num_cmps = 20 s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) - # we want lots of false - bool_gen = BooleanGen().with_special_case(False, weight=1000.0) + if const_bool: + bool_gen = BooleanGen(const_value=const_bool) + else: + # we want lots of false + bool_gen = BooleanGen().with_special_case(False, weight=1000.0) gen_cols = [('_b' + str(x), bool_gen) for x in range(0, num_cmps)] gen_cols = gen_cols + [('_c' + str(x), data_gen) for x in range(0, num_cmps)] gen = StructGen(gen_cols, nullable=False) @@ -89,7 +93,7 @@ def test_case_when(data_gen, row_num): # (scalar, column) # in sequence. assert_gpu_and_cpu_are_equal_collect( - lambda spark : gen_df(spark, gen, row_num).select(command, + lambda spark : gen_df(spark, gen).select(command, f.when(f.col('_b0'), s1), f.when(f.col('_b0'), f.col('_c0')).otherwise(f.col('_c1')), f.when(f.col('_b0'), s1).otherwise(f.col('_c0')), diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 2d5807efafe..ce26a1f72cd 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -416,11 +416,16 @@ def gen_part_double(): class BooleanGen(DataGen): """Generate Bools (True/False)""" - def __init__(self, nullable=True): + def __init__(self, nullable=True, const_value=None): super().__init__(BooleanType(), nullable=nullable) + self.const_value = const_value def start(self, rand): - self._start(rand, lambda : bool(rand.getrandbits(1))) + if self.const_value: + make_boolean = lambda : bool(self.const_value) + else: + make_boolean = lambda : bool(rand.getrandbits(1)) + self._start(rand, make_boolean) class StructGen(DataGen): """Generate a Struct""" diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index 4fbc79c946b..9ceb48f4c49 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -69,7 +69,8 @@ trait GpuConditionalExpression extends ComplexTypeMergingExpression with GpuExpr protected def isAllFalse(col: GpuColumnVector): Boolean = { assert(BooleanType == col.dataType()) withResource(col.getBase.any()) { anyTrue => - anyTrue.isValid && !anyTrue.getBoolean + // null values are considered false values in this context + !anyTrue.isValid || !anyTrue.getBoolean } } @@ -105,10 +106,10 @@ case class GpuIf( withResource(GpuExpressionsUtils.columnarEvalToColumn(predicateExpr, batch)) { pred => if (isAllTrue(pred)) { // All are true - trueExpr.columnarEval(batch) + GpuExpressionsUtils.columnarEvalToColumn(trueExpr, batch) } else if (isAllFalse(pred)) { // All are false - falseExpr.columnarEval(batch) + GpuExpressionsUtils.columnarEvalToColumn(falseExpr, batch) } else { computeIfElse(batch, pred, trueExpr, falseExpr.columnarEval(batch)) } @@ -163,22 +164,19 @@ case class GpuCaseWhen( private[this] lazy val trueExpressions = branches.map(_._2) override def columnarEval(batch:ColumnarBatch): Any = { - val size = branches.size - val predictions = new Array[GpuColumnVector](size) + val predictions = new Array[GpuColumnVector](branches.size) var isAllPredsFalse = true - var i = 0 withResource(predictions) { preds => - while (i < size) { - // If any predication is the first all-true, then evaluate its true expression - // and return the result. - preds(i) = GpuExpressionsUtils.columnarEvalToColumn(branches(i)._1, batch) - val p = preds(i) + branches.zipWithIndex.foreach { case ((predExpr, trueExpr), i) => + val p = GpuExpressionsUtils.columnarEvalToColumn(predExpr, batch) + preds(i) = p if (isAllPredsFalse && isAllTrue(p)) { - return trueExpressions(i).columnarEval(batch) + // If any predication is the first all-true, then evaluate its true expression + // and return the result. + return GpuExpressionsUtils.columnarEvalToColumn(trueExpr, batch) } isAllPredsFalse = isAllPredsFalse && isAllFalse(p) - i += 1 } val elseRet = elseValue @@ -186,7 +184,7 @@ case class GpuCaseWhen( .getOrElse(GpuScalar(null, branches.last._2.dataType)) if (isAllPredsFalse) { // No predication has a true, so return the else value. - elseRet + GpuExpressionsUtils.resolveColumnVector(elseRet, batch.numRows()) } else { preds.zip(trueExpressions).foldRight[Any](elseRet) { case ((p, trueExpr), falseRet) => computeIfElse(batch, p, trueExpr, falseRet) From 10570711847a765f646b128e6218da9279b5e49b Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 13 Dec 2021 10:24:37 +0800 Subject: [PATCH 05/17] Address the comments Signed-off-by: Firestarman --- .../src/main/python/conditionals_test.py | 17 +++++++++-------- integration_tests/src/main/python/data_gen.py | 9 ++------- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index 03a8157fe89..603c73b35e9 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -40,11 +40,11 @@ if_nested_gens = if_array_gens_sample + if_struct_gens_sample @pytest.mark.parametrize('data_gen', all_gens + if_nested_gens + decimal_128_gens_no_neg, ids=idfn) -@pytest.mark.parametrize('const_bool', [True, False, None]) -def test_if_else(data_gen, const_bool): +@pytest.mark.parametrize('pred_value', [True, False, None, "random"]) +def test_if_else(data_gen, pred_value): (s1, s2) = gen_scalars_for_sql(data_gen, 2, force_no_nulls=not isinstance(data_gen, NullGen)) null_lit = get_null_lit_string(data_gen.data_type) - bool_gen = BooleanGen(const_value=const_bool) + bool_gen = boolean_gen if pred_value == "random" else SetValuesGen(BooleanType(), [pred_value]) assert_gpu_and_cpu_are_equal_collect( lambda spark : three_col_df(spark, bool_gen, data_gen, data_gen).selectExpr( 'IF(TRUE, b, c)', @@ -69,15 +69,16 @@ def test_if_else_map(data_gen): @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed @pytest.mark.parametrize('data_gen', all_gens + all_nested_gens + decimal_128_gens, ids=idfn) -@pytest.mark.parametrize('const_bool', [True, False, None]) -def test_case_when(data_gen, const_bool): +@pytest.mark.parametrize('pred_value', [True, False, None, "random"]) +def test_case_when(data_gen, pred_value): num_cmps = 20 s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) - if const_bool: - bool_gen = BooleanGen(const_value=const_bool) - else: + if pred_value == "random": # we want lots of false bool_gen = BooleanGen().with_special_case(False, weight=1000.0) + else: + # All true, all false, all nulls + bool_gen = SetValuesGen(BooleanType(), [pred_value]) gen_cols = [('_b' + str(x), bool_gen) for x in range(0, num_cmps)] gen_cols = gen_cols + [('_c' + str(x), data_gen) for x in range(0, num_cmps)] gen = StructGen(gen_cols, nullable=False) diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index ce26a1f72cd..2d5807efafe 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -416,16 +416,11 @@ def gen_part_double(): class BooleanGen(DataGen): """Generate Bools (True/False)""" - def __init__(self, nullable=True, const_value=None): + def __init__(self, nullable=True): super().__init__(BooleanType(), nullable=nullable) - self.const_value = const_value def start(self, rand): - if self.const_value: - make_boolean = lambda : bool(self.const_value) - else: - make_boolean = lambda : bool(rand.getrandbits(1)) - self._start(rand, make_boolean) + self._start(rand, lambda : bool(rand.getrandbits(1))) class StructGen(DataGen): """Generate a Struct""" From c29b868108418a8c11b12928d017905aaf28b7b9 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 13 Dec 2021 12:01:16 +0800 Subject: [PATCH 06/17] Fix an OOM issue for premerge build Signed-off-by: Firestarman --- jenkins/spark-premerge-build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index 32df2c739fa..3912184806a 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -108,7 +108,7 @@ ci_2() { export TEST_TYPE="pre-commit" export TEST_PARALLEL=4 # separate process to avoid OOM kill - TEST='conditionals_test or window_function_test' ./integration_tests/run_pyspark_from_build.sh + TEST_PARALLEL=3 TEST='conditionals_test or window_function_test' ./integration_tests/run_pyspark_from_build.sh TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \ ./integration_tests/run_pyspark_from_build.sh From 762446add7ffb574b6df1e1947bd9d8563e07c57 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 13 Dec 2021 15:07:44 +0800 Subject: [PATCH 07/17] Premerge debug Signed-off-by: Firestarman --- jenkins/spark-premerge-build.sh | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index 3912184806a..0dc643800d2 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -108,10 +108,11 @@ ci_2() { export TEST_TYPE="pre-commit" export TEST_PARALLEL=4 # separate process to avoid OOM kill - TEST_PARALLEL=3 TEST='conditionals_test or window_function_test' ./integration_tests/run_pyspark_from_build.sh - TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh - TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \ - ./integration_tests/run_pyspark_from_build.sh + TEST_PARALLEL=5 TEST='if_else' ./integration_tests/run_pyspark_from_build.sh + # TEST_PARALLEL=5 TEST='conditionals_test or window_function_test' ./integration_tests/run_pyspark_from_build.sh + # TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh + # TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \ + # ./integration_tests/run_pyspark_from_build.sh } From 26f0e961fed692cc0214d61e6244f773aeb28f2a Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 13 Dec 2021 16:29:26 +0800 Subject: [PATCH 08/17] premerge debug 2 Signed-off-by: Firestarman --- jenkins/spark-premerge-build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index 0dc643800d2..7372f777690 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -108,7 +108,7 @@ ci_2() { export TEST_TYPE="pre-commit" export TEST_PARALLEL=4 # separate process to avoid OOM kill - TEST_PARALLEL=5 TEST='if_else' ./integration_tests/run_pyspark_from_build.sh + TEST_PARALLEL=5 TEST='if_else or case_when' ./integration_tests/run_pyspark_from_build.sh # TEST_PARALLEL=5 TEST='conditionals_test or window_function_test' ./integration_tests/run_pyspark_from_build.sh # TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh # TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \ From 134404047f23273cf6c956aaf1e925d748ca62b5 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 13 Dec 2021 17:17:11 +0800 Subject: [PATCH 09/17] premerge debug 3 Signed-off-by: Firestarman --- jenkins/spark-premerge-build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index 7372f777690..3ca24803c5d 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -108,7 +108,7 @@ ci_2() { export TEST_TYPE="pre-commit" export TEST_PARALLEL=4 # separate process to avoid OOM kill - TEST_PARALLEL=5 TEST='if_else or case_when' ./integration_tests/run_pyspark_from_build.sh + TEST_PARALLEL=5 TEST='case_when' ./integration_tests/run_pyspark_from_build.sh # TEST_PARALLEL=5 TEST='conditionals_test or window_function_test' ./integration_tests/run_pyspark_from_build.sh # TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh # TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \ From 669cb7abe9957674ea98a5840a04ba92aa4c3813 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 13 Dec 2021 18:09:24 +0800 Subject: [PATCH 10/17] permerge build debug 4 Signed-off-by: Firestarman --- .../src/main/python/conditionals_test.py | 39 +++++++++++++++---- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index 603c73b35e9..ceac17c0dd6 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -69,16 +69,11 @@ def test_if_else_map(data_gen): @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed @pytest.mark.parametrize('data_gen', all_gens + all_nested_gens + decimal_128_gens, ids=idfn) -@pytest.mark.parametrize('pred_value', [True, False, None, "random"]) -def test_case_when(data_gen, pred_value): +def test_case_when(data_gen): num_cmps = 20 s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) - if pred_value == "random": - # we want lots of false - bool_gen = BooleanGen().with_special_case(False, weight=1000.0) - else: - # All true, all false, all nulls - bool_gen = SetValuesGen(BooleanType(), [pred_value]) + # we want lots of false + bool_gen = BooleanGen().with_special_case(False, weight=1000.0) gen_cols = [('_b' + str(x), bool_gen) for x in range(0, num_cmps)] gen_cols = gen_cols + [('_c' + str(x), data_gen) for x in range(0, num_cmps)] gen = StructGen(gen_cols, nullable=False) @@ -104,6 +99,34 @@ def test_case_when(data_gen, pred_value): f.when(f.lit(False), f.col('_c0'))), conf = allow_negative_scale_of_decimal_conf) +# Seperate this test to avoid OOM in premerge build. +@pytest.mark.parametrize('data_gen', all_gens + all_nested_gens + decimal_128_gens, ids=idfn) +@pytest.mark.parametrize('pred_value', [True, False, None]) +def test_case_when_all_true_false(data_gen, pred_value): + # Smaller number to reduce the memory size for this big test. Bigger number may get + # OOM easily and be killed in premerge build. + num_cmps = 2 + s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) + bool_gen = SetValuesGen(BooleanType(), [pred_value]) + gen_cols = [('_b' + str(x), bool_gen) for x in range(0, num_cmps)] + gen_cols = gen_cols + [('_c' + str(x), data_gen) for x in range(0, num_cmps)] + gen = StructGen(gen_cols, nullable=False) + command = f.when(f.col('_b0'), f.col('_c0')) + for x in range(1, num_cmps): + command = command.when(f.col('_b'+ str(x)), f.col('_c' + str(x))) + command = command.otherwise(s1) + data_type = data_gen.data_type + assert_gpu_and_cpu_are_equal_collect( + lambda spark : gen_df(spark, gen).select(command, + f.when(f.col('_b0'), s1), + f.when(f.col('_b0'), f.col('_c0')).otherwise(f.col('_c1')), + f.when(f.col('_b0'), s1).otherwise(f.col('_c0')), + f.when(f.col('_b0'), s1).when(f.lit(False), f.col('_c0')), + f.when(f.col('_b0'), s1).when(f.lit(True), f.col('_c0')), + f.when(f.col('_b0'), f.lit(None).cast(data_type)).otherwise(f.col('_c0')), + f.when(f.lit(False), f.col('_c0'))), + conf = allow_negative_scale_of_decimal_conf) + @pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn) def test_nanvl(data_gen): s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) From e8df00bb14d5bd21ff5195a7f1c44c76f5838000 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 13 Dec 2021 18:43:56 +0800 Subject: [PATCH 11/17] Try to fix the OOM issue Signed-off-by: Firestarman --- integration_tests/src/main/python/conditionals_test.py | 5 ++++- jenkins/spark-premerge-build.sh | 9 ++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index ceac17c0dd6..ac3709e1618 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -70,7 +70,10 @@ def test_if_else_map(data_gen): @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed @pytest.mark.parametrize('data_gen', all_gens + all_nested_gens + decimal_128_gens, ids=idfn) def test_case_when(data_gen): - num_cmps = 20 + # Change the number from 20 to 10 to reduce the memory size used in premerge build. + # The previous value took about 44GB memory, almost reaching the upper + # limitation(50G), which will cause the killing. + num_cmps = 10 s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) # we want lots of false bool_gen = BooleanGen().with_special_case(False, weight=1000.0) diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index 3ca24803c5d..32df2c739fa 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -108,11 +108,10 @@ ci_2() { export TEST_TYPE="pre-commit" export TEST_PARALLEL=4 # separate process to avoid OOM kill - TEST_PARALLEL=5 TEST='case_when' ./integration_tests/run_pyspark_from_build.sh - # TEST_PARALLEL=5 TEST='conditionals_test or window_function_test' ./integration_tests/run_pyspark_from_build.sh - # TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh - # TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \ - # ./integration_tests/run_pyspark_from_build.sh + TEST='conditionals_test or window_function_test' ./integration_tests/run_pyspark_from_build.sh + TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh + TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \ + ./integration_tests/run_pyspark_from_build.sh } From 8fc650b0825051f32d3983e2a931f20402f000ed Mon Sep 17 00:00:00 2001 From: Firestarman Date: Mon, 13 Dec 2021 21:00:49 +0800 Subject: [PATCH 12/17] Restore the number in case_when Signed-off-by: Firestarman --- integration_tests/src/main/python/conditionals_test.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index ac3709e1618..ceac17c0dd6 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -70,10 +70,7 @@ def test_if_else_map(data_gen): @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed @pytest.mark.parametrize('data_gen', all_gens + all_nested_gens + decimal_128_gens, ids=idfn) def test_case_when(data_gen): - # Change the number from 20 to 10 to reduce the memory size used in premerge build. - # The previous value took about 44GB memory, almost reaching the upper - # limitation(50G), which will cause the killing. - num_cmps = 10 + num_cmps = 20 s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) # we want lots of false bool_gen = BooleanGen().with_special_case(False, weight=1000.0) From 4ed653fd9c8aaabab1a311adec2692ae88f3d57d Mon Sep 17 00:00:00 2001 From: Firestarman Date: Tue, 14 Dec 2021 09:37:31 +0800 Subject: [PATCH 13/17] Do not cache the predicates Signed-off-by: Firestarman --- .../src/main/python/conditionals_test.py | 38 ++++-------------- .../spark/rapids/conditionalExpressions.scala | 40 ++++++++++--------- 2 files changed, 28 insertions(+), 50 deletions(-) diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index ceac17c0dd6..5c67ebe63a6 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -69,11 +69,15 @@ def test_if_else_map(data_gen): @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed @pytest.mark.parametrize('data_gen', all_gens + all_nested_gens + decimal_128_gens, ids=idfn) -def test_case_when(data_gen): +@pytest.mark.parametrize('pred_value', [True, False, None, "random"]) +def test_case_when(data_gen, pred_value): num_cmps = 20 s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) - # we want lots of false - bool_gen = BooleanGen().with_special_case(False, weight=1000.0) + if pred_value == "random": + # we want lots of false + bool_gen = BooleanGen().with_special_case(False, weight=1000.0) + else: + bool_gen = SetValuesGen(BooleanType(), [pred_value]) gen_cols = [('_b' + str(x), bool_gen) for x in range(0, num_cmps)] gen_cols = gen_cols + [('_c' + str(x), data_gen) for x in range(0, num_cmps)] gen = StructGen(gen_cols, nullable=False) @@ -99,34 +103,6 @@ def test_case_when(data_gen): f.when(f.lit(False), f.col('_c0'))), conf = allow_negative_scale_of_decimal_conf) -# Seperate this test to avoid OOM in premerge build. -@pytest.mark.parametrize('data_gen', all_gens + all_nested_gens + decimal_128_gens, ids=idfn) -@pytest.mark.parametrize('pred_value', [True, False, None]) -def test_case_when_all_true_false(data_gen, pred_value): - # Smaller number to reduce the memory size for this big test. Bigger number may get - # OOM easily and be killed in premerge build. - num_cmps = 2 - s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) - bool_gen = SetValuesGen(BooleanType(), [pred_value]) - gen_cols = [('_b' + str(x), bool_gen) for x in range(0, num_cmps)] - gen_cols = gen_cols + [('_c' + str(x), data_gen) for x in range(0, num_cmps)] - gen = StructGen(gen_cols, nullable=False) - command = f.when(f.col('_b0'), f.col('_c0')) - for x in range(1, num_cmps): - command = command.when(f.col('_b'+ str(x)), f.col('_c' + str(x))) - command = command.otherwise(s1) - data_type = data_gen.data_type - assert_gpu_and_cpu_are_equal_collect( - lambda spark : gen_df(spark, gen).select(command, - f.when(f.col('_b0'), s1), - f.when(f.col('_b0'), f.col('_c0')).otherwise(f.col('_c1')), - f.when(f.col('_b0'), s1).otherwise(f.col('_c0')), - f.when(f.col('_b0'), s1).when(f.lit(False), f.col('_c0')), - f.when(f.col('_b0'), s1).when(f.lit(True), f.col('_c0')), - f.when(f.col('_b0'), f.lit(None).cast(data_type)).otherwise(f.col('_c0')), - f.when(f.lit(False), f.col('_c0'))), - conf = allow_negative_scale_of_decimal_conf) - @pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn) def test_nanvl(data_gen): s1 = gen_scalar(data_gen, force_no_nulls=not isinstance(data_gen, NullGen)) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index 9ceb48f4c49..e7415ce9ea2 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -68,6 +68,10 @@ trait GpuConditionalExpression extends ComplexTypeMergingExpression with GpuExpr protected def isAllFalse(col: GpuColumnVector): Boolean = { assert(BooleanType == col.dataType()) + if (col.getRowCount == col.numNulls()) { + // all nulls, and null values are false values here + return true + } withResource(col.getBase.any()) { anyTrue => // null values are considered false values in this context !anyTrue.isValid || !anyTrue.getBoolean @@ -160,33 +164,31 @@ case class GpuCaseWhen( } } - @transient - private[this] lazy val trueExpressions = branches.map(_._2) - override def columnarEval(batch:ColumnarBatch): Any = { - val predictions = new Array[GpuColumnVector](branches.size) var isAllPredsFalse = true - - withResource(predictions) { preds => - branches.zipWithIndex.foreach { case ((predExpr, trueExpr), i) => - val p = GpuExpressionsUtils.columnarEvalToColumn(predExpr, batch) - preds(i) = p + // Check if any predicate is the first all-true, then evaluate its true expression + // and return the result. + branches.foreach { case (predExpr, trueExpr) => + withResource(GpuExpressionsUtils.columnarEvalToColumn(predExpr, batch)) { p => if (isAllPredsFalse && isAllTrue(p)) { - // If any predication is the first all-true, then evaluate its true expression - // and return the result. return GpuExpressionsUtils.columnarEvalToColumn(trueExpr, batch) } isAllPredsFalse = isAllPredsFalse && isAllFalse(p) } + } - val elseRet = elseValue - .map(_.columnarEval(batch)) - .getOrElse(GpuScalar(null, branches.last._2.dataType)) - if (isAllPredsFalse) { - // No predication has a true, so return the else value. - GpuExpressionsUtils.resolveColumnVector(elseRet, batch.numRows()) - } else { - preds.zip(trueExpressions).foldRight[Any](elseRet) { case ((p, trueExpr), falseRet) => + val elseRet = elseValue + .map(_.columnarEval(batch)) + .getOrElse(GpuScalar(null, branches.last._2.dataType)) + if (isAllPredsFalse) { + // No predicate has a true, so return the else value directly. + GpuExpressionsUtils.resolveColumnVector(elseRet, batch.numRows()) + } else { + branches.foldRight[Any](elseRet) { case ((predExpr, trueExpr), falseRet) => + // Here evaluates the predicate expressions again, sacrificing a little of perf + // for minimizing the memory size of computations. Because caching the predicates + // may get OOM if there are too many branches. + withResource(GpuExpressionsUtils.columnarEvalToColumn(predExpr, batch)) { p => computeIfElse(batch, p, trueExpr, falseRet) } } From 0c23c50d2e09b4d6b0dafb725795103bcd144234 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Tue, 14 Dec 2021 16:03:58 +0800 Subject: [PATCH 14/17] Debug premerge Signed-off-by: Firestarman --- jenkins/spark-premerge-build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index 32df2c739fa..6ca1b705c0c 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -108,7 +108,7 @@ ci_2() { export TEST_TYPE="pre-commit" export TEST_PARALLEL=4 # separate process to avoid OOM kill - TEST='conditionals_test or window_function_test' ./integration_tests/run_pyspark_from_build.sh + TEST='case_when' ./integration_tests/run_pyspark_from_build.sh TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \ ./integration_tests/run_pyspark_from_build.sh From 7eea3a4b4f3504f68e37c92d1d6117ae3487336f Mon Sep 17 00:00:00 2001 From: Firestarman Date: Wed, 15 Dec 2021 11:47:09 +0800 Subject: [PATCH 15/17] premerge debug 2 Signed-off-by: Firestarman --- jenkins/spark-premerge-build.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index 6ca1b705c0c..72cc8197387 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -108,10 +108,10 @@ ci_2() { export TEST_TYPE="pre-commit" export TEST_PARALLEL=4 # separate process to avoid OOM kill - TEST='case_when' ./integration_tests/run_pyspark_from_build.sh - TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh - TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \ - ./integration_tests/run_pyspark_from_build.sh + TEST_PARALLEL=3 TEST='case_when' ./integration_tests/run_pyspark_from_build.sh + # TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh + # TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \ + # ./integration_tests/run_pyspark_from_build.sh } From 6dafd726159a61f8654c3c878113429431619a99 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Wed, 15 Dec 2021 12:29:26 +0800 Subject: [PATCH 16/17] debug premerge 3 Signed-off-by: Firestarman --- jenkins/spark-premerge-build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index 72cc8197387..0a5cff6c744 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -108,7 +108,7 @@ ci_2() { export TEST_TYPE="pre-commit" export TEST_PARALLEL=4 # separate process to avoid OOM kill - TEST_PARALLEL=3 TEST='case_when' ./integration_tests/run_pyspark_from_build.sh + TEST_PARALLEL=2 TEST='case_when' ./integration_tests/run_pyspark_from_build.sh # TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh # TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \ # ./integration_tests/run_pyspark_from_build.sh From 18a8a0c6997cad541f27665b07865fdb8637e200 Mon Sep 17 00:00:00 2001 From: Firestarman Date: Wed, 15 Dec 2021 12:29:26 +0800 Subject: [PATCH 17/17] Check branch number to enable the opt conditionally Signed-off-by: Firestarman --- docs/configs.md | 1 + jenkins/spark-premerge-build.sh | 8 +-- .../nvidia/spark/rapids/GpuOverrides.scala | 2 +- .../com/nvidia/spark/rapids/RapidsConf.scala | 9 +++ .../spark/rapids/conditionalExpressions.scala | 63 +++++++++++++------ 5 files changed, 58 insertions(+), 25 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 796d8a612c9..ae6ec6286a7 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -110,6 +110,7 @@ Name | Description | Default Value spark.rapids.sql.join.leftSemi.enabled|When set to true left semi joins are enabled on the GPU|true spark.rapids.sql.join.rightOuter.enabled|When set to true right outer joins are enabled on the GPU|true spark.rapids.sql.metrics.level|GPU plans can produce a lot more metrics than CPU plans do. In very large queries this can sometimes result in going over the max result size limit for the driver. Supported values include DEBUG which will enable all metrics supported and typically only needs to be enabled when debugging the plugin. MODERATE which should output enough metrics to understand how long each part of the query is taking and how much data is going to each part of the query. ESSENTIAL which disables most metrics except those Apache Spark CPU plans will also report or their equivalents.|MODERATE +spark.rapids.sql.opt.condition.maxBranchNumber|Maximum number of branches for GPU case-when to enable the lazy evaluation of true and else expressions if the predicates on a batch are all-true or all-false. Big number may get GPU OOM easily since the predicates are cached during the computation.|2 spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647 spark.rapids.sql.reader.batchSizeRows|Soft limit on the maximum number of rows the reader will read per batch. The orc and parquet readers will read row groups until this limit is met or exceeded. The limit is respected by the csv reader.|2147483647 diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh index 9a350ef9954..82367d43840 100755 --- a/jenkins/spark-premerge-build.sh +++ b/jenkins/spark-premerge-build.sh @@ -108,10 +108,10 @@ ci_2() { export TEST_TYPE="pre-commit" export TEST_PARALLEL=4 # separate process to avoid OOM kill - TEST_PARALLEL=2 TEST='case_when' ./integration_tests/run_pyspark_from_build.sh - # TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh - # TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \ - # ./integration_tests/run_pyspark_from_build.sh + TEST_PARALLEL=2 TEST='conditionals_test or window_function_test' ./integration_tests/run_pyspark_from_build.sh + TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh + TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \ + ./integration_tests/run_pyspark_from_build.sh } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index e40697b6b46..bc67af685f8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2045,7 +2045,7 @@ object GpuOverrides extends Logging { } else { None } - GpuCaseWhen(branches, elseValue) + GpuCaseWhen(branches, elseValue, conf.maxConditionBranchNumber) } }), expr[If]( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 6783f9b5ddf..7e55a9ce11c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1325,6 +1325,13 @@ object RapidsConf { .booleanConf .createWithDefault(value = false) + val MAX_CONDITION_BRANCH_NUMBER = conf("spark.rapids.sql.opt.condition.maxBranchNumber") + .doc("Maximum number of branches for GPU case-when to enable the lazy evaluation of true " + + "and else expressions if the predicates on a batch are all-true or all-false. Big number " + + "may get GPU OOM easily since the predicates are cached during the computation.") + .integerConf + .createWithDefault(2) + private def printSectionHeader(category: String): Unit = println(s"\n### $category") @@ -1741,6 +1748,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isFastSampleEnabled: Boolean = get(ENABLE_FAST_SAMPLE) + lazy val maxConditionBranchNumber: Int = get(MAX_CONDITION_BRANCH_NUMBER) + private val optimizerDefaults = Map( // this is not accurate because CPU projections do have a cost due to appending values // to each row that is produced, but this needs to be a really small number because diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala index e7415ce9ea2..62563cf4532 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/conditionalExpressions.scala @@ -74,7 +74,7 @@ trait GpuConditionalExpression extends ComplexTypeMergingExpression with GpuExpr } withResource(col.getBase.any()) { anyTrue => // null values are considered false values in this context - !anyTrue.isValid || !anyTrue.getBoolean + !anyTrue.getBoolean } } @@ -128,7 +128,8 @@ case class GpuIf( case class GpuCaseWhen( branches: Seq[(Expression, Expression)], - elseValue: Option[Expression] = None) extends GpuConditionalExpression with Serializable { + elseValue: Option[Expression] = None, + maxBranchNumForOpt: Int = 2) extends GpuConditionalExpression with Serializable { override def children: Seq[Expression] = branches.flatMap(b => b._1 :: b._2 :: Nil) ++ elseValue @@ -164,37 +165,59 @@ case class GpuCaseWhen( } } - override def columnarEval(batch:ColumnarBatch): Any = { + private def computeWithTrueFalseOpt(batch: ColumnarBatch, trueExprs: Seq[Expression]): Any = { + val predicates = new Array[GpuColumnVector](branches.size) var isAllPredsFalse = true - // Check if any predicate is the first all-true, then evaluate its true expression - // and return the result. - branches.foreach { case (predExpr, trueExpr) => - withResource(GpuExpressionsUtils.columnarEvalToColumn(predExpr, batch)) { p => + + withResource(predicates) { preds => + branches.zipWithIndex.foreach { case ((predExpr, trueExpr), i) => + val p = GpuExpressionsUtils.columnarEvalToColumn(predExpr, batch) + preds(i) = p if (isAllPredsFalse && isAllTrue(p)) { + // If any predicate is the first all-true, then evaluate its true expression + // and return the result. return GpuExpressionsUtils.columnarEvalToColumn(trueExpr, batch) } isAllPredsFalse = isAllPredsFalse && isAllFalse(p) } - } - val elseRet = elseValue - .map(_.columnarEval(batch)) - .getOrElse(GpuScalar(null, branches.last._2.dataType)) - if (isAllPredsFalse) { - // No predicate has a true, so return the else value directly. - GpuExpressionsUtils.resolveColumnVector(elseRet, batch.numRows()) - } else { - branches.foldRight[Any](elseRet) { case ((predExpr, trueExpr), falseRet) => - // Here evaluates the predicate expressions again, sacrificing a little of perf - // for minimizing the memory size of computations. Because caching the predicates - // may get OOM if there are too many branches. - withResource(GpuExpressionsUtils.columnarEvalToColumn(predExpr, batch)) { p => + val elseRet = elseValue + .map(_.columnarEval(batch)) + .getOrElse(GpuScalar(null, branches.last._2.dataType)) + if (isAllPredsFalse) { + // No predicate has a true, so return the else value. + GpuExpressionsUtils.resolveColumnVector(elseRet, batch.numRows()) + } else { + preds.zip(trueExprs).foldRight[Any](elseRet) { case ((p, trueExpr), falseRet) => computeIfElse(batch, p, trueExpr, falseRet) } } } } + @transient + private[this] lazy val computationFunc = if (branches.length <= maxBranchNumForOpt) { + // Run into the optimization only when the branch number is not bigger than the + // limitation. Since the predicate result will be cached during the computation, + // and caching too many predicates can get GPU OOM easily. + val trueExpressions = branches.map(_._2) + (batch: ColumnarBatch) => computeWithTrueFalseOpt(batch, trueExpressions) + } else { + (batch: ColumnarBatch) => { + // `elseRet` will be closed in `computeIfElse`. + val elseRet = elseValue + .map(_.columnarEval(batch)) + .getOrElse(GpuScalar(null, branches.last._2.dataType)) + branches.foldRight[Any](elseRet) { case ((predicateExpr, trueExpr), falseRet) => + withResource(GpuExpressionsUtils.columnarEvalToColumn(predicateExpr, batch)) { pred => + computeIfElse(batch, pred, trueExpr, falseRet) + } + } + } + } + + override def columnarEval(batch:ColumnarBatch): Any = computationFunc(batch) + override def toString: String = { val cases = branches.map { case (c, v) => s" WHEN $c THEN $v" }.mkString val elseCase = elseValue.map(" ELSE " + _).getOrElse("")