Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal trackers APIs #369

Merged
merged 10 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@ abstract class ManagedStrategy(
// Detector of loops or hangs (i.e. active locks).
internal val loopDetector: LoopDetector = LoopDetector(testCfg.hangingDetectionThreshold)

// Tracker of acquisitions and releases of monitors.
private lateinit var monitorTracker: MonitorTracker
// Tracker of objects' allocations and object graph topology.
protected abstract val objectTracker: ObjectTracker
// Tracker of the monitors' operations.
protected abstract val monitorTracker: MonitorTracker
// Tracker of the thread parking.
protected abstract val parkingTracker: ParkingTracker

// InvocationResult that was observed by the strategy during the execution (e.g., a deadlock).
@Volatile
Expand Down Expand Up @@ -105,10 +109,6 @@ abstract class ManagedStrategy(
// i.e., the first element is the top stack trace element.
private val suspendedFunctionsStack = Array(nThreads) { mutableListOf<CallStackTraceElement>() }

// Helps to ignore potential switch point in local objects (see LocalObjectManager) to avoid
// useless interleavings analysis.
private var localObjectManager = LocalObjectManager()

// Last read trace point, occurred in the current thread.
// We store it as we initialize read value after the point is created so we have to store
// the trace point somewhere to obtain it later.
Expand Down Expand Up @@ -207,13 +207,14 @@ abstract class ManagedStrategy(
finished.fill(false)
isSuspended.fill(false)
currentActorId.fill(-1)
monitorTracker = MonitorTracker(nThreads)
traceCollector = if (collectTrace) TraceCollector() else null
suddenInvocationResult = null
callStackTrace.forEach { it.clear() }
suspendedFunctionsStack.forEach { it.clear() }
randoms.forEachIndexed { i, r -> r.setSeed(i + 239L) }
localObjectManager = LocalObjectManager()
objectTracker.reset()
monitorTracker.reset()
parkingTracker.reset()
}

/**
Expand Down Expand Up @@ -503,8 +504,9 @@ abstract class ManagedStrategy(
*/
private fun isActive(iThread: Int): Boolean =
!finished[iThread] &&
!(isSuspended[iThread] && !runner.isCoroutineResumed(iThread, currentActorId[iThread])) &&
!monitorTracker.isWaiting(iThread) &&
!(isSuspended[iThread] && !runner.isCoroutineResumed(iThread, currentActorId[iThread]))
!parkingTracker.isParked(iThread)

/**
* Waits until the specified thread can continue
Expand Down Expand Up @@ -635,9 +637,9 @@ abstract class ManagedStrategy(
// Therefore, we always release the lock in this case,
// without tracking the event.
if (suddenInvocationResult != null) return
monitorTracker.releaseMonitor(monitor)
val iThread = currentThread
monitorTracker.releaseMonitor(iThread, monitor)
if (collectTrace) {
val iThread = currentThread
val tracePoint = MonitorExitTracePoint(
iThread = iThread,
actorId = currentActorId[iThread],
Expand All @@ -664,12 +666,16 @@ abstract class ManagedStrategy(
// we simply add a new switch point here, thus, also
// emulating spurious wake-ups.
newSwitchPoint(iThread, codeLocation, tracePoint)
parkingTracker.park(iThread)
while (parkingTracker.waitUnpark(iThread)) {
// switch to another thread and wait till an unpark event happens
switchCurrentThread(iThread, SwitchReason.PARK_WAIT, true)
}
}

override fun unpark(thread: Thread, codeLocation: Int): Unit = runInIgnoredSection {
val iThread = currentThread
// We don't suspend on `park()` calls to emulate spurious wake-ups,
// therefore, no actions are needed.
parkingTracker.unpark(iThread, (thread as TestThread).threadId)
if (collectTrace) {
val tracePoint = UnparkTracePoint(
iThread = iThread,
Expand Down Expand Up @@ -719,13 +725,9 @@ abstract class ManagedStrategy(
}

override fun notify(monitor: Any, codeLocation: Int, notifyAll: Boolean): Unit = runInIgnoredSection {
if (notifyAll) {
monitorTracker.notifyAll(monitor)
} else {
monitorTracker.notify(monitor)
}
val iThread = currentThread
monitorTracker.notify(iThread, monitor, notifyAll = notifyAll)
if (collectTrace) {
val iThread = currentThread
val tracePoint = NotifyTracePoint(
iThread = iThread,
actorId = currentActorId[iThread],
Expand All @@ -750,8 +752,8 @@ abstract class ManagedStrategy(
if (isFinal) {
return@runInIgnoredSection false
}
// Optimization: do not track accesses to thread-local objects
if (!isStatic && localObjectManager.isLocalObject(obj)) {
// Do not track accesses to untracked objects
if (!objectTracker.shouldTrackObjectAccess(obj ?: StaticObject)) {
return@runInIgnoredSection false
}
val iThread = currentThread
Expand All @@ -777,7 +779,9 @@ abstract class ManagedStrategy(

/** Returns <code>true</code> if a switch point is created. */
override fun beforeReadArrayElement(array: Any, index: Int, codeLocation: Int): Boolean = runInIgnoredSection {
if (localObjectManager.isLocalObject(array)) return@runInIgnoredSection false
if (!objectTracker.shouldTrackObjectAccess(array)) {
return@runInIgnoredSection false
}
val iThread = currentThread
val tracePoint = if (collectTrace) {
ReadTracePoint(
Expand Down Expand Up @@ -810,13 +814,9 @@ abstract class ManagedStrategy(

override fun beforeWriteField(obj: Any?, className: String, fieldName: String, value: Any?, codeLocation: Int,
isStatic: Boolean, isFinal: Boolean): Boolean = runInIgnoredSection {
if (isStatic) {
localObjectManager.markObjectNonLocal(value)
} else if (obj != null) {
localObjectManager.onWriteToObjectFieldOrArrayCell(obj, value)
if (localObjectManager.isLocalObject(obj)) {
return@runInIgnoredSection false
}
objectTracker.registerObjectLink(fromObject = obj ?: StaticObject, toObject = value)
if (!objectTracker.shouldTrackObjectAccess(obj ?: StaticObject)) {
return@runInIgnoredSection false
}
// Optimization: do not track final field writes
if (isFinal) {
Expand All @@ -843,8 +843,8 @@ abstract class ManagedStrategy(
}

override fun beforeWriteArrayElement(array: Any, index: Int, value: Any?, codeLocation: Int): Boolean = runInIgnoredSection {
localObjectManager.onWriteToObjectFieldOrArrayCell(array, value)
if (localObjectManager.isLocalObject(array)) {
objectTracker.registerObjectLink(fromObject = array, toObject = value)
if (!objectTracker.shouldTrackObjectAccess(array)) {
return@runInIgnoredSection false
}
val iThread = currentThread
Expand Down Expand Up @@ -876,11 +876,7 @@ abstract class ManagedStrategy(
}

override fun afterReflectiveSetter(receiver: Any?, value: Any?) = runInIgnoredSection {
if (receiver == null) {
localObjectManager.markObjectNonLocal(value)
} else {
localObjectManager.onWriteToObjectFieldOrArrayCell(receiver, value)
}
objectTracker.registerObjectLink(fromObject = receiver ?: StaticObject, toObject = value)
}

override fun getThreadLocalRandom(): Random = runInIgnoredSection {
Expand Down Expand Up @@ -910,7 +906,7 @@ abstract class ManagedStrategy(
override fun afterNewObjectCreation(obj: Any) {
if (obj is String || obj is Int || obj is Long || obj is Byte || obj is Char || obj is Float || obj is Double) return
runInIgnoredSection {
localObjectManager.registerNewObject(obj)
objectTracker.registerNewObject(obj)
}
}

Expand Down Expand Up @@ -1691,121 +1687,6 @@ internal class ManagedStrategyRunner(
}


/**
* Tracks synchronization operations with monitors (acquire/release, wait/notify) to maintain a set of active threads.
*/
private class MonitorTracker(nThreads: Int) {
// Maintains a set of acquired monitors with an information on which thread
// performed the acquisition and the reentrancy depth.
private val acquiredMonitors = IdentityHashMap<Any, MonitorAcquiringInfo>()

// Maintains a set of monitors on which each thread is waiting.
// Note, that a thread can wait on a free monitor if it is waiting for a `notify` call.
// Stores `null` if thread is not waiting on any monitor.
private val waitingMonitor = Array<MonitorAcquiringInfo?>(nThreads) { null }

// Stores `true` for the threads which are waiting for a
// `notify` call on the monitor stored in `acquiringMonitor`.
private val waitForNotify = BooleanArray(nThreads) { false }

/**
* Performs a logical acquisition.
*/
fun acquireMonitor(iThread: Int, monitor: Any): Boolean {
// Increment the reentrant depth and store the
// acquisition info if needed.
val info = acquiredMonitors.computeIfAbsent(monitor) {
MonitorAcquiringInfo(monitor, iThread, 0)
}
if (info.iThread != iThread) {
waitingMonitor[iThread] = MonitorAcquiringInfo(monitor, iThread, 0)
return false
}
info.timesAcquired++
waitingMonitor[iThread] = null
return true
}

/**
* Performs a logical release.
*/
fun releaseMonitor(monitor: Any) {
// Decrement the reentrancy depth and remove the acquisition info
// if the monitor becomes free to acquire by another thread.
val info = acquiredMonitors[monitor]!!
info.timesAcquired--
if (info.timesAcquired == 0)
acquiredMonitors.remove(monitor)
}

/**
* Returns `true` if the corresponding threads is waiting on some monitor.
*/
fun isWaiting(iThread: Int): Boolean {
val monitor = waitingMonitor[iThread]?.monitor ?: return false
return waitForNotify[iThread] || !canAcquireMonitor(iThread, monitor)
}

/**
* Returns `true` if the monitor is already acquired by
* the thread [iThread], or if this monitor is free to acquire.
*/
private fun canAcquireMonitor(iThread: Int, monitor: Any) =
acquiredMonitors[monitor]?.iThread?.equals(iThread) ?: true

/**
* Performs a logical wait, [isWaiting] for the specified thread
* returns `true` until the corresponding [notify] or [notifyAll] is invoked.
*/
fun waitOnMonitor(iThread: Int, monitor: Any): Boolean {
// TODO: we can add spurious wakeups here
var info = acquiredMonitors[monitor]
if (info != null) {
// in case when lock is currently acquired by another thread continue waiting
if (info.iThread != iThread)
return true
// in case when current thread owns the lock we release it
// in order to give other thread a chance to acquire it
// and put the current thread into waiting state
waitForNotify[iThread] = true
waitingMonitor[iThread] = info
acquiredMonitors.remove(monitor)
return true
}
// otherwise the lock is held by no-one and can be acquired
info = waitingMonitor[iThread]
check(info != null && info.monitor === monitor && info.iThread == iThread) {
"Monitor should have been acquired by this thread"
}
// if there has been no `notify` yet continue waiting
if (waitForNotify[iThread])
return true
// otherwise acquire monitor restoring its re-entrance depth
acquiredMonitors[monitor] = info
waitingMonitor[iThread] = null
return false
}

/**
* Just notify all thread. Odd threads will have a spurious wakeup
*/
fun notify(monitor: Any) = notifyAll(monitor)

/**
* Performs the logical `notifyAll`.
*/
fun notifyAll(monitor: Any): Unit = waitingMonitor.forEachIndexed { iThread, info ->
if (monitor === info?.monitor)
waitForNotify[iThread] = false
}

/**
* Stores the [monitor], id of the thread acquired the monitor [iThread],
* and the number of reentrant acquisitions [timesAcquired].
*/
private class MonitorAcquiringInfo(val monitor: Any, val iThread: Int, var timesAcquired: Int)
}

/**
* This exception is used to finish the execution correctly for managed strategies.
* Otherwise, there is no way to do it in case of (e.g.) deadlocks.
Expand Down
Loading