Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of Files.walk on the JVM #3383

Merged
merged 12 commits into from
Feb 16, 2024
54 changes: 52 additions & 2 deletions io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package io
package file

import cats.effect.kernel.{Async, Resource, Sync}
import cats.effect.std.Dispatcher
import cats.syntax.all._

import java.nio.channels.{FileChannel, SeekableByteChannel}
Expand All @@ -32,15 +33,16 @@ import java.nio.file.attribute.{
BasicFileAttributeView,
BasicFileAttributes => JBasicFileAttributes,
PosixFileAttributes => JPosixFileAttributes,
PosixFilePermissions
PosixFilePermissions,
FileTime
}
import java.security.Principal
import java.util.stream.{Stream => JStream}

import scala.concurrent.duration._

import fs2.concurrent.Channel
import fs2.io.CollectionCompat._
import java.nio.file.attribute.FileTime

private[file] trait FilesPlatform[F[_]] extends DeprecatedFilesApi[F] { self: Files[F] =>

Expand Down Expand Up @@ -389,6 +391,54 @@ private[file] trait FilesCompanionPlatform {
.resource(Resource.fromAutoCloseable(javaCollection))
.flatMap(ds => Stream.fromBlockingIterator[F](collectionIterator(ds), pathStreamChunkSize))

override def walk(start: Path, maxDepth: Int, followLinks: Boolean): Stream[F, Path] =
Stream.resource(Dispatcher.sequential[F]).flatMap { dispatcher =>
Stream.eval(Channel.bounded[F, Chunk[Path]](10)).flatMap { channel =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw @armanbilge one thing that occurs to me is that our fancy new unsafe queue thing isn't going to help very much if someone's using Channel.

val doWalk = Sync[F].interruptibleMany {
val bldr = Vector.newBuilder[Path]
val limit = 4096
var size = 0
bldr.sizeHint(limit)
JFiles.walkFileTree(
start.toNioPath,
if (followLinks) Set(FileVisitOption.FOLLOW_LINKS).asJava else Set.empty.asJava,
maxDepth,
new SimpleFileVisitor[JPath] {
private def enqueue(path: JPath): FileVisitResult = {
bldr += Path.fromNioPath(path)
size += 1
if (size >= limit) {
val result = dispatcher.unsafeRunSync(channel.send(Chunk.from(bldr.result())))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really wish there were a way to suspend the visitation and continue it later. That would allow us to avoid the unsafeRunSync here and use unsafeRunAndForget instead, likely bouncing out of the interruptible once every n enqueues and passing through a Stream#append in order to preserve backpressure.

Is walkFileTree meaningfully faster than just doing the traversal by hand?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even eagerly collecting everything is only 5% faster than the channel based solution (using the 4096 limit):

def walkEager(start: Path, maxDepth: Int, followLinks: Boolean): Stream[F, Path] = {
    val doWalk = Sync[F].interruptibleMany {
      val bldr = Vector.newBuilder[Path]
      JFiles.walkFileTree(
        start.toNioPath,
        if (followLinks) Set(FileVisitOption.FOLLOW_LINKS).asJava else Set.empty.asJava,
        maxDepth,
        new SimpleFileVisitor[JPath] {
          private def enqueue(path: JPath): FileVisitResult = {
            bldr += Path.fromNioPath(path)
            FileVisitResult.CONTINUE
          }

          override def visitFile(file: JPath, attrs: JBasicFileAttributes): FileVisitResult =
            enqueue(file)

          override def visitFileFailed(file: JPath, t: IOException): FileVisitResult =
            FileVisitResult.CONTINUE

          override def preVisitDirectory(dir: JPath, attrs: JBasicFileAttributes): FileVisitResult =
            enqueue(dir)

          override def postVisitDirectory(dir: JPath, t: IOException): FileVisitResult =
            FileVisitResult.CONTINUE
        }
      )
      Chunk.from(bldr.result())
    }
    Stream.eval(doWalk).flatMap(Stream.chunk)
  }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, that's wild honestly. Have to ponder that. It's nice that we can just be lazy about our thread blocking though, since it simplifies this stuff.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@djspiewak BTW, there's a bunch of performance hackery in the JDK's file walking that's not (directly) available to us if we implement our own walk. For example, Path can cache file attributes avoiding some filesystem calls.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to close this out, I tried this prototype:

  override def walk(
      start: Path,
      maxDepth: Int,
      followLinks: Boolean,
      chunkSize: Int
  ): Stream[F, Path] =
    walkJustInTime(start, maxDepth, followLinks, chunkSize)
    // if (chunkSize == Int.MaxValue) walkEager(start, maxDepth, followLinks)
    // else walkLazy(start, maxDepth, followLinks, chunkSize)

  private def walkJustInTime(
      start: Path,
      maxDepth: Int,
      followLinks: Boolean,
      chunkSize: Int
  ): Stream[F, Path] = {

    def loop(acc: Vector[Path], toWalk: Vector[Path]): Stream[F, Path] = {
      if (toWalk.isEmpty) {
        Stream.chunk(Chunk.from(acc))
      } else {
        val path = toWalk.head

        val (toEmit, newAcc) =
          if (acc.size + 1 >= chunkSize)
            (Chunk.from(acc :+ path), Vector.empty)
          else (Chunk.empty, acc :+ path)

        val list = Sync[F].interruptibleMany {
          val npath = path.toNioPath
          if (JFiles.isDirectory(npath)) {
            val listing = JFiles.list(npath)
            try listing.iterator.asScala.map(Path.fromNioPath).toVector
            finally listing.close()
          }
          else Vector.empty
        }

        Stream.chunk(toEmit) ++ Stream.eval(list).flatMap(descendants => loop(newAcc, toWalk.drop(1) ++ descendants))
      }
    }

    loop(Vector.empty, Vector(start))
  }

Using MaxDepth = 7, I got these results:

fs2 took: 16070 ms
fs2 eager took: 13935 ms
nio took: 6356 ms

Whereas the implementation in this PR results in:

fs2 took: 8000 ms
fs2 eager took: 5975 ms
nio took: 6858 ms

Copy link
Member Author

@mpilquist mpilquist Feb 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's a better prototype that does file attribute reading at the time of directory listing.

asdf  private def walkJustInTime(
      start: Path,
      maxDepth: Int,
      followLinks: Boolean,
      chunkSize: Int
  ): Stream[F, Path] = {

    def loop(acc: Vector[Path], toWalk: Vector[(Path, JBasicFileAttributes)]): Stream[F, Path] = {
      if (toWalk.isEmpty) {
        Stream.chunk(Chunk.from(acc))
      } else {
        val (path, attr) = toWalk.head

        val (toEmit, newAcc) =
          if (acc.size + 1 >= chunkSize)
            (Chunk.from(acc :+ path), Vector.empty)
          else (Chunk.empty, acc :+ path)

        if (attr.isDirectory) {
          val list = Sync[F].interruptibleMany {
            val listing = JFiles.list(path.toNioPath)
            try listing.iterator.asScala.map(p => 
              (Path.fromNioPath(p), JFiles.readAttributes(p, classOf[JBasicFileAttributes]))).toVector
            finally listing.close()
          }
          Stream.chunk(toEmit) ++ Stream.eval(list).flatMap(descendants => loop(newAcc, toWalk.drop(1) ++ descendants))
        } else Stream.chunk(toEmit) ++ loop(newAcc, toWalk.drop(1))
      }
    }

    Stream.eval(Sync[F].interruptibleMany {
      start -> JFiles.readAttributes(start.toNioPath, classOf[JBasicFileAttributes])
    }).flatMap { s => loop(Vector.empty, Vector(s)) }
  }

Performs better but still doesn't beat the walkFileTree solution:

fs2 took: 10399 ms
fs2 eager took: 8843 ms
nio took: 7202 ms

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, maybe we should switch to a version based on this:

  private def walkJustInTime(
      start: Path,
      maxDepth: Int,
      followLinks: Boolean,
      chunkSize: Int
  ): Stream[F, Path] = {
    import scala.collection.immutable.Queue

    def loop(toWalk0: Queue[(Path, JBasicFileAttributes)]): Stream[F, Path] = {
      val partialWalk = Sync[F].interruptibleMany {
        var acc = Vector.empty[Path]
        var toWalk = toWalk0

        while (acc.size < chunkSize && toWalk.nonEmpty) {
          val (path, attr) = toWalk.head
          toWalk = toWalk.drop(1)
          acc = acc :+ path
          if (attr.isDirectory) {
            val listing = JFiles.list(path.toNioPath)
            try {
              val descendants = listing.iterator.asScala.map(p => 
                (Path.fromNioPath(p), JFiles.readAttributes(p, classOf[JBasicFileAttributes]))).toVector
              toWalk = toWalk ++ descendants
            }
            finally listing.close()              
          }
            
        }

        Stream.chunk(Chunk.from(acc)) ++ (if (toWalk.isEmpty) Stream.empty else loop(toWalk))
      }
      Stream.eval(partialWalk).flatten
   }

    Stream.eval(Sync[F].interruptibleMany {
      start -> JFiles.readAttributes(start.toNioPath, classOf[JBasicFileAttributes])
    }).flatMap(s => loop(Queue(s)))
  }
fs2 took: 9312 ms
fs2 eager took: 8538 ms
nio took: 7769 ms

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So basically what we're trying to figure out is whether it's worth eating 9% overhead to avoid blocking a thread which is already getting blocked by filesystem I/O? My guess is that it's not worth it but I shall ponder a bit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed a new version:

fs2 took: 8131 ms
fs2 eager took: 5950 ms
nio took: 7346 ms

I'd like to add some tests for symbolic link following & max depth limits (we don't have any now). Then this PR should be good.

bldr.clear()
size = 0
if (result.isRight) FileVisitResult.CONTINUE else FileVisitResult.TERMINATE
} else FileVisitResult.CONTINUE
}

override def visitFile(file: JPath, attrs: JBasicFileAttributes): FileVisitResult =
enqueue(file)

override def visitFileFailed(file: JPath, t: IOException): FileVisitResult =
FileVisitResult.CONTINUE

override def preVisitDirectory(dir: JPath, attrs: JBasicFileAttributes)
: FileVisitResult =
enqueue(dir)

override def postVisitDirectory(dir: JPath, t: IOException): FileVisitResult =
FileVisitResult.CONTINUE
}
)

dispatcher.unsafeRunSync(
if (size > 0) channel.closeWithElement(Chunk.from(bldr.result()))
else channel.close
)
}
channel.stream.unchunks.concurrently(Stream.eval(doWalk))
}
}

def createWatcher: Resource[F, Watcher[F]] = Watcher.default(this, F)

def watch(
Expand Down
30 changes: 16 additions & 14 deletions io/jvm/src/test/scala/fs2/io/file/WalkBenchmark.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

/*
* Copyright (c) 2013 Functional Streams for Scala
*
Expand Down Expand Up @@ -33,7 +32,7 @@ class WalkBenchmark extends Fs2IoSuite {
private var target: Path = _

override def beforeAll() = {
super.beforeAll()
super.beforeAll()
val file = File.createTempFile("fs2-benchmarks-", "-walk")
file.delete()
file.mkdir()
Expand All @@ -42,21 +41,20 @@ class WalkBenchmark extends Fs2IoSuite {
val MaxDepth = 7
val Names = 'A'.to('E').toList.map(_.toString)

def loop(cwd: File, depth: Int): Unit = {
def loop(cwd: File, depth: Int): Unit =
if (depth < MaxDepth) {
Names foreach { name =>
Names.foreach { name =>
val sub = new File(cwd, name)
sub.mkdir()
loop(sub, depth + 1)
}
} else if (depth == MaxDepth) {
Names foreach { name =>
Names.foreach { name =>
val sub = new File(cwd, name)
sub.createNewFile()
loop(sub, depth + 1)
}
}
}

loop(file, 0)
}
Expand All @@ -67,17 +65,21 @@ class WalkBenchmark extends Fs2IoSuite {
(System.nanoTime() - start).nanos
}


test("Files.walk has similar performance to java.nio.file.Files.walk") {
val fs2Time = time(Files[IO]
.walk(target)
.compile
.count
.unsafeRunSync())
test("Files.walk has similar performance to java.nio.file.Files.walk") {
val fs2Time = time(
Files[IO]
.walk(target)
.compile
.count
.unsafeRunSync()
)
val nioTime = time(java.nio.file.Files.walk(target.toNioPath).count())
val epsilon = nioTime.toNanos * 1.5
println(s"fs2 took: ${fs2Time.toMillis} ms")
println(s"nio took: ${nioTime.toMillis} ms")
assert((fs2Time - nioTime).toNanos.abs < epsilon, s"fs2 time: $fs2Time, nio time: $nioTime, diff: ${fs2Time - nioTime}")
assert(
(fs2Time - nioTime).toNanos.abs < epsilon,
s"fs2 time: $fs2Time, nio time: $nioTime, diff: ${fs2Time - nioTime}"
)
}
}
Loading