Skip to content

Commit

Permalink
fix: Disable Comet shuffle with AQE coalesce partitions enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed May 4, 2024
1 parent 26af020 commit 130f8b7
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,9 @@ object CometSparkSessionExtensions extends Logging {
/**
 * Reports whether Comet shuffle should be used under the given session configuration.
 *
 * All of the following must hold:
 *   - the `COMET_EXEC_SHUFFLE_ENABLED` setting reads as true from `conf`;
 *   - `spark.shuffle.manager` is set and names the Comet shuffle manager class;
 *   - `conf.coalesceShufflePartitionsEnabled` is false (see the TODO below).
 *
 * @param conf the SQL configuration to inspect
 * @return true only when every condition above is satisfied
 */
private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
  COMET_EXEC_SHUFFLE_ENABLED.get(conf) &&
    (conf.contains("spark.shuffle.manager") && conf.getConfString("spark.shuffle.manager") ==
      "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") &&
    // TODO: AQE coalesce partitions feature causes Comet columnar shuffle memory leak
    // NOTE(review): only the coalesce flag is consulted here, not whether adaptive
    // execution itself is enabled -- confirm that this is intended.
    !conf.coalesceShufflePartitionsEnabled

private[comet] def isCometScanEnabled(conf: SQLConf): Boolean = {
COMET_SCAN_ENABLED.get(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar

import testImplicits._

setupTestData()

// Runs a full outer join with Comet exec, adaptive execution and partition coalescing all
// switched on, then hands the result to checkShuffleAnswer with 0 -- presumably the expected
// number of Comet shuffles; verify against the helper's definition.
test("Disable Comet shuffle with AQE coalesce partitions enabled") {
  val settings = Seq(
    CometConf.COMET_EXEC_ENABLED.key -> "true",
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
    SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1")
  val query =
    "SELECT * FROM (SELECT * FROM testData WHERE key = 0) t1 FULL JOIN " +
      "testData2 t2 ON t1.key = t2.a"

  withSQLConf(settings: _*) {
    checkShuffleAnswer(sql(query), 0)
  }
}

test("columnar shuffle on nested struct including nulls") {
Seq(10, 201).foreach { numPartitions =>
Seq("1.0", "10.0").foreach { ratio =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ abstract class CometTestBase
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
conf.set(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED.key, "false")
conf.set(CometConf.COMET_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true")
Expand Down

0 comments on commit 130f8b7

Please sign in to comment.