[SPARK-39696][CORE] Fix data race in access to TaskMetrics.externalAccums #40663

Closed
wants to merge 5 commits into from
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.executor
 
+import java.util.concurrent.CopyOnWriteArrayList
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
 
@@ -262,10 +264,12 @@ class TaskMetrics private[spark] () extends Serializable {
   /**
    * External accumulators registered with this task.
    */
-  @transient private[spark] lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
+  @transient private[spark] lazy val _externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]]
+
+  private[spark] def externalAccums = _externalAccums.asScala
 
   private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
-    externalAccums += a
+    _externalAccums.add(a)
   }
 
   private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
@@ -331,7 +335,7 @@ private[spark] object TaskMetrics extends Logging {
         tmAcc.metadata = acc.metadata
         tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
       } else {
-        tm.externalAccums += acc
+        tm._externalAccums.add(acc)
       }
     }
     tm
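For context on why swapping the container removes the race: an ArrayBuffer is unsafe to iterate while another thread appends to it (the heartbeat thread reads the external accumulators while the task thread may still be registering new ones), whereas a CopyOnWriteArrayList hands out iterators over an immutable snapshot of its backing array. Below is a minimal, standalone sketch of that property; it is not Spark code, and the object name and string payload are illustrative only.

```scala
import java.util.concurrent.CopyOnWriteArrayList

import scala.collection.JavaConverters._

object CowSnapshotSketch {
  def main(args: Array[String]): Unit = {
    // Shared registry, standing in for TaskMetrics._externalAccums after this change.
    val registry = new CopyOnWriteArrayList[String]()

    // "Task" thread: keeps registering new entries, as registerAccumulator does.
    val writer = new Thread(() => (1 to 100000).foreach(i => registry.add(s"acc-$i")))

    // "Heartbeat" thread: repeatedly walks the current contents through the asScala
    // view, as the executor does when it reports accumulator updates.
    val reader = new Thread(() => (1 to 1000).foreach(_ => registry.asScala.foreach(_.length)))

    writer.start(); reader.start()
    writer.join(); reader.join()

    // CopyOnWriteArrayList iterators operate on a snapshot of the backing array,
    // so the reader never observes a half-grown buffer while the writer appends.
    println(s"registered ${registry.size()} entries; reader completed without error")
  }
}
```

The trade-off is that every append copies the backing array, which appears acceptable here because accumulator registration is far less frequent than heartbeat reads.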
22 changes: 22 additions & 0 deletions core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -30,6 +30,7 @@ import scala.collection.mutable.{ArrayBuffer, Map}
 import scala.concurrent.duration._
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.logging.log4j._
 import org.mockito.ArgumentCaptor
 import org.mockito.ArgumentMatchers.{any, eq => meq}
 import org.mockito.Mockito.{inOrder, verify, when}
@@ -270,6 +271,27 @@ class ExecutorSuite extends SparkFunSuite
     heartbeatZeroAccumulatorUpdateTest(false)
   }
 
+  test("SPARK-39696: Using accumulators should not cause heartbeat to fail") {
+    val conf = new SparkConf().setMaster("local").setAppName("executor suite test")
+    conf.set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1ms")
+    sc = new SparkContext(conf)
+
+    val accums = (1 to 10).map(i => sc.longAccumulator(s"mapperRunAccumulator$i"))
+    val input = sc.parallelize(1 to 10, 10)
+    var testRdd = input.map(i => (i, i))
+    (0 to 10).foreach( i =>
+      testRdd = testRdd.map(x => { accums.foreach(_.add(1)); (x._1 * i, x._2) }).reduceByKey(_ + _)
+    )
+
+    val logAppender = new LogAppender("heartbeat thread should not die")
+    withLogAppender(logAppender, level = Some(Level.ERROR)) {
+      val _ = testRdd.count()
+    }
+    val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+      .filter(_.contains("Uncaught exception in thread executor-heartbeater"))
+    assert(logs.isEmpty)
+  }
+
   private def withMockHeartbeatReceiverRef(executor: Executor)
       (func: RpcEndpointRef => Unit): Unit = {
     val executorClass = classOf[Executor]
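The regression test above chains many short stages with a 1 ms heartbeat interval so the heartbeat thread repeatedly reads the external accumulators while tasks are still registering them, then asserts that no "Uncaught exception in thread executor-heartbeater" error is logged. As a rough illustration of the underlying hazard with the old ArrayBuffer-backed field, here is a standalone sketch (not Spark code); the race is timing-dependent and may not reproduce on every run, and the object name is hypothetical.

```scala
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable.ArrayBuffer

object ArrayBufferRaceSketch {
  def main(args: Array[String]): Unit = {
    val buffer = new ArrayBuffer[Int]()
    val readerFailures = new AtomicInteger(0)

    // Writer: unsynchronized appends, which periodically replace the backing array.
    val writer = new Thread(() => (1 to 1000000).foreach(i => buffer += i))

    // Reader: iterates concurrently; without synchronization it can observe a size
    // that is out of step with the backing array and fail mid-iteration.
    val reader = new Thread(() => (1 to 2000).foreach { _ =>
      try buffer.foreach(identity)
      catch { case _: RuntimeException => readerFailures.incrementAndGet() }
    })

    writer.start(); reader.start()
    writer.join(); reader.join()

    // Timing-dependent: the count may well be zero on a given run, but any non-zero
    // value is the same class of failure the heartbeat thread could hit before this patch.
    println(s"reader failed ${readerFailures.get()} times out of 2000 iterations")
  }
}
```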