-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-39696][CORE] Fix data race in access to TaskMetrics.externalAccums #40663
Conversation
Ping @LuciferYang @JoshRosen that previously discussed this issue in #37206 let me know if would be a better approach to reopen the old PR rather then creating a new one. And/or if my description of the data race does not sounds accurate to you. |
It's ok to me to further discussion in this one |
I found that before Scala 2.13.6(include) seems no this issue and the new test will failed after 2.13.7. @eejbyfeldt I am not sure if this is caused by change of scala/scala#9258, as it has been added to Scala 2.13.4. also cc @srowen @mridulm and @xinrong-meng |
Thanks for looking in to that closer. Should probably formulated myself more clearly that was only a guess on my part and not something I had verified. But now that you narrowed it down to 2.13.7 another possible candidate change could be scala/scala#9786 that changed how that mutations are tracked in ArrayBuffer. EDIT: Even if scala/scala#9258 was tagged milestone 2.13.4 at some point it looks to me like it actually landed in 2.13.7 with this commit scala/scala@5f25002 |
This makes sense. So this is essentially a bug waiting to happen for two reasons:
Note, the task itself is visible only after the deserialization is complete, since it is volatile - but the registration is not covered by it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a couple of minor nits, nice job chasing this down @eejbyfeldt !
core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
Outdated
Show resolved
Hide resolved
And remove unneeded config in spec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+CC @JoshRosen, @LuciferYang
@eejbyfeldt
|
core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
Outdated
Show resolved
Hide resolved
also cc @dongjoon-hyun due to Spark 3.2.x also use Scala 2.13.8 and maybe Spark 3.2.4 should include this one |
core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM , pending ci
Merged to master and branch-3.4. |
…cums ### What changes were proposed in this pull request? This PR fixes a data race around concurrent access to `TaskMetrics.externalAccums`. The race occurs between the `executor-heartbeater` thread and the thread executing the task. This data race is not known to cause issues on 2.12 but in 2.13 ~due this change scala/scala#9258 (LuciferYang bisected this to first cause failures in scala 2.13.7 one possible reason could be scala/scala#9786) leads to an uncaught exception in the `executor-heartbeater` thread, which means that the executor will eventually be terminated due to missing hearbeats. This fix of using of using `CopyOnWriteArrayList` is cherry picked from #37206 where is was suggested as a fix by LuciferYang since `TaskMetrics.externalAccums` is also accessed from outside the class `TaskMetrics`. The old PR was closed because at that point there was no clear understanding of the race condition. JoshRosen commented here #37206 (comment) saying that there should be no such race based on because all accumulators should be deserialized as part of task deserialization here: https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L507-L508 and therefore no writes should occur while the hearbeat thread will read the accumulators. But my understanding is that is incorrect as accumulators will also be deserialized as part of the taskBinary here: https://github.com/apache/spark/blob/169f828b1efe10d7f21e4b71a77f68cdd1d706d6/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala#L87-L88 which will happen while the heartbeater thread is potentially reading the accumulators. This can both due to user code using accumulators (see the new test case) but also when using the Dataframe/Dataset API as sql metrics will also be `externalAccums`. One way metrics will be sent as part of the taskBinary is when the dep is a `ShuffleDependency`: https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/core/src/main/scala/org/apache/spark/Dependency.scala#L85 with a ShuffleWriteProcessor that comes from https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L411-L422 ### Why are the changes needed? The current code has a data race. ### Does this PR introduce _any_ user-facing change? It will fix an uncaught exception in the `executor-hearbeater` thread when using scala 2.13. ### How was this patch tested? This patch adds a new test case, that before the fix was applied consistently produces the uncaught exception in the heartbeater thread when using scala 2.13. Closes #40663 from eejbyfeldt/SPARK-39696. Lead-authored-by: Emil Ejbyfeldt <[email protected]> Co-authored-by: yangjie01 <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]> (cherry picked from commit 6ce0822) Signed-off-by: Hyukjin Kwon <[email protected]>
@HyukjinKwon branch-3.3 and branch-3.2 may also require this one, they also use Scala 2.13.8, need @eejbyfeldt to submit an independent PRs? |
…cums ### What changes were proposed in this pull request? This PR fixes a data race around concurrent access to `TaskMetrics.externalAccums`. The race occurs between the `executor-heartbeater` thread and the thread executing the task. This data race is not known to cause issues on 2.12 but in 2.13 ~due this change scala/scala#9258 (LuciferYang bisected this to first cause failures in scala 2.13.7 one possible reason could be scala/scala#9786) leads to an uncaught exception in the `executor-heartbeater` thread, which means that the executor will eventually be terminated due to missing hearbeats. This fix of using of using `CopyOnWriteArrayList` is cherry picked from #37206 where is was suggested as a fix by LuciferYang since `TaskMetrics.externalAccums` is also accessed from outside the class `TaskMetrics`. The old PR was closed because at that point there was no clear understanding of the race condition. JoshRosen commented here #37206 (comment) saying that there should be no such race based on because all accumulators should be deserialized as part of task deserialization here: https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L507-L508 and therefore no writes should occur while the hearbeat thread will read the accumulators. But my understanding is that is incorrect as accumulators will also be deserialized as part of the taskBinary here: https://github.com/apache/spark/blob/169f828b1efe10d7f21e4b71a77f68cdd1d706d6/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala#L87-L88 which will happen while the heartbeater thread is potentially reading the accumulators. This can both due to user code using accumulators (see the new test case) but also when using the Dataframe/Dataset API as sql metrics will also be `externalAccums`. One way metrics will be sent as part of the taskBinary is when the dep is a `ShuffleDependency`: https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/core/src/main/scala/org/apache/spark/Dependency.scala#L85 with a ShuffleWriteProcessor that comes from https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L411-L422 ### Why are the changes needed? The current code has a data race. ### Does this PR introduce _any_ user-facing change? It will fix an uncaught exception in the `executor-hearbeater` thread when using scala 2.13. ### How was this patch tested? This patch adds a new test case, that before the fix was applied consistently produces the uncaught exception in the heartbeater thread when using scala 2.13. Closes #40663 from eejbyfeldt/SPARK-39696. Lead-authored-by: Emil Ejbyfeldt <[email protected]> Co-authored-by: yangjie01 <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]> (cherry picked from commit 6ce0822) Signed-off-by: Hyukjin Kwon <[email protected]> (cherry picked from commit b2ff4c4) Signed-off-by: Dongjoon Hyun <[email protected]>
Thank you, @eejbyfeldt , @mridulm , @HyukjinKwon , and @LuciferYang . However, we need a new PR for branch-3.2 due to the compilation error, @LuciferYang and @eejbyfeldt .
|
To @LuciferYang and all. After double-checking, I found that Apache Spark 3.2.x is not affected because it uses Scala 2.13.5. Line 3389 in 7773740
SPARK-35496 (Scala 2.13.7) landed at Apache Spark 3.3.0+. We don't need to backport this to branch-3.2. |
Please let me know if this is still valid in
|
@dongjoon-hyun Scala 2.13.5 does not require this fix. I apologize for providing incorrect information earlier |
No problem at all. Thank you always, @LuciferYang ! |
Thanks for checking @dongjoon-hyun and @LuciferYang ! |
…cums ### What changes were proposed in this pull request? This PR fixes a data race around concurrent access to `TaskMetrics.externalAccums`. The race occurs between the `executor-heartbeater` thread and the thread executing the task. This data race is not known to cause issues on 2.12 but in 2.13 ~due this change scala/scala#9258 (LuciferYang bisected this to first cause failures in scala 2.13.7 one possible reason could be scala/scala#9786) leads to an uncaught exception in the `executor-heartbeater` thread, which means that the executor will eventually be terminated due to missing hearbeats. This fix of using of using `CopyOnWriteArrayList` is cherry picked from apache#37206 where is was suggested as a fix by LuciferYang since `TaskMetrics.externalAccums` is also accessed from outside the class `TaskMetrics`. The old PR was closed because at that point there was no clear understanding of the race condition. JoshRosen commented here apache#37206 (comment) saying that there should be no such race based on because all accumulators should be deserialized as part of task deserialization here: https://github.com/apache/spark/blob/0cc96f76d8a4858aee09e1fa32658da3ae76d384/core/src/main/scala/org/apache/spark/executor/Executor.scala#L507-L508 and therefore no writes should occur while the hearbeat thread will read the accumulators. But my understanding is that is incorrect as accumulators will also be deserialized as part of the taskBinary here: https://github.com/apache/spark/blob/169f828b1efe10d7f21e4b71a77f68cdd1d706d6/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala#L87-L88 which will happen while the heartbeater thread is potentially reading the accumulators. This can both due to user code using accumulators (see the new test case) but also when using the Dataframe/Dataset API as sql metrics will also be `externalAccums`. One way metrics will be sent as part of the taskBinary is when the dep is a `ShuffleDependency`: https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/core/src/main/scala/org/apache/spark/Dependency.scala#L85 with a ShuffleWriteProcessor that comes from https://github.com/apache/spark/blob/fbbcf9434ac070dd4ced4fb9efe32899c6db12a9/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L411-L422 ### Why are the changes needed? The current code has a data race. ### Does this PR introduce _any_ user-facing change? It will fix an uncaught exception in the `executor-hearbeater` thread when using scala 2.13. ### How was this patch tested? This patch adds a new test case, that before the fix was applied consistently produces the uncaught exception in the heartbeater thread when using scala 2.13. Closes apache#40663 from eejbyfeldt/SPARK-39696. Lead-authored-by: Emil Ejbyfeldt <[email protected]> Co-authored-by: yangjie01 <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]> (cherry picked from commit 6ce0822) Signed-off-by: Hyukjin Kwon <[email protected]>
…version on TaskMetrics#externalAccums ### What changes were proposed in this pull request? This is a followup fix for #47197. We found that the perf regression still exists after that fix and located the culprit is the immutable conversion on `TaskMetrics#externalAccums`. This PR fixes it by avoiding the immutable conversion, and then enforce the read lock protection during the accessing on `TaskMetrics#externalAccums` to avoid the race issue (#40663). ### Why are the changes needed? Fix perf regression. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Covered by existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47578 from Ngone51/SPARK-48791-followup. Lead-authored-by: Yi Wu <[email protected]> Co-authored-by: Kent Yao <[email protected]> Signed-off-by: Kent Yao <[email protected]>
…version on TaskMetrics#externalAccums This is a followup fix for apache#47197. We found that the perf regression still exists after that fix and located the culprit is the immutable conversion on `TaskMetrics#externalAccums`. This PR fixes it by avoiding the immutable conversion, and then enforce the read lock protection during the accessing on `TaskMetrics#externalAccums` to avoid the race issue (apache#40663). Fix perf regression. No. Covered by existing tests. No. Closes apache#47578 from Ngone51/SPARK-48791-followup. Lead-authored-by: Yi Wu <[email protected]> Co-authored-by: Kent Yao <[email protected]> Signed-off-by: Kent Yao <[email protected]>
…version on TaskMetrics#externalAccums This is a followup fix for apache#47197. We found that the perf regression still exists after that fix and located the culprit is the immutable conversion on `TaskMetrics#externalAccums`. This PR fixes it by avoiding the immutable conversion, and then enforce the read lock protection during the accessing on `TaskMetrics#externalAccums` to avoid the race issue (apache#40663). Fix perf regression. No. Covered by existing tests. No. Closes apache#47578 from Ngone51/SPARK-48791-followup. Lead-authored-by: Yi Wu <[email protected]> Co-authored-by: Kent Yao <[email protected]> Signed-off-by: Kent Yao <[email protected]>
…e conversion on TaskMetrics#externalAccums This PR backports #47578 to branch-3.5. ### What changes were proposed in this pull request? This is a followup fix for #47197. We found that the perf regression still exists after that fix and located the culprit is the immutable conversion on `TaskMetrics#externalAccums`. This PR fixes it by avoiding the immutable conversion, and then enforce the read lock protection during the accessing on `TaskMetrics#externalAccums` to avoid the race issue (#40663). ### Why are the changes needed? Fix perf regression. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Covered by existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47612 from Ngone51/SPARK-48791-followup-3.5. Authored-by: Yi Wu <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…e conversion on TaskMetrics#externalAccums This PR backports #47578 to branch-3.4. ### What changes were proposed in this pull request? This is a followup fix for #47197. We found that the perf regression still exists after that fix and located the culprit is the immutable conversion on `TaskMetrics#externalAccums`. This PR fixes it by avoiding the immutable conversion, and then enforce the read lock protection during the accessing on `TaskMetrics#externalAccums` to avoid the race issue (#40663). ### Why are the changes needed? Fix perf regression. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Covered by existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47613 from Ngone51/SPARK-48791-followup-3.4. Authored-by: Yi Wu <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…version on TaskMetrics#externalAccums ### What changes were proposed in this pull request? This is a followup fix for apache#47197. We found that the perf regression still exists after that fix and located the culprit is the immutable conversion on `TaskMetrics#externalAccums`. This PR fixes it by avoiding the immutable conversion, and then enforce the read lock protection during the accessing on `TaskMetrics#externalAccums` to avoid the race issue (apache#40663). ### Why are the changes needed? Fix perf regression. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Covered by existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#47578 from Ngone51/SPARK-48791-followup. Lead-authored-by: Yi Wu <[email protected]> Co-authored-by: Kent Yao <[email protected]> Signed-off-by: Kent Yao <[email protected]>
…version on TaskMetrics#externalAccums ### What changes were proposed in this pull request? This is a followup fix for apache#47197. We found that the perf regression still exists after that fix and located the culprit is the immutable conversion on `TaskMetrics#externalAccums`. This PR fixes it by avoiding the immutable conversion, and then enforce the read lock protection during the accessing on `TaskMetrics#externalAccums` to avoid the race issue (apache#40663). ### Why are the changes needed? Fix perf regression. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Covered by existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#47578 from Ngone51/SPARK-48791-followup. Lead-authored-by: Yi Wu <[email protected]> Co-authored-by: Kent Yao <[email protected]> Signed-off-by: Kent Yao <[email protected]>
…version on TaskMetrics#externalAccums ### What changes were proposed in this pull request? This is a followup fix for apache#47197. We found that the perf regression still exists after that fix and located the culprit is the immutable conversion on `TaskMetrics#externalAccums`. This PR fixes it by avoiding the immutable conversion, and then enforce the read lock protection during the accessing on `TaskMetrics#externalAccums` to avoid the race issue (apache#40663). ### Why are the changes needed? Fix perf regression. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Covered by existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#47578 from Ngone51/SPARK-48791-followup. Lead-authored-by: Yi Wu <[email protected]> Co-authored-by: Kent Yao <[email protected]> Signed-off-by: Kent Yao <[email protected]>
What changes were proposed in this pull request?
This PR fixes a data race around concurrent access to
TaskMetrics.externalAccums
. The race occurs between theexecutor-heartbeater
thread and the thread executing the task. This data race is not known to cause issues on 2.12 but in 2.13due this change scala/scala#9258(@LuciferYang bisected this to first cause failures in scala 2.13.7 one possible reason could be scala/scala#9786) leads to an uncaught exception in theexecutor-heartbeater
thread, which means that the executor will eventually be terminated due to missing hearbeats.This fix of using of using
CopyOnWriteArrayList
is cherry picked from #37206 where is was suggested as a fix by @LuciferYang sinceTaskMetrics.externalAccums
is also accessed from outside the classTaskMetrics
. The old PR was closed because at that point there was no clear understanding of the race condition. @JoshRosen commented here #37206 (comment) saying that there should be no such race based on because all accumulators should be deserialized as part of task deserialization here:spark/core/src/main/scala/org/apache/spark/executor/Executor.scala
Lines 507 to 508 in 0cc96f7
spark/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
Lines 87 to 88 in 169f828
externalAccums
. One way metrics will be sent as part of the taskBinary is when the dep is aShuffleDependency
:spark/core/src/main/scala/org/apache/spark/Dependency.scala
Line 85 in fbbcf94
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
Lines 411 to 422 in fbbcf94
Why are the changes needed?
The current code has a data race.
Does this PR introduce any user-facing change?
It will fix an uncaught exception in the
executor-hearbeater
thread when using scala 2.13.How was this patch tested?
This patch adds a new test case, that before the fix was applied consistently produces the uncaught exception in the heartbeater thread when using scala 2.13.