diff --git a/.travis.yml b/.travis.yml index f1c8f54a..eb396d68 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: scala scala: -- 2.10.6 - 2.11.11 - 2.12.2 diff --git a/build.sbt b/build.sbt index bd3c7ef1..3e1caf29 100644 --- a/build.sbt +++ b/build.sbt @@ -8,7 +8,7 @@ lazy val root = project .in(file(".")) .settings(name := "fetch") .settings(moduleName := "root") - .aggregate(fetchJS, fetchJVM, fetchMonixJVM, fetchMonixJS, debugJVM, debugJS) + .aggregate(fetchJS, fetchJVM, fetchMonixJVM, fetchMonixJS, debugJVM, debugJS, twitterJVM) lazy val fetch = crossProject .in(file(".")) @@ -38,6 +38,13 @@ lazy val debug = (crossProject in file("debug")) lazy val debugJVM = debug.jvm lazy val debugJS = debug.js +lazy val twitter = crossProject + .in(file("twitter")) + .dependsOn(fetch) + .crossDepSettings(commonCrossDependencies ++ twitterUtilDependencies: _*) + +lazy val twitterJVM = twitter.jvm + lazy val examples = (project in file("examples")) .settings(name := "fetch-examples") .dependsOn(fetchJVM) diff --git a/project/ProjectPlugin.scala b/project/ProjectPlugin.scala index 722f6dc5..86b2656b 100644 --- a/project/ProjectPlugin.scala +++ b/project/ProjectPlugin.scala @@ -22,6 +22,8 @@ object ProjectPlugin extends AutoPlugin { lazy val monixCrossDependencies: Seq[ModuleID] = Seq(%%("monix-eval"), %%("monix-cats")) + lazy val twitterUtilDependencies: Seq[ModuleID] = Seq(%%("catbird-util")) + lazy val micrositeSettings: Seq[Def.Setting[_]] = Seq( micrositeName := "Fetch", micrositeDescription := "Simple & Efficient data access for Scala and Scala.js", @@ -78,8 +80,10 @@ object ProjectPlugin extends AutoPlugin { List( "fetchJVM/compile", "monixJVM/compile", + "twitterJVM/compile", "fetchJVM/test", "monixJVM/test", + "twitterJVM/test", "project root").asCmd) ++ addCommandAlias( "validateJS", @@ -111,7 +115,7 @@ object ProjectPlugin extends AutoPlugin { orgUpdateDocFilesSetting += baseDirectory.value / "tut", scalaOrganization := "org.scala-lang", scalaVersion := "2.12.2", - crossScalaVersions := List("2.10.6", "2.11.11", "2.12.2"), + crossScalaVersions := List("2.11.11", "2.12.2"), resolvers += Resolver.sonatypeRepo("snapshots"), scalacOptions := Seq( "-unchecked", @@ -122,12 +126,6 @@ object ProjectPlugin extends AutoPlugin { "-language:existentials", "-language:postfixOps" ), - libraryDependencies ++= (scalaBinaryVersion.value match { - case "2.10" => - compilerPlugin(%%("paradise") cross CrossVersion.full) :: Nil - case _ => - Nil - }), ScoverageKeys.coverageFailOnMinimum := false ) ++ shellPromptSettings diff --git a/twitter/jvm/src/main/scala/TwitterFuture.scala b/twitter/jvm/src/main/scala/TwitterFuture.scala new file mode 100644 index 00000000..bc9765ec --- /dev/null +++ b/twitter/jvm/src/main/scala/TwitterFuture.scala @@ -0,0 +1,75 @@ +/* + * Copyright 2016-2017 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fetch.twitterFuture + +import fetch._ +import scala.concurrent.duration.FiniteDuration + +object implicits { + + import cats._ + import com.twitter.util.{Duration, Future, FuturePool, Promise, Timer} + import io.catbird.util._ + + def evalToRerunnable[A](e: Eval[A]): Rerunnable[A] = e match { + case Now(x) => Rerunnable.const(x) + case l: Later[A] => Rerunnable.fromFuture({ Future(l.value) }) + case a: Always[A] => Rerunnable({ a.value }) + case e => Rerunnable.fromFuture(Future(e.value)) + } + + implicit def fetchRerunnableMonadError( + implicit pool: FuturePool = FuturePool.interruptibleUnboundedPool + ): FetchMonadError[Rerunnable] = + new FetchMonadError.FromMonadError[Rerunnable] { + override def runQuery[A](j: Query[A]): Rerunnable[A] = j match { + case Sync(e) ⇒ evalToRerunnable(e) + case Async(ac, timeout) ⇒ + Rerunnable.fromFuture { + val p: Promise[A] = Promise() + pool(ac(p setValue _, p setException _)) + timeout match { + case _: FiniteDuration => + p.raiseWithin(Duration(timeout.length, timeout.unit))(Timer.Nil) + case _ => p + } + } + case Ap(qf, qx) ⇒ + runQuery(qf).product(runQuery(qx)) map { case (f, x) => f(x) } + } + } + + implicit def fetchTwFutureMonadError( + implicit pool: FuturePool = FuturePool.interruptibleUnboundedPool + ): FetchMonadError[Future] = + new FetchMonadError.FromMonadError[Future] { + override def runQuery[A](j: Query[A]): Future[A] = j match { + case Sync(e) ⇒ Future(e.value) + case Async(ac, timeout) ⇒ + val p: Promise[A] = Promise() + pool(ac(p setValue _, p setException _)) + timeout match { + case _: FiniteDuration => + p.raiseWithin(Duration(timeout.length, timeout.unit))(Timer.Nil) + case _ => p + } + case Ap(qf, qx) ⇒ + ap(runQuery(qf))(runQuery(qx)) + } + } + +} diff --git a/twitter/jvm/src/test/scala/FetchTwitterFutureSpec.scala b/twitter/jvm/src/test/scala/FetchTwitterFutureSpec.scala new file mode 100644 index 00000000..0acc1b64 --- /dev/null +++ b/twitter/jvm/src/test/scala/FetchTwitterFutureSpec.scala @@ -0,0 +1,185 @@ +/* + * Copyright 2016-2017 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fetch.twitterFuture + +import cats._, data._ +import cats.implicits._ +import fetch._ +import io.catbird.util._ +import org.scalatest._ +import com.twitter.util.{Await, Future} +import com.twitter.conversions.time._ + +class FetchTwitterFutureSpec extends FlatSpec with Matchers { + + import fetch.twitterFuture.implicits._ + + case class One(id: Int) + implicit object OneSource extends DataSource[One, Int] { + override def name = "OneSource" + override def fetchOne(id: One): Query[Option[Int]] = + Query.sync(Option(id.id)) + override def fetchMany(ids: NonEmptyList[One]): Query[Map[One, Int]] = + Query.sync(ids.toList.map(one => (one, one.id)).toMap) + } + def one(id: Int): Fetch[Int] = Fetch(One(id)) + + case class ArticleId(id: Int) + case class Article(id: Int, content: String) { + def author: Int = id + 1 + } + + object ArticleSync extends DataSource[ArticleId, Article] { + override def name = "ArticleAsync" + override def fetchOne(id: ArticleId): Query[Option[Article]] = + Query.sync({ + Option(Article(id.id, "An article with id " + id.id)) + }) + override def fetchMany(ids: NonEmptyList[ArticleId]): Query[Map[ArticleId, Article]] = + batchingNotSupported(ids) + } + + object ArticleAsync extends DataSource[ArticleId, Article] { + override def name = "ArticleAsync" + override def fetchOne(id: ArticleId): Query[Option[Article]] = + Query.async((ok, fail) ⇒ { + ok(Option(Article(id.id, "An article with id " + id.id))) + }) + override def fetchMany(ids: NonEmptyList[ArticleId]): Query[Map[ArticleId, Article]] = + batchingNotSupported(ids) + } + + def article(id: Int): Fetch[Article] = Fetch(ArticleId(id))(ArticleAsync) + + case class AuthorId(id: Int) + case class Author(id: Int, name: String) + + implicit object AuthorFuture extends DataSource[AuthorId, Author] { + override def name = "AuthorFuture" + override def fetchOne(id: AuthorId): Query[Option[Author]] = + Query.async((ok, fail) ⇒ { + ok(Option(Author(id.id, "@egg" + id.id))) + }) + override def fetchMany(ids: NonEmptyList[AuthorId]): Query[Map[AuthorId, Author]] = + batchingNotSupported(ids) + } + + def author(a: Article): Fetch[Author] = Fetch(AuthorId(a.author)) + + "TwFutureMonadError" should "execute an async fetch on a Future" in { + val fetch: Fetch[Article] = Fetch(ArticleId(1))(ArticleAsync) + val article: Future[Article] = Fetch.run[Future](fetch) + Await.result(article, 100.milliseconds) shouldEqual (Article(1, "An article with id 1")) + } + it should "allow for several async datasources to be combined" in { + val fetch: Fetch[(Article, Author)] = for { + art ← article(1) + author ← author(art) + } yield (art, author) + + Await.result(Fetch.run[Future](fetch), 100.milliseconds) shouldEqual (Article( + 1, + "An article with id 1"), Author(2, "@egg2")) + } + it should "execute a sync fetch" in { + val fetch: Fetch[Article] = Fetch(ArticleId(1))(ArticleSync) + Await.result(Fetch.run[Future](fetch), 100.milliseconds) shouldEqual (Article( + 1, + "An article with id 1")) + } + it should "be used as an applicative" in { + import cats.syntax.cartesian._ + + val fetch = (one(1) |@| one(2) |@| one(3)).map(_ + _ + _) + val fut = Fetch.run[Future](fetch) + + fut.map(_ shouldEqual 6) + } + + "RerunnableMonadError" should "lift and execute an async fetch into a Rerunnable" in { + val fetch: Fetch[Article] = Fetch(ArticleId(1))(ArticleAsync) + val article: Rerunnable[Article] = Fetch.run[Rerunnable](fetch) + Await.result(article.run, 100.milliseconds) shouldEqual (Article(1, "An article with id 1")) + } + it should "run a sync fetch" in { + val fetch: Fetch[Article] = Fetch(ArticleId(1))(ArticleSync) + val article: Rerunnable[Article] = Fetch.run[Rerunnable](fetch) + Await.result(article.run, 100.milliseconds) shouldEqual (Article(1, "An article with id 1")) + } + it should "allow for several async datasources to be combined" in { + val fetch: Fetch[(Article, Author)] = for { + art ← article(1) + author ← author(art) + } yield (art, author) + + val rr: Rerunnable[(Article, Author)] = Fetch.run[Rerunnable](fetch) + Await.result(rr.run, 100.milliseconds) shouldEqual (Article(1, "An article with id 1"), Author( + 2, + "@egg2")) + Await.result(rr.run, 100.milliseconds) shouldEqual (Article(1, "An article with id 1"), Author( + 2, + "@egg2")) + } + it should "be used as an applicative" in { + import cats.syntax.cartesian._ + + val fetch: Fetch[Int] = (one(1) |@| one(2) |@| one(3)).map(_ + _ + _) + val fut = Fetch.run[Rerunnable](fetch) + + fut.map(_ shouldEqual 6) + } + it should "be usable as an applicative" in { + import cats.syntax.cartesian._ + + val fetch: Fetch[Int] = (one(1) |@| one(2) |@| one(3)).map(_ + _ + _) + val fut = Fetch.run[Rerunnable](fetch) + + fut.map(_ shouldEqual 6) + } + + "evalToRunnable" should "convert an Eval into a Runnable" in { + var dontDoThisAtHome: Int = 1000 + def evalFun: Int = { + dontDoThisAtHome += 1 + dontDoThisAtHome + } + + // this invokes evalFun as soon as the Now is created. subsequent + // calls to the rerunnable return the memoized value + val now = Now(evalFun) + val rNow = evalToRerunnable(now) + val nowAnswer = dontDoThisAtHome + Await.result(rNow.run) should be(nowAnswer) + Await.result(rNow.run) should be(nowAnswer) + + // this invokes evalFun on first run. subsequent calls to the + // run return the memoized value + def laterValue: Int = evalFun + val later = Later(laterValue) + val rLater = evalToRerunnable(later) + val laterAnswer = dontDoThisAtHome + Await.result(rLater.run) should be(laterAnswer + 1) + Await.result(rLater.run) should be(laterAnswer + 1) + + // each time rerunnable run is invoked evalFun is called + val always = Always(evalFun) + val rAlways = evalToRerunnable(always) + val alwaysAnswer = dontDoThisAtHome + Await.result(rAlways.run) should be(alwaysAnswer + 1) + Await.result(rAlways.run) should be(alwaysAnswer + 2) + } +} diff --git a/twitter/jvm/src/test/scala/RerunnableTimeoutSpecs.scala b/twitter/jvm/src/test/scala/RerunnableTimeoutSpecs.scala new file mode 100644 index 00000000..8031712f --- /dev/null +++ b/twitter/jvm/src/test/scala/RerunnableTimeoutSpecs.scala @@ -0,0 +1,91 @@ +/* + * Copyright 2016-2017 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fetch.twitterFuture + +import cats.data.NonEmptyList +import com.twitter.util.{Await, Duration, Future, TimeoutException} +import com.twitter.conversions.time._ +import fetch._ +import fetch.implicits._ +import fetch.twitterFuture.implicits._ +import io.catbird.util._ +import org.scalatest._ + +class RerunnableTimeoutSpac + extends FlatSpec + with Matchers + with OptionValues + with Inside + with Inspectors { + + case class ArticleId(id: Int) + case class Article(id: Int, content: String) + + def article(id: Int)(implicit DS: DataSource[ArticleId, Article]): Fetch[Article] = + Fetch(ArticleId(id)) + + // A sample datasource with configurable delay and timeout + + case class ConfigurableTimeoutDatasource(timeout: Duration, delay: Duration) + extends DataSource[ArticleId, Article] { + override def name = "ArticleRerunnable" + override def fetchOne(id: ArticleId): Query[Option[Article]] = + Query.async( + (ok, fail) => { + Thread.sleep(delay.inMillis) + ok(Option(Article(id.id, "An article with id " + id.id))) + }, + scala.concurrent.duration.Duration.fromNanos(timeout.inNanoseconds) + ) + override def fetchMany(ids: NonEmptyList[ArticleId]): Query[Map[ArticleId, Article]] = + batchingNotSupported(ids) + } + + "FetchMonadError[Rerunnable]" should "fail with timeout when a datasource does not complete in time" in { + + implicit val dsWillTimeout = ConfigurableTimeoutDatasource(250 milliseconds, 750 milliseconds) + + val fetch: Fetch[Article] = article(1) + val fut: Rerunnable[Article] = Fetch.run[Rerunnable](fetch) + + assertThrows[TimeoutException] { + Await.result(fut.run, 1 seconds) + } + + } + + it should "not fail with timeout when a datasource does complete in time" in { + + implicit val dsWillTimeout = ConfigurableTimeoutDatasource(750 milliseconds, 250 milliseconds) + + val fetch: Fetch[Article] = article(1) + val fut: Rerunnable[Article] = Fetch.run[Rerunnable](fetch) + + fut.map { _ shouldEqual Article(1, "An article with id 1") } + } + + it should "not fail with timeout when infinite timeout specified" in { + + implicit val dsWillTimeout = ConfigurableTimeoutDatasource(Duration.Top, 250 milliseconds) + + val fetch: Fetch[Article] = article(1) + val fut: Rerunnable[Article] = Fetch.run[Rerunnable](fetch) + + fut.map { _ shouldEqual Article(1, "An article with id 1") } + } + +} diff --git a/twitter/jvm/src/test/scala/TwitterFutureTimeoutSpec.scala b/twitter/jvm/src/test/scala/TwitterFutureTimeoutSpec.scala new file mode 100644 index 00000000..7b6e3902 --- /dev/null +++ b/twitter/jvm/src/test/scala/TwitterFutureTimeoutSpec.scala @@ -0,0 +1,91 @@ +/* + * Copyright 2016-2017 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fetch.twitterFuture + +import cats.data.NonEmptyList +import com.twitter.util.{Await, Duration, Future, TimeoutException} +import com.twitter.conversions.time._ +import fetch._ +import fetch.implicits._ +import fetch.twitterFuture.implicits._ +import org.scalatest._ +// import scala.concurrent.duration._ + +class TwiterFutureTimeoutSpac + extends FlatSpec + with Matchers + with OptionValues + with Inside + with Inspectors { + + case class ArticleId(id: Int) + case class Article(id: Int, content: String) + + def article(id: Int)(implicit DS: DataSource[ArticleId, Article]): Fetch[Article] = + Fetch(ArticleId(id)) + + // A sample datasource with configurable delay and timeout + + case class ConfigurableTimeoutDatasource(timeout: Duration, delay: Duration) + extends DataSource[ArticleId, Article] { + override def name = "ArticleFuture" + override def fetchOne(id: ArticleId): Query[Option[Article]] = + Query.async( + (ok, fail) => { + Thread.sleep(delay.inMillis) + ok(Option(Article(id.id, "An article with id " + id.id))) + }, + scala.concurrent.duration.Duration.fromNanos(timeout.inNanoseconds) + ) + override def fetchMany(ids: NonEmptyList[ArticleId]): Query[Map[ArticleId, Article]] = + batchingNotSupported(ids) + } + + "FetchMonadError[Future]" should "fail with timeout when a datasource does not complete in time" in { + + implicit val dsWillTimeout = ConfigurableTimeoutDatasource(250 milliseconds, 750 milliseconds) + + val fetch: Fetch[Article] = article(1) + val fut: Future[Article] = Fetch.run[Future](fetch) + + assertThrows[TimeoutException] { + Await.result(fut, 1 seconds) + } + + } + + it should "not fail with timeout when a datasource does complete in time" in { + + implicit val dsWillTimeout = ConfigurableTimeoutDatasource(750 milliseconds, 250 milliseconds) + + val fetch: Fetch[Article] = article(1) + val fut: Future[Article] = Fetch.run[Future](fetch) + + fut.map { _ shouldEqual Article(1, "An article with id 1") } + } + + it should "not fail with timeout when infinite timeout specified" in { + + implicit val dsWillTimeout = ConfigurableTimeoutDatasource(Duration.Top, 250 milliseconds) + + val fetch: Fetch[Article] = article(1) + val fut: Future[Article] = Fetch.run[Future](fetch) + + fut.map { _ shouldEqual Article(1, "An article with id 1") } + } + +}