add Task #894

Closed
wants to merge 7 commits into from
16 changes: 16 additions & 0 deletions core/src/main/scala/cats/Eval.scala
@@ -1,5 +1,7 @@
package cats

import data.Xor
import scala.reflect.ClassTag
import scala.annotation.tailrec
import cats.syntax.all._

@@ -104,6 +106,20 @@ sealed abstract class Eval[A] extends Serializable { self =>
   * Later[A] with an equivalent computation will be returned.
   */
  def memoize: Eval[A]

  /**
   * Returns a new Eval which will catch any non-fatal exception
   * thrown by the running of the computation.
   */
  def catchNonFatal: Eval[Throwable Xor A] =
    Eval.always(Xor.catchNonFatal(value))

  /**
   * Returns a new Eval which will catch some subset of the exceptions
   * that might be thrown by the running of the computation.
   */
  def catchOnly[T >: Null <: Throwable: ClassTag]: Eval[T Xor A] =
    Eval.always(Xor.catchOnly[T](value))
}


41 changes: 41 additions & 0 deletions jvm/src/main/scala/cats/jvm/EvalAsync.scala
@@ -0,0 +1,41 @@
package cats
package jvm

import data.Reader
import java.util.concurrent.{Callable, CountDownLatch, ExecutorService}

object EvalAsync {
  def apply[A](cb: (A => Unit) => Unit): Eval[A] = Eval.always {
    val cdl = new CountDownLatch(1)
    var result: Option[A] = None
    cb((a: A) => {result = Some(a); cdl.countDown})
    cdl.await
Member

This is not asynchronous and it's not equivalent to a Task, because this is cheating :-)

I think Cats should strive to expose the same API for both the JVM and Scala.js. For one, it would allow people to write portable code without worrying about which types and functions are available where. But it also keeps you honest.

When you do a countDownLatch.await, in essence blocking a thread for a result, the JVM pretends that the processing you're doing is synchronous, with a result available immediately. But it's a trick that can backfire. Ever heard of EJB (Enterprise Java Beans)? Ever heard of CORBA? THE reason these sucked so badly was that their original design was a game of pretend. As in, let's pretend that it doesn't matter where the processing happens for these synchronous methods. But of course, making local and remote method calls look the same ended up being a problem, because in the real world you can have significant latency, bandwidth limitations, security issues, dropped packets, timing issues and all sorts of other networking problems.

But back to the point: blocking threads is dangerous, and in order to block safely you have to know the configuration of the underlying thread-pool. Let's say that our thread-pool is limited to a single thread. Let's say that our countDownLatch.await will block that single thread. At that point there will be no threads left to do the actual processing. A one-thread pool simplifies the problem of course, but this can happen with any fixed-size thread-pool.

This is why we have a BlockContext in Scala: it lets you tell the thread-pool that certain operations are potentially blocking, so that it might add more threads or whatever. Such a strategy is acceptable for blocking I/O. But it is very inefficient, as it defeats the purpose of using a limited thread-pool, so when doing a lot of synchronous I/O (for example JDBC stuff), we might as well use a CachedThreadPool with unlimited threads.

Which leads us to the reason we want to use limited thread-pools (e.g. with a number of threads directly proportional to the number of CPU cores). Preemptive multi-threading of more threads than CPU cores is done by a technique called time slicing; when a CPU core switches between threads/processes it has to do a context switch, and context switches are freaking expensive. This is why it's considered a best practice to do async I/O and to do CPU-bound processing on a limited thread-pool; if you have to do synchronous I/O, people instantiate a second, unlimited thread-pool meant only for synchronous I/O.

In other words, on top of the JVM, we have 3 options:

  1. thread-block a limited thread-pool and suffer from non-deterministic deadlocks
  2. thread-block an unlimited thread-pool and suffer the performance penalties
  3. do not block any threads, ever

So you see, from my point of view, this isn't even an issue related to JavaScript :-)
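
A minimal sketch of option 1 from the list above, using only `java.util.concurrent`. The names `BlockingStarvation` and `outerTaskCompletes` are hypothetical and not part of this PR, and the timeout on `await` is an assumption added so that the demonstration terminates instead of hanging forever.

```scala
import java.util.concurrent.{Callable, CountDownLatch, Executors, TimeUnit}

object BlockingStarvation {
  // Hypothetical helper: returns true if the latch was released before the
  // timeout, false if the blocked pool thread starved the inner task.
  def outerTaskCompletes(threads: Int, timeoutMs: Long): Boolean = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val outer = pool.submit(new Callable[Boolean] {
        def call(): Boolean = {
          val latch = new CountDownLatch(1)
          // The inner task needs a free thread in the same pool to run.
          pool.execute(new Runnable { def run(): Unit = latch.countDown() })
          // Blocks the current pool thread while waiting for the inner task.
          latch.await(timeoutMs, TimeUnit.MILLISECONDS)
        }
      })
      outer.get()
    } finally {
      val _ = pool.shutdownNow()
    }
  }
}
```

With `threads = 1` the only worker is parked in `await`, so the inner task stays queued and the call returns `false` once the timeout elapses; without the timeout it would never return. With `threads = 2` the inner task gets the second worker and the call returns `true`.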

Member

TL;DR - sorry for the long message - what I'm saying is that if the design involves any form of blocking threads (like usage of a CountDownLatch), for a foundational library like Cats I think the design is wrong.
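
For contrast, a minimal sketch of adapting the same callback shape, `(A => Unit) => Unit`, without parking a thread. It uses the standard library's `Promise` and `Future` as a stand-in; this is an assumption for illustration only, not the design the reviewer proposed and not an API in Cats. The names `CallbackToFuture` and `fromCallback` are hypothetical.

```scala
import scala.concurrent.{Future, Promise}

object CallbackToFuture {
  // Hypothetical helper: registers a callback that completes a Promise.
  // No thread waits for the result; consumers attach to the Future instead.
  def fromCallback[A](register: (A => Unit) => Unit): Future[A] = {
    val p = Promise[A]()
    // trySuccess returns a Boolean, discarded explicitly with `; ()`.
    register(a => { p.trySuccess(a); () })
    p.future
  }
}
```

If the callback fires synchronously, as in `CallbackToFuture.fromCallback[Int](cb => cb(42))`, the returned `Future` is already completed; if it fires later on another thread, the caller is never blocked in the meantime.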

    result.get // YOLO
  }

  implicit class EvalFork[A](val eval: Eval[A]) extends AnyVal {
    def callable(cb: A => Unit): Callable[Unit] = new Callable[Unit] {
Contributor

At a glance I have no idea why you're using Callable instead of Runnable here. Do you actually need the ability to throw checked exceptions?
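
A minimal sketch of the `Runnable` alternative raised in this comment, assuming the callback is purely side-effecting. `RunnableCallback`, `runnable` and `demo` are hypothetical names, and a by-name parameter stands in for `eval.value`. The latch in `demo` exists only so the calling thread can observe the result.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object RunnableCallback {
  // Hypothetical stand-in for `callable`: nothing is returned to the pool,
  // so Runnable is enough to carry the computation and its callback.
  def runnable[A](compute: => A)(cb: A => Unit): Runnable =
    new Runnable { def run(): Unit = cb(compute) }

  def demo(): Option[Int] = {
    val pool = Executors.newSingleThreadExecutor()
    val result = new AtomicReference[Option[Int]](None)
    val done = new CountDownLatch(1)
    try {
      // execute returns Unit, so there is no non-unit value to discard.
      pool.execute(runnable(20 + 22) { n => result.set(Some(n)); done.countDown() })
      done.await(1, TimeUnit.SECONDS)
      result.get()
    } finally pool.shutdown()
  }
}
```

`Callable.call` is declared with `throws Exception` and returns a value; neither matters here, since the result is delivered through the callback.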

      def call = cb(eval.value)
    }

    /**
     * Returns an Eval that will produce the same value as the wrapped
     * Eval, but extracting the value from the resulting Eval will
     * submit the work to the given ExecutorService
     */
    def fork: Reader[ExecutorService, Eval[A]] =
      Reader { pool =>
        EvalAsync { cb =>
          val _ = pool.submit(callable(cb))
Contributor

What does the val _ = accomplish?

Contributor Author

to get rid of a fatal warning about discarding a non-unit value

Contributor

Hmm, val _ = is both more line noise (at least to my eye) than ; () and more bytecode (there actually is a local variable being assigned to).
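
A minimal sketch placing the two spellings from this thread side by side. `DiscardingSubmit` and its method names are hypothetical. Both methods submit the task and have result type `Unit`; they differ only in how the `Future` returned by `submit` is discarded.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object DiscardingSubmit {
  // Form used in the PR: bind the returned Future to a wildcard pattern.
  def withValUnderscore(pool: ExecutorService, task: Runnable): Unit = {
    val _ = pool.submit(task)
  }

  // Form suggested in review: end the block with an explicit unit value.
  def withUnitLiteral(pool: ExecutorService, task: Runnable): Unit = {
    pool.submit(task); ()
  }

  // Runs one task through each form and reports how many tasks ran.
  def demo(): Int = {
    val counter = new AtomicInteger(0)
    val task = new Runnable { def run(): Unit = { counter.incrementAndGet(); () } }
    val pool = Executors.newSingleThreadExecutor()
    withValUnderscore(pool, task)
    withUnitLiteral(pool, task)
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.SECONDS)
    counter.get()
  }
}
```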

        }
      }

    /**
     * Run this computation asynchronously, and call the callback with the result
     */
    final def asyncValue(cb: A => Unit): Reader[ExecutorService, Unit] =
      Reader { pool =>
        val _ = pool.submit(callable(cb))
      }
  }
}
66 changes: 66 additions & 0 deletions jvm/src/test/scala/cats/tests/EvalAsyncTests.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package cats
package jvm
package tests

import cats.tests.CatsSuite

class EvalAsyncTests extends CatsSuite {
  test("evalAsync should be stack-safe") {
    import data.Streaming
    val ones = List(Eval.now(1),
                    Eval.later(1),
                    Eval.always(1),
                    EvalAsync[Int](_(1)))

    val onesStream: Streaming[Eval[Int]] = Streaming.continually(Streaming.fromList(ones)).flatMap(x => x)


    def taskMap2[A,B,C](t1: Eval[A], t2: Eval[B])(f: (A,B) => C): Eval[C] = {
      t1.flatMap(a => t2.map(b => f(a,b)))
    }

    def sequenceStreaming[A](fa: Streaming[Eval[A]]): Eval[Streaming[A]] = {
      fa.foldRight(Eval.later(Eval.now(Streaming.empty[A])))((a, st) =>
        st.map(b => taskMap2(b,a)((x,y) => Streaming.cons(y,x)))).value
    }

    val howmany = 1000000

    sequenceStreaming(onesStream.take(howmany)).value.foldLeft(0)((x, _) => x + 1) should be (howmany)

    onesStream.take(howmany).sequence.value.foldLeft(0)((x, _) => x + 1) should be (howmany)
  }

  test("EvalAsync should run forked tasks on another thread") {
    import EvalAsync._

    val pool = new java.util.concurrent.ForkJoinPool

    var time1: Long = 0
    var time2: Long = 0

    val t1: Eval[Unit] = Eval.later {
      Thread.sleep(2000)
      time1 = System.currentTimeMillis
    }.fork.run(pool)

    val t2: Eval[Unit] = Eval.later {
      Thread.sleep(1000)
      time2 = System.currentTimeMillis
      ()
    }.fork.run(pool)

    val cdl = new java.util.concurrent.CountDownLatch(2)

    t1.asyncValue(_ => cdl.countDown).run(pool)
    t2.asyncValue(_ => cdl.countDown).run(pool)

    time1 should be(0L)
    time2 should be(0L)

    cdl.await

    time2 should be > 0L
    time1 should be > time2
  }
}
14 changes: 14 additions & 0 deletions tests/src/test/scala/cats/tests/EvalTests.scala
@@ -91,6 +91,20 @@ class EvalTests extends CatsSuite {
    }
  }

  test("eval should be stack-safe") {
    val ones = List(Eval.now(1),
                    Eval.later(1),
                    Eval.always(1))
    import data.Streaming
    // an infinite stream of ones
    val onesStream: Streaming[Eval[Int]] = Streaming.continually(Streaming.fromList(ones)).flatMap(x => x)

    val howmany = 1000000
    onesStream.take(howmany).sequence.value.foldLeft(0)((x, _) => x + 1) should be (howmany)

  }


  {
    implicit val iso = CartesianTests.Isomorphisms.invariant[Eval]
    checkAll("Eval[Int]", BimonadTests[Eval].bimonad[Int, Int, Int])