Commit e608ee7

Enable approx percentile tests (#3770)
* Enable some approx percentile tests

Signed-off-by: Andy Grove <[email protected]>

* Enable some approx percentile tests

Signed-off-by: Andy Grove <[email protected]>

* enable tests and add more tests

* improve null test

* add tests for byte input

* remove temp debug print

* Remove comment

Signed-off-by: Andy Grove <[email protected]>

* update documentation

* run approx percentile tests with and without AQE

Signed-off-by: Andy Grove <[email protected]>

* Add test for split CPU/GPU approx_percentile and implement fix

* scalastyle

* Revert fix for issue 3770

* address PR feedback
andygrove authored Nov 8, 2021
1 parent a5ce9c1 commit e608ee7
Showing 2 changed files with 111 additions and 62 deletions.
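
The commit message above notes that the approx percentile tests now run both with and without AQE (adaptive query execution). As a standalone illustration, not code from this commit, here is a minimal pytest sketch of that parametrization pattern using a plain local SparkSession instead of the plugin's gen_df/compare_percentile_approx helpers; the test name, table name t, data, and percentile list are illustrative, and the approx_percentile SQL function assumes Spark 3.1 or later:

```python
import pytest
from pyspark.sql import SparkSession

@pytest.mark.parametrize('aqe_enabled', ['false', 'true'])
def test_approx_percentile_aqe(aqe_enabled):
    # build a session with AQE toggled by the test parameter, mirroring the
    # {'spark.sql.adaptive.enabled': aqe_enabled} conf update in the diff below
    spark = (SparkSession.builder
             .master('local[2]')
             .config('spark.sql.adaptive.enabled', aqe_enabled)
             .getOrCreate())
    try:
        df = spark.createDataFrame([('a', float(v)) for v in range(100)], ['k', 'v'])
        df.createOrReplaceTempView('t')
        # "order by k" makes the row order deterministic, as in the commit's
        # updated create_percentile_sql
        rows = spark.sql(
            "select k, approx_percentile(v, array(0.05, 0.5, 0.95)) as p "
            "from t group by k order by k").collect()
        assert len(rows) == 1 and len(rows[0]['p']) == 3
    finally:
        spark.stop()
```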
110 changes: 69 additions & 41 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -1138,45 +1138,61 @@ def do_it(spark):
         return df.groupBy('a').agg(f.min(df.b[1]["a"]))
     assert_gpu_and_cpu_are_equal_collect(do_it)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_long_repeated_keys():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_byte(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
+    compare_percentile_approx(
+        lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
+                                     ('v', ByteGen())], length=100),
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
+
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_byte_scalar(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
+    compare_percentile_approx(
+        lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
+                                     ('v', ByteGen())], length=100),
+        0.5, conf)
+
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_long_repeated_keys(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', RepeatSeqGen(LongGen(), length=20)),
                                      ('v', LongRangeGen())], length=100),
-        [0.05, 0.25, 0.5, 0.75, 0.95])
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_long():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_long(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', LongRangeGen())], length=100),
-        [0.05, 0.25, 0.5, 0.75, 0.95])
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_long_scalar():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_long_scalar(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', LongRangeGen())], length=100),
-        0.5)
+        0.5, conf)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_double():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_double(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', DoubleGen())], length=100),
-        [0.05, 0.25, 0.5, 0.75, 0.95])
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_double_scalar():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_double_scalar(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', DoubleGen())], length=100),
-        0.05)
+        0.05, conf)
 
 @pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
 @ignore_order(local=True)
@@ -1200,7 +1216,7 @@ def approx_percentile_query(spark):
 # results due to the different algorithms being used. Instead we compute an exact percentile on the CPU and then
 # compute approximate percentiles on CPU and GPU and assert that the GPU numbers are accurate within some percentage
 # of the CPU numbers
-def compare_percentile_approx(df_fun, percentiles):
+def compare_percentile_approx(df_fun, percentiles, conf):
 
     # create SQL statements for exact and approx percentiles
     p_exact_sql = create_percentile_sql("percentile", percentiles)
@@ -1222,32 +1238,44 @@ def run_approx(spark):
     # run approx_percentile on CPU and GPU
     approx_cpu, approx_gpu = run_with_cpu_and_gpu(run_approx, 'COLLECT', _approx_percentile_conf)
 
-    for result in zip(exact, approx_cpu, approx_gpu):
+    assert len(exact) == len(approx_cpu)
+    assert len(exact) == len(approx_gpu)
+
+    for i in range(len(exact)):
+        cpu_exact_result = exact[i]
+        cpu_approx_result = approx_cpu[i]
+        gpu_approx_result = approx_gpu[i]
+
         # assert that keys match
-        assert result[0]['k'] == result[1]['k']
-        assert result[1]['k'] == result[2]['k']
-
-        exact = result[0]['the_percentile']
-        cpu = result[1]['the_percentile']
-        gpu = result[2]['the_percentile']
-
-        if exact is not None:
-            if isinstance(exact, list):
-                for x in zip(exact, cpu, gpu):
-                    exact = x[0]
-                    cpu = x[1]
-                    gpu = x[2]
-                    gpu_delta = abs(float(gpu) - float(exact))
-                    cpu_delta = abs(float(cpu) - float(exact))
+        assert cpu_exact_result['k'] == cpu_approx_result['k']
+        assert cpu_exact_result['k'] == gpu_approx_result['k']
+
+        # extract the percentile result column
+        exact_percentile = cpu_exact_result['the_percentile']
+        cpu_approx_percentile = cpu_approx_result['the_percentile']
+        gpu_approx_percentile = gpu_approx_result['the_percentile']
+
+        if exact_percentile is None:
+            assert cpu_approx_percentile is None
+            assert gpu_approx_percentile is None
+        else:
+            assert cpu_approx_percentile is not None
+            assert gpu_approx_percentile is not None
+            if isinstance(exact_percentile, list):
+                for j in range(len(exact_percentile)):
+                    assert cpu_approx_percentile[j] is not None
+                    assert gpu_approx_percentile[j] is not None
+                    gpu_delta = abs(float(gpu_approx_percentile[j]) - float(exact_percentile[j]))
+                    cpu_delta = abs(float(cpu_approx_percentile[j]) - float(exact_percentile[j]))
                     if gpu_delta > cpu_delta:
                         # GPU is less accurate so make sure we are within some tolerance
                         if gpu_delta == 0:
                             assert abs(gpu_delta / cpu_delta) - 1 < 0.001
                         else:
                             assert abs(cpu_delta / gpu_delta) - 1 < 0.001
             else:
-                gpu_delta = abs(float(gpu) - float(exact))
-                cpu_delta = abs(float(cpu) - float(exact))
+                gpu_delta = abs(float(gpu_approx_percentile) - float(exact_percentile))
+                cpu_delta = abs(float(cpu_approx_percentile) - float(exact_percentile))
                 if gpu_delta > cpu_delta:
                     # GPU is less accurate so make sure we are within some tolerance
                     if gpu_delta == 0:
@@ -1257,10 +1285,10 @@ def run_approx(spark):
 
 def create_percentile_sql(func_name, percentiles):
     if isinstance(percentiles, list):
-        return """select k, {}(v, array({})) as the_percentile from t group by k""".format(
+        return """select k, {}(v, array({})) as the_percentile from t group by k order by k""".format(
             func_name, ",".join(str(i) for i in percentiles))
     else:
-        return """select k, {}(v, {}) as the_percentile from t group by k""".format(
+        return """select k, {}(v, {}) as the_percentile from t group by k order by k""".format(
             func_name, percentiles)
 
 @ignore_order
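The compare_percentile_approx logic above compares each row of exact, CPU-approximate, and GPU-approximate results, and only constrains the GPU error on rows where it exceeds the CPU error. Below is a pure-Python sketch of that idea with a simplified tolerance rule (the commit's actual rule is the ratio test visible in the diff); the function name and tolerance handling are illustrative assumptions:

```python
def check_row(exact, cpu_approx, gpu_approx, rel_tol=0.001):
    # mirror the null contract from the diff: if the exact percentile is
    # null, both approximate results must be null too
    if exact is None:
        assert cpu_approx is None and gpu_approx is None
        return
    gpu_delta = abs(float(gpu_approx) - float(exact))
    cpu_delta = abs(float(cpu_approx) - float(exact))
    if gpu_delta > cpu_delta:
        # GPU is less accurate on this row; bound how much worse its error
        # may be than the CPU's own approximation error
        assert gpu_delta <= cpu_delta * (1 + rel_tol) + rel_tol

# example: exact median 50.0, CPU approx 50.2, GPU approx 50.2001 passes
check_row(50.0, 50.2, 50.2001)
check_row(None, None, None)
```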
63 changes: 42 additions & 21 deletions tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala
@@ -28,46 +28,60 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
 
   val DEFAULT_PERCENTILES = Array(0.005, 0.05, 0.25, 0.45, 0.5, 0.55, 0.75, 0.95, 0.995)
 
+  test("null handling") {
+    val func = spark => salariesWithNull(spark)
+    doTest(func, delta = Some(100))
+  }
+
   test("1 row per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 1, delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 1)
+    doTest(func, delta = Some(100))
   }
 
   test("5 rows per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 5, delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 5)
+    doTest(func, delta = Some(100))
   }
 
   test("250 rows per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 250, delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 250)
+    doTest(func, delta = Some(100))
   }
 
   test("2500 rows per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 2500, delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 2500)
+    doTest(func, delta = Some(100))
   }
 
   test("250 rows per group, default delta, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 250, delta = None)
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 250)
+    doTest(func, delta = None)
   }
 
   test("25000 rows per group, default delta, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 25000, delta = None)
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 25000)
+    doTest(func, delta = None)
   }
 
   test("50000 rows per group, default delta, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 50000, delta = None)
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 50000)
+    doTest(func, delta = None)
   }
 
   // test with a threshold just below the default level of 10000
   test("50000 rows per group, delta 9999, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 50000, delta = Some(9999))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 50000)
+    doTest(func, delta = Some(9999))
   }
 
   test("empty input set") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 0, delta = None)
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 1)
+    doTest(func, delta = None)
   }
 
   test("scalar percentile") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 250,
-      percentileArg = Left(0.5), delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 250)
+    doTest(func, percentileArg = Left(0.5), delta = Some(100))
   }
 
   test("empty percentile array fall back to CPU") {
@@ -105,25 +119,25 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
     }, conf)
   }
 
-  private def doTest(dataType: DataType,
-      rowsPerGroup: Int,
-      percentileArg: Either[Double, Array[Double]] = Right(DEFAULT_PERCENTILES),
-      delta: Option[Int]) {
+  private def doTest(
+      func: SparkSession => DataFrame,
+      percentileArg: Either[Double, Array[Double]] = Right(DEFAULT_PERCENTILES),
+      delta: Option[Int]) {
 
     val percentiles = withCpuSparkSession { spark =>
-      calcPercentiles(spark, dataType, rowsPerGroup, percentileArg, delta,
+      calcPercentiles(spark, func, percentileArg, delta,
         approx = false)
     }
 
     val approxPercentilesCpu = withCpuSparkSession { spark =>
-      calcPercentiles(spark, dataType, rowsPerGroup, percentileArg, delta, approx = true)
+      calcPercentiles(spark, func, percentileArg, delta, approx = true)
     }
 
     val conf = new SparkConf()
       .set("spark.rapids.sql.expression.ApproximatePercentile", "true")
 
     val approxPercentilesGpu = withGpuSparkSession(spark =>
-      calcPercentiles(spark, dataType, rowsPerGroup, percentileArg, delta, approx = true)
+      calcPercentiles(spark, func, percentileArg, delta, approx = true)
     , conf)
 
     val keys = percentiles.keySet ++ approxPercentilesCpu.keySet ++ approxPercentilesGpu.keySet
@@ -165,14 +179,13 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
 
   private def calcPercentiles(
       spark: SparkSession,
-      dataType: DataType,
-      rowsPerDept: Int,
+      dfFunc: SparkSession => DataFrame,
       percentilesArg: Either[Double, Array[Double]],
       delta: Option[Int],
       approx: Boolean
   ): Map[String, Array[Double]] = {
 
-    val df = salaries(spark, dataType, rowsPerDept)
+    val df = dfFunc(spark)
 
     val percentileArg = percentilesArg match {
       case Left(n) => s"$n"
@@ -210,6 +223,14 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
     }).toMap
   }
 
+  private def salariesWithNull(spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    Seq(("a", null), ("b", null), ("b", "123456.78")).toDF("dept", "x")
+      .withColumn("salary", expr("CAST(x AS double)"))
+      .drop("x")
+      .repartition(2)
+  }
+
   private def salaries(
       spark: SparkSession,
       salaryDataType: DataType, rowsPerDept: Int): DataFrame = {
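The new salariesWithNull generator above creates one department whose salaries are all null, which the "null handling" test runs through doTest. Below is a hedged PySpark sketch, not part of the commit, of the behavior that test relies on (approx_percentile skips nulls and returns null for an all-null group) and of the optional accuracy argument that the suite's delta parameter exercises; session setup and values are illustrative, and Spark's default accuracy is 10000, with one test probing 9999:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[2]').getOrCreate()
# dept 'a' has only null salaries; dept 'b' has one null and one real value
df = spark.createDataFrame(
    [('a', None), ('b', None), ('b', 123456.78)],
    'dept string, salary double')
df.createOrReplaceTempView('t')

# approx_percentile ignores nulls, so dept 'a' yields NULL and dept 'b'
# yields its single non-null salary
spark.sql("select dept, approx_percentile(salary, 0.5) as p "
          "from t group by dept order by dept").show()

# the third argument is the accuracy (the suite's 'delta'); 9999 is just
# below Spark's default of 10000
spark.sql("select dept, approx_percentile(salary, array(0.5), 9999) as p "
          "from t group by dept order by dept").show()

spark.stop()
```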
