diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index d5b3ce36e742a..1453840b834f2 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
 
 import java.{lang => jl}
 import java.io.ObjectInputStream
-import java.util.{ArrayList, Collections}
+import java.util.ArrayList
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
@@ -449,39 +449,46 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
  * @since 2.0.0
  */
 class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
-  private val _list: java.util.List[T] = Collections.synchronizedList(new ArrayList[T]())
+  private var _list: java.util.List[T] = _
+
+  private def getOrCreate = {
+    _list = Option(_list).getOrElse(new java.util.ArrayList[T]())
+    _list
+  }
 
   /**
    * Returns false if this accumulator instance has any values in it.
    */
-  override def isZero: Boolean = _list.isEmpty
+  override def isZero: Boolean = this.synchronized(getOrCreate.isEmpty)
 
   override def copyAndReset(): CollectionAccumulator[T] = new CollectionAccumulator
 
   override def copy(): CollectionAccumulator[T] = {
     val newAcc = new CollectionAccumulator[T]
-    _list.synchronized {
-      newAcc._list.addAll(_list)
+    this.synchronized {
+      newAcc.getOrCreate.addAll(getOrCreate)
     }
     newAcc
   }
 
-  override def reset(): Unit = _list.clear()
+  override def reset(): Unit = this.synchronized {
+    _list = null
+  }
 
-  override def add(v: T): Unit = _list.add(v)
+  override def add(v: T): Unit = this.synchronized(getOrCreate.add(v))
 
   override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match {
-    case o: CollectionAccumulator[T] => _list.addAll(o.value)
+    case o: CollectionAccumulator[T] => this.synchronized(getOrCreate.addAll(o.value))
     case _ => throw new UnsupportedOperationException(
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
 
-  override def value: java.util.List[T] = _list.synchronized {
-    java.util.Collections.unmodifiableList(new ArrayList[T](_list))
+  override def value: java.util.List[T] = this.synchronized {
+    java.util.Collections.unmodifiableList(new ArrayList[T](getOrCreate))
   }
 
-  private[spark] def setValue(newValue: java.util.List[T]): Unit = {
-    _list.clear()
-    _list.addAll(newValue)
+  private[spark] def setValue(newValue: java.util.List[T]): Unit = this.synchronized {
+    _list = null
+    getOrCreate.addAll(newValue)
   }
 }