Skip to content

Commit

Permalink
Add twitter future support (#128)
Browse files Browse the repository at this point in the history
* initial cut at twitter support

this is jvm only, so I'm not sure exactly how to make the work here
but the core of this works. Twitter future and Catbirds Rerunnable are
now supported

* make the thing build

and execute the tests to verify they're passing

* add an evalToRerunnable

this matches the monix side

* spell rerunnable the conventional way

with two n's in ht middle

* add implicit FuturePool

with the default value of the interruptableUnboundedPool and then use
the fact that the pool creates interruptable threads to wire up the timeouts.

* simplify dependencies

rely on `catbird` to provide the twitter `util-core` dependency.

* remove 2.10 cross version

Twitter util no longer supports 2.10 so this won't build with it in. also remove the now unneeded version check for including paradise.

* remove 2.10

oops, missed travis

* extend test coverage

test evalToRunnable

* test sync

for both Rerunnable and Future ... the code is there and it should
work so prove it.

* add applicative usage tests

for both twitter future and catbirds rerunnable

* update twitter future Ap handler

to use the catbird provided ap

* use evalToRerunnable

in the rerunnable sync case

* add test to excercise twfuture fetch timeouts

adapted from the scala future version

* also test Rerunnable side

when testing timeouts
  • Loading branch information
Chris Vale authored and peterneyens committed Aug 17, 2017
1 parent e3a102a commit 93a7d41
Show file tree
Hide file tree
Showing 7 changed files with 455 additions and 9 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
language: scala

scala:
- 2.10.6
- 2.11.11
- 2.12.2

Expand Down
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ lazy val root = project
.in(file("."))
.settings(name := "fetch")
.settings(moduleName := "root")
.aggregate(fetchJS, fetchJVM, fetchMonixJVM, fetchMonixJS, debugJVM, debugJS)
.aggregate(fetchJS, fetchJVM, fetchMonixJVM, fetchMonixJS, debugJVM, debugJS, twitterJVM)

lazy val fetch = crossProject
.in(file("."))
Expand Down Expand Up @@ -38,6 +38,13 @@ lazy val debug = (crossProject in file("debug"))
lazy val debugJVM = debug.jvm
lazy val debugJS = debug.js

lazy val twitter = crossProject
.in(file("twitter"))
.dependsOn(fetch)
.crossDepSettings(commonCrossDependencies ++ twitterUtilDependencies: _*)

lazy val twitterJVM = twitter.jvm

lazy val examples = (project in file("examples"))
.settings(name := "fetch-examples")
.dependsOn(fetchJVM)
Expand Down
12 changes: 5 additions & 7 deletions project/ProjectPlugin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ object ProjectPlugin extends AutoPlugin {

lazy val monixCrossDependencies: Seq[ModuleID] = Seq(%%("monix-eval"), %%("monix-cats"))

lazy val twitterUtilDependencies: Seq[ModuleID] = Seq(%%("catbird-util"))

lazy val micrositeSettings: Seq[Def.Setting[_]] = Seq(
micrositeName := "Fetch",
micrositeDescription := "Simple & Efficient data access for Scala and Scala.js",
Expand Down Expand Up @@ -78,8 +80,10 @@ object ProjectPlugin extends AutoPlugin {
List(
"fetchJVM/compile",
"monixJVM/compile",
"twitterJVM/compile",
"fetchJVM/test",
"monixJVM/test",
"twitterJVM/test",
"project root").asCmd) ++
addCommandAlias(
"validateJS",
Expand Down Expand Up @@ -111,7 +115,7 @@ object ProjectPlugin extends AutoPlugin {
orgUpdateDocFilesSetting += baseDirectory.value / "tut",
scalaOrganization := "org.scala-lang",
scalaVersion := "2.12.2",
crossScalaVersions := List("2.10.6", "2.11.11", "2.12.2"),
crossScalaVersions := List("2.11.11", "2.12.2"),
resolvers += Resolver.sonatypeRepo("snapshots"),
scalacOptions := Seq(
"-unchecked",
Expand All @@ -122,12 +126,6 @@ object ProjectPlugin extends AutoPlugin {
"-language:existentials",
"-language:postfixOps"
),
libraryDependencies ++= (scalaBinaryVersion.value match {
case "2.10" =>
compilerPlugin(%%("paradise") cross CrossVersion.full) :: Nil
case _ =>
Nil
}),
ScoverageKeys.coverageFailOnMinimum := false
) ++ shellPromptSettings

Expand Down
75 changes: 75 additions & 0 deletions twitter/jvm/src/main/scala/TwitterFuture.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2016-2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fetch.twitterFuture

import fetch._
import scala.concurrent.duration.FiniteDuration

object implicits {

import cats._
import com.twitter.util.{Duration, Future, FuturePool, Promise, Timer}
import io.catbird.util._

def evalToRerunnable[A](e: Eval[A]): Rerunnable[A] = e match {
case Now(x) => Rerunnable.const(x)
case l: Later[A] => Rerunnable.fromFuture({ Future(l.value) })
case a: Always[A] => Rerunnable({ a.value })
case e => Rerunnable.fromFuture(Future(e.value))
}

implicit def fetchRerunnableMonadError(
implicit pool: FuturePool = FuturePool.interruptibleUnboundedPool
): FetchMonadError[Rerunnable] =
new FetchMonadError.FromMonadError[Rerunnable] {
override def runQuery[A](j: Query[A]): Rerunnable[A] = j match {
case Sync(e) evalToRerunnable(e)
case Async(ac, timeout)
Rerunnable.fromFuture {
val p: Promise[A] = Promise()
pool(ac(p setValue _, p setException _))
timeout match {
case _: FiniteDuration =>
p.raiseWithin(Duration(timeout.length, timeout.unit))(Timer.Nil)
case _ => p
}
}
case Ap(qf, qx)
runQuery(qf).product(runQuery(qx)) map { case (f, x) => f(x) }
}
}

implicit def fetchTwFutureMonadError(
implicit pool: FuturePool = FuturePool.interruptibleUnboundedPool
): FetchMonadError[Future] =
new FetchMonadError.FromMonadError[Future] {
override def runQuery[A](j: Query[A]): Future[A] = j match {
case Sync(e) Future(e.value)
case Async(ac, timeout)
val p: Promise[A] = Promise()
pool(ac(p setValue _, p setException _))
timeout match {
case _: FiniteDuration =>
p.raiseWithin(Duration(timeout.length, timeout.unit))(Timer.Nil)
case _ => p
}
case Ap(qf, qx)
ap(runQuery(qf))(runQuery(qx))
}
}

}
185 changes: 185 additions & 0 deletions twitter/jvm/src/test/scala/FetchTwitterFutureSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright 2016-2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fetch.twitterFuture

import cats._, data._
import cats.implicits._
import fetch._
import io.catbird.util._
import org.scalatest._
import com.twitter.util.{Await, Future}
import com.twitter.conversions.time._

class FetchTwitterFutureSpec extends FlatSpec with Matchers {

import fetch.twitterFuture.implicits._

case class One(id: Int)
implicit object OneSource extends DataSource[One, Int] {
override def name = "OneSource"
override def fetchOne(id: One): Query[Option[Int]] =
Query.sync(Option(id.id))
override def fetchMany(ids: NonEmptyList[One]): Query[Map[One, Int]] =
Query.sync(ids.toList.map(one => (one, one.id)).toMap)
}
def one(id: Int): Fetch[Int] = Fetch(One(id))

case class ArticleId(id: Int)
case class Article(id: Int, content: String) {
def author: Int = id + 1
}

object ArticleSync extends DataSource[ArticleId, Article] {
override def name = "ArticleAsync"
override def fetchOne(id: ArticleId): Query[Option[Article]] =
Query.sync({
Option(Article(id.id, "An article with id " + id.id))
})
override def fetchMany(ids: NonEmptyList[ArticleId]): Query[Map[ArticleId, Article]] =
batchingNotSupported(ids)
}

object ArticleAsync extends DataSource[ArticleId, Article] {
override def name = "ArticleAsync"
override def fetchOne(id: ArticleId): Query[Option[Article]] =
Query.async((ok, fail) {
ok(Option(Article(id.id, "An article with id " + id.id)))
})
override def fetchMany(ids: NonEmptyList[ArticleId]): Query[Map[ArticleId, Article]] =
batchingNotSupported(ids)
}

def article(id: Int): Fetch[Article] = Fetch(ArticleId(id))(ArticleAsync)

case class AuthorId(id: Int)
case class Author(id: Int, name: String)

implicit object AuthorFuture extends DataSource[AuthorId, Author] {
override def name = "AuthorFuture"
override def fetchOne(id: AuthorId): Query[Option[Author]] =
Query.async((ok, fail) {
ok(Option(Author(id.id, "@egg" + id.id)))
})
override def fetchMany(ids: NonEmptyList[AuthorId]): Query[Map[AuthorId, Author]] =
batchingNotSupported(ids)
}

def author(a: Article): Fetch[Author] = Fetch(AuthorId(a.author))

"TwFutureMonadError" should "execute an async fetch on a Future" in {
val fetch: Fetch[Article] = Fetch(ArticleId(1))(ArticleAsync)
val article: Future[Article] = Fetch.run[Future](fetch)
Await.result(article, 100.milliseconds) shouldEqual (Article(1, "An article with id 1"))
}
it should "allow for several async datasources to be combined" in {
val fetch: Fetch[(Article, Author)] = for {
art article(1)
author author(art)
} yield (art, author)

Await.result(Fetch.run[Future](fetch), 100.milliseconds) shouldEqual (Article(
1,
"An article with id 1"), Author(2, "@egg2"))
}
it should "execute a sync fetch" in {
val fetch: Fetch[Article] = Fetch(ArticleId(1))(ArticleSync)
Await.result(Fetch.run[Future](fetch), 100.milliseconds) shouldEqual (Article(
1,
"An article with id 1"))
}
it should "be used as an applicative" in {
import cats.syntax.cartesian._

val fetch = (one(1) |@| one(2) |@| one(3)).map(_ + _ + _)
val fut = Fetch.run[Future](fetch)

fut.map(_ shouldEqual 6)
}

"RerunnableMonadError" should "lift and execute an async fetch into a Rerunnable" in {
val fetch: Fetch[Article] = Fetch(ArticleId(1))(ArticleAsync)
val article: Rerunnable[Article] = Fetch.run[Rerunnable](fetch)
Await.result(article.run, 100.milliseconds) shouldEqual (Article(1, "An article with id 1"))
}
it should "run a sync fetch" in {
val fetch: Fetch[Article] = Fetch(ArticleId(1))(ArticleSync)
val article: Rerunnable[Article] = Fetch.run[Rerunnable](fetch)
Await.result(article.run, 100.milliseconds) shouldEqual (Article(1, "An article with id 1"))
}
it should "allow for several async datasources to be combined" in {
val fetch: Fetch[(Article, Author)] = for {
art article(1)
author author(art)
} yield (art, author)

val rr: Rerunnable[(Article, Author)] = Fetch.run[Rerunnable](fetch)
Await.result(rr.run, 100.milliseconds) shouldEqual (Article(1, "An article with id 1"), Author(
2,
"@egg2"))
Await.result(rr.run, 100.milliseconds) shouldEqual (Article(1, "An article with id 1"), Author(
2,
"@egg2"))
}
it should "be used as an applicative" in {
import cats.syntax.cartesian._

val fetch: Fetch[Int] = (one(1) |@| one(2) |@| one(3)).map(_ + _ + _)
val fut = Fetch.run[Rerunnable](fetch)

fut.map(_ shouldEqual 6)
}
it should "be usable as an applicative" in {
import cats.syntax.cartesian._

val fetch: Fetch[Int] = (one(1) |@| one(2) |@| one(3)).map(_ + _ + _)
val fut = Fetch.run[Rerunnable](fetch)

fut.map(_ shouldEqual 6)
}

"evalToRunnable" should "convert an Eval into a Runnable" in {
var dontDoThisAtHome: Int = 1000
def evalFun: Int = {
dontDoThisAtHome += 1
dontDoThisAtHome
}

// this invokes evalFun as soon as the Now is created. subsequent
// calls to the rerunnable return the memoized value
val now = Now(evalFun)
val rNow = evalToRerunnable(now)
val nowAnswer = dontDoThisAtHome
Await.result(rNow.run) should be(nowAnswer)
Await.result(rNow.run) should be(nowAnswer)

// this invokes evalFun on first run. subsequent calls to the
// run return the memoized value
def laterValue: Int = evalFun
val later = Later(laterValue)
val rLater = evalToRerunnable(later)
val laterAnswer = dontDoThisAtHome
Await.result(rLater.run) should be(laterAnswer + 1)
Await.result(rLater.run) should be(laterAnswer + 1)

// each time rerunnable run is invoked evalFun is called
val always = Always(evalFun)
val rAlways = evalToRerunnable(always)
val alwaysAnswer = dontDoThisAtHome
Await.result(rAlways.run) should be(alwaysAnswer + 1)
Await.result(rAlways.run) should be(alwaysAnswer + 2)
}
}
Loading

0 comments on commit 93a7d41

Please sign in to comment.