Add Databricks 13.3 shim boilerplate code and refactor Databricks 12.2 shim [databricks] #9510

Merged

Commits (26)

38ac8ed  Shim boiler plate (razajafri, Oct 12, 2023)
2e41266  DB 13.3 changes (razajafri, Oct 12, 2023)
a4aac4a  Fixed Offset And Limit (razajafri, Oct 18, 2023)
19f18e6  reformat (razajafri, Oct 21, 2023)
c11f2a7  refactored Shim expression traits in 332db (razajafri, Oct 22, 2023)
79b8bd0  Signing off (razajafri, Oct 23, 2023)
9663ab8  fixed line length (razajafri, Oct 23, 2023)
64e32ce  removed duplicated code (razajafri, Oct 24, 2023)
b6a3478  refactored GpuBatchScanExec (razajafri, Oct 25, 2023)
51b29c8  Merge remote-tracking branch 'origin/branch-23.12' into fix-compilation (razajafri, Oct 25, 2023)
c9de866  upmerged (razajafri, Oct 25, 2023)
3299b08  fixed imports (razajafri, Oct 25, 2023)
d6db907  refactored HiveProviderCmdShim (razajafri, Oct 25, 2023)
d8be460  added the missing override (razajafri, Oct 25, 2023)
7cbad8a  addressed review comments (razajafri, Oct 25, 2023)
a97accb  removed ununsed import (razajafri, Oct 25, 2023)
9c7a4c1  addressed review comments (razajafri, Oct 26, 2023)
ed23a30  fixed build failures (razajafri, Oct 26, 2023)
efabf47  Update sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shi… (razajafri, Oct 26, 2023)
0c3efa0  Update sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shi… (razajafri, Oct 26, 2023)
4c26672  refactored more common code (razajafri, Oct 27, 2023)
5059231  resolved conflicts (razajafri, Oct 27, 2023)
aca9bae  fixed import order (razajafri, Oct 27, 2023)
1b57595  revert bad commit (razajafri, Oct 27, 2023)
76985bb  Moved shimExpressions to spark330db (razajafri, Oct 27, 2023)
6325aa0  removed left over imports from the bad commit (razajafri, Oct 27, 2023)

------------------------------------------------------------------------
@@ -33,7 +33,6 @@
{"spark": "332cdh"}
{"spark": "332db"}
{"spark": "333"}
{"spark": "341db"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

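Every shim source in spark-rapids carries a spark-rapids-shim-json-lines header like the one above, listing the Spark builds that compile that copy of the file. Dropping {"spark": "341db"} from this shared copy means the new Databricks 13.3 (spark341db) shim no longer uses it and presumably gets its own copy as part of the boilerplate this PR adds. As a rough, hedged sketch of what the header encodes (made-up names, not the plugin's actual build logic):

object ShimHeaderSketch {
  private val Marker = "spark-rapids-shim-json-lines"

  // Pull the {"spark": "..."} build tags out of a source file's header comment.
  def shimVersions(source: String): Set[String] = {
    val lines = source.split("\n").toSeq
    val start = lines.indexWhere(_.contains(Marker))
    val end   = lines.indexWhere(_.contains(Marker), start + 1)
    if (start < 0 || end <= start) Set.empty
    else lines.slice(start + 1, end).flatMap { l =>
      val t = l.trim
      if (t.startsWith("{\"spark\"")) Some(t.split(":")(1).replaceAll("[\"}\\s]", ""))
      else None
    }.toSet
  }

  // A source file belongs to a given shim build only if that build is listed.
  def includedIn(source: String, buildVer: String): Boolean =
    shimVersions(source).contains(buildVer)
}

Under that sketch, includedIn(fileText, "341db") would report false for this file after the change above.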
------------------------------------------------------------------------
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "320"}
{"spark": "321"}
{"spark": "321cdh"}
{"spark": "321db"}
{"spark": "322"}
{"spark": "323"}
{"spark": "324"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import org.apache.spark.sql.connector.read._

trait FilteredPartitions {
@transient protected val filteredPartitions: Seq[InputPartition]
}
------------------------------------------------------------------------
@@ -26,29 +26,20 @@ spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import com.google.common.base.Objects
import com.nvidia.spark.rapids.{GpuBatchScanExecMetrics, GpuScan}
import com.nvidia.spark.rapids.GpuScan

import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, DynamicPruningExpression, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.execution.datasources.rapids.DataSourceStrategyUtils
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.vectorized.ColumnarBatch

case class GpuBatchScanExec(
output: Seq[AttributeReference],
@transient scan: GpuScan,
runtimeFilters: Seq[Expression] = Seq.empty)
extends DataSourceV2ScanExecBase with GpuBatchScanExecMetrics {
@transient lazy val batch: Batch = scan.toBatch

// All expressions are filter expressions used on the CPU.
override def gpuExpressions: Seq[Expression] = Nil
extends GpuBatchScanExecBase(scan, runtimeFilters) {

// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
@@ -62,7 +53,7 @@ case class GpuBatchScanExec(

@transient override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions()

@transient private lazy val filteredPartitions: Seq[InputPartition] = {
@transient override protected lazy val filteredPartitions: Seq[InputPartition] = {
val dataSourceFilters = runtimeFilters.flatMap {
case DynamicPruningExpression(e) => DataSourceStrategyUtils.translateRuntimeFilter(e)
case _ => None
@@ -77,7 +68,6 @@

// call toBatch again to get filtered partitions
val newPartitions = scan.toBatch.planInputPartitions()

originalPartitioning match {
case p: DataSourcePartitioning if p.numPartitions != newPartitions.size =>
throw new SparkException(
@@ -94,38 +84,11 @@
}
}

override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()

override lazy val inputRDD: RDD[InternalRow] = {
scan.metrics = allMetrics
if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
// return an empty RDD with 1 partition if dynamic filtering removed the only split
sparkContext.parallelize(Array.empty[InternalRow], 1)
} else {
new GpuDataSourceRDD(sparkContext, filteredPartitions, readerFactory)
}
}

override def doCanonicalize(): GpuBatchScanExec = {
this.copy(
output = output.map(QueryPlan.normalizeExpressions(_, output)),
runtimeFilters = QueryPlan.normalizePredicates(
runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
output))
}

override def simpleString(maxFields: Int): String = {
val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
val result = s"$nodeName$truncatedOutputString ${scan.description()} $runtimeFiltersString"
redact(result)
}

override def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { b =>
numOutputRows += b.numRows()
b
}
}
}
}
------------------------------------------------------------------------
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "320"}
{"spark": "321"}
{"spark": "321cdh"}
{"spark": "321db"}
{"spark": "322"}
{"spark": "323"}
{"spark": "324"}
{"spark": "330"}
{"spark": "330cdh"}
{"spark": "330db"}
{"spark": "331"}
{"spark": "332"}
{"spark": "332cdh"}
{"spark": "332db"}
{"spark": "333"}
{"spark": "340"}
{"spark": "341"}
{"spark": "341db"}
{"spark": "350"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids.{GpuBatchScanExecMetrics, GpuScan}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.vectorized.ColumnarBatch

abstract class GpuBatchScanExecBase(
@transient scan: GpuScan,
runtimeFilters: Seq[Expression] = Seq.empty)
extends DataSourceV2ScanExecBase with GpuBatchScanExecMetrics with FilteredPartitions {

// All expressions are filter expressions used on the CPU.
override def gpuExpressions: Seq[Expression] = Nil

override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()

@transient lazy val batch: Batch = scan.toBatch

override lazy val inputRDD: RDD[InternalRow] = {
scan.metrics = allMetrics
if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
// return an empty RDD with 1 partition if dynamic filtering removed the only split
sparkContext.parallelize(Array.empty[InternalRow], 1)
} else {
new GpuDataSourceRDD(sparkContext, filteredPartitions, readerFactory)
}
}

override def simpleString(maxFields: Int): String = {
val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
val result = s"$nodeName$truncatedOutputString ${scan.description()} $runtimeFiltersString"
redact(result)
}

override def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { b =>
numOutputRows += b.numRows()
b
}
}
}
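
The new GpuBatchScanExecBase above is where this PR hoists the members that were duplicated in every per-version GpuBatchScanExec: gpuExpressions, readerFactory, batch, inputRDD, simpleString, and internalDoExecuteColumnar. The version-specific case classes (see the surrounding diffs) now mainly supply filteredPartitions through the FilteredPartitions trait, plus their own equality and canonicalization logic. A toy sketch of that shape, with illustrative names rather than the plugin's real classes:

// Illustrative only: the real classes build on Spark's DataSourceV2 machinery.
object BatchScanRefactorSketch {
  trait PartitionsShim {                                  // stands in for FilteredPartitions
    protected val filteredPartitions: Seq[Int]
  }

  abstract class ScanExecBase extends PartitionsShim {    // stands in for GpuBatchScanExecBase
    // Shared behavior, written once: fall back to a single placeholder split
    // when dynamic filtering has pruned every partition (mirroring inputRDD above).
    final def plannedSplits: Seq[Int] =
      if (filteredPartitions.isEmpty) Seq(0) else filteredPartitions
  }

  // A per-version subclass only supplies its version-specific filtering rule.
  final class ScanExecForOneVersion(parts: Seq[Int], keep: Int => Boolean)
      extends ScanExecBase {
    override protected val filteredPartitions: Seq[Int] = parts.filter(keep)
  }

  def main(args: Array[String]): Unit = {
    val exec = new ScanExecForOneVersion(Seq(1, 2, 3), _ % 2 == 1)
    println(exec.plannedSplits)   // List(1, 3)
  }
}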
------------------------------------------------------------------------
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "330"}
{"spark": "330cdh"}
{"spark": "330db"}
{"spark": "331"}
{"spark": "332"}
{"spark": "332cdh"}
{"spark": "332db"}
{"spark": "333"}
{"spark": "340"}
{"spark": "341"}
{"spark": "341db"}
{"spark": "350"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import org.apache.spark.sql.connector.read._

trait FilteredPartitions {
@transient protected val filteredPartitions: Seq[Seq[InputPartition]]
}
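
Note how this copy differs from the 3.2.x copy earlier in the diff: there filteredPartitions is a flat Seq[InputPartition], while Spark 3.3.0 and later group input partitions for KeyGroupedPartitioning, so this copy declares Seq[Seq[InputPartition]]. Each shim build compiles exactly one of the two sources, and GpuBatchScanExecBase only relies on operations both shapes support (such as the isEmpty check in inputRDD), with the per-shim GpuDataSourceRDD presumably accepting the matching shape. A small hedged sketch of why shape-agnostic code compiles against either form (toy types, illustrative names):

// Illustrative only: logic that needs nothing shape-specific can be written
// once over Seq[P], whether P is one partition or a group of partitions.
object PartitionShapeSketch {
  def isEmptyScan[P](parts: Seq[P]): Boolean = parts.isEmpty

  def main(args: Array[String]): Unit = {
    val flat: Seq[Int]         = Seq(1, 2, 3)            // pre-3.3.0 shape
    val grouped: Seq[Seq[Int]] = Seq(Seq(1), Seq(2, 3))  // 3.3.0+ key-grouped shape
    println(isEmptyScan(flat))     // false
    println(isEmptyScan(grouped))  // false
  }
}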
------------------------------------------------------------------------
@@ -25,30 +25,23 @@ spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import com.google.common.base.Objects
import com.nvidia.spark.rapids.{GpuBatchScanExecMetrics, GpuScan}
import com.nvidia.spark.rapids.GpuScan

import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, DynamicPruningExpression, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, SinglePartition}
import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowSet}
import org.apache.spark.sql.catalyst.plans.physical.KeyGroupedPartitioning
import org.apache.spark.sql.catalyst.util.InternalRowSet
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.execution.datasources.rapids.DataSourceStrategyUtils
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.vectorized.ColumnarBatch

case class GpuBatchScanExec(
output: Seq[AttributeReference],
@transient scan: GpuScan,
runtimeFilters: Seq[Expression] = Seq.empty,
keyGroupedPartitioning: Option[Seq[Expression]] = None)
extends DataSourceV2ScanExecBase with GpuBatchScanExecMetrics {
@transient lazy val batch: Batch = scan.toBatch

// All expressions are filter expressions used on the CPU.
override def gpuExpressions: Seq[Expression] = Nil
extends GpuBatchScanExecBase(scan, runtimeFilters) {

// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
@@ -62,7 +55,7 @@ case class GpuBatchScanExec(

@transient override lazy val inputPartitions: Seq[InputPartition] = batch.planInputPartitions()

@transient private lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
@transient override protected lazy val filteredPartitions: Seq[Seq[InputPartition]] = {
val dataSourceFilters = runtimeFilters.flatMap {
case DynamicPruningExpression(e) => DataSourceStrategyUtils.translateRuntimeFilter(e)
case _ => None
@@ -114,38 +107,11 @@
}
}

override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()

override lazy val inputRDD: RDD[InternalRow] = {
scan.metrics = allMetrics
if (filteredPartitions.isEmpty && outputPartitioning == SinglePartition) {
// return an empty RDD with 1 partition if dynamic filtering removed the only split
sparkContext.parallelize(Array.empty[InternalRow], 1)
} else {
new GpuDataSourceRDD(sparkContext, filteredPartitions, readerFactory)
}
}

override def doCanonicalize(): GpuBatchScanExec = {
this.copy(
output = output.map(QueryPlan.normalizeExpressions(_, output)),
runtimeFilters = QueryPlan.normalizePredicates(
runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
output))
}

override def simpleString(maxFields: Int): String = {
val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
val result = s"$nodeName$truncatedOutputString ${scan.description()} $runtimeFiltersString"
redact(result)
}

override def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { b =>
numOutputRows += b.numRows()
b
}
}
}
------------------------------------------------------------------------
@@ -28,15 +28,13 @@ spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids._
import org.apache.parquet.schema.MessageType

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FilePartition, FileScanRDD, PartitionedFile}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
import org.apache.spark.sql.rapids.shims.{GpuDivideYMInterval, GpuMultiplyYMInterval}
import org.apache.spark.sql.types.StructType

@@ -53,22 +51,6 @@ trait Spark330PlusShims extends Spark321PlusShims with Spark320PlusNonDBShims {
new FileScanRDD(sparkSession, readFunction, filePartitions, readDataSchema, metadataColumns)
}

override def getParquetFilters(
schema: MessageType,
pushDownDate: Boolean,
pushDownTimestamp: Boolean,
pushDownDecimal: Boolean,
pushDownStartWith: Boolean,
pushDownInFilterThreshold: Int,
caseSensitive: Boolean,
lookupFileMeta: String => String,
dateTimeRebaseModeFromConf: String): ParquetFilters = {
val datetimeRebaseMode = DataSourceUtils
.datetimeRebaseSpec(lookupFileMeta, dateTimeRebaseModeFromConf)
new ParquetFilters(schema, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStartWith,
pushDownInFilterThreshold, caseSensitive, datetimeRebaseMode)
}

// GPU support ANSI interval types from 330
override def getExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = {
val map: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq(
------------------------------------------------------------------------
@@ -17,7 +17,6 @@
/*** spark-rapids-shim-json-lines
{"spark": "330db"}
{"spark": "332db"}
{"spark": "341db"}
spark-rapids-shim-json-lines ***/
package com.nvidia.spark.rapids.shims
