diff --git a/build.sbt b/build.sbt index 019c8b2202..0022e6a4d7 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ import com.typesafe.tools.mima.core._ Global / onChangedBuildSource := ReloadOnSourceChanges -ThisBuild / tlBaseVersion := "3.9" +ThisBuild / tlBaseVersion := "3.10" ThisBuild / organization := "co.fs2" ThisBuild / organizationName := "Functional Streams for Scala" diff --git a/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala b/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala index 11938f6687..f895046adb 100644 --- a/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala +++ b/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala @@ -175,7 +175,7 @@ private[fs2] trait FilesCompanionPlatform { ) ).adaptError { case IOException(ex) => ex } else - walk(path, Int.MaxValue, true).evalTap(deleteIfExists).compile.drain + walk(path, WalkOptions.Default.withFollowLinks(true)).evalTap(deleteIfExists).compile.drain override def exists(path: Path, followLinks: Boolean): F[Boolean] = (if (followLinks) diff --git a/io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala b/io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala index 590da203af..2454ef4de1 100644 --- a/io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala +++ b/io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala @@ -27,20 +27,21 @@ import cats.effect.kernel.{Async, Resource, Sync} import cats.syntax.all._ import java.nio.channels.{FileChannel, SeekableByteChannel} -import java.nio.file.{Files => JFiles, Path => JPath, _} +import java.nio.file.{Files => JFiles, Path => JPath, FileSystemLoopException => _, _} import java.nio.file.attribute.{ BasicFileAttributeView, BasicFileAttributes => JBasicFileAttributes, PosixFileAttributes => JPosixFileAttributes, - PosixFilePermissions + PosixFilePermissions, + FileTime } import java.security.Principal import java.util.stream.{Stream => JStream} import scala.concurrent.duration._ +import scala.util.control.NonFatal import fs2.io.CollectionCompat._ -import java.nio.file.attribute.FileTime private[file] trait FilesPlatform[F[_]] extends DeprecatedFilesApi[F] { self: Files[F] => @@ -91,7 +92,8 @@ private[file] trait FilesCompanionPlatform { private case class NioFileKey(value: AnyRef) extends FileKey private final class AsyncFiles[F[_]](protected implicit val F: Async[F]) - extends Files.UnsealedFiles[F] { + extends Files.UnsealedFiles[F] + with AsyncFilesPlatform[F] { def copy(source: Path, target: Path, flags: CopyFlags): F[Unit] = Sync[F].blocking { @@ -389,6 +391,140 @@ private[file] trait FilesCompanionPlatform { .resource(Resource.fromAutoCloseable(javaCollection)) .flatMap(ds => Stream.fromBlockingIterator[F](collectionIterator(ds), pathStreamChunkSize)) + protected def walkEager(start: Path, options: WalkOptions): Stream[F, Path] = { + val doWalk = Sync[F].interruptible { + val bldr = Vector.newBuilder[Path] + JFiles.walkFileTree( + start.toNioPath, + if (options.followLinks) Set(FileVisitOption.FOLLOW_LINKS).asJava else Set.empty.asJava, + options.maxDepth, + new SimpleFileVisitor[JPath] { + private def enqueue(path: JPath): FileVisitResult = { + bldr += Path.fromNioPath(path) + FileVisitResult.CONTINUE + } + + override def visitFile(file: JPath, attrs: JBasicFileAttributes): FileVisitResult = + if (Thread.interrupted()) FileVisitResult.TERMINATE else enqueue(file) + + override def visitFileFailed(file: JPath, t: IOException): FileVisitResult = + t match { + case _: FileSystemLoopException => + if (options.allowCycles) enqueue(file) else throw t + case _ => FileVisitResult.CONTINUE + } + + override def preVisitDirectory( + dir: JPath, + attrs: JBasicFileAttributes + ): FileVisitResult = + if (Thread.interrupted()) FileVisitResult.TERMINATE else enqueue(dir) + + override def postVisitDirectory(dir: JPath, t: IOException): FileVisitResult = + if (Thread.interrupted()) FileVisitResult.TERMINATE else FileVisitResult.CONTINUE + } + ) + Chunk.from(bldr.result()) + } + Stream.eval(doWalk).flatMap(Stream.chunk) + } + + private case class WalkEntry( + path: Path, + attr: JBasicFileAttributes, + depth: Int, + ancestry: List[Either[Path, NioFileKey]] + ) + + protected def walkJustInTime( + start: Path, + options: WalkOptions + ): Stream[F, Path] = { + import scala.collection.immutable.Queue + + def loop(toWalk0: Queue[WalkEntry]): Stream[F, Path] = { + val partialWalk = Sync[F].interruptible { + var acc = Vector.empty[Path] + var toWalk = toWalk0 + + while (acc.size < options.chunkSize && toWalk.nonEmpty && !Thread.interrupted()) { + val entry = toWalk.head + toWalk = toWalk.drop(1) + acc = acc :+ entry.path + if (entry.depth < options.maxDepth) { + val dir = + if (entry.attr.isDirectory) entry.path + else if (options.followLinks && entry.attr.isSymbolicLink) { + try { + val targetAttr = + JFiles.readAttributes(entry.path.toNioPath, classOf[JBasicFileAttributes]) + val fileKey = Option(targetAttr.fileKey).map(NioFileKey(_)) + val isCycle = entry.ancestry.exists { + case Right(ancestorKey) => + fileKey.contains(ancestorKey) + case Left(ancestorPath) => + JFiles.isSameFile(entry.path.toNioPath, ancestorPath.toNioPath) + } + if (isCycle) + if (options.allowCycles) null + else throw new FileSystemLoopException(entry.path.toString) + else entry.path + } catch { + case t: FileSystemLoopException => throw t + case NonFatal(_) => null + } + } else null + if (dir ne null) { + try { + val listing = JFiles.list(dir.toNioPath) + try { + val descendants = listing.iterator.asScala.flatMap { p => + try + Some( + WalkEntry( + Path.fromNioPath(p), + JFiles.readAttributes( + p, + classOf[JBasicFileAttributes], + LinkOption.NOFOLLOW_LINKS + ), + entry.depth + 1, + Option(entry.attr.fileKey) + .map(NioFileKey(_)) + .toRight(entry.path) :: entry.ancestry + ) + ) + catch { + case NonFatal(_) => None + } + } + toWalk = Queue.empty ++ descendants ++ toWalk + } finally listing.close() + } catch { + case NonFatal(_) => () + } + } + } + } + + Stream.chunk(Chunk.from(acc)) ++ (if (toWalk.isEmpty) Stream.empty else loop(toWalk)) + } + Stream.eval(partialWalk).flatten + } + + Stream + .eval(Sync[F].interruptible { + WalkEntry( + start, + JFiles.readAttributes(start.toNioPath, classOf[JBasicFileAttributes]), + 0, + Nil + ) + }) + .mask + .flatMap(w => loop(Queue(w))) + } + def createWatcher: Resource[F, Watcher[F]] = Watcher.default(this, F) def watch( diff --git a/io/jvm-native/src/test/scala/fs2/io/file/BaseFileSuite.scala b/io/jvm-native/src/test/scala/fs2/io/file/BaseFileSuite.scala index d67248ed66..559328084d 100644 --- a/io/jvm-native/src/test/scala/fs2/io/file/BaseFileSuite.scala +++ b/io/jvm-native/src/test/scala/fs2/io/file/BaseFileSuite.scala @@ -30,6 +30,7 @@ import java.nio.file.{Files => JFiles, Path => JPath, _} import java.nio.file.attribute.{BasicFileAttributes => JBasicFileAttributes} import scala.concurrent.duration._ +import scala.util.control.NonFatal trait BaseFileSuite extends Fs2Suite { @@ -77,11 +78,15 @@ trait BaseFileSuite extends Fs2Suite { dir.toNioPath, new SimpleFileVisitor[JPath] { override def visitFile(path: JPath, attrs: JBasicFileAttributes) = { - JFiles.delete(path) + try JFiles.deleteIfExists(path) + catch { case NonFatal(_) => () } FileVisitResult.CONTINUE } + override def visitFileFailed(path: JPath, e: IOException) = + FileVisitResult.CONTINUE override def postVisitDirectory(path: JPath, e: IOException) = { - JFiles.delete(path) + try JFiles.deleteIfExists(path) + catch { case NonFatal(_) => () } FileVisitResult.CONTINUE } } diff --git a/io/jvm/src/main/scala/fs2/io/file/AsyncFilesPlatform.scala b/io/jvm/src/main/scala/fs2/io/file/AsyncFilesPlatform.scala new file mode 100644 index 0000000000..894a03e91d --- /dev/null +++ b/io/jvm/src/main/scala/fs2/io/file/AsyncFilesPlatform.scala @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package file + +private[file] trait AsyncFilesPlatform[F[_]] { self: Files.UnsealedFiles[F] => + + override def walk( + start: Path, + options: WalkOptions + ): Stream[F, Path] = + if (options.chunkSize == Int.MaxValue) walkEager(start, options) + else walkJustInTime(start, options) + + protected def walkEager(start: Path, options: WalkOptions): Stream[F, Path] + + protected def walkJustInTime( + start: Path, + options: WalkOptions + ): Stream[F, Path] +} diff --git a/io/jvm/src/test/scala/fs2/io/file/WalkBenchmark.scala b/io/jvm/src/test/scala/fs2/io/file/WalkBenchmark.scala new file mode 100644 index 0000000000..d6a8bb71ec --- /dev/null +++ b/io/jvm/src/test/scala/fs2/io/file/WalkBenchmark.scala @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package file + +import cats.effect.IO +import java.io.File +import scala.concurrent.duration.* + +class WalkBenchmark extends Fs2IoSuite { + + override def munitIOTimeout = 5.minutes + + private var target: Path = _ + + override def beforeAll() = { + super.beforeAll() + val file = File.createTempFile("fs2-benchmarks-", "-walk") + file.delete() + file.mkdir() + target = Path(file.toString) + + val MaxDepth = 7 + val Names = 'A'.to('E').toList.map(_.toString) + + def loop(cwd: File, depth: Int): Unit = + if (depth < MaxDepth) { + Names.foreach { name => + val sub = new File(cwd, name) + sub.mkdir() + loop(sub, depth + 1) + } + } else if (depth == MaxDepth) { + Names.foreach { name => + val sub = new File(cwd, name) + sub.createNewFile() + loop(sub, depth + 1) + } + } + + loop(file, 0) + } + + def time[A](f: => A): FiniteDuration = { + val start = System.nanoTime() + val _ = f + (System.nanoTime() - start).nanos + } + + test("Files.walk has similar performance to java.nio.file.Files.walk") { + val fs2Time = time( + Files[IO] + .walk(target) + .compile + .count + .unsafeRunSync() + ) + val fs2EagerTime = time( + Files[IO] + .walk(target, WalkOptions.Eager) + .compile + .count + .unsafeRunSync() + ) + val nioTime = time(java.nio.file.Files.walk(target.toNioPath).count()) + val epsilon = nioTime.toNanos * 1.5 + println(s"fs2 took: ${fs2Time.toMillis} ms") + println(s"fs2 eager took: ${fs2EagerTime.toMillis} ms") + println(s"nio took: ${nioTime.toMillis} ms") + assert( + (fs2Time - nioTime).toNanos.abs < epsilon, + s"fs2 time: $fs2Time, nio time: $nioTime, diff: ${fs2Time - nioTime}" + ) + } + + test("walk is interruptible") { + val elapsed = time( + Files[IO] + .walk(target) + .interruptAfter(1.second) + .compile + .count + .unsafeRunSync() + ) + assert(elapsed < 1250.milliseconds) + } + + test("walk eager is interruptible") { + val elapsed = time( + Files[IO] + .walk(target, WalkOptions.Eager) + .interruptAfter(1.second) + .compile + .count + .unsafeRunSync() + ) + assert(elapsed < 1250.milliseconds) + } +} diff --git a/io/native/src/main/scala/fs2/io/file/AsyncFilesPlatform.scala b/io/native/src/main/scala/fs2/io/file/AsyncFilesPlatform.scala new file mode 100644 index 0000000000..b5acbbe6af --- /dev/null +++ b/io/native/src/main/scala/fs2/io/file/AsyncFilesPlatform.scala @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package file + +private[file] trait AsyncFilesPlatform[F[_]] { self: Files.UnsealedFiles[F] => + override def walk( + start: Path, + options: WalkOptions + ): Stream[F, Path] = + // Disable eager walks until https://github.com/scala-native/scala-native/issues/3744 + walkJustInTime(start, options) + + protected def walkJustInTime( + start: Path, + options: WalkOptions + ): Stream[F, Path] +} diff --git a/io/shared/src/main/scala/fs2/io/file/Files.scala b/io/shared/src/main/scala/fs2/io/file/Files.scala index 61f3bb1c36..0f1f3ae2c0 100644 --- a/io/shared/src/main/scala/fs2/io/file/Files.scala +++ b/io/shared/src/main/scala/fs2/io/file/Files.scala @@ -375,11 +375,23 @@ sealed trait Files[F[_]] extends FilesPlatform[F] { /** Creates a stream of paths contained in a given file tree. Depth is unlimited. */ def walk(start: Path): Stream[F, Path] = - walk(start, Int.MaxValue, false) + walk(start, WalkOptions.Default) + + /** Creates a stream of paths contained in a given file tree. + * + * The `options` parameter allows for customizing the walk behavior. The `WalkOptions` + * type provides both `WalkOptions.Default` and `WalkOptions.Eager` as starting points, + * and further customizations can be specified via methods on the returned options value. + * For example, to eagerly walk a directory while following symbolic links, emitting all + * paths as a single chunk, use `walk(start, WalkOptions.Eager.withFollowLinks(true))`. + */ + def walk(start: Path, options: WalkOptions): Stream[F, Path] /** Creates a stream of paths contained in a given file tree down to a given depth. */ - def walk(start: Path, maxDepth: Int, followLinks: Boolean): Stream[F, Path] + @deprecated("Use walk(start, WalkOptions.Default.withMaxDepth(..).withFollowLinks(..))", "3.10") + def walk(start: Path, maxDepth: Int, followLinks: Boolean): Stream[F, Path] = + walk(start, WalkOptions.Default) /** Writes all data to the file at the specified path. * @@ -505,7 +517,7 @@ object Files extends FilesCompanionPlatform with FilesLowPriority { case _: NoSuchFileException => () }) - def walk(start: Path, maxDepth: Int, followLinks: Boolean): Stream[F, Path] = { + def walk(start: Path, options: WalkOptions): Stream[F, Path] = { def go(start: Path, maxDepth: Int, ancestry: List[Either[Path, FileKey]]): Stream[F, Path] = Stream.emit(start) ++ { @@ -516,7 +528,7 @@ object Files extends FilesCompanionPlatform with FilesLowPriority { list(start).mask.flatMap { path => go(path, maxDepth - 1, attr.fileKey.toRight(start) :: ancestry) } - else if (attr.isSymbolicLink && followLinks) + else if (attr.isSymbolicLink && options.followLinks) Stream.eval(getBasicFileAttributes(start, followLinks = true)).mask.flatMap { attr => val fileKey = attr.fileKey @@ -530,6 +542,8 @@ object Files extends FilesCompanionPlatform with FilesLowPriority { list(start).mask.flatMap { path => go(path, maxDepth - 1, attr.fileKey.toRight(start) :: ancestry) } + else if (options.allowCycles) + Stream.empty else Stream.raiseError(new FileSystemLoopException(start.toString)) } @@ -540,7 +554,13 @@ object Files extends FilesCompanionPlatform with FilesLowPriority { } } - Stream.eval(getBasicFileAttributes(start, followLinks)) >> go(start, maxDepth, Nil) + Stream.eval(getBasicFileAttributes(start, options.followLinks)) >> go( + start, + options.maxDepth, + Nil + ) + .chunkN(options.chunkSize) + .flatMap(Stream.chunk) } def writeAll( diff --git a/io/shared/src/main/scala/fs2/io/file/WalkOptions.scala b/io/shared/src/main/scala/fs2/io/file/WalkOptions.scala new file mode 100644 index 0000000000..1a05c28709 --- /dev/null +++ b/io/shared/src/main/scala/fs2/io/file/WalkOptions.scala @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package io +package file + +/** Options that customize a filesystem walk via `Files[F].walk`. */ +sealed trait WalkOptions { + + /** Size of chunks emitted from the walk. + * + * Implementations *may* use this for optimization, batching file system operations. + * + * A chunk size of 1 hints to the implementation to use the maximally laziness in + * file system access, emitting a single path at a time. + * + * A chunk size of `Int.MaxValue` hints to the implementation to perform all file system + * operations at once, emitting a single chunk with all paths. + */ + def chunkSize: Int + + /** Maximum depth to walk. A value of 0 results in emitting just the starting path. + * A value of 1 results in emitting the starting path and all direct descendants. + */ + def maxDepth: Int + + /** Indicates whether links are followed during the walk. If false, the path of + * each link is emitted. If true, links are followed and their contents are emitted. + */ + def followLinks: Boolean + + /** Indicates whether to allow cycles when following links. If true, any link causing a + * cycle is emitted as the link path. If false, a cycle results in walk failing with a `FileSystemLoopException`. + */ + def allowCycles: Boolean + + /** Returns a new `WalkOptions` with the specified chunk size. */ + def withChunkSize(chunkSize: Int): WalkOptions + + /** Returns a new `WalkOptions` with the specified max depth. */ + def withMaxDepth(maxDepth: Int): WalkOptions + + /** Returns a new `WalkOptions` with the specified value for `followLinks`. */ + def withFollowLinks(value: Boolean): WalkOptions + + /** Returns a new `WalkOptions` with the specified value for `allowCycles`. */ + def withAllowCycles(value: Boolean): WalkOptions +} + +object WalkOptions { + private case class DefaultWalkOptions( + chunkSize: Int, + maxDepth: Int, + followLinks: Boolean, + allowCycles: Boolean + ) extends WalkOptions { + def withChunkSize(chunkSize: Int): WalkOptions = copy(chunkSize = chunkSize) + def withMaxDepth(maxDepth: Int): WalkOptions = copy(maxDepth = maxDepth) + def withFollowLinks(value: Boolean): WalkOptions = copy(followLinks = value) + def withAllowCycles(value: Boolean): WalkOptions = copy(allowCycles = value) + override def toString = + s"WalkOptions(chunkSize = $chunkSize, maxDepth = $maxDepth, followLinks = $followLinks, allowCycles = $allowCycles)" + } + + /** Default walk options, using a large chunk size, unlimited depth, and no link following. */ + val Default: WalkOptions = DefaultWalkOptions(4096, Int.MaxValue, false, false) + + /** Like `Default` but uses the maximum chunk size, hinting the implementation should perform all file system operations before emitting any paths. */ + val Eager: WalkOptions = Default.withChunkSize(Int.MaxValue) + + /** Like `Default` but uses the minimum chunk size, hinting the implementation should perform minumum number of file system operations before emitting each path. */ + val Lazy: WalkOptions = Default.withChunkSize(1) +} diff --git a/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala b/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala index bc0048b22b..bc1359373b 100644 --- a/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala +++ b/io/shared/src/test/scala/fs2/io/file/FilesSuite.scala @@ -568,10 +568,9 @@ class FilesSuite extends Fs2IoSuite with BaseFileSuite { Stream .resource(tempFilesHierarchy) .flatMap(topDir => Files[IO].walk(topDir)) - .map(_ => 1) .compile - .foldMonoid - .assertEquals(31) // the root + 5 children + 5 files per child directory + .count + .assertEquals(31L) // the root + 5 children + 5 files per child directory } test("can delete files in a nested tree") { @@ -591,6 +590,122 @@ class FilesSuite extends Fs2IoSuite with BaseFileSuite { .foldMonoid .assertEquals(25) } + + test("maxDepth = 0") { + Stream + .resource(tempFilesHierarchy) + .flatMap(topDir => Files[IO].walk(topDir, WalkOptions.Default.withMaxDepth(0))) + .compile + .count + .assertEquals(1L) // the root + } + + test("maxDepth = 1") { + Stream + .resource(tempFilesHierarchy) + .flatMap(topDir => Files[IO].walk(topDir, WalkOptions.Default.withMaxDepth(1))) + .compile + .count + .assertEquals(6L) // the root + 5 children + } + + test("maxDepth = 1 / eager") { + Stream + .resource(tempFilesHierarchy) + .flatMap(topDir => Files[IO].walk(topDir, WalkOptions.Eager.withMaxDepth(1))) + .compile + .count + .assertEquals(6L) // the root + 5 children + } + + test("maxDepth = 2") { + Stream + .resource(tempFilesHierarchy) + .flatMap(topDir => Files[IO].walk(topDir, WalkOptions.Default.withMaxDepth(2))) + .compile + .count + .assertEquals(31L) // the root + 5 children + 5 files per child directory + } + + test("followLinks = true") { + Stream + .resource((tempFilesHierarchy, tempFilesHierarchy).tupled) + .evalMap { case (topDir, secondDir) => + Files[IO].createSymbolicLink(topDir / "link", secondDir).as(topDir) + } + .flatMap(topDir => Files[IO].walk(topDir, WalkOptions.Default.withFollowLinks(true))) + .compile + .count + .assertEquals(31L * 2) + } + + test("followLinks = false") { + Stream + .resource((tempFilesHierarchy, tempFilesHierarchy).tupled) + .evalMap { case (topDir, secondDir) => + Files[IO].createSymbolicLink(topDir / "link", secondDir).as(topDir) + } + .flatMap(topDir => Files[IO].walk(topDir, WalkOptions.Default)) + .compile + .count + .assertEquals(32L) + } + + test("followLinks with cycle") { + Stream + .resource(tempFilesHierarchy) + .evalTap { topDir => + Files[IO].createSymbolicLink(topDir / "link", topDir) + } + .flatMap(topDir => Files[IO].walk(topDir, WalkOptions.Default.withFollowLinks(true))) + .compile + .count + .intercept[FileSystemLoopException] + } + + test("followLinks with cycle / eager") { + Stream + .resource(tempFilesHierarchy) + .evalTap { topDir => + Files[IO].createSymbolicLink(topDir / "link", topDir) + } + .flatMap(topDir => + Files[IO] + .walk(topDir, WalkOptions.Eager.withFollowLinks(true)) + ) + .compile + .count + .intercept[FileSystemLoopException] + } + + test("followLinks with cycle / cycles allowed") { + Stream + .resource(tempFilesHierarchy) + .evalTap { topDir => + Files[IO].createSymbolicLink(topDir / "link", topDir) + } + .flatMap(topDir => + Files[IO].walk(topDir, WalkOptions.Default.withFollowLinks(true).withAllowCycles(true)) + ) + .compile + .count + .assertEquals(32L) + } + + test("followLinks with cycle / eager / cycles allowed") { + Stream + .resource(tempFilesHierarchy) + .evalTap { topDir => + Files[IO].createSymbolicLink(topDir / "link", topDir) + } + .flatMap(topDir => + Files[IO] + .walk(topDir, WalkOptions.Eager.withFollowLinks(true).withAllowCycles(true)) + ) + .compile + .count + .assertEquals(32L) + } } test("writeRotate") {