Skip to content
This repository was archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Upgrade to ZIO 2.0.0-RC3 (#484)
Browse files Browse the repository at this point in the history
* upgrade zio version

* format

* format

* fix scala 3 issue
  • Loading branch information
adamgfraser authored Apr 2, 2022
1 parent 8747a4b commit e8b88d6
Show file tree
Hide file tree
Showing 39 changed files with 567 additions and 549 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ addCommandAlias(
";zioNio/test;examples/test"
)

val zioVersion = "2.0.0-RC2"
val zioVersion = "2.0.0-RC3"

lazy val zioNio = project
.in(file("nio"))
Expand Down
26 changes: 13 additions & 13 deletions docs/essentials/blocking.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ Many NIO operations can block the calling thread when called. ZIO-NIO provides A

## Blocking and Non-Blocking Channel Operations

Channel APIs that may block are not exposed on the channel itself. They are accessed via the channel's `useBlocking` method. You provide this method a function that excepts a `BlockingOps` object and returns a `ZIO` effect value. The `BlockingOps` parameter will be appropriate to the type of channel and has the actual blocking I/O effects such as read and write.
Channel APIs that may block are not exposed on the channel itself. They are accessed via the channel's `flatMapBlocking` method. You provide this method a function that excepts a `BlockingOps` object and returns a `ZIO` effect value. The `BlockingOps` parameter will be appropriate to the type of channel and has the actual blocking I/O effects such as read and write.

The `useBlocking` method performs some setup required for safe use of blocking NIO APIs:
The `flatMapBlocking` method performs some setup required for safe use of blocking NIO APIs:

* Puts the channel in blocking mode
* Runs the resulting effect value on ZIO's blocking thread pool, leaving the standard pool unblocked.
* Installs interrupt handling, so the channel will be closed if the ZIO fiber is interrupted. This unblocks the blocked I/O operation. (Note that NIO does not offer a way to interrupt a blocked I/O operation on a channel that does not close the channel).

Non-blocking usage does not require this special handling, but for consistency the non-blocking operations are accessed in a similar way by calling `useNonBlocking` on the channel. For some channels there are some small differences between the blocking and non-blocking APIs. For example, `SocketChannel` only offers the `finishConnect` operation in the non-blocking case, as it is never needed in blocking mode.
Non-blocking usage does not require this special handling, but for consistency the non-blocking operations are accessed in a similar way by calling `flatMapNonBlocking` on the channel. For some channels there are some small differences between the blocking and non-blocking APIs. For example, `SocketChannel` only offers the `finishConnect` operation in the non-blocking case, as it is never needed in blocking mode.

```scala mdoc:silent
import zio.ZIO
Expand All @@ -32,36 +32,36 @@ def readHeader(c: SocketChannel): ZIO[Blocking, IOException, (Chunk[Byte], Chunk
}
```

### Using Managed Channels
### Using Channels

To help with the common use-case where you want to create a channel, there is versions of `useBlocking` and `useNonBlocking` that can be called directly on a managed value providing a channel.
To help with the common use-case where you want to create a channel, there is versions of `flatMapBlocking` and `flatMapNonBlocking` that can be called directly on a ZIO value providing a channel.

`useNioBlocking` provides both the channel and the requested type of operations:
`flatMapNioBlocking` provides both the channel and the requested type of operations:

```scala mdoc:silent
import zio.nio._
import zio.nio.channels._

SocketChannel.open.useNioBlocking { (channel, blockingOps) =>
SocketChannel.open.flatMapNioBlocking { (channel, blockingOps) =>
blockingOps.readChunk(100) <*> channel.remoteAddress
}
```

If you don't need the channel, there's `useNioBlockingOps`:
If you don't need the channel, there's `flatMapNioBlockingOps`:

```scala mdoc:silent
import zio.nio.channels._

SocketChannel.open.useNioBlockingOps { blockingOps =>
SocketChannel.open.flatMapNioBlockingOps { blockingOps =>
blockingOps.readChunk(100)
}
```

To use the channel in non-blocking mode, there's corresponding `useNioNonBlocking` and `useNioNonBlockingOps` methods.
To use the channel in non-blocking mode, there's corresponding `flatMapNioNonBlocking` and `flatMapNioNonBlockingOps` methods.

### Avoiding Asynchronous Boundaries

If you have a complex program that makes more than one call to `useBlocking`, then it may be worth running *all* of the ZIO-NIO parts using the blocking pool. This can be done by wrapping the effect value with your ZIO-NIO operations in `zio.blocking.blocking`.
If you have a complex program that makes more than one call to `flatMapBlocking`, then it may be worth running *all* of the ZIO-NIO parts using the blocking pool. This can be done by wrapping the effect value with your ZIO-NIO operations in `zio.blocking.blocking`.

If this isn't done, you can end up with the calls using `BlockingOps` running on a thread from the blocking pool, while the other parts run on a thread from the standard pool. This involves an "asynchronous boundary" whever the fiber changes the underlying thread it's running on, which imposes some overheads including a full memory barrier. By using `zio.blocking.blocking` up-front, all the code can run on the same thread from the blocking pool.

Expand All @@ -71,7 +71,7 @@ There are three main styles of channel available: blocking, non-blocking and asy

### Blocking Channels

Easy to use, with a straight-forward operation. The downsides are that you have to use `useBlocking`, which creates a new thread, and will create an additional thread for every forked fiber subsequently created. Essentially you have a blocked thread for every active I/O call, which limits scalability. Also, the additional interrupt handling logic imposes a small overhead.
Easy to use, with a straight-forward operation. The downsides are that you have to use `flatMapBlocking`, which creates a new thread, and will create an additional thread for every forked fiber subsequently created. Essentially you have a blocked thread for every active I/O call, which limits scalability. Also, the additional interrupt handling logic imposes a small overhead.

### Non-Blocking Channels

Expand All @@ -85,6 +85,6 @@ The other issue is that only network channels and pipes support non-blocking mod

Asynchronous channels give us what we want: we don't need a `Selector` to use them, and our thread will never block when we use them.

However, it should be noted that asynchronous file I/O is not currently possible on the JVM. `AsynchronousFileChannel` is performing blocking I/O using a pool of blocked threads, which exactly what `useBlocking` does, and shares the same drawbacks. It may be preferable to use a standard `FileChannel`, as you'll have more visibility and control over what's going on.
However, it should be noted that asynchronous file I/O is not currently possible on the JVM. `AsynchronousFileChannel` is performing blocking I/O using a pool of blocked threads, which exactly what `flatMapBlocking` does, and shares the same drawbacks. It may be preferable to use a standard `FileChannel`, as you'll have more visibility and control over what's going on.

The asynchronous socket channels do *appear* to use non-blocking I/O, although they also have some form of internal thread pool as well. These should scale roughly as well as non-blocking channels. One downside is that there is no asynchronous datagram channel.
2 changes: 1 addition & 1 deletion docs/essentials/charsets.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ import zio.ZIO

// dump a file encoded in ISO8859 to the console

FileChannel.open(Path("iso8859.txt")).useNioBlockingOps { fileOps =>
FileChannel.open(Path("iso8859.txt")).flatMapNioBlockingOps { fileOps =>
val inStream: ZStream[Any, Exception, Byte] = ZStream.repeatZIOChunkOption {
fileOps.readChunk(1000).asSomeError.flatMap { chunk =>
if (chunk.isEmpty) ZIO.fail(None) else ZIO.succeed(chunk)
Expand Down
22 changes: 12 additions & 10 deletions docs/essentials/files.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@ import zio.Console._

## Basic operations

Opening a file for a given path (with no additional open attributes) returns a `ZManaged` instance on which we're running the intended operations. `ZManaged` makes sure that the channel gets closed afterwards:
Opening a file for a given path (with no additional open attributes) returns a scoped `ZIO` instance on which we're running the intended operations. `Scope` makes sure that the channel gets closed afterwards:

```scala mdoc:silent
import java.nio.file.StandardOpenOption

val path = Path("file.txt")
val channelM = AsynchronousFileChannel.open(
path,
StandardOpenOption.READ,
StandardOpenOption.WRITE
).use { channel =>
readWriteOp(channel) *> lockOp(channel)
val channelM = ZIO.scoped {
AsynchronousFileChannel.open(
path,
StandardOpenOption.READ,
StandardOpenOption.WRITE
).flatMap { channel =>
readWriteOp(channel) *> lockOp(channel)
}
}
```

Expand All @@ -54,11 +56,11 @@ val lockOp = (channel: AsynchronousFileChannel) =>
isShared <- channel.lock().acquireReleaseWith(_.release.ignore)(l => IO.succeed(l.isShared))
_ <- printLine(isShared.toString) // false

managed = Managed.acquireReleaseWith(channel.lock(position = 0, size = 10, shared = false))(_.release.ignore)
isOverlaping <- managed.use(l => IO.succeed(l.overlaps(5, 20)))
scoped = ZIO.acquireRelease(channel.lock(position = 0, size = 10, shared = false))(_.release.ignore)
isOverlaping <- ZIO.scoped(scoped.flatMap(l => IO.succeed(l.overlaps(5, 20))))
_ <- printLine(isOverlaping.toString) // true
} yield ()
```

Also it's worth mentioning that we are treating `FileLock` as a resource here.
For demonstration purposes we handled it in two different ways: using `bracket` and creating `Managed` for this.
For demonstration purposes we handled it in two different ways: using `acquireRelease` and creating a scoped `ZIO` for this.
8 changes: 5 additions & 3 deletions docs/essentials/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import zio.nio.file.Path
import java.io.IOException

val read100: ZIO[Any, Option[IOException], Chunk[Byte]] =
FileChannel.open(Path("foo.txt"))
.useNioBlockingOps(_.readChunk(100))
.eofCheck
ZIO.scoped {
FileChannel.open(Path("foo.txt"))
.flatMapNioBlockingOps(_.readChunk(100))
.eofCheck
}
```

End-of-stream will be signalled with `None`. Any errors will be wrapped in `Some`.
Expand Down
44 changes: 19 additions & 25 deletions docs/essentials/resources.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ title: "Resource Management"

NIO offers several objects, primarily channels, that consume resources (such as operating system file handles) that need to be released when no longer needed. If channels are not closed reliably, resource leaks can occur, causing a number of issues.

For this reason, ZIO-NIO provides such resources using the [ZIO `ZManaged` API][zio-managed]. For example, calling `FileChannel.open` will produce a value of `ZManaged[Blocking, IOException, FileChannel]`. The file will not actually be opened until the managed value is *used*.
For this reason, ZIO-NIO provides such resources using the [ZIO `Scope` API][zio-scope]. For example, calling `FileChannel.open` will produce a value of `ZIO[Scope, IOException, FileChannel]`. The file will automatically be closed when the scope is closed.

## Simple Usage

The most straight-forward way to use a managed resource is with the `use` method:
The most straight-forward way to use a scoped resource is with the `scoped` method:

```scala mdoc:silent
import zio._
Expand All @@ -19,47 +19,41 @@ import java.io.IOException

def useChannel(f: FileChannel): ZIO[Any, IOException, Unit] = ???

val effect: ZIO[Any, IOException, Unit] = FileChannel.open(Path("foo.txt"))
.use { fileChannel =>
// fileChannel is only valid in this lexical scope
useChannel(fileChannel)
}
val effect: ZIO[Any, IOException, Unit] = ZIO.scoped {
FileChannel.open(Path("foo.txt"))
.flatMap { fileChannel =>
// fileChannel is only valid in the scope
useChannel(fileChannel)
}
}
```

In the above example, the `FileChannel` will be opened and then provided to the function passed to `use`. The channel will always be closed when the `use` function completes, regardless of whether the operation succeeds, fails, dies or is interrupted. As long as the channel is only used within the function passed to `use`, then we're guaranteed not to have leaks.
In the above example, the `FileChannel` will be opened and then provided to the `useChannel` operation. The channel will always be closed when the scope is closed, regardless of whether the operation succeeds, fails, dies or is interrupted. As long as the channel is only used within the `Scope`, then we're guaranteed not to have leaks.

## Flexible Resource Scoping

Sometimes there are situations where `ZManaged#use` is too limiting, because the resource lifecycle needs to extend beyond a lexical scope. An example of this is registering channels with a `Selector`. How can we do this using `ZManaged` while still avoiding the possibility of leaks? One way is to use [the "scope" feature of `ZManaged`][zio-scope].
Sometimes the resource lifecycle needs to extend beyond a lexical scope. An example of this is registering channels with a `Selector`. How can we do this using `Scope` while still avoiding the possibility of leaks?.

A scope is itself a managed resource. Other managed resources can be attached to a scope, which gives them the same lifecycle as the scope. When the scope is released, all the other resources that have been attached to it will also be released.
We can access the current scope using the `ZIO.scope` operator. The lifetime of other resources can be extended into this scope using the `Scope#extend` operator. When the scope is closed, all the other resources whose lifetimes have been extended into the scope will also be finalized.

```scala mdoc:silent
ZManaged.scope.use { scope =>
ZIO.scope.flatMap { scope =>

val channel: IO[IOException, SocketChannel] = scope(SocketChannel.open).map {
case (earlyRelease @ _, channel) => channel
}
val channel: IO[IOException, SocketChannel] = scope.extend(SocketChannel.open)

// use channel, perhaps with a Selector
channel.flatMap(_.useNonBlocking(_.readChunk(10)))
channel.flatMap(_.flatMapNonBlocking(_.readChunk(10)))

}
// the scope has now been released, as have all the resources attached to it
// when the scope is closed all resources whose lifetimes have been extended into it will automatically be finalized.
```

Note that `scope` returns both the resource and an "early release" effect. This allows you to release the resource before the scope exits, if you know it is no longer needed. This allows efficient use of the resource while still having the safety net of the scope to ensure the release happens even if there are failures, defects or interruptions.
You can continue to finalize the resource before the scope is closed, if you know it is no longer needed. This allows efficient use of the resource while still having the safety net of the scope to ensure the finalization happens even if there are failures, defects or interruptions.

The `zio.nio.channels.SelectorSpec` test demonstrates the use of scoping to ensure nothing leaks if an error occurs.

### Using `close` for Early Release

In the case of channels, we don't actually need the early release features that `ZManaged` provides, as every channel has a built-in early release in the form of the `close` method. Closing a channel more than once is a perfectly safe thing to do, so you can use `close` to release a channel's resources early. When the `ZManaged` scope of the channel later ends, `close` will be called again, but it will be a no-op.

## Manual Resource Management

It is also possible to switch to completely manual resource management. [The `reserve` method][zio-reserve] can be called on any `ZManaged` value, which gives you the acquisition and release of the resource as two separate effect values that you can use as you like. If you use these reservation effects directly, it is entirely up to you to avoid leaking resources. This requires code to be written very carefully, and an understanding the finer details of how failures, defects and interruption work in ZIO.
Every channel has a built-in early release in the form of the `close` method. Closing a channel more than once is a perfectly safe thing to do, so you can use `close` to release a channel's resources early. When the `Scope` of the channel later ends, `close` will be called again, but it will be a no-op.

[zio-managed]: https://zio.dev/docs/datatypes/datatypes_managed
[zio-scope]: https://javadoc.io/doc/dev.zio/zio_2.13/latest/zio/ZManaged$.html#scope:zio.Managed[Nothing,zio.ZManaged.Scope]
[zio-reserve]: https://javadoc.io/doc/dev.zio/zio_2.13/latest/zio/ZManaged.html#reserve:zio.UIO[zio.Reservation[R,E,A]]
[zio-scope]: https://zio.dev/docs/datatypes/datatypes_scope
24 changes: 13 additions & 11 deletions docs/essentials/sockets.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ import zio.nio._
Creating a server socket:

```scala mdoc:silent
val server = AsynchronousServerSocketChannel.open
.mapZIO { socket =>
for {
address <- InetSocketAddress.hostName("127.0.0.1", 1337)
_ <- socket.bindTo(address)
_ <- socket.accept.preallocate.flatMap(_.use(channel => doWork(channel).catchAll(ex => printLine(ex.getMessage))).fork).forever.fork
} yield ()
}.useForever
val server = ZIO.scoped {
AsynchronousServerSocketChannel.open
.flatMap { socket =>
for {
address <- InetSocketAddress.hostName("127.0.0.1", 1337)
_ <- socket.bindTo(address)
_ <- socket.accept.flatMap(channel => doWork(channel).catchAll(ex => printLine(ex.getMessage)).fork).forever.fork
} yield ()
} *> ZIO.never
}

def doWork(channel: AsynchronousSocketChannel): ZIO[Console with Clock, Throwable, Unit] = {
val process =
Expand All @@ -44,8 +46,8 @@ def doWork(channel: AsynchronousSocketChannel): ZIO[Console with Clock, Throwabl
Creating a client socket:

```scala mdoc:silent
val clientM: Managed[Exception, AsynchronousSocketChannel] = AsynchronousSocketChannel.open
.mapZIO { client =>
val clientM: ZIO[Scope, Exception, AsynchronousSocketChannel] = AsynchronousSocketChannel.open
.flatMap { client =>
for {
host <- InetAddress.localHost
address <- InetSocketAddress.inetAddress(host, 2552)
Expand All @@ -59,7 +61,7 @@ Reading and writing to a socket:
```scala mdoc:silent
for {
serverFiber <- server.fork
_ <- clientM.use(_.writeChunk(Chunk.fromArray(Array(1, 2, 3).map(_.toByte))))
_ <- ZIO.scoped(clientM.flatMap(_.writeChunk(Chunk.fromArray(Array(1, 2, 3).map(_.toByte)))))
_ <- serverFiber.join
} yield ()
```
Loading

0 comments on commit e8b88d6

Please sign in to comment.