From 53a44aa661a3f95b0045cd780363017471b196cc Mon Sep 17 00:00:00 2001
From: Navin Kumar
Date: Wed, 6 Sep 2023 17:03:32 -0700
Subject: [PATCH] Add procedure declaration checker since procedure syntax is
 fully deprecated in Scala 2.13 and will cause compilation to fail

Signed-off-by: Navin Kumar
---
 pom.xml | 4 ++--
 scalastyle-config.xml | 4 ++++
 .../src/main/scala/com/nvidia/spark/rapids/Arm.scala | 11 ++++++++++-
 .../nvidia/spark/rapids/GpuShuffledHashJoinExec.scala | 4 ++--
 .../scala/com/nvidia/spark/rapids/GpuSortExec.scala | 4 ++--
 .../spark/rapids/GpuTextBasedPartitionReader.scala | 4 ++--
 .../scala/com/nvidia/spark/rapids/PlanUtils.scala | 4 ++--
 .../scala/com/nvidia/spark/rapids/RapidsConf.scala | 2 +-
 .../spark/rapids/shuffle/RapidsShuffleClient.scala | 2 +-
 .../spark/rapids/shuffle/RapidsShuffleServer.scala | 4 ++--
 .../spark/rapids/shuffle/WindowedBlockIterator.scala | 2 +-
 .../spark/sql/rapids/GpuFileFormatDataWriter.scala | 5 ++---
 .../nvidia/spark/rapids/shims/Spark320PlusShims.scala | 2 +-
 13 files changed, 32 insertions(+), 20 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3fb13fb26341..0f927b5581e6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -559,7 +559,7 @@
       scala-2.12
-        2.13.8
+        2.12.15
@@ -572,7 +572,7 @@
       scala-2.13
         2.13.8
-        2.12
+        2.13
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 6f420c9f3c2d..2007e6760cc9 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -129,6 +129,10 @@ You can also disable only one rule, by specifying its rule id, as specified in:
   procedure syntax is deprecated for constructors in Scala 2.13: add `=`, as in method definition
+  <check level="error" class="org.scalastyle.scalariform.ProcedureDeclarationChecker" enabled="true">
+    <customMessage>procedure syntax is deprecated in Scala 2.13: add return type `: Unit` and `=`</customMessage>
+  </check>
+
   ArrayBuilder.make\[(.+)\]\(\)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
index 6fbee2d7e92b..740b1314a63c 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
@@ -15,7 +15,7 @@
  */
 package com.nvidia.spark.rapids

-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.util.control.ControlThrowable

 import com.nvidia.spark.rapids.RapidsPluginImplicits._
@@ -68,6 +68,15 @@ object Arm {
     }
   }

+  /** Executes the provided code block and then closes the list buffer of resources */
+  def withResource[T <: AutoCloseable, V](r: ListBuffer[T])(block: ListBuffer[T] => V): V = {
+    try {
+      block(r)
+    } finally {
+      r.safeClose()
+    }
+  }
+
   /** Executes the provided code block and then closes the value if it is AutoCloseable */
   def withResourceIfAllowed[T, V](r: T)(block: T => V): V = {
     try {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala
index 1222422393b3..7ad585590879 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala
@@ -357,7 +357,7 @@ object GpuShuffledHashJoinExec extends Logging {
       // The size still overflows after filtering or sub-partitioning is enabled for test.
logDebug("Return multiple batches as the build side data for the following " + "sub-partitioning join in null-filtering mode.") - val safeIter = GpuSubPartitionHashJoin.safeIteratorFromSeq(spillBuf).map { sp => + val safeIter = GpuSubPartitionHashJoin.safeIteratorFromSeq(spillBuf.toSeq).map { sp => withResource(sp)(_.getColumnarBatch()) } ++ filteredIter Right(new CollectTimeIterator("hash join build", safeIter, buildTime)) @@ -369,7 +369,7 @@ object GpuShuffledHashJoinExec extends Logging { spillBuf.append( SpillableColumnarBatch(filteredIter.next(), SpillPriorities.ACTIVE_BATCHING_PRIORITY)) } - val spill = GpuSubPartitionHashJoin.concatSpillBatchesAndClose(spillBuf) + val spill = GpuSubPartitionHashJoin.concatSpillBatchesAndClose(spillBuf.toSeq) // There is a prior empty check so this `spill` can not be a None. assert(spill.isDefined, "The build data iterator should not be empty.") withResource(spill) { _ => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala index e05c0711745e..2bd4bdb2c8b6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuSortExec.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.{ColumnVector, ContiguousTable, NvtxColor, NvtxRange, Table} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.GpuMetric._ -import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq +import com.nvidia.spark.rapids.RapidsPluginImplicits.{ArrayBufferAsStack, AutoCloseableProducingSeq} import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry, withRetryNoSplit} import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion import com.nvidia.spark.rapids.shims.ShimUnaryExecNode @@ -377,7 +377,7 @@ case class GpuOutOfCoreSortIterator( } } } - (sortedCb, pendingObs) + (sortedCb, pendingObs.toSeq) } /** Save the splitting result returned from `splitAfterSort` into the cache */ diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala index 73fb0bd79413..09da238459d9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTextBasedPartitionReader.scala @@ -361,7 +361,7 @@ abstract class GpuTextBasedPartitionReader[BUFF <: LineBufferer, FACT <: LineBuf } columns += castColumn } - new Table(columns: _*) + new Table(columns.toSeq: _*) } } @@ -759,7 +759,7 @@ object GpuTextBasedDateUtils { } formats += fmt } - formats + formats.toSeq } } \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PlanUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PlanUtils.scala index d09e3a2bbae0..3ea1f03c6334 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PlanUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/PlanUtils.scala @@ -63,7 +63,7 @@ object PlanUtils { case other => other.children.flatMap(p => recurse(p, predicate, accum)).headOption } - accum + accum.toSeq } recurse(exp, predicate, new ListBuffer[Expression]()) } @@ -86,7 +86,7 @@ object PlanUtils { case qs: ShuffleQueryStageExec => recurse(qs.shuffle, predicate, accum) case other => other.children.flatMap(p => recurse(p, 
predicate, accum)).headOption } - accum + accum.toSeq } recurse(plan, predicate, new ListBuffer[SparkPlan]()) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 9fe3793cffef..db846f8ead49 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -2185,7 +2185,7 @@ class RapidsConf(conf: Map[String, String]) extends Logging { } lazy val rapidsConfMap: util.Map[String, String] = conf.filterKeys( - _.startsWith("spark.rapids.")).asJava + _.startsWith("spark.rapids.")).toMap.asJava lazy val metricsLevel: String = get(METRICS_LEVEL) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClient.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClient.scala index 9aecedf064f6..0e6e27bef40f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClient.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleClient.scala @@ -345,7 +345,7 @@ class RapidsShuffleClient( } if (ptrs.nonEmpty) { - transport.queuePending(ptrs) + transport.queuePending(ptrs.toSeq) } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServer.scala index 13629f0b81ad..e40ce552f364 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleServer.scala @@ -201,7 +201,7 @@ class RapidsShuffleServer(transport: RapidsShuffleTransport, } } if (bssToIssue.nonEmpty) { - doHandleTransferRequest(bssToIssue) + doHandleTransferRequest(bssToIssue.toSeq) } } @@ -368,7 +368,7 @@ class RapidsShuffleServer(transport: RapidsShuffleTransport, // If we are still able to handle at least one `BufferSendState`, add any // others that also failed due back to the queue. 
-      addToContinueQueue(toTryAgain)
+      addToContinueQueue(toTryAgain.toSeq)
     }
     serverStream.sync()
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala
index fe21a2006433..2fdce9862ada 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/WindowedBlockIterator.scala
@@ -154,7 +154,7 @@ class WindowedBlockIterator[T <: BlockWithSize](blocks: Seq[T], windowSize: Long
     }
     val lastBlock = blockRangesInWindow.last
     BlocksForWindow(lastBlockIndex,
-      blockRangesInWindow,
+      blockRangesInWindow.toSeq,
       !continue || !lastBlock.isComplete())
   }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala
index ae0bb9a6275f..c28c6fb717a1 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala
@@ -562,7 +562,7 @@ class GpuDynamicPartitionDataSingleWriter(
         // clear the caches
         savedStatus.get.tableCaches.clear()

-        withRetryNoSplit(toConcat) { spillables: ListBuffer[SpillableColumnarBatch] =>
+        withRetryNoSplit(toConcat) { spillables =>
           withResource(spillables.toSeq.safeMap(_.getColumnarBatch())) { batches =>
             withResource(batches.map(GpuColumnVector.from)) { subTables =>
               Table.concatenate(subTables: _*)
@@ -998,8 +998,7 @@ class GpuDynamicPartitionDataConcurrentWriter(
     // get concat table or the single table
     val spillableToWrite = if (status.tableCaches.length >= 2) {
       // concat the sub batches to write in once.
-      val concatted = withRetryNoSplit(status.tableCaches) {
-        spillableSubBatches: ListBuffer[SpillableColumnarBatch] =>
+      val concatted = withRetryNoSplit(status.tableCaches) { spillableSubBatches =>
         withResource(spillableSubBatches.toSeq.safeMap(_.getColumnarBatch())) { subBatches =>
           withResource(subBatches.map(GpuColumnVector.from)) { subTables =>
             Table.concatenate(subTables: _*)
diff --git a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala
index 05fc736c7334..89280b50bf42 100644
--- a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala
+++ b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/Spark320PlusShims.scala
@@ -316,7 +316,7 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging {
      case c: CommandResultExec => recurse(c.commandPhysicalPlan, predicate, accum)
      case other => other.children.flatMap(p => recurse(p, predicate, accum)).headOption
    }
-    accum
+    accum.toSeq
  }
  recurse(plan, predicate, new ListBuffer[SparkPlan]())
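
Note (editorial, not part of the patch): nearly every hunk above is an instance of a handful of Scala 2.13 migration idioms. The following self-contained sketch illustrates them; the object Scala213MigrationSketch and the names log, firstOrNone, and withResources are hypothetical illustrations, and withResources only mirrors the shape of the new Arm.withResource overload (the real one closes via safeClose() from RapidsPluginImplicits).

import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object Scala213MigrationSketch {

  // 1. Procedure syntax, which the new scalastyle rule flags:
  //      def log(msg: String) { println(msg) }   // deprecated procedure syntax
  //    becomes an explicit `: Unit =` definition:
  def log(msg: String): Unit = {
    println(msg)
  }

  // 2. In 2.13, scala.Seq aliases immutable.Seq, so a mutable buffer no longer
  //    satisfies a Seq parameter directly; hence the `.toSeq` calls in the diff.
  def firstOrNone(xs: Seq[Int]): Option[Int] = xs.headOption

  // 3. Hypothetical mirror of a withResource-style overload for a ListBuffer of
  //    resources, simplified to a plain close() per element.
  def withResources[T <: AutoCloseable, V](rs: ListBuffer[T])(block: ListBuffer[T] => V): V = {
    try block(rs) finally rs.foreach(_.close())
  }

  def main(args: Array[String]): Unit = {
    val buf = ArrayBuffer(1, 2, 3)
    log(s"first = ${firstOrNone(buf.toSeq)}")   // compiles on both 2.12 and 2.13

    // 4. Map.filterKeys returns a lazy view in 2.13, so it must be materialized
    //    with .toMap before further use (the RapidsConf hunk does this before .asJava).
    val conf = Map("spark.rapids.sql.enabled" -> "true", "spark.app.name" -> "x")
    val rapidsOnly = conf.filterKeys(_.startsWith("spark.rapids.")).toMap
    log(s"rapidsOnly = $rapidsOnly")
  }
}

On 2.12 the added .toSeq calls are effectively no-ops (a mutable buffer is already a scala.Seq), while on 2.13 they make an explicit immutable copy, which is why the same sources cross-compile under both build profiles.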