
Spark 3.4 + Arrow Datafusion Shuffle Manager Fails due to class loader isolation #221

Closed
holdenk opened this issue Mar 20, 2024 · 6 comments · Fixed by #256
Labels
bug Something isn't working

Comments

@holdenk
Contributor

holdenk commented Mar 20, 2024

Describe the bug

When trying to run using org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager it fails due to class loader isolation.

Steps to reproduce

/home/holden/repos/high-performance-spark-examples/spark-3.4.2-bin-hadoop3/bin/spark-sql \
  --master 'local[5]' \
  --conf spark.eventLog.enabled=true \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.comet.CometSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hive \
  --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.local.type=hadoop \
  --conf spark.sql.catalog.local.warehouse=/home/holden/repos/high-performance-spark-examples/warehouse \
  --jars /home/holden/repos/high-performance-spark-examples/accelerators/arrow-datafusion-comet/spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar \
  --conf spark.comet.enabled=true \
  --conf spark.comet.exec.enabled=true \
  --conf spark.comet.exec.all.enabled=true \
  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
  --conf spark.comet.exec.shuffle.enabled=true \
  --conf spark.comet.columnar.shuffle.enabled=true \
  --conf spark.driver.userClassPathFirst=true \
  --name sql/wap.sql \
  -f sql/wap.sql

I think anything that triggers a sort would suffice to reproduce this, but just in case, my wap.sql is:

DROP TABLE IF EXISTS local.wap_projects;
CREATE TABLE local.wap_projects (
       creator string,
       projectname string)
USING iceberg
PARTITIONED BY (creator);
ALTER TABLE local.wap_projects SET TBLPROPERTIES (
    'write.wap.enabled' = 'true'
);
-- We need a first commit, see https://github.com/apache/iceberg/issues/8849
INSERT INTO local.wap_projects VALUES("holdenk", "spark");
ALTER TABLE local.wap_projects DROP BRANCH IF EXISTS `audit-branch`;
ALTER TABLE local.wap_projects CREATE BRANCH `audit-branch`;
SET spark.wap.branch = 'audit-branch';
INSERT INTO local.wap_projects VALUES("krisnova", "aurae");
SELECT count(*) FROM local.wap_projects VERSION AS OF 'audit-branch' WHERE creator is NULL;
SELECT count(*) FROM local.wap_projects VERSION AS OF 'audit-branch' WHERE creator == "krisnova";
CALL local.system.remove_orphan_files(table => 'local.wap_projects');
CALL local.system.fast_forward("local.wap_projects", "main", "audit-branch");

This results in:

24/03/20 14:26:53 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalAccessError: failed to access class org.apache.spark.shuffle.sort.ShuffleInMemorySorter from class org.apache.spark.shuffle.sort.CometShuffleExternalSorter$SpillSorter (org.apache.spark.shuffle.sort.ShuffleInMemorySorter is in unnamed module of loader 'app'; org.apache.spark.shuffle.sort.CometShuffleExternalSorter$SpillSorter is in unnamed module of loader org.apache.spark.util.ChildFirstURLClassLoader @14dc3f89)
	at org.apache.spark.shuffle.sort.CometShuffleExternalSorter$SpillSorter.<init>(CometShuffleExternalSorter.java:434)
	at org.apache.spark.shuffle.sort.CometShuffleExternalSorter.<init>(CometShuffleExternalSorter.java:169)
	at org.apache.spark.sql.comet.execution.shuffle.CometUnsafeShuffleWriter.open(CometUnsafeShuffleWriter.java:236)
	at org.apache.spark.sql.comet.execution.shuffle.CometUnsafeShuffleWriter.<init>(CometUnsafeShuffleWriter.java:165)
	at org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager.getWriter(CometShuffleManager.scala:189)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Expected behavior

I expect the query to run.

The expected output is:

Time taken: 0.038 seconds
spark.wap.branch	'audit-branch'
Time taken: 0.041 seconds, Fetched 1 row(s)
Time taken: 0.232 seconds
0
Time taken: 0.605 seconds, Fetched 1 row(s)
0
Time taken: 0.183 seconds, Fetched 1 row(s)
Time taken: 3.352 seconds
main	4878286225198802743	4878286225198802743
Time taken: 0.035 seconds, Fetched 1 row(s)

Additional context

You can work around this error by copying the Arrow DataFusion Comet jar into Spark's jars directory, instead of adding it to the classpath with --jars, so that it is loaded by the same class loader as Spark's own classes.
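The workaround can be sketched as below. The SPARK_HOME default and the `touch` of a stand-in jar are placeholders for demonstration only; substitute your real Spark installation and the jar built from the Comet project:

```shell
# Workaround sketch: put the Comet jar into Spark's jars/ directory so it is
# loaded by the same (application) class loader as Spark's own classes.
# The SPARK_HOME default and the `touch` below are stand-ins for demonstration.
SPARK_HOME="${SPARK_HOME:-/tmp/spark-demo}"
COMET_JAR="comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar"
mkdir -p "$SPARK_HOME/jars"   # already exists in a real Spark install
touch "$COMET_JAR"            # stand-in for the built jar
cp "$COMET_JAR" "$SPARK_HOME/jars/"
```

With the jar in jars/, both ShuffleInMemorySorter and the Comet sorter resolve in the same class loader, so the IllegalAccessError does not occur.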

@holdenk holdenk added the bug Something isn't working label Mar 20, 2024
@holdenk
Contributor Author

holdenk commented Mar 20, 2024

I suspect that the correct fix is a documentation note in the README (maybe plus a try/catch in the code to print out a reference to the README), since changing the Spark class loader is not easy (I also tried with the user-classpath-first class loader). If folks agree, I'm happy to make a PR.

We could also (maybe?) get at Spark's internal class loader and explicitly use it, but that also seems very hacky.

@advancedxy
Contributor

> You can work around this error by copying the Arrow DataFusion Comet jar into Spark's jars directory, instead of adding it to the classpath with --jars, so that it is loaded by the same class loader as Spark's own classes.

Emmm, it could be a potential solution, but it seems a bit inconvenient. In my understanding, it usually requires extra effort to change Spark's jars directory/archive in a production environment.

> since changing the Spark class loader is not easy (I also tried with the user-classpath-first class loader)

So this issue occurred regardless of whether spark.driver.userClassPathFirst was set to true or false?

@holdenk
Contributor Author

holdenk commented Mar 22, 2024

> > You can work around this error by copying the Arrow DataFusion Comet jar into Spark's jars directory, instead of adding it to the classpath with --jars, so that it is loaded by the same class loader as Spark's own classes.
>
> Emmm, it could be a potential solution, but it seems a bit inconvenient. In my understanding, it usually requires extra effort to change Spark's jars directory/archive in a production environment.

True, especially for users of a vendor solution, although for my deployments this isn't a big deal (we package our own Spark version anyway).

Let me take another look next week and see if there is a way to get loaded with Spark's default class loader.

> > since changing the Spark class loader is not easy (I also tried with the user-classpath-first class loader)
>
> So this issue occurred regardless of whether spark.driver.userClassPathFirst was set to true or false?

Yup :(

Note that I've only tried this on vanilla Spark 3.4.

@advancedxy
Contributor

> Let me take another look next week and see if there is a way to get loaded with Spark's default class loader.

Thanks for working on this.
Another option that came to mind would be shading and renaming the package-scoped, shuffle-related classes into Comet's jar, e.g. org.apache.spark.shuffle.sort.ShuffleInMemorySorter -> org.apache.comet.shaded.ShuffleInMemorySorter. It should be doable, but it also seems very hacky.
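As a sketch of that shading idea, assuming a Maven build using maven-shade-plugin (the relocation pattern and shaded package name below are illustrative, not Comet's actual build configuration):

```xml
<!-- Illustrative relocation only; names are examples, not Comet's real config. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <!-- Rename the package-private Spark sorter inside the shaded jar. -->
        <pattern>org.apache.spark.shuffle.sort.ShuffleInMemorySorter</pattern>
        <shadedPattern>org.apache.comet.shaded.ShuffleInMemorySorter</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```

Note that relocation only rewrites classes and references that are actually bundled into the shaded jar, so the Spark class itself would also have to be pulled into the shade set (e.g. via an artifact filter), which adds to the hackiness.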

@holdenk
Contributor Author

holdenk commented Mar 25, 2024

Following up, I tried adding --driver-class-path as well, and it did the trick. So what I would propose is updating the docs to include --driver-class-path, and maybe adding a try/catch around the spot that hits this error to log a message pointing at the fix. WDYT @advancedxy?
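A trimmed sketch of the fixed invocation (the jar path is illustrative, and most of the --conf flags from the full repro command are omitted here): passing the same jar through both flags makes the driver load the shuffle-manager classes with Spark's application class loader.

```shell
# Illustrative sketch: pass the Comet jar via --driver-class-path in addition
# to --jars, so the driver-side classes load in Spark's own class loader.
COMET_JAR=/path/to/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar

spark-sql \
  --jars "$COMET_JAR" \
  --driver-class-path "$COMET_JAR" \
  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
  --conf spark.comet.exec.shuffle.enabled=true \
  -f wap.sql
```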

@advancedxy
Contributor

Ah, I remember this option now. I think it would be great to update the doc to include it.

One more thing: I think you also need to mention spark.executor.extraClassPath for the executors in Spark on YARN/K8s deployments?
Also cc @sunchao
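For example (the path below is illustrative), a cluster deployment might add a flag like:

```shell
# Illustrative: on YARN/K8s the executors need the jar on their class path too;
# the path must exist on the executor nodes (or inside the container image).
--conf spark.executor.extraClassPath=/opt/comet/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar
```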
