diff --git a/core/shared/src/main/scala/fs2/Compiler.scala b/core/shared/src/main/scala/fs2/Compiler.scala index 000213a24a..d8ee834c29 100644 --- a/core/shared/src/main/scala/fs2/Compiler.scala +++ b/core/shared/src/main/scala/fs2/Compiler.scala @@ -64,7 +64,7 @@ private[fs2] trait CompilerLowPriority2 { init: B )(foldChunk: (B, Chunk[O]) => B): Resource[F, B] = Resource - .makeCase(CompileScope.newRoot[F])((scope, ec) => scope.close(ec).rethrow) + .makeCase(Scope.newRoot[F])((scope, ec) => scope.close(ec).rethrow) .evalMap(scope => Pull.compile(stream, scope, true, init)(foldChunk)) } } @@ -152,7 +152,7 @@ object Compiler extends CompilerLowPriority { init: Out, foldChunk: (Out, Chunk[O]) => Out ): F[Out] = - CompileScope + Scope .newRoot[F](this) .flatMap(scope => Pull @@ -193,7 +193,7 @@ object Compiler extends CompilerLowPriority { foldChunk: (Out, Chunk[O]) => Out ): F[Out] = Resource - .makeCase(CompileScope.newRoot[F](this))((scope, ec) => scope.close(ec).rethrow) + .makeCase(Scope.newRoot[F](this))((scope, ec) => scope.close(ec).rethrow) .use(scope => Pull.compile[F, O, Out](p, scope, false, init)(foldChunk)) } diff --git a/core/shared/src/main/scala/fs2/Pull.scala b/core/shared/src/main/scala/fs2/Pull.scala index 042a1c1a8e..c9651d0e39 100644 --- a/core/shared/src/main/scala/fs2/Pull.scala +++ b/core/shared/src/main/scala/fs2/Pull.scala @@ -219,7 +219,7 @@ object Pull extends PullLowPriority { )(implicit F: MonadError[F, Throwable]): Pull[F, INothing, Stream[F, O]] = for { scope <- Pull.getScope[F] - lease <- Pull.eval(scope.leaseOrError) + lease <- Pull.eval(scope.lease) } yield s.onFinalize(lease.cancel.redeemWith(F.raiseError(_), _ => F.unit)) /** Repeatedly uses the output of the pull as input for the next step of the pull. @@ -269,7 +269,7 @@ object Pull extends PullLowPriority { /** Gets the current scope, allowing manual leasing or interruption. * This is a low-level method and generally should not be used by user code. */ - def getScope[F[_]]: Pull[F, INothing, Scope[F]] = GetScope[F]() + private[fs2] def getScope[F[_]]: Pull[F, INothing, Scope[F]] = GetScope[F]() /** Returns a pull that evaluates the supplied by-name each time the pull is used, * allowing use of a mutable value in pull computations. @@ -528,6 +528,9 @@ object Pull extends PullLowPriority { useInterruption: Boolean ) extends Action[F, O, Unit] + private final case class InterruptWhen[+F[_]](haltOnSignal: F[Either[Throwable, Unit]]) + extends AlgEffect[F, Unit] + // `InterruptedScope` contains id of the scope currently being interrupted // together with any errors accumulated during interruption process private final case class CloseScope( @@ -536,8 +539,7 @@ object Pull extends PullLowPriority { exitCase: ExitCase ) extends AlgEffect[Pure, Unit] - private final case class GetScope[F[_]]() extends AlgEffect[Pure, CompileScope[F]] - private[fs2] def getScopeInternal[F[_]]: Pull[Pure, INothing, CompileScope[F]] = GetScope[F]() + private final case class GetScope[F[_]]() extends AlgEffect[Pure, Scope[F]] private[fs2] def stepLeg[F[_], O]( leg: Stream.StepLeg[F, O] @@ -558,6 +560,10 @@ object Pull extends PullLowPriority { */ private[fs2] def interruptScope[F[_], O](s: Pull[F, O, Unit]): Pull[F, O, Unit] = InScope(s, true) + private[fs2] def interruptWhen[F[_], O]( + haltOnSignal: F[Either[Throwable, Unit]] + ): Pull[F, O, Unit] = InterruptWhen(haltOnSignal) + private[fs2] def uncons[F[_], X, O]( s: Pull[F, O, Unit] ): Pull[F, X, Option[(Chunk[O], Pull[F, O, Unit])]] = @@ -565,12 +571,12 @@ object Pull extends PullLowPriority { /* Left-folds the output of a stream. * - * Interruption of the stream is tightly coupled between Pull and CompileScope. + * Interruption of the stream is tightly coupled between Pull and Scope. * Reason for this is unlike interruption of `F` type (e.g. IO) we need to find * recovery point where stream evaluation has to continue in Stream algebra. * * As such the `Token` is passed to Result.Interrupted as glue between Pull that allows pass-along - * the information to correctly compute recovery point after interruption was signalled via `CompileScope`. + * the information to correctly compute recovery point after interruption was signalled via `Scope`. * * This token indicates scope of the computation where interruption actually happened. * This is used to precisely find most relevant interruption scope where interruption shall be resumed @@ -581,7 +587,7 @@ object Pull extends PullLowPriority { */ private[fs2] def compile[F[_], O, B]( stream: Pull[F, O, Unit], - initScope: CompileScope[F], + initScope: Scope[F], extendLastTopLevelScope: Boolean, init: B )(foldChunk: (B, Chunk[O]) => B)(implicit @@ -589,8 +595,8 @@ object Pull extends PullLowPriority { ): F[B] = { sealed trait R[+G[_], +X] - case class Done(scope: CompileScope[F]) extends R[Pure, INothing] - case class Out[+G[_], +X](head: Chunk[X], scope: CompileScope[F], tail: Pull[G, X, Unit]) + case class Done(scope: Scope[F]) extends R[Pure, INothing] + case class Out[+G[_], +X](head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]) extends R[G, X] case class Interrupted(scopeId: Token, err: Option[Throwable]) extends R[Pure, INothing] @@ -603,19 +609,19 @@ object Pull extends PullLowPriority { case Interrupted(scopeId, err) => interrupted(scopeId, err) } - def done(scope: CompileScope[F]): End - def out(head: Chunk[X], scope: CompileScope[F], tail: Pull[G, X, Unit]): End + def done(scope: Scope[F]): End + def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): End def interrupted(scopeId: Token, err: Option[Throwable]): End } def go[G[_], X]( - scope: CompileScope[F], - extendedTopLevelScope: Option[CompileScope[F]], + scope: Scope[F], + extendedTopLevelScope: Option[Scope[F]], translation: G ~> F, stream: Pull[G, X, Unit] ): F[R[G, X]] = { - def interruptGuard(scope: CompileScope[F], view: Cont[Nothing, G, X])( + def interruptGuard(scope: Scope[F], view: Cont[Nothing, G, X])( next: => F[R[G, X]] ): F[R[G, X]] = scope.isInterrupted.flatMap { @@ -655,10 +661,10 @@ object Pull extends PullLowPriority { def viewRunner(view: Cont[Unit, G, X]): RunR[G, X, F[R[G, X]]] = new RunR[G, X, F[R[G, X]]] { - def done(doneScope: CompileScope[F]): F[R[G, X]] = + def done(doneScope: Scope[F]): F[R[G, X]] = go(doneScope, extendedTopLevelScope, translation, view(Result.unit)) - def out(head: Chunk[X], scope: CompileScope[F], tail: Pull[G, X, Unit]): F[R[G, X]] = { + def out(head: Chunk[X], scope: Scope[F], tail: Pull[G, X, Unit]): F[R[G, X]] = { val contTail = new Bind[G, X, Unit, Unit](tail) { def cont(r: Result[Unit]) = view(r) } @@ -695,13 +701,13 @@ object Pull extends PullLowPriority { class StepRunR() extends RunR[G, Y, F[R[G, X]]] { - def done(scope: CompileScope[F]): F[R[G, X]] = + def done(scope: Scope[F]): F[R[G, X]] = interruptGuard(scope, view) { val result = Result.Succeeded(None) go(scope, extendedTopLevelScope, translation, view(result)) } - def out(head: Chunk[Y], outScope: CompileScope[F], tail: Pull[G, Y, Unit]): F[R[G, X]] = { + def out(head: Chunk[Y], outScope: Scope[F], tail: Pull[G, Y, Unit]): F[R[G, X]] = { // if we originally swapped scopes we want to return the original // scope back to the go as that is the scope that is expected to be here. val nextScope = if (u.scope.isEmpty) outScope else scope @@ -717,7 +723,7 @@ object Pull extends PullLowPriority { } // if scope was specified in step, try to find it, otherwise use the current scope. - val stepScopeF: F[CompileScope[F]] = u.scope match { + val stepScopeF: F[Scope[F]] = u.scope match { case None => F.pure(scope) case Some(scopeId) => scope.shiftScope(scopeId, u.toString) } @@ -760,6 +766,26 @@ object Pull extends PullLowPriority { interruptGuard(scope, view)(cont) } + def goInterruptWhen( + haltOnSignal: F[Either[Throwable, Unit]], + view: Cont[Unit, G, X] + ): F[R[G, X]] = { + val onScope = scope.acquireResource( + _ => scope.interruptWhen(haltOnSignal), + (f: Fiber[F, Throwable, Unit], _: ExitCase) => f.cancel + ) + val cont = onScope.flatMap { outcome => + val result = outcome match { + case Outcome.Succeeded(Right(_)) => Result.Succeeded(()) + case Outcome.Succeeded(Left(scopeId)) => Result.Interrupted(scopeId, None) + case Outcome.Canceled() => Result.Interrupted(scope.id, None) + case Outcome.Errored(err) => Result.Fail(err) + } + go(scope, extendedTopLevelScope, translation, view(result)) + } + interruptGuard(scope, view)(cont) + } + def goInScope( stream: Pull[G, X, Unit], useInterruption: Boolean, @@ -803,7 +829,7 @@ object Pull extends PullLowPriority { } def goCloseScope(close: CloseScope, view: Cont[Unit, G, X]): F[R[G, X]] = { - def closeAndGo(toClose: CompileScope[F]) = + def closeAndGo(toClose: Scope[F]) = toClose.close(close.exitCase).flatMap { r => toClose.openAncestor.flatMap { ancestor => val res = close.interruption match { @@ -824,7 +850,7 @@ object Pull extends PullLowPriority { } } - val scopeToClose: F[Option[CompileScope[F]]] = scope + val scopeToClose: F[Option[Scope[F]]] = scope .findSelfOrAncestor(close.scopeId) .pure[F] .orElse(scope.findSelfOrChild(close.scopeId)) @@ -896,6 +922,9 @@ object Pull extends PullLowPriority { val uu = inScope.stream.asInstanceOf[Pull[g, X, Unit]] goInScope(uu, inScope.useInterruption, view.asInstanceOf[View[g, X, Unit]]) + case int: InterruptWhen[g] => + goInterruptWhen(translation(int.haltOnSignal), view) + case close: CloseScope => goCloseScope(close, view.asInstanceOf[View[G, X, Unit]]) } @@ -904,7 +933,7 @@ object Pull extends PullLowPriority { val initFk: F ~> F = cats.arrow.FunctionK.id[F] - def outerLoop(scope: CompileScope[F], accB: B, stream: Pull[F, O, Unit]): F[B] = + def outerLoop(scope: Scope[F], accB: B, stream: Pull[F, O, Unit]): F[B] = go[F, O](scope, None, initFk, stream).flatMap { case Done(_) => F.pure(accB) case out: Out[f, o] => diff --git a/core/shared/src/main/scala/fs2/Scope.scala b/core/shared/src/main/scala/fs2/Scope.scala deleted file mode 100644 index 40aa863650..0000000000 --- a/core/shared/src/main/scala/fs2/Scope.scala +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (c) 2013 Functional Streams for Scala - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to - * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of - * the Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER - * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -package fs2 - -import cats.MonadError -import cats.syntax.all._ - -/** Represents a period of stream execution in which resources are acquired and released. - * - * Note: this type is generally used to implement low-level actions that manipulate - * resource lifetimes and hence, isn't generally used by user-level code. - */ -abstract class Scope[F[_]] { - - /** Leases the resources of this scope until the returned lease is cancelled. - * - * Note that this leases all resources in this scope, resources in all parent scopes (up to root) - * and resources of all child scopes. - * - * `None` is returned if this scope is already closed. Otherwise a lease is returned, - * which must be cancelled. Upon cancellation, resource finalizers may be run, depending on the - * state of the owning scopes. - * - * Resources may be finalized during the execution of this method and before the lease has been acquired - * for a resource. In such an event, the already finalized resource won't be leased. As such, it is - * important to call `lease` only when all resources are known to be non-finalized / non-finalizing. - * - * When the lease is returned, all resources available at the time `lease` was called have been - * successfully leased. - */ - def lease: F[Option[Scope.Lease[F]]] - - /** Like [[lease]], but fails with an error if the scope is closed. */ - def leaseOrError(implicit F: MonadError[F, Throwable]): F[Scope.Lease[F]] = - lease.flatMap { - case Some(l) => F.pure(l) - case None => F.raiseError(new Throwable("Scope closed at time of lease")) - } - - /** Interrupts evaluation of the current scope. Only scopes previously indicated with Stream.interruptScope may be interrupted. - * For other scopes this will fail. - * - * Interruption is final and may take two forms: - * - * When invoked on right side, that will interrupt only current scope evaluation, and will resume when control is given - * to next scope. - * - * When invoked on left side, then this will inject given throwable like it will be caused by stream evaluation, - * and then, without any error handling the whole stream will fail with supplied throwable. - */ - def interrupt(cause: Either[Throwable, Unit]): F[Unit] -} - -object Scope { - - /** Represents one or more resources that were leased from a scope, causing their - * lifetimes to be extended until `cancel` is invoked on this lease. - */ - abstract class Lease[F[_]] { - - /** Cancels the lease of all resources tracked by this lease. - * - * This may run finalizers on some of the resources (depending on the state of their owning scopes). - * If one or more finalizers fail, the returned action completes with a `Left(t)`, providing the failure. - */ - def cancel: F[Either[Throwable, Unit]] - } -} diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index ff16db5d73..e6bb555069 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1603,10 +1603,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, def interruptWhen[F2[x] >: F[x]: Concurrent]( haltOnSignal: F2[Either[Throwable, Unit]] ): Stream[F2, O] = - Stream - .getScope[F2] - .flatMap(scope => Stream.supervise(haltOnSignal.flatMap(scope.interrupt)) >> this) - .interruptScope + (Pull.interruptWhen(haltOnSignal) >> this.pull.echo).stream.interruptScope /** Creates a scope that may be interrupted by calling scope#interrupt. */ @@ -3202,7 +3199,7 @@ object Stream extends StreamLowPriority { /** Gets the current scope, allowing manual leasing or interruption. * This is a low-level method and generally should not be used by user code. */ - def getScope[F[x] >: Pure[x]]: Stream[F, Scope[F]] = + private def getScope[F[x] >: Pure[x]]: Stream[F, Scope[F]] = new Stream(Pull.getScope[F].flatMap(Pull.output1(_))) /** A stream that never emits and never terminates. @@ -3644,7 +3641,7 @@ object Stream extends StreamLowPriority { // and that it must be released once the inner stream terminates or fails. def runInner(inner: Stream[F, O], outerScope: Scope[F]): F[Unit] = F.uncancelable { _ => - outerScope.leaseOrError.flatMap { lease => + outerScope.lease.flatMap { lease => available.acquire >> incrementRunning >> F.start { @@ -4006,7 +4003,7 @@ object Stream extends StreamLowPriority { * If you are not pulling from multiple streams, consider using `uncons`. */ def stepLeg: Pull[F, INothing, Option[StepLeg[F, O]]] = - Pull.getScopeInternal[F].flatMap { scope => + Pull.getScope[F].flatMap { scope => new StepLeg[F, O](Chunk.empty, scope.id, self.underlying).stepLeg } diff --git a/core/shared/src/main/scala/fs2/internal/InterruptContext.scala b/core/shared/src/main/scala/fs2/internal/InterruptContext.scala index 414f535208..cf9efb9449 100644 --- a/core/shared/src/main/scala/fs2/internal/InterruptContext.scala +++ b/core/shared/src/main/scala/fs2/internal/InterruptContext.scala @@ -22,7 +22,7 @@ package fs2.internal import cats.{Applicative, Id} -import cats.effect.kernel.{Concurrent, Deferred, Outcome, Ref} +import cats.effect.kernel.{Concurrent, Deferred, Fiber, Outcome, Ref} import cats.effect.kernel.implicits._ import cats.syntax.all._ import InterruptContext.InterruptionOutcome @@ -45,9 +45,12 @@ final private[fs2] case class InterruptContext[F[_]]( cancelParent: F[Unit] )(implicit F: Concurrent[F]) { self => - def complete(outcome: InterruptionOutcome): F[Unit] = + private def complete(outcome: InterruptionOutcome): F[Unit] = ref.update(_.orElse(Some(outcome))).guarantee(deferred.complete(outcome).void) + def completeWhen(outcome: F[InterruptionOutcome]): F[Fiber[F, Throwable, Unit]] = + F.start(outcome.flatMap(complete)) + /** Creates a [[InterruptContext]] for a child scope which can be interruptible as well. * * In case the child scope is interruptible, this will ensure that this scope interrupt will diff --git a/core/shared/src/main/scala/fs2/internal/Lease.scala b/core/shared/src/main/scala/fs2/internal/Lease.scala new file mode 100644 index 0000000000..7931fde4cf --- /dev/null +++ b/core/shared/src/main/scala/fs2/internal/Lease.scala @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2.internal + +/** Represents one or more resources that were leased from a scope, causing their + * lifetimes to be extended until `cancel` is invoked on this lease. + */ +private[fs2] abstract class Lease[F[_]] { + + /** Cancels the lease of all resources tracked by this lease. + * + * This may run finalizers on some of the resources (depending on the state of their owning scopes). + * If one or more finalizers fail, the returned action completes with a `Left(t)`, providing the failure. + */ + def cancel: F[Either[Throwable, Unit]] +} diff --git a/core/shared/src/main/scala/fs2/internal/CompileScope.scala b/core/shared/src/main/scala/fs2/internal/Scope.scala similarity index 79% rename from core/shared/src/main/scala/fs2/internal/CompileScope.scala rename to core/shared/src/main/scala/fs2/internal/Scope.scala index c2e29c31ce..053d6a9eae 100644 --- a/core/shared/src/main/scala/fs2/internal/CompileScope.scala +++ b/core/shared/src/main/scala/fs2/internal/Scope.scala @@ -25,15 +25,13 @@ import scala.annotation.tailrec import cats.{Id, Traverse, TraverseFilter} import cats.data.Chain -import cats.effect.kernel.{Outcome, Poll, Ref, Resource} +import cats.effect.kernel.{Fiber, Outcome, Poll, Ref, Resource} import cats.syntax.all._ -import fs2.{Compiler, CompositeFailure, Scope} +import fs2.{Compiler, CompositeFailure} import fs2.internal.InterruptContext.InterruptionOutcome -/** Implementation of [[Scope]] for the internal stream interpreter. - * - * Represents a period of stream execution in which resources are acquired and released. +/** Represents a period of stream execution in which resources are acquired and released. * A scope has a state, consisting of resources (with associated finalizers) acquired in this scope * and child scopes spawned from this scope. * @@ -80,21 +78,20 @@ import fs2.internal.InterruptContext.InterruptionOutcome * of Eval, that when interruption is enabled on scope will be wrapped in race, * that eventually allows interruption while eval is evaluating. */ -private[fs2] final class CompileScope[F[_]] private ( +private[fs2] final class Scope[F[_]] private ( val id: Token, - val parent: Option[CompileScope[F]], + val parent: Option[Scope[F]], val interruptible: Option[InterruptContext[F]], - private val state: Ref[F, CompileScope.State[F]] -)(implicit val F: Compiler.Target[F]) - extends Scope[F] { self => + private val state: Ref[F, Scope.State[F]] +)(implicit val F: Compiler.Target[F]) { self => /** Registers supplied resource in this scope. * Returns false and makes no registration if this scope has been closed. */ private def register(resource: ScopedResource[F]): F[Boolean] = state.modify { - case s: CompileScope.State.Open[F] => (s.copy(resources = resource +: s.resources), true) - case s: CompileScope.State.Closed[F] => (s, false) + case s: Scope.State.Open[F] => (s.copy(resources = resource +: s.resources), true) + case s: Scope.State.Closed[F] => (s, false) } /** Opens a child scope. @@ -104,7 +101,7 @@ private[fs2] final class CompileScope[F[_]] private ( * * Returns scope that has to be used in next compilation step. */ - def open(interruptible: Boolean): F[Either[Throwable, CompileScope[F]]] = { + def open(interruptible: Boolean): F[Either[Throwable, Scope[F]]] = { /* * Creates a context for a new scope. * @@ -121,24 +118,24 @@ private[fs2] final class CompileScope[F[_]] private ( * or as a result of interrupting this scope. But it should not propagate its own interruption to this scope. * */ - val createCompileScope: F[CompileScope[F]] = Token[F].flatMap { newScopeId => + val createScope: F[Scope[F]] = Token[F].flatMap { newScopeId => self.interruptible match { case None => val optFCtx = if (interruptible) F.interruptContext(newScopeId) else None - optFCtx.sequence.flatMap(iCtx => CompileScope[F](newScopeId, Some(self), iCtx)) + optFCtx.sequence.flatMap(iCtx => Scope[F](newScopeId, Some(self), iCtx)) case Some(parentICtx) => parentICtx.childContext(interruptible, newScopeId).flatMap { iCtx => - CompileScope[F](newScopeId, Some(self), Some(iCtx)) + Scope[F](newScopeId, Some(self), Some(iCtx)) } } } - createCompileScope.flatMap { scope => + createScope.flatMap { scope => state .modify { - case s: CompileScope.State.Closed[F] => (s, None) - case s: CompileScope.State.Open[F] => + case s: Scope.State.Closed[F] => (s, None) + case s: Scope.State.Open[F] => (s.copy(children = scope +: s.children), Some(scope)) } .flatMap { @@ -212,15 +209,15 @@ private[fs2] final class CompileScope[F[_]] private ( */ private def releaseChildScope(id: Token): F[Unit] = state.update { - case s: CompileScope.State.Open[F] => s.unregisterChild(id) - case s: CompileScope.State.Closed[F] => s + case s: Scope.State.Open[F] => s.unregisterChild(id) + case s: Scope.State.Closed[F] => s } /** Returns all direct resources of this scope (does not return resources in ancestor scopes or child scopes). * */ private def resources: F[Chain[ScopedResource[F]]] = state.get.map { - case s: CompileScope.State.Open[F] => s.resources - case _: CompileScope.State.Closed[F] => Chain.empty + case s: Scope.State.Open[F] => s.resources + case _: Scope.State.Closed[F] => Chain.empty } /** Traverses supplied `Chain` with `f` that may produce a failure, and collects these failures. @@ -249,10 +246,10 @@ private[fs2] final class CompileScope[F[_]] private ( * more details. */ def close(ec: Resource.ExitCase): F[Either[Throwable, Unit]] = - state.modify(s => CompileScope.State.closed -> s).flatMap { - case previous: CompileScope.State.Open[F] => + state.modify(s => Scope.State.closed -> s).flatMap { + case previous: Scope.State.Open[F] => for { - resultChildren <- traverseError[CompileScope[F]](previous.children, _.close(ec)) + resultChildren <- traverseError[Scope[F]](previous.children, _.close(ec)) resultResources <- traverseError[ScopedResource[F]](previous.resources, _.release(ec)) _ <- self.interruptible.map(_.cancelParent).getOrElse(F.unit) _ <- self.parent.fold(F.unit)(_.releaseChildScope(self.id)) @@ -263,22 +260,22 @@ private[fs2] final class CompileScope[F[_]] private ( ) CompositeFailure.fromList(results.toList).toLeft(()) } - case _: CompileScope.State.Closed[F] => F.pure(Right(())) + case _: Scope.State.Closed[F] => F.pure(Right(())) } /** Returns closest open parent scope or root. */ - def openAncestor: F[CompileScope[F]] = + def openAncestor: F[Scope[F]] = self.parent.fold(F.pure(self)) { parent => parent.state.get.flatMap { - case _: CompileScope.State.Open[F] => F.pure(parent) - case _: CompileScope.State.Closed[F] => parent.openAncestor + case _: Scope.State.Open[F] => F.pure(parent) + case _: Scope.State.Closed[F] => parent.openAncestor } } /** Gets all ancestors of this scope, inclusive of root scope. * */ - private def ancestors: Chain[CompileScope[F]] = { + private def ancestors: Chain[Scope[F]] = { @tailrec - def go(curr: CompileScope[F], acc: Chain[CompileScope[F]]): Chain[CompileScope[F]] = + def go(curr: Scope[F], acc: Chain[Scope[F]]): Chain[Scope[F]] = curr.parent match { case Some(parent) => go(parent, acc :+ parent) case None => acc @@ -286,10 +283,10 @@ private[fs2] final class CompileScope[F[_]] private ( go(self, Chain.empty) } - /** finds ancestor of this scope given `scopeId` * */ - def findSelfOrAncestor(scopeId: Token): Option[CompileScope[F]] = { + /** Finds ancestor of this scope given `scopeId`. */ + def findSelfOrAncestor(scopeId: Token): Option[Scope[F]] = { @tailrec - def go(curr: CompileScope[F]): Option[CompileScope[F]] = + def go(curr: Scope[F]): Option[Scope[F]] = if (curr.id == scopeId) Some(curr) else curr.parent match { @@ -299,37 +296,37 @@ private[fs2] final class CompileScope[F[_]] private ( go(self) } - /** finds scope in child hierarchy of current scope * */ - def findSelfOrChild(scopeId: Token): F[Option[CompileScope[F]]] = { - def go(scopes: Chain[CompileScope[F]]): F[Option[CompileScope[F]]] = + /** Finds scope in child hierarchy of current scope. */ + def findSelfOrChild(scopeId: Token): F[Option[Scope[F]]] = { + def go(scopes: Chain[Scope[F]]): F[Option[Scope[F]]] = scopes.uncons match { case None => F.pure(None) case Some((scope, tail)) => if (scope.id == scopeId) F.pure(Some(scope)) else scope.state.get.flatMap { - case s: CompileScope.State.Open[F] => + case s: Scope.State.Open[F] => if (s.children.isEmpty) go(tail) else go(s.children).flatMap { case None => go(tail) case Some(scope) => F.pure(Some(scope)) } - case _: CompileScope.State.Closed[F] => go(tail) + case _: Scope.State.Closed[F] => go(tail) } } if (self.id == scopeId) F.pure(Some(self)) else state.get.flatMap { - case s: CompileScope.State.Open[F] => go(s.children) - case _: CompileScope.State.Closed[F] => F.pure(None) + case s: Scope.State.Open[F] => go(s.children) + case _: Scope.State.Closed[F] => F.pure(None) } } /** Tries to shift from the current scope with the given ScopeId, if one exists. * If not, throws an error. */ - def shiftScope(scopeId: Token, context: => String): F[CompileScope[F]] = + def shiftScope(scopeId: Token, context: => String): F[Scope[F]] = findStepScope(scopeId).flatMap { case Some(scope) => F.pure(scope) case None => @@ -352,9 +349,9 @@ private[fs2] final class CompileScope[F[_]] private ( * - check if id is parent or any of its children * - traverse all known scope ids, starting from the root. */ - def findStepScope(scopeId: Token): F[Option[CompileScope[F]]] = { + def findStepScope(scopeId: Token): F[Option[Scope[F]]] = { @tailrec - def go(scope: CompileScope[F]): CompileScope[F] = + def go(scope: Scope[F]): Scope[F] = scope.parent match { case None => scope case Some(parent) => go(parent) @@ -372,36 +369,20 @@ private[fs2] final class CompileScope[F[_]] private ( } } - // See docs on [[Scope#lease]] - def lease: F[Option[Scope.Lease[F]]] = - state.get.flatMap { - case s: CompileScope.State.Open[F] => - val allScopes = (s.children :+ self) ++ ancestors - Traverse[Chain].flatTraverse(allScopes)(_.resources).flatMap { allResources => - TraverseFilter[Chain].traverseFilter(allResources)(r => r.lease).map { allLeases => - val lease = new Scope.Lease[F] { - def cancel: F[Either[Throwable, Unit]] = - traverseError[Scope.Lease[F]](allLeases, _.cancel) - } - Some(lease) - } - } - case _: CompileScope.State.Closed[F] => F.pure(None) - } - - // See docs on [[Scope#interrupt]] - def interrupt(cause: Either[Throwable, Unit]): F[Unit] = + def interruptWhen(haltWhen: F[Either[Throwable, Unit]]): F[Fiber[F, Throwable, Unit]] = interruptible match { case None => F.raiseError( new IllegalStateException("Scope#interrupt called for Scope that cannot be interrupted") ) case Some(iCtx) => - val outcome: InterruptionOutcome = cause.fold( - t => Outcome.Errored(t), - _ => Outcome.Succeeded[Id, Throwable, Token](iCtx.interruptRoot) + val outcome: F[InterruptionOutcome] = haltWhen.map( + _.fold( + t => Outcome.Errored(t), + _ => Outcome.Succeeded[Id, Throwable, Token](iCtx.interruptRoot) + ) ) - iCtx.complete(outcome) + iCtx.completeWhen(outcome) } /** Checks if current scope is interrupted. @@ -434,22 +415,55 @@ private[fs2] final class CompileScope[F[_]] private ( iCtx.eval(f) } + /** Leases the resources of this scope until the returned lease is cancelled. + * + * Note that this leases all resources in this scope, resources in all parent scopes (up to root) + * and resources of all child scopes. + * + * An error is raised if this scope is already closed. Otherwise a lease is returned, + * which must be cancelled. Upon cancellation, resource finalizers may be run, depending on the + * state of the owning scopes. + * + * Resources may be finalized during the execution of this method and before the lease has been acquired + * for a resource. In such an event, the already finalized resource won't be leased. As such, it is + * important to call `lease` only when all resources are known to be non-finalized / non-finalizing. + * + * When the lease is returned, all resources available at the time `lease` was called have been + * successfully leased. + */ + def lease: F[Lease[F]] = + state.get.flatMap { + case s: Scope.State.Open[F] => + val allScopes = (s.children :+ self) ++ ancestors + Traverse[Chain].flatTraverse(allScopes)(_.resources).flatMap { allResources => + TraverseFilter[Chain].traverseFilter(allResources)(r => r.lease).map { allLeases => + val lease = new Lease[F] { + def cancel: F[Either[Throwable, Unit]] = + traverseError[Lease[F]](allLeases, _.cancel) + } + lease + } + } + case _: Scope.State.Closed[F] => + F.raiseError(new RuntimeException("Scope closed at time of lease")) + } + override def toString = - s"CompileScope(id=$id,interruptible=${interruptible.nonEmpty})" + s"Scope(id=$id,interruptible=${interruptible.nonEmpty})" } -private[fs2] object CompileScope { +private[fs2] object Scope { private def apply[F[_]]( id: Token, - parent: Option[CompileScope[F]], + parent: Option[Scope[F]], interruptible: Option[InterruptContext[F]] - )(implicit F: Compiler.Target[F]): F[CompileScope[F]] = - F.ref(CompileScope.State.initial[F]) - .map(state => new CompileScope[F](id, parent, interruptible, state)) + )(implicit F: Compiler.Target[F]): F[Scope[F]] = + F.ref(Scope.State.initial[F]) + .map(state => new Scope[F](id, parent, interruptible, state)) /** Creates a new root scope. */ - def newRoot[F[_]: Compiler.Target]: F[CompileScope[F]] = + def newRoot[F[_]: Compiler.Target]: F[Scope[F]] = Token[F].flatMap(apply[F](_, None, None)) private sealed trait State[F[_]] @@ -463,7 +477,7 @@ private[fs2] object CompileScope { * split to multiple asynchronously acquired scopes and resources. * Still, likewise for resources they are released in reverse order. */ - case class Open[F[_]](resources: Chain[ScopedResource[F]], children: Chain[CompileScope[F]]) + case class Open[F[_]](resources: Chain[ScopedResource[F]], children: Chain[Scope[F]]) extends State[F] { self => def unregisterChild(id: Token): State[F] = self.children.deleteFirst(_.id == id) match { diff --git a/core/shared/src/main/scala/fs2/internal/ScopedResource.scala b/core/shared/src/main/scala/fs2/internal/ScopedResource.scala index 220ca9aa32..a45452c38f 100644 --- a/core/shared/src/main/scala/fs2/internal/ScopedResource.scala +++ b/core/shared/src/main/scala/fs2/internal/ScopedResource.scala @@ -24,7 +24,7 @@ package fs2.internal import cats.effect.kernel.Resource import cats.implicits._ -import fs2.{Compiler, Scope} +import fs2.Compiler /** Represents a resource acquired during stream interpretation. * @@ -93,7 +93,7 @@ private[fs2] sealed abstract class ScopedResource[F[_]] { * Yields to `Some(lease)`, if this resource was successfully leased, and scope must bind `lease.cancel` it when not needed anymore. * or to `None` when this resource cannot be leased because resource is already released. */ - def lease: F[Option[Scope.Lease[F]]] + def lease: F[Option[Lease[F]]] } private[internal] object ScopedResource { @@ -156,7 +156,7 @@ private[internal] object ScopedResource { } }.flatten - def lease: F[Option[Scope.Lease[F]]] = + def lease: F[Option[Lease[F]]] = state.modify { s => if (s.open) s.copy(leases = s.leases + 1) -> Some(TheLease) @@ -164,7 +164,7 @@ private[internal] object ScopedResource { s -> None } - private[this] object TheLease extends Scope.Lease[F] { + private[this] object TheLease extends Lease[F] { def cancel: F[Either[Throwable, Unit]] = state .modify { s => diff --git a/core/shared/src/test/scala/fs2/StreamTranslateSuite.scala b/core/shared/src/test/scala/fs2/StreamTranslateSuite.scala index 12a0f7166b..9d89eba2a9 100644 --- a/core/shared/src/test/scala/fs2/StreamTranslateSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamTranslateSuite.scala @@ -24,7 +24,7 @@ package fs2 import scala.concurrent.duration._ import cats.~> -import cats.effect.IO +import cats.effect.{Async, IO} import org.scalacheck.effect.PropF.forAllF class StreamTranslateSuite extends Fs2Suite { @@ -154,13 +154,19 @@ class StreamTranslateSuite extends Fs2Suite { } test("translateInterruptible") { + type Eff[A] = cats.data.EitherT[IO, String, A] + val Eff = Async[Eff] Stream - .eval(IO.never) - .merge(Stream.eval(IO(1)).delayBy(5.millis).repeat) + .eval(Eff.never) + .merge(Stream.eval(Eff.delay(1)).delayBy(5.millis).repeat) .interruptAfter(10.millis) - .translate(cats.arrow.FunctionK.id[IO]) + .translate(new (Eff ~> IO) { + def apply[X](eff: Eff[X]) = eff.value.flatMap { + case Left(t) => IO.raiseError(new RuntimeException(t)) + case Right(x) => IO.pure(x) + } + }) .compile .drain } - }