[Spark] Honor codegen configs in DataSkippingStatsTracker (#4120)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR uses MutableProjection.create instead of directly generating the
MutableProjection via GenerateMutableProjection.generate. Because the latter
forces codegen, it can fail when the generated code is too large, which can
happen, for example, for very wide schemas. The new code uses the correct
Spark API, which automatically falls back to an InterpretedMutableProjection
if codegen fails for any reason.
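
For context, a minimal sketch (not the Delta code itself) of the API this PR switches to: `MutableProjection.create` goes through Spark's interpreted-fallback machinery, so it honors `spark.sql.codegen.factoryMode` and degrades to an interpreted projection when codegen fails. The attribute and literal below are made up for illustration.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal, MutableProjection}
import org.apache.spark.sql.types.IntegerType

// A single (made-up) input attribute and a trivial projection over it.
val a = AttributeReference("a", IntegerType)()

// MutableProjection.create respects the codegen configs and falls back to an
// InterpretedMutableProjection if code generation fails for any reason.
val projection = MutableProjection.create(
  exprs = Seq(a, Literal(1)),
  inputSchema = Seq(a)
)
```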

## How was this patch tested?

This PR only switches to the correct, higher-level API call for creating a
MutableProjection, so existing tests are sufficient.

## Does this PR introduce _any_ user-facing changes?

No
LukasRupprecht authored Feb 24, 2025
1 parent 7e85686 commit 1e3076a
Showing 2 changed files with 29 additions and 8 deletions.

@@ -2273,6 +2273,16 @@ trait DeltaSQLConfBase {
       .checkValue(v => v >= 1, "Must be at least 1.")
       .createWithDefault(100)
 
+  val DELTA_STATS_COLLECTION_FALLBACK_TO_INTERPRETED_PROJECTION =
+    buildConf("collectStats.fallbackToInterpretedProjection")
+      .internal()
+      .doc("When enabled, the updateStats expression will use the standard code path" +
+        " that falls back to an interpreted expression if codegen fails. This should" +
+        " always be true. The config only exists to force the old behavior, which was" +
+        " to always use codegen.")
+      .booleanConf
+      .createWithDefault(true)
+
   val DELTA_CONVERT_ICEBERG_STATS = buildConf("collectStats.convertIceberg")
     .internal()
     .doc("When enabled, attempts to convert Iceberg stats to Delta stats when cloning from " +
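
The new flag could be flipped like this to force the old codegen-only path (a hedged sketch: the full key assumes DeltaSQLConfBase's usual `spark.databricks.delta.` prefix, and `spark` is an active SparkSession):

```scala
// Force the pre-fix behavior: always use generated code for the stats
// projection instead of falling back to interpretation.
// Normally this should stay at its default of true.
spark.conf.set(
  "spark.databricks.delta.collectStats.fallbackToInterpretedProjection",
  "false")
```
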
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.stats
 import scala.collection.mutable
 
 import org.apache.spark.sql.delta.expressions.JoinedProjection
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}

@@ -27,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration

@@ -87,14 +89,23 @@ class DeltaTaskStatisticsTracker(
 
   // This projection combines the intermediate results stored by aggBuffer with the values of the
   // currently processed row and updates aggBuffer in place.
-  private val updateStats: MutableProjection = GenerateMutableProjection.generate(
-    expressions = JoinedProjection.bind(
-      aggBufferAttrs,
-      dataCols,
-      aggregates.flatMap(_.updateExpressions)),
-    inputSchema = Nil,
-    useSubexprElimination = true
-  )
+  private val updateStats: MutableProjection = {
+    val aggs = aggregates.flatMap(_.updateExpressions)
+    val expressions = JoinedProjection.bind(aggBufferAttrs, dataCols, aggs)
+    if (SQLConf.get.getConf(
+        DeltaSQLConf.DELTA_STATS_COLLECTION_FALLBACK_TO_INTERPRETED_PROJECTION)) {
+      MutableProjection.create(
+        exprs = expressions,
+        inputSchema = Nil
+      )
+    } else {
+      GenerateMutableProjection.generate(
+        expressions = expressions,
+        inputSchema = Nil,
+        useSubexprElimination = true
+      )
+    }
+  }
 
   // This executes the whole statsColExpr in order to compute the final stats value for the file.
   // In order to evaluate it, we have to replace its aggregate functions with the corresponding
