Skip to content

Commit

Permalink
fix: Solved threading problem by null checking removed queued inputs
Browse files Browse the repository at this point in the history
[Issue reported by Duolingo](https://2dimensions.slack.com/archives/C029X99PETE/p1731696582702019).

Looks like a threading issue around access to the concurrent queue.

The root of their trace is the artboard `advance`, called from our native worker thread. `processAllInputs` is iterating through a `ConcurrentLinkedQueue` holding the inputs. We check:
```kotlin
while (changedInputs.isNotEmpty()) {
    val input = changedInputs.remove()
```
There could be a small window where `isNotEmpty` returns true and `remove` has nothing to remove. I'm not sure exactly what would cause that - something on a different thread, likely the main thread, would have to empty the queue, maybe a call to `reset` somewhere.

The [docs for remove](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/AbstractQueue.html#remove()) show the exception theyre're getting. We assume because we just checked that it's not empty, `remove` should return an element, but it seems this assumption is not safe given the scenario above.

We can likely protect against this by using the more forgiving [poll](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/Queue.html#poll()) method which returns `null` instead of throwing. Then we can null check before proceeding.

Alternatively, we can use the `ReentrantLock`. Removing from this queue in `processAllInputs` is protected while clearing from `reset` it is not. We could wrap reset in a synchronous block on the same lock. Let me know if this approach is preferred.

Diffs=
7757e65a9b fix: Solved threading problem by null checking removed queued inputs (#8594)

Co-authored-by: Erik <[email protected]>
  • Loading branch information
ErikUggeldahl and ErikUggeldahl committed Nov 28, 2024
1 parent ad8b5e4 commit 1081f88
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .rive_head
Original file line number Diff line number Diff line change
@@ -1 +1 @@
08df0095bbf9616cebaec7eb930c4b146bbcb2cf
7757e65a9bef68432a983ef74297be14800b302a
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
package app.rive.runtime.kotlin.core

import androidx.test.ext.junit.runners.AndroidJUnit4
import app.rive.runtime.kotlin.ChangedInput
import app.rive.runtime.kotlin.controllers.RiveFileController
import app.rive.runtime.kotlin.test.R
import org.junit.Assert.*
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertNotEquals
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertNull
import org.junit.Assert.assertTrue
import org.junit.Test
import org.junit.runner.RunWith
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread


@RunWith(AndroidJUnit4::class)
Expand Down Expand Up @@ -127,4 +136,47 @@ class RiveControllerTest {
secondController.animations.first(),
)
}

// This tests a bug where between checking if the queue is empty and polling an item from the queue,
// another thread could empty the queue, causing a NoSuchElementException.
// This was resolved by using poll() instead of remove().
// This test should simply run without crashing.
@Test
fun testProcessAllInputsQueueEmptiedByAnotherThread() {
// To simulate the threading conditions, we need a mock queue that allows interrupting at the precise moment when the bug occurs.
class MockInputQueue(
private val latch: CountDownLatch,
items: Collection<ChangedInput> = emptyList()
) : ConcurrentLinkedQueue<ChangedInput>(items) {
override fun isEmpty(): Boolean {
val wasEmpty = super.isEmpty() // Cache the result before it's cleared
latch.countDown() // Signal the clearing thread to proceed
Thread.sleep(10) // Give that thread a chance to clear the queue
return wasEmpty
}
}

// We need a file and artboard so that the controller doesn't skip advancing
val file = appContext.resources
.openRawResource(R.raw.off_road_car_blog)
.use {
File(it.readBytes())
}

val latch = CountDownLatch(1) // Threading primitive to synchronize the two threads
val inputQueue = MockInputQueue(latch, listOf(ChangedInput("stateMachine", "input")))
val controller = RiveFileController(changedInputs = inputQueue)
controller.setRiveFile(file)

// Start a thread to empty the queue
thread {
latch.await() // Wait until the queue is checked
inputQueue.clear()
}

// Invoke processAllInputs by way of advance
controller.advance(0.0f)

// No assertions. The intent is to run without throwing a NoSuchElementException.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import app.rive.runtime.kotlin.core.SMIBoolean
import app.rive.runtime.kotlin.core.SMINumber
import app.rive.runtime.kotlin.core.SMITrigger
import app.rive.runtime.kotlin.core.StateMachineInstance
import app.rive.runtime.kotlin.core.errors.RiveException
import app.rive.runtime.kotlin.renderers.PointerEvents
import java.util.Collections
import java.util.concurrent.ConcurrentLinkedQueue
Expand Down Expand Up @@ -56,14 +55,26 @@ class ControllerState internal constructor(

typealias OnStartCallback = () -> Unit

class RiveFileController(
class RiveFileController internal constructor(
var loop: Loop = Loop.AUTO,
var autoplay: Boolean = true,
file: File? = null,
activeArtboard: Artboard? = null,
var onStart: OnStartCallback? = null,

@get:VisibleForTesting(otherwise = VisibleForTesting.PRIVATE)
internal val changedInputs: ConcurrentLinkedQueue<ChangedInput> = ConcurrentLinkedQueue()
) : Observable<RiveFileController.Listener>, RefCount {

// The "primary" constructor as the actual primary is internal to expose the queue for testing.
constructor(
loop: Loop = Loop.AUTO,
autoplay: Boolean = true,
file: File? = null,
activeArtboard: Artboard? = null,
onStart: OnStartCallback? = null,
) : this(loop, autoplay, file, activeArtboard, onStart, ConcurrentLinkedQueue())

companion object {
const val TAG = "RiveFileController"
}
Expand Down Expand Up @@ -220,8 +231,6 @@ class RiveFileController(
return stateMachines subtract playingStateMachines
}

private val changedInputs = ConcurrentLinkedQueue<ChangedInput>()

/** Lock to prevent race conditions on starting and stopping the rendering thread. */
internal val startStopLock = ReentrantLock()

Expand Down Expand Up @@ -599,7 +608,9 @@ class RiveFileController(
val playableSet = mutableSetOf<StateMachineInstance>()
// No need to lock this: this is being called from `advance()` which is `synchronized(file)`
while (changedInputs.isNotEmpty()) {
val input = changedInputs.remove()
// There is a small chance that the queue will be emptied by another thread before removing.
// Null checking the removed item protects against that scenario.
val input = changedInputs.poll() ?: break
if (input.nestedArtboardPath == null) {
val stateMachines = getOrCreateStateMachines(input.stateMachineName)
stateMachines.forEach { stateMachineInstance ->
Expand Down

0 comments on commit 1081f88

Please sign in to comment.