[SPARK-36645][SQL] Aggregate (Min/Max/Count) push down for Parquet #33639

Closed · wants to merge 18 commits
Changes from 17 commits
@@ -853,6 +853,14 @@ object SQLConf {
.checkValue(threshold => threshold >= 0, "The threshold must not be negative.")
.createWithDefault(10)

val PARQUET_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.aggregatePushdown")
.doc("If true, MAX/MIN/COUNT without filter and group by will be pushed" +
" down to Parquet for optimization. MAX/MIN/COUNT for complex types and timestamp" +
" can't be pushed down")
.version("3.3.0")
.booleanConf
.createWithDefault(false)

val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
.doc("If true, data will be written in a way of Spark 1.4 and earlier. For example, decimal " +
"values will be written in Apache Parquet's fixed-length byte array format, which other " +
@@ -3660,6 +3668,8 @@ class SQLConf extends Serializable with Logging {
def parquetFilterPushDownInFilterThreshold: Int =
getConf(PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD)

def parquetAggregatePushDown: Boolean = getConf(PARQUET_AGGREGATE_PUSHDOWN_ENABLED)

def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

def isOrcSchemaMergingEnabled: Boolean = getConf(ORC_SCHEMA_MERGING_ENABLED)
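For context, a minimal sketch of how the new flag is meant to be exercised from user code; the Parquet path, the view name `t`, and the integer column `id` below are hypothetical:

```scala
// Sketch only: enable the new flag and run a global aggregate that could be
// answered from Parquet footer statistics (no filter, no GROUP BY).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.parquet.aggregatePushdown", "true")
  .getOrCreate()

// Hypothetical input: a Parquet dataset with an integer column `id`.
spark.read.parquet("/tmp/example_parquet").createOrReplaceTempView("t")
spark.sql("SELECT MAX(id), MIN(id), COUNT(id), COUNT(*) FROM t").show()
```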
@@ -16,11 +16,28 @@
*/
package org.apache.spark.sql.execution.datasources.parquet

import java.util

import scala.collection.mutable
import scala.language.existentials

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.metadata.{ColumnChunkMetaData, ParquetMetadata}
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.{PrimitiveType, Types}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min}
import org.apache.spark.sql.execution.RowToColumnConverter
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.internal.SQLConf.{LegacyBehaviorPolicy, PARQUET_AGGREGATE_PUSHDOWN_ENABLED}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object ParquetUtils {
def inferSchema(
@@ -127,4 +144,213 @@ object ParquetUtils {
file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}

/**
* When the partial aggregates (Max/Min/Count) are pushed down to Parquet, we don't need
* createRowBaseReader to read data from Parquet and aggregate at the Spark layer. Instead, we
* get the partial aggregate (Max/Min/Count) results from the statistics in the Parquet
* footer, and then construct an InternalRow from these aggregate results.
*
* @return Aggregate results in the format of InternalRow
*/
private[sql] def createAggInternalRowFromFooter(
footer: ParquetMetadata,
filePath: String,
dataSchema: StructType,
partitionSchema: StructType,
aggregation: Aggregation,
aggSchema: StructType,
datetimeRebaseMode: LegacyBehaviorPolicy.Value,
isCaseSensitive: Boolean): InternalRow = {
val (primitiveTypes, values) = getPushedDownAggResult(
footer, filePath, dataSchema, partitionSchema, aggregation, isCaseSensitive)

val builder = Types.buildMessage
primitiveTypes.foreach(t => builder.addField(t))
val parquetSchema = builder.named("root")

val schemaConverter = new ParquetToSparkSchemaConverter
val converter = new ParquetRowConverter(schemaConverter, parquetSchema, aggSchema,
None, datetimeRebaseMode, LegacyBehaviorPolicy.CORRECTED, NoopUpdater)
val primitiveTypeNames = primitiveTypes.map(_.getPrimitiveTypeName)
primitiveTypeNames.zipWithIndex.foreach {
case (PrimitiveType.PrimitiveTypeName.BOOLEAN, i) =>
val v = values(i).asInstanceOf[Boolean]
converter.getConverter(i).asPrimitiveConverter.addBoolean(v)
case (PrimitiveType.PrimitiveTypeName.INT32, i) =>
val v = values(i).asInstanceOf[Integer]
converter.getConverter(i).asPrimitiveConverter.addInt(v)
case (PrimitiveType.PrimitiveTypeName.INT64, i) =>
val v = values(i).asInstanceOf[Long]
converter.getConverter(i).asPrimitiveConverter.addLong(v)
case (PrimitiveType.PrimitiveTypeName.FLOAT, i) =>
val v = values(i).asInstanceOf[Float]
converter.getConverter(i).asPrimitiveConverter.addFloat(v)
case (PrimitiveType.PrimitiveTypeName.DOUBLE, i) =>
val v = values(i).asInstanceOf[Double]
converter.getConverter(i).asPrimitiveConverter.addDouble(v)
case (PrimitiveType.PrimitiveTypeName.BINARY, i) =>
val v = values(i).asInstanceOf[Binary]
converter.getConverter(i).asPrimitiveConverter.addBinary(v)
case (PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, i) =>
val v = values(i).asInstanceOf[Binary]
converter.getConverter(i).asPrimitiveConverter.addBinary(v)
case (_, i) =>
throw new SparkException("Unexpected parquet type name: " + primitiveTypeNames(i))
}
converter.currentRecord
}

/**
* When the aggregates (Max/Min/Count) are pushed down to Parquet and
* PARQUET_VECTORIZED_READER_ENABLED is set to true, we don't need buildColumnarReader
* to read data from Parquet and aggregate at the Spark layer. Instead, we get the
* aggregate (Max/Min/Count) results from the statistics in the Parquet footer,
* and then construct a ColumnarBatch from these aggregate results.
*
* @return Aggregate results in the format of ColumnarBatch
*/
private[sql] def createAggColumnarBatchFromFooter(
footer: ParquetMetadata,
filePath: String,
dataSchema: StructType,
partitionSchema: StructType,
aggregation: Aggregation,
aggSchema: StructType,
offHeap: Boolean,
datetimeRebaseMode: LegacyBehaviorPolicy.Value,
isCaseSensitive: Boolean): ColumnarBatch = {
val row = createAggInternalRowFromFooter(
footer,
filePath,
dataSchema,
partitionSchema,
aggregation,
aggSchema,
datetimeRebaseMode,
isCaseSensitive)
val converter = new RowToColumnConverter(aggSchema)
val columnVectors = if (offHeap) {
OffHeapColumnVector.allocateColumns(1, aggSchema)
} else {
OnHeapColumnVector.allocateColumns(1, aggSchema)
}
converter.convert(row, columnVectors.toArray)
new ColumnarBatch(columnVectors.asInstanceOf[Array[ColumnVector]], 1)
}

/**
* Calculate the pushed-down aggregate (Max/Min/Count) results using the statistics
* information from the Parquet footer.
*
* @return A tuple of `Array[PrimitiveType]` and `Array[Any]`.
* The first element holds the Parquet PrimitiveType of each aggregate column,
* and the second element holds the corresponding aggregated values.
*/
private[sql] def getPushedDownAggResult(
footer: ParquetMetadata,
filePath: String,
dataSchema: StructType,
partitionSchema: StructType,
aggregation: Aggregation,
isCaseSensitive: Boolean)
: (Array[PrimitiveType], Array[Any]) = {
val footerFileMetaData = footer.getFileMetaData
val fields = footerFileMetaData.getSchema.getFields
val blocks = footer.getBlocks
val primitiveTypeBuilder = mutable.ArrayBuilder.make[PrimitiveType]
val valuesBuilder = mutable.ArrayBuilder.make[Any]

Member:
Maybe we should assert groupingExpressions is empty here.

Contributor Author:
Added. Thanks
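
For reference, a minimal sketch of the kind of assertion being discussed; the accessor name `groupByColumns` on `Aggregation` is an assumption about the connector API at this point in time:

```scala
// Sketch only (assumed accessor name): aggregate pushdown is planned solely for
// global aggregates, so the pushed-down Aggregation should carry no group-by columns.
assert(aggregation.groupByColumns.isEmpty,
  "Parquet aggregate pushdown should only be requested when there is no GROUP BY")
```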

aggregation.aggregateExpressions.foreach { agg =>
var value: Any = None
var rowCount = 0L
var isCount = false
var index = 0
var schemaName = ""
blocks.forEach { block =>
val blockMetaData = block.getColumns
agg match {
case max: Max =>
val colName = max.column.fieldNames.head
index = dataSchema.fieldNames.toList.indexOf(colName)
Member:
I wonder if we should consider case sensitivity in these cases too

Contributor Author:
I think we are ok. I have normalized aggregate column names.

schemaName = "max(" + colName + ")"
val currentMax = getCurrentBlockMaxOrMin(filePath, blockMetaData, index, true)
if (value == None || currentMax.asInstanceOf[Comparable[Any]].compareTo(value) > 0) {
value = currentMax
}
case min: Min =>
val colName = min.column.fieldNames.head
index = dataSchema.fieldNames.toList.indexOf(colName)
schemaName = "min(" + colName + ")"
val currentMin = getCurrentBlockMaxOrMin(filePath, blockMetaData, index, false)
if (value == None || currentMin.asInstanceOf[Comparable[Any]].compareTo(value) < 0) {
value = currentMin
}
case count: Count =>
schemaName = "count(" + count.column.fieldNames.head + ")"
rowCount += block.getRowCount
var isPartitionCol = false
if (partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive))
Comment on lines +290 to +294

Member:
Is the partition column also stored in the Parquet file?

Contributor Author:
For count(column), I get the row count from getRowCount on the BlockMetaData and the null count from getNumNulls in the footer, then subtract the number of nulls from the row count. If the column is a partition column, it can't be null, so I simply use getRowCount.

.toSet.contains(count.column.fieldNames.head)) {
isPartitionCol = true
}
isCount = true
if (!isPartitionCol) {
index = dataSchema.fieldNames.toList.indexOf(count.column.fieldNames.head)
// Count(*) includes the null values, but Count(colName) doesn't.
rowCount -= getNumNulls(filePath, blockMetaData, index)
}
case _: CountStar =>
schemaName = "count(*)"
rowCount += block.getRowCount
isCount = true
case _ =>
}
}
if (isCount) {
valuesBuilder += rowCount
primitiveTypeBuilder += Types.required(PrimitiveTypeName.INT64).named(schemaName)
} else {
valuesBuilder += value
val field = fields.get(index)
primitiveTypeBuilder += Types.required(field.asPrimitiveType.getPrimitiveTypeName)
.as(field.getLogicalTypeAnnotation)
.length(field.asPrimitiveType.getTypeLength)
.named(schemaName)
}
}
(primitiveTypeBuilder.result, valuesBuilder.result)
}

/**
* Get the Max or Min value for ith column in the current block
*
* @return the Max or Min value
*/
private def getCurrentBlockMaxOrMin(
filePath: String,
columnChunkMetaData: util.List[ColumnChunkMetaData],
i: Int,
isMax: Boolean): Any = {
val statistics = columnChunkMetaData.get(i).getStatistics
if (!statistics.hasNonNullValue) {
throw new UnsupportedOperationException(s"No min/max found for Parquet file $filePath. " +
s"Set SQLConf ${PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key} to false and execute again")
} else {
if (isMax) statistics.genericGetMax else statistics.genericGetMin
}
}

private def getNumNulls(
filePath: String,
columnChunkMetaData: util.List[ColumnChunkMetaData],
i: Int): Long = {
val statistics = columnChunkMetaData.get(i).getStatistics
if (!statistics.isNumNullsSet) {
throw new UnsupportedOperationException(s"Number of nulls not set for Parquet file" +
s" $filePath. Set SQLConf ${PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key} to false and execute" +
s" again")
}
statistics.getNumNulls
}
}
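
As a side note, a minimal sketch of where the statistics consumed by `getPushedDownAggResult`, `getCurrentBlockMaxOrMin`, and `getNumNulls` come from, reading a footer directly with parquet-hadoop; the file path is hypothetical:

```scala
// Sketch only: print the per-row-group statistics (row count, null count, min/max)
// for the first column chunk of a Parquet file.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

val inputFile = HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), new Configuration())
val reader = ParquetFileReader.open(inputFile)
try {
  reader.getFooter.getBlocks.forEach { block =>
    val stats = block.getColumns.get(0).getStatistics
    println(s"rows=${block.getRowCount}, nulls=${stats.getNumNulls}, " +
      s"min=${stats.genericGetMin}, max=${stats.genericGetMax}")
  }
} finally {
  reader.close()
}
```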