diff --git a/.scalafmt b/.scalafmt new file mode 100644 index 00000000..0272cf8c --- /dev/null +++ b/.scalafmt @@ -0,0 +1,2 @@ +--style defaultWithAlign +--maxColumn 100 \ No newline at end of file diff --git a/build.sbt b/build.sbt index e3828e61..ced2c6c8 100644 --- a/build.sbt +++ b/build.sbt @@ -28,8 +28,9 @@ lazy val commonSettings = Seq( "-language:higherKinds", "-language:existentials", "-language:postfixOps" - ) -) + ), + scalafmtConfig := Some(file(".scalafmt")) +) ++ reformatOnCompileSettings lazy val allSettings = buildSettings ++ commonSettings @@ -54,3 +55,4 @@ lazy val root = project.in(file(".")) publish := {}, publishLocal := {} ) + diff --git a/project/plugins.sbt b/project/plugins.sbt index 510c78b8..b31d6cde 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,2 +1,3 @@ addSbtPlugin("org.scala-js" % "sbt-scalajs" % "0.6.8") addSbtPlugin("de.heikoseeberger" % "sbt-header" % "1.5.1") +addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "0.2.5") diff --git a/shared/src/main/scala/cache.scala b/shared/src/main/scala/cache.scala index 0dfc7007..d3f66c5e 100644 --- a/shared/src/main/scala/cache.scala +++ b/shared/src/main/scala/cache.scala @@ -16,16 +16,16 @@ package fetch - /** - * A `Cache` trait so the users of the library can provide their own cache. - */ + * A `Cache` trait so the users of the library can provide their own cache. + */ trait DataSourceCache { def update[A](k: DataSourceIdentity, v: A): DataSourceCache def get(k: DataSourceIdentity): Option[Any] - def cacheResults[I, A](results: Map[I, A], ds: DataSource[I, A]): DataSourceCache = { + def cacheResults[I, A]( + results: Map[I, A], ds: DataSource[I, A]): DataSourceCache = { results.foldLeft(this)({ case (acc, (i, a)) => acc.update(ds.identity(i), a) }) diff --git a/shared/src/main/scala/datasource.scala b/shared/src/main/scala/datasource.scala index 19f4d747..c4e5c574 100644 --- a/shared/src/main/scala/datasource.scala +++ b/shared/src/main/scala/datasource.scala @@ -19,10 +19,10 @@ package fetch import cats.Eval /** - * A `DataSource` is the recipe for fetching a certain identity `I`, which yields - * results of type `A` with the concurrency and error handling specified by the Monad - * `M`. - */ + * A `DataSource` is the recipe for fetching a certain identity `I`, which yields + * results of type `A` with the concurrency and error handling specified by the Monad + * `M`. + */ trait DataSource[I, A] { def name: DataSourceName = this.toString def identity(i: I): DataSourceIdentity = (name, i) diff --git a/shared/src/main/scala/env.scala b/shared/src/main/scala/env.scala index 89472086..2ca7b4b7 100644 --- a/shared/src/main/scala/env.scala +++ b/shared/src/main/scala/env.scala @@ -19,9 +19,9 @@ package fetch import scala.collection.immutable._ /** - * An environment that is passed along during the fetch rounds. It holds the - * cache and the list of rounds that have been executed. - */ + * An environment that is passed along during the fetch rounds. It holds the + * cache and the list of rounds that have been executed. + */ trait Env { def cache: DataSourceCache def rounds: Seq[Round] @@ -33,22 +33,22 @@ trait Env { rounds.filterNot(_.cached) def next( - newCache: DataSourceCache, - newRound: Round, - newIds: List[Any] + newCache: DataSourceCache, + newRound: Round, + newIds: List[Any] ): Env } /** - * A data structure that holds information about a fetch round. - */ + * A data structure that holds information about a fetch round. + */ case class Round( - cache: DataSourceCache, - ds: DataSourceName, - kind: RoundKind, - startRound: Long, - endRound: Long, - cached: Boolean = false + cache: DataSourceCache, + ds: DataSourceName, + kind: RoundKind, + startRound: Long, + endRound: Long, + cached: Boolean = false ) { def duration: Double = (endRound - startRound) / 1e6 @@ -64,17 +64,18 @@ final case class ManyRound(ids: List[Any]) extends RoundKind final case class ConcurrentRound(ids: Map[String, List[Any]]) extends RoundKind /** - * A concrete implementation of `Env` used in the default Fetch interpreter. - */ + * A concrete implementation of `Env` used in the default Fetch interpreter. + */ case class FetchEnv( - cache: DataSourceCache, - ids: List[Any] = Nil, - rounds: Queue[Round] = Queue.empty -) extends Env { + cache: DataSourceCache, + ids: List[Any] = Nil, + rounds: Queue[Round] = Queue.empty +) + extends Env { def next( - newCache: DataSourceCache, - newRound: Round, - newIds: List[Any] + newCache: DataSourceCache, + newRound: Round, + newIds: List[Any] ): FetchEnv = copy(cache = newCache, rounds = rounds :+ newRound, ids = newIds) } diff --git a/shared/src/main/scala/fetch.scala b/shared/src/main/scala/fetch.scala index 24fc4f94..cf58d450 100644 --- a/shared/src/main/scala/fetch.scala +++ b/shared/src/main/scala/fetch.scala @@ -23,15 +23,15 @@ import cats.data.{StateT, Const} import cats.free.{Free} /** - * Primitive operations in the Fetch Free monad. - */ + * Primitive operations in the Fetch Free monad. + */ sealed abstract class FetchOp[A] extends Product with Serializable -final case class Cached[A](a: A) extends FetchOp[A] -final case class FetchOne[I, A](a: I, ds: DataSource[I, A]) extends FetchOp[A] +final case class Cached[A](a: A) extends FetchOp[A] +final case class FetchOne[I, A](a: I, ds: DataSource[I, A]) extends FetchOp[A] final case class FetchMany[I, A](as: List[I], ds: DataSource[I, A]) extends FetchOp[List[A]] -final case class Concurrent(as: List[FetchMany[_, _]]) extends FetchOp[Env] -final case class FetchError[A, E <: Throwable](err: E) extends FetchOp[A] +final case class Concurrent(as: List[FetchMany[_, _]]) extends FetchOp[Env] +final case class FetchError[A, E <: Throwable](err: E) extends FetchOp[A] object `package` { @@ -53,26 +53,26 @@ object `package` { } object Fetch extends FetchInterpreters { + /** - * Lift a plain value to the Fetch monad. - */ + * Lift a plain value to the Fetch monad. + */ def pure[A](a: A): Fetch[A] = Free.pure(a) /** - * Lift an error to the Fetch monad. - */ + * Lift an error to the Fetch monad. + */ def error[A](e: Throwable): Fetch[A] = Free.liftF(FetchError(e)) /** - * Given a value that has a related `DataSource` implementation, lift it - * to the `Fetch` monad. When executing the fetch the data source will be - * queried and the fetch will return its result. - */ + * Given a value that has a related `DataSource` implementation, lift it + * to the `Fetch` monad. When executing the fetch the data source will be + * queried and the fetch will return its result. + */ def apply[I, A](i: I)( - implicit - DS: DataSource[I, A] + implicit DS: DataSource[I, A] ): Fetch[A] = Free.liftF(FetchOne[I, A](i, DS)) @@ -80,31 +80,38 @@ object `package` { type FM = List[FetchOp[_]] f.foldMap[Const[FM, ?]](new (FetchOp ~> Const[FM, ?]) { - def apply[X](x: FetchOp[X]): Const[FM, X] = x match { - case one @ FetchOne(id, ds) => Const(List(FetchMany(List(id), ds.asInstanceOf[DataSource[Any, A]]))) - case conc @ Concurrent(as) => Const(as.asInstanceOf[FM]) - case cach @ Cached(a) => Const(List(cach)) - case _ => Const(List()) - } - })(new Monad[Const[FM, ?]] { - def pure[A](x: A): Const[FM, A] = Const(List()) - - def flatMap[A, B](fa: Const[FM, A])(f: A => Const[FM, B]): Const[FM, B] = fa match { - case Const(List(Cached(a))) => f(a.asInstanceOf[A]) - case other => fa.asInstanceOf[Const[FM, B]] - } + def apply[X](x: FetchOp[X]): Const[FM, X] = x match { + case one @ FetchOne(id, ds) => + Const(List(FetchMany(List(id), ds.asInstanceOf[DataSource[Any, A]]))) + case conc @ Concurrent(as) => Const(as.asInstanceOf[FM]) + case cach @ Cached(a) => Const(List(cach)) + case _ => Const(List()) + } + })(new Monad[Const[FM, ?]] { + def pure[A](x: A): Const[FM, A] = Const(List()) - }).getConst + def flatMap[A, B](fa: Const[FM, A])(f: A => Const[FM, B]): Const[FM, B] = fa match { + case Const(List(Cached(a))) => f(a.asInstanceOf[A]) + case other => fa.asInstanceOf[Const[FM, B]] + } + }) + .getConst } def combineDeps(ds: List[FetchOp[_]]): List[FetchMany[_, _]] = { - ds.foldLeft(Map.empty[Any, List[_]])((acc, op) => op match { - case one @ FetchOne(id, ds) => acc.updated(ds, acc.get(ds).fold(List(id))(accids => accids :+ id)) - case many @ FetchMany(ids, ds) => acc.updated(ds, acc.get(ds).fold(ids)(accids => accids ++ ids)) - case _ => acc - }).toList.map({ - case (ds, ids) => FetchMany[Any, Any](ids, ds.asInstanceOf[DataSource[Any, Any]]) - }) + ds.foldLeft(Map.empty[Any, List[_]])((acc, op) => + op match { + case one @ FetchOne(id, ds) => + acc.updated(ds, acc.get(ds).fold(List(id))(accids => accids :+ id)) + case many @ FetchMany(ids, ds) => + acc.updated(ds, acc.get(ds).fold(ids)(accids => accids ++ ids)) + case _ => acc + }) + .toList + .map({ + case (ds, ids) => + FetchMany[Any, Any](ids, ds.asInstanceOf[DataSource[Any, Any]]) + }) } private[this] def concurrently(fa: Fetch[_], fb: Fetch[_]): Fetch[Env] = { @@ -113,30 +120,30 @@ object `package` { } /** - * Collect a list of fetches into a fetch of a list. It implies concurrent execution of fetches. - */ + * Collect a list of fetches into a fetch of a list. It implies concurrent execution of fetches. + */ def collect[I, A](ids: List[Fetch[A]]): Fetch[List[A]] = { ids.foldLeft(Fetch.pure(List(): List[A]))((f, newF) => - Fetch.join(f, newF).map(t => t._1 :+ t._2)) + Fetch.join(f, newF).map(t => t._1 :+ t._2)) } /** - * Apply a fetch-returning function to every element in a list and return a Fetch of the list of - * results. It implies concurrent execution of fetches. - */ + * Apply a fetch-returning function to every element in a list and return a Fetch of the list of + * results. It implies concurrent execution of fetches. + */ def traverse[A, B](ids: List[A])(f: A => Fetch[B]): Fetch[List[B]] = collect(ids.map(f)) /** - * Apply the given function to the result of the two fetches. It implies concurrent execution of fetches. - */ + * Apply the given function to the result of the two fetches. It implies concurrent execution of fetches. + */ def map2[A, B, C](f: (A, B) => C)(fa: Fetch[A], fb: Fetch[B]): Fetch[C] = Fetch.join(fa, fb).map({ case (a, b) => f(a, b) }) /** - * Join two fetches from any data sources and return a Fetch that returns a tuple with the two - * results. It implies concurrent execution of fetches. - */ + * Join two fetches from any data sources and return a Fetch that returns a tuple with the two + * results. It implies concurrent execution of fetches. + */ def join[A, B](fl: Fetch[A], fr: Fetch[B]): Fetch[(A, B)] = { for { env <- concurrently(fl, fr) @@ -146,41 +153,44 @@ object `package` { val simplify: FetchOp ~> FetchOp = new (FetchOp ~> FetchOp) { def apply[B](f: FetchOp[B]): FetchOp[B] = f match { case one @ FetchOne(id, ds) => { - env.cache.get(ds.identity(id)).fold(one: FetchOp[B])(b => Cached(b).asInstanceOf[FetchOp[B]]) - } + env.cache + .get(ds.identity(id)) + .fold(one: FetchOp[B])(b => Cached(b).asInstanceOf[FetchOp[B]]) + } case many @ FetchMany(ids, ds) => { - val results = ids.flatMap(id => - env.cache.get(ds.identity(id))) + val results = ids.flatMap(id => env.cache.get(ds.identity(id))) - if (results.size == ids.size) { - Cached(results) - } else { - many + if (results.size == ids.size) { + Cached(results) + } else { + many + } } - } case conc @ Concurrent(manies) => { - val newManies = manies.filterNot({ fm => - val ids: List[Any] = fm.as - val ds: DataSource[Any, _] = fm.ds.asInstanceOf[DataSource[Any, _]] - - val results = ids.flatMap(id => { - env.cache.get(ds.identity(id)) - }) - - results.size == ids.size - }).asInstanceOf[List[FetchMany[_, _]]] - - if (newManies.isEmpty) - Cached(env).asInstanceOf[FetchOp[B]] - else - Concurrent(newManies).asInstanceOf[FetchOp[B]] - } + val newManies = manies + .filterNot({ fm => + val ids: List[Any] = fm.as + val ds: DataSource[Any, _] = fm.ds.asInstanceOf[DataSource[Any, _]] + + val results = ids.flatMap(id => { + env.cache.get(ds.identity(id)) + }) + + results.size == ids.size + }) + .asInstanceOf[List[FetchMany[_, _]]] + + if (newManies.isEmpty) + Cached(env).asInstanceOf[FetchOp[B]] + else + Concurrent(newManies).asInstanceOf[FetchOp[B]] + } case other => other } } - val sfl = fl.compile(simplify) - val sfr = fr.compile(simplify) + val sfl = fl.compile(simplify) + val sfr = fr.compile(simplify) val remainingDeps = combineDeps(deps(sfl) ++ deps(sfr)) if (remainingDeps.isEmpty) { @@ -198,52 +208,50 @@ object `package` { class FetchRunner[M[_]] { def apply[A]( - fa: Fetch[A], - cache: DataSourceCache = InMemoryCache.empty + fa: Fetch[A], + cache: DataSourceCache = InMemoryCache.empty )( - implicit - MM: MonadError[M, Throwable] - ): M[(FetchEnv, A)] = fa.foldMap[FetchInterpreter[M]#f](interpreter).run(FetchEnv(cache)) - + implicit MM: MonadError[M, Throwable] + ): M[(FetchEnv, A)] = + fa.foldMap[FetchInterpreter[M]#f](interpreter).run(FetchEnv(cache)) } class FetchRunnerEnv[M[_]] { def apply[A]( - fa: Fetch[A], - cache: DataSourceCache = InMemoryCache.empty + fa: Fetch[A], + cache: DataSourceCache = InMemoryCache.empty )( - implicit - MM: MonadError[M, Throwable] - ): M[FetchEnv] = fa.foldMap[FetchInterpreter[M]#f](interpreter).runS(FetchEnv(cache)) + implicit MM: MonadError[M, Throwable] + ): M[FetchEnv] = + fa.foldMap[FetchInterpreter[M]#f](interpreter).runS(FetchEnv(cache)) } class FetchRunnerA[M[_]] { def apply[A]( - fa: Fetch[A], - cache: DataSourceCache = InMemoryCache.empty + fa: Fetch[A], + cache: DataSourceCache = InMemoryCache.empty )( - implicit - MM: MonadError[M, Throwable] - ): M[A] = fa.foldMap[FetchInterpreter[M]#f](interpreter).runA(FetchEnv(cache)) + implicit MM: MonadError[M, Throwable] + ): M[A] = + fa.foldMap[FetchInterpreter[M]#f](interpreter).runA(FetchEnv(cache)) } /** - * Run a `Fetch` with the given cache, returning a pair of the final environment and result - * in the monad `M`. - */ + * Run a `Fetch` with the given cache, returning a pair of the final environment and result + * in the monad `M`. + */ def runFetch[M[_]]: FetchRunner[M] = new FetchRunner[M] /** - * Run a `Fetch` with the given cache, returning the final environment in the monad `M`. - */ + * Run a `Fetch` with the given cache, returning the final environment in the monad `M`. + */ def runEnv[M[_]]: FetchRunnerEnv[M] = new FetchRunnerEnv[M] /** - * Run a `Fetch` with the given cache, the result in the monad `M`. - */ + * Run a `Fetch` with the given cache, the result in the monad `M`. + */ def run[M[_]]: FetchRunnerA[M] = new FetchRunnerA[M] } - } diff --git a/shared/src/main/scala/implicits.scala b/shared/src/main/scala/implicits.scala index ef3066fe..da6a6fc1 100644 --- a/shared/src/main/scala/implicits.scala +++ b/shared/src/main/scala/implicits.scala @@ -20,9 +20,10 @@ import cats.{Eval, Id} import cats.{MonadError} /** - * A cache that stores its elements in memory. - */ -case class InMemoryCache(state: Map[DataSourceIdentity, Any]) extends DataSourceCache { + * A cache that stores its elements in memory. + */ +case class InMemoryCache(state: Map[DataSourceIdentity, Any]) + extends DataSourceCache { override def get(k: DataSourceIdentity): Option[Any] = state.get(k) @@ -34,29 +35,35 @@ object InMemoryCache { def empty: InMemoryCache = InMemoryCache(Map.empty[DataSourceIdentity, Any]) def apply(results: (DataSourceIdentity, Any)*): InMemoryCache = - InMemoryCache(results.foldLeft(Map.empty[DataSourceIdentity, Any])({ + InMemoryCache( + results.foldLeft(Map.empty[DataSourceIdentity, Any])({ case (c, (k, v)) => c.updated(k, v) })) } object implicits { - val evalMonadError: MonadError[Eval, Throwable] = new MonadError[Eval, Throwable] { - override def pure[A](x: A): Eval[A] = Eval.now(x) + val evalMonadError: MonadError[Eval, Throwable] = + new MonadError[Eval, Throwable] { + override def pure[A](x: A): Eval[A] = Eval.now(x) - override def map[A, B](fa: Eval[A])(f: A ⇒ B): Eval[B] = fa.map(f) + override def map[A, B](fa: Eval[A])(f: A ⇒ B): Eval[B] = fa.map(f) - override def flatMap[A, B](fa: Eval[A])(ff: A => Eval[B]): Eval[B] = fa.flatMap(ff) + override def flatMap[A, B](fa: Eval[A])(ff: A => Eval[B]): Eval[B] = + fa.flatMap(ff) - override def raiseError[A](e: Throwable): Eval[A] = Eval.later({ throw e }) + override def raiseError[A](e: Throwable): Eval[A] = + Eval.later({ throw e }) - override def handleErrorWith[A](fa: Eval[A])(f: Throwable ⇒ Eval[A]): Eval[A] = Eval.now({ - try { - fa.value - } catch { - case e: Throwable => f(e).value - } - }) - } + override def handleErrorWith[A](fa: Eval[A])( + f: Throwable ⇒ Eval[A]): Eval[A] = + Eval.now({ + try { + fa.value + } catch { + case e: Throwable => f(e).value + } + }) + } val idMonadError: MonadError[Id, Throwable] = new MonadError[Id, Throwable] { override def pure[A](x: A): Id[A] = x @@ -74,5 +81,4 @@ object implicits { case e: Throwable => f(e) } } - } diff --git a/shared/src/main/scala/interpreters.scala b/shared/src/main/scala/interpreters.scala index 40e2684c..18fca88f 100644 --- a/shared/src/main/scala/interpreters.scala +++ b/shared/src/main/scala/interpreters.scala @@ -27,17 +27,17 @@ import cats.std.list._ import cats.syntax.traverse._ /** - * An exception thrown from the interpreter when failing to perform a data fetch. - */ + * An exception thrown from the interpreter when failing to perform a data fetch. + */ case class FetchFailure[C <: DataSourceCache](env: Env) extends Throwable trait FetchInterpreters { def interpreter[I, M[_]]( - implicit - MM: MonadError[M, Throwable] + implicit MM: MonadError[M, Throwable] ): FetchOp ~> FetchInterpreter[M]#f = { - def dedupeIds[I, A, M[_]](ids: List[I], ds: DataSource[I, A], cache: DataSourceCache) = { + def dedupeIds[I, A, M[_]]( + ids: List[I], ds: DataSource[I, A], cache: DataSourceCache) = { ids.distinct.filterNot(i => cache.get(ds.identity(i)).isDefined) } @@ -48,135 +48,192 @@ trait FetchInterpreters { case FetchError(e) => MM.raiseError(e) case Cached(a) => MM.pure((env, a)) case Concurrent(manies) => { - val startRound = System.nanoTime() - val cache = env.cache - val sources = manies.map(_.ds) - val ids = manies.map(_.as) + val startRound = System.nanoTime() + val cache = env.cache + val sources = manies.map(_.ds) + val ids = manies.map(_.as) - val sourcesAndIds = (sources zip ids).map({ - case (ds, ids) => ( - ds, - dedupeIds[I, A, M](ids.asInstanceOf[List[I]], ds.asInstanceOf[DataSource[I, A]], cache) - ) - }).filterNot({ - case (_, ids) => ids.isEmpty - }) + val sourcesAndIds = (sources zip ids) + .map({ + case (ds, ids) => + ( + ds, + dedupeIds[I, A, M](ids.asInstanceOf[List[I]], + ds.asInstanceOf[DataSource[I, A]], + cache) + ) + }) + .filterNot({ + case (_, ids) => ids.isEmpty + }) - if (sourcesAndIds.isEmpty) - MM.pure((env, env.asInstanceOf[A])) - else - MM.flatMap(sourcesAndIds.map({ - case (ds, as) => MM.pureEval(ds.asInstanceOf[DataSource[I, A]].fetch(as.asInstanceOf[List[I]])) - }).sequence)((results: List[Map[_, _]]) => { - val endRound = System.nanoTime() - val newCache = (sources zip results).foldLeft(cache)((accache, resultset) => { - val (ds, resultmap) = resultset - val tresults = resultmap.asInstanceOf[Map[I, A]] - val tds = ds.asInstanceOf[DataSource[I, A]] - accache.cacheResults[I, A](tresults, tds) + if (sourcesAndIds.isEmpty) + MM.pure((env, env.asInstanceOf[A])) + else + MM.flatMap( + sourcesAndIds + .map({ + case (ds, as) => + MM.pureEval(ds + .asInstanceOf[DataSource[I, A]] + .fetch(as.asInstanceOf[List[I]])) + }) + .sequence)((results: List[Map[_, _]]) => { + val endRound = System.nanoTime() + val newCache = (sources zip results).foldLeft(cache)( + (accache, resultset) => { + val (ds, resultmap) = resultset + val tresults = resultmap.asInstanceOf[Map[I, A]] + val tds = ds.asInstanceOf[DataSource[I, A]] + accache.cacheResults[I, A](tresults, tds) + }) + val newEnv = env.next( + newCache, + Round( + cache, + "Concurrent", + ConcurrentRound( + sourcesAndIds + .map({ + case (ds, as) => (ds.name, as) + }) + .toMap + ), + startRound, + endRound + ), + Nil + ) + MM.pure((newEnv, newEnv.asInstanceOf[A])) }) - val newEnv = env.next( - newCache, - Round( - cache, - "Concurrent", - ConcurrentRound( - sourcesAndIds.map({ - case (ds, as) => (ds.name, as) - }).toMap - ), - startRound, - endRound - ), - Nil - ) - MM.pure((newEnv, newEnv.asInstanceOf[A])) - }) - } + } case FetchOne(id, ds) => { - val startRound = System.nanoTime() - val cache = env.cache - cache.get(ds.identity(id)).fold[M[(FetchEnv, A)]]( - MM.flatMap(MM.pureEval(ds.fetch(List(id))).asInstanceOf[M[Map[I, A]]])((res: Map[I, A]) => { - val endRound = System.nanoTime() - res.get(id.asInstanceOf[I]).fold[M[(FetchEnv, A)]]( - MM.raiseError( - FetchFailure( - env.next( - cache, - Round(cache, ds.name, OneRound(id), startRound, endRound), - List(id) - ) - ) - ) - )(result => { - val endRound = System.nanoTime() - val newCache = cache.update(ds.identity(id), result) - MM.pure( + val startRound = System.nanoTime() + val cache = env.cache + cache + .get(ds.identity(id)) + .fold[M[(FetchEnv, A)]]( + MM.flatMap(MM + .pureEval(ds.fetch(List(id))) + .asInstanceOf[M[Map[I, A]]])((res: Map[I, A]) => { + val endRound = System.nanoTime() + res + .get(id.asInstanceOf[I]) + .fold[M[(FetchEnv, A)]]( + MM.raiseError( + FetchFailure( + env.next( + cache, + Round(cache, + ds.name, + OneRound(id), + startRound, + endRound), + List(id) + ) + ) + ) + )(result => { + val endRound = System.nanoTime() + val newCache = + cache.update(ds.identity(id), result) + MM.pure( + (env.next( + newCache, + Round(cache, + ds.name, + OneRound(id), + startRound, + endRound), + List(id) + ), + result) + ) + }) + }) + )(cached => { + val endRound = System.nanoTime() + MM.pure( (env.next( - newCache, - Round(cache, ds.name, OneRound(id), startRound, endRound), - List(id) - ), result) - ) - }) - }) - )(cached => { - val endRound = System.nanoTime() + cache, + Round(cache, + ds.name, + OneRound(id), + startRound, + endRound, + true), + List(id) + ), + cached.asInstanceOf[A]) + ) + }) + } + case FetchMany(ids, ds) => { + val startRound = System.nanoTime() + val cache = env.cache + val oldIds = ids.distinct + val newIds = dedupeIds[Any, Any, Any](ids, ds, cache) + if (newIds.isEmpty) MM.pure( - (env.next( - cache, - Round(cache, ds.name, OneRound(id), startRound, endRound, true), - List(id) - ), cached.asInstanceOf[A]) + (env.next( + cache, + Round(cache, + ds.name, + ManyRound(ids), + startRound, + System.nanoTime(), + true), + newIds + ), + ids.flatMap(id => cache.get(ds.identity(id)))) ) - }) - } - case FetchMany(ids, ds) => { - val startRound = System.nanoTime() - val cache = env.cache - val oldIds = ids.distinct - val newIds = dedupeIds[Any, Any, Any](ids, ds, cache) - if (newIds.isEmpty) - MM.pure( - (env.next( - cache, - Round(cache, ds.name, ManyRound(ids), startRound, System.nanoTime(), true), - newIds - ), ids.flatMap(id => cache.get(ds.identity(id)))) - ) - else { - MM.flatMap(MM.pureEval(ds.fetch(newIds)).asInstanceOf[M[Map[I, A]]])((res: Map[I, A]) => { - val endRound = System.nanoTime() - ids.map(i => res.get(i.asInstanceOf[I])).sequence.fold[M[(FetchEnv, A)]]( - MM.raiseError( - FetchFailure( - env.next( - cache, - Round(cache, ds.name, ManyRound(ids), startRound, endRound), - newIds + else { + MM.flatMap(MM + .pureEval(ds.fetch(newIds)) + .asInstanceOf[M[Map[I, A]]])((res: Map[I, A]) => { + val endRound = System.nanoTime() + ids + .map(i => res.get(i.asInstanceOf[I])) + .sequence + .fold[M[(FetchEnv, A)]]( + MM.raiseError( + FetchFailure( + env.next( + cache, + Round(cache, + ds.name, + ManyRound(ids), + startRound, + endRound), + newIds + ) + ) + ) + )(results => { + val endRound = System.nanoTime() + val newCache = cache.cacheResults[I, A]( + res, ds.asInstanceOf[DataSource[I, A]]) + val someCached = oldIds.size == newIds.size + MM.pure( + (env.next( + newCache, + Round(cache, + ds.name, + ManyRound(ids), + startRound, + endRound, + someCached), + newIds + ), + results) ) - ) - ) - )(results => { - val endRound = System.nanoTime() - val newCache = cache.cacheResults[I, A](res, ds.asInstanceOf[DataSource[I, A]]) - val someCached = oldIds.size == newIds.size - MM.pure( - (env.next( - newCache, - Round(cache, ds.name, ManyRound(ids), startRound, endRound, someCached), - newIds - ), results) - ) - }) - }) + }) + }) + } } - } } } } } } - }