From e608ee7a7024126cf4869c43cc101bdf4ff1ee0a Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 8 Nov 2021 07:51:15 -0700
Subject: [PATCH] Enable approx percentile tests (#3770)

* Enable some approx percentile tests

Signed-off-by: Andy Grove

* Enable some approx percentile tests

Signed-off-by: Andy Grove

* enable tests and add more tests

* improve null test

* add tests for byte input

* remove temp debug print

* Remove comment

Signed-off-by: Andy Grove

* update documentation

* run approx percentile tests with and without AQE

Signed-off-by: Andy Grove

* Add test for split CPU/GPU approx_percentile and implement fix

* scalastyle

* Revert fix for issue 3770

* address PR feedback
---
 .../src/main/python/hash_aggregate_test.py    | 110 +++++++++++-------
 .../rapids/ApproximatePercentileSuite.scala   |  63 ++++++----
 2 files changed, 111 insertions(+), 62 deletions(-)
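Reviewer note: every test re-enabled below follows the same pattern: build a per-test conf by layering the AQE flag over the shared approx-percentile conf, then pass it through to the comparison harness. A minimal sketch of that layering, assuming _approx_percentile_conf is a plain dict of Spark conf overrides and copy_and_update is the existing dict-merge helper from the integration-test utilities (the bodies here are illustrative restatements, not new definitions):

    # Sketch of the conf layering used by each parametrized test below. The
    # base conf is assumed to enable the GPU expression (this mirrors, not
    # defines, the real _approx_percentile_conf in the test file).
    _approx_percentile_conf = {'spark.rapids.sql.expression.ApproximatePercentile': 'true'}

    def copy_and_update(base_conf, updates):
        # assumed merge semantics of the existing integration-test helper:
        # copy the base conf, then apply the per-test overrides on top
        merged = dict(base_conf)
        merged.update(updates)
        return merged

    for aqe_enabled in ['false', 'true']:  # mirrors @pytest.mark.parametrize below
        conf = copy_and_update(_approx_percentile_conf,
                               {'spark.sql.adaptive.enabled': aqe_enabled})
        # each test passes this conf through to compare_percentile_approx

Parametrizing over AQE matters because adaptive execution can re-plan the partial/final aggregation split, which is where the split CPU/GPU approx_percentile case mentioned in the commit notes would surface differences.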
diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index 8ca583408b2..d7b9cfe7dc4 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -1138,45 +1138,61 @@ def do_it(spark):
         return df.groupBy('a').agg(f.min(df.b[1]["a"]))
     assert_gpu_and_cpu_are_equal_collect(do_it)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_long_repeated_keys():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_byte(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
+    compare_percentile_approx(
+        lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
+                                     ('v', ByteGen())], length=100),
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
+
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_byte_scalar(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
+    compare_percentile_approx(
+        lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
+                                     ('v', ByteGen())], length=100),
+        0.5, conf)
+
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_long_repeated_keys(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', RepeatSeqGen(LongGen(), length=20)),
                                      ('v', LongRangeGen())], length=100),
-        [0.05, 0.25, 0.5, 0.75, 0.95])
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_long():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_long(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', LongRangeGen())], length=100),
-        [0.05, 0.25, 0.5, 0.75, 0.95])
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_long_scalar():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_long_scalar(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', LongRangeGen())], length=100),
-        0.5)
+        0.5, conf)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_double():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_double(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', DoubleGen())], length=100),
-        [0.05, 0.25, 0.5, 0.75, 0.95])
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_double_scalar():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_double_scalar(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', DoubleGen())], length=100),
-        0.05)
+        0.05, conf)
 
 @pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
 @ignore_order(local=True)
@@ -1200,7 +1216,7 @@ def approx_percentile_query(spark):
 # results due to the different algorithms being used. Instead we compute an exact percentile on the CPU and then
 # compute approximate percentiles on CPU and GPU and assert that the GPU numbers are accurate within some percentage
 # of the CPU numbers
-def compare_percentile_approx(df_fun, percentiles):
+def compare_percentile_approx(df_fun, percentiles, conf):
 
     # create SQL statements for exact and approx percentiles
     p_exact_sql = create_percentile_sql("percentile", percentiles)
@@ -1222,23 +1238,35 @@ def run_approx(spark):
     # run approx_percentile on CPU and GPU
-    approx_cpu, approx_gpu = run_with_cpu_and_gpu(run_approx, 'COLLECT', _approx_percentile_conf)
+    approx_cpu, approx_gpu = run_with_cpu_and_gpu(run_approx, 'COLLECT', conf)
 
-    for result in zip(exact, approx_cpu, approx_gpu):
+    assert len(exact) == len(approx_cpu)
+    assert len(exact) == len(approx_gpu)
+
+    for i in range(len(exact)):
+        cpu_exact_result = exact[i]
+        cpu_approx_result = approx_cpu[i]
+        gpu_approx_result = approx_gpu[i]
+
         # assert that keys match
-        assert result[0]['k'] == result[1]['k']
-        assert result[1]['k'] == result[2]['k']
-
-        exact = result[0]['the_percentile']
-        cpu = result[1]['the_percentile']
-        gpu = result[2]['the_percentile']
-
-        if exact is not None:
-            if isinstance(exact, list):
-                for x in zip(exact, cpu, gpu):
-                    exact = x[0]
-                    cpu = x[1]
-                    gpu = x[2]
-                    gpu_delta = abs(float(gpu) - float(exact))
-                    cpu_delta = abs(float(cpu) - float(exact))
+        assert cpu_exact_result['k'] == cpu_approx_result['k']
+        assert cpu_exact_result['k'] == gpu_approx_result['k']
+
+        # extract the percentile result column
+        exact_percentile = cpu_exact_result['the_percentile']
+        cpu_approx_percentile = cpu_approx_result['the_percentile']
+        gpu_approx_percentile = gpu_approx_result['the_percentile']
+
+        if exact_percentile is None:
+            assert cpu_approx_percentile is None
+            assert gpu_approx_percentile is None
+        else:
+            assert cpu_approx_percentile is not None
+            assert gpu_approx_percentile is not None
+            if isinstance(exact_percentile, list):
+                for j in range(len(exact_percentile)):
+                    assert cpu_approx_percentile[j] is not None
+                    assert gpu_approx_percentile[j] is not None
+                    gpu_delta = abs(float(gpu_approx_percentile[j]) - float(exact_percentile[j]))
+                    cpu_delta = abs(float(cpu_approx_percentile[j]) - float(exact_percentile[j]))
                     if gpu_delta > cpu_delta:
                         # GPU is less accurate so make sure we are within some tolerance
                         if gpu_delta == 0:
@@ -1246,8 +1274,8 @@ def run_approx(spark):
                         else:
                             assert abs(cpu_delta / gpu_delta) - 1 < 0.001
             else:
-                gpu_delta = abs(float(gpu) - float(exact))
-                cpu_delta = abs(float(cpu) - float(exact))
+                gpu_delta = abs(float(gpu_approx_percentile) - float(exact_percentile))
+                cpu_delta = abs(float(cpu_approx_percentile) - float(exact_percentile))
                 if gpu_delta > cpu_delta:
                     # GPU is less accurate so make sure we are within some tolerance
                     if gpu_delta == 0:
@@ -1257,10 +1285,10 @@ def run_approx(spark):
 
 def create_percentile_sql(func_name, percentiles):
     if isinstance(percentiles, list):
-        return """select k, {}(v, array({})) as the_percentile from t group by k""".format(
+        return """select k, {}(v, array({})) as the_percentile from t group by k order by k""".format(
             func_name, ",".join(str(i) for i in percentiles))
     else:
-        return """select k, {}(v, {}) as the_percentile from t group by k""".format(
+        return """select k, {}(v, {}) as the_percentile from t group by k order by k""".format(
             func_name, percentiles)
 
 @ignore_order
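The rewritten comparison loop above reduces, per percentile value, to the check sketched here (the function name is illustrative; the deltas and tolerance mirror the loop body in compare_percentile_approx): the approximate GPU answer may differ from the approximate CPU answer, but when the GPU is the less accurate of the two, its error against the exact CPU percentile() result must stay within tolerance of the CPU error.

    # Sketch of the per-value tolerance check performed by the loop above;
    # `exact` comes from percentile(), the other two from approx_percentile().
    def check_one_value(exact, cpu_approx, gpu_approx):
        gpu_delta = abs(float(gpu_approx) - float(exact))
        cpu_delta = abs(float(cpu_approx) - float(exact))
        if gpu_delta > cpu_delta:
            # GPU is less accurate so make sure we are within some tolerance
            if gpu_delta == 0:
                assert abs(cpu_delta) == 0
            else:
                assert abs(cpu_delta / gpu_delta) - 1 < 0.001

    check_one_value(100.0, 100.4, 100.5)  # GPU error within tolerance of CPU error

Note that cpu_delta / gpu_delta is below 1 whenever that branch is taken, so the ratio assertion is a loose bound as written; the stricter guarantees in this harness come from the key, length, and null assertions added above.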
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala
index f26d85b9d13..3636126d264 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala
@@ -28,46 +28,60 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
 
   val DEFAULT_PERCENTILES = Array(0.005, 0.05, 0.25, 0.45, 0.5, 0.55, 0.75, 0.95, 0.995)
 
+  test("null handling") {
+    val func = (spark: SparkSession) => salariesWithNull(spark)
+    doTest(func, delta = Some(100))
+  }
+
   test("1 row per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 1, delta = Some(100))
+    val func = (spark: SparkSession) => salaries(spark, DataTypes.DoubleType, rowsPerDept = 1)
+    doTest(func, delta = Some(100))
   }
 
   test("5 rows per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 5, delta = Some(100))
+    val func = (spark: SparkSession) => salaries(spark, DataTypes.DoubleType, rowsPerDept = 5)
+    doTest(func, delta = Some(100))
   }
 
   test("250 rows per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 250, delta = Some(100))
+    val func = (spark: SparkSession) => salaries(spark, DataTypes.DoubleType, rowsPerDept = 250)
+    doTest(func, delta = Some(100))
   }
 
   test("2500 rows per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 2500, delta = Some(100))
+    val func = (spark: SparkSession) => salaries(spark, DataTypes.DoubleType, rowsPerDept = 2500)
+    doTest(func, delta = Some(100))
   }
 
   test("250 rows per group, default delta, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 250, delta = None)
+    val func = (spark: SparkSession) => salaries(spark, DataTypes.DoubleType, rowsPerDept = 250)
+    doTest(func, delta = None)
   }
 
   test("25000 rows per group, default delta, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 25000, delta = None)
+    val func = (spark: SparkSession) => salaries(spark, DataTypes.DoubleType, rowsPerDept = 25000)
+    doTest(func, delta = None)
   }
 
   test("50000 rows per group, default delta, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 50000, delta = None)
+    val func = (spark: SparkSession) => salaries(spark, DataTypes.DoubleType, rowsPerDept = 50000)
+    doTest(func, delta = None)
   }
 
   // test with a threshold just below the default level of 10000
   test("50000 rows per group, delta 9999, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 50000, delta = Some(9999))
+    val func = (spark: SparkSession) => salaries(spark, DataTypes.DoubleType, rowsPerDept = 50000)
+    doTest(func, delta = Some(9999))
  }
 
   test("empty input set") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 0, delta = None)
+    val func = (spark: SparkSession) => salaries(spark, DataTypes.DoubleType, rowsPerDept = 0)
+    doTest(func, delta = None)
   }
 
   test("scalar percentile") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 250,
-      percentileArg = Left(0.5), delta = Some(100))
+    val func = (spark: SparkSession) => salaries(spark, DataTypes.DoubleType, rowsPerDept = 250)
+    doTest(func, percentileArg = Left(0.5), delta = Some(100))
   }
 
   test("empty percentile array fall back to CPU") {
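The delta argument threaded through these tests is approx_percentile's optional accuracy parameter (Spark's default is 10000, hence the boundary test at 9999): a larger value spends more memory on the sketch for a tighter approximation. A minimal PySpark illustration of the query shape the suite generates; the table, columns, and values are illustrative stand-ins, only the shape matters:

    # Query shape exercised by the suite; the third argument is the
    # accuracy ('delta' in the tests). Data here is illustrative only.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.createDataFrame(
        [("Sales", 1000.0), ("Sales", 2000.0), ("IT", 3000.0)],
        ["dept", "salary"]).createOrReplaceTempView("salaries")
    spark.sql("""
        SELECT dept,
               approx_percentile(salary, array(0.05, 0.5, 0.95), 9999) AS p
        FROM salaries
        GROUP BY dept
    """).show()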
@@ -105,25 +119,25 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
     }, conf)
   }
 
-  private def doTest(dataType: DataType,
-      rowsPerGroup: Int,
-      percentileArg: Either[Double, Array[Double]] = Right(DEFAULT_PERCENTILES),
-      delta: Option[Int]) {
+  private def doTest(
+      func: SparkSession => DataFrame,
+      percentileArg: Either[Double, Array[Double]] = Right(DEFAULT_PERCENTILES),
+      delta: Option[Int]) {
 
     val percentiles = withCpuSparkSession { spark =>
-      calcPercentiles(spark, dataType, rowsPerGroup, percentileArg, delta,
+      calcPercentiles(spark, func, percentileArg, delta,
         approx = false)
     }
 
     val approxPercentilesCpu = withCpuSparkSession { spark =>
-      calcPercentiles(spark, dataType, rowsPerGroup, percentileArg, delta, approx = true)
+      calcPercentiles(spark, func, percentileArg, delta, approx = true)
     }
 
     val conf = new SparkConf()
       .set("spark.rapids.sql.expression.ApproximatePercentile", "true")
 
     val approxPercentilesGpu = withGpuSparkSession(spark =>
-      calcPercentiles(spark, dataType, rowsPerGroup, percentileArg, delta, approx = true)
+      calcPercentiles(spark, func, percentileArg, delta, approx = true)
     , conf)
 
     val keys = percentiles.keySet ++ approxPercentilesCpu.keySet ++ approxPercentilesGpu.keySet
@@ -165,14 +179,13 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
 
   private def calcPercentiles(
       spark: SparkSession,
-      dataType: DataType,
-      rowsPerDept: Int,
+      dfFunc: SparkSession => DataFrame,
       percentilesArg: Either[Double, Array[Double]],
       delta: Option[Int],
       approx: Boolean
     ): Map[String, Array[Double]] = {
 
-    val df = salaries(spark, dataType, rowsPerDept)
+    val df = dfFunc(spark)
 
     val percentileArg = percentilesArg match {
       case Left(n) => s"$n"
@@ -210,6 +223,14 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
     }).toMap
   }
 
+  private def salariesWithNull(spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    Seq(("a", null), ("b", null), ("b", "123456.78")).toDF("dept", "x")
+      .withColumn("salary", expr("CAST(x AS double)"))
+      .drop("x")
+      .repartition(2)
+  }
+
   private def salaries(
       spark: SparkSession,
       salaryDataType: DataType, rowsPerDept: Int): DataFrame = {
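For reviewers unfamiliar with the null semantics the new test pins down: approx_percentile skips nulls, and a group whose inputs are all null yields a null result, which is what salariesWithNull above and the is-None assertions in the Python harness exercise together. A PySpark sketch of that expectation, reusing the same data as salariesWithNull:

    # Null-handling expectation behind the new test: group 'a' has only null
    # salaries and must yield a null approx_percentile on CPU and GPU alike.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.createDataFrame(
        [("a", None), ("b", None), ("b", 123456.78)],
        "dept string, salary double").createOrReplaceTempView("t")
    spark.sql(
        "SELECT dept, approx_percentile(salary, 0.5) AS p FROM t GROUP BY dept ORDER BY dept"
    ).show()
    # expected: dept 'a' -> p = NULL; dept 'b' -> p = 123456.78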