From b8a400c9c2a4c11c152e2472dcb07b7f62bde691 Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt
Date: Tue, 4 Apr 2023 15:06:35 +0200
Subject: [PATCH 1/5] SPARK-39696 Add spec that shows that race exists for any
 code using accumulators

---
 .../apache/spark/executor/ExecutorSuite.scala | 26 +++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index bef36d08e8aee..2655a2dd9dc1f 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -30,6 +30,7 @@ import scala.collection.mutable.{ArrayBuffer, Map}
 import scala.concurrent.duration._
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.logging.log4j._
 import org.mockito.ArgumentCaptor
 import org.mockito.ArgumentMatchers.{any, eq => meq}
 import org.mockito.Mockito.{inOrder, verify, when}
@@ -44,6 +45,7 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Network
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.memory.{SparkOutOfMemoryError, TestMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
@@ -270,6 +272,30 @@ class ExecutorSuite extends SparkFunSuite
     heartbeatZeroAccumulatorUpdateTest(false)
   }
 
+  test("SPARK-39696: Using accumulators should not cause heartbeat to fail") {
+    val conf = new SparkConf().setMaster("local").setAppName("executor suite test")
+    conf.set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1ms")
+    conf.set(STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key, "500ms")
+    conf.set(Network.NETWORK_TIMEOUT_INTERVAL.key, "400ms")
+    sc = new SparkContext(conf)
+
+    val accums = (1 to 10).map(i => sc.longAccumulator(s"mapperRunAccumulator${i}"))
+    val input = sc.parallelize(1 to 10, 10)
+    val baseRdd = input.map(i => (i, i))
+    var testRdd = baseRdd
+    (0 to 10).foreach( i =>
+      testRdd = testRdd.map(x => {accums.map(_.add(1)); (x._1 * i, x._2)}).reduceByKey(_ + _)
+    )
+
+    val logAppender = new LogAppender(s"heartbeat thread shuold not die")
+    withLogAppender(logAppender, level = Some(Level.ERROR)) {
+      val _ = testRdd.count()
+    }
+    val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+        .filter(_.contains("Uncaught exception in thread executor-heartbeater"))
+    assert(logs.isEmpty)
+  }
+
   private def withMockHeartbeatReceiverRef(executor: Executor)
       (func: RpcEndpointRef => Unit): Unit = {
     val executorClass = classOf[Executor]

From d5b662096f25ce15fb11b6efda2e8b863333d5ae Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Mon, 18 Jul 2022 13:01:54 +0800
Subject: [PATCH 2/5] try change to use CopyOnWriteArrayList

---
 .../org/apache/spark/executor/TaskMetrics.scala     | 14 +++++++++-----
 .../scala/org/apache/spark/scheduler/Task.scala     |  2 +-
 .../sql/execution/ui/SQLAppStatusListener.scala     |  2 +-
 .../sql/execution/metric/SQLMetricsTestUtils.scala  |  2 +-
 4 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 1ca8590b1c90c..3d63f5e898d65 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.executor
 
+import java.util.concurrent.CopyOnWriteArrayList
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
 
@@ -206,7 +208,6 @@ class TaskMetrics private[spark] () extends Serializable {
 
   // Only used for test
   private[spark] val testAccum = sys.props.get(IS_TESTING.key).map(_ => new LongAccumulator)
-
   import InternalAccumulator._
   @transient private[spark] lazy val nameToAccums = LinkedHashMap(
     EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
@@ -262,13 +263,16 @@ class TaskMetrics private[spark] () extends Serializable {
   /**
    * External accumulators registered with this task.
    */
-  @transient private[spark] lazy val externalAccums = new ArrayBuffer[AccumulatorV2[_, _]]
+  @transient private[spark] lazy val _externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]]
+
+  private[spark] def externalAccums() = _externalAccums.asScala
 
   private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
-    externalAccums += a
+    _externalAccums.add(a)
   }
 
-  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
+  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] =
+    internalAccums ++ _externalAccums.asScala
 
   private[spark] def nonZeroInternalAccums(): Seq[AccumulatorV2[_, _]] = {
     // RESULT_SIZE accumulator is always zero at executor, we need to send it back as its
@@ -331,7 +335,7 @@ private[spark] object TaskMetrics extends Logging {
         tmAcc.metadata = acc.metadata
         tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
       } else {
-        tm.externalAccums += acc
+        tm._externalAccums.add(acc)
       }
     }
     tm
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 001e3220e73b2..dbacdc6d72f0b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -208,7 +208,7 @@ private[spark] abstract class Task[T](
       context.taskMetrics.nonZeroInternalAccums() ++
         // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
         // filter them out.
-        context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
+        context.taskMetrics.externalAccums().filter(a => !taskFailed || a.countFailedValues)
     } else {
       Seq.empty
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 7b9f877bdef5a..02e1c9cd00aac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -178,7 +178,7 @@ class SQLAppStatusListener(
     // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
     // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
     val accums = if (live && event.taskMetrics != null) {
-      event.taskMetrics.externalAccums.flatMap { a =>
+      event.taskMetrics.externalAccums().flatMap { a =>
         // This call may fail if the accumulator is gc'ed, so account for that.
         try {
           Some(a.toInfo(Some(a.value), None))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 81667d52e16ae..81be8d97572b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -311,7 +311,7 @@ object InputOutputMetricsHelper {
         res.shuffleRecordsRead += taskEnd.taskMetrics.shuffleReadMetrics.recordsRead
 
         var maxOutputRows = 0L
-        for (accum <- taskEnd.taskMetrics.externalAccums) {
+        for (accum <- taskEnd.taskMetrics.externalAccums()) {
           val info = accum.toInfo(Some(accum.value), None)
           if (info.name.toString.contains("number of output rows")) {
             info.update match {

From a0c7646c57663c92f11fb4d023745525f7712be8 Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt
Date: Thu, 6 Apr 2023 08:03:16 +0200
Subject: [PATCH 3/5] PR comments

And remove unneeded config in spec
---
 .../main/scala/org/apache/spark/executor/TaskMetrics.scala   | 5 ++---
 core/src/main/scala/org/apache/spark/scheduler/Task.scala    | 2 +-
 .../test/scala/org/apache/spark/executor/ExecutorSuite.scala | 2 --
 .../apache/spark/sql/execution/ui/SQLAppStatusListener.scala | 2 +-
 .../spark/sql/execution/metric/SQLMetricsTestUtils.scala     | 2 +-
 5 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 3d63f5e898d65..1f9cb0f755a58 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -265,14 +265,13 @@ class TaskMetrics private[spark] () extends Serializable {
    */
   @transient private[spark] lazy val _externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]]
 
-  private[spark] def externalAccums() = _externalAccums.asScala
+  private[spark] def externalAccums = _externalAccums.asScala
 
   private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
     _externalAccums.add(a)
   }
 
-  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] =
-    internalAccums ++ _externalAccums.asScala
+  private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
 
   private[spark] def nonZeroInternalAccums(): Seq[AccumulatorV2[_, _]] = {
     // RESULT_SIZE accumulator is always zero at executor, we need to send it back as its
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index dbacdc6d72f0b..001e3220e73b2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -208,7 +208,7 @@ private[spark] abstract class Task[T](
       context.taskMetrics.nonZeroInternalAccums() ++
         // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
         // filter them out.
-        context.taskMetrics.externalAccums().filter(a => !taskFailed || a.countFailedValues)
+        context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
     } else {
       Seq.empty
     }
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 2655a2dd9dc1f..745720a54af36 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -275,8 +275,6 @@ class ExecutorSuite extends SparkFunSuite
   test("SPARK-39696: Using accumulators should not cause heartbeat to fail") {
     val conf = new SparkConf().setMaster("local").setAppName("executor suite test")
     conf.set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1ms")
-    conf.set(STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT.key, "500ms")
-    conf.set(Network.NETWORK_TIMEOUT_INTERVAL.key, "400ms")
     sc = new SparkContext(conf)
 
     val accums = (1 to 10).map(i => sc.longAccumulator(s"mapperRunAccumulator${i}"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 02e1c9cd00aac..7b9f877bdef5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -178,7 +178,7 @@ class SQLAppStatusListener(
     // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
     // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
     val accums = if (live && event.taskMetrics != null) {
-      event.taskMetrics.externalAccums().flatMap { a =>
+      event.taskMetrics.externalAccums.flatMap { a =>
         // This call may fail if the accumulator is gc'ed, so account for that.
         try {
           Some(a.toInfo(Some(a.value), None))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 81be8d97572b1..81667d52e16ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -311,7 +311,7 @@ object InputOutputMetricsHelper {
         res.shuffleRecordsRead += taskEnd.taskMetrics.shuffleReadMetrics.recordsRead
 
         var maxOutputRows = 0L
-        for (accum <- taskEnd.taskMetrics.externalAccums()) {
+        for (accum <- taskEnd.taskMetrics.externalAccums) {
           val info = accum.toInfo(Some(accum.value), None)
           if (info.name.toString.contains("number of output rows")) {
             info.update match {

From 73de1fb70f3fe7d1a0df159fc9d557e267af4ab0 Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt
Date: Thu, 6 Apr 2023 11:02:41 +0200
Subject: [PATCH 4/5] More cleanup

---
 .../org/apache/spark/executor/TaskMetrics.scala    |  1 +
 .../org/apache/spark/executor/ExecutorSuite.scala  | 12 ++++++------
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 1f9cb0f755a58..78b39b0cbda68 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -208,6 +208,7 @@ class TaskMetrics private[spark] () extends Serializable {
 
   // Only used for test
   private[spark] val testAccum = sys.props.get(IS_TESTING.key).map(_ => new LongAccumulator)
+
   import InternalAccumulator._
   @transient private[spark] lazy val nameToAccums = LinkedHashMap(
     EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 745720a54af36..834d43aa611d7 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -45,7 +45,6 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.config._
-import org.apache.spark.internal.config.Network
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.memory.{SparkOutOfMemoryError, TestMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
@@ -277,15 +276,16 @@ class ExecutorSuite extends SparkFunSuite
     conf.set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1ms")
     sc = new SparkContext(conf)
 
-    val accums = (1 to 10).map(i => sc.longAccumulator(s"mapperRunAccumulator${i}"))
+    val accums = (1 to 10).map(i => sc.longAccumulator(s"mapperRunAccumulator$i"))
     val input = sc.parallelize(1 to 10, 10)
-    val baseRdd = input.map(i => (i, i))
-    var testRdd = baseRdd
+    var testRdd = input.map(i => (i, i))
     (0 to 10).foreach( i =>
-      testRdd = testRdd.map(x => {accums.map(_.add(1)); (x._1 * i, x._2)}).reduceByKey(_ + _)
+      testRdd = testRdd
+        .map(x => { accums.foreach(_.add(1)); (x._1 * i, x._2) })
+        .reduceByKey(_ + _)
     )
 
-    val logAppender = new LogAppender(s"heartbeat thread shuold not die")
+    val logAppender = new LogAppender(s"heartbeat thread should not die")
     withLogAppender(logAppender, level = Some(Level.ERROR)) {
       val _ = testRdd.count()
     }

From 6ba6589f08a24c66af491575b44ac6c88fdc21a8 Mon Sep 17 00:00:00 2001
From: Emil Ejbyfeldt
Date: Thu, 6 Apr 2023 13:26:52 +0200
Subject: [PATCH 5/5] Fix indentation

---
 .../scala/org/apache/spark/executor/ExecutorSuite.scala | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 834d43aa611d7..46f41195ebd87 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -280,17 +280,15 @@ class ExecutorSuite extends SparkFunSuite
     val input = sc.parallelize(1 to 10, 10)
     var testRdd = input.map(i => (i, i))
     (0 to 10).foreach( i =>
-      testRdd = testRdd
-        .map(x => { accums.foreach(_.add(1)); (x._1 * i, x._2) })
-        .reduceByKey(_ + _)
+      testRdd = testRdd.map(x => { accums.foreach(_.add(1)); (x._1 * i, x._2) }).reduceByKey(_ + _)
     )
 
-    val logAppender = new LogAppender(s"heartbeat thread should not die")
+    val logAppender = new LogAppender("heartbeat thread should not die")
     withLogAppender(logAppender, level = Some(Level.ERROR)) {
       val _ = testRdd.count()
     }
     val logs = logAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
-        .filter(_.contains("Uncaught exception in thread executor-heartbeater"))
+      .filter(_.contains("Uncaught exception in thread executor-heartbeater"))
     assert(logs.isEmpty)
   }
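
A note on the failure mode this series addresses (not part of the patches above): before the change, TaskMetrics.externalAccums was a plain ArrayBuffer that the task thread appends to via registerAccumulator while the executor heartbeat thread reads it to report accumulator updates; unsynchronized concurrent append and iteration can fail (for example with an ArrayIndexOutOfBoundsException while the backing array is being grown), which is the "Uncaught exception in thread executor-heartbeater" the new test asserts never appears. The sketch below is a minimal standalone reproduction of that pattern, not Spark code: ExternalAccumsRaceSketch and FakeRegistry are invented names, and only the Scala/Java standard library is assumed.

import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicReference

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object ExternalAccumsRaceSketch {

  // Stand-in for the accumulator registry: one thread registers values,
  // another thread repeatedly snapshots them (like the heartbeat report).
  final class FakeRegistry[C](container: C, add: (C, Long) => Unit, snapshot: C => Seq[Long]) {
    def register(v: Long): Unit = add(container, v)
    def report(): Seq[Long] = snapshot(container)
  }

  // Runs a writer ("task") and a reader ("heartbeater") concurrently and
  // returns the first exception the reader hits, if any.
  private def race(registry: FakeRegistry[_], rounds: Int): Option[Throwable] = {
    val failure = new AtomicReference[Option[Throwable]](None)
    val writer = new Thread(() => (1 to rounds).foreach(i => registry.register(i.toLong)))
    val reader = new Thread(() => (1 to rounds).foreach { _ =>
      try registry.report().sum
      catch { case NonFatal(t) => failure.compareAndSet(None, Some(t)) }
    })
    writer.start(); reader.start()
    writer.join(); reader.join()
    failure.get()
  }

  def main(args: Array[String]): Unit = {
    // Pre-fix shape: an unsynchronized ArrayBuffer shared between the threads;
    // snapshotting while the buffer grows may observe an inconsistent size/array pair.
    val unsafe = new FakeRegistry[ArrayBuffer[Long]](
      new ArrayBuffer[Long], (b, v) => { b += v; () }, b => b.toSeq)

    // Post-fix shape: CopyOnWriteArrayList, like _externalAccums above; every read
    // iterates an immutable snapshot, so the reader never sees a half-resized buffer.
    val safe = new FakeRegistry[CopyOnWriteArrayList[Long]](
      new CopyOnWriteArrayList[Long], (l, v) => { l.add(v); () }, l => l.asScala.toSeq)

    println(s"ArrayBuffer reader failure (race-dependent, may or may not trigger): ${race(unsafe, 100000)}")
    println(s"CopyOnWriteArrayList reader failure (expected None): ${race(safe, 100000)}")
  }
}

Copy-on-write trades a full array copy on every add for lock-free, always-consistent reads, which suits this registry: accumulators are registered only a handful of times per task, while the heartbeat and task-completion paths are read-only.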