diff --git a/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt b/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt index d32fa24269..8c8f2f6bc6 100644 --- a/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt +++ b/okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt @@ -28,19 +28,18 @@ import kotlin.concurrent.withLock * writing. * * Subclasses should override [timedOut] to take action when a timeout occurs. This method will be - * invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise + * invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise, * we risk starving other timeouts from being triggered. * * Use [sink] and [source] to apply this timeout to a stream. The returned value will apply the * timeout to each operation on the wrapped stream. * - * Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterwards. + * Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterward. * The return value of [exit] indicates whether a timeout was triggered. Note that the call to * [timedOut] is asynchronous, and may be called after [exit]. */ open class AsyncTimeout : Timeout() { - /** True if this node is currently in the queue. */ - private var inQueue = false + private var state = STATE_IDLE /** The next node in the linked list. */ private var next: AsyncTimeout? = null @@ -48,30 +47,32 @@ open class AsyncTimeout : Timeout() { /** If scheduled, this is the time that the watchdog should time this out. */ private var timeoutAt = 0L - private var isCanceled = false - private var hadTimeoutWhenCanceled = false - fun enter() { val timeoutNanos = timeoutNanos() val hasDeadline = hasDeadline() if (timeoutNanos == 0L && !hasDeadline) { return // No timeout and no deadline? Don't bother with the queue. } - scheduleTimeout(this, timeoutNanos, hasDeadline) + + lock.withLock { + check(state == STATE_IDLE) { "Unbalanced enter/exit" } + state = STATE_IN_QUEUE + insertIntoQueue(this, timeoutNanos, hasDeadline) + } } /** Returns true if the timeout occurred. */ fun exit(): Boolean { lock.withLock { - if (isCanceled) { - return hadTimeoutWhenCanceled - .also { - isCanceled = false - hadTimeoutWhenCanceled = false - } + val oldState = this.state + state = STATE_IDLE + + if (oldState == STATE_IN_QUEUE) { + removeFromQueue(this) + return false + } else { + return oldState == STATE_TIMED_OUT } - - return cancelScheduledTimeout(this) } } @@ -79,10 +80,10 @@ open class AsyncTimeout : Timeout() { super.cancel() lock.withLock { - if (isCanceled) return - if (!inQueue) return - isCanceled = true - hadTimeoutWhenCanceled = cancelScheduledTimeout(this) + if (state == STATE_IN_QUEUE) { + removeFromQueue(this) + state = STATE_CANCELED + } } } @@ -197,7 +198,7 @@ open class AsyncTimeout : Timeout() { return e } - private class Watchdog internal constructor() : Thread("Okio Watchdog") { + private class Watchdog : Thread("Okio Watchdog") { init { isDaemon = true } @@ -205,8 +206,8 @@ open class AsyncTimeout : Timeout() { override fun run() { while (true) { try { - var timedOut: AsyncTimeout? = null - AsyncTimeout.lock.withLock { + var timedOut: AsyncTimeout? + lock.withLock { timedOut = awaitTimeout() // The queue is completely empty. Let this thread exit and let another watchdog thread @@ -225,7 +226,7 @@ open class AsyncTimeout : Timeout() { } } - companion object { + private companion object { val lock: ReentrantLock = ReentrantLock() val condition: Condition = lock.newCondition() @@ -240,6 +241,43 @@ open class AsyncTimeout : Timeout() { private val IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60) private val IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS) + /* + * .-------------. + * | | + * .------------ exit() ------| CANCELED | + * | | | + * | '-------------' + * | ^ + * | | cancel() + * v | + * .-------------. .-------------. + * | |---- enter() ----->| | + * | IDLE | | IN QUEUE | + * | |<---- exit() ------| | + * '-------------' '-------------' + * ^ | + * | | time out + * | v + * | .-------------. + * | | | + * '------------ exit() ------| TIMED OUT | + * | | + * '-------------' + * + * Notes: + * * enter() crashes if called from a state other than IDLE. + * * If there's no timeout (ie. wait forever), then enter() is a no-op. There's no state to + * track entered but not in the queue. + * * exit() is a no-op from IDLE. This is probably too lenient, but it made it simpler for + * early implementations to support cases where enter() as a no-op. + * * cancel() is a no-op from every state but IN QUEUE. + */ + + const val STATE_IDLE = 0 + const val STATE_IN_QUEUE = 1 + const val STATE_TIMED_OUT = 2 + const val STATE_CANCELED = 3 + /** * The watchdog thread processes a linked list of pending timeouts, sorted in the order to be * triggered. This class synchronizes on AsyncTimeout.class. This lock guards the queue. @@ -250,77 +288,67 @@ open class AsyncTimeout : Timeout() { */ private var head: AsyncTimeout? = null - private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) { - AsyncTimeout.lock.withLock { - check(!node.inQueue) { "Unbalanced enter/exit" } - node.inQueue = true - - // Start the watchdog thread and create the head node when the first timeout is scheduled. - if (head == null) { - head = AsyncTimeout() - Watchdog().start() - } + private fun insertIntoQueue(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) { + // Start the watchdog thread and create the head node when the first timeout is scheduled. + if (head == null) { + head = AsyncTimeout() + Watchdog().start() + } - val now = System.nanoTime() - if (timeoutNanos != 0L && hasDeadline) { - // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap - // around, minOf() is undefined for absolute values, but meaningful for relative ones. - node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now) - } else if (timeoutNanos != 0L) { - node.timeoutAt = now + timeoutNanos - } else if (hasDeadline) { - node.timeoutAt = node.deadlineNanoTime() - } else { - throw AssertionError() - } + val now = System.nanoTime() + if (timeoutNanos != 0L && hasDeadline) { + // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap + // around, minOf() is undefined for absolute values, but meaningful for relative ones. + node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now) + } else if (timeoutNanos != 0L) { + node.timeoutAt = now + timeoutNanos + } else if (hasDeadline) { + node.timeoutAt = node.deadlineNanoTime() + } else { + throw AssertionError() + } - // Insert the node in sorted order. - val remainingNanos = node.remainingNanos(now) - var prev = head!! - while (true) { - if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) { - node.next = prev.next - prev.next = node - if (prev === head) { - // Wake up the watchdog when inserting at the front. - condition.signal() - } - break + // Insert the node in sorted order. + val remainingNanos = node.remainingNanos(now) + var prev = head!! + while (true) { + if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) { + node.next = prev.next + prev.next = node + if (prev === head) { + // Wake up the watchdog when inserting at the front. + condition.signal() } - prev = prev.next!! + break } + prev = prev.next!! } } /** Returns true if the timeout occurred. */ - private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean { - if (!node.inQueue) return false - node.inQueue = false - - // Remove the node from the linked list. + private fun removeFromQueue(node: AsyncTimeout) { var prev = head while (prev != null) { if (prev.next === node) { prev.next = node.next node.next = null - return false + return } prev = prev.next } - // The node wasn't found in the linked list: it must have timed out! - return true + error("node was not found in the queue") } /** * Removes and returns the node at the head of the list, waiting for it to time out if * necessary. This returns [head] if there was no node at the head of the list when starting, * and there continues to be no node after waiting [IDLE_TIMEOUT_NANOS]. It returns null if a - * new node was inserted while waiting. Otherwise this returns the node being waited on that has - * been removed. + * new node was inserted while waiting. Otherwise, this returns the node being waited on that + * has been removed. */ @Throws(InterruptedException::class) - internal fun awaitTimeout(): AsyncTimeout? { + fun awaitTimeout(): AsyncTimeout? { // Get the next eligible node. val node = head!!.next @@ -335,7 +363,7 @@ open class AsyncTimeout : Timeout() { } } - var waitNanos = node.remainingNanos(System.nanoTime()) + val waitNanos = node.remainingNanos(System.nanoTime()) // The head of the queue hasn't timed out yet. Await that. if (waitNanos > 0) { @@ -346,6 +374,7 @@ open class AsyncTimeout : Timeout() { // The head of the queue has timed out. Remove it. head!!.next = node.next node.next = null + node.state = STATE_TIMED_OUT return node } }