
Commit

Add procedure declaration checker since it's fully deprecated in Scala 2.13 and will cause compilation to fail

Signed-off-by: Navin Kumar <[email protected]>
NVnavkumar committed Sep 7, 2023
1 parent 85c0af9 commit 53a44aa
Showing 13 changed files with 32 additions and 20 deletions.
4 changes: 2 additions & 2 deletions pom.xml
@@ -559,7 +559,7 @@
<profile>
<id>scala-2.12</id>
<properties>
<scala.version>2.12.15</scala.version>
<scala.version>2.12.15</scala.version>
</properties>
<build>
<pluginManagement>
@@ -572,7 +572,7 @@
<id>scala-2.13</id>
<properties>
<scala.version>2.13.8</scala.version>
<scala.binary.version>2.13</scala.binary.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
<dependencies>
4 changes: 4 additions & 0 deletions scalastyle-config.xml
@@ -129,6 +129,10 @@ You can also disable only one rule, by specifying its rule id, as specified in:
<customMessage>procedure syntax is deprecated for constructors in Scala 2.13: add `=`, as in method definition</customMessage>
</check>

<check level="error" class="org.scalastyle.scalariform.ProcedureDeclarationChecker" enabled="true">
<customMessage>procedure syntax is deprecated in Scala 2.13: add return type `: Unit` and `=`</customMessage>
</check>

<check level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
<parameters>
<parameter name="regex">ArrayBuilder.make\[(.+)\]\(\)</parameter>
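For reference, a minimal sketch of the pattern the new ProcedureDeclarationChecker is meant to flag. Procedure syntax (a `def` with no return type and no `=`) is deprecated in Scala 2.13 and slated for removal, which is what the commit message refers to; the Logger/ConsoleLogger names below are made up for illustration.

trait Logger {
  // Procedure-style declaration (flagged): def log(msg: String)
  // Fixed form: spell out the return type.
  def log(msg: String): Unit
}

class ConsoleLogger extends Logger {
  // Procedure-style definition (flagged): def log(msg: String) { println(msg) }
  // Fixed form: add `: Unit` and `=`.
  def log(msg: String): Unit = println(msg)
}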
11 changes: 10 additions & 1 deletion sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
@@ -15,7 +15,7 @@
*/
package com.nvidia.spark.rapids

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.control.ControlThrowable

import com.nvidia.spark.rapids.RapidsPluginImplicits._
@@ -68,6 +68,15 @@ object Arm {
}
}

/** Executes the provided code block and then closes the list buffer of resources */
def withResource[T <: AutoCloseable, V](r: ListBuffer[T])(block: ListBuffer[T] => V): V = {
try {
block(r)
} finally {
r.safeClose()
}
}

/** Executes the provided code block and then closes the value if it is AutoCloseable */
def withResourceIfAllowed[T, V](r: T)(block: T => V): V = {
try {
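A rough usage sketch of the new ListBuffer overload added to Arm above, assuming the plugin classes are on the classpath; TrackedResource is a made-up stand-in for any AutoCloseable resource.

import scala.collection.mutable.ListBuffer

import com.nvidia.spark.rapids.Arm.withResource

object ListBufferResourceExample {
  // Illustrative resource type; any AutoCloseable works.
  class TrackedResource(val id: Int) extends AutoCloseable {
    override def close(): Unit = println(s"closing resource $id")
  }

  def main(args: Array[String]): Unit = {
    val buf = ListBuffer(new TrackedResource(1), new TrackedResource(2))
    // Every element of the buffer is closed (via safeClose) when the block
    // finishes, even if it throws.
    val total = withResource(buf) { resources => resources.map(_.id).sum }
    println(total)
  }
}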
@@ -357,7 +357,7 @@ object GpuShuffledHashJoinExec extends Logging {
// The size still overflows after filtering or sub-partitioning is enabled for test.
logDebug("Return multiple batches as the build side data for the following " +
"sub-partitioning join in null-filtering mode.")
val safeIter = GpuSubPartitionHashJoin.safeIteratorFromSeq(spillBuf).map { sp =>
val safeIter = GpuSubPartitionHashJoin.safeIteratorFromSeq(spillBuf.toSeq).map { sp =>
withResource(sp)(_.getColumnarBatch())
} ++ filteredIter
Right(new CollectTimeIterator("hash join build", safeIter, buildTime))
@@ -369,7 +369,7 @@
spillBuf.append(
SpillableColumnarBatch(filteredIter.next(), SpillPriorities.ACTIVE_BATCHING_PRIORITY))
}
val spill = GpuSubPartitionHashJoin.concatSpillBatchesAndClose(spillBuf)
val spill = GpuSubPartitionHashJoin.concatSpillBatchesAndClose(spillBuf.toSeq)
// There is a prior empty check so this `spill` can not be a None.
assert(spill.isDefined, "The build data iterator should not be empty.")
withResource(spill) { _ =>
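Most of the remaining hunks in this commit are the same mechanical fix as the two above: in Scala 2.13, `scala.Seq` is an alias for `scala.collection.immutable.Seq`, so a mutable ArrayBuffer or ListBuffer no longer satisfies a `Seq[T]` parameter (or a `: _*` varargs expansion) without an explicit `.toSeq`. A minimal sketch of the pattern, with made-up names:

import scala.collection.mutable.ArrayBuffer

object ToSeqExample {
  // A method taking scala.Seq, which in 2.13 means immutable.Seq.
  def consume(batches: Seq[Int]): Int = batches.sum

  def main(args: Array[String]): Unit = {
    val spillBuf = ArrayBuffer(1, 2, 3)
    // Compiled as-is on 2.12 (scala.Seq was collection.Seq); on 2.13 the
    // mutable buffer has to be converted explicitly.
    println(consume(spillBuf.toSeq))
  }
}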
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf.{ColumnVector, ContiguousTable, NvtxColor, NvtxRange, Table}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
import com.nvidia.spark.rapids.RapidsPluginImplicits.{ArrayBufferAsStack, AutoCloseableProducingSeq}
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry, withRetryNoSplit}
import com.nvidia.spark.rapids.ScalableTaskCompletion.onTaskCompletion
import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
@@ -377,7 +377,7 @@ case class GpuOutOfCoreSortIterator(
}
}
}
(sortedCb, pendingObs)
(sortedCb, pendingObs.toSeq)
}

/** Save the splitting result returned from `splitAfterSort` into the cache */
@@ -361,7 +361,7 @@ abstract class GpuTextBasedPartitionReader[BUFF <: LineBufferer, FACT <: LineBuf
}
columns += castColumn
}
new Table(columns: _*)
new Table(columns.toSeq: _*)
}
}

@@ -759,7 +759,7 @@ object GpuTextBasedDateUtils {
}
formats += fmt
}
formats
formats.toSeq
}

}
@@ -63,7 +63,7 @@ object PlanUtils {
case other =>
other.children.flatMap(p => recurse(p, predicate, accum)).headOption
}
accum
accum.toSeq
}
recurse(exp, predicate, new ListBuffer[Expression]())
}
@@ -86,7 +86,7 @@
case qs: ShuffleQueryStageExec => recurse(qs.shuffle, predicate, accum)
case other => other.children.flatMap(p => recurse(p, predicate, accum)).headOption
}
accum
accum.toSeq
}
recurse(plan, predicate, new ListBuffer[SparkPlan]())
}
@@ -2185,7 +2185,7 @@ class RapidsConf(conf: Map[String, String]) extends Logging {
}

lazy val rapidsConfMap: util.Map[String, String] = conf.filterKeys(
_.startsWith("spark.rapids.")).asJava
_.startsWith("spark.rapids.")).toMap.asJava

lazy val metricsLevel: String = get(METRICS_LEVEL)

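The RapidsConf change above reflects another 2.13 difference: `Map.filterKeys` is deprecated and now returns a lazy MapView, which the implicit `asJava` conversion does not accept, so the view is materialized with `.toMap` first. A small sketch of the same pattern, independent of the plugin's types:

import scala.collection.JavaConverters._

object FilterKeysExample {
  def main(args: Array[String]): Unit = {
    val conf = Map("spark.rapids.sql.enabled" -> "true", "spark.app.name" -> "demo")
    // On 2.13, filterKeys yields a MapView; .toMap materializes it into an
    // immutable Map so that asJava still applies (and works the same on 2.12).
    val rapidsOnly: java.util.Map[String, String] =
      conf.filterKeys(_.startsWith("spark.rapids.")).toMap.asJava
    println(rapidsOnly)
  }
}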
@@ -345,7 +345,7 @@ class RapidsShuffleClient(
}

if (ptrs.nonEmpty) {
transport.queuePending(ptrs)
transport.queuePending(ptrs.toSeq)
}
}

@@ -201,7 +201,7 @@ class RapidsShuffleServer(transport: RapidsShuffleTransport,
}
}
if (bssToIssue.nonEmpty) {
doHandleTransferRequest(bssToIssue)
doHandleTransferRequest(bssToIssue.toSeq)
}
}

@@ -368,7 +368,7 @@

// If we are still able to handle at least one `BufferSendState`, add any
// others that also failed due back to the queue.
addToContinueQueue(toTryAgain)
addToContinueQueue(toTryAgain.toSeq)
}

serverStream.sync()
@@ -154,7 +154,7 @@ class WindowedBlockIterator[T <: BlockWithSize](blocks: Seq[T], windowSize: Long
}
val lastBlock = blockRangesInWindow.last
BlocksForWindow(lastBlockIndex,
blockRangesInWindow,
blockRangesInWindow.toSeq,
!continue || !lastBlock.isComplete())
}

@@ -562,7 +562,7 @@ class GpuDynamicPartitionDataSingleWriter(
// clear the caches
savedStatus.get.tableCaches.clear()

withRetryNoSplit(toConcat) { spillables: ListBuffer[SpillableColumnarBatch] =>
withRetryNoSplit(toConcat) { spillables =>
withResource(spillables.toSeq.safeMap(_.getColumnarBatch())) { batches =>
withResource(batches.map(GpuColumnVector.from)) { subTables =>
Table.concatenate(subTables: _*)
@@ -998,8 +998,7 @@ class GpuDynamicPartitionDataConcurrentWriter(
// get concat table or the single table
val spillableToWrite = if (status.tableCaches.length >= 2) {
// concat the sub batches to write in once.
val concatted = withRetryNoSplit(status.tableCaches) {
spillableSubBatches: ListBuffer[SpillableColumnarBatch] =>
val concatted = withRetryNoSplit(status.tableCaches) { spillableSubBatches =>
withResource(spillableSubBatches.toSeq.safeMap(_.getColumnarBatch())) { subBatches =>
withResource(subBatches.map(GpuColumnVector.from)) { subTables =>
Table.concatenate(subTables: _*)
@@ -316,7 +316,7 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging {
case c: CommandResultExec => recurse(c.commandPhysicalPlan, predicate, accum)
case other => other.children.flatMap(p => recurse(p, predicate, accum)).headOption
}
accum
accum.toSeq
}

recurse(plan, predicate, new ListBuffer[SparkPlan]())
