Add in checks for Parquet LEGACY date/time rebase #435

Merged: 3 commits, merged Jul 27, 2020
Changes from 2 commits
12 changes: 6 additions & 6 deletions integration_tests/src/main/python/parquet_test.py
@@ -30,7 +30,7 @@ def read_parquet_sql(data_path):

parquet_gens_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, date_gen,
TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))],
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))],
pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/132'))]

@pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn)
@@ -80,7 +80,7 @@ def test_compress_read_round_trip(spark_tmp_path, compress):
string_gen, date_gen,
# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
# timestamp_gen
TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))]
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))]

@pytest.mark.parametrize('parquet_gen', parquet_pred_push_gens, ids=idfn)
@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
@@ -102,7 +102,7 @@ def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func):
def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase):
# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
# timestamp_gen
gen = TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))
gen = TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark : unary_op_df(spark, gen).write.parquet(data_path),
@@ -113,7 +113,7 @@ def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase):

parquet_gens_legacy_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))],
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))],
pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133')),
pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133'))]

@@ -132,7 +132,7 @@ def test_simple_partitioned_read(spark_tmp_path):
# we should go with a more standard set of generators
parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))]
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))]
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
with_cpu_session(
@@ -153,7 +153,7 @@ def test_read_merge_schema(spark_tmp_path):
# we should go with a more standard set of generators
parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))]
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))]
first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
with_cpu_session(
ColumnarOutputWriter.scala
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids

import scala.collection.mutable

import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, TableWriter}
import ai.rapids.cudf.{HostBufferConsumer, HostMemoryBuffer, NvtxColor, NvtxRange, Table, TableWriter}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import org.apache.hadoop.fs.{FSDataOutputStream, Path}
import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -60,7 +60,7 @@ abstract class ColumnarOutputWriterFactory extends Serializable {
* `org.apache.spark.sql.execution.datasources.OutputWriter`.
*/
abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
dataSchema: StructType, rangeName: String) extends HostBufferConsumer {
dataSchema: StructType, rangeName: String) extends HostBufferConsumer with Arm {

val tableWriter: TableWriter
val conf = context.getConfiguration
@@ -130,6 +130,10 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
}
}

protected def scanTableBeforeWrite(table: Table): Unit = {
// NOOP for now, but allows a child to override this
}

/**
* Writes the columnar batch and returns the time in ns taken to write
*
@@ -140,17 +144,12 @@
var needToCloseBatch = true
try {
val startTimestamp = System.nanoTime
val nvtxRange = new NvtxRange(s"GPU $rangeName write", NvtxColor.BLUE)
try {
val table = GpuColumnVector.from(batch)
try {
withResource(new NvtxRange(s"GPU $rangeName write", NvtxColor.BLUE)) { _ =>
withResource(GpuColumnVector.from(batch)) { table =>
scanTableBeforeWrite(table)
anythingWritten = true
tableWriter.write(table)
} finally {
table.close()
}
} finally {
nvtxRange.close()
}

// Batch is no longer needed, write process from here does not use GPU.
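The refactor above swaps the hand-rolled try/finally blocks for Arm's withResource helper. The helper itself is not shown in this diff; as a rough sketch of the idea (illustrative names, not the plugin's actual source), it is the usual loan pattern for AutoCloseable resources:

```scala
// Sketch only: approximates what Arm.withResource provides in com.nvidia.spark.rapids.
// The resource is always closed, mirroring the try/finally blocks removed above.
trait ArmLike {
  def withResource[T <: AutoCloseable, V](resource: T)(body: T => V): V = {
    try {
      body(resource)
    } finally {
      resource.close()
    }
  }
}
```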
GpuParquetFileFormat.scala
@@ -25,11 +25,13 @@ import org.apache.parquet.hadoop.util.ContextUtil

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetWriteSupport}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.rapids.RebaseHelper
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.types.{DateType, StructType, TimestampType}

object GpuParquetFileFormat {
def tagGpuSupport(
@@ -69,6 +71,21 @@ object GpuParquetFileFormat {
}
}

val schemaHasDates = schema.exists { field =>
TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[DateType])
}

sqlConf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE) match {
case "EXCEPTION" => //Good
case "CORRECTED" => //Good
case "LEGACY" =>
if (schemaHasDates || schemaHasTimestamps) {
meta.willNotWorkOnGpu("LEGACY rebase mode for dates and timestamps is not supported")
}
case other =>
meta.willNotWorkOnGpu(s"$other is not a supported rebase mode")
}

if (meta.canThisBeReplaced) {
Some(new GpuParquetFileFormat)
} else {
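A hedged usage sketch of how the rebase-mode check above behaves (not code from this PR; `spark` is assumed to be a SparkSession with the RAPIDS plugin enabled, `df` a DataFrame containing a date or timestamp column, and the output paths are hypothetical):

```scala
import org.apache.spark.sql.internal.SQLConf

spark.conf.set(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, "CORRECTED")
df.write.parquet("/tmp/corrected") // stays on the GPU

spark.conf.set(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, "EXCEPTION")
df.write.parquet("/tmp/exception") // stays on the GPU; throws at write time if any value needs rebasing

spark.conf.set(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, "LEGACY")
df.write.parquet("/tmp/legacy")    // tagGpuSupport falls this write back to the CPU
```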
@@ -101,6 +118,9 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {

val conf = ContextUtil.getConfiguration(job)

val dateTimeRebaseException =
"EXCEPTION".equals(conf.get(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key))

val committerClass =
conf.getClass(
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
@@ -179,7 +199,7 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {
path: String,
dataSchema: StructType,
context: TaskAttemptContext): ColumnarOutputWriter = {
new GpuParquetWriter(path, dataSchema, compressionType, context)
new GpuParquetWriter(path, dataSchema, compressionType, dateTimeRebaseException, context)
}

override def getFileExtension(context: TaskAttemptContext): String = {
@@ -193,9 +213,20 @@ class GpuParquetWriter(
path: String,
dataSchema: StructType,
compressionType: CompressionType,
dateTimeRebaseException: Boolean,
context: TaskAttemptContext)
extends ColumnarOutputWriter(path, context, dataSchema, "Parquet") {

override def scanTableBeforeWrite(table: Table): Unit = {
if (dateTimeRebaseException) {
(0 until table.getNumberOfColumns).foreach { i =>
if (RebaseHelper.isDateTimeRebaseNeededWrite(table.getColumn(i))) {
throw DataSourceUtils.newRebaseExceptionInWrite("Parquet")
}
}
}
}

override val tableWriter: TableWriter = {
val writeContext = new ParquetWriteSupport().init(conf)
val builder = ParquetWriterOptions.builder()
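scanTableBeforeWrite leans on RebaseHelper.isDateTimeRebaseNeededWrite, which is added elsewhere in this PR and not shown in this diff view. Conceptually it asks whether any value falls before the 1582-10-15 Julian/Gregorian cutover that Spark 3.0 rebases around; a minimal CPU-side sketch of that idea (the real helper works on cudf columns on the GPU, and its exact cutoffs may differ):

```scala
import java.time.LocalDate

object RebaseSketch {
  // Days since the Unix epoch for 1582-10-15; earlier dates differ between the hybrid
  // Julian calendar used by Spark 2.x and the proleptic Gregorian calendar in Spark 3.0.
  private val lastSwitchGregorianDay: Int = LocalDate.of(1582, 10, 15).toEpochDay.toInt

  // True if any DATE value (as days since the epoch) would need the rebase decision.
  def isDateRebaseNeeded(epochDays: Array[Int]): Boolean =
    epochDays.exists(_ < lastSwitchGregorianDay)
}
```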
GpuParquetScan.scala
@@ -56,8 +56,10 @@ import org.apache.spark.sql.execution.datasources.v2.{FilePartitionReaderFactory
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.RebaseHelper
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration
@@ -114,6 +116,8 @@ object GpuParquetScan {
sparkSession: SparkSession,
readSchema: StructType,
meta: RapidsMeta[_, _, _]): Unit = {
val sqlConf = sparkSession.conf

if (!meta.conf.isParquetEnabled) {
meta.willNotWorkOnGpu("Parquet input and output has been disabled. To enable set" +
s"${RapidsConf.ENABLE_PARQUET} to true")
@@ -130,6 +134,10 @@
}
}

val schemaHasTimestamps = readSchema.exists { field =>
TrampolineUtil.dataTypeExistsRecursively(field.dataType, _.isInstanceOf[TimestampType])
}

// Currently timestamp conversion is not supported.
// If support needs to be added then we need to follow the logic in Spark's
// ParquetPartitionReaderFactory and VectorizedColumnReader which essentially
@@ -139,9 +147,17 @@
// were written in that timezone and convert them to UTC timestamps.
// Essentially this should boil down to a vector subtract of the scalar delta
// between the configured timezone's delta from UTC on the timestamp data.
if (sparkSession.sessionState.conf.isParquetINT96TimestampConversion) {
if (schemaHasTimestamps && sparkSession.sessionState.conf.isParquetINT96TimestampConversion) {
meta.willNotWorkOnGpu("GpuParquetScan does not support int96 timestamp conversion")
}

sqlConf.get(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key) match {
case "EXCEPTION" => // Good
case "CORRECTED" => // Good
case "LEGACY" => // Good, but it really is EXCEPTION for us...
case other =>
meta.willNotWorkOnGpu(s"$other is not a supported read rebase mode")
}
}
}

@@ -164,6 +180,8 @@ case class GpuParquetPartitionReaderFactory(
private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix
private val maxReadBatchSizeRows = rapidsConf.maxReadBatchSizeRows
private val maxReadBatchSizeBytes = rapidsConf.maxReadBatchSizeBytes
private val isCorrectedRebase =
"CORRECTED" == sqlConf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ)

override def supportColumnarReads(partition: InputPartition): Boolean = true

@@ -177,8 +195,10 @@
ColumnarPartitionReaderWithPartitionValues.newReader(partitionedFile, reader, partitionSchema)
}

private def filterClippedSchema(clippedSchema: MessageType,
fileSchema: MessageType, isCaseSensitive: Boolean): MessageType = {
private def filterClippedSchema(
clippedSchema: MessageType,
fileSchema: MessageType,
isCaseSensitive: Boolean): MessageType = {
val fs = fileSchema.asGroupType()
val types = if (isCaseSensitive) {
val inFile = fs.getFields.asScala.map(_.getName).toSet
@@ -201,6 +221,24 @@
}
}

// Copied from Spark
private val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version"
// Copied from Spark
private val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDateTime"

def isCorrectedRebaseMode(
lookupFileMeta: String => String,
isCorrectedModeConfig: Boolean): Boolean = {
// If there is no version, we return the mode specified by the config.
Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
// Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
// rebase the datetime values.
// Files written by Spark 3.0 and later may also need the rebase if they were written with
// the "LEGACY" rebase mode.
version >= "3.0.0" && lookupFileMeta(SPARK_LEGACY_DATETIME) == null
}.getOrElse(isCorrectedModeConfig)
}
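A hedged, self-contained restatement of the heuristic above, exercised with hypothetical metadata values:

```scala
object RebaseModeSketch {
  def correctedRebase(meta: Map[String, String], isCorrectedModeConfig: Boolean): Boolean =
    meta.get("org.apache.spark.version")
      .map(v => v >= "3.0.0" && !meta.contains("org.apache.spark.legacyDateTime"))
      .getOrElse(isCorrectedModeConfig)

  // Spark 3.0 file without the legacy marker: already proleptic Gregorian, no rebase needed.
  val correctedFile = correctedRebase(
    Map("org.apache.spark.version" -> "3.0.0"), isCorrectedModeConfig = false)      // true
  // Spark 3.0 file written in LEGACY mode: rebase handling is still required.
  val legacyFile = correctedRebase(
    Map("org.apache.spark.version" -> "3.0.0", "org.apache.spark.legacyDateTime" -> ""),
    isCorrectedModeConfig = false)                                                   // false
  // No Spark version metadata at all: fall back to the session configuration.
  val unknownWriter = correctedRebase(Map.empty, isCorrectedModeConfig = true)       // true
}
```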

private def buildBaseColumnarParquetReader(
file: PartitionedFile): PartitionReader[ColumnarBatch] = {
val conf = broadcastedConf.value.value
@@ -217,6 +255,9 @@
None
}

val isCorrectedRebaseForThis =
isCorrectedRebaseMode(footer.getFileMetaData.getKeyValueMetaData.get, isCorrectedRebase)

val blocks = if (pushedFilters.isDefined) {
// Use the ParquetFileReader to perform dictionary-level filtering
ParquetInputFormat.setFilterPredicate(conf, pushedFilters.get)
Expand All @@ -242,7 +283,7 @@ case class GpuParquetPartitionReaderFactory(
val clippedBlocks = ParquetPartitionReader.clipBlocks(columnPaths, blocks.asScala)
new ParquetPartitionReader(conf, file, filePath, clippedBlocks, clippedSchema,
isCaseSensitive, readDataSchema, debugDumpPrefix, maxReadBatchSizeRows,
maxReadBatchSizeBytes, metrics)
maxReadBatchSizeBytes, metrics, isCorrectedRebaseForThis)
}
}

@@ -274,7 +315,8 @@ class ParquetPartitionReader(
debugDumpPrefix: String,
maxReadBatchSizeRows: Integer,
maxReadBatchSizeBytes: Long,
execMetrics: Map[String, SQLMetric]) extends PartitionReader[ColumnarBatch] with Logging
execMetrics: Map[String, SQLMetric],
isCorrectedRebaseMode: Boolean) extends PartitionReader[ColumnarBatch] with Logging
with ScanWithMetrics with Arm {
private var isExhausted: Boolean = false
private var maxDeviceMemory: Long = 0
@@ -554,6 +596,13 @@ class ParquetPartitionReader(
GpuSemaphore.acquireIfNecessary(TaskContext.get())

val table = Table.readParquet(parseOpts, dataBuffer, 0, dataSize)
if (!isCorrectedRebaseMode) {
(0 until table.getNumberOfColumns).foreach { i =>
if (RebaseHelper.isDateTimeRebaseNeededRead(table.getColumn(i))) {
throw RebaseHelper.newRebaseExceptionInRead("Parquet")
}
}
}
maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory)
if (readDataSchema.length < table.getNumberOfColumns) {
table.close()
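A hedged end-to-end sketch of the read-side behavior added above (not a test from this PR; `spark` is an assumed SparkSession with the plugin enabled and the path is hypothetical). Note that the plugin treats LEGACY reads the same as EXCEPTION, per the comment in GpuParquetScan.tagSupport:

```scala
import org.apache.spark.sql.internal.SQLConf

spark.conf.set(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key, "EXCEPTION")
// If legacy_dates.parquet holds dates before 1582-10-15 and the file metadata does not mark it
// as written in CORRECTED mode, the GPU read now fails with a rebase error instead of silently
// returning shifted values.
spark.read.parquet("/tmp/legacy_dates.parquet").collect()

spark.conf.set(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key, "CORRECTED")
// CORRECTED mode (or files carrying Spark 3.0 metadata without the legacy marker) reads as-is.
spark.read.parquet("/tmp/legacy_dates.parquet").collect()
```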