Fallback to CPU when aggregate push down used for parquet #4623

Merged
51 changes: 48 additions & 3 deletions integration_tests/src/main/python/parquet_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2021, NVIDIA CORPORATION.
+# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,11 +14,11 @@

import pytest

-from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_equal
+from asserts import assert_cpu_and_gpu_are_equal_collect_with_capture, assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
from data_gen import *
from marks import *
from pyspark.sql.types import *
-from spark_session import with_cpu_session, with_gpu_session
+from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330

def read_parquet_df(data_path):
    return lambda spark: spark.read.parquet(data_path)
@@ -685,3 +685,48 @@ def test_parquet_reading_from_unaligned_pages_basic_filters_with_nulls(spark_tmp
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.parquet(data_path).filter(filter_str),
        all_confs)


conf_for_parquet_aggregate_pushdown = {
    "spark.sql.parquet.aggregatePushdown": "true",
    "spark.sql.sources.useV1SourceList": ""
}
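For context: spark.sql.parquet.aggregatePushdown is new in Spark 3.3.0 and lets MIN/MAX/COUNT aggregates be answered from parquet footer statistics, while an empty spark.sql.sources.useV1SourceList routes all reads through the V2 datasource path, which is where both the pushdown and the scan override below apply. A minimal standalone sketch of how the pushdown surfaces in a plan; this sits outside the test harness, and the /tmp path and session setup are illustrative:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.sql.parquet.aggregatePushdown", "true")
    .config("spark.sql.sources.useV1SourceList", "")
    .getOrCreate())

spark.range(10).write.mode("overwrite").parquet("/tmp/pushdown_demo")
# On Spark 3.3+, the scan node in the explain output should report the pushed
# aggregate, e.g. "PushedAggregation: [MAX(id)]", instead of a plain row read.
spark.read.parquet("/tmp/pushdown_demo").selectExpr("max(id)").explain()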

@pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on Parquet is a new feature of Spark 330')
@allow_non_gpu(any=True)
Contributor: If this test is not supposed to fall back, why are we allowing non-GPU nodes in the plan?

def test_parquet_scan_without_aggregation_pushdown_not_fallback(spark_tmp_path):
    """
    No aggregation will be pushed down in this test, so we should not fall back to the CPU
    """
    data_path = spark_tmp_path + "/pushdown.parquet"

    def do_parquet_scan(spark):
        spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").mode("overwrite").parquet(data_path)
        df = spark.read.parquet(data_path).selectExpr("Max(p)")
        return df

    assert_cpu_and_gpu_are_equal_collect_with_capture(
        do_parquet_scan,
        exist_classes="GpuBatchScanExec",
        non_exist_classes="BatchScanExec",
        conf=conf_for_parquet_aggregate_pushdown)
Contributor: This should just be assert_cpu_and_gpu_are_equal_collect. We're honestly not that interested in exactly which GPU nodes are involved here, rather that the plan is all on the GPU. That's what assert_cpu_and_gpu_are_equal_collect already checks, once we remove the @allow_non_gpu decorator.

Collaborator (Author): Thank you! I have updated.
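Taking both comments above together, a sketch of how the test might look after the update: drop @allow_non_gpu and use the plain equality assert, which already fails if any node falls off the GPU. The imported helper in this file is spelled assert_gpu_and_cpu_are_equal_collect:

@pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on Parquet is a new feature of Spark 330')
def test_parquet_scan_without_aggregation_pushdown_not_fallback(spark_tmp_path):
    # No aggregation is pushed down here, so the whole plan should stay on the GPU
    data_path = spark_tmp_path + "/pushdown.parquet"

    def do_parquet_scan(spark):
        spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").mode("overwrite").parquet(data_path)
        return spark.read.parquet(data_path).selectExpr("Max(p)")

    assert_gpu_and_cpu_are_equal_collect(do_parquet_scan, conf=conf_for_parquet_aggregate_pushdown)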



@pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on Parquet is a new feature of Spark 330')
@allow_non_gpu(any=True)
def test_parquet_scan_with_aggregation_pushdown_fallback(spark_tmp_path):
    """
    The aggregation will be pushed down in this test, so we should fall back to the CPU
    """
    data_path = spark_tmp_path + "/pushdown.parquet"

    def do_parquet_scan(spark):
        spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").mode("overwrite").parquet(data_path)
        df = spark.read.parquet(data_path).selectExpr("count(p)")
        return df

    assert_cpu_and_gpu_are_equal_collect_with_capture(
        do_parquet_scan,
        exist_classes="BatchScanExec",
        non_exist_classes="GpuBatchScanExec",
        conf=conf_for_parquet_aggregate_pushdown)
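As a usage note, a hypothetical illustration (not part of the PR) of how the fallback reason added in the Scala change below would surface to users, assuming an active RAPIDS-enabled session, the parquet data written by the test above, and the plugin's spark.rapids.sql.explain setting:

# Hypothetical: with explain mode enabled, the driver log should say why the
# scan stayed on the CPU, quoting the willNotWorkOnGpu message from this PR.
spark.conf.set("spark.rapids.sql.explain", "NOT_ON_GPU")
spark.read.parquet(data_path).selectExpr("count(p)").collect()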
@@ -24,9 +24,13 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.json.rapids.shims.v2.Spark33XFileOptionsShims
import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FilePartition, FileScanRDD, PartitionedFile}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.types.StructType

trait Spark33XShims extends Spark33XFileOptionsShims {
@@ -56,4 +60,86 @@ trait Spark33XShims extends Spark33XFileOptionsShims {
    new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith,
      pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode)
  }

  override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq(
    GpuOverrides.scan[ParquetScan](
      "Parquet parsing",
      (a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
        override def tagSelfForGpu(): Unit = {
          GpuParquetScanBase.tagSupport(this)
          // we are being overly cautious, as Parquet does not support this yet
          if (a.isInstanceOf[SupportsRuntimeFiltering]) {
            willNotWorkOnGpu("Parquet does not support Runtime filtering (DPP)" +
              " on datasource V2 yet.")
          } else if (a.pushedAggregate.nonEmpty) {
Collaborator: The basic principle of tagging is to record as much information as possible about why something will not run on the GPU. So here, let's move the pushedAggregate check out of the else if.

Collaborator (Author): Thank you, I will change it.

            willNotWorkOnGpu(
              "Implementing aggregate push down on the GPU would yield little performance improvement")
          }
        }

        override def convertToGpu(): Scan = {
          GpuParquetScan(a.sparkSession,
            a.hadoopConf,
            a.fileIndex,
            a.dataSchema,
            a.readDataSchema,
            a.readPartitionSchema,
            a.pushedFilters,
            a.options,
            a.partitionFilters,
            a.dataFilters,
            conf)
        }
      }),
    GpuOverrides.scan[OrcScan](
      "ORC parsing",
      (a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) {
        override def tagSelfForGpu(): Unit = {
          GpuOrcScanBase.tagSupport(this)
          // we are being overly cautious, as ORC does not support this yet
          if (a.isInstanceOf[SupportsRuntimeFiltering]) {
            willNotWorkOnGpu("Orc does not support Runtime filtering (DPP)" +
              " on datasource V2 yet.")
          }
        }

        override def convertToGpu(): Scan =
          GpuOrcScan(a.sparkSession,
            a.hadoopConf,
            a.fileIndex,
            a.dataSchema,
            a.readDataSchema,
            a.readPartitionSchema,
            a.options,
            a.pushedFilters,
            a.partitionFilters,
            a.dataFilters,
            conf)
      }),
    GpuOverrides.scan[CSVScan](
      "CSV parsing",
      (a, conf, p, r) => new ScanMeta[CSVScan](a, conf, p, r) {
        override def tagSelfForGpu(): Unit = {
          GpuCSVScan.tagSupport(this)
          // we are being overly cautious, as CSV does not support this yet
          if (a.isInstanceOf[SupportsRuntimeFiltering]) {
            willNotWorkOnGpu("Csv does not support Runtime filtering (DPP)" +
              " on datasource V2 yet.")
          }
        }

        override def convertToGpu(): Scan =
          GpuCSVScan(a.sparkSession,
            a.fileIndex,
            a.dataSchema,
            a.readDataSchema,
            a.readPartitionSchema,
            a.options,
            a.partitionFilters,
            a.dataFilters,
            conf.maxReadBatchSizeRows,
            conf.maxReadBatchSizeBytes)
      })
  ).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap
}