diff --git a/shared/src/main/scala/fetch.scala b/shared/src/main/scala/fetch.scala index f3a571d5..92058dc5 100644 --- a/shared/src/main/scala/fetch.scala +++ b/shared/src/main/scala/fetch.scala @@ -39,7 +39,10 @@ object `package` { private[fetch] final case class FetchOne[I, A](id: I, data: Data[I, A]) extends FetchQuery[I, A] { override def identities: NonEmptyList[I] = NonEmptyList.one(id) } - private[fetch] final case class Batch[I, A](ids: NonEmptyList[I], data: Data[I, A]) extends FetchQuery[I, A] { override def identities: NonEmptyList[I] = ids } + private[fetch] final case class Batch[I, A](ids: NonEmptyList[I], data: Data[I, A]) extends FetchQuery[I, A] { + override def identities: NonEmptyList[I] = ids + } + // Fetch result states private[fetch] sealed trait FetchStatus extends Product with Serializable @@ -308,13 +311,21 @@ object `package` { ) def liftIO[F[_] : ConcurrentEffect, A](io: IO[A]): Fetch[F, A] = - Unfetch[F, A](for { - either <- ConcurrentEffect[F].liftIO(io).attempt - result = either match { + Unfetch[F, A]( + ConcurrentEffect[F].liftIO(io).attempt.map { case Left(err) => Throw[F, A](log => UnhandledException(err, log)) case Right(r) => Done[F, A](r) } - } yield result) + ) + + def liftF[F[_] : Concurrent, A](f: F[A]): Fetch[F, A] = + Unfetch[F, A]( + f.attempt.map { + case Left(err) => Throw[F, A](log => UnhandledException(err, log)) + case Right(r) => Done[F, A](r) + } + ) + // Running a Fetch diff --git a/shared/src/test/scala/FetchTests.scala b/shared/src/test/scala/FetchTests.scala index 7a252302..40ca3315 100644 --- a/shared/src/test/scala/FetchTests.scala +++ b/shared/src/test/scala/FetchTests.scala @@ -884,4 +884,47 @@ class FetchTests extends FetchSpec { io.map(_ shouldEqual List(0, 1, 2, 42)).unsafeToFuture } + + // Concurrent[_] in Fetch + + + "We can lift Concurrent actions into Fetch" in { + def fortyTwo[F[_] : Concurrent]: F[Int] = + Concurrent[F].pure(42) + + def fetch[F[_] : ConcurrentEffect]: Fetch[F, Int] = + Fetch.liftF(fortyTwo) + + Fetch.run[IO](fetch).map(_ shouldEqual 42).unsafeToFuture + } + + "A failed Concurrent action lifted into Fetch will cause a Fetch to fail" in { + def fail[F[_] : Concurrent]: F[Int] = + Concurrent[F].raiseError(AnException()) + + def fetch[F[_] : ConcurrentEffect]: Fetch[F, Int] = + Fetch.liftF(fail) + + val io = Fetch.run[IO](fetch) + + io.attempt + .map(_ should matchPattern { + case Left(UnhandledException(AnException(), _)) => + }).unsafeToFuture + } + + "A Concurrent action can be combined with data fetches" in { + def concurrently[F[_] : Concurrent, A](x: A): F[A] = + Concurrent[F].pure(x) + + def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = for { + x <- Fetch.liftF(concurrently(3)) + manies <- many(x) + (ones, y) <- (manies.traverse(one[F]), Fetch.liftF(concurrently(42))).tupled + } yield ones :+ y + + val io = Fetch.run[IO](fetch) + + io.map(_ shouldEqual List(0, 1, 2, 42)).unsafeToFuture + } }