Fallback to CPU when Spark pushes down Aggregates (Min/Max/Count) for ORC #4859

Merged
merged 4 commits into NVIDIA:branch-22.04 on Mar 7, 2022

Conversation

amahussein (Collaborator)

Fixes #4638

Signed-off-by: Ahmed Hussein (amahussein) [email protected]

SPARK-34960 allows pushing down certain aggregations into ORC. ORC exposes column statistics through the org.apache.orc.Reader interface.
This PR makes the following changes:

  • Fall back to the CPU when aggregates are pushed down (a minimal sketch of the scenario follows this list).
  • Create new ScanMeta classes for each file format and place them in the shims, so we won't have to copy-paste the entire getScans implementation just to support a new feature.
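For context, a minimal sketch of the scenario the fallback targets, assuming an ORC table at a placeholder data_path and using the spark.sql.orc.aggregatePushdown config referenced in the error message further down:

# With aggregate push down enabled, Spark can answer Min/Max/Count from ORC
# file statistics instead of reading row data. The GPU scan does not produce
# these aggregate results, so the plugin now falls back to the CPU for the scan.
spark.conf.set("spark.sql.orc.aggregatePushdown", "true")

df = spark.read.orc(data_path).selectExpr("Count(p)")
df.collect()  # with this PR, the scan behind this query runs on the CPU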

@amahussein (Collaborator Author)

build

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
@jlowe added this to the Feb 14 - Feb 25 milestone on Feb 24, 2022
@revans2 (Collaborator) left a comment

Looks good, I just have a question about the tests and would like to understand them a bit better. Also, do we know if we are writing the correct metadata out on the GPU side to make this feature work if the GPU wrote the files instead of the CPU?


def do_orc_scan(spark):
    df = spark.read.orc(data_path).selectExpr("Max(p)")
Collaborator

Why is max not pushed down but count is?

Collaborator Author

I was looking at Spark PR 34298.
AFAIU, Min/Max are not pushed down for partition columns; only Count is. A sketch of one way to check this from PySpark is below.
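A hedged sketch, assuming p is the partition column of an ORC table at a placeholder data_path, and assuming the scan node of the physical plan lists the pushed aggregation (the exact explain wording may vary by Spark version):

# Enable ORC aggregate push down and compare the physical plans.
spark.conf.set("spark.sql.orc.aggregatePushdown", "true")

# Count on the partition column can be answered from metadata and file
# statistics, so a pushed aggregation is expected in the scan node.
spark.read.orc(data_path).selectExpr("Count(p)").explain()

# Max on the partition column is not pushed down, so the aggregate is expected
# to show up as a regular plan node above the scan instead.
spark.read.orc(data_path).selectExpr("Max(p)").explain()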

Collaborator

That is fine; then add a comment about it and update the name of this issue so that only Count is included. Spark messed up by keeping the name of the issue and the PR the same.

Collaborator Author

I updated the test to cover all the aggregates (Min/Max/Count) on both partition and non-partition columns.

@amahussein (Collaborator Author)

> Looks good, I just have a question about the tests and would like to understand them a bit better. Also, do we know if we are writing the correct metadata out on the GPU side to make this feature work if the GPU wrote the files instead of the CPU?

Thanks @revans2! That's actually a good question, and I wanted to reach out to get more background about support for writing ORC.

Initially, the file was written and read inside do_orc_scan(spark). While this works for Parquet, it fails for ORC with exceptions.
The GPU does not write the column statistics correctly:

E                   py4j.protocol.Py4JJavaError: An error occurred while calling o356.collectToPython.
E                   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 5.0 failed 1 times, most recent failure: Lost task 7.0 in stage 5.0 (TID 163) (10.136.8.146 executor driver): org.apache.spark.SparkException: Cannot read columns statistics in file: file:/tmp/ahussein/pyspark_tests/c240m5-01-gw1-72665-1063323372/pushdown.orc/p=2/part-00064-47d45f8a-d4ec-42ad-8bfc-0176ade0caba.c000.snappy.orc. Please consider disabling ORC aggregate push down by setting 'spark.sql.orc.aggregatePushdown' to false.
E                       at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.createAggInternalRowFromFooter(OrcUtils.scala:432)
E                       at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$$anon$3.$anonfun$batch$2(OrcPartitionReaderFactory.scala:221)
E                       at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2735)
E                       at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$$anon$3.batch$lzycompute(OrcPartitionReaderFactory.scala:218)
E                       at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$$anon$3.batch(OrcPartitionReaderFactory.scala:217)
E                       at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$$anon$3.get(OrcPartitionReaderFactory.scala:230)
E                       at org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory$$anon$3.get(OrcPartitionReaderFactory.scala:215)
E                       at org.apache.spark.sql.execution.datasources.v2.PartitionedFileReader.get(FilePartitionReaderFactory.scala:57)
E                       at org.apache.spark.sql.execution.datasources.v2.FilePartitionReader.get(FilePartitionReader.scala:89)
E                       at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.next(DataSourceRDD.scala:108)
E                       at org.apache.spark.sql.execution.datasources.v2.MetricsBatchIterator.next(DataSourceRDD.scala:154)
E                       at org.apache.spark.sql.execution.datasources.v2.MetricsBatchIterator.next(DataSourceRDD.scala:151)
E                       at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
E                       at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
E                       at com.nvidia.spark.rapids.CollectTimeIterator.$anonfun$next$1(GpuExec.scala:198)
E                       at com.nvidia.spark.rapids.Arm.withResource(Arm.scala:28)
E                       at com.nvidia.spark.rapids.Arm.withResource$(Arm.scala:26)
E                       at com.nvidia.spark.RebaseHelper$.withResource(RebaseHelper.scala:25)
E                       at com.nvidia.spark.rapids.CollectTimeIterator.next(GpuExec.scala:197)
E                       at com.nvidia.spark.rapids.AbstractGpuCoalesceIterator.hasNext(GpuCoalesceBatches.scala:261)
E                       at com.nvidia.spark.rapids.GpuHashAggregateIterator.$anonfun$next$2(aggregate.scala:237)
E                       at scala.Option.getOrElse(Option.scala:189)
E                       at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:235)
E                       at com.nvidia.spark.rapids.GpuHashAggregateIterator.next(aggregate.scala:181)
E                       at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.partNextBatch(GpuShuffleExchangeExecBase.scala:288)
E                       at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase$$anon$1.hasNext(GpuShuffleExchangeExecBase.scala:304)
E                       at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
E                       at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
E                       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
E                       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
E                       at org.apache.spark.scheduler.Task.run(Task.scala:136)
E                       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
E                       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1475)
E                       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
E                       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
E                       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
E                       at java.lang.Thread.run(Thread.java:748)
E                   Caused by: java.util.NoSuchElementException
E                       at java.util.LinkedList.removeFirst(LinkedList.java:270)
E                       at java.util.LinkedList.remove(LinkedList.java:685)
E                       at org.apache.spark.sql.execution.datasources.orc.OrcFooterReader.convertStatistics(OrcFooterReader.java:54)
E                       at org.apache.spark.sql.execution.datasources.orc.OrcFooterReader.readStatistics(OrcFooterReader.java:45)
E                       at org.apache.spark.sql.execution.datasources.orc.OrcUtils$.createAggInternalRowFromFooter(OrcUtils.scala:428)
E                       ... 36 more

Then I tried writing the file inside with_cpu_session, which eliminated the above exception.
Since I follow a test-driven approach, I know for sure that test_orc_scan_with_aggregation_pushdown_fallback fails without my code changes. A rough sketch of the resulting test shape is below.
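For illustration only, a rough sketch of what such a test could look like, assuming this repo's Python integration-test helpers (with_cpu_session, assert_gpu_fallback_collect, and the spark_tmp_path fixture); the schema, parametrization, and the 'BatchScanExec' fallback class used here are assumptions for the sketch, not taken from the PR:

import pytest

# Assumed integration-test helper modules in this repo.
from asserts import assert_gpu_fallback_collect
from spark_session import with_cpu_session

# Min/Max/Count on a data column plus Count on the partition column are the
# combinations Spark can push down; Min/Max on a partition column are not
# pushed down, so they would not trigger the fallback.
@pytest.mark.parametrize('agg_expr', ['Min(v)', 'Max(v)', 'Count(v)', 'Count(p)'])
def test_orc_agg_pushdown_fallback_sketch(spark_tmp_path, agg_expr):
    data_path = spark_tmp_path + '/PUSHDOWN_DATA'
    # Write the ORC file on the CPU because of the statistics problem described above.
    with_cpu_session(
        lambda spark: spark.range(100)
                           .selectExpr('id as v', 'id % 10 as p')
                           .write.partitionBy('p').orc(data_path))
    # Expect the scan to fall back to the CPU when aggregate push down is enabled.
    assert_gpu_fallback_collect(
        lambda spark: spark.read.orc(data_path).selectExpr(agg_expr),
        'BatchScanExec',
        conf={'spark.sql.orc.aggregatePushdown': 'true'})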

@jlowe (Contributor) commented on Feb 24, 2022

> The GPU does not write the column statistics correctly

That's concerning. We need to do some investigation to see whether this is a Spark bug or a bug in the way the statistics are generated in cuDF. I suggest going with the CPU session for the write for now and filing a follow-up issue in this repo to track down the root cause of the problem when the GPU writes the ORC file.

@amahussein (Collaborator Author)

build

Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
@amahussein self-assigned this on Feb 25, 2022
@sameerz added the audit_3.3.0 (Audit related tasks for 3.3.0) label on Feb 28, 2022
@sameerz (Collaborator) commented on Mar 4, 2022

build

@amahussein merged commit 4fd75c3 into NVIDIA:branch-22.04 on Mar 7, 2022
@tgravescs changed the title from "Aggregate (Min/Max/Count) push down for ORC" to "Fallback to CPU when Spark pushes down Aggregates (Min/Max/Count) for ORC" on Mar 8, 2022
Labels
audit_3.3.0 (Audit related tasks for 3.3.0)
Development

Successfully merging this pull request may close these issues.

[FEA] Aggregate (Min/Max/Count) push down for ORC
4 participants