[SPARK-37369][SQL][FOLLOWUP] Override supportsRowBased in UnionExec
### What changes were proposed in this pull request?
In PR #34642, we added a `supportsRowBased` flag to `SparkPlan` to avoid a redundant
`ColumnarToRow` transition on `InMemoryTableScan`. The same optimization also applies to
`Union` when all of its children support row-based output, so this PR adds a
`supportsRowBased` implementation for `UnionExec`.
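
As a rough illustration only (not part of the patch): a cached range unioned with itself is one
place where the extra transition could show up. The snippet below is a minimal spark-shell
sketch; the `spark` session is assumed, and the exact operators in the printed plan depend on
the Spark version and configuration.

```scala
// Hypothetical check in spark-shell, assuming an active `spark` session.
val base = spark.range(5)
base.cache()
val unioned = base.union(base)

// The idea of the change: UnionExec now reports supportsRowBased when all of
// its children do, so a row-producing parent no longer needs a ColumnarToRow
// node inserted above the cached scans.
println(unioned.queryExecution.executedPlan)
```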

### Why are the changes needed?
Follow-up of PR #34642, extending the same optimization to `UnionExec`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests passed.

Closes #35061 from linhongliu-db/SPARK-37369-followup.

Authored-by: Linhong Liu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
linhongliu-db authored and cloud-fan committed Dec 30, 2021
1 parent 22585b6 commit 77e8683
Showing 3 changed files with 29 additions and 16 deletions.
@@ -700,6 +700,8 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
 
   override def supportsColumnar: Boolean = children.forall(_.supportsColumnar)
 
+  override def supportsRowBased: Boolean = children.forall(_.supportsRowBased)
+
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     sparkContext.union(children.map(_.executeColumnar()))
   }
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution.columnar
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection}
 import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation.clearSerializer
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
@@ -101,7 +103,13 @@ class TestSingleIntColumnarCachedBatchSerializer extends CachedBatchSerializer {
       cacheAttributes: Seq[Attribute],
       selectedAttributes: Seq[Attribute],
       conf: SQLConf): RDD[InternalRow] = {
-    throw new IllegalStateException("This does not work. This is only for testing")
+    convertCachedBatchToColumnarBatch(input, cacheAttributes, selectedAttributes, conf)
+      .mapPartitionsInternal { batches =>
+        val toUnsafe = UnsafeProjection.create(selectedAttributes, selectedAttributes)
+        batches.flatMap { batch =>
+          batch.rowIterator().asScala.map(toUnsafe)
+        }
+      }
   }
 
   override def buildFilter(
@@ -125,22 +125,25 @@ class DebuggingSuite extends DebuggingSuiteBase with DisableAdaptiveExecutionSuite {
   }
 
   test("SPARK-28537: DebugExec cannot debug columnar related queries") {
-    val base = spark.range(5)
-    base.persist()
-    val df = base.union(base)
+    withTempPath { workDir =>
+      val workDirPath = workDir.getAbsolutePath
+      val input = spark.range(5).toDF("id")
+      input.write.parquet(workDirPath)
+      val df = spark.read.parquet(workDirPath)
 
-    val captured = new ByteArrayOutputStream()
-    Console.withOut(captured) {
-      df.debug()
-    }
-    df.unpersist()
+      val captured = new ByteArrayOutputStream()
+      Console.withOut(captured) {
+        df.debug()
+      }
 
-    val output = captured.toString().replaceAll("#\\d+", "#x")
-    assert(output.contains(
-      """== InMemoryTableScan [id#xL] ==
-        |Tuples output: 0
-        | id LongType: {}
-        |""".stripMargin))
+      val output = captured.toString()
+        .replaceAll("== FileScan parquet \\[id#\\d+L] .* ==", "== FileScan parquet [id#xL] ==")
+      assert(output.contains(
+        """== FileScan parquet [id#xL] ==
+          |Tuples output: 0
+          | id LongType: {}
+          |""".stripMargin))
+    }
   }
 }
 