diff --git a/build.sbt b/build.sbt index f4a56581..681641b8 100644 --- a/build.sbt +++ b/build.sbt @@ -19,7 +19,7 @@ addCommandAlias( ";zioNio/test;examples/test" ) -val zioVersion = "2.0.0-RC2" +val zioVersion = "2.0.0-RC3" lazy val zioNio = project .in(file("nio")) diff --git a/docs/essentials/blocking.md b/docs/essentials/blocking.md index abd6d6dc..8193ff5d 100644 --- a/docs/essentials/blocking.md +++ b/docs/essentials/blocking.md @@ -11,15 +11,15 @@ Many NIO operations can block the calling thread when called. ZIO-NIO provides A ## Blocking and Non-Blocking Channel Operations -Channel APIs that may block are not exposed on the channel itself. They are accessed via the channel's `useBlocking` method. You provide this method a function that excepts a `BlockingOps` object and returns a `ZIO` effect value. The `BlockingOps` parameter will be appropriate to the type of channel and has the actual blocking I/O effects such as read and write. +Channel APIs that may block are not exposed on the channel itself. They are accessed via the channel's `flatMapBlocking` method. You provide this method a function that excepts a `BlockingOps` object and returns a `ZIO` effect value. The `BlockingOps` parameter will be appropriate to the type of channel and has the actual blocking I/O effects such as read and write. -The `useBlocking` method performs some setup required for safe use of blocking NIO APIs: +The `flatMapBlocking` method performs some setup required for safe use of blocking NIO APIs: * Puts the channel in blocking mode * Runs the resulting effect value on ZIO's blocking thread pool, leaving the standard pool unblocked. * Installs interrupt handling, so the channel will be closed if the ZIO fiber is interrupted. This unblocks the blocked I/O operation. (Note that NIO does not offer a way to interrupt a blocked I/O operation on a channel that does not close the channel). -Non-blocking usage does not require this special handling, but for consistency the non-blocking operations are accessed in a similar way by calling `useNonBlocking` on the channel. For some channels there are some small differences between the blocking and non-blocking APIs. For example, `SocketChannel` only offers the `finishConnect` operation in the non-blocking case, as it is never needed in blocking mode. +Non-blocking usage does not require this special handling, but for consistency the non-blocking operations are accessed in a similar way by calling `flatMapNonBlocking` on the channel. For some channels there are some small differences between the blocking and non-blocking APIs. For example, `SocketChannel` only offers the `finishConnect` operation in the non-blocking case, as it is never needed in blocking mode. ```scala mdoc:silent import zio.ZIO @@ -32,36 +32,36 @@ def readHeader(c: SocketChannel): ZIO[Blocking, IOException, (Chunk[Byte], Chunk } ``` -### Using Managed Channels +### Using Channels -To help with the common use-case where you want to create a channel, there is versions of `useBlocking` and `useNonBlocking` that can be called directly on a managed value providing a channel. +To help with the common use-case where you want to create a channel, there is versions of `flatMapBlocking` and `flatMapNonBlocking` that can be called directly on a ZIO value providing a channel. -`useNioBlocking` provides both the channel and the requested type of operations: +`flatMapNioBlocking` provides both the channel and the requested type of operations: ```scala mdoc:silent import zio.nio._ import zio.nio.channels._ -SocketChannel.open.useNioBlocking { (channel, blockingOps) => +SocketChannel.open.flatMapNioBlocking { (channel, blockingOps) => blockingOps.readChunk(100) <*> channel.remoteAddress } ``` -If you don't need the channel, there's `useNioBlockingOps`: +If you don't need the channel, there's `flatMapNioBlockingOps`: ```scala mdoc:silent import zio.nio.channels._ -SocketChannel.open.useNioBlockingOps { blockingOps => +SocketChannel.open.flatMapNioBlockingOps { blockingOps => blockingOps.readChunk(100) } ``` -To use the channel in non-blocking mode, there's corresponding `useNioNonBlocking` and `useNioNonBlockingOps` methods. +To use the channel in non-blocking mode, there's corresponding `flatMapNioNonBlocking` and `flatMapNioNonBlockingOps` methods. ### Avoiding Asynchronous Boundaries -If you have a complex program that makes more than one call to `useBlocking`, then it may be worth running *all* of the ZIO-NIO parts using the blocking pool. This can be done by wrapping the effect value with your ZIO-NIO operations in `zio.blocking.blocking`. +If you have a complex program that makes more than one call to `flatMapBlocking`, then it may be worth running *all* of the ZIO-NIO parts using the blocking pool. This can be done by wrapping the effect value with your ZIO-NIO operations in `zio.blocking.blocking`. If this isn't done, you can end up with the calls using `BlockingOps` running on a thread from the blocking pool, while the other parts run on a thread from the standard pool. This involves an "asynchronous boundary" whever the fiber changes the underlying thread it's running on, which imposes some overheads including a full memory barrier. By using `zio.blocking.blocking` up-front, all the code can run on the same thread from the blocking pool. @@ -71,7 +71,7 @@ There are three main styles of channel available: blocking, non-blocking and asy ### Blocking Channels -Easy to use, with a straight-forward operation. The downsides are that you have to use `useBlocking`, which creates a new thread, and will create an additional thread for every forked fiber subsequently created. Essentially you have a blocked thread for every active I/O call, which limits scalability. Also, the additional interrupt handling logic imposes a small overhead. +Easy to use, with a straight-forward operation. The downsides are that you have to use `flatMapBlocking`, which creates a new thread, and will create an additional thread for every forked fiber subsequently created. Essentially you have a blocked thread for every active I/O call, which limits scalability. Also, the additional interrupt handling logic imposes a small overhead. ### Non-Blocking Channels @@ -85,6 +85,6 @@ The other issue is that only network channels and pipes support non-blocking mod Asynchronous channels give us what we want: we don't need a `Selector` to use them, and our thread will never block when we use them. -However, it should be noted that asynchronous file I/O is not currently possible on the JVM. `AsynchronousFileChannel` is performing blocking I/O using a pool of blocked threads, which exactly what `useBlocking` does, and shares the same drawbacks. It may be preferable to use a standard `FileChannel`, as you'll have more visibility and control over what's going on. +However, it should be noted that asynchronous file I/O is not currently possible on the JVM. `AsynchronousFileChannel` is performing blocking I/O using a pool of blocked threads, which exactly what `flatMapBlocking` does, and shares the same drawbacks. It may be preferable to use a standard `FileChannel`, as you'll have more visibility and control over what's going on. The asynchronous socket channels do *appear* to use non-blocking I/O, although they also have some form of internal thread pool as well. These should scale roughly as well as non-blocking channels. One downside is that there is no asynchronous datagram channel. diff --git a/docs/essentials/charsets.md b/docs/essentials/charsets.md index 5e114550..1863e7db 100644 --- a/docs/essentials/charsets.md +++ b/docs/essentials/charsets.md @@ -59,7 +59,7 @@ import zio.ZIO // dump a file encoded in ISO8859 to the console -FileChannel.open(Path("iso8859.txt")).useNioBlockingOps { fileOps => +FileChannel.open(Path("iso8859.txt")).flatMapNioBlockingOps { fileOps => val inStream: ZStream[Any, Exception, Byte] = ZStream.repeatZIOChunkOption { fileOps.readChunk(1000).asSomeError.flatMap { chunk => if (chunk.isEmpty) ZIO.fail(None) else ZIO.succeed(chunk) diff --git a/docs/essentials/files.md b/docs/essentials/files.md index b04a4245..9d4bca62 100644 --- a/docs/essentials/files.md +++ b/docs/essentials/files.md @@ -16,18 +16,20 @@ import zio.Console._ ## Basic operations -Opening a file for a given path (with no additional open attributes) returns a `ZManaged` instance on which we're running the intended operations. `ZManaged` makes sure that the channel gets closed afterwards: +Opening a file for a given path (with no additional open attributes) returns a scoped `ZIO` instance on which we're running the intended operations. `Scope` makes sure that the channel gets closed afterwards: ```scala mdoc:silent import java.nio.file.StandardOpenOption val path = Path("file.txt") -val channelM = AsynchronousFileChannel.open( - path, - StandardOpenOption.READ, - StandardOpenOption.WRITE -).use { channel => - readWriteOp(channel) *> lockOp(channel) +val channelM = ZIO.scoped { + AsynchronousFileChannel.open( + path, + StandardOpenOption.READ, + StandardOpenOption.WRITE + ).flatMap { channel => + readWriteOp(channel) *> lockOp(channel) + } } ``` @@ -54,11 +56,11 @@ val lockOp = (channel: AsynchronousFileChannel) => isShared <- channel.lock().acquireReleaseWith(_.release.ignore)(l => IO.succeed(l.isShared)) _ <- printLine(isShared.toString) // false - managed = Managed.acquireReleaseWith(channel.lock(position = 0, size = 10, shared = false))(_.release.ignore) - isOverlaping <- managed.use(l => IO.succeed(l.overlaps(5, 20))) + scoped = ZIO.acquireRelease(channel.lock(position = 0, size = 10, shared = false))(_.release.ignore) + isOverlaping <- ZIO.scoped(scoped.flatMap(l => IO.succeed(l.overlaps(5, 20)))) _ <- printLine(isOverlaping.toString) // true } yield () ``` Also it's worth mentioning that we are treating `FileLock` as a resource here. -For demonstration purposes we handled it in two different ways: using `bracket` and creating `Managed` for this. +For demonstration purposes we handled it in two different ways: using `acquireRelease` and creating a scoped `ZIO` for this. diff --git a/docs/essentials/index.md b/docs/essentials/index.md index 90f3d099..f2473071 100644 --- a/docs/essentials/index.md +++ b/docs/essentials/index.md @@ -33,9 +33,11 @@ import zio.nio.file.Path import java.io.IOException val read100: ZIO[Any, Option[IOException], Chunk[Byte]] = - FileChannel.open(Path("foo.txt")) - .useNioBlockingOps(_.readChunk(100)) - .eofCheck + ZIO.scoped { + FileChannel.open(Path("foo.txt")) + .flatMapNioBlockingOps(_.readChunk(100)) + .eofCheck + } ``` End-of-stream will be signalled with `None`. Any errors will be wrapped in `Some`. diff --git a/docs/essentials/resources.md b/docs/essentials/resources.md index 09453e89..7c816622 100644 --- a/docs/essentials/resources.md +++ b/docs/essentials/resources.md @@ -5,11 +5,11 @@ title: "Resource Management" NIO offers several objects, primarily channels, that consume resources (such as operating system file handles) that need to be released when no longer needed. If channels are not closed reliably, resource leaks can occur, causing a number of issues. -For this reason, ZIO-NIO provides such resources using the [ZIO `ZManaged` API][zio-managed]. For example, calling `FileChannel.open` will produce a value of `ZManaged[Blocking, IOException, FileChannel]`. The file will not actually be opened until the managed value is *used*. +For this reason, ZIO-NIO provides such resources using the [ZIO `Scope` API][zio-scope]. For example, calling `FileChannel.open` will produce a value of `ZIO[Scope, IOException, FileChannel]`. The file will automatically be closed when the scope is closed. ## Simple Usage -The most straight-forward way to use a managed resource is with the `use` method: +The most straight-forward way to use a scoped resource is with the `scoped` method: ```scala mdoc:silent import zio._ @@ -19,47 +19,41 @@ import java.io.IOException def useChannel(f: FileChannel): ZIO[Any, IOException, Unit] = ??? -val effect: ZIO[Any, IOException, Unit] = FileChannel.open(Path("foo.txt")) - .use { fileChannel => - // fileChannel is only valid in this lexical scope - useChannel(fileChannel) - } +val effect: ZIO[Any, IOException, Unit] = ZIO.scoped { + FileChannel.open(Path("foo.txt")) + .flatMap { fileChannel => + // fileChannel is only valid in the scope + useChannel(fileChannel) + } +} ``` -In the above example, the `FileChannel` will be opened and then provided to the function passed to `use`. The channel will always be closed when the `use` function completes, regardless of whether the operation succeeds, fails, dies or is interrupted. As long as the channel is only used within the function passed to `use`, then we're guaranteed not to have leaks. +In the above example, the `FileChannel` will be opened and then provided to the `useChannel` operation. The channel will always be closed when the scope is closed, regardless of whether the operation succeeds, fails, dies or is interrupted. As long as the channel is only used within the `Scope`, then we're guaranteed not to have leaks. ## Flexible Resource Scoping -Sometimes there are situations where `ZManaged#use` is too limiting, because the resource lifecycle needs to extend beyond a lexical scope. An example of this is registering channels with a `Selector`. How can we do this using `ZManaged` while still avoiding the possibility of leaks? One way is to use [the "scope" feature of `ZManaged`][zio-scope]. +Sometimes the resource lifecycle needs to extend beyond a lexical scope. An example of this is registering channels with a `Selector`. How can we do this using `Scope` while still avoiding the possibility of leaks?. -A scope is itself a managed resource. Other managed resources can be attached to a scope, which gives them the same lifecycle as the scope. When the scope is released, all the other resources that have been attached to it will also be released. +We can access the current scope using the `ZIO.scope` operator. The lifetime of other resources can be extended into this scope using the `Scope#extend` operator. When the scope is closed, all the other resources whose lifetimes have been extended into the scope will also be finalized. ```scala mdoc:silent -ZManaged.scope.use { scope => +ZIO.scope.flatMap { scope => - val channel: IO[IOException, SocketChannel] = scope(SocketChannel.open).map { - case (earlyRelease @ _, channel) => channel - } + val channel: IO[IOException, SocketChannel] = scope.extend(SocketChannel.open) // use channel, perhaps with a Selector - channel.flatMap(_.useNonBlocking(_.readChunk(10))) + channel.flatMap(_.flatMapNonBlocking(_.readChunk(10))) } -// the scope has now been released, as have all the resources attached to it +// when the scope is closed all resources whose lifetimes have been extended into it will automatically be finalized. ``` -Note that `scope` returns both the resource and an "early release" effect. This allows you to release the resource before the scope exits, if you know it is no longer needed. This allows efficient use of the resource while still having the safety net of the scope to ensure the release happens even if there are failures, defects or interruptions. +You can continue to finalize the resource before the scope is closed, if you know it is no longer needed. This allows efficient use of the resource while still having the safety net of the scope to ensure the finalization happens even if there are failures, defects or interruptions. The `zio.nio.channels.SelectorSpec` test demonstrates the use of scoping to ensure nothing leaks if an error occurs. ### Using `close` for Early Release -In the case of channels, we don't actually need the early release features that `ZManaged` provides, as every channel has a built-in early release in the form of the `close` method. Closing a channel more than once is a perfectly safe thing to do, so you can use `close` to release a channel's resources early. When the `ZManaged` scope of the channel later ends, `close` will be called again, but it will be a no-op. - -## Manual Resource Management - -It is also possible to switch to completely manual resource management. [The `reserve` method][zio-reserve] can be called on any `ZManaged` value, which gives you the acquisition and release of the resource as two separate effect values that you can use as you like. If you use these reservation effects directly, it is entirely up to you to avoid leaking resources. This requires code to be written very carefully, and an understanding the finer details of how failures, defects and interruption work in ZIO. +Every channel has a built-in early release in the form of the `close` method. Closing a channel more than once is a perfectly safe thing to do, so you can use `close` to release a channel's resources early. When the `Scope` of the channel later ends, `close` will be called again, but it will be a no-op. -[zio-managed]: https://zio.dev/docs/datatypes/datatypes_managed -[zio-scope]: https://javadoc.io/doc/dev.zio/zio_2.13/latest/zio/ZManaged$.html#scope:zio.Managed[Nothing,zio.ZManaged.Scope] -[zio-reserve]: https://javadoc.io/doc/dev.zio/zio_2.13/latest/zio/ZManaged.html#reserve:zio.UIO[zio.Reservation[R,E,A]] +[zio-scope]: https://zio.dev/docs/datatypes/datatypes_scope diff --git a/docs/essentials/sockets.md b/docs/essentials/sockets.md index 8bef1dab..ff94a078 100644 --- a/docs/essentials/sockets.md +++ b/docs/essentials/sockets.md @@ -20,14 +20,16 @@ import zio.nio._ Creating a server socket: ```scala mdoc:silent -val server = AsynchronousServerSocketChannel.open - .mapZIO { socket => - for { - address <- InetSocketAddress.hostName("127.0.0.1", 1337) - _ <- socket.bindTo(address) - _ <- socket.accept.preallocate.flatMap(_.use(channel => doWork(channel).catchAll(ex => printLine(ex.getMessage))).fork).forever.fork - } yield () - }.useForever +val server = ZIO.scoped { + AsynchronousServerSocketChannel.open + .flatMap { socket => + for { + address <- InetSocketAddress.hostName("127.0.0.1", 1337) + _ <- socket.bindTo(address) + _ <- socket.accept.flatMap(channel => doWork(channel).catchAll(ex => printLine(ex.getMessage)).fork).forever.fork + } yield () + } *> ZIO.never +} def doWork(channel: AsynchronousSocketChannel): ZIO[Console with Clock, Throwable, Unit] = { val process = @@ -44,8 +46,8 @@ def doWork(channel: AsynchronousSocketChannel): ZIO[Console with Clock, Throwabl Creating a client socket: ```scala mdoc:silent -val clientM: Managed[Exception, AsynchronousSocketChannel] = AsynchronousSocketChannel.open - .mapZIO { client => +val clientM: ZIO[Scope, Exception, AsynchronousSocketChannel] = AsynchronousSocketChannel.open + .flatMap { client => for { host <- InetAddress.localHost address <- InetSocketAddress.inetAddress(host, 2552) @@ -59,7 +61,7 @@ Reading and writing to a socket: ```scala mdoc:silent for { serverFiber <- server.fork - _ <- clientM.use(_.writeChunk(Chunk.fromArray(Array(1, 2, 3).map(_.toByte)))) + _ <- ZIO.scoped(clientM.flatMap(_.writeChunk(Chunk.fromArray(Array(1, 2, 3).map(_.toByte))))) _ <- serverFiber.join } yield () ``` diff --git a/examples/src/main/scala/StreamDirWatch.scala b/examples/src/main/scala/StreamDirWatch.scala index 1bfc473d..bb272076 100644 --- a/examples/src/main/scala/StreamDirWatch.scala +++ b/examples/src/main/scala/StreamDirWatch.scala @@ -16,34 +16,36 @@ import java.nio.file.{StandardWatchEventKinds, WatchEvent} object StreamDirWatch extends ZIOAppDefault { private def watch(dir: Path) = - WatchService.forDefaultFileSystem.use { service => - for { - _ <- dir.registerTree( - watcher = service, - events = Set( - StandardWatchEventKinds.ENTRY_CREATE, - StandardWatchEventKinds.ENTRY_MODIFY, - StandardWatchEventKinds.ENTRY_DELETE - ), - maxDepth = 100 - ) - _ <- Console.printLine(s"Watching directory '$dir'") - _ <- Console.printLine("") - _ <- service.stream.foreach { key => - val eventProcess = { (event: WatchEvent[_]) => - val desc = event.kind() match { - case StandardWatchEventKinds.ENTRY_CREATE => "Create" - case StandardWatchEventKinds.ENTRY_MODIFY => "Modify" - case StandardWatchEventKinds.ENTRY_DELETE => "Delete" - case StandardWatchEventKinds.OVERFLOW => "** Overflow **" - case other => s"Unknown: $other" + ZIO.scoped { + WatchService.forDefaultFileSystem.flatMap { service => + for { + _ <- dir.registerTree( + watcher = service, + events = Set( + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.ENTRY_DELETE + ), + maxDepth = 100 + ) + _ <- Console.printLine(s"Watching directory '$dir'") + _ <- Console.printLine("") + _ <- service.stream.foreach { key => + val eventProcess = { (event: WatchEvent[_]) => + val desc = event.kind() match { + case StandardWatchEventKinds.ENTRY_CREATE => "Create" + case StandardWatchEventKinds.ENTRY_MODIFY => "Modify" + case StandardWatchEventKinds.ENTRY_DELETE => "Delete" + case StandardWatchEventKinds.OVERFLOW => "** Overflow **" + case other => s"Unknown: $other" + } + val path = key.resolveEventPath(event).getOrElse("** PATH UNKNOWN **") + Console.printLine(s"$desc, count: ${event.count()}, $path") } - val path = key.resolveEventPath(event).getOrElse("** PATH UNKNOWN **") - Console.printLine(s"$desc, count: ${event.count()}, $path") + ZIO.scoped(key.pollEventsScoped.flatMap(ZIO.foreachDiscard(_)(eventProcess))) } - key.pollEventsManaged.use(ZIO.foreachDiscard(_)(eventProcess)) - } - } yield () + } yield () + } } override def run: URIO[zio.ZEnv with ZIOAppArgs, ExitCode] = diff --git a/examples/src/main/scala/StreamsBasedServer.scala b/examples/src/main/scala/StreamsBasedServer.scala index b45a03c5..a3aac840 100644 --- a/examples/src/main/scala/StreamsBasedServer.scala +++ b/examples/src/main/scala/StreamsBasedServer.scala @@ -3,13 +3,13 @@ package zio.nio.examples import zio.nio.InetSocketAddress import zio.nio.channels.AsynchronousServerSocketChannel import zio.stream._ -import zio.{Clock, Console, ExitCode, Managed, RIO, URIO, ZIO, ZIOAppDefault, ZTraceElement, durationInt} +import zio.{Clock, Console, ExitCode, RIO, Scope, URIO, ZIO, ZIOAppDefault, ZTraceElement, durationInt} object StreamsBasedServer extends ZIOAppDefault { def run: URIO[Console with Clock with Console, ExitCode] = ZStream - .managed(server(8080)) + .scoped(server(8080)) .flatMap(handleConnections(_) { chunk => Console.printLine(s"Read data: ${chunk.mkString}") *> Clock.sleep(2.seconds) *> @@ -19,34 +19,33 @@ object StreamsBasedServer extends ZIOAppDefault { .orDie .exitCode - def server(port: Int)(implicit trace: ZTraceElement): Managed[Exception, AsynchronousServerSocketChannel] = + def server(port: Int)(implicit trace: ZTraceElement): ZIO[Scope, Exception, AsynchronousServerSocketChannel] = for { server <- AsynchronousServerSocketChannel.open - socketAddress <- InetSocketAddress.wildCard(port).toManaged - _ <- server.bindTo(socketAddress).toManaged + socketAddress <- InetSocketAddress.wildCard(port) + _ <- server.bindTo(socketAddress) } yield server def handleConnections[R <: Console]( server: AsynchronousServerSocketChannel )(f: String => RIO[R, Unit])(implicit trace: ZTraceElement): ZStream[R, Throwable, Unit] = ZStream - .repeatZIO(server.accept.preallocate) - .map(conn => ZStream.managed(conn.ensuring(Console.printLine("Connection closed").ignore).withEarlyRelease)) - .flatMapPar[R, Throwable, Unit](16) { connection => - connection.mapZIO { case (closeConn, channel) => - for { - _ <- Console.printLine("Received connection") - data <- ZStream - .fromZIOOption( - channel.readChunk(64).tap(_ => Console.printLine("Read chunk")).orElse(ZIO.fail(None)) - ) - .flattenChunks - .take(4) - .via(ZPipeline.utf8Decode) - .run(Sink.foldLeft("")(_ + (_: String))) - _ <- closeConn - _ <- f(data) - } yield () - } + .scoped(server.accept) + .forever + .mapZIOPar[R, Throwable, Unit](16) { channel => + for { + _ <- Console.printLine("Received connection") + data <- ZStream + .fromZIOOption( + channel.readChunk(64).tap(_ => Console.printLine("Read chunk")).orElse(ZIO.fail(None)) + ) + .flattenChunks + .take(4) + .via(ZPipeline.utf8Decode) + .run(Sink.foldLeft("")(_ + (_: String))) + _ <- channel.close + _ <- Console.printLine("Connection closed") + _ <- f(data) + } yield () } } diff --git a/examples/src/main/scala/TextFileDump.scala b/examples/src/main/scala/TextFileDump.scala index d819478c..99873b77 100644 --- a/examples/src/main/scala/TextFileDump.scala +++ b/examples/src/main/scala/TextFileDump.scala @@ -2,7 +2,7 @@ package zio package nio package examples -import zio.nio.channels.{FileChannel, ManagedBlockingNioOps} +import zio.nio.channels.{BlockingNioOps, FileChannel} import zio.nio.charset.Charset import zio.nio.file.Path import zio.stream.ZStream @@ -36,18 +36,20 @@ object TextFileDump extends ZIOAppDefault { private def dump(charset: Charset, file: Path)(implicit trace: ZTraceElement ): ZIO[Console with Any, Exception, Unit] = - FileChannel.open(file).useNioBlockingOps { fileOps => - val inStream: ZStream[Any, Exception, Byte] = ZStream.repeatZIOChunkOption { - fileOps.readChunk(1000).asSomeError.flatMap { chunk => - if (chunk.isEmpty) ZIO.fail(None) else ZIO.succeed(chunk) + ZIO.scoped { + FileChannel.open(file).flatMapNioBlockingOps { fileOps => + val inStream: ZStream[Any, Exception, Byte] = ZStream.repeatZIOChunkOption { + fileOps.readChunk(1000).asSomeError.flatMap { chunk => + if (chunk.isEmpty) ZIO.fail(None) else ZIO.succeed(chunk) + } } - } - // apply decoding pipeline - val charStream: ZStream[Any, Exception, Char] = - inStream.via(charset.newDecoder.transducer()) + // apply decoding pipeline + val charStream: ZStream[Any, Exception, Char] = + inStream.via(charset.newDecoder.transducer()) - charStream.runForeachChunk(chars => Console.print(chars.mkString)) + charStream.runForeachChunk(chars => Console.print(chars.mkString)) + } } } diff --git a/examples/src/main/scala/ToUppercaseAsAService.scala b/examples/src/main/scala/ToUppercaseAsAService.scala index 2f751684..d8168379 100644 --- a/examples/src/main/scala/ToUppercaseAsAService.scala +++ b/examples/src/main/scala/ToUppercaseAsAService.scala @@ -3,7 +3,7 @@ package nio package examples import zio._ -import zio.nio.channels.{ManagedBlockingNioOps, ServerSocketChannel, SocketChannel} +import zio.nio.channels.{BlockingNioOps, ServerSocketChannel, SocketChannel} import zio.nio.charset.Charset import zio.stream._ @@ -33,7 +33,7 @@ object ToUppercaseAsAService extends ZIOAppDefault { upperCaseIfier >>> Charset.Standard.utf8.newEncoder.transducer() Console.printLine("Connection accepted") *> - socket.useBlocking { ops => + socket.flatMapBlocking { ops => ops .stream() .via(transducer) @@ -54,11 +54,13 @@ object ToUppercaseAsAService extends ZIOAppDefault { portEff .flatMap(port => - ServerSocketChannel.open.useNioBlocking { (serverChannel, ops) => - InetSocketAddress.wildCard(port).flatMap { socketAddress => - serverChannel.bindTo(socketAddress) *> - Console.printLine(s"Listening on $socketAddress") *> - ops.acceptAndFork(handleConnection).forever + ZIO.scoped { + ServerSocketChannel.open.flatMapNioBlocking { (serverChannel, ops) => + InetSocketAddress.wildCard(port).flatMap { socketAddress => + serverChannel.bindTo(socketAddress) *> + Console.printLine(s"Listening on $socketAddress") *> + ops.acceptAndFork(handleConnection).forever + } } } ) diff --git a/nio/src/main/scala-2/zio/nio/channels/package.scala b/nio/src/main/scala-2/zio/nio/channels/package.scala index ac54e67f..e534d8af 100644 --- a/nio/src/main/scala-2/zio/nio/channels/package.scala +++ b/nio/src/main/scala-2/zio/nio/channels/package.scala @@ -1,39 +1,40 @@ package zio.nio import zio.stacktracer.TracingImplicits.disableAutoTrace -import zio.{ZIO, ZManaged, ZTraceElement} +import zio.{ZIO, ZTraceElement} import java.io.IOException package object channels { - implicit final class ManagedBlockingNioOps[-R, +C <: BlockingChannel]( - private val underlying: ZManaged[R, IOException, C] + implicit final class BlockingNioOps[-R, +C <: BlockingChannel]( + private val underlying: ZIO[R, IOException, C] ) extends AnyVal { - def useNioBlocking[R1, E >: IOException, A]( + def flatMapNioBlocking[R1, E >: IOException, A]( f: (C, C#BlockingOps) => ZIO[R1, E, A] - )(implicit trace: ZTraceElement): ZIO[R with R1 with Any, E, A] = underlying.use(c => c.useBlocking(f(c, _))) + )(implicit trace: ZTraceElement): ZIO[R with R1 with Any, E, A] = + underlying.flatMap(c => c.flatMapBlocking(f(c, _))) - def useNioBlockingOps[R1, E >: IOException, A]( + def flatMapNioBlockingOps[R1, E >: IOException, A]( f: C#BlockingOps => ZIO[R1, E, A] - )(implicit trace: ZTraceElement): ZIO[R with R1 with Any, E, A] = useNioBlocking((_, ops) => f(ops)) + )(implicit trace: ZTraceElement): ZIO[R with R1 with Any, E, A] = flatMapNioBlocking((_, ops) => f(ops)) } - implicit final class ManagedNonBlockingNioOps[-R, +C <: SelectableChannel]( - private val underlying: ZManaged[R, IOException, C] + implicit final class NonBlockingNioOps[-R, +C <: SelectableChannel]( + private val underlying: ZIO[R, IOException, C] ) extends AnyVal { - def useNioNonBlocking[R1, E >: IOException, A](f: (C, C#NonBlockingOps) => ZIO[R1, E, A])(implicit + def flatMapNioNonBlocking[R1, E >: IOException, A](f: (C, C#NonBlockingOps) => ZIO[R1, E, A])(implicit trace: ZTraceElement ): ZIO[R with R1, E, A] = - underlying.use(c => c.useNonBlocking(f(c, _))) + underlying.flatMap(c => c.flatMapNonBlocking(f(c, _))) - def useNioNonBlockingOps[R1, E >: IOException, A](f: C#NonBlockingOps => ZIO[R1, E, A])(implicit + def flatMapNioNonBlockingOps[R1, E >: IOException, A](f: C#NonBlockingOps => ZIO[R1, E, A])(implicit trace: ZTraceElement ): ZIO[R with R1, E, A] = - useNioNonBlocking((_, ops) => f(ops)) + flatMapNioNonBlocking((_, ops) => f(ops)) } diff --git a/nio/src/main/scala-3/zio/nio/channels/package.scala b/nio/src/main/scala-3/zio/nio/channels/package.scala index bb99c167..311475af 100644 --- a/nio/src/main/scala-3/zio/nio/channels/package.scala +++ b/nio/src/main/scala-3/zio/nio/channels/package.scala @@ -1,40 +1,40 @@ package zio.nio import zio.stacktracer.TracingImplicits.disableAutoTrace -import zio.{ZIO, ZManaged, ZTraceElement} +import zio.{ZIO, ZTraceElement} import java.io.IOException package object channels { - implicit final class ManagedBlockingNioOps[-R, BO, C <: BlockingChannel { type BlockingOps <: BO }]( - private val underlying: ZManaged[R, IOException, C] + implicit final class BlockingNioOps[-R, BO, C <: BlockingChannel { type BlockingOps <: BO }]( + private val underlying: ZIO[R, IOException, C] ) extends AnyVal { type F1[R, E, A] = (C, BO) => ZIO[R, E, A] - def useNioBlocking[R1, E >: IOException, A]( + def flatMapNioBlocking[R1, E >: IOException, A]( f: F1[R1, E, A] - )(implicit trace: ZTraceElement): ZIO[R with R1, E, A] = underlying.use(c => c.useBlocking(f(c, _))) + )(implicit trace: ZTraceElement): ZIO[R with R1, E, A] = underlying.flatMap(c => c.flatMapBlocking(f(c, _))) - def useNioBlockingOps[R1, E >: IOException, A]( + def flatMapNioBlockingOps[R1, E >: IOException, A]( f: BO => ZIO[R1, E, A] - )(implicit trace: ZTraceElement): ZIO[R with R1, E, A] = useNioBlocking((_, ops) => f(ops)) + )(implicit trace: ZTraceElement): ZIO[R with R1, E, A] = flatMapNioBlocking((_, ops) => f(ops)) } - implicit final class ManagedNonBlockingNioOps[-R, BO, C <: SelectableChannel { type NonBlockingOps <: BO }]( - private val underlying: ZManaged[R, IOException, C] + implicit final class NonBlockingNioOps[-R, BO, C <: SelectableChannel { type NonBlockingOps <: BO }]( + private val underlying: ZIO[R, IOException, C] ) extends AnyVal { - def useNioNonBlocking[R1, E >: IOException, A](f: (C, BO) => ZIO[R1, E, A])(implicit + def flatMapNioNonBlocking[R1, E >: IOException, A](f: (C, BO) => ZIO[R1, E, A])(implicit trace: ZTraceElement ): ZIO[R with R1, E, A] = - underlying.use(c => c.useNonBlocking(f(c, _))) + underlying.flatMap(c => c.flatMapNonBlocking(f(c, _))) - def useNioNonBlockingOps[R1, E >: IOException, A](f: BO => ZIO[R1, E, A])(implicit + def flatMapNioNonBlockingOps[R1, E >: IOException, A](f: BO => ZIO[R1, E, A])(implicit trace: ZTraceElement ): ZIO[R with R1, E, A] = - useNioNonBlocking((_, ops) => f(ops)) + flatMapNioNonBlocking((_, ops) => f(ops)) } diff --git a/nio/src/main/scala/zio/nio/channels/AsynchronousChannel.scala b/nio/src/main/scala/zio/nio/channels/AsynchronousChannel.scala index 327eec58..72bb8564 100644 --- a/nio/src/main/scala/zio/nio/channels/AsynchronousChannel.scala +++ b/nio/src/main/scala/zio/nio/channels/AsynchronousChannel.scala @@ -74,8 +74,8 @@ abstract class AsynchronousByteChannel private[channels] (protected val channel: ZSink.fromPush { val bufferConstruct = bufferConstruct0 for { - buffer <- bufferConstruct.toManaged - countRef <- Ref.makeManaged(0L) + buffer <- bufferConstruct + countRef <- Ref.make(0L) } yield (_: Option[Chunk[Byte]]).map { chunk => def doWrite(total: Int, c: Chunk[Byte])(implicit trace: ZTraceElement @@ -118,8 +118,8 @@ abstract class AsynchronousByteChannel private[channels] (protected val channel: def stream( bufferConstruct: UIO[ByteBuffer] )(implicit trace: ZTraceElement): Stream[IOException, Byte] = - ZStream.unwrapManaged { - bufferConstruct.toManaged.map { buffer => + ZStream.unwrapScoped { + bufferConstruct.map { buffer => val doRead = for { _ <- read(buffer) _ <- buffer.flip @@ -147,10 +147,11 @@ object AsynchronousByteChannel { private[channels] def effectAsyncChannel[C <: JChannel, A]( channel: C )(op: C => CompletionHandler[A, Any] => Any)(implicit trace: ZTraceElement): IO[IOException, A] = - Task(op(channel)) - .flatMap(Task.effectAsyncWithCompletionHandler) + ZIO + .attempt(op(channel)) + .flatMap(Task.asyncWithCompletionHandler) .refineToOrDie[IOException] - .onInterrupt(IO.effect(channel.close()).ignore) + .onInterrupt(IO.attempt(channel.close()).ignore) } @@ -174,11 +175,11 @@ final class AsynchronousServerSocketChannel(protected val channel: JAsynchronous /** * Accepts a connection. */ - def accept(implicit trace: ZTraceElement): Managed[IOException, AsynchronousSocketChannel] = + def accept(implicit trace: ZTraceElement): ZIO[Scope, IOException, AsynchronousSocketChannel] = AsynchronousByteChannel .effectAsyncChannel[JAsynchronousServerSocketChannel, JAsynchronousSocketChannel](channel)(c => c.accept((), _)) .map(AsynchronousSocketChannel.fromJava) - .toNioManaged + .toNioScoped /** * The `SocketAddress` that the socket is bound to, or the `SocketAddress` representing the loopback address if denied @@ -193,17 +194,17 @@ final class AsynchronousServerSocketChannel(protected val channel: JAsynchronous object AsynchronousServerSocketChannel { - def open(implicit trace: ZTraceElement): Managed[IOException, AsynchronousServerSocketChannel] = + def open(implicit trace: ZTraceElement): ZIO[Scope, IOException, AsynchronousServerSocketChannel] = IO.attempt(new AsynchronousServerSocketChannel(JAsynchronousServerSocketChannel.open())) .refineToOrDie[IOException] - .toNioManaged + .toNioScoped def open( channelGroup: AsynchronousChannelGroup - )(implicit trace: ZTraceElement): Managed[IOException, AsynchronousServerSocketChannel] = + )(implicit trace: ZTraceElement): ZIO[Scope, IOException, AsynchronousServerSocketChannel] = IO.attempt(new AsynchronousServerSocketChannel(JAsynchronousServerSocketChannel.open(channelGroup.channelGroup))) .refineToOrDie[IOException] - .toNioManaged + .toNioScoped def fromJava(channel: JAsynchronousServerSocketChannel): AsynchronousServerSocketChannel = new AsynchronousServerSocketChannel(channel) @@ -349,17 +350,17 @@ final class AsynchronousSocketChannel(override protected val channel: JAsynchron object AsynchronousSocketChannel { - def open(implicit trace: ZTraceElement): Managed[IOException, AsynchronousSocketChannel] = + def open(implicit trace: ZTraceElement): ZIO[Scope, IOException, AsynchronousSocketChannel] = IO.attempt(new AsynchronousSocketChannel(JAsynchronousSocketChannel.open())) .refineToOrDie[IOException] - .toNioManaged + .toNioScoped def open( channelGroup: AsynchronousChannelGroup - )(implicit trace: ZTraceElement): Managed[IOException, AsynchronousSocketChannel] = + )(implicit trace: ZTraceElement): ZIO[Scope, IOException, AsynchronousSocketChannel] = IO.attempt(new AsynchronousSocketChannel(JAsynchronousSocketChannel.open(channelGroup.channelGroup))) .refineToOrDie[IOException] - .toNioManaged + .toNioScoped def fromJava(asyncSocketChannel: JAsynchronousSocketChannel): AsynchronousSocketChannel = new AsynchronousSocketChannel(asyncSocketChannel) diff --git a/nio/src/main/scala/zio/nio/channels/AsynchronousFileChannel.scala b/nio/src/main/scala/zio/nio/channels/AsynchronousFileChannel.scala index 33868aa5..1b032885 100644 --- a/nio/src/main/scala/zio/nio/channels/AsynchronousFileChannel.scala +++ b/nio/src/main/scala/zio/nio/channels/AsynchronousFileChannel.scala @@ -112,10 +112,10 @@ final class AsynchronousFileChannel(protected val channel: JAsynchronousFileChan def stream(position: Long, bufferConstruct: UIO[ByteBuffer])(implicit trace: ZTraceElement ): Stream[IOException, Byte] = - ZStream.unwrapManaged { + ZStream.unwrapScoped { for { - posRef <- Ref.makeManaged(position) - buffer <- bufferConstruct.toManaged + posRef <- Ref.make(position) + buffer <- bufferConstruct } yield { val doRead = for { pos <- posRef.get @@ -150,8 +150,8 @@ final class AsynchronousFileChannel(protected val channel: JAsynchronousFileChan )(implicit trace: ZTraceElement): ZSink[Clock, IOException, Byte, Byte, Long] = ZSink.fromPush { for { - buffer <- bufferConstruct.toManaged - posRef <- Ref.makeManaged(position) + buffer <- bufferConstruct + posRef <- Ref.make(position) } yield (_: Option[Chunk[Byte]]).map { chunk => def doWrite(currentPos: Long, c: Chunk[Byte])(implicit trace: ZTraceElement): ZIO[Clock, IOException, Long] = { val x = for { @@ -193,23 +193,23 @@ object AsynchronousFileChannel { def open(file: Path, options: OpenOption*)(implicit trace: ZTraceElement - ): Managed[IOException, AsynchronousFileChannel] = + ): ZIO[Scope, IOException, AsynchronousFileChannel] = IO.attempt(new AsynchronousFileChannel(JAsynchronousFileChannel.open(file.javaPath, options: _*))) .refineToOrDie[IOException] - .toNioManaged + .toNioScoped def open( file: Path, options: Set[OpenOption], executor: Option[ExecutionContextExecutorService], attrs: Set[FileAttribute[_]] - )(implicit trace: ZTraceElement): Managed[IOException, AsynchronousFileChannel] = + )(implicit trace: ZTraceElement): ZIO[Scope, IOException, AsynchronousFileChannel] = IO.attempt( new AsynchronousFileChannel( JAsynchronousFileChannel.open(file.javaPath, options.asJava, executor.orNull, attrs.toSeq: _*) ) ).refineToOrDie[IOException] - .toNioManaged + .toNioScoped def fromJava(javaAsynchronousFileChannel: JAsynchronousFileChannel): AsynchronousFileChannel = new AsynchronousFileChannel(javaAsynchronousFileChannel) diff --git a/nio/src/main/scala/zio/nio/channels/Channel.scala b/nio/src/main/scala/zio/nio/channels/Channel.scala index adcb56d3..8d874c4b 100644 --- a/nio/src/main/scala/zio/nio/channels/Channel.scala +++ b/nio/src/main/scala/zio/nio/channels/Channel.scala @@ -47,7 +47,7 @@ trait BlockingChannel extends Channel { * Given a `BlockingOps` argument appropriate for this channel type, produces an effect value containing blocking * operations. */ - def useBlocking[R, E >: IOException, A](f: BlockingOps => ZIO[R, E, A])(implicit + def flatMapBlocking[R, E >: IOException, A](f: BlockingOps => ZIO[R, E, A])(implicit trace: ZTraceElement ): ZIO[R with Any, E, A] diff --git a/nio/src/main/scala/zio/nio/channels/DatagramChannel.scala b/nio/src/main/scala/zio/nio/channels/DatagramChannel.scala index c39e529a..bea09d85 100644 --- a/nio/src/main/scala/zio/nio/channels/DatagramChannel.scala +++ b/nio/src/main/scala/zio/nio/channels/DatagramChannel.scala @@ -2,7 +2,7 @@ package zio.nio package channels import zio.stacktracer.TracingImplicits.disableAutoTrace -import zio.{IO, Managed, UIO, ZTraceElement} +import zio.{IO, Scope, UIO, ZIO, ZTraceElement} import java.io.IOException import java.net.{DatagramSocket => JDatagramSocket, ProtocolFamily, SocketAddress => JSocketAddress, SocketOption} @@ -162,17 +162,17 @@ object DatagramChannel { * @return * a new datagram channel */ - def open(implicit trace: ZTraceElement): Managed[IOException, DatagramChannel] = + def open(implicit trace: ZTraceElement): ZIO[Scope, IOException, DatagramChannel] = IO.attempt(new DatagramChannel(JDatagramChannel.open())) .refineToOrDie[IOException] - .toNioManaged + .toNioScoped - def open(family: ProtocolFamily)(implicit trace: ZTraceElement): Managed[IOException, DatagramChannel] = + def open(family: ProtocolFamily)(implicit trace: ZTraceElement): ZIO[Scope, IOException, DatagramChannel] = IO.attempt { val javaChannel = JDatagramChannel.open(family) javaChannel.configureBlocking(false) fromJava(javaChannel) - }.refineToOrDie[IOException].toNioManaged + }.refineToOrDie[IOException].toNioScoped def fromJava(javaDatagramChannel: JDatagramChannel): DatagramChannel = new DatagramChannel(javaDatagramChannel) diff --git a/nio/src/main/scala/zio/nio/channels/FileChannel.scala b/nio/src/main/scala/zio/nio/channels/FileChannel.scala index 8747031b..7cee0f8e 100644 --- a/nio/src/main/scala/zio/nio/channels/FileChannel.scala +++ b/nio/src/main/scala/zio/nio/channels/FileChannel.scala @@ -4,7 +4,7 @@ import com.github.ghik.silencer.silent import zio.nio.file.Path import zio.nio.{ByteBuffer, IOCloseableManagement, MappedByteBuffer} import zio.stacktracer.TracingImplicits.disableAutoTrace -import zio.{IO, Managed, ZIO, ZTraceElement} +import zio.{IO, Scope, ZIO, ZTraceElement} import java.io.IOException import java.nio.channels.{FileChannel => JFileChannel} @@ -162,7 +162,7 @@ final class FileChannel private[channels] (protected val channel: JFileChannel) } - override def useBlocking[R, E, A](f: BlockingFileOps => ZIO[R, E, A])(implicit + override def flatMapBlocking[R, E, A](f: BlockingFileOps => ZIO[R, E, A])(implicit trace: ZTraceElement ): ZIO[R with Any, E, A] = nioBlocking(f(new BlockingOps)) @@ -231,10 +231,10 @@ object FileChannel { path: Path, options: Set[_ <: OpenOption], attrs: FileAttribute[_]* - )(implicit trace: ZTraceElement): Managed[IOException, FileChannel] = + )(implicit trace: ZTraceElement): ZIO[Scope, IOException, FileChannel] = IO.attempt(new FileChannel(JFileChannel.open(path.javaPath, options.asJava, attrs: _*))) .refineToOrDie[IOException] - .toNioManaged + .toNioScoped /** * Opens or creates a file, returning a file channel to access the file. @@ -244,10 +244,10 @@ object FileChannel { * @param options * Specifies how the file is opened */ - def open(path: Path, options: OpenOption*)(implicit trace: ZTraceElement): Managed[IOException, FileChannel] = + def open(path: Path, options: OpenOption*)(implicit trace: ZTraceElement): ZIO[Scope, IOException, FileChannel] = IO.attempt(new FileChannel(JFileChannel.open(path.javaPath, options: _*))) .refineToOrDie[IOException] - .toNioManaged + .toNioScoped def fromJava(javaFileChannel: JFileChannel): FileChannel = new FileChannel(javaFileChannel) diff --git a/nio/src/main/scala/zio/nio/channels/GatheringByteOps.scala b/nio/src/main/scala/zio/nio/channels/GatheringByteOps.scala index 15e27f43..2d2027d9 100644 --- a/nio/src/main/scala/zio/nio/channels/GatheringByteOps.scala +++ b/nio/src/main/scala/zio/nio/channels/GatheringByteOps.scala @@ -40,11 +40,11 @@ trait GatheringByteOps { for { _ <- write(buffers) pairs <- IO.foreach(buffers)(b => b.hasRemaining.map(_ -> b)) - r <- { + _ <- { val remaining = pairs.dropWhile(!_._1).map(_._2) go(remaining).unless(remaining.isEmpty) } - } yield r + } yield () go(bs) } } yield () @@ -73,8 +73,8 @@ trait GatheringByteOps { )(implicit trace: ZTraceElement): ZSink[Clock, IOException, Byte, Byte, Long] = ZSink.fromPush { for { - buffer <- bufferConstruct.toManaged - countRef <- Ref.makeManaged(0L) + buffer <- bufferConstruct + countRef <- Ref.make(0L) } yield (_: Option[Chunk[Byte]]).map { chunk => def doWrite(total: Int, c: Chunk[Byte])(implicit trace: ZTraceElement): ZIO[Clock, IOException, Int] = { val x = for { diff --git a/nio/src/main/scala/zio/nio/channels/Pipe.scala b/nio/src/main/scala/zio/nio/channels/Pipe.scala index 62689e37..676e7f60 100644 --- a/nio/src/main/scala/zio/nio/channels/Pipe.scala +++ b/nio/src/main/scala/zio/nio/channels/Pipe.scala @@ -2,18 +2,18 @@ package zio.nio package channels import zio.stacktracer.TracingImplicits.disableAutoTrace -import zio.{IO, Managed, ZTraceElement} +import zio.{IO, Scope, ZIO, ZTraceElement} import java.io.IOException import java.nio.channels.{Pipe => JPipe} final class Pipe private (private val pipe: JPipe)(implicit trace: ZTraceElement) { - def source(implicit trace: ZTraceElement): Managed[Nothing, Pipe.SourceChannel] = - IO.succeed(new channels.Pipe.SourceChannel(pipe.source())).toNioManaged + def source(implicit trace: ZTraceElement): ZIO[Scope, Nothing, Pipe.SourceChannel] = + IO.succeed(new channels.Pipe.SourceChannel(pipe.source())).toNioScoped - def sink(implicit trace: ZTraceElement): Managed[Nothing, Pipe.SinkChannel] = - IO.succeed(new Pipe.SinkChannel(pipe.sink())).toNioManaged + def sink(implicit trace: ZTraceElement): ZIO[Scope, Nothing, Pipe.SinkChannel] = + IO.succeed(new Pipe.SinkChannel(pipe.sink())).toNioScoped } diff --git a/nio/src/main/scala/zio/nio/channels/ScatteringByteOps.scala b/nio/src/main/scala/zio/nio/channels/ScatteringByteOps.scala index 6159df73..2d17f338 100644 --- a/nio/src/main/scala/zio/nio/channels/ScatteringByteOps.scala +++ b/nio/src/main/scala/zio/nio/channels/ScatteringByteOps.scala @@ -93,8 +93,8 @@ trait ScatteringByteOps { def stream( bufferConstruct: UIO[ByteBuffer] )(implicit trace: ZTraceElement): Stream[IOException, Byte] = - ZStream.unwrapManaged { - bufferConstruct.toManaged.map { buffer => + ZStream.unwrap { + bufferConstruct.map { buffer => val doRead = for { _ <- read(buffer) _ <- buffer.flip diff --git a/nio/src/main/scala/zio/nio/channels/SelectableChannel.scala b/nio/src/main/scala/zio/nio/channels/SelectableChannel.scala index f6f89339..3e2adac2 100644 --- a/nio/src/main/scala/zio/nio/channels/SelectableChannel.scala +++ b/nio/src/main/scala/zio/nio/channels/SelectableChannel.scala @@ -4,7 +4,7 @@ package channels import zio.nio.channels.SelectionKey.Operation import zio.nio.channels.spi.SelectorProvider import zio.stacktracer.TracingImplicits.disableAutoTrace -import zio.{Fiber, IO, Managed, UIO, ZIO, ZManaged, ZTraceElement} +import zio.{Exit, Fiber, IO, Scope, UIO, ZIO, ZTraceElement} import java.io.IOException import java.net.{ServerSocket => JServerSocket, Socket => JSocket, SocketOption} @@ -71,7 +71,7 @@ trait SelectableChannel extends BlockingChannel { protected def makeBlockingOps: BlockingOps - final override def useBlocking[R, E >: IOException, A](f: BlockingOps => ZIO[R, E, A])(implicit + final override def flatMapBlocking[R, E >: IOException, A](f: BlockingOps => ZIO[R, E, A])(implicit trace: ZTraceElement ): ZIO[R with Any, E, A] = configureBlocking(true) *> nioBlocking(f(makeBlockingOps)) @@ -84,22 +84,11 @@ trait SelectableChannel extends BlockingChannel { * @param f * Uses the `NonBlockingOps` appropriate for this channel type to produce non-blocking effects. */ - final def useNonBlocking[R, E >: IOException, A](f: NonBlockingOps => ZIO[R, E, A])(implicit + final def flatMapNonBlocking[R, E >: IOException, A](f: NonBlockingOps => ZIO[R, E, A])(implicit trace: ZTraceElement ): ZIO[R, E, A] = configureBlocking(false) *> f(makeNonBlockingOps) - /** - * Puts this channel into non-blocking mode and performs a set of non-blocking operations as a managed resource. - * - * @param f - * Uses the `NonBlockingOps` appropriate for this channel type to produce non-blocking effects. - */ - final def useNonBlockingManaged[R, E >: IOException, A](f: NonBlockingOps => ZManaged[R, E, A])(implicit - trace: ZTraceElement - ): ZManaged[R, E, A] = - configureBlocking(false).toManaged *> f(makeNonBlockingOps) - } final class SocketChannel(override protected[channels] val channel: JSocketChannel) extends SelectableChannel { @@ -170,11 +159,11 @@ object SocketChannel { def fromJava(javaSocketChannel: JSocketChannel): SocketChannel = new SocketChannel(javaSocketChannel) - def open(implicit trace: ZTraceElement): Managed[IOException, SocketChannel] = - IO.attempt(new SocketChannel(JSocketChannel.open())).refineToOrDie[IOException].toNioManaged + def open(implicit trace: ZTraceElement): ZIO[Scope, IOException, SocketChannel] = + IO.attempt(new SocketChannel(JSocketChannel.open())).refineToOrDie[IOException].toNioScoped - def open(remote: SocketAddress)(implicit trace: ZTraceElement): Managed[IOException, SocketChannel] = - IO.attempt(new SocketChannel(JSocketChannel.open(remote.jSocketAddress))).refineToOrDie[IOException].toNioManaged + def open(remote: SocketAddress)(implicit trace: ZTraceElement): ZIO[Scope, IOException, SocketChannel] = + IO.attempt(new SocketChannel(JSocketChannel.open(remote.jSocketAddress))).refineToOrDie[IOException].toNioScoped } @@ -189,14 +178,11 @@ final class ServerSocketChannel(override protected val channel: JServerSocketCha /** * Accepts a socket connection. * - * Note that the accept operation is not performed until the returned managed resource is actually used. - * `Managed.preallocate` can be used to preform the accept immediately. - * * @return * The channel for the accepted socket connection. */ - def accept(implicit trace: ZTraceElement): Managed[IOException, SocketChannel] = - IO.attempt(new SocketChannel(channel.accept())).refineToOrDie[IOException].toNioManaged + def accept(implicit trace: ZTraceElement): ZIO[Scope, IOException, SocketChannel] = + IO.attempt(new SocketChannel(channel.accept())).refineToOrDie[IOException].toNioScoped /** * Accepts a connection and uses it to perform an effect on a forked fiber. @@ -208,8 +194,17 @@ final class ServerSocketChannel(override protected val channel: JServerSocketCha */ def acceptAndFork[R, A]( use: SocketChannel => ZIO[R, IOException, A] - )(implicit trace: ZTraceElement): ZIO[R, IOException, Fiber[IOException, A]] = accept.useForked(use) - + )(implicit trace: ZTraceElement): ZIO[R, IOException, Fiber[IOException, A]] = + ZIO.uninterruptibleMask { restore => + Scope.make.flatMap { scope => + scope + .extend(restore(accept)) + .foldCauseZIO( + cause => scope.close(Exit.failCause(cause)) *> ZIO.failCause(cause), + socketChannel => scope.use[R](restore(use(socketChannel))).fork + ) + } + } } override protected def makeBlockingOps: BlockingServerSocketOps = new BlockingServerSocketOps @@ -219,19 +214,18 @@ final class ServerSocketChannel(override protected val channel: JServerSocketCha /** * Accepts a socket connection. * - * Note that the accept operation is not performed until the returned managed resource is actually used. - * `Managed.preallocate` can be used to preform the accept immediately. - * * @return * None if no connection is currently available to be accepted. */ - def accept(implicit trace: ZTraceElement): Managed[IOException, Option[SocketChannel]] = - IO.attempt(Option(channel.accept()).map(new SocketChannel(_))) - .refineToOrDie[IOException] - .toManagedWith(IO.whenCase(_) { case Some(channel) => + def accept(implicit trace: ZTraceElement): ZIO[Scope, IOException, Option[SocketChannel]] = + ZIO.acquireRelease { + IO.attempt(Option(channel.accept()).map(new SocketChannel(_))) + .refineToOrDie[IOException] + } { + IO.whenCase(_) { case Some(channel) => channel.close.ignore - }) - + } + } } override protected def makeNonBlockingOps: NonBlockingServerSocketOps = new NonBlockingServerSocketOps @@ -257,8 +251,8 @@ final class ServerSocketChannel(override protected val channel: JServerSocketCha object ServerSocketChannel { - def open(implicit trace: ZTraceElement): Managed[IOException, ServerSocketChannel] = - IO.attempt(new ServerSocketChannel(JServerSocketChannel.open())).refineToOrDie[IOException].toNioManaged + def open(implicit trace: ZTraceElement): ZIO[Scope, IOException, ServerSocketChannel] = + IO.attempt(new ServerSocketChannel(JServerSocketChannel.open())).refineToOrDie[IOException].toNioScoped def fromJava(javaChannel: JServerSocketChannel): ServerSocketChannel = new ServerSocketChannel(javaChannel) } diff --git a/nio/src/main/scala/zio/nio/channels/Selector.scala b/nio/src/main/scala/zio/nio/channels/Selector.scala index b7788f95..7f2005ac 100644 --- a/nio/src/main/scala/zio/nio/channels/Selector.scala +++ b/nio/src/main/scala/zio/nio/channels/Selector.scala @@ -4,7 +4,7 @@ package channels import com.github.ghik.silencer.silent import zio.nio.channels.spi.SelectorProvider import zio.stacktracer.TracingImplicits.disableAutoTrace -import zio.{Duration, IO, Managed, UIO, ZIO, ZTraceElement} +import zio.{Duration, IO, Scope, UIO, ZIO, ZTraceElement} import java.io.IOException import java.nio.channels.{SelectionKey => JSelectionKey, Selector => JSelector} @@ -143,7 +143,7 @@ object Selector { /** * Opens a selector. */ - def open(implicit trace: ZTraceElement): Managed[IOException, Selector] = - IO.attempt(new Selector(JSelector.open())).refineToOrDie[IOException].toNioManaged + def open(implicit trace: ZTraceElement): ZIO[Scope, IOException, Selector] = + IO.attempt(new Selector(JSelector.open())).refineToOrDie[IOException].toNioScoped } diff --git a/nio/src/main/scala/zio/nio/charset/CharsetDecoder.scala b/nio/src/main/scala/zio/nio/charset/CharsetDecoder.scala index 45419208..7f89228b 100644 --- a/nio/src/main/scala/zio/nio/charset/CharsetDecoder.scala +++ b/nio/src/main/scala/zio/nio/charset/CharsetDecoder.scala @@ -92,11 +92,11 @@ final class CharsetDecoder private (val javaDecoder: j.CharsetDecoder) extends A )(implicit trace: ZTraceElement): ZPipeline[Any, j.CharacterCodingException, Byte, Char] = { def push(implicit trace: ZTraceElement - ): Managed[Nothing, Option[Chunk[Byte]] => IO[j.CharacterCodingException, Chunk[Char]]] = + ): ZIO[Any, Nothing, Option[Chunk[Byte]] => IO[j.CharacterCodingException, Chunk[Char]]] = for { - _ <- reset.toManaged - byteBuffer <- Buffer.byte(bufSize).toManaged - charBuffer <- Buffer.char((bufSize.toFloat * this.averageCharsPerByte).round).toManaged + _ <- reset + byteBuffer <- Buffer.byte(bufSize) + charBuffer <- Buffer.char((bufSize.toFloat * this.averageCharsPerByte).round) } yield { def handleCoderResult(coderResult: CoderResult)(implicit trace: ZTraceElement) = diff --git a/nio/src/main/scala/zio/nio/charset/CharsetEncoder.scala b/nio/src/main/scala/zio/nio/charset/CharsetEncoder.scala index b40ee198..114fdec7 100644 --- a/nio/src/main/scala/zio/nio/charset/CharsetEncoder.scala +++ b/nio/src/main/scala/zio/nio/charset/CharsetEncoder.scala @@ -72,11 +72,11 @@ final class CharsetEncoder private (val javaEncoder: j.CharsetEncoder) extends A def transducer( bufSize: Int = 5000 )(implicit trace: ZTraceElement): ZPipeline[Any, j.CharacterCodingException, Char, Byte] = { - val push: UManaged[Option[Chunk[Char]] => IO[j.CharacterCodingException, Chunk[Byte]]] = { + val push: ZIO[Any, Nothing, Option[Chunk[Char]] => IO[j.CharacterCodingException, Chunk[Byte]]] = { for { - _ <- reset.toManaged - charBuffer <- Buffer.char((bufSize.toFloat / this.averageBytesPerChar).round).toManaged - byteBuffer <- Buffer.byte(bufSize).toManaged + _ <- reset + charBuffer <- Buffer.char((bufSize.toFloat / this.averageBytesPerChar).round) + byteBuffer <- Buffer.byte(bufSize) } yield { def handleCoderResult(coderResult: CoderResult) = diff --git a/nio/src/main/scala/zio/nio/file/FileSystem.scala b/nio/src/main/scala/zio/nio/file/FileSystem.scala index 7c740c4d..364d3096 100644 --- a/nio/src/main/scala/zio/nio/file/FileSystem.scala +++ b/nio/src/main/scala/zio/nio/file/FileSystem.scala @@ -3,7 +3,7 @@ package file import zio.ZIO.attemptBlockingIO import zio.stacktracer.TracingImplicits.disableAutoTrace -import zio.{IO, UIO, ZIO, ZManaged, ZTraceElement} +import zio.{IO, Scope, UIO, ZIO, ZTraceElement} import java.io.IOException import java.net.URI @@ -38,8 +38,8 @@ final class FileSystem private (private val javaFileSystem: jf.FileSystem) exten def getUserPrincipalLookupService: UserPrincipalLookupService = javaFileSystem.getUserPrincipalLookupService - def newWatchService(implicit trace: ZTraceElement): ZManaged[Any, IOException, WatchService] = - attemptBlockingIO(WatchService.fromJava(javaFileSystem.newWatchService())).toNioManaged + def newWatchService(implicit trace: ZTraceElement): ZIO[Scope, IOException, WatchService] = + attemptBlockingIO(WatchService.fromJava(javaFileSystem.newWatchService())).toNioScoped } @@ -63,17 +63,17 @@ object FileSystem { def newFileSystem(uri: URI, env: (String, Any)*)(implicit trace: ZTraceElement - ): ZManaged[Any, IOException, FileSystem] = - attemptBlockingIO(new FileSystem(jf.FileSystems.newFileSystem(uri, env.toMap.asJava))).toNioManaged + ): ZIO[Scope, IOException, FileSystem] = + attemptBlockingIO(new FileSystem(jf.FileSystems.newFileSystem(uri, env.toMap.asJava))).toNioScoped def newFileSystem(uri: URI, env: Map[String, _], loader: ClassLoader)(implicit trace: ZTraceElement - ): ZManaged[Any, Exception, FileSystem] = - attemptBlockingIO(new FileSystem(jf.FileSystems.newFileSystem(uri, env.asJava, loader))).toNioManaged + ): ZIO[Scope, Exception, FileSystem] = + attemptBlockingIO(new FileSystem(jf.FileSystems.newFileSystem(uri, env.asJava, loader))).toNioScoped def newFileSystem(path: Path, loader: ClassLoader)(implicit trace: ZTraceElement - ): ZManaged[Any, IOException, FileSystem] = - attemptBlockingIO(new FileSystem(jf.FileSystems.newFileSystem(path.javaPath, loader))).toNioManaged + ): ZIO[Scope, IOException, FileSystem] = + attemptBlockingIO(new FileSystem(jf.FileSystems.newFileSystem(path.javaPath, loader))).toNioScoped } diff --git a/nio/src/main/scala/zio/nio/file/Files.scala b/nio/src/main/scala/zio/nio/file/Files.scala index 3898ba96..1193a546 100644 --- a/nio/src/main/scala/zio/nio/file/Files.scala +++ b/nio/src/main/scala/zio/nio/file/Files.scala @@ -4,7 +4,7 @@ import zio.ZIO.attemptBlocking import zio.nio.charset.Charset import zio.stacktracer.TracingImplicits.disableAutoTrace import zio.stream.{ZSink, ZStream} -import zio.{Chunk, ZIO, ZManaged, ZTraceElement} +import zio.{Chunk, Scope, ZIO, ZTraceElement} import java.io.IOException import java.nio.file.attribute._ @@ -27,20 +27,20 @@ object Files { def newDirectoryStream(dir: Path, glob: String = "*")(implicit trace: ZTraceElement ): ZStream[Any, IOException, Path] = { - val managed = ZManaged + val scoped = ZIO .fromAutoCloseable(attemptBlocking(JFiles.newDirectoryStream(dir.javaPath, glob))) .map(_.iterator()) - ZStream.fromJavaIteratorManaged(managed).map(Path.fromJava).refineToOrDie[IOException] + ZStream.fromJavaIteratorScoped(scoped).map(Path.fromJava).refineToOrDie[IOException] } def newDirectoryStream(dir: Path, filter: Path => Boolean)(implicit trace: ZTraceElement ): ZStream[Any, IOException, Path] = { val javaFilter: DirectoryStream.Filter[_ >: JPath] = javaPath => filter(Path.fromJava(javaPath)) - val managed = ZManaged + val scoped = ZIO .fromAutoCloseable(attemptBlocking(JFiles.newDirectoryStream(dir.javaPath, javaFilter))) .map(_.iterator()) - ZStream.fromJavaIteratorManaged(managed).map(Path.fromJava).refineToOrDie[IOException] + ZStream.fromJavaIteratorScoped(scoped).map(Path.fromJava).refineToOrDie[IOException] } def createFile(path: Path, attrs: FileAttribute[_]*)(implicit trace: ZTraceElement): ZIO[Any, IOException, Unit] = @@ -65,15 +65,13 @@ object Files { attemptBlocking(Path.fromJava(JFiles.createTempFile(dir.javaPath, prefix.orNull, suffix, fileAttributes.toSeq: _*))) .refineToOrDie[IOException] - def createTempFileInManaged( + def createTempFileInScoped( dir: Path, suffix: String = ".tmp", prefix: Option[String] = None, fileAttributes: Iterable[FileAttribute[_]] = Nil - )(implicit trace: ZTraceElement): ZManaged[Any, IOException, Path] = - ZManaged.acquireReleaseWith(createTempFileIn(dir, suffix, prefix, fileAttributes))(release = - deleteIfExists(_).ignore - ) + )(implicit trace: ZTraceElement): ZIO[Scope, IOException, Path] = + ZIO.acquireRelease(createTempFileIn(dir, suffix, prefix, fileAttributes))(release = deleteIfExists(_).ignore) def createTempFile( suffix: String = ".tmp", @@ -83,12 +81,12 @@ object Files { attemptBlocking(Path.fromJava(JFiles.createTempFile(prefix.orNull, suffix, fileAttributes.toSeq: _*))) .refineToOrDie[IOException] - def createTempFileManaged( + def createTempFileScoped( suffix: String = ".tmp", prefix: Option[String] = None, fileAttributes: Iterable[FileAttribute[_]] = Nil - )(implicit trace: ZTraceElement): ZManaged[Any, IOException, Path] = - ZManaged.acquireReleaseWith(createTempFile(suffix, prefix, fileAttributes))(release = deleteIfExists(_).ignore) + )(implicit trace: ZTraceElement): ZIO[Scope, IOException, Path] = + ZIO.acquireRelease(createTempFile(suffix, prefix, fileAttributes))(release = deleteIfExists(_).ignore) def createTempDirectory( dir: Path, @@ -98,12 +96,12 @@ object Files { attemptBlocking(Path.fromJava(JFiles.createTempDirectory(dir.javaPath, prefix.orNull, fileAttributes.toSeq: _*))) .refineToOrDie[IOException] - def createTempDirectoryManaged( + def createTempDirectoryScoped( dir: Path, prefix: Option[String], fileAttributes: Iterable[FileAttribute[_]] - )(implicit trace: ZTraceElement): ZManaged[Any, IOException, Path] = - ZManaged.acquireReleaseWith(createTempDirectory(dir, prefix, fileAttributes))(release = deleteRecursive(_).ignore) + )(implicit trace: ZTraceElement): ZIO[Scope, IOException, Path] = + ZIO.acquireRelease(createTempDirectory(dir, prefix, fileAttributes))(release = deleteRecursive(_).ignore) def createTempDirectory( prefix: Option[String], @@ -112,11 +110,11 @@ object Files { attemptBlocking(Path.fromJava(JFiles.createTempDirectory(prefix.orNull, fileAttributes.toSeq: _*))) .refineToOrDie[IOException] - def createTempDirectoryManaged( + def createTempDirectoryScoped( prefix: Option[String], fileAttributes: Iterable[FileAttribute[_]] - )(implicit trace: ZTraceElement): ZManaged[Any, IOException, Path] = - ZManaged.acquireReleaseWith(createTempDirectory(prefix, fileAttributes))(release = deleteRecursive(_).ignore) + )(implicit trace: ZTraceElement): ZIO[Scope, IOException, Path] = + ZIO.acquireRelease(createTempDirectory(prefix, fileAttributes))(release = deleteRecursive(_).ignore) def createSymbolicLink( link: Path, @@ -344,15 +342,15 @@ object Files { trace: ZTraceElement ): ZStream[Any, IOException, String] = ZStream - .fromJavaStreamManaged( - ZManaged.fromAutoCloseable(attemptBlocking(JFiles.lines(path.javaPath, charset.javaCharset))) + .fromJavaStreamScoped( + ZIO.fromAutoCloseable(attemptBlocking(JFiles.lines(path.javaPath, charset.javaCharset))) ) .refineToOrDie[IOException] def list(path: Path)(implicit trace: ZTraceElement): ZStream[Any, IOException, Path] = ZStream - .fromJavaStreamManaged( - ZManaged.fromAutoCloseable(attemptBlocking(JFiles.list(path.javaPath))) + .fromJavaStreamScoped( + ZIO.fromAutoCloseable(attemptBlocking(JFiles.list(path.javaPath))) ) .map(Path.fromJava) .refineToOrDie[IOException] @@ -363,8 +361,8 @@ object Files { visitOptions: Set[FileVisitOption] = Set.empty )(implicit trace: ZTraceElement): ZStream[Any, IOException, Path] = ZStream - .fromJavaStreamManaged( - ZManaged.fromAutoCloseable(attemptBlocking(JFiles.walk(path.javaPath, maxDepth, visitOptions.toSeq: _*))) + .fromJavaStreamScoped( + ZIO.fromAutoCloseable(attemptBlocking(JFiles.walk(path.javaPath, maxDepth, visitOptions.toSeq: _*))) ) .map(Path.fromJava) .refineToOrDie[IOException] @@ -374,8 +372,8 @@ object Files { )(implicit trace: ZTraceElement): ZStream[Any, IOException, Path] = { val matcher: BiPredicate[JPath, BasicFileAttributes] = (path, attr) => test(Path.fromJava(path), attr) ZStream - .fromJavaStreamManaged( - ZManaged.fromAutoCloseable( + .fromJavaStreamScoped( + ZIO.fromAutoCloseable( attemptBlocking(JFiles.find(path.javaPath, maxDepth, matcher, visitOptions.toSeq: _*)) ) ) @@ -388,8 +386,10 @@ object Files { target: Path, options: CopyOption* )(implicit trace: ZTraceElement): ZIO[Any, IOException, Long] = - in.toInputStream - .use(inputStream => attemptBlocking(JFiles.copy(inputStream, target.javaPath, options: _*))) - .refineToOrDie[IOException] + ZIO.scoped { + in.toInputStream + .flatMap(inputStream => attemptBlocking(JFiles.copy(inputStream, target.javaPath, options: _*))) + .refineToOrDie[IOException] + } } diff --git a/nio/src/main/scala/zio/nio/file/WatchService.scala b/nio/src/main/scala/zio/nio/file/WatchService.scala index 7a8ee5bb..86bf767c 100644 --- a/nio/src/main/scala/zio/nio/file/WatchService.scala +++ b/nio/src/main/scala/zio/nio/file/WatchService.scala @@ -56,24 +56,24 @@ final class WatchKey private[file] (private val javaKey: JWatchKey) { * Retrieves and removes all pending events for this watch key. * * This does not block, it will immediately return an empty list if there are no events pending. Typically, this key - * should be reset after processing the returned events, the `pollEventsManaged` method can be used to do this + * should be reset after processing the returned events, the `pollEventsScoped` method can be used to do this * automatically and reliably. */ def pollEvents(implicit trace: ZTraceElement): UIO[List[WatchEvent[_]]] = UIO.succeed(javaKey.pollEvents().asScala.toList) /** - * Retrieves and removes all pending events for this watch key as a managed resource. + * Retrieves and removes all pending events for this watch key as a scoped resource. * - * This does not block, it will immediately return an empty list if there are no events pending. When the returned - * `Managed` completed, this key will be '''reset'''. + * This does not block, it will immediately return an empty list if there are no events pending. When the `Scope` is + * closed, this key will be '''reset'''. */ - def pollEventsManaged(implicit trace: ZTraceElement): Managed[Nothing, List[WatchEvent[_]]] = - pollEvents.toManaged.ensuring(reset) + def pollEventsScoped(implicit trace: ZTraceElement): ZIO[Scope, Nothing, List[WatchEvent[_]]] = + pollEvents.withFinalizer(reset) /** * Resets this watch key, making it eligible to be re-queued in the `WatchService`. A key is typically reset after all - * the pending events retrieved from `pollEvents` have been processed. Use `pollEventsManaged` to automatically and + * the pending events retrieved from `pollEvents` have been processed. Use `pollEventsScop[ed` to automatically and * reliably perform a reset. */ def reset(implicit trace: ZTraceElement): UIO[Boolean] = UIO.succeed(javaKey.reset()) @@ -161,7 +161,7 @@ final class WatchService private (private[file] val javaWatchService: JWatchServ object WatchService { - def forDefaultFileSystem(implicit trace: ZTraceElement): ZManaged[Any, IOException, WatchService] = + def forDefaultFileSystem(implicit trace: ZTraceElement): ZIO[Scope, IOException, WatchService] = FileSystem.default.newWatchService def fromJava(javaWatchService: JWatchService): WatchService = new WatchService(javaWatchService) diff --git a/nio/src/main/scala/zio/nio/package.scala b/nio/src/main/scala/zio/nio/package.scala index 8b98a934..11d4c578 100644 --- a/nio/src/main/scala/zio/nio/package.scala +++ b/nio/src/main/scala/zio/nio/package.scala @@ -1,7 +1,6 @@ package zio import com.github.ghik.silencer.silent -import zio.ZManaged.ReleaseMap import zio.stream.{ZChannel, ZPipeline, ZSink, ZStream} import java.io.EOFException @@ -48,26 +47,8 @@ package object nio { private val acquire: ZIO[R, E, A] ) extends AnyVal { - def toNioManaged(implicit trace: ZTraceElement): ZManaged[R, E, A] = - ZManaged.acquireReleaseInterruptibleWith(acquire)(_.close.ignore) + def toNioScoped(implicit trace: ZTraceElement): ZIO[R with Scope, E, A] = + acquire.tap(a => ZIO.addFinalizer(a.close.ignore)) } - - implicit final class ManagedOps[-R, +E, +A](private val managed: ZManaged[R, E, A]) extends AnyVal { - - /** - * Use this managed resource in an effect running in a forked fiber. The resource will be released on the forked - * fiber after the effect exits, whether it succeeds, fails or is interrupted. - * - * @param f - * The effect to run in a forked fiber. The resource is only valid within this effect. - */ - def useForked[R2 <: R, E2 >: E, B]( - f: A => ZIO[R2, E2, B] - )(implicit trace: ZTraceElement): ZIO[R2, E, Fiber[E2, B]] = ReleaseMap.make.flatMap { releaseMap => - managed.zio.flatMap { case (finalizer, a: A) => - f(a).onExit(finalizer).fork - } - } - } } diff --git a/nio/src/test/scala/zio/nio/BaseSpec.scala b/nio/src/test/scala/zio/nio/BaseSpec.scala index 1ce26a96..7479a024 100644 --- a/nio/src/test/scala/zio/nio/BaseSpec.scala +++ b/nio/src/test/scala/zio/nio/BaseSpec.scala @@ -1,8 +1,8 @@ package zio.nio import zio._ -import zio.test.{DefaultRunnableSpec, Live, TestAspect, TestAspectAtLeastR} +import zio.test.{Live, TestAspect, TestAspectAtLeastR, ZIOSpecDefault} -trait BaseSpec extends DefaultRunnableSpec { - override def aspects: List[TestAspectAtLeastR[Live]] = List(TestAspect.timeout(60.seconds)) +trait BaseSpec extends ZIOSpecDefault { + override def aspects: Chunk[TestAspectAtLeastR[Live]] = Chunk(TestAspect.timeout(60.seconds)) } diff --git a/nio/src/test/scala/zio/nio/channels/AsynchronousChannelGroupSpec.scala b/nio/src/test/scala/zio/nio/channels/AsynchronousChannelGroupSpec.scala index 7291c4f7..0677dc4c 100644 --- a/nio/src/test/scala/zio/nio/channels/AsynchronousChannelGroupSpec.scala +++ b/nio/src/test/scala/zio/nio/channels/AsynchronousChannelGroupSpec.scala @@ -57,7 +57,8 @@ object AsynchronousChannelGroupSpec extends BaseSpec { } yield assert(result)(dies(anything)) }, test("companion object create instance using executor and initial size") { - ZIO(ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())) + ZIO + .attempt(ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())) .acquireReleaseWith(executor => ZIO.succeed(executor.shutdown())) { executor => AsynchronousChannelGroup(executor, 1).exit.map(result => assert(result.toEither)(isRight(anything))) } @@ -73,7 +74,8 @@ object AsynchronousChannelGroupSpec extends BaseSpec { } yield assert(result)(dies(isSubtype[NullPointerException](anything))) }, test("companion object create instance using executor service") { - ZIO(ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())) + ZIO + .attempt(ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())) .acquireReleaseWith(executor => ZIO.succeed(executor.shutdown())) { executor => AsynchronousChannelGroup(executor).exit.map(result => assert(result.toEither)(isRight(anything))) } @@ -114,6 +116,8 @@ object AsynchronousChannelGroupSpec extends BaseSpec { def providedFixture(f: ClassFixture => ZIO[Any, Throwable, TestResult])(implicit trace: ZTraceElement ): ZIO[Any, Throwable, TestResult] = - ZIO(ClassFixture()).acquireReleaseWith(fixture => ZIO.succeed(fixture.cleanFixture()))(fixture => f(fixture)) + ZIO + .attempt(ClassFixture()) + .acquireReleaseWith(fixture => ZIO.succeed(fixture.cleanFixture()))(fixture => f(fixture)) } } diff --git a/nio/src/test/scala/zio/nio/channels/ChannelSpec.scala b/nio/src/test/scala/zio/nio/channels/ChannelSpec.scala index 98b8da92..1f7784bb 100644 --- a/nio/src/test/scala/zio/nio/channels/ChannelSpec.scala +++ b/nio/src/test/scala/zio/nio/channels/ChannelSpec.scala @@ -11,14 +11,10 @@ import java.{nio => jnio} object ChannelSpec extends BaseSpec { - override def spec: Spec[ - Annotations with Live with Sized with TestClock with TestConfig with TestConsole with TestRandom with TestSystem with Clock with zio.Console with zio.System with Random, - TestFailure[Any], - TestSuccess - ] = + override def spec = suite("Channel")( test("localAddress") { - SocketChannel.open.use { con => + SocketChannel.open.flatMap { con => for { _ <- con.bindAuto localAddress <- con.localAddress @@ -30,33 +26,39 @@ object ChannelSpec extends BaseSpec { def echoServer(started: Promise[Nothing, SocketAddress])(implicit trace: ZTraceElement): IO[Exception, Unit] = for { sink <- Buffer.byte(3) - _ <- AsynchronousServerSocketChannel.open.use { server => - for { - _ <- server.bindAuto() - addr <- server.localAddress.flatMap(opt => IO.attempt(opt.get).orDie) - _ <- started.succeed(addr) - _ <- server.accept.use { worker => - worker.read(sink) *> - sink.flip *> - worker.write(sink) - } - } yield () + _ <- ZIO.scoped { + AsynchronousServerSocketChannel.open.flatMap { server => + for { + _ <- server.bindAuto() + addr <- server.localAddress.flatMap(opt => IO.attempt(opt.get).orDie) + _ <- started.succeed(addr) + _ <- ZIO.scoped { + server.accept.flatMap { worker => + worker.read(sink) *> + sink.flip *> + worker.write(sink) + } + } + } yield () + } }.fork } yield () def echoClient(address: SocketAddress)(implicit trace: ZTraceElement): IO[Exception, Boolean] = for { src <- Buffer.byte(3) - result <- AsynchronousSocketChannel.open.use { client => - for { - _ <- client.connect(address) - sent <- src.array - _ = sent.update(0, 1) - _ <- client.write(src) - _ <- src.flip - _ <- client.read(src) - received <- src.array - } yield sent.sameElements(received) + result <- ZIO.scoped { + AsynchronousSocketChannel.open.flatMap { client => + for { + _ <- client.connect(address) + sent <- src.array + _ = sent.update(0, 1) + _ <- client.write(src) + _ <- src.flip + _ <- client.read(src) + received <- src.array + } yield sent.sameElements(received) + } } } yield result @@ -72,27 +74,34 @@ object ChannelSpec extends BaseSpec { trace: ZTraceElement ): IO[Exception, Fiber[Exception, Boolean]] = for { - result <- AsynchronousServerSocketChannel.open.use { server => - for { - _ <- server.bindAuto() - addr <- server.localAddress.flatMap(opt => IO.attempt(opt.get).orDie) - _ <- started.succeed(addr) - result <- server.accept - .use(worker => worker.readChunk(3) *> worker.readChunk(3) *> ZIO.succeed(false)) - .catchSome { case _: java.io.EOFException => + result <- ZIO.scoped { + AsynchronousServerSocketChannel.open.flatMap { server => + for { + _ <- server.bindAuto() + addr <- server.localAddress.flatMap(opt => IO.attempt(opt.get).orDie) + _ <- started.succeed(addr) + result <- ZIO.scoped { + server.accept + .flatMap(worker => + worker.readChunk(3) *> worker.readChunk(3) *> ZIO.succeed(false) + ) + }.catchSome { case _: java.io.EOFException => ZIO.succeed(true) } - } yield result + } yield result + } }.fork } yield result def client(address: SocketAddress)(implicit trace: ZTraceElement): IO[Exception, Unit] = for { - _ <- AsynchronousSocketChannel.open.use { client => - for { - _ <- client.connect(address) - _ = client.writeChunk(Chunk.fromArray(Array[Byte](1, 1, 1))) - } yield () + _ <- ZIO.scoped { + AsynchronousSocketChannel.open.flatMap { client => + for { + _ <- client.connect(address) + _ = client.writeChunk(Chunk.fromArray(Array[Byte](1, 1, 1))) + } yield () + } } } yield () @@ -106,61 +115,71 @@ object ChannelSpec extends BaseSpec { }, test("close channel unbind port") { def client(address: SocketAddress)(implicit trace: ZTraceElement): IO[Exception, Unit] = - AsynchronousSocketChannel.open.use { - _.connect(address) + ZIO.scoped { + AsynchronousSocketChannel.open.flatMap { + _.connect(address) + } } def server( started: Promise[Nothing, SocketAddress] - )(implicit trace: ZTraceElement): Managed[IOException, Fiber[Exception, Unit]] = + )(implicit trace: ZTraceElement): ZIO[Scope, IOException, Fiber[Exception, Unit]] = for { server <- AsynchronousServerSocketChannel.open - _ <- server.bindAuto().toManaged - addr <- server.localAddress.someOrElseZIO(IO.die(new NoSuchElementException)).toManaged - _ <- started.succeed(addr).toManaged - worker <- server.accept.unit.fork + _ <- server.bindAuto() + addr <- server.localAddress.someOrElseZIO(IO.die(new NoSuchElementException)) + _ <- started.succeed(addr) + worker <- server.accept.unit.forkScoped } yield worker for { serverStarted1 <- Promise.make[Nothing, SocketAddress] - _ <- server(serverStarted1).use { s1 => - serverStarted1.await.flatMap(client).zipRight(s1.join) + _ <- ZIO.scoped { + server(serverStarted1).flatMap { s1 => + serverStarted1.await.flatMap(client).zipRight(s1.join) + } } serverStarted2 <- Promise.make[Nothing, SocketAddress] - _ <- server(serverStarted2).use { s2 => - serverStarted2.await.flatMap(client).zipRight(s2.join) + _ <- ZIO.scoped { + server(serverStarted2).flatMap { s2 => + serverStarted2.await.flatMap(client).zipRight(s2.join) + } } } yield assertCompletes }, test("read can be interrupted") { live { - AsynchronousServerSocketChannel.open - .tapZIO(_.bindAuto()) - .use { serverChannel => - for { - serverAddress <- - serverChannel.localAddress.someOrElseZIO(ZIO.dieMessage("Local address must be bound")) - promise <- Promise.make[Nothing, Unit] - fiber <- AsynchronousSocketChannel.open - .tapZIO(_.connect(serverAddress)) - .use(channel => promise.succeed(()) *> channel.readChunk(1)) - .fork - _ <- promise.await - _ <- ZIO.sleep(500.milliseconds) - exit <- fiber.interrupt - } yield assert(exit)(isInterrupted) - - } + ZIO.scoped { + AsynchronousServerSocketChannel.open + .tap(_.bindAuto()) + .flatMap { serverChannel => + for { + serverAddress <- + serverChannel.localAddress.someOrElseZIO(ZIO.dieMessage("Local address must be bound")) + promise <- Promise.make[Nothing, Unit] + fiber <- ZIO.scoped { + AsynchronousSocketChannel.open + .tap(_.connect(serverAddress)) + .flatMap(channel => promise.succeed(()) *> channel.readChunk(1)) + }.fork + _ <- promise.await + _ <- ZIO.sleep(500.milliseconds) + exit <- fiber.interrupt + } yield assert(exit)(isInterrupted) + } + } } }, test("accept can be interrupted") { live { - AsynchronousServerSocketChannel.open.tapZIO(_.bindAuto()).use { serverChannel => - for { - fiber <- serverChannel.accept.useNow.fork - _ <- ZIO.sleep(500.milliseconds) - exit <- fiber.interrupt - } yield assert(exit)(isInterrupted) + ZIO.scoped { + AsynchronousServerSocketChannel.open.tap(_.bindAuto()).flatMap { serverChannel => + for { + fiber <- ZIO.scoped(serverChannel.accept).fork + _ <- ZIO.sleep(500.milliseconds) + exit <- fiber.interrupt + } yield assert(exit)(isInterrupted) + } } } } @@ -182,10 +201,12 @@ object ChannelSpec extends BaseSpec { live { for { promise <- Promise.make[Nothing, Unit] - fiber <- Pipe.open.toManaged - .flatMap(_.source) - .useNioBlockingOps(ops => promise.succeed(()) *> ops.readChunk(1)) - .fork + fiber <- ZIO.scoped { + Pipe.open + .flatMap(_.source) + .flatMapNioBlockingOps(ops => promise.succeed(()) *> ops.readChunk(1)) + + }.fork _ <- promise.await _ <- ZIO.sleep(500.milliseconds) exit <- fiber.interrupt @@ -218,7 +239,7 @@ object ChannelSpec extends BaseSpec { val hangingChannel = new BlockingChannel { override type BlockingOps = GatheringByteOps - override def useBlocking[R, E >: IOException, A]( + override def flatMapBlocking[R, E >: IOException, A]( f: GatheringByteOps => ZIO[R, E, A] )(implicit trace: ZTraceElement): ZIO[R, E, A] = nioBlocking(f(hangingOps)) @@ -229,7 +250,9 @@ object ChannelSpec extends BaseSpec { for { promise <- Promise.make[Nothing, Unit] fiber <- - hangingChannel.useBlocking(ops => promise.succeed(()) *> ops.writeChunk(Chunk.single(42.toByte))).fork + hangingChannel + .flatMapBlocking(ops => promise.succeed(()) *> ops.writeChunk(Chunk.single(42.toByte))) + .fork _ <- promise.await _ <- ZIO.sleep(500.milliseconds) exit <- fiber.interrupt @@ -238,14 +261,16 @@ object ChannelSpec extends BaseSpec { }, test("accept can be interrupted") { live { - ServerSocketChannel.open.tapZIO(_.bindAuto()).use { serverChannel => - for { - promise <- Promise.make[Nothing, Unit] - fiber <- serverChannel.useBlocking(ops => promise.succeed(()) *> ops.accept.useNow).fork - _ <- promise.await - _ <- ZIO.sleep(500.milliseconds) - exit <- fiber.interrupt - } yield assert(exit)(isInterrupted) + ZIO.scoped { + ServerSocketChannel.open.tap(_.bindAuto()).flatMap { serverChannel => + for { + promise <- Promise.make[Nothing, Unit] + fiber <- serverChannel.flatMapBlocking(ops => promise.succeed(()) *> ZIO.scoped(ops.accept)).fork + _ <- promise.await + _ <- ZIO.sleep(500.milliseconds) + exit <- fiber.interrupt + } yield assert(exit)(isInterrupted) + } } } } diff --git a/nio/src/test/scala/zio/nio/channels/DatagramChannelSpec.scala b/nio/src/test/scala/zio/nio/channels/DatagramChannelSpec.scala index 1178aa8d..92968178 100644 --- a/nio/src/test/scala/zio/nio/channels/DatagramChannelSpec.scala +++ b/nio/src/test/scala/zio/nio/channels/DatagramChannelSpec.scala @@ -19,31 +19,35 @@ object DatagramChannelSpec extends BaseSpec { def echoServer(started: Promise[Nothing, SocketAddress])(implicit trace: ZTraceElement): UIO[Unit] = for { sink <- Buffer.byte(3) - _ <- DatagramChannel.open.useNioBlocking { (server, ops) => - for { - _ <- server.bindAuto - addr <- server.localAddress.someOrElseZIO(ZIO.dieMessage("Must have local address")) - _ <- started.succeed(addr) - addr <- ops.receive(sink) - _ <- sink.flip - _ <- ops.send(sink, addr) - } yield () + _ <- ZIO.scoped { + DatagramChannel.open.flatMapNioBlocking { (server, ops) => + for { + _ <- server.bindAuto + addr <- server.localAddress.someOrElseZIO(ZIO.dieMessage("Must have local address")) + _ <- started.succeed(addr) + addr <- ops.receive(sink) + _ <- sink.flip + _ <- ops.send(sink, addr) + } yield () + } }.fork } yield () def echoClient(address: SocketAddress)(implicit trace: ZTraceElement): IO[IOException, Boolean] = for { src <- Buffer.byte(3) - result <- DatagramChannel.open.useNioBlockingOps { client => - for { - _ <- client.connect(address) - sent <- src.array - _ = sent.update(0, 1) - _ <- client.send(src, address) - _ <- src.flip - _ <- client.read(src) - received <- src.array - } yield sent.sameElements(received) + result <- ZIO.scoped { + DatagramChannel.open.flatMapNioBlockingOps { client => + for { + _ <- client.connect(address) + sent <- src.array + _ = sent.update(0, 1) + _ <- client.send(src, address) + _ <- src.flip + _ <- client.read(src) + received <- src.array + } yield sent.sameElements(received) + } } } yield result @@ -56,19 +60,23 @@ object DatagramChannelSpec extends BaseSpec { }, test("close channel unbind port") { def client(address: SocketAddress)(implicit trace: ZTraceElement): IO[IOException, Unit] = - DatagramChannel.open.useNioBlockingOps(_.connect(address).unit) + ZIO.scoped { + DatagramChannel.open.flatMapNioBlockingOps(_.connect(address).unit) + } def server( address: Option[SocketAddress], started: Promise[Nothing, SocketAddress] )(implicit trace: ZTraceElement): UIO[Fiber[IOException, Unit]] = for { - worker <- DatagramChannel.open.useNioBlocking { (server, _) => - for { - _ <- server.bind(address) - addr <- server.localAddress.someOrElseZIO(ZIO.dieMessage("Local address must be bound")) - _ <- started.succeed(addr) - } yield () + worker <- ZIO.scoped { + DatagramChannel.open.flatMapNioBlocking { (server, _) => + for { + _ <- server.bind(address) + addr <- server.localAddress.someOrElseZIO(ZIO.dieMessage("Local address must be bound")) + _ <- started.succeed(addr) + } yield () + } }.fork } yield worker diff --git a/nio/src/test/scala/zio/nio/channels/FileChannelSpec.scala b/nio/src/test/scala/zio/nio/channels/FileChannelSpec.scala index c1d26988..6c6d159a 100644 --- a/nio/src/test/scala/zio/nio/channels/FileChannelSpec.scala +++ b/nio/src/test/scala/zio/nio/channels/FileChannelSpec.scala @@ -24,14 +24,10 @@ object FileChannelSpec extends BaseSpec { .acquireReleaseWith(s => ZIO.succeed(s.close()))(s => ZIO.attemptBlocking(s.getLines().toList)) .orDie - override def spec: Spec[ - Annotations with Live with Sized with TestClock with TestConfig with TestConsole with TestRandom with TestSystem with Clock with zio.Console with zio.System with Random, - TestFailure[Any], - TestSuccess - ] = + override def spec = suite("FileChannelSpec")( test("asynchronous file buffer read") { - AsynchronousFileChannel.open(readFile, StandardOpenOption.READ).use { channel => + AsynchronousFileChannel.open(readFile, StandardOpenOption.READ).flatMap { channel => for { buffer <- Buffer.byte(16) _ <- channel.read(buffer, 0) @@ -42,7 +38,7 @@ object FileChannelSpec extends BaseSpec { } }, test("asynchronous file chunk read") { - AsynchronousFileChannel.open(readFile, StandardOpenOption.READ).use { channel => + AsynchronousFileChannel.open(readFile, StandardOpenOption.READ).flatMap { channel => for { bytes <- channel.readChunk(500, 0L) chars <- Charset.Standard.utf8.decodeChunk(bytes) @@ -57,7 +53,7 @@ object FileChannelSpec extends BaseSpec { StandardOpenOption.CREATE, StandardOpenOption.WRITE ) - .use { channel => + .flatMap { channel => for { buffer <- Buffer.byte(Chunk.fromArray("Hello World".getBytes)) _ <- channel.write(buffer, 0) @@ -71,7 +67,7 @@ object FileChannelSpec extends BaseSpec { for { result <- FileChannel .open(readFile, StandardOpenOption.READ) - .useNioBlockingOps { ops => + .flatMapNioBlockingOps { ops => for { buffer <- ops.map(FileChannel.MapMode.READ_ONLY, 0L, 6L) bytes <- buffer.getChunk() @@ -83,7 +79,7 @@ object FileChannelSpec extends BaseSpec { test("end of stream") { FileChannel .open(readFile, StandardOpenOption.READ) - .useNioBlocking { (channel, ops) => + .flatMapNioBlocking { (channel, ops) => for { size <- channel.size _ <- ops.readChunk(size.toInt) @@ -96,7 +92,7 @@ object FileChannelSpec extends BaseSpec { test("stream reading") { FileChannel .open(readFile, StandardOpenOption.READ) - .useNioBlockingOps { + .flatMapNioBlockingOps { _.stream().via(Charset.Standard.utf8.newDecoder.transducer()).runCollect.map(_.mkString) } .map(assert(_)(equalTo(readFileContents))) @@ -109,7 +105,7 @@ object FileChannelSpec extends BaseSpec { val file = Path("nio/src/test/resources/sink_write_test.txt") FileChannel .open(file, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW) - .useNioBlockingOps(channel => stream.run(channel.sink())) + .flatMapNioBlockingOps(channel => stream.run(channel.sink())) .zipRight(loadViaSource(file).ensuring(Files.delete(file).orDie)) .map(lines => assert(lines.mkString("\n"))(equalTo(testData))) } diff --git a/nio/src/test/scala/zio/nio/channels/ScatterGatherChannelSpec.scala b/nio/src/test/scala/zio/nio/channels/ScatterGatherChannelSpec.scala index dfcefe6c..68ff964b 100644 --- a/nio/src/test/scala/zio/nio/channels/ScatterGatherChannelSpec.scala +++ b/nio/src/test/scala/zio/nio/channels/ScatterGatherChannelSpec.scala @@ -11,16 +11,12 @@ import scala.io.Source object ScatterGatherChannelSpec extends BaseSpec { - override def spec: Spec[ - Any with Annotations with Live with Sized with TestClock with TestConfig with TestConsole with TestRandom with TestSystem with Clock with zio.Console with zio.System with Random, - TestFailure[Any], - TestSuccess - ] = + override def spec = suite("ScatterGatherChannelSpec")( test("scattering read") { FileChannel .open(Path("nio/src/test/resources/scattering_read_test.txt"), StandardOpenOption.READ) - .useNioBlockingOps { ops => + .flatMapNioBlockingOps { ops => for { buffs <- IO.collectAll(List(Buffer.byte(5), Buffer.byte(5))) _ <- ops.read(buffs) @@ -39,7 +35,7 @@ object ScatterGatherChannelSpec extends BaseSpec { val file = Path("nio/src/test/resources/gathering_write_test.txt") FileChannel .open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING) - .useNioBlockingOps { ops => + .flatMapNioBlockingOps { ops => for { buffs <- IO.collectAll( List( @@ -51,7 +47,7 @@ object ScatterGatherChannelSpec extends BaseSpec { result = Source.fromFile(file.toFile).getLines().toSeq } yield assert(result)(equalTo(Seq("HelloWorld"))) } - .ensuring(IO.effectTotal(Files.delete(file.javaPath))) + .ensuring(IO.succeed(Files.delete(file.javaPath))) } ) } diff --git a/nio/src/test/scala/zio/nio/channels/SelectorSpec.scala b/nio/src/test/scala/zio/nio/channels/SelectorSpec.scala index d4438dd8..62901dd7 100644 --- a/nio/src/test/scala/zio/nio/channels/SelectorSpec.scala +++ b/nio/src/test/scala/zio/nio/channels/SelectorSpec.scala @@ -20,7 +20,7 @@ object SelectorSpec extends BaseSpec { test("read/write") { for { started <- Promise.make[Nothing, SocketAddress] - serverFiber <- server(started).useNow.fork + serverFiber <- ZIO.scoped(server(started)).fork addr <- started.await clientFiber <- client(addr).fork _ <- serverFiber.join @@ -29,12 +29,14 @@ object SelectorSpec extends BaseSpec { }, test("select is interruptible") { live { - Selector.open.use { selector => - for { - fiber <- selector.select.fork - _ <- ZIO.sleep(500.milliseconds) - exit <- fiber.interrupt - } yield assert(exit)(isInterrupted) + ZIO.scoped { + Selector.open.flatMap { selector => + for { + fiber <- selector.select.fork + _ <- ZIO.sleep(500.milliseconds) + exit <- fiber.interrupt + } yield assert(exit)(isInterrupted) + } } } } @@ -49,9 +51,9 @@ object SelectorSpec extends BaseSpec { def server( started: Promise[Nothing, SocketAddress] - )(implicit trace: ZTraceElement): ZManaged[Clock, Exception, Unit] = { + )(implicit trace: ZTraceElement): ZIO[Clock with Scope, Exception, Unit] = { def serverLoop( - scope: Managed.Scope, + scope: Scope, selector: Selector, buffer: ByteBuffer )(implicit trace: ZTraceElement): ZIO[Any, Exception, Unit] = @@ -62,14 +64,14 @@ object SelectorSpec extends BaseSpec { { case channel: ServerSocketChannel if readyOps(Operation.Accept) => for { - scopeResult <- scope(channel.useNonBlockingManaged(_.accept)) - (_, maybeClient) = scopeResult + scopeResult <- scope.extend(channel.flatMapNonBlocking(_.accept)) + maybeClient = scopeResult _ <- IO.whenCase(maybeClient) { case Some(client) => client.configureBlocking(false) *> client.register(selector, Set(Operation.Read)) } } yield () case channel: SocketChannel if readyOps(Operation.Read) => - channel.useNonBlocking { client => + channel.flatMapNonBlocking { client => for { _ <- client.read(buffer) _ <- buffer.flip @@ -86,26 +88,24 @@ object SelectorSpec extends BaseSpec { } yield () for { - scope <- Managed.scope + scope <- ZIO.scope selector <- Selector.open channel <- ServerSocketChannel.open - _ <- Managed.fromZIO { - for { - _ <- channel.bindAuto() - _ <- channel.configureBlocking(false) - _ <- channel.register(selector, Set(Operation.Accept)) - buffer <- Buffer.byte(256) - addr <- channel.localAddress - _ <- started.succeed(addr) + _ <- for { + _ <- channel.bindAuto() + _ <- channel.configureBlocking(false) + _ <- channel.register(selector, Set(Operation.Accept)) + buffer <- Buffer.byte(256) + addr <- channel.localAddress + _ <- started.succeed(addr) - /* - * we need to run the server loop twice: - * 1. to accept the client request - * 2. to read from the client channel - */ - _ <- serverLoop(scope, selector, buffer).repeat(Schedule.once) - } yield () - } + /* + * we need to run the server loop twice: + * 1. to accept the client request + * 2. to read from the client channel + */ + _ <- serverLoop(scope, selector, buffer).repeat(Schedule.once) + } yield () } yield () } @@ -113,15 +113,17 @@ object SelectorSpec extends BaseSpec { val bytes = Chunk.fromArray("Hello world".getBytes) for { buffer <- Buffer.byte(bytes) - text <- SocketChannel.open(address).useNioBlockingOps { client => - for { - _ <- client.write(buffer) - _ <- buffer.clear - _ <- client.read(buffer) - array <- buffer.array - text = byteArrayToString(array) - _ <- buffer.clear - } yield text + text <- ZIO.scoped { + SocketChannel.open(address).flatMapNioBlockingOps { client => + for { + _ <- client.write(buffer) + _ <- buffer.clear + _ <- client.read(buffer) + array <- buffer.array + text = byteArrayToString(array) + _ <- buffer.clear + } yield text + } } } yield text } diff --git a/nio/src/test/scala/zio/nio/file/FilesSpec.scala b/nio/src/test/scala/zio/nio/file/FilesSpec.scala index 47e0e75e..92e63d5b 100644 --- a/nio/src/test/scala/zio/nio/file/FilesSpec.scala +++ b/nio/src/test/scala/zio/nio/file/FilesSpec.scala @@ -3,77 +3,81 @@ package zio.nio.file import zio.nio.BaseSpec import zio.test.Assertion._ import zio.test._ -import zio.{Chunk, Clock, Random, Ref} +import zio.{Chunk, Clock, Random, Ref, ZIO} object FilesSpec extends BaseSpec { - override def spec: Spec[ - Annotations with Live with Sized with TestClock with TestConfig with TestConsole with TestRandom with TestSystem with Clock with zio.Console with zio.System with Random, - TestFailure[Any], - TestSuccess - ] = + override def spec: Spec[Any, TestFailure[Any], TestSuccess] = suite("FilesSpec")( - test("createTempFileInManaged cleans up temp file") { - val sampleFileContent = Chunk.fromArray("createTempFileInManaged works!".getBytes) + test("createTempFileInScoped cleans up temp file") { + val sampleFileContent = Chunk.fromArray("createTempFileInScoped works!".getBytes) for { pathRef <- Ref.make[Option[Path]](None) - readBytes <- Files - .createTempFileInManaged(dir = Path(".")) - .use { tmpFile => - pathRef.set(Some(tmpFile)) *> writeAndThenRead(tmpFile)(sampleFileContent) - } - Some(tmpFilePath) <- pathRef.get + readBytes <- ZIO.scoped { + Files + .createTempFileInScoped(dir = Path(".")) + .flatMap { tmpFile => + pathRef.set(Some(tmpFile)) *> writeAndThenRead(tmpFile)(sampleFileContent) + } + } + tmpFilePath <- pathRef.get.some tmpFileExistsAfterUsage <- Files.exists(tmpFilePath) } yield assert(readBytes)(equalTo(sampleFileContent)) && assert(tmpFileExistsAfterUsage)(isFalse) }, - test("createTempFileManaged cleans up temp file") { - val sampleFileContent = Chunk.fromArray("createTempFileManaged works!".getBytes) + test("createTempFileScoped cleans up temp file") { + val sampleFileContent = Chunk.fromArray("createTempFileScoped works!".getBytes) for { pathRef <- Ref.make[Option[Path]](None) - readBytes <- Files - .createTempFileManaged() - .use { tmpFile => - pathRef.set(Some(tmpFile)) *> writeAndThenRead(tmpFile)(sampleFileContent) - } - Some(tmpFilePath) <- pathRef.get + readBytes <- ZIO.scoped { + Files + .createTempFileScoped() + .flatMap { tmpFile => + pathRef.set(Some(tmpFile)) *> writeAndThenRead(tmpFile)(sampleFileContent) + } + } + tmpFilePath <- pathRef.get.some tmpFileExistsAfterUsage <- Files.exists(tmpFilePath) } yield assert(readBytes)(equalTo(sampleFileContent)) && assert(tmpFileExistsAfterUsage)(isFalse) }, - test("createTempDirectoryManaged cleans up temp dir") { - val sampleFileContent = Chunk.fromArray("createTempDirectoryManaged works!".getBytes) + test("createTempDirectoryScoped cleans up temp dir") { + val sampleFileContent = Chunk.fromArray("createTempDirectoryScoped works!".getBytes) for { pathRef <- Ref.make[Option[Path]](None) - readBytes <- Files - .createTempDirectoryManaged( - prefix = None, - fileAttributes = Nil - ) - .use { tmpDir => - val sampleFile = tmpDir / "createTempDirectoryManaged" - pathRef.set(Some(tmpDir)) *> createAndWriteAndThenRead(sampleFile)(sampleFileContent) - } - Some(tmpFilePath) <- pathRef.get + readBytes <- ZIO.scoped { + Files + .createTempDirectoryScoped( + prefix = None, + fileAttributes = Nil + ) + .flatMap { tmpDir => + val sampleFile = tmpDir / "createTempDirectoryScoped" + pathRef.set(Some(tmpDir)) *> createAndWriteAndThenRead(sampleFile)(sampleFileContent) + } + } + tmpFilePath <- pathRef.get.some tmpFileExistsAfterUsage <- Files.exists(tmpFilePath) } yield assert(readBytes)(equalTo(sampleFileContent)) && assert(tmpFileExistsAfterUsage)(isFalse) }, - test("createTempDirectoryManaged (dir) cleans up temp dir") { - val sampleFileContent = Chunk.fromArray("createTempDirectoryManaged(dir) works!".getBytes) + test("createTempDirectoryscoped (dir) cleans up temp dir") { + val sampleFileContent = Chunk.fromArray("createTempDirectoryScoped(dir) works!".getBytes) for { pathRef <- Ref.make[Option[Path]](None) - readBytes <- Files - .createTempDirectoryManaged( - dir = Path("."), - prefix = None, - fileAttributes = Nil - ) - .use { tmpDir => - val sampleFile = tmpDir / "createTempDirectoryManaged2" - pathRef.set(Some(tmpDir)) *> createAndWriteAndThenRead(sampleFile)(sampleFileContent) - } - Some(tmpFilePath) <- pathRef.get + readBytes <- ZIO.scoped { + Files + .createTempDirectoryScoped( + dir = Path("."), + prefix = None, + fileAttributes = Nil + ) + .flatMap { tmpDir => + val sampleFile = tmpDir / "createTempDirectoryScoped2" + pathRef.set(Some(tmpDir)) *> createAndWriteAndThenRead(sampleFile)(sampleFileContent) + } + } + tmpFilePath <- pathRef.get.some tmpFileExistsAfterUsage <- Files.exists(tmpFilePath) } yield assert(readBytes)(equalTo(sampleFileContent)) && assert(tmpFileExistsAfterUsage)(isFalse) diff --git a/nio/src/test/scala/zio/nio/file/PathSpec.scala b/nio/src/test/scala/zio/nio/file/PathSpec.scala index c7e19629..a52e7fa8 100644 --- a/nio/src/test/scala/zio/nio/file/PathSpec.scala +++ b/nio/src/test/scala/zio/nio/file/PathSpec.scala @@ -6,7 +6,7 @@ import zio.test._ object PathSpec extends BaseSpec { - override def spec: ZSpec[Environment, Failure] = + override def spec = suite("PathSpec")( test("Path construction") { val p = Path("a", "b") / "c/d" diff --git a/nio/src/test/scala/zio/nio/file/WathServiceSpec.scala b/nio/src/test/scala/zio/nio/file/WathServiceSpec.scala index 0b1354d4..67944276 100644 --- a/nio/src/test/scala/zio/nio/file/WathServiceSpec.scala +++ b/nio/src/test/scala/zio/nio/file/WathServiceSpec.scala @@ -1,5 +1,6 @@ package zio.nio.file +import zio.Scope import zio.nio.BaseSpec import zio.test.Assertion._ import zio.test._ @@ -9,10 +10,10 @@ import java.nio.file.StandardWatchEventKinds.ENTRY_CREATE object WathServiceSpec extends BaseSpec { - override def spec: Spec[Any, TestFailure[IOException], TestSuccess] = + override def spec: Spec[Scope, TestFailure[IOException], TestSuccess] = suite("WatchServiceSpec")( test("Watch Service register")( - FileSystem.default.newWatchService.use { watchService => + FileSystem.default.newWatchService.flatMap { watchService => for { watchKey <- Path("nio/src/test/resources").register(watchService, ENTRY_CREATE) watchable = watchKey.watchable