
[SPARK-20977][CORE] Use a non-final field for the state of CollectionAccumulator #31540

Closed

Conversation

@gerashegalov (Contributor) commented Feb 10, 2021

This PR is a fix for the JLS 17.5.3 violation identified in
@zsxwing's 19/Feb/19 11:47 comment on the JIRA.

What changes were proposed in this pull request?

  • Use a var field to hold the state of the collection accumulator

Why are the changes needed?

AccumulatorV2's auto-registration of the accumulator during readObject doesn't work with final fields that are post-processed outside readObject. As it stands, incompletely initialized objects are published to the heartbeat thread. This leads to sporadic exceptions that knock out executors, which increases the cost of jobs. We observe such failures on a regular basis (NVIDIA/spark-rapids#1522).
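
To make the failure mode concrete, here is a minimal sketch of the race (hypothetical names such as `AccBase`, `ListAcc`, and `Registry`; not Spark's actual classes): the base class's readObject publishes `this` before the subclass's fields have been deserialized.

```scala
import java.io.{IOException, ObjectInputStream}
import java.util.concurrent.ConcurrentLinkedQueue

// Stand-in for the driver-side registry that the heartbeat thread polls.
object Registry {
  val live = new ConcurrentLinkedQueue[AccBase]()
}

abstract class AccBase extends Serializable {
  def report(): String

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    // Auto-registration publishes `this` while the subclass's portion of the
    // stream is still unread: ListAcc.list is null at this point.
    Registry.live.add(this)
  }
}

class ListAcc extends AccBase {
  // Restored only after AccBase.readObject has already run, so a thread that
  // picked this object up from Registry can observe null here.
  private val list = new java.util.ArrayList[String]()
  override def report(): String = s"size=${list.size}" // sporadic NPE
}
```

Because `list` compiles to a final field, JLS 17.5.3 only guarantees other threads see its deserialized value if the reference is not published before deserialization completes; the early registration breaks exactly that precondition.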

Does this PR introduce any user-facing change?

None

How was this patch tested?

  • This is a concurrency bug that is almost impossible to reproduce in a quick unit test.
  • By trial and error I crafted a command ([WIP] Allow repeated iterations of integration tests in the same Spark app, NVIDIA/spark-rapids#1688) that reproduces the issue on my dev box several times per hour, with the first occurrence often within a few minutes. After the patch, these exceptions did not show up while running overnight for 10+ hours.
  • Existing unit tests in *`AccumulatorV2Suite` and *`LiveEntitySuite`.

This PR is a fix for the JLS 17.5.3 violation identified in Shixiong Zhu's 19/Feb/19 11:47 comment on the JIRA.
- Avoid using a final field to hold the state of the collection accumulator
@github-actions bot added the CORE label Feb 10, 2021
@mridulm (Contributor) commented Feb 10, 2021

This does not necessarily solve the issue that @zsxwing detailed - the issue here is that registerAccumulator should not be called in readObject before subclasses have completed readObject.

One possible solution would be to introduce two methods:

a) A protected method doHandleDriverSideAccumulator() in AccumulatorV2, which contains all the code that currently follows defaultReadObject in readObject.
b) Call handleDriverSideAccumulator after defaultReadObject in AccumulatorV2's readObject; in AccumulatorV2 this protected method simply delegates to doHandleDriverSideAccumulator.
c) In subclasses with local state, override handleDriverSideAccumulator to do nothing, and invoke doHandleDriverSideAccumulator after the subclass's readObject is done.

This will ensure AccumulatorV2 and its subclasses register only after their state has been initialized. (Rough sketch; please change logic/names/etc. as relevant.)

Note, there are other accumulators with local state; we should do this for all of them.
Thoughts?
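
A rough sketch of what that two-method shape could look like (hypothetical class names; the method names follow the comment above, and this is not code from this PR):

```scala
import java.io.{IOException, ObjectInputStream}

abstract class AccumulatorV2Sketch extends Serializable {
  // All of the registration logic that previously lived in readObject.
  protected final def doHandleDriverSideAccumulator(): Unit = {
    // ... register this accumulator with the driver-side context ...
  }

  // Overridable hook; the base class delegates straight through.
  protected def handleDriverSideAccumulator(): Unit =
    doHandleDriverSideAccumulator()

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    handleDriverSideAccumulator() // a no-op for stateful subclasses
  }
}

class CollectionAccumulatorSketch[T] extends AccumulatorV2Sketch {
  private val _list = new java.util.ArrayList[T]()

  // Defer registration: this class's fields are not restored yet when the
  // base class's readObject runs.
  override protected def handleDriverSideAccumulator(): Unit = ()

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    doHandleDriverSideAccumulator() // now safe: _list has been restored
  }
}
```

The delegation in the base class keeps stateless accumulators working unchanged; only subclasses with local state need the override.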

@gerashegalov (Contributor, Author)

Thanks for taking a look @mridulm

Correct, this patch does not address the problem in general; it just mitigates it for CollectionAccumulator and its subclass PythonAccumulatorV2. Any solution relying on readObject to publish the object, including the one you propose, will publish the object too early.

A generally correct solution that also works for user-defined accumulators must not rely on readObject, IMO, and it is more involved.
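
Purely to illustrate that point, one readObject-free shape (hypothetical, not something proposed or implemented in this PR) would defer registration to the first driver-side use, by which time the object is fully deserialized:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch: readObject is not involved at all. Whichever method
// touches the accumulator first on the driver performs the registration,
// and by then the object is fully constructed.
abstract class LazyRegisteringAcc extends Serializable {
  private val registered = new AtomicBoolean(false)

  protected def registerWithContext(): Unit // actual registration logic

  protected final def ensureRegistered(): Unit =
    if (registered.compareAndSet(false, true)) registerWithContext()
}
```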


```diff
 override def copyAndReset(): CollectionAccumulator[T] = new CollectionAccumulator

 override def copy(): CollectionAccumulator[T] = {
   val newAcc = new CollectionAccumulator[T]
-  _list.synchronized {
-    newAcc._list.addAll(_list)
+  this.synchronized {
```
A Member commented on this diff:
Obviously we can't sync on _list anymore. This changes it to sync on the object itself, which should be fine AFAIK: I don't think anything else depends on the lock of the accumulator itself, and it only manipulates its own state while holding the lock.
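
For context, a simplified sketch of the resulting shape (method set abbreviated; the lazy getOrCreate helper here illustrates how a null observed mid-deserialization stays harmless):

```scala
import java.util.{ArrayList, Collections, List => JList}

// Sketch of the post-change shape: a non-final (var) field, created lazily,
// with every access synchronized on the accumulator itself.
class CollectionAccumulatorSketch[T] extends Serializable {
  private var _list: JList[T] = _ // var: no final-field freeze semantics

  // A reader racing with deserialization may see null; creating the list
  // on demand makes that harmless.
  private def getOrCreate: JList[T] = {
    _list = Option(_list).getOrElse(new ArrayList[T]())
    _list
  }

  def add(v: T): Unit = this.synchronized { getOrCreate.add(v) }

  def value: JList[T] = this.synchronized {
    Collections.unmodifiableList(new ArrayList[T](getOrCreate))
  }
}
```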

@srowen (Member) commented Feb 20, 2021

I think this is OK as a narrow fix for this particular case. Any objection, if tests pass?

@srowen (Member) commented Feb 20, 2021

Jenkins test this please

@SparkQA commented Feb 20, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39894/

@SparkQA commented Feb 20, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39894/

@SparkQA commented Feb 20, 2021

Test build #135314 has finished for PR 31540 at commit c9918ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen closed this in fadd0f5 Feb 21, 2021
srowen pushed a commit that referenced this pull request Feb 21, 2021
[SPARK-20977][CORE] Use a non-final field for the state of CollectionAccumulator

This PR is a fix for the JLS 17.5.3 violation identified in
zsxwing's [19/Feb/19 11:47 comment](https://issues.apache.org/jira/browse/SPARK-20977?focusedCommentId=16772277&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16772277) on the JIRA.

### What changes were proposed in this pull request?
- Use a var field to hold the state of the collection accumulator

### Why are the changes needed?
AccumulatorV2's auto-registration of the accumulator during readObject doesn't work with final fields that are post-processed outside readObject. As it stands, incompletely initialized objects are published to the heartbeat thread. This leads to sporadic exceptions that knock out executors, which increases the cost of jobs. We observe such failures on a regular basis (NVIDIA/spark-rapids#1522).

### Does this PR introduce _any_ user-facing change?
None

### How was this patch tested?
- This is a concurrency bug that is almost impossible to reproduce in a quick unit test.
- By trial and error I crafted a command (NVIDIA/spark-rapids#1688) that reproduces the issue on my dev box several times per hour, with the first occurrence often within a few minutes. After the patch, these exceptions did not show up while running overnight for 10+ hours.
- existing unit tests in *`AccumulatorV2Suite` and *`LiveEntitySuite`

Closes #31540 from gerashegalov/SPARK-20977.

Authored-by: Gera Shegalov <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit fadd0f5)
Signed-off-by: Sean Owen <[email protected]>
@srowen (Member) commented Feb 21, 2021

Merged to master/3.1

@zhengruifeng (Contributor)

> This does not necessarily solve the issue that @zsxwing detailed - the issue here is that registerAccumulator should not be called in readObject before subclasses have completed readObject.
>
> One possible solution would be to introduce two methods:
>
> a) A protected method doHandleDriverSideAccumulator() in AccumulatorV2, which contains all the code that currently follows defaultReadObject in readObject.
> b) Call handleDriverSideAccumulator after defaultReadObject in AccumulatorV2's readObject; in AccumulatorV2 this protected method simply delegates to doHandleDriverSideAccumulator.
> c) In subclasses with local state, override handleDriverSideAccumulator to do nothing, and invoke doHandleDriverSideAccumulator after the subclass's readObject is done.
>
> This will ensure AccumulatorV2 and its subclasses register only after their state has been initialized. (Rough sketch; please change logic/names/etc. as relevant.)
>
> Note, there are other accumulators with local state; we should do this for all of them.
> Thoughts?

+1

I recently implemented some AccumulatorV2 subclasses in my work (complex statistics containing transient lazy vars and using collections like OpenHashMap/Array/etc.), and I hit lots of NPEs that sporadically made tasks fail. I tried the approach in this PR, but it did not help noticeably.

@gerashegalov (Contributor, Author)

@zhengruifeng can you provide a minimal code reproduction for the NPEs you are observing?

istreeter added a commit to snowplow/snowplow-rdb-loader that referenced this pull request Oct 4, 2024
We've seen exceptions in spark executors like:

```
java.lang.NullPointerException: Cannot invoke "scala.collection.mutable.Set.isEmpty()" because the return value of "com.snowplowanalytics.snowplow.rdbloader.transformer.batch.spark.TypesAccumulator.accum()" is null
```

The error is coming from our Spark Accumulator for accumulating Iglu
types. This is similar to [an issue previously seen][1] in Spark's own
`CollectionAccumulator`. That issue [was fixed in Spark][2] by making
the accumulator's internal state non-final, and synchronizing access to
the internal state. So here we make the exact same change to our own
Accumulator.

It is a rare race condition which is hard to reproduce.

[1]: https://issues.apache.org/jira/browse/SPARK-20977
[2]: apache/spark#31540
spenes pushed a commit to snowplow/snowplow-rdb-loader that referenced this pull request Oct 4, 2024