diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index b7b36c4885e..516cef3e7a5 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020-2021, NVIDIA CORPORATION.
+# Copyright (c) 2020-2022, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,11 +14,11 @@
 
 import pytest
 
-from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_equal
+from asserts import assert_cpu_and_gpu_are_equal_collect_with_capture, assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
 from data_gen import *
 from marks import *
 from pyspark.sql.types import *
-from spark_session import with_cpu_session, with_gpu_session
+from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
 
 def read_parquet_df(data_path):
     return lambda spark : spark.read.parquet(data_path)
@@ -685,3 +685,46 @@ def test_parquet_reading_from_unaligned_pages_basic_filters_with_nulls(spark_tmp
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : spark.read.parquet(data_path).filter(filter_str),
             all_confs)
+
+
+conf_for_parquet_aggregate_pushdown = {
+    "spark.sql.parquet.aggregatePushdown": "true",
+    "spark.sql.sources.useV1SourceList": ""
+}
+
+@pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on Parquet is a new feature of Spark 3.3.0')
+def test_parquet_scan_without_aggregation_pushdown_not_fallback(spark_tmp_path):
+    """
+    No aggregation will be pushed down in this test, so we should not fall back to the CPU
+    """
+    data_path = spark_tmp_path + "/pushdown.parquet"
+
+    def do_parquet_scan(spark):
+        spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").mode("overwrite").parquet(data_path)
+        df = spark.read.parquet(data_path).selectExpr("Max(p)")
+        return df
+
+    assert_gpu_and_cpu_are_equal_collect(
+        do_parquet_scan,
+        conf_for_parquet_aggregate_pushdown
+    )
+
+
+@pytest.mark.skipif(is_before_spark_330(), reason='Aggregate push down on Parquet is a new feature of Spark 3.3.0')
+@allow_non_gpu(any=True)
+def test_parquet_scan_with_aggregation_pushdown_fallback(spark_tmp_path):
+    """
+    The aggregation will be pushed down in this test, so we should fall back to the CPU
+    """
+    data_path = spark_tmp_path + "/pushdown.parquet"
+
+    def do_parquet_scan(spark):
+        spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").mode("overwrite").parquet(data_path)
+        df = spark.read.parquet(data_path).selectExpr("count(p)")
+        return df
+
+    assert_cpu_and_gpu_are_equal_collect_with_capture(
+        do_parquet_scan,
+        exist_classes="BatchScanExec",
+        non_exist_classes="GpuBatchScanExec",
+        conf=conf_for_parquet_aggregate_pushdown)
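
For context, the aggregate pushdown behavior exercised by the two new tests above can be reproduced outside the integration-test harness. The sketch below is illustrative only and not part of the patch: it assumes a plain Spark 3.3.0+ session (no RAPIDS plugin) and a throwaway /tmp path, and uses the same two confs as conf_for_parquet_aggregate_pushdown.

# Minimal standalone sketch of the Spark 3.3.0+ parquet aggregate pushdown
# feature targeted by the tests above (hypothetical session and path).
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.parquet.aggregatePushdown", "true")
         .config("spark.sql.sources.useV1SourceList", "")  # empty list => DSv2 readers
         .getOrCreate())

data_path = "/tmp/pushdown.parquet"
spark.range(10).selectExpr("id", "id % 3 as p") \
    .write.partitionBy("p").mode("overwrite").parquet(data_path)

# count(p) can be answered from parquet metadata, so Spark pushes the aggregate
# into the scan; Max(p), as in the first test above, is not pushed down.
spark.read.parquet(data_path).selectExpr("count(p)").explain()

The pushed aggregate turns the scan into a metadata-only operation, which is exactly the case the shim change below detects and sends back to the CPU.
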
diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/Spark33XShims.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/Spark33XShims.scala
index be5b4e86b7c..e54c891ca01 100644
--- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/Spark33XShims.scala
+++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/Spark33XShims.scala
@@ -24,9 +24,13 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.json.rapids.shims.v2.Spark33XFileOptionsShims
+import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FilePartition, FileScanRDD, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
+import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.types.StructType
 
 trait Spark33XShims extends Spark33XFileOptionsShims {
@@ -56,4 +60,87 @@ trait Spark33XShims extends Spark33XFileOptionsShims {
     new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith,
       pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode)
   }
+
+  override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = Seq(
+    GpuOverrides.scan[ParquetScan](
+      "Parquet parsing",
+      (a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) {
+        override def tagSelfForGpu(): Unit = {
+          GpuParquetScanBase.tagSupport(this)
+          // we are being overly cautious, as Parquet does not support this yet
+          if (a.isInstanceOf[SupportsRuntimeFiltering]) {
+            willNotWorkOnGpu("Parquet does not support runtime filtering (DPP)" +
+              " on datasource V2 yet.")
+          }
+          if (a.pushedAggregate.nonEmpty) {
+            willNotWorkOnGpu(
+              "aggregates pushed into the Parquet read, which is a metadata-only operation"
+            )
+          }
+        }
+
+        override def convertToGpu(): Scan = {
+          GpuParquetScan(a.sparkSession,
+            a.hadoopConf,
+            a.fileIndex,
+            a.dataSchema,
+            a.readDataSchema,
+            a.readPartitionSchema,
+            a.pushedFilters,
+            a.options,
+            a.partitionFilters,
+            a.dataFilters,
+            conf)
+        }
+      }),
+    GpuOverrides.scan[OrcScan](
+      "ORC parsing",
+      (a, conf, p, r) => new ScanMeta[OrcScan](a, conf, p, r) {
+        override def tagSelfForGpu(): Unit = {
+          GpuOrcScanBase.tagSupport(this)
+          // we are being overly cautious, as ORC does not support this yet
+          if (a.isInstanceOf[SupportsRuntimeFiltering]) {
+            willNotWorkOnGpu("ORC does not support runtime filtering (DPP)" +
+              " on datasource V2 yet.")
+          }
+        }
+
+        override def convertToGpu(): Scan =
+          GpuOrcScan(a.sparkSession,
+            a.hadoopConf,
+            a.fileIndex,
+            a.dataSchema,
+            a.readDataSchema,
+            a.readPartitionSchema,
+            a.options,
+            a.pushedFilters,
+            a.partitionFilters,
+            a.dataFilters,
+            conf)
+      }),
+    GpuOverrides.scan[CSVScan](
+      "CSV parsing",
+      (a, conf, p, r) => new ScanMeta[CSVScan](a, conf, p, r) {
+        override def tagSelfForGpu(): Unit = {
+          GpuCSVScan.tagSupport(this)
+          // we are being overly cautious, as CSV does not support this yet
+          if (a.isInstanceOf[SupportsRuntimeFiltering]) {
+            willNotWorkOnGpu("CSV does not support runtime filtering (DPP)" +
+              " on datasource V2 yet.")
+          }
+        }
+
+        override def convertToGpu(): Scan =
+          GpuCSVScan(a.sparkSession,
+            a.fileIndex,
+            a.dataSchema,
+            a.readDataSchema,
+            a.readPartitionSchema,
+            a.options,
+            a.partitionFilters,
+            a.dataFilters,
+            conf.maxReadBatchSizeRows,
+            conf.maxReadBatchSizeBytes)
+      })
+  ).map(r => (r.getClassFor.asSubclass(classOf[Scan]), r)).toMap
 }
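
With the shim change in place, the fallback asserted by assert_cpu_and_gpu_are_equal_collect_with_capture can also be observed by hand. A rough check, continuing the hypothetical session and data_path from the earlier sketch and assuming the RAPIDS plugin is enabled on Spark 3.3.0+:

# Rough manual check of the CPU fallback (hypothetical; continues the sketch
# above, this time with the RAPIDS plugin on the classpath and enabled).
for key, value in {"spark.sql.parquet.aggregatePushdown": "true",
                   "spark.sql.sources.useV1SourceList": ""}.items():
    spark.conf.set(key, value)

# The pushed-down count(p) should plan the CPU scan (the new test asserts that
# BatchScanExec is present and GpuBatchScanExec is not), while Max(p), where
# nothing is pushed, should still plan a GPU scan.
spark.read.parquet(data_path).selectExpr("count(p)").explain()
spark.read.parquet(data_path).selectExpr("Max(p)").explain()

This mirrors the tagSelfForGpu logic above: a ParquetScan with a non-empty pushedAggregate is tagged willNotWorkOnGpu, so only the aggregate-pushdown query falls back to the CPU while other parquet reads stay on the GPU.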