Skip to content

Commit

Permalink
Introduced Suspendable type class
Browse files Browse the repository at this point in the history
This PR moves `suspend` and `delay` from the `Effect` type class to a
new `Suspendable` type class. There are two motivations for doing this:

 - Exposing `unsafeRunAsync` to any function that requires `suspend`
   or `delay` gives up a lot of parametricity.

 - Some type constructors have `Suspendable` instances but do not
   support value extraction. This came up in the [port of Doobie to
   FS2](typelevel/doobie#323 (comment)).

This [came up on Gitter a while back as
well](http://www.gitterforum.com/discussion/scalaz-scalaz-stream?page=143).
  • Loading branch information
mpilquist committed Aug 16, 2016
1 parent 9ef7ad4 commit 7cb2be4
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 39 deletions.
14 changes: 2 additions & 12 deletions core/shared/src/main/scala/fs2/util/Effect.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
package fs2.util

trait Effect[F[_]] extends Catchable[F] {

/**
* Returns an `F[A]` that evaluates and runs the provided `fa` on each run.
*/
def suspend[A](fa: => F[A]): F[A]

/**
* Promotes a non-strict value to an `F`, catching exceptions in the process.
* Evaluates `a` each time the returned effect is run.
*/
def delay[A](a: => A): F[A] = suspend(pure(a))
/** Monad which supports catching exceptions, suspending evaluation, and potentially asynchronous evaluation. */
trait Effect[F[_]] extends Catchable[F] with Suspendable[F] {

/**
* Evaluates the specified `F[A]`, possibly asynchronously, and calls the specified
Expand Down
23 changes: 23 additions & 0 deletions core/shared/src/main/scala/fs2/util/Suspendable.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package fs2.util

/**
* Monad which supports capturing a deferred evaluation of a by-name `F[A]`.
*
* Evaluation is suspended until a value is extracted, typically via the `unsafeRunAsync`
* method on the related [[Effect]] or [[Async]] type classes. Side-effects that occur
* while evaluating a suspension are evaluated exactly once at the time of extraction.
*/
trait Suspendable[F[_]] extends Monad[F] {

/**
* Returns an `F[A]` that evaluates and runs the provided `fa` on each run.
*/
def suspend[A](fa: => F[A]): F[A]

/**
* Promotes a non-strict value to an `F`, catching exceptions in the process.
* Evaluates `a` each time the returned effect is run.
*/
def delay[A](a: => A): F[A] = suspend(pure(a))
}

5 changes: 3 additions & 2 deletions docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,15 @@ val eff = Stream.eval(Task.delay { println("TASK BEING RUN!!"); 1 + 1 })

[`Task`](../core/shared/src/main/scala/fs2/Task.scala) is an effect type we'll see a lot in these examples. Creating a `Task` has no side effects, and `Stream.eval` doesn't do anything at the time of creation, it's just a description of what needs to happen when the stream is eventually interpreted. Notice the type of `eff` is now `Stream[Task,Int]`.

The `eval` function works for any effect type, not just `Task`. FS2 does not care what effect type you use for your streams. You may use the included [`Task` type][Task] for effects or bring your own, just by implementing a few interfaces for your effect type ([`Catchable`][Catchable] and optionally [`Effect`][Effect] or [`Async`][Async] if you wish to use various concurrent operations discussed later). Here's the signature of `eval`:
The `eval` function works for any effect type, not just `Task`. FS2 does not care what effect type you use for your streams. You may use the included [`Task` type][Task] for effects or bring your own, just by implementing a few interfaces for your effect type ([`Catchable`][Catchable], [`Suspendable`][Suspendable], [`Effect`][Effect], and optional [`Async`][Async] if you wish to use various concurrent operations discussed later). Here's the signature of `eval`:

```Scala
def eval[F[_],A](f: F[A]): Stream[F,A]
```

[Task]: ../core/shared/src/main/scala/fs2/Task.scala
[Catchable]: ../core/shared/src/main/scala/fs2/util/Catchable.scala
[Suspendable]: ../core/shared/src/main/scala/fs2/util/Suspendable.scala
[Effect]: ../core/shared/src/main/scala/fs2/util/Effect.scala
[Async]: ../core/shared/src/main/scala/fs2/util/Async.scala

Expand Down Expand Up @@ -290,7 +291,7 @@ scala> Stream.bracket(acquire)(_ => Stream(1,2,3) ++ err, _ => release).run.unsa
incremented: 1
decremented: 0
java.lang.Exception: oh noes!
... 818 elided
... 822 elided
```

The inner stream fails, but notice the `release` action is still run:
Expand Down
3 changes: 2 additions & 1 deletion docs/src/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,15 @@ val eff = Stream.eval(Task.delay { println("TASK BEING RUN!!"); 1 + 1 })

[`Task`](../core/shared/src/main/scala/fs2/Task.scala) is an effect type we'll see a lot in these examples. Creating a `Task` has no side effects, and `Stream.eval` doesn't do anything at the time of creation, it's just a description of what needs to happen when the stream is eventually interpreted. Notice the type of `eff` is now `Stream[Task,Int]`.

The `eval` function works for any effect type, not just `Task`. FS2 does not care what effect type you use for your streams. You may use the included [`Task` type][Task] for effects or bring your own, just by implementing a few interfaces for your effect type ([`Catchable`][Catchable] and optionally [`Effect`][Effect] or [`Async`][Async] if you wish to use various concurrent operations discussed later). Here's the signature of `eval`:
The `eval` function works for any effect type, not just `Task`. FS2 does not care what effect type you use for your streams. You may use the included [`Task` type][Task] for effects or bring your own, just by implementing a few interfaces for your effect type ([`Catchable`][Catchable], [`Suspendable`][Suspendable], [`Effect`][Effect], and optional [`Async`][Async] if you wish to use various concurrent operations discussed later). Here's the signature of `eval`:

```Scala
def eval[F[_],A](f: F[A]): Stream[F,A]
```

[Task]: ../core/shared/src/main/scala/fs2/Task.scala
[Catchable]: ../core/shared/src/main/scala/fs2/util/Catchable.scala
[Suspendable]: ../core/shared/src/main/scala/fs2/util/Suspendable.scala
[Effect]: ../core/shared/src/main/scala/fs2/util/Effect.scala
[Async]: ../core/shared/src/main/scala/fs2/util/Async.scala

Expand Down
12 changes: 6 additions & 6 deletions io/src/main/scala/fs2/io/JavaInputOutputStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package io

import java.io.{InputStream, OutputStream}

import fs2.util.Effect
import fs2.util.Suspendable
import fs2.util.syntax._

private[io] object JavaInputOutputStream {
def readBytesFromInputStream[F[_]](is: InputStream, buf: Array[Byte])(implicit F: Effect[F]): F[Option[Chunk[Byte]]] =
def readBytesFromInputStream[F[_]](is: InputStream, buf: Array[Byte])(implicit F: Suspendable[F]): F[Option[Chunk[Byte]]] =
F.delay(is.read(buf)).map { numBytes =>
if (numBytes < 0) None
else if (numBytes == 0) Some(Chunk.empty)
else Some(Chunk.bytes(buf, 0, numBytes))
}

def readInputStreamGeneric[F[_]](fis: F[InputStream], chunkSize: Int, f: (InputStream, Array[Byte]) => F[Option[Chunk[Byte]]], closeAfterUse: Boolean = true)(implicit F: Effect[F]): Stream[F, Byte] = {
def readInputStreamGeneric[F[_]](fis: F[InputStream], chunkSize: Int, f: (InputStream, Array[Byte]) => F[Option[Chunk[Byte]]], closeAfterUse: Boolean = true)(implicit F: Suspendable[F]): Stream[F, Byte] = {
val buf = new Array[Byte](chunkSize)

def useIs(is: InputStream) =
Expand All @@ -29,10 +29,10 @@ private[io] object JavaInputOutputStream {
Stream.eval(fis).flatMap(useIs)
}

def writeBytesToOutputStream[F[_]](os: OutputStream, bytes: Chunk[Byte])(implicit F: Effect[F]): F[Unit] =
def writeBytesToOutputStream[F[_]](os: OutputStream, bytes: Chunk[Byte])(implicit F: Suspendable[F]): F[Unit] =
F.delay(os.write(bytes.toArray))

def writeOutputStreamGeneric[F[_]](fos: F[OutputStream], closeAfterUse: Boolean, f: (OutputStream, Chunk[Byte]) => F[Unit])(implicit F: Effect[F]): Sink[F, Byte] = s => {
def writeOutputStreamGeneric[F[_]](fos: F[OutputStream], closeAfterUse: Boolean, f: (OutputStream, Chunk[Byte]) => F[Unit])(implicit F: Suspendable[F]): Sink[F, Byte] = s => {
def useOs(os: OutputStream): Stream[F, Unit] =
s.chunks.evalMap(f(os, _))

Expand All @@ -41,5 +41,5 @@ private[io] object JavaInputOutputStream {
else
Stream.eval(fos).flatMap(useOs)
}

}

4 changes: 2 additions & 2 deletions io/src/main/scala/fs2/io/file/FileHandle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package file
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, FileChannel, FileLock}

import fs2.util.{Async,Effect}
import fs2.util.{Async,Suspendable}
import fs2.util.syntax._

trait FileHandle[F[_]] {
Expand Down Expand Up @@ -142,7 +142,7 @@ object FileHandle {
/**
* Creates a `FileHandle[F]` from a `java.nio.channels.FileChannel`.
*/
private[fs2] def fromFileChannel[F[_]](chan: FileChannel)(implicit F: Effect[F]): FileHandle[F] = {
private[fs2] def fromFileChannel[F[_]](chan: FileChannel)(implicit F: Suspendable[F]): FileHandle[F] = {
new FileHandle[F] {
type Lock = FileLock

Expand Down
6 changes: 3 additions & 3 deletions io/src/main/scala/fs2/io/file/file.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package io
import java.nio.channels.CompletionHandler
import java.nio.file.{Path, StandardOpenOption}

import fs2.util.{Async, Effect}
import fs2.util.{Async, Suspendable}

package object file {

Expand All @@ -27,7 +27,7 @@ package object file {
/**
* Reads all data synchronously from the file at the specified `java.nio.file.Path`.
*/
def readAll[F[_]](path: Path, chunkSize: Int)(implicit F: Effect[F]): Stream[F, Byte] =
def readAll[F[_]: Suspendable](path: Path, chunkSize: Int): Stream[F, Byte] =
pulls.fromPath(path, List(StandardOpenOption.READ)).flatMap(pulls.readAllFromFileHandle(chunkSize)).close

/**
Expand All @@ -41,7 +41,7 @@ package object file {
*
* Adds the WRITE flag to any other `OpenOption` flags specified. By default, also adds the CREATE flag.
*/
def writeAll[F[_]](path: Path, flags: Seq[StandardOpenOption] = List(StandardOpenOption.CREATE))(implicit F: Effect[F]): Sink[F, Byte] =
def writeAll[F[_]: Suspendable](path: Path, flags: Seq[StandardOpenOption] = List(StandardOpenOption.CREATE)): Sink[F, Byte] =
s => (for {
in <- s.open
out <- pulls.fromPath(path, StandardOpenOption.WRITE :: flags.toList)
Expand Down
16 changes: 8 additions & 8 deletions io/src/main/scala/fs2/io/file/pulls.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package file
import java.nio.channels._
import java.nio.file._

import fs2.util.{Async,Effect}
import fs2.util.{Async,Suspendable}
import fs2.util.syntax._

object pulls {
Expand All @@ -28,16 +28,16 @@ object pulls {
/**
* Given a `Handle[F, Byte]` and `FileHandle[F]`, writes all data from the `Handle` to the file.
*/
def writeAllToFileHandle[F[_]](in: Handle[F, Byte], out: FileHandle[F])(implicit F: Effect[F]): Pull[F, Nothing, Unit] =
def writeAllToFileHandle[F[_]](in: Handle[F, Byte], out: FileHandle[F]): Pull[F, Nothing, Unit] =
_writeAllToFileHandle1(in, out, 0)

private def _writeAllToFileHandle1[F[_]](in: Handle[F, Byte], out: FileHandle[F], offset: Long)(implicit F: Effect[F]): Pull[F, Nothing, Unit] = for {
private def _writeAllToFileHandle1[F[_]](in: Handle[F, Byte], out: FileHandle[F], offset: Long): Pull[F, Nothing, Unit] = for {
(hd, tail) <- in.await
_ <- _writeAllToFileHandle2(hd, out, offset)
next <- _writeAllToFileHandle1(tail, out, offset + hd.size)
} yield next

private def _writeAllToFileHandle2[F[_]](buf: Chunk[Byte], out: FileHandle[F], offset: Long)(implicit F: Effect[F]): Pull[F, Nothing, Unit] =
private def _writeAllToFileHandle2[F[_]](buf: Chunk[Byte], out: FileHandle[F], offset: Long): Pull[F, Nothing, Unit] =
Pull.eval(out.write(buf, offset)) flatMap { (written: Int) =>
if (written >= buf.size)
Pull.pure(())
Expand All @@ -50,30 +50,30 @@ object pulls {
*
* The `Pull` closes the acquired `java.nio.channels.FileChannel` when it is done.
*/
def fromPath[F[_]](path: Path, flags: Seq[OpenOption])(implicit F: Effect[F]): Pull[F, Nothing, FileHandle[F]] =
def fromPath[F[_]](path: Path, flags: Seq[OpenOption])(implicit F: Suspendable[F]): Pull[F, Nothing, FileHandle[F]] =
fromFileChannel(F.delay(FileChannel.open(path, flags: _*)))

/**
* Creates a `Pull` which allows asynchronous file operations against the file at the specified `java.nio.file.Path`.
*
* The `Pull` closes the acquired `java.nio.channels.AsynchronousFileChannel` when it is done.
*/
def fromPathAsync[F[_]](path: Path, flags: Seq[OpenOption])(implicit F: Async[F]): Pull[F, Nothing, FileHandle[F]] =
def fromPathAsync[F[_]: Async](path: Path, flags: Seq[OpenOption])(implicit F: Suspendable[F]): Pull[F, Nothing, FileHandle[F]] =
fromAsynchronousFileChannel(F.delay(AsynchronousFileChannel.open(path, flags: _*)))

/**
* Given a `java.nio.channels.FileChannel`, will create a `Pull` which allows synchronous operations against the underlying file.
*
* The `Pull` closes the provided `java.nio.channels.FileChannel` when it is done.
*/
def fromFileChannel[F[_]](channel: F[FileChannel])(implicit F: Effect[F]): Pull[F, Nothing, FileHandle[F]] =
def fromFileChannel[F[_]: Suspendable](channel: F[FileChannel]): Pull[F, Nothing, FileHandle[F]] =
Pull.acquire(channel.map(FileHandle.fromFileChannel[F]))(_.close())

/**
* Given a `java.nio.channels.AsynchronousFileChannel`, will create a `Pull` which allows asynchronous operations against the underlying file.
*
* The `Pull` closes the provided `java.nio.channels.AsynchronousFileChannel` when it is done.
*/
def fromAsynchronousFileChannel[F[_]](channel: F[AsynchronousFileChannel])(implicit F: Async[F]): Pull[F, Nothing, FileHandle[F]] =
def fromAsynchronousFileChannel[F[_]: Async](channel: F[AsynchronousFileChannel]): Pull[F, Nothing, FileHandle[F]] =
Pull.acquire(channel.map(FileHandle.fromAsynchronousFileChannel[F]))(_.close())
}
10 changes: 5 additions & 5 deletions io/src/main/scala/fs2/io/package.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package fs2

import fs2.util.{Async, Effect}
import fs2.util.{Async, Suspendable}
import fs2.util.syntax._
import java.io.{InputStream, OutputStream}

Expand All @@ -13,7 +13,7 @@ package object io {
*
* Blocks the current thread.
*/
def readInputStream[F[_]](fis: F[InputStream], chunkSize: Int, closeAfterUse: Boolean = true)(implicit F: Effect[F]): Stream[F, Byte] =
def readInputStream[F[_]: Suspendable](fis: F[InputStream], chunkSize: Int, closeAfterUse: Boolean = true): Stream[F, Byte] =
readInputStreamGeneric(fis, chunkSize, readBytesFromInputStream[F], closeAfterUse)

/**
Expand All @@ -36,7 +36,7 @@ package object io {
*
* Blocks the current thread.
*/
def writeOutputStream[F[_]](fos: F[OutputStream], closeAfterUse: Boolean = true)(implicit F: Effect[F]): Sink[F, Byte] =
def writeOutputStream[F[_]: Suspendable](fos: F[OutputStream], closeAfterUse: Boolean = true): Sink[F, Byte] =
writeOutputStreamGeneric(fos, closeAfterUse, writeBytesToOutputStream[F])

/**
Expand All @@ -56,13 +56,13 @@ package object io {
//
// STDIN/STDOUT Helpers

def stdin[F[_]](bufSize: Int)(implicit F: Effect[F]): Stream[F, Byte] =
def stdin[F[_]](bufSize: Int)(implicit F: Suspendable[F]): Stream[F, Byte] =
readInputStream(F.delay(System.in), bufSize, false)

def stdinAsync[F[_]](bufSize: Int)(implicit F: Async[F]): Stream[F, Byte] =
readInputStreamAsync(F.delay(System.in), bufSize, false)

def stdout[F[_]](implicit F: Effect[F]): Sink[F, Byte] =
def stdout[F[_]](implicit F: Suspendable[F]): Sink[F, Byte] =
writeOutputStream(F.delay(System.out), false)

def stdoutAsync[F[_]](implicit F: Async[F]): Sink[F, Byte] =
Expand Down

0 comments on commit 7cb2be4

Please sign in to comment.