Skip to content

Commit

Permalink
Generalize timeout tests. Fix twitter timout.
Browse files Browse the repository at this point in the history
  • Loading branch information
peterneyens committed Aug 18, 2017
1 parent 93a7d41 commit 67e7f2f
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 214 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ lazy val fetchJS = fetch.js

lazy val monix = crossProject
.in(file("monix"))
.dependsOn(fetch)
.dependsOn(fetch % "compile->compile;test->test")
.settings(name := "fetch-monix")
.jsSettings(sharedJsSettings: _*)
.crossDepSettings(commonCrossDependencies ++ monixCrossDependencies: _*)
Expand All @@ -40,7 +40,7 @@ lazy val debugJS = debug.js

lazy val twitter = crossProject
.in(file("twitter"))
.dependsOn(fetch)
.dependsOn(fetch % "compile->compile;test->test")
.crossDepSettings(commonCrossDependencies ++ twitterUtilDependencies: _*)

lazy val twitterJVM = twitter.jvm
Expand Down
56 changes: 56 additions & 0 deletions jvm/src/test/scala/FetchMonadErrorTimoutSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2016-2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fetch

import java.util.concurrent.TimeoutException
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import org.scalatest.{AsyncFlatSpecLike, Matchers}

// Note that this test cannot run on Scala.js

trait FetchMonadErrorTimeoutSpec[F[_]] { self: AsyncFlatSpecLike with Matchers =>

def runAsFuture[A](fa: F[A]): Future[A]

def fetchMonadError: FetchMonadError[F]

def delayQuery(timeout: Duration, delay: FiniteDuration): Query[Option[Int]] =
Query.async((ok, fail) => {
Thread.sleep(delay.toMillis)
ok(Some(1))
}, timeout)

"FetchMonadError" should "fail with timeout when a Query does not complete in time" in {
recoverToSucceededIf[TimeoutException] {
runAsFuture { fetchMonadError.runQuery(delayQuery(100.millis, 300.millis)) }
}
}

it should "not fail with timeout when a Query does complete in time" in {
runAsFuture {
fetchMonadError.runQuery(delayQuery(300.millis, 100.millis))
}.map(_ shouldEqual Some(1))
}

it should "not fail with timeout when infinite timeout specified" in {
runAsFuture {
fetchMonadError.runQuery(delayQuery(Duration.Inf, 100.millis))
}.map(_ shouldEqual Some(1))
}

}
66 changes: 6 additions & 60 deletions jvm/src/test/scala/FutureTimeoutTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,74 +16,20 @@

package fetch

import scala.concurrent._
import scala.concurrent.duration._
import org.scalatest._
import cats.data.NonEmptyList
import scala.concurrent.{ExecutionContext, Future}
import org.scalatest.{AsyncFlatSpec, Matchers}
import fetch.implicits._

// Note that this test cannot run on Scala.js

class FutureTimeoutTests
extends AsyncFlatSpec
with Matchers
with OptionValues
with Inside
with Inspectors {
with FetchMonadErrorTimeoutSpec[Future] {

implicit override def executionContext: ExecutionContext = ExecutionContext.Implicits.global
implicit override val executionContext: ExecutionContext = ExecutionContext.Implicits.global

case class ArticleId(id: Int)
case class Article(id: Int, content: String)

def article(id: Int)(implicit DS: DataSource[ArticleId, Article]): Fetch[Article] =
Fetch(ArticleId(id))

// A sample datasource with configurable delay and timeout

case class ConfigurableTimeoutDatasource(timeout: Duration, delay: Duration)
extends DataSource[ArticleId, Article] {
override def name = "ArticleFuture"
override def fetchOne(id: ArticleId): Query[Option[Article]] =
Query.async((ok, fail) => {
Thread.sleep(delay.toMillis)
ok(Option(Article(id.id, "An article with id " + id.id)))
}, timeout)
override def fetchMany(ids: NonEmptyList[ArticleId]): Query[Map[ArticleId, Article]] =
batchingNotSupported(ids)
}

"FetchMonadError[Future]" should "fail with timeout when a datasource does not complete in time" in {

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(250 milliseconds, 750 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Future[Article] = Fetch.run[Future](fetch)

recoverToSucceededIf[TimeoutException] {
fut

}
}

it should "not fail with timeout when a datasource does complete in time" in {

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(750 milliseconds, 250 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Future[Article] = Fetch.run[Future](fetch)

fut.map { _ shouldEqual Article(1, "An article with id 1") }
}

it should "not fail with timeout when infinite timeout specified" in {

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(Duration.Inf, 250 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Future[Article] = Fetch.run[Future](fetch)

fut.map { _ shouldEqual Article(1, "An article with id 1") }
}
def runAsFuture[A](fa: Future[A]): Future[A] = fa

def fetchMonadError: FetchMonadError[Future] = FetchMonadError[Future]
}
38 changes: 38 additions & 0 deletions monix/jvm/src/test/scala/FetchTaskTimoutTests.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2016-2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fetch.monixTask

import monix.eval.Task
import monix.execution.Scheduler
import scala.concurrent.{ExecutionContext, Future}
import org.scalatest.{AsyncFlatSpec, Matchers}
import fetch.{FetchMonadError, FetchMonadErrorTimeoutSpec}
import fetch.monixTask.implicits._

// Note that this test cannot run on Scala.js

class FetchTaskTimeoutTests
extends AsyncFlatSpec
with Matchers
with FetchMonadErrorTimeoutSpec[Task] {

implicit override val executionContext: Scheduler = Scheduler.Implicits.global

def runAsFuture[A](task: Task[A]): Future[A] = task.runAsync

def fetchMonadError: FetchMonadError[Task] = FetchMonadError[Task]
}
11 changes: 5 additions & 6 deletions monix/shared/src/test/scala/FetchTaskTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,20 @@
* limitations under the License.
*/

package fetch.monixTask

import monix.eval.Task
import monix.execution.Scheduler

import org.scalatest._

import org.scalatest.{AsyncFreeSpec, Matchers}
import cats.data.NonEmptyList
import cats.instances.list._
import scala.concurrent.Future

import fetch._
import fetch.monixTask.implicits._

import scala.concurrent.Future

class FetchTaskTests extends AsyncFreeSpec with Matchers {
implicit override def executionContext = Scheduler.Implicits.global
implicit override val executionContext = Scheduler.Implicits.global

case class ArticleId(id: Int)
case class Article(id: Int, content: String) {
Expand Down
34 changes: 14 additions & 20 deletions twitter/jvm/src/main/scala/TwitterFuture.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

package fetch.twitterFuture

import cats.{Always, Eval, Later, Now}
import com.twitter.util.{Duration, Future, FuturePool, JavaTimer, Promise, Timer}
import io.catbird.util._
import fetch._
import scala.concurrent.duration.FiniteDuration

object implicits {

import cats._
import com.twitter.util.{Duration, Future, FuturePool, Promise, Timer}
import io.catbird.util._
private[fetch] val timeoutTimer =
new JavaTimer(true, Some("fetch-twitter-future-timeout-daemon"))

def evalToRerunnable[A](e: Eval[A]): Rerunnable[A] = e match {
case Now(x) => Rerunnable.const(x)
Expand All @@ -36,19 +38,11 @@ object implicits {
implicit pool: FuturePool = FuturePool.interruptibleUnboundedPool
): FetchMonadError[Rerunnable] =
new FetchMonadError.FromMonadError[Rerunnable] {
override def runQuery[A](j: Query[A]): Rerunnable[A] = j match {
case Sync(e) evalToRerunnable(e)
case Async(ac, timeout)
Rerunnable.fromFuture {
val p: Promise[A] = Promise()
pool(ac(p setValue _, p setException _))
timeout match {
case _: FiniteDuration =>
p.raiseWithin(Duration(timeout.length, timeout.unit))(Timer.Nil)
case _ => p
}
}
case Ap(qf, qx)
override def runQuery[A](q: Query[A]): Rerunnable[A] = q match {
case Sync(e) => evalToRerunnable(e)
case Async(_, _) =>
Rerunnable.fromFuture { fetchTwFutureMonadError(pool).runQuery(q) }
case Ap(qf, qx) =>
runQuery(qf).product(runQuery(qx)) map { case (f, x) => f(x) }
}
}
Expand All @@ -57,14 +51,14 @@ object implicits {
implicit pool: FuturePool = FuturePool.interruptibleUnboundedPool
): FetchMonadError[Future] =
new FetchMonadError.FromMonadError[Future] {
override def runQuery[A](j: Query[A]): Future[A] = j match {
case Sync(e) Future(e.value)
case Async(ac, timeout)
override def runQuery[A](q: Query[A]): Future[A] = q match {
case Sync(e) => Future(e.value)
case Async(ac, timeout) =>
val p: Promise[A] = Promise()
pool(ac(p setValue _, p setException _))
timeout match {
case _: FiniteDuration =>
p.raiseWithin(Duration(timeout.length, timeout.unit))(Timer.Nil)
p.raiseWithin(Duration(timeout.length, timeout.unit))(timeoutTimer)
case _ => p
}
case Ap(qf, qx)
Expand Down
80 changes: 17 additions & 63 deletions twitter/jvm/src/test/scala/RerunnableTimeoutSpecs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,76 +16,30 @@

package fetch.twitterFuture

import cats.data.NonEmptyList
import com.twitter.util.{Await, Duration, Future, TimeoutException}
import com.twitter.conversions.time._
import fetch._
import fetch.implicits._
import io.catbird.util.Rerunnable
import com.twitter.util.{ExecutorServiceFuturePool, FuturePool}
import scala.concurrent.{Future => ScalaFuture, ExecutionContext}
import org.scalatest.{AsyncFlatSpec, Matchers}

import fetch.{FetchMonadError, FetchMonadErrorTimeoutSpec}
import fetch.twitterFuture.implicits._
import io.catbird.util._
import org.scalatest._

class RerunnableTimeoutSpac
extends FlatSpec
class RerunnableTimeoutSpec
extends AsyncFlatSpec
with Matchers
with OptionValues
with Inside
with Inspectors {

case class ArticleId(id: Int)
case class Article(id: Int, content: String)

def article(id: Int)(implicit DS: DataSource[ArticleId, Article]): Fetch[Article] =
Fetch(ArticleId(id))

// A sample datasource with configurable delay and timeout

case class ConfigurableTimeoutDatasource(timeout: Duration, delay: Duration)
extends DataSource[ArticleId, Article] {
override def name = "ArticleRerunnable"
override def fetchOne(id: ArticleId): Query[Option[Article]] =
Query.async(
(ok, fail) => {
Thread.sleep(delay.inMillis)
ok(Option(Article(id.id, "An article with id " + id.id)))
},
scala.concurrent.duration.Duration.fromNanos(timeout.inNanoseconds)
)
override def fetchMany(ids: NonEmptyList[ArticleId]): Query[Map[ArticleId, Article]] =
batchingNotSupported(ids)
}

"FetchMonadError[Rerunnable]" should "fail with timeout when a datasource does not complete in time" in {

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(250 milliseconds, 750 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Rerunnable[Article] = Fetch.run[Rerunnable](fetch)
with FetchMonadErrorTimeoutSpec[Rerunnable] {

assertThrows[TimeoutException] {
Await.result(fut.run, 1 seconds)
}
implicit val pool: FuturePool = FuturePool.interruptibleUnboundedPool

implicit override val executionContext: ExecutionContext = {
val executor = pool.asInstanceOf[ExecutorServiceFuturePool].executor
ExecutionContext.fromExecutorService(executor)
}

it should "not fail with timeout when a datasource does complete in time" in {
def runAsFuture[A](rerun: Rerunnable[A]): ScalaFuture[A] =
Convert.twitterToScalaFuture(rerun.run)

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(750 milliseconds, 250 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Rerunnable[Article] = Fetch.run[Rerunnable](fetch)

fut.map { _ shouldEqual Article(1, "An article with id 1") }
}

it should "not fail with timeout when infinite timeout specified" in {

implicit val dsWillTimeout = ConfigurableTimeoutDatasource(Duration.Top, 250 milliseconds)

val fetch: Fetch[Article] = article(1)
val fut: Rerunnable[Article] = Fetch.run[Rerunnable](fetch)

fut.map { _ shouldEqual Article(1, "An article with id 1") }
}
def fetchMonadError: FetchMonadError[Rerunnable] =
FetchMonadError[Rerunnable]

}
Loading

0 comments on commit 67e7f2f

Please sign in to comment.