Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of Files.walk on the JVM #3383

Merged
merged 12 commits into from
Feb 16, 2024
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import com.typesafe.tools.mima.core._

Global / onChangedBuildSource := ReloadOnSourceChanges

ThisBuild / tlBaseVersion := "3.9"
ThisBuild / tlBaseVersion := "3.10"

ThisBuild / organization := "co.fs2"
ThisBuild / organizationName := "Functional Streams for Scala"
Expand Down
2 changes: 1 addition & 1 deletion io/js/src/main/scala/fs2/io/file/FilesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private[fs2] trait FilesCompanionPlatform {
)
).adaptError { case IOException(ex) => ex }
else
walk(path, Int.MaxValue, true).evalTap(deleteIfExists).compile.drain
walk(path, WalkOptions.Default.withFollowLinks(true)).evalTap(deleteIfExists).compile.drain

override def exists(path: Path, followLinks: Boolean): F[Boolean] =
(if (followLinks)
Expand Down
144 changes: 140 additions & 4 deletions io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ import cats.effect.kernel.{Async, Resource, Sync}
import cats.syntax.all._

import java.nio.channels.{FileChannel, SeekableByteChannel}
import java.nio.file.{Files => JFiles, Path => JPath, _}
import java.nio.file.{Files => JFiles, Path => JPath, FileSystemLoopException => _, _}
import java.nio.file.attribute.{
BasicFileAttributeView,
BasicFileAttributes => JBasicFileAttributes,
PosixFileAttributes => JPosixFileAttributes,
PosixFilePermissions
PosixFilePermissions,
FileTime
}
import java.security.Principal
import java.util.stream.{Stream => JStream}

import scala.concurrent.duration._
import scala.util.control.NonFatal

import fs2.io.CollectionCompat._
import java.nio.file.attribute.FileTime

private[file] trait FilesPlatform[F[_]] extends DeprecatedFilesApi[F] { self: Files[F] =>

Expand Down Expand Up @@ -91,7 +92,8 @@ private[file] trait FilesCompanionPlatform {
private case class NioFileKey(value: AnyRef) extends FileKey

private final class AsyncFiles[F[_]](protected implicit val F: Async[F])
extends Files.UnsealedFiles[F] {
extends Files.UnsealedFiles[F]
with AsyncFilesPlatform[F] {

def copy(source: Path, target: Path, flags: CopyFlags): F[Unit] =
Sync[F].blocking {
Expand Down Expand Up @@ -389,6 +391,140 @@ private[file] trait FilesCompanionPlatform {
.resource(Resource.fromAutoCloseable(javaCollection))
.flatMap(ds => Stream.fromBlockingIterator[F](collectionIterator(ds), pathStreamChunkSize))

/** Walks the file tree rooted at `start` in one pass using `JFiles.walkFileTree`,
  * collecting every visited path into a single chunk before anything is emitted.
  *
  * Directories are recorded when they are entered (`preVisitDirectory`), so a
  * directory always precedes its contents in the output.
  *
  * @param start   root of the traversal
  * @param options supplies `followLinks`, `maxDepth` and `allowCycles`
  * @return a stream that evaluates the full walk and then emits the collected paths
  */
protected def walkEager(start: Path, options: WalkOptions): Stream[F, Path] = {
  val collectAll = Sync[F].interruptible {
    val visited = Vector.newBuilder[Path]

    // Remembers the path and tells the walker to keep going.
    def record(p: JPath): FileVisitResult = {
      visited += Path.fromNioPath(p)
      FileVisitResult.CONTINUE
    }

    // Ends the traversal when the current thread has been interrupted; otherwise
    // evaluates `next`. Note that `Thread.interrupted()` clears the interrupt flag.
    def unlessInterrupted(next: => FileVisitResult): FileVisitResult =
      if (Thread.interrupted()) FileVisitResult.TERMINATE else next

    val visitor = new SimpleFileVisitor[JPath] {
      override def visitFile(file: JPath, attrs: JBasicFileAttributes): FileVisitResult =
        unlessInterrupted(record(file))

      // A detected loop is either reported as an ordinary entry (cycles allowed) or
      // rethrown; any other per-file failure is skipped silently.
      override def visitFileFailed(file: JPath, t: IOException): FileVisitResult =
        t match {
          case _: FileSystemLoopException if options.allowCycles => record(file)
          case _: FileSystemLoopException                        => throw t
          case _                                                 => FileVisitResult.CONTINUE
        }

      override def preVisitDirectory(
          dir: JPath,
          attrs: JBasicFileAttributes
      ): FileVisitResult =
        unlessInterrupted(record(dir))

      // The IOException (if any) from iterating the directory is deliberately ignored.
      override def postVisitDirectory(dir: JPath, t: IOException): FileVisitResult =
        unlessInterrupted(FileVisitResult.CONTINUE)
    }

    JFiles.walkFileTree(
      start.toNioPath,
      if (options.followLinks) Set(FileVisitOption.FOLLOW_LINKS).asJava else Set.empty.asJava,
      options.maxDepth,
      visitor
    )
    Chunk.from(visited.result())
  }

  Stream.eval(collectAll).flatMap(paths => Stream.chunk(paths))
}

/** A pending item in the work queue used by `walkJustInTime`.
  *
  * @param path     the path that is emitted when this entry is dequeued
  * @param attr     basic attributes read for `path`; `walkJustInTime` reads them with
  *                 `NOFOLLOW_LINKS` for descendants, so a symbolic link reports itself
  *                 as a link rather than as its target
  * @param depth    distance from the walk's start (the start entry has depth 0, each
  *                 child has its parent's depth plus one)
  * @param ancestry one element per enclosing directory, nearest first: `Right(key)`
  *                 when that directory's attributes exposed a non-null file key,
  *                 otherwise `Left(path)`; consulted for cycle detection when a
  *                 symbolic link is followed
  */
private case class WalkEntry(
path: Path,
attr: JBasicFileAttributes,
depth: Int,
ancestry: List[Either[Path, NioFileKey]]
)

/** Walks the file tree rooted at `start` incrementally, emitting paths in chunks of
  * at most `options.chunkSize` elements and only doing as much file-system work as
  * is needed to fill the next chunk.
  *
  * Traversal is depth-first, pre-order: when a directory is dequeued its children
  * (in the order returned by `JFiles.list`) are placed at the front of the queue.
  * Failures while listing a directory or reading a child's attributes are swallowed
  * and the affected entries are skipped.
  *
  * When `options.followLinks` is set, a symbolic link is descended into unless its
  * target is one of the link's own ancestors; in that case the link is emitted but
  * not descended into when `options.allowCycles` is set, and otherwise a
  * `FileSystemLoopException` is thrown.
  *
  * @param start   root of the traversal
  * @param options supplies `chunkSize`, `maxDepth`, `followLinks` and `allowCycles`;
  *                a non-positive `chunkSize` is treated as 1
  * @return a stream of the visited paths, starting with `start`
  */
protected def walkJustInTime(
    start: Path,
    options: WalkOptions
): Stream[F, Path] = {
  import scala.collection.immutable.Queue

  // A chunk size <= 0 would make every step return an empty chunk without consuming
  // the queue, so the recursion below would never terminate. Always make progress.
  val chunkLimit = math.max(1, options.chunkSize)

  def loop(toWalk0: Queue[WalkEntry]): Stream[F, Path] = {
    val partialWalk = Sync[F].interruptible {
      var acc = Vector.empty[Path]
      var toWalk = toWalk0

      // Fill one chunk; bail out early if the thread is interrupted.
      while (acc.size < chunkLimit && toWalk.nonEmpty && !Thread.interrupted()) {
        val entry = toWalk.head
        toWalk = toWalk.drop(1)
        acc = acc :+ entry.path
        if (entry.depth < options.maxDepth) {
          // `dir` is the directory to list, or null when this entry must not be
          // descended into (plain file, unfollowed link, tolerated cycle, I/O error).
          val dir =
            if (entry.attr.isDirectory) entry.path
            else if (options.followLinks && entry.attr.isSymbolicLink) {
              try {
                // Read attributes of the link's target (links are followed here).
                val targetAttr =
                  JFiles.readAttributes(entry.path.toNioPath, classOf[JBasicFileAttributes])
                val fileKey = Option(targetAttr.fileKey).map(NioFileKey(_))
                // The link forms a cycle if its target is one of its own ancestors.
                val isCycle = entry.ancestry.exists {
                  case Right(ancestorKey) =>
                    fileKey.contains(ancestorKey)
                  case Left(ancestorPath) =>
                    JFiles.isSameFile(entry.path.toNioPath, ancestorPath.toNioPath)
                }
                if (isCycle)
                  if (options.allowCycles) null
                  else throw new FileSystemLoopException(entry.path.toString)
                else entry.path
              } catch {
                // Loop detection must surface; anything else just stops the descent.
                case t: FileSystemLoopException => throw t
                case NonFatal(_)                => null
              }
            } else null
          if (dir ne null) {
            try {
              val listing = JFiles.list(dir.toNioPath)
              try {
                val descendants = listing.iterator.asScala.flatMap { p =>
                  try
                    Some(
                      WalkEntry(
                        Path.fromNioPath(p),
                        JFiles.readAttributes(
                          p,
                          classOf[JBasicFileAttributes],
                          LinkOption.NOFOLLOW_LINKS
                        ),
                        entry.depth + 1,
                        Option(entry.attr.fileKey)
                          .map(NioFileKey(_))
                          .toRight(entry.path) :: entry.ancestry
                      )
                    )
                  catch {
                    // Children whose attributes cannot be read are skipped.
                    case NonFatal(_) => None
                  }
                }
                // Children go in front of the remaining work (depth-first order).
                // The lazy iterator is fully consumed here, before `listing` closes.
                toWalk = Queue.empty ++ descendants ++ toWalk
              } finally listing.close()
            } catch {
              // Unlistable directories are emitted but contribute no children.
              case NonFatal(_) => ()
            }
          }
        }
      }

      Stream.chunk(Chunk.from(acc)) ++ (if (toWalk.isEmpty) Stream.empty else loop(toWalk))
    }
    Stream.eval(partialWalk).flatten
  }

  Stream
    .eval(Sync[F].interruptible {
      WalkEntry(
        start,
        JFiles.readAttributes(start.toNioPath, classOf[JBasicFileAttributes]),
        0,
        Nil
      )
    })
    // NOTE(review): `.mask` presumably suppresses a failure to read `start`'s
    // attributes so the walk yields nothing instead of failing; confirm.
    .mask
    .flatMap(w => loop(Queue(w)))
}

def createWatcher: Resource[F, Watcher[F]] = Watcher.default(this, F)

def watch(
Expand Down
9 changes: 7 additions & 2 deletions io/jvm-native/src/test/scala/fs2/io/file/BaseFileSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import java.nio.file.{Files => JFiles, Path => JPath, _}
import java.nio.file.attribute.{BasicFileAttributes => JBasicFileAttributes}

import scala.concurrent.duration._
import scala.util.control.NonFatal

trait BaseFileSuite extends Fs2Suite {

Expand Down Expand Up @@ -77,11 +78,15 @@ trait BaseFileSuite extends Fs2Suite {
dir.toNioPath,
new SimpleFileVisitor[JPath] {
override def visitFile(path: JPath, attrs: JBasicFileAttributes) = {
JFiles.delete(path)
try JFiles.deleteIfExists(path)
catch { case NonFatal(_) => () }
FileVisitResult.CONTINUE
}
override def visitFileFailed(path: JPath, e: IOException) =
FileVisitResult.CONTINUE
override def postVisitDirectory(path: JPath, e: IOException) = {
JFiles.delete(path)
try JFiles.deleteIfExists(path)
catch { case NonFatal(_) => () }
FileVisitResult.CONTINUE
}
}
Expand Down
41 changes: 41 additions & 0 deletions io/jvm/src/main/scala/fs2/io/file/AsyncFilesPlatform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2
package io
package file

/** Platform mix-in that chooses between the two `walk` strategies implemented by the
  * class it is mixed into.
  */
private[file] trait AsyncFilesPlatform[F[_]] { self: Files.UnsealedFiles[F] =>

  /** Strategy used when the requested chunk size is `Int.MaxValue`. */
  protected def walkEager(start: Path, options: WalkOptions): Stream[F, Path]

  /** Strategy used for every other chunk size. */
  protected def walkJustInTime(
      start: Path,
      options: WalkOptions
  ): Stream[F, Path]

  /** Walks the tree rooted at `start`, selecting the strategy from `options.chunkSize`:
    * `Int.MaxValue` selects [[walkEager]], anything else selects [[walkJustInTime]].
    */
  override def walk(
      start: Path,
      options: WalkOptions
  ): Stream[F, Path] =
    options.chunkSize match {
      case Int.MaxValue => walkEager(start, options)
      case _            => walkJustInTime(start, options)
    }
}
119 changes: 119 additions & 0 deletions io/jvm/src/test/scala/fs2/io/file/WalkBenchmark.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2
package io
package file

import cats.effect.IO
import java.io.File
import scala.concurrent.duration.*

/** Coarse timing and interruption checks for `Files.walk`, run against a large
  * temporary directory tree.
  *
  * The fixture has 5 entries per level: 7 levels of directories (97,655 directories)
  * with 5 empty files in each deepest directory (390,625 files). It is built once in
  * `beforeAll` and removed again in `afterAll`.
  */
class WalkBenchmark extends Fs2IoSuite {

  override def munitIOTimeout = 5.minutes

  private var target: Path = _

  // Root of the fixture as a java.io.File, kept so that afterAll can delete it.
  private var root: File = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    // createTempFile only reserves a unique name; swap the file for a directory.
    val file = File.createTempFile("fs2-benchmarks-", "-walk")
    file.delete()
    file.mkdir()
    root = file
    target = Path(file.toString)

    val MaxDepth = 7
    val Names = 'A'.to('E').toList.map(_.toString)

    // Creates directories down to MaxDepth, then plain files in the deepest ones.
    def loop(cwd: File, depth: Int): Unit =
      if (depth < MaxDepth)
        Names.foreach { name =>
          val sub = new File(cwd, name)
          sub.mkdir()
          loop(sub, depth + 1)
        }
      else
        Names.foreach(name => new File(cwd, name).createNewFile())

    loop(file, 0)
  }

  /** Removes the fixture tree so repeated runs do not pile up temporary files. */
  override def afterAll(): Unit = {
    // listFiles returns null for non-directories (and on I/O error), hence Option.
    def deleteRecursively(f: File): Unit = {
      Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
      val _ = f.delete()
    }

    // `root` is still null if beforeAll failed before creating the directory.
    try Option(root).foreach(deleteRecursively)
    finally super.afterAll()
  }

  /** Evaluates `f` once and returns the elapsed wall-clock time. */
  def time[A](f: => A): FiniteDuration = {
    val start = System.nanoTime()
    val _ = f
    (System.nanoTime() - start).nanos
  }

  test("Files.walk has similar performance to java.nio.file.Files.walk") {
    val fs2Time = time(
      Files[IO]
        .walk(target)
        .compile
        .count
        .unsafeRunSync()
    )
    val fs2EagerTime = time(
      Files[IO]
        .walk(target, WalkOptions.Eager)
        .compile
        .count
        .unsafeRunSync()
    )
    val nioTime = time(java.nio.file.Files.walk(target.toNioPath).count())
    // Allowed deviation from the NIO baseline: 150% of the NIO time.
    val epsilon = nioTime.toNanos * 1.5
    println(s"fs2 took: ${fs2Time.toMillis} ms")
    println(s"fs2 eager took: ${fs2EagerTime.toMillis} ms")
    println(s"nio took: ${nioTime.toMillis} ms")
    assert(
      (fs2Time - nioTime).toNanos.abs < epsilon,
      s"fs2 time: $fs2Time, nio time: $nioTime, diff: ${fs2Time - nioTime}"
    )
  }

  test("walk is interruptible") {
    val elapsed = time(
      Files[IO]
        .walk(target)
        .interruptAfter(1.second)
        .compile
        .count
        .unsafeRunSync()
    )
    // The walk must stop within 250 ms of the 1 s interruption deadline.
    assert(elapsed < 1250.milliseconds)
  }

  test("walk eager is interruptible") {
    val elapsed = time(
      Files[IO]
        .walk(target, WalkOptions.Eager)
        .interruptAfter(1.second)
        .compile
        .count
        .unsafeRunSync()
    )
    assert(elapsed < 1250.milliseconds)
  }
}
Loading