Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 'fs.io.readInputStreamGeneric' overallocation of underlying buffers #3318

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
"fs2.io.file.Files.openSeekableByteChannel"
),
// package-private method: #3318
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"fs2.io.package.readInputStreamGeneric"
)
)

Expand Down
4 changes: 2 additions & 2 deletions io/jvm/src/main/scala/fs2/io/ioplatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ private[fs2] trait ioplatform extends iojvmnative {
F.pure(System.in),
F.delay(new Array[Byte](bufSize)),
false
) { (is, buf) =>
) { (is, buf, off) =>
F.async[Int] { cb =>
F.delay {
val task: Runnable = () => cb(Right(is.read(buf)))
val task: Runnable = () => cb(Right(is.read(buf, off, buf.length - off)))
stdinExecutor.submit(task)
}.map { fut =>
Some(F.delay {
Expand Down
36 changes: 36 additions & 0 deletions io/jvm/src/test/scala/fs2/io/IoPlatformSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,42 @@ class IoPlatformSuite extends Fs2Suite {
// This suite runs for a long time, this avoids timeouts in CI.
override def munitIOTimeout: Duration = 2.minutes

group("readInputStream") {
test("reuses internal buffer on smaller chunks") {
forAllF { (bytes: Array[Byte], chunkSize0: Int) =>
val chunkSize = (chunkSize0 % 20).abs + 1
fs2.Stream
.chunk(Chunk.array(bytes))
.chunkN(chunkSize / 3 + 1)
.unchunks
.covary[IO]
// we know that '.toInputStream' reads by chunk
.through(fs2.io.toInputStream)
.flatMap(is => io.readInputStream(IO(is), chunkSize))
.chunks
.zipWithPrevious
.assertForall {
case (None, _) => true // skip first element
case (_, _: Chunk.Singleton[_]) => true // skip singleton bytes
case (Some(_: Chunk.Singleton[_]), _) => true // skip singleton bytes
case (Some(Chunk.ArraySlice(bs1, o1, l1)), Chunk.ArraySlice(bs2, o2, _)) =>
{
// if first slice buffer is not 'full'
(bs1.length != (o1 + l1)) &&
// we expect that next slice will wrap same buffer
((bs2 eq bs1) && (o2 == o1 + l1))
} || {
// if first slice buffer is 'full'
(bs2.length == (o1 + l1)) &&
// we expect new buffer allocated for next slice
((bs2 ne bs1) && (o2 == 0))
}
case _ => false // unexpected chunk subtype
}
}
}
}

group("readOutputStream") {
test("writes data and terminates when `f` returns") {
forAllF { (bytes: Array[Byte], chunkSize0: Int) =>
Expand Down
31 changes: 14 additions & 17 deletions io/shared/src/main/scala/fs2/io/io.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ package object io extends ioplatform {
fis,
F.delay(new Array[Byte](chunkSize)),
closeAfterUse
)((is, buf) => F.blocking(is.read(buf)))
)((is, buf, off) => F.blocking(is.read(buf, off, buf.length - off)))

private[io] def readInputStreamCancelable[F[_]](
fis: F[InputStream],
Expand All @@ -57,7 +57,7 @@ package object io extends ioplatform {
fis,
F.delay(new Array[Byte](chunkSize)),
closeAfterUse
)((is, buf) => F.blocking(is.read(buf)).cancelable(cancel))
)((is, buf, off) => F.blocking(is.read(buf, off, buf.length - off)).cancelable(cancel))

/** Reads all bytes from the specified `InputStream` with a buffer size of `chunkSize`.
* Set `closeAfterUse` to false if the `InputStream` should not be closed after use.
Expand All @@ -76,31 +76,28 @@ package object io extends ioplatform {
fis,
F.pure(new Array[Byte](chunkSize)),
closeAfterUse
)((is, buf) => F.blocking(is.read(buf)))
)((is, buf, off) => F.blocking(is.read(buf, off, buf.length - off)))

private def readBytesFromInputStream[F[_]](is: InputStream, buf: Array[Byte])(
read: (InputStream, Array[Byte]) => F[Int]
private def readBytesFromInputStream[F[_]](is: InputStream, buf: Array[Byte], offset: Int)(
read: (InputStream, Array[Byte], Int) => F[Int]
)(implicit
F: Sync[F]
): F[Option[Chunk[Byte]]] =
read(is, buf).map { numBytes =>
): F[Option[(Chunk[Byte], Option[(Array[Byte], Int)])]] =
read(is, buf, offset).map { numBytes =>
if (numBytes < 0) None
else if (numBytes == 0) Some(Chunk.empty)
else if (numBytes < buf.size) Some(Chunk.array(buf, 0, numBytes))
else Some(Chunk.array(buf))
else if (offset + numBytes == buf.size) Some(Chunk.array(buf, offset, numBytes) -> None)
else Some(Chunk.array(buf, offset, numBytes) -> Some(buf -> (offset + numBytes)))
}

private[fs2] def readInputStreamGeneric[F[_]](
fis: F[InputStream],
buf: F[Array[Byte]],
closeAfterUse: Boolean
)(read: (InputStream, Array[Byte]) => F[Int])(implicit F: Sync[F]): Stream[F, Byte] = {
def useIs(is: InputStream) =
Stream
.eval(buf.flatMap(b => readBytesFromInputStream(is, b)(read)))
.repeat
.unNoneTerminate
.flatMap(c => Stream.chunk(c))
)(read: (InputStream, Array[Byte], Int) => F[Int])(implicit F: Sync[F]): Stream[F, Byte] = {
def useIs(is: InputStream) = Stream.unfoldChunkEval(Option.empty[(Array[Byte], Int)]) {
case None => buf.flatMap(b => readBytesFromInputStream(is, b, 0)(read))
case Some((b, offset)) => readBytesFromInputStream(is, b, offset)(read)
}

if (closeAfterUse)
Stream.bracket(fis)(is => Sync[F].blocking(is.close())).flatMap(useIs)
Expand Down