
[SPARK-39696][CORE] Ensure Concurrent r/w TaskMetrics not throw Exception #37206

Closed
wants to merge 6 commits

Conversation

LuciferYang
Contributor

@LuciferYang LuciferYang commented Jul 16, 2022

What changes were proposed in this pull request?

This PR changes the declaration type of TaskMetrics#externalAccums from s.c.mutable.ArrayBuffer to j.u.concurrent.CopyOnWriteArrayList to ensure that the error described in SPARK-39696 (java.util.ConcurrentModificationException: mutation occurred during iteration) does not occur when TaskMetrics#externalAccums is read and written concurrently.
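
A minimal sketch of the declaration change (field and element types taken from the PR description; the surrounding TaskMetrics code is omitted):

import java.util.concurrent.CopyOnWriteArrayList

// Before: private val externalAccums = new scala.collection.mutable.ArrayBuffer[AccumulatorV2[_, _]]
// After: every mutation copies the backing array, so iteration always sees a consistent snapshot
private val externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]]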

Why are the changes needed?

Bug fix.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Pass GitHub Actions and add a new test case.
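
A rough sketch of the shape of the new test (thread structure and iteration counts are illustrative, not the exact suite code; it relies on the private[spark] API, which the suite can access from the org.apache.spark.executor package):

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.LongAccumulator

val tm = TaskMetrics.empty
// Writer threads keep registering external accumulators, as task-binary deserialization would.
val writers = (1 to 2).map { _ =>
  new Thread(() => (1 to 1000).foreach(_ => tm.registerAccumulator(new LongAccumulator)))
}
// The reader snapshots all accumulators, as the executor heartbeat thread does.
val reader = new Thread(() => (1 to 1000).foreach(_ => tm.accumulators()))
(writers :+ reader).foreach(_.start())
(writers :+ reader).foreach(_.join())
// Without the fix this can throw (ConcurrentModificationException on Scala 2.13;
// IndexOutOfBoundsException or NPE are possible on 2.12); with it, nothing is thrown.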

Without the change to TaskMetrics, the new test case fails regardless of the Scala version, as follows:

Scala 2.12:

mvn clean test -pl core -am -Dtest=none -DwildcardSuites=org.apache.spark.executor.TaskMetricsSuite 

Before

TaskMetricsSuite:
- mutating values
- mutating shuffle read metrics values
- mutating shuffle write metrics values
- mutating input metrics values
- mutating output metrics values
- merging multiple shuffle read metrics
- additional accumulables
- SPARK-39696: Concurrent r/w of TaskMetrics should not throw Exception *** FAILED ***
  2 did not equal 0 (TaskMetricsSuite.scala:274)
Run completed in 6 seconds, 980 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 7, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***

After

TaskMetricsSuite:
- mutating values
- mutating shuffle read metrics values
- mutating shuffle write metrics values
- mutating input metrics values
- mutating output metrics values
- merging multiple shuffle read metrics
- additional accumulables
- SPARK-39696: Concurrent r/w of TaskMetrics should not throw Exception
Run completed in 7 seconds, 516 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

Scala 2.13:

mvn clean test -pl core -am -Dtest=none -DwildcardSuites=org.apache.spark.executor.TaskMetricsSuite -Pscala-2.13

Before

TaskMetricsSuite:
- mutating values
- mutating shuffle read metrics values
- mutating shuffle write metrics values
- mutating input metrics values
- mutating output metrics values
- merging multiple shuffle read metrics
- additional accumulables
- SPARK-39696: Concurrent r/w of TaskMetrics should not throw Exception *** FAILED ***
  1339 did not equal 0 (TaskMetricsSuite.scala:275)
Run completed in 6 seconds, 714 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 7, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***

After

Discovery starting.
Discovery completed in 6 seconds, 369 milliseconds.
Run starting. Expected test count is: 8
TaskMetricsSuite:
- mutating values
- mutating shuffle read metrics values
- mutating shuffle write metrics values
- mutating input metrics values
- mutating output metrics values
- merging multiple shuffle read metrics
- additional accumulables
- SPARK-39696: Concurrent r/w of TaskMetrics should not throw Exception
Run completed in 8 seconds, 434 milliseconds.
Total number of tests run: 8
Suites: completed 2, aborted 0
Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

@github-actions github-actions bot added the CORE label Jul 16, 2022
@LuciferYang
Contributor Author

The exceptions reported in SPARK-39696 are as follows:

2022-06-21 18:17:49.289Z ERROR [executor-heartbeater] org.apache.spark.util.Utils - Uncaught exception in thread executor-heartbeater
java.util.ConcurrentModificationException: mutation occurred during iteration
        at scala.collection.mutable.MutationTracker$.checkMutations(MutationTracker.scala:43) ~[scala-library-2.13.8.jar:?]
        at scala.collection.mutable.CheckedIndexedSeqView$CheckedIterator.hasNext(CheckedIndexedSeqView.scala:47) ~[scala-library-2.13.8.jar:?]
        at scala.collection.IterableOnceOps.copyToArray(IterableOnce.scala:873) ~[scala-library-2.13.8.jar:?]
        at scala.collection.IterableOnceOps.copyToArray$(IterableOnce.scala:869) ~[scala-library-2.13.8.jar:?]
        at scala.collection.AbstractIterator.copyToArray(Iterator.scala:1293) ~[scala-library-2.13.8.jar:?]
        at scala.collection.IterableOnceOps.copyToArray(IterableOnce.scala:852) ~[scala-library-2.13.8.jar:?]
        at scala.collection.IterableOnceOps.copyToArray$(IterableOnce.scala:852) ~[scala-library-2.13.8.jar:?]
        at scala.collection.AbstractIterator.copyToArray(Iterator.scala:1293) ~[scala-library-2.13.8.jar:?]
        at scala.collection.immutable.VectorStatics$.append1IfSpace(Vector.scala:1959) ~[scala-library-2.13.8.jar:?]
        at scala.collection.immutable.Vector1.appendedAll0(Vector.scala:425) ~[scala-library-2.13.8.jar:?]
        at scala.collection.immutable.Vector.appendedAll(Vector.scala:203) ~[scala-library-2.13.8.jar:?]
        at scala.collection.immutable.Vector.appendedAll(Vector.scala:113) ~[scala-library-2.13.8.jar:?]
        at scala.collection.SeqOps.concat(Seq.scala:187) ~[scala-library-2.13.8.jar:?]
        at scala.collection.SeqOps.concat$(Seq.scala:187) ~[scala-library-2.13.8.jar:?]
        at scala.collection.AbstractSeq.concat(Seq.scala:1161) ~[scala-library-2.13.8.jar:?]
        at scala.collection.IterableOps.$plus$plus(Iterable.scala:726) ~[scala-library-2.13.8.jar:?]
        at scala.collection.IterableOps.$plus$plus$(Iterable.scala:726) ~[scala-library-2.13.8.jar:?]
        at scala.collection.AbstractIterable.$plus$plus(Iterable.scala:926) ~[scala-library-2.13.8.jar:?]
        at org.apache.spark.executor.TaskMetrics.accumulators(TaskMetrics.scala:261) ~[spark-core_2.13-3.3.0.jar:3.3.0]
        at org.apache.spark.executor.Executor.$anonfun$reportHeartBeat$1(Executor.scala:1042) ~[spark-core_2.13-3.3.0.jar:3.3.0]
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) ~[scala-library-2.13.8.jar:?]
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) ~[scala-library-2.13.8.jar:?]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:926) ~[scala-library-2.13.8.jar:?]
        at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1036) ~[spark-core_2.13-3.3.0.jar:3.3.0]
        at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:238) ~[spark-core_2.13-3.3.0.jar:3.3.0]
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) ~[scala-library-2.13.8.jar:?]
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066) ~[spark-core_2.13-3.3.0.jar:3.3.0]
        at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46) ~[spark-core_2.13-3.3.0.jar:3.3.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
        at java.lang.Thread.run(Thread.java:833) ~[?:?] 

It seems to be a low-probability event.


val writeThread1 = new Thread() {
Contributor Author


Two write threads are used to easily reproduce the issue

@mridulm
Contributor

mridulm commented Jul 18, 2022

Have not looked in detail, but there are a bunch of other places where externalAccums is used directly - are they also susceptible to these issues? If yes, we should look at handling them as well?

@LuciferYang
Contributor Author

Have not looked in detail, but there are a bunch of other places where externalAccums is used directly - are they also susceptible to these issues? If yes, we should look at handling them as well?

You're right. I need to rethink this problem; let me set this PR to draft first. Thanks.

@LuciferYang LuciferYang marked this pull request as draft July 18, 2022 04:00
@stephenmcmullan

stephenmcmullan commented Jul 18, 2022

It seems to be a low-probability event.

I'm finding the issue to be quite easy to reproduce. I have a 2 node Spark cluster with 3 executors per node (5 CPU/threads per executor). Since Friday I have ~150 DEAD executor instances due to this issue. Thanks for taking on the ticket.

@LuciferYang
Contributor Author

It seems to be a low-probability event.

@smcmullan-ncirl I wonder whether there would be such a high frequency of failures when using Scala 2.12?

@stephenmcmullan

stephenmcmullan commented Jul 18, 2022

It seems to be a low-probability event.

@smcmullan-ncirl I wonder whether there would be such a high frequency of failures when using Scala 2.12?

I'm a novice with the Spark and Scala code bases, but it looks to me that the scala.collection.mutable.MutationTracker and scala.collection.mutable.CheckedIndexedSeqView classes only exist in Scala 2.13, so I'm guessing that with a Java 17 JVM and Scala 2.13 I'm running on a more restrictive runtime platform than the majority of Spark users.

@LuciferYang
Contributor Author

LuciferYang commented Jul 18, 2022

Yes. When running the new test suite, ConcurrentModificationException only occurs with Scala 2.13. An IndexOutOfBoundsException or NPE may occur with Scala 2.12, but I did not encounter either in the production environment with Scala 2.12.

@LuciferYang LuciferYang changed the title [SPARK-39696][CORE] Ensure Concurrent r/w TaskMetrics not throw Exception [WIP][SPARK-39696][CORE] Ensure Concurrent r/w TaskMetrics not throw Exception Jul 19, 2022
@LuciferYang
Contributor Author

LuciferYang commented Jul 19, 2022

@mridulm Compared with analyzing each scenario and using read-write locks, I think it may be simpler to change externalAccums to use a thread-safe data structure, for example CopyOnWriteArrayList. Do you have any better suggestions?
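
For background on that trade-off: CopyOnWriteArrayList copies its backing array on every mutation, so iterators traverse an immutable snapshot and can never observe a concurrent write; reads are cheap and lock-free, while writes are O(n). That matches this use case, where the heartbeat thread reads far more often than tasks register accumulators. A standalone illustration (plain JDK usage, not Spark code):

import java.util.concurrent.CopyOnWriteArrayList

val list = new CopyOnWriteArrayList[Int]()
list.add(1)
val it = list.iterator()               // iterates over the snapshot taken at creation time
list.add(2)                            // a concurrent mutation is invisible to `it`
while (it.hasNext) println(it.next())  // prints only 1; no ConcurrentModificationException
println(list.size())                   // 2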

@mridulm
Contributor

mridulm commented Jul 20, 2022

I agree, given the use case, CopyOnWriteArrayList might be a good option, @LuciferYang.
+CC @JoshRosen - in case you have thoughts on this.

@LuciferYang LuciferYang marked this pull request as ready for review July 20, 2022 05:25
@LuciferYang LuciferYang changed the title [WIP][SPARK-39696][CORE] Ensure Concurrent r/w TaskMetrics not throw Exception [SPARK-39696][CORE] Ensure Concurrent r/w TaskMetrics not throw Exception Jul 20, 2022
@JoshRosen
Contributor

@mridulm, thanks for the ping.

CopyOnWriteArrayList supports single-threaded writes and concurrent reads.

It looks like externalAccums is only written from registerAccumulator, which is only called from TaskContext.registerAccumulator, which is only called during AccumulatorV2 deserialization, which should only occur when deserializing the task binary at the beginning of Task execution.

In the SPARK-39696 JIRA, it looks like the read is occurring in the executor's reportHeartBeat function at

taskRunner.task.metrics.accumulators().filterNot(_.isZero)

As far as I know, taskRunner.task will be null initially and will be populated once the task is deserialized at

task = ser.deserialize[Task[Any]](
  taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)

Assuming my above description is correct, I don't understand how concurrent reads and writes can occur: in normal operation I would expect that writes only occur during task binary deserialization and that reads from the heartbeat thread can only occur after deserialization has completed.

Perhaps accumulators are being deserialized and registered while the task is running (not just during task setup)? For example, maybe a case class is defined on the driver and its closure accidentally closes over something that contains an AccumulatorV2, causing us to deserialize AccumulatorV2 instances when deserializing data values. In the simple case, we'd have one writer thread (the task's main runner thread) racing with the heartbeat thread. In more complex cases, I can imagine scenarios where a single task could consist of multiple threads (e.g. for PySpark) and deserialization might happen in multiple of them in case of spill (e.g. during RDD aggregation). If this is the case then it might explain the race condition. If that's true, though, then I'm wondering whether we're registering different instances of the same accumulator multiple times in the same task (which seems like it would be a performance and correctness bug).

@LuciferYang
Contributor Author

@JoshRosen

Yes, your analysis is very accurate. From the current stack trace, I can only infer that the following two methods may race (but I haven't found any conclusive evidence), so I added read-write locks to these two methods in the initial PR.

private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
  externalAccums += a
}

private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
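
For comparison, with the CopyOnWriteArrayList approach the same two members would look roughly like this (a sketch of the direction of the fix, not the final merged code):

import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.JavaConverters._

private val externalAccums = new CopyOnWriteArrayList[AccumulatorV2[_, _]]

private[spark] def registerAccumulator(a: AccumulatorV2[_, _]): Unit = {
  externalAccums.add(a) // copy-on-write append; safe against concurrent iteration
}

private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] =
  internalAccums ++ externalAccums.asScala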

I didn't expect there would be a correctness bug here; if that is really possible, I think the current clues are not enough to troubleshoot the problem.

@smcmullan-ncirl Could you provide more details to help investigate the issue further? I want to know what the writing thread is. It seems that only you can reliably reproduce this problem now :)

@mridulm
Contributor

mridulm commented Jul 26, 2022

Thanks for the detailed analysis @JoshRosen - I agree with your analysis.
I saw two cases where this could be happening -

  • test code or user code calling spark-internal api,
  • Some nontrivial code flow, where either deserialization is happening lazily (and I would love to see a reproducible testcase for this), or some other nontrivial interaction - like the ones you detailed.

Agree, we should get more details on this before changing the type/other fixes.

@LuciferYang
Contributor Author

Thanks for the detailed analysis @JoshRosen - I agree with your analysis. I saw two cases where this could be happening -

  • test code or user code calling spark-internal api,
  • Some nontrivial code flow, where either deserialization is happening lazily (and I would love to see a reproducible testcase for this), or some other nontrivial interaction - like the ones you detailed.

Agree, we should get more details on this before changing the type/other fixes.

Agree +1, we need more details

@stephenmcmullan

Thanks for all the analysis and effort. I think what @JoshRosen is describing in the last paragraph of his analysis is exactly what my application is doing.

I have a custom ForeachWriter implementation class acting as a data sink on the write stream, and I'm passing an instance of a statistics-gathering class as an argument to the ForeachWriterImpl constructor. This statistics class has several Maps whose keys are statistic names and whose values are Spark accumulators.

I think I have some defects in my application where the de-serialization of this statistics class instance on the executor is re-registering the accumulator in the Spark context as described in the analysis above.

I will try to reorganize my code and see if I can stop the issue from happening. If you need more details, I can probably code up a sample application to show the construction of my production application.

@LuciferYang
Contributor Author

@smcmullan-ncirl Any new progress?

@stephenmcmullan

Hi, yes and no. I produced this toy example of what I thought my production code was doing: https://github.com/smcmullan-ncirl/RateSourceSparkStreamTest

However, it seems to work fine and has not reproduced the error or caused any executors to crash. Maybe the code will give some clues as to whether I'm using some bad practice that exposes the issue in the production code.

I'm still trying to analyze the production code to see where there may be differences from the toy application. I'll report back soon if I find any, or if I'm able to extend the toy application to reproduce the error.

@stephenmcmullan

stephenmcmullan commented Aug 8, 2022

Hi, so I was able to reproduce the issue with the example application below.

It shows a custom ForEach sink that updates a set of accumulators. The Map containing the accumulators is passed in the call to the ForEach sink and is thus serialized/deserialized.

I've integrated Codahale/DropWizard metrics for reporting metrics based on the accumulators and enabled the metrics sink like this:

driver.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=5
*.sink.console.unit=seconds

You may notice in the example application code below that the overridden getMetrics() method resets the accumulator by calling its reset() method.

I wonder whether this causes the situation that @JoshRosen wrote about above?

import com.codahale.metrics.{Gauge, Metric, MetricRegistry, MetricSet}
import org.apache.spark.metrics.source.StatsSource
import org.apache.spark.sql.{ForeachWriter, SparkSession}
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.slf4j.LoggerFactory

import java.util

object RateSourceSparkStreamTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("RateSourceStreamTest")

    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    val stats = new PartitionStats(spark.sparkContext)
    val statsMap = stats.getStatsMap

    val metricRegistry = new MetricRegistry
    metricRegistry.registerAll(stats)

    SparkEnv.get.metricsSystem.registerSource(
      StatsSource("RateSourceSparkStreamTest", metricRegistry)
    )

    import spark.implicits._

    spark
      .readStream
      .format("rate")
      .option("numPartitions", 120)
      .option("rowsPerSecond", 12000)
      .load()
      .map(row => row.mkString("##"))
      .writeStream
      .foreach(new ForeachWriterImpl(statsMap))
      .start()

    spark.streams.awaitAnyTermination()

    spark.close
  }
}

class ForeachWriterImpl(statsMap: Map[PartitionStats.Value, LongAccumulator]) extends ForeachWriter[String] {
  private final val LOGGER = LoggerFactory.getLogger(this.getClass)

  override def open(partitionId: Long, epochId: Long): Boolean = {
    LOGGER.info(s"Open partition $partitionId, epoch $epochId")
    PartitionStats.incMetric(statsMap, PartitionStats.partitionsOpened, 1)
    true
  }

  override def process(value: String): Unit = {
    LOGGER.info(s"Process value: $value")
    PartitionStats.incMetric(statsMap, PartitionStats.partitionsProcessed, 1)
  }

  override def close(errorOrNull: Throwable): Unit = {
    LOGGER.info(s"Close partition: $errorOrNull")
    PartitionStats.incMetric(statsMap, PartitionStats.partitionsClosed, 1)
  }
}

object PartitionStats extends Enumeration {
  private final val LOGGER = LoggerFactory.getLogger(this.getClass)

  final val partitionsOpened = Value("partitionsOpened")
  final val partitionsProcessed = Value("partitionsProcessed")
  final val partitionsClosed = Value("partitionsClosed")

  def incMetric(statsMap: Map[PartitionStats.Value, LongAccumulator], stat: PartitionStats.Value, count: Long): Unit = {
    statsMap.get(stat) match {
      case Some(acc) => acc.add(count)
      case _ => LOGGER.error(s"Cannot increment accumulator for $stat")
    }
  }
}

class PartitionStats(sparkContext: SparkContext) extends MetricSet {
  private final val statsMap: Map[PartitionStats.Value, LongAccumulator] =
    PartitionStats.values.unsorted.map(elem => elem -> sparkContext.longAccumulator(elem.toString)).toMap

  def getStatsMap: Map[PartitionStats.Value, LongAccumulator] = statsMap

  override def getMetrics: util.Map[String, Metric] = {
    val metricsMap: Map[String, Metric] = statsMap.map(
      e =>
        (
          e._1.toString,
          new Gauge[Long]() {
            override def getValue: Long = {
              val metricValue = e._2.value

              e._2.reset() // this is possibly the problem!!!!

              metricValue
            }
          }
        )
    )

    import scala.jdk.CollectionConverters._
    metricsMap.asJava
  }
}

package org.apache.spark.metrics.source {
  case class StatsSource(srcName: String, registry: MetricRegistry) extends Source {
    override def sourceName: String = srcName

    override def metricRegistry: MetricRegistry = registry
  }
}

@stephenmcmullan

@LuciferYang - does the above make any sense? Do you need any more information from me?

@MaksGS09

MaksGS09 commented Nov 2, 2022

Hi!
Any updates on this?

@LuciferYang
Contributor Author

I don't have any new ideas at present

@LuciferYang
Contributor Author

No better idea for now, so closing it first.

@tamama

tamama commented Mar 28, 2023

The exceptions reported in SPARK-39696 are as follows:

2022-06-21 18:17:49.289Z ERROR [executor-heartbeater] org.apache.spark.util.Utils - Uncaught exception in thread executor-heartbeater
java.util.ConcurrentModificationException: mutation occurred during iteration
        [full stack trace identical to the one quoted above]

It seems to be a low-probability event.

Hi - our executors crashed almost every 30 minutes while using HDFS as the checkpoint store (Kafka source). FYI - using SSHFS as the checkpoint store would crash the executors randomly, say once or twice per day...

In our opinion, this is too dangerous in production, as it causes streaming jitter and thus unnecessary backpressure on our data pipeline...

We intend to fall back to Spark 3.3.1 with Scala 2.12 (instead of Scala 2.13)

Is there a Spark LTS release that is stable for production use?

@LuciferYang
Contributor Author

We intend to fall back to Spark 3.3.1 with Scala 2.12 (instead of Scala 2.13)

@tamama Does using Scala 2.12 avoid this issue?

@tamama

tamama commented Mar 28, 2023

We intend to fall back to Spark 3.3.1 with Scala 2.12 (instead of Scala 2.13)

@tamama Does using Scala 2.12 avoid this issue?

Yes - falling back to Scala 2.12 avoids this issue. After 2 hours under heavy stress load, there has been no issue so far with HDFS checkpointing.

Our UAT setup is RHEL/8.6, 3 nodes, Spark 3.3.1 (hadoop3 + scala-2.12)

@tamama

tamama commented Mar 28, 2023

@LuciferYang FYI ^

@LuciferYang
Contributor Author

LuciferYang commented Mar 29, 2023

We intend to fall back to Spark 3.3.1 with Scala 2.12 (instead of Scala 2.13)

@tamama Does using Scala 2.12 avoid this issue?

Yes - falling back to Scala 2.12 avoids this issue. After 2 hours under heavy stress load, there has been no issue so far with HDFS checkpointing.

Our UAT setup is RHEL/8.6, 3 nodes, Spark 3.3.1 (hadoop3 + scala-2.12)

Although I'm not quite sure, this sounds more like an issue with Scala 2.13.8 itself

@tamama

tamama commented Mar 29, 2023

We intend to fall back to Spark 3.3.1 with Scala 2.12 (instead of Scala 2.13)

@tamama Does using Scala 2.12 avoid this issue?

Yes - falling back to Scala 2.12 avoids this issue. After 2 hours under heavy stress load, there has been no issue so far with HDFS checkpointing.
Our UAT setup is RHEL/8.6, 3 nodes, Spark 3.3.1 (hadoop3 + scala-2.12)

Although I'm not quite sure, this sounds more like an issue with Scala 2.13.8 itself

  • We confirm that we have not detected any data-race exception for 18+ hrs.
  • Will let you know if this can last for this entire trading week ...
  • If yes, I would mark this as fixed on our side - with the Scala 2.12 fallback as the resolution.

@tamama

tamama commented Apr 1, 2023

There is no issue with Scala 2.12 at all for the entire week.

@tamama

tamama commented Apr 3, 2023

@LuciferYang FYI

  • We have done a stress-load test from our side using Spark 3.3.1 (Scala 2.12)
  • We have not detected any data-race condition for 140 hours
  • Falling back from Scala 2.13 to Scala 2.12 is a fix for us for the moment, until the Spark community has a fix for this :)

@eejbyfeldt

eejbyfeldt commented Apr 3, 2023

We are also seeing this failure on Spark 3.3.1 with Scala 2.13 on Ubuntu 22.04.

I used one of the Spark applications hitting this issue to debug further when we deserialize AccumulatorV2 instances, which makes us susceptible to the race. And I found an example that does not seem to involve any client code. Here is my understanding:

If the Task is a ShuffleMapTask, it will deserialize the rddAndDep as part of runTask:

https://github.com/apache/spark/blob/v3.3.1/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala#L85

which is called after the Task deserialization happens. This, combined with a ShuffleDependency

https://github.com/apache/spark/blob/v3.3.1/core/src/main/scala/org/apache/spark/Dependency.scala#L85

with a ShuffleWriteProcessor that comes from

https://github.com/apache/spark/blob/v3.3.1/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L411-L422

means that our ShuffleDependency contains a reference to the map metrics: Map[String, SQLMetric], which causes the SQLMetrics to be registered while the task is already running - and we have our race condition.
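
For reference, the deserialization step in question looks roughly like this (paraphrased from the linked ShuffleMapTask source, abridged; imports and the surrounding method are omitted):

// Inside ShuffleMapTask.runTask, after the Task object itself has already been deserialized:
val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
  ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
// If the ShuffleDependency (via its ShuffleWriteProcessor) captures SQLMetric accumulators,
// deserializing it re-registers them with the running TaskContext, racing with the
// heartbeat thread's read of externalAccums.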

I would be willing to work on a fix for this, but would probably need some guidance/discussion on what the correct fix actually is.

@tamama

tamama commented Apr 3, 2023

I would kindly ask for this feature to be disabled by default until it is proven stable for Scala 2.13.

Otherwise, this would hurt its reputation.

@mridulm
Contributor

mridulm commented Apr 3, 2023

@tamama Please update the Jira with details that can help debug the issue - specifically, the test snippet you used to surface the issue in 2.13 that does not occur in 2.12

@mridulm
Contributor

mridulm commented Apr 3, 2023

@eejbyfeldt The shuffle write processor is an instance variable, and so is part of the dependency created at the driver

@eejbyfeldt

eejbyfeldt commented Apr 4, 2023

@eejbyfeldt The shuffle write processor is an instance variable, and so is part of the dependency created at the driver

While that statement is true, it is not relevant to the point I was trying to make. @JoshRosen commented in #37206 (comment) claiming that we should not see concurrent access, because accumulators should be deserialized, and therefore registered, during Task deserialization, so there should be no race with the heartbeat thread. But my claim was that this is not true, as accumulators will also be deserialized from the taskBinary, which is deserialized later, during runTask. This can happen because of client code using accumulators (test spec here: #40663, also linked in the Jira) or due to Spark serializing them as part of the ShuffleDependency.

HyukjinKwon pushed a commit that referenced this pull request Apr 7, 2023
…cums

### What changes were proposed in this pull request?
This PR fixes a data race around concurrent access to `TaskMetrics.externalAccums`. The race occurs between the `executor-heartbeater` thread and the thread executing the task. This data race is not known to cause issues on 2.12, but on 2.13 (LuciferYang bisected the first failures to Scala 2.13.7, following scala/scala#9258; one possible reason could be scala/scala#9786) it leads to an uncaught exception in the `executor-heartbeater` thread, which means that the executor will eventually be terminated due to missing heartbeats.

The fix of using `CopyOnWriteArrayList` is cherry-picked from #37206, where it was suggested as a fix by LuciferYang, since `TaskMetrics.externalAccums` is also accessed from outside the class `TaskMetrics`. The old PR was closed because at that point there was no clear understanding of the race condition. JoshRosen commented in #37206 (comment) that there should be no such race, because all accumulators should be deserialized as part of task deserialization here: https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L507-L508 and therefore no writes should occur while the heartbeat thread reads the accumulators. But my understanding is that this is incorrect, as accumulators will also be deserialized as part of the taskBinary here: https://github.com/apache/spark/blob/169f828b1efe10d7f21e4b71a77f68cdd1d706d6/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala#L87-L88 which will happen while the heartbeater thread is potentially reading the accumulators. This can happen both due to user code using accumulators (see the new test case) and when using the Dataframe/Dataset API, as SQL metrics are also `externalAccums`. One way metrics are sent as part of the taskBinary is when the dep is a `ShuffleDependency`: https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/core/src/main/scala/org/apache/spark/Dependency.scala#L85 with a ShuffleWriteProcessor that comes from https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L411-L422

### Why are the changes needed?
The current code has a data race.

### Does this PR introduce _any_ user-facing change?
It will fix an uncaught exception in the `executor-heartbeater` thread when using Scala 2.13.

### How was this patch tested?
This patch adds a new test case that, before the fix was applied, consistently produced the uncaught exception in the heartbeater thread when using Scala 2.13.

Closes #40663 from eejbyfeldt/SPARK-39696.

Lead-authored-by: Emil Ejbyfeldt <[email protected]>
Co-authored-by: yangjie01 <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Apr 7, 2023
…cums
[commit message identical to the one above]
(cherry picked from commit 6ce0822)
Signed-off-by: Hyukjin Kwon <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Apr 7, 2023
…cums
[commit message identical to the one above]
(cherry picked from commit 6ce0822)
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit b2ff4c4)
Signed-off-by: Dongjoon Hyun <[email protected]>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023
…cums
[commit message identical to the one above]
(cherry picked from commit 6ce0822)
Signed-off-by: Hyukjin Kwon <[email protected]>
@gowa

gowa commented Aug 23, 2023

@JoshRosen, @eejbyfeldt, I hope it is fine that I am posting here after the ticket has been closed. We caught a problem which, I believe, has the same 'true' root cause as the one in this ticket, and I also think that the 'true' root cause was not eliminated by the fix. I think ConcurrentModificationException is only one of the possible outcomes of the various race-condition scenarios (concurrent reads/writes). In particular, I managed to catch a NullPointerException (instead of a ConcurrentModificationException) in my custom accumulator, where I would never expect one. Below is a simple reproducer demonstrating the bigger problem:

import java.{lang => jl}
import org.apache.spark.util.AccumulatorV2

// Just a simple custom accumulator to demonstrate the problem: initially 'false', it can be changed to 'true'.
// See the comments on '_set' and 'isZero'.

class BooleanAccumulator extends AccumulatorV2[jl.Boolean, jl.Boolean] {
  private var _set = jl.Boolean.FALSE // supposed to never be null; in my real-world issue there is a 'private final Map' in a Java class that is never supposed to be null

  override def isZero: Boolean = ! _set.booleanValue()  // however, NullPointerException will be thrown here from executor-heartbeater

  override def copy(): BooleanAccumulator = {
    val newAcc = new BooleanAccumulator
    newAcc._set = this._set
    newAcc
  }

  override def reset(): Unit = {
    _set = jl.Boolean.FALSE
  }

  override def add(v: jl.Boolean): Unit = {
    if (v.booleanValue()) {
      _set = jl.Boolean.TRUE
    }
  }

  override def merge(other: AccumulatorV2[jl.Boolean, jl.Boolean]): Unit = other match {
    case o: BooleanAccumulator =>
      if (!_set.booleanValue())
        _set = o._set
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: jl.Boolean = _set
}

//some dummy logic using that accumulator
val rdd = sc.parallelize(Seq(1), 1)

def tryReproduce(): Unit = {
  val acc = new BooleanAccumulator()
  sc.register(acc, "acc")
  rdd.foreachPartition { it =>
    val cnt = it.count(_ => true)
    acc.add(true)
  }
}

tryReproduce()

The code listed above should be executed from the spark-shell, started with spark.executor.heartbeatInterval=1 (ms) in order to increase the chances of the issue appearing (on my laptop it is enough to call tryReproduce() just once):

spark-3.3.3-bin-hadoop3/bin/spark-shell --master spark://127.0.1.1:7077 -c spark.executor.heartbeatInterval=1 -c spark.executor.heartbeat.maxFailures=60000

The NullPointerException from the 'executor-heartbeater' thread looks like this:

23/08/23 18:24:42 ERROR Utils: Uncaught exception in thread executor-heartbeater
java.lang.NullPointerException
        at $line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$BooleanAccumulator.isZero(<console>:27)
        at org.apache.spark.executor.Executor.$anonfun$reportHeartBeat$2(Executor.scala:1042)
        at org.apache.spark.executor.Executor.$anonfun$reportHeartBeat$2$adapted(Executor.scala:1042)
        at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
        at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
        at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
        at scala.collection.TraversableLike.filterNot(TraversableLike.scala:403)
        at scala.collection.TraversableLike.filterNot$(TraversableLike.scala:403)
        at scala.collection.AbstractTraversable.filterNot(Traversable.scala:108)
        at org.apache.spark.executor.Executor.$anonfun$reportHeartBeat$1(Executor.scala:1042)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1036)
        at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:238)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
        at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

In my real-world case, my accumulator has a final Java field which can never be null... Therefore, I suspect there is some lack of synchronization between the deserialization code and access to the 'isZero' method of a just-deserialized accumulator instance. And, therefore, I think replacing ArrayBuffer with CopyOnWriteArrayList does not completely fix the bigger issue.

Also, in my real-world case we have the default 10s for spark.executor.heartbeatInterval. Still, somehow we managed to catch many NullPointerExceptions from the 'executor-heartbeater' thread over one day, resulting in executors being restarted (which led to unbalanced load distribution in a Spark Standalone cluster).

@eejbyfeldt

Hi @gowa, I don't think the issue you describe has the same root cause; I think it is more related to how serialization works. Consider the following code:

$ cat custom_serialization.scala 
import java.io._

abstract class Acc extends Serializable {
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    println(s"readObject Acc ${toString()}")
  }
}

class MyAcc extends Acc {
  final private val myVar = Map()
  override def toString: String = {
    s"Class MyAcc(myVar=${myVar})"
  }
}

object Test {
  def main(args: Array[String]): Unit = {
    val outputStream: ByteArrayOutputStream = new ByteArrayOutputStream(1024 * 1024)
    val objectStream: ObjectOutputStream = new ObjectOutputStream(outputStream)
    objectStream.writeObject(new MyAcc)
    objectStream.close()
    val input = new ObjectInputStream(new ByteArrayInputStream(outputStream.toByteArray()))
    val result = input.readObject()
    println(result)
  }
}

When executed, it prints:

$ scala custom_serialization.scala 
readObject Acc Class MyAcc(myVar=null)
Class MyAcc(myVar=Map())

This code is similar to what we have in AccumulatorV2, which also has a custom readObject method: https://github.com/apache/spark/blob/v3.4.1/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L191 It is from inside that method that the accumulator is registered and therefore becomes accessible from the heartbeat thread. So there is a possibility that the heartbeat thread calls isZero after AccumulatorV2.readObject has been invoked but before the default deserialization of your custom class has completed. This whole setup of handing out references to objects that are not fully deserialized is really nasty.
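
Abridged, the registration inside AccumulatorV2.readObject happens roughly like this (paraphrased from the linked source, not line-for-line):

private def readObject(in: ObjectInputStream): Unit = {
  in.defaultReadObject() // restores only the fields AccumulatorV2 itself declares
  // ...then the accumulator registers itself with the running task:
  val taskContext = TaskContext.get()
  if (taskContext != null) {
    taskContext.registerAccumulator(this) // visible to the heartbeat thread from this point,
  }                                       // even though subclass fields may still be null
}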

Here is a PR addressing/discussing the same bug in an accumulator in Spark: #31540

@gowa

gowa commented Aug 24, 2023

Hi @eejbyfeldt! Oh, thank you! It is clear to me now. I am sorry for the disturbance.
I had seen the changes from #31540 in CollectionAccumulator, but hadn't traced why they were made. Now I understand.

Thank you very much!
