From 67e7f2f89a86aeef235fde6e69c130d39b0efee1 Mon Sep 17 00:00:00 2001 From: peterneyens Date: Fri, 18 Aug 2017 14:19:25 +0100 Subject: [PATCH] Generalize timeout tests. Fix twitter timout. --- build.sbt | 4 +- .../scala/FetchMonadErrorTimoutSpec.scala | 56 +++++++++++++ jvm/src/test/scala/FutureTimeoutTests.scala | 66 ++------------- .../src/test/scala/FetchTaskTimoutTests.scala | 38 +++++++++ .../src/test/scala/FetchTaskTests.scala | 11 ++- .../jvm/src/main/scala/TwitterFuture.scala | 34 ++++---- .../test/scala/RerunnableTimeoutSpecs.scala | 80 ++++--------------- .../test/scala/TwitterFutureTimeoutSpec.scala | 79 ++++-------------- .../src/test/scala/TwitterToScalaFuture.scala | 35 ++++++++ 9 files changed, 189 insertions(+), 214 deletions(-) create mode 100644 jvm/src/test/scala/FetchMonadErrorTimoutSpec.scala create mode 100644 monix/jvm/src/test/scala/FetchTaskTimoutTests.scala create mode 100644 twitter/jvm/src/test/scala/TwitterToScalaFuture.scala diff --git a/build.sbt b/build.sbt index 3e1caf29..ccc2daee 100644 --- a/build.sbt +++ b/build.sbt @@ -21,7 +21,7 @@ lazy val fetchJS = fetch.js lazy val monix = crossProject .in(file("monix")) - .dependsOn(fetch) + .dependsOn(fetch % "compile->compile;test->test") .settings(name := "fetch-monix") .jsSettings(sharedJsSettings: _*) .crossDepSettings(commonCrossDependencies ++ monixCrossDependencies: _*) @@ -40,7 +40,7 @@ lazy val debugJS = debug.js lazy val twitter = crossProject .in(file("twitter")) - .dependsOn(fetch) + .dependsOn(fetch % "compile->compile;test->test") .crossDepSettings(commonCrossDependencies ++ twitterUtilDependencies: _*) lazy val twitterJVM = twitter.jvm diff --git a/jvm/src/test/scala/FetchMonadErrorTimoutSpec.scala b/jvm/src/test/scala/FetchMonadErrorTimoutSpec.scala new file mode 100644 index 00000000..fb0b04ee --- /dev/null +++ b/jvm/src/test/scala/FetchMonadErrorTimoutSpec.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2016-2017 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fetch + +import java.util.concurrent.TimeoutException +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration._ +import org.scalatest.{AsyncFlatSpecLike, Matchers} + +// Note that this test cannot run on Scala.js + +trait FetchMonadErrorTimeoutSpec[F[_]] { self: AsyncFlatSpecLike with Matchers => + + def runAsFuture[A](fa: F[A]): Future[A] + + def fetchMonadError: FetchMonadError[F] + + def delayQuery(timeout: Duration, delay: FiniteDuration): Query[Option[Int]] = + Query.async((ok, fail) => { + Thread.sleep(delay.toMillis) + ok(Some(1)) + }, timeout) + + "FetchMonadError" should "fail with timeout when a Query does not complete in time" in { + recoverToSucceededIf[TimeoutException] { + runAsFuture { fetchMonadError.runQuery(delayQuery(100.millis, 300.millis)) } + } + } + + it should "not fail with timeout when a Query does complete in time" in { + runAsFuture { + fetchMonadError.runQuery(delayQuery(300.millis, 100.millis)) + }.map(_ shouldEqual Some(1)) + } + + it should "not fail with timeout when infinite timeout specified" in { + runAsFuture { + fetchMonadError.runQuery(delayQuery(Duration.Inf, 100.millis)) + }.map(_ shouldEqual Some(1)) + } + +} diff --git a/jvm/src/test/scala/FutureTimeoutTests.scala b/jvm/src/test/scala/FutureTimeoutTests.scala index 8098053e..6458032c 100644 --- a/jvm/src/test/scala/FutureTimeoutTests.scala +++ b/jvm/src/test/scala/FutureTimeoutTests.scala @@ -16,10 +16,8 @@ package fetch -import scala.concurrent._ -import scala.concurrent.duration._ -import org.scalatest._ -import cats.data.NonEmptyList +import scala.concurrent.{ExecutionContext, Future} +import org.scalatest.{AsyncFlatSpec, Matchers} import fetch.implicits._ // Note that this test cannot run on Scala.js @@ -27,63 +25,11 @@ import fetch.implicits._ class FutureTimeoutTests extends AsyncFlatSpec with Matchers - with OptionValues - with Inside - with Inspectors { + with FetchMonadErrorTimeoutSpec[Future] { - implicit override def executionContext: ExecutionContext = ExecutionContext.Implicits.global + implicit override val executionContext: ExecutionContext = ExecutionContext.Implicits.global - case class ArticleId(id: Int) - case class Article(id: Int, content: String) - - def article(id: Int)(implicit DS: DataSource[ArticleId, Article]): Fetch[Article] = - Fetch(ArticleId(id)) - - // A sample datasource with configurable delay and timeout - - case class ConfigurableTimeoutDatasource(timeout: Duration, delay: Duration) - extends DataSource[ArticleId, Article] { - override def name = "ArticleFuture" - override def fetchOne(id: ArticleId): Query[Option[Article]] = - Query.async((ok, fail) => { - Thread.sleep(delay.toMillis) - ok(Option(Article(id.id, "An article with id " + id.id))) - }, timeout) - override def fetchMany(ids: NonEmptyList[ArticleId]): Query[Map[ArticleId, Article]] = - batchingNotSupported(ids) - } - - "FetchMonadError[Future]" should "fail with timeout when a datasource does not complete in time" in { - - implicit val dsWillTimeout = ConfigurableTimeoutDatasource(250 milliseconds, 750 milliseconds) - - val fetch: Fetch[Article] = article(1) - val fut: Future[Article] = Fetch.run[Future](fetch) - - recoverToSucceededIf[TimeoutException] { - fut - - } - } - - it should "not fail with timeout when a datasource does complete in time" in { - - implicit val dsWillTimeout = ConfigurableTimeoutDatasource(750 milliseconds, 250 milliseconds) - - val fetch: Fetch[Article] = article(1) - val fut: Future[Article] = Fetch.run[Future](fetch) - - fut.map { _ shouldEqual Article(1, "An article with id 1") } - } - - it should "not fail with timeout when infinite timeout specified" in { - - implicit val dsWillTimeout = ConfigurableTimeoutDatasource(Duration.Inf, 250 milliseconds) - - val fetch: Fetch[Article] = article(1) - val fut: Future[Article] = Fetch.run[Future](fetch) - - fut.map { _ shouldEqual Article(1, "An article with id 1") } - } + def runAsFuture[A](fa: Future[A]): Future[A] = fa + def fetchMonadError: FetchMonadError[Future] = FetchMonadError[Future] } diff --git a/monix/jvm/src/test/scala/FetchTaskTimoutTests.scala b/monix/jvm/src/test/scala/FetchTaskTimoutTests.scala new file mode 100644 index 00000000..623eb864 --- /dev/null +++ b/monix/jvm/src/test/scala/FetchTaskTimoutTests.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2016-2017 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fetch.monixTask + +import monix.eval.Task +import monix.execution.Scheduler +import scala.concurrent.{ExecutionContext, Future} +import org.scalatest.{AsyncFlatSpec, Matchers} +import fetch.{FetchMonadError, FetchMonadErrorTimeoutSpec} +import fetch.monixTask.implicits._ + +// Note that this test cannot run on Scala.js + +class FetchTaskTimeoutTests + extends AsyncFlatSpec + with Matchers + with FetchMonadErrorTimeoutSpec[Task] { + + implicit override val executionContext: Scheduler = Scheduler.Implicits.global + + def runAsFuture[A](task: Task[A]): Future[A] = task.runAsync + + def fetchMonadError: FetchMonadError[Task] = FetchMonadError[Task] +} diff --git a/monix/shared/src/test/scala/FetchTaskTests.scala b/monix/shared/src/test/scala/FetchTaskTests.scala index 0d573c0d..8ec1be02 100644 --- a/monix/shared/src/test/scala/FetchTaskTests.scala +++ b/monix/shared/src/test/scala/FetchTaskTests.scala @@ -14,21 +14,20 @@ * limitations under the License. */ +package fetch.monixTask + import monix.eval.Task import monix.execution.Scheduler - -import org.scalatest._ - +import org.scalatest.{AsyncFreeSpec, Matchers} import cats.data.NonEmptyList import cats.instances.list._ +import scala.concurrent.Future import fetch._ import fetch.monixTask.implicits._ -import scala.concurrent.Future - class FetchTaskTests extends AsyncFreeSpec with Matchers { - implicit override def executionContext = Scheduler.Implicits.global + implicit override val executionContext = Scheduler.Implicits.global case class ArticleId(id: Int) case class Article(id: Int, content: String) { diff --git a/twitter/jvm/src/main/scala/TwitterFuture.scala b/twitter/jvm/src/main/scala/TwitterFuture.scala index bc9765ec..4bbd2b99 100644 --- a/twitter/jvm/src/main/scala/TwitterFuture.scala +++ b/twitter/jvm/src/main/scala/TwitterFuture.scala @@ -16,14 +16,16 @@ package fetch.twitterFuture +import cats.{Always, Eval, Later, Now} +import com.twitter.util.{Duration, Future, FuturePool, JavaTimer, Promise, Timer} +import io.catbird.util._ import fetch._ import scala.concurrent.duration.FiniteDuration object implicits { - import cats._ - import com.twitter.util.{Duration, Future, FuturePool, Promise, Timer} - import io.catbird.util._ + private[fetch] val timeoutTimer = + new JavaTimer(true, Some("fetch-twitter-future-timeout-daemon")) def evalToRerunnable[A](e: Eval[A]): Rerunnable[A] = e match { case Now(x) => Rerunnable.const(x) @@ -36,19 +38,11 @@ object implicits { implicit pool: FuturePool = FuturePool.interruptibleUnboundedPool ): FetchMonadError[Rerunnable] = new FetchMonadError.FromMonadError[Rerunnable] { - override def runQuery[A](j: Query[A]): Rerunnable[A] = j match { - case Sync(e) ⇒ evalToRerunnable(e) - case Async(ac, timeout) ⇒ - Rerunnable.fromFuture { - val p: Promise[A] = Promise() - pool(ac(p setValue _, p setException _)) - timeout match { - case _: FiniteDuration => - p.raiseWithin(Duration(timeout.length, timeout.unit))(Timer.Nil) - case _ => p - } - } - case Ap(qf, qx) ⇒ + override def runQuery[A](q: Query[A]): Rerunnable[A] = q match { + case Sync(e) => evalToRerunnable(e) + case Async(_, _) => + Rerunnable.fromFuture { fetchTwFutureMonadError(pool).runQuery(q) } + case Ap(qf, qx) => runQuery(qf).product(runQuery(qx)) map { case (f, x) => f(x) } } } @@ -57,14 +51,14 @@ object implicits { implicit pool: FuturePool = FuturePool.interruptibleUnboundedPool ): FetchMonadError[Future] = new FetchMonadError.FromMonadError[Future] { - override def runQuery[A](j: Query[A]): Future[A] = j match { - case Sync(e) ⇒ Future(e.value) - case Async(ac, timeout) ⇒ + override def runQuery[A](q: Query[A]): Future[A] = q match { + case Sync(e) => Future(e.value) + case Async(ac, timeout) => val p: Promise[A] = Promise() pool(ac(p setValue _, p setException _)) timeout match { case _: FiniteDuration => - p.raiseWithin(Duration(timeout.length, timeout.unit))(Timer.Nil) + p.raiseWithin(Duration(timeout.length, timeout.unit))(timeoutTimer) case _ => p } case Ap(qf, qx) ⇒ diff --git a/twitter/jvm/src/test/scala/RerunnableTimeoutSpecs.scala b/twitter/jvm/src/test/scala/RerunnableTimeoutSpecs.scala index 8031712f..075ce431 100644 --- a/twitter/jvm/src/test/scala/RerunnableTimeoutSpecs.scala +++ b/twitter/jvm/src/test/scala/RerunnableTimeoutSpecs.scala @@ -16,76 +16,30 @@ package fetch.twitterFuture -import cats.data.NonEmptyList -import com.twitter.util.{Await, Duration, Future, TimeoutException} -import com.twitter.conversions.time._ -import fetch._ -import fetch.implicits._ +import io.catbird.util.Rerunnable +import com.twitter.util.{ExecutorServiceFuturePool, FuturePool} +import scala.concurrent.{Future => ScalaFuture, ExecutionContext} +import org.scalatest.{AsyncFlatSpec, Matchers} + +import fetch.{FetchMonadError, FetchMonadErrorTimeoutSpec} import fetch.twitterFuture.implicits._ -import io.catbird.util._ -import org.scalatest._ -class RerunnableTimeoutSpac - extends FlatSpec +class RerunnableTimeoutSpec + extends AsyncFlatSpec with Matchers - with OptionValues - with Inside - with Inspectors { - - case class ArticleId(id: Int) - case class Article(id: Int, content: String) - - def article(id: Int)(implicit DS: DataSource[ArticleId, Article]): Fetch[Article] = - Fetch(ArticleId(id)) - - // A sample datasource with configurable delay and timeout - - case class ConfigurableTimeoutDatasource(timeout: Duration, delay: Duration) - extends DataSource[ArticleId, Article] { - override def name = "ArticleRerunnable" - override def fetchOne(id: ArticleId): Query[Option[Article]] = - Query.async( - (ok, fail) => { - Thread.sleep(delay.inMillis) - ok(Option(Article(id.id, "An article with id " + id.id))) - }, - scala.concurrent.duration.Duration.fromNanos(timeout.inNanoseconds) - ) - override def fetchMany(ids: NonEmptyList[ArticleId]): Query[Map[ArticleId, Article]] = - batchingNotSupported(ids) - } - - "FetchMonadError[Rerunnable]" should "fail with timeout when a datasource does not complete in time" in { - - implicit val dsWillTimeout = ConfigurableTimeoutDatasource(250 milliseconds, 750 milliseconds) - - val fetch: Fetch[Article] = article(1) - val fut: Rerunnable[Article] = Fetch.run[Rerunnable](fetch) + with FetchMonadErrorTimeoutSpec[Rerunnable] { - assertThrows[TimeoutException] { - Await.result(fut.run, 1 seconds) - } + implicit val pool: FuturePool = FuturePool.interruptibleUnboundedPool + implicit override val executionContext: ExecutionContext = { + val executor = pool.asInstanceOf[ExecutorServiceFuturePool].executor + ExecutionContext.fromExecutorService(executor) } - it should "not fail with timeout when a datasource does complete in time" in { + def runAsFuture[A](rerun: Rerunnable[A]): ScalaFuture[A] = + Convert.twitterToScalaFuture(rerun.run) - implicit val dsWillTimeout = ConfigurableTimeoutDatasource(750 milliseconds, 250 milliseconds) - - val fetch: Fetch[Article] = article(1) - val fut: Rerunnable[Article] = Fetch.run[Rerunnable](fetch) - - fut.map { _ shouldEqual Article(1, "An article with id 1") } - } - - it should "not fail with timeout when infinite timeout specified" in { - - implicit val dsWillTimeout = ConfigurableTimeoutDatasource(Duration.Top, 250 milliseconds) - - val fetch: Fetch[Article] = article(1) - val fut: Rerunnable[Article] = Fetch.run[Rerunnable](fetch) - - fut.map { _ shouldEqual Article(1, "An article with id 1") } - } + def fetchMonadError: FetchMonadError[Rerunnable] = + FetchMonadError[Rerunnable] } diff --git a/twitter/jvm/src/test/scala/TwitterFutureTimeoutSpec.scala b/twitter/jvm/src/test/scala/TwitterFutureTimeoutSpec.scala index 7b6e3902..e030e5bb 100644 --- a/twitter/jvm/src/test/scala/TwitterFutureTimeoutSpec.scala +++ b/twitter/jvm/src/test/scala/TwitterFutureTimeoutSpec.scala @@ -16,76 +16,29 @@ package fetch.twitterFuture -import cats.data.NonEmptyList -import com.twitter.util.{Await, Duration, Future, TimeoutException} -import com.twitter.conversions.time._ -import fetch._ -import fetch.implicits._ +import com.twitter.util.{Future => TwitterFuture, ExecutorServiceFuturePool, FuturePool} +import scala.concurrent.{Future => ScalaFuture, ExecutionContext} +import org.scalatest.{AsyncFlatSpec, Matchers} + +import fetch.{FetchMonadError, FetchMonadErrorTimeoutSpec} import fetch.twitterFuture.implicits._ -import org.scalatest._ -// import scala.concurrent.duration._ -class TwiterFutureTimeoutSpac - extends FlatSpec +class TwiterFutureTimeoutSpec + extends AsyncFlatSpec with Matchers - with OptionValues - with Inside - with Inspectors { - - case class ArticleId(id: Int) - case class Article(id: Int, content: String) - - def article(id: Int)(implicit DS: DataSource[ArticleId, Article]): Fetch[Article] = - Fetch(ArticleId(id)) - - // A sample datasource with configurable delay and timeout - - case class ConfigurableTimeoutDatasource(timeout: Duration, delay: Duration) - extends DataSource[ArticleId, Article] { - override def name = "ArticleFuture" - override def fetchOne(id: ArticleId): Query[Option[Article]] = - Query.async( - (ok, fail) => { - Thread.sleep(delay.inMillis) - ok(Option(Article(id.id, "An article with id " + id.id))) - }, - scala.concurrent.duration.Duration.fromNanos(timeout.inNanoseconds) - ) - override def fetchMany(ids: NonEmptyList[ArticleId]): Query[Map[ArticleId, Article]] = - batchingNotSupported(ids) - } - - "FetchMonadError[Future]" should "fail with timeout when a datasource does not complete in time" in { - - implicit val dsWillTimeout = ConfigurableTimeoutDatasource(250 milliseconds, 750 milliseconds) - - val fetch: Fetch[Article] = article(1) - val fut: Future[Article] = Fetch.run[Future](fetch) + with FetchMonadErrorTimeoutSpec[TwitterFuture] { - assertThrows[TimeoutException] { - Await.result(fut, 1 seconds) - } + implicit val pool: FuturePool = FuturePool.interruptibleUnboundedPool + implicit override val executionContext: ExecutionContext = { + val executor = pool.asInstanceOf[ExecutorServiceFuturePool].executor + ExecutionContext.fromExecutorService(executor) } - it should "not fail with timeout when a datasource does complete in time" in { + def runAsFuture[A](tf: TwitterFuture[A]): ScalaFuture[A] = + Convert.twitterToScalaFuture(tf) - implicit val dsWillTimeout = ConfigurableTimeoutDatasource(750 milliseconds, 250 milliseconds) - - val fetch: Fetch[Article] = article(1) - val fut: Future[Article] = Fetch.run[Future](fetch) - - fut.map { _ shouldEqual Article(1, "An article with id 1") } - } - - it should "not fail with timeout when infinite timeout specified" in { - - implicit val dsWillTimeout = ConfigurableTimeoutDatasource(Duration.Top, 250 milliseconds) - - val fetch: Fetch[Article] = article(1) - val fut: Future[Article] = Fetch.run[Future](fetch) - - fut.map { _ shouldEqual Article(1, "An article with id 1") } - } + def fetchMonadError: FetchMonadError[TwitterFuture] = + FetchMonadError[TwitterFuture] } diff --git a/twitter/jvm/src/test/scala/TwitterToScalaFuture.scala b/twitter/jvm/src/test/scala/TwitterToScalaFuture.scala new file mode 100644 index 00000000..25d6a2b7 --- /dev/null +++ b/twitter/jvm/src/test/scala/TwitterToScalaFuture.scala @@ -0,0 +1,35 @@ +/* + * Copyright 2016-2017 47 Degrees, LLC. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fetch.twitterFuture + +import com.twitter.util.{Future => TwitterFuture, Return, Throw} +import scala.concurrent.{Future => ScalaFuture, Promise => ScalaPromise, ExecutionContext} + +object Convert { + + /** https://twitter.github.io/util/guide/util-cookbook/futures.html */ + def twitterToScalaFuture[A](tf: TwitterFuture[A])( + implicit ec: ExecutionContext): ScalaFuture[A] = { + val promise: ScalaPromise[A] = ScalaPromise() + tf.respond { + case Return(value) => promise.trySuccess(value) + case Throw(exception) => promise.tryFailure(exception) + } + promise.future + } + +}