Skip to content

Commit

Permalink
[SPARK-20977][CORE] NPE in CollectionAccumulator
Browse files Browse the repository at this point in the history
This PR fixes the JLS §17.5.3 ("Subsequent Modification of final Fields")
violation identified in Shixiong Zhu's 19/Feb/19 11:47 comment on the JIRA:
- avoid using a final field to hold the state of the collection
accumulator
  • Loading branch information
gerashegalov committed Feb 10, 2021
1 parent 777d51e commit c9918ab
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.util

import java.{lang => jl}
import java.io.ObjectInputStream
import java.util.{ArrayList, Collections}
import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

Expand Down Expand Up @@ -449,39 +449,46 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
* @since 2.0.0
*/
class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
// NOTE(review): this is a rendered diff whose +/- markers were lost, so removed
// and added lines appear back to back. Where a declaration appears twice, the
// first copy (operating directly on `_list`) looks like the removed line and the
// second (going through `getOrCreate` under `this.synchronized`) like the added
// one -- confirm against the actual commit before relying on this reading.

// Variant 1: backing list held in a `val`, wrapped by Collections.synchronizedList.
private val _list: java.util.List[T] = Collections.synchronizedList(new ArrayList[T]())
// Variant 2: backing list held in a `var` with the default initializer (`_`, i.e.
// null for a reference type); it is allocated on demand by getOrCreate.
private var _list: java.util.List[T] = _

// Returns the backing list, first assigning a fresh ArrayList to `_list` when it
// is currently null. Takes no lock itself: every call on `this` visible here sits
// inside a `this.synchronized` block.
private def getOrCreate = {
_list = Option(_list).getOrElse(new java.util.ArrayList[T]())
_list
}

/**
* Returns false if this accumulator instance has any values in it.
*/
// Variant 1 reads `_list` directly; variant 2 goes through getOrCreate under this
// instance's monitor.
override def isZero: Boolean = _list.isEmpty
override def isZero: Boolean = this.synchronized(getOrCreate.isEmpty)

// Returns a brand-new accumulator of the same class; nothing is copied from this one.
override def copyAndReset(): CollectionAccumulator[T] = new CollectionAccumulator

// Creates a new accumulator and adds every element of this one's list to it.
override def copy(): CollectionAccumulator[T] = {
val newAcc = new CollectionAccumulator[T]
// Variant 1: lock on the list object and copy field to field.
_list.synchronized {
newAcc._list.addAll(_list)
// Variant 2: lock on this accumulator; `newAcc.getOrCreate` runs under this
// instance's monitor, not newAcc's.
this.synchronized {
newAcc.getOrCreate.addAll(getOrCreate)
}
newAcc
}

// Variant 1 empties the existing list in place; variant 2 drops the reference by
// setting `_list` to null, so the next getOrCreate allocates a new list.
override def reset(): Unit = _list.clear()
override def reset(): Unit = this.synchronized {
_list = null
}

// Appends one element (variant 1: directly; variant 2: under this.synchronized).
override def add(v: T): Unit = _list.add(v)
override def add(v: T): Unit = this.synchronized(getOrCreate.add(v))

// Appends all elements of another CollectionAccumulator; any other accumulator
// type is rejected with UnsupportedOperationException.
override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match {
case o: CollectionAccumulator[T] => _list.addAll(o.value)
// In this variant `o.value` is evaluated inside `this.synchronized`, so o's monitor
// is taken while this one is held.
// NOTE(review): two threads merging a pair of accumulators into each other would
// take the two monitors in opposite order -- confirm callers never do that.
case o: CollectionAccumulator[T] => this.synchronized(getOrCreate.addAll(o.value))
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}

// Returns an unmodifiable view over a fresh ArrayList copy of the current contents.
// Variant 1 locks on the list object, variant 2 on this accumulator.
override def value: java.util.List[T] = _list.synchronized {
java.util.Collections.unmodifiableList(new ArrayList[T](_list))
override def value: java.util.List[T] = this.synchronized {
java.util.Collections.unmodifiableList(new ArrayList[T](getOrCreate))
}

// Replaces the contents with `newValue`. Variant 1 clears and refills the existing
// list without any explicit lock around the pair of calls; variant 2 discards the
// old list and fills a newly created one under this.synchronized.
private[spark] def setValue(newValue: java.util.List[T]): Unit = {
_list.clear()
_list.addAll(newValue)
private[spark] def setValue(newValue: java.util.List[T]): Unit = this.synchronized {
_list = null
getOrCreate.addAll(newValue)
}
}

0 comments on commit c9918ab

Please sign in to comment.