From f3fd1dbf784f6c29451d4c811f7d2ae291fdf442 Mon Sep 17 00:00:00 2001 From: regis-leray Date: Wed, 9 Oct 2024 20:52:54 -0400 Subject: [PATCH] 420 - improve readFile Stream finalization --- .github/workflows/site.yml | 6 +- build.sbt | 6 +- project/build.properties | 2 +- project/plugins.sbt | 2 +- .../src/main/scala/zio/ftp/UnsecureFtp.scala | 38 +++++++++--- .../ftp/UnsecureDownloadFinalizeSpec.scala | 49 --------------- .../zio/ftp/UnsecureFtpReadFileSpec.scala | 62 +++++++++++++++++++ .../test/scala/zio/ftp/UnsecureFtpSpec.scala | 10 +-- 8 files changed, 103 insertions(+), 72 deletions(-) delete mode 100644 zio-ftp/src/test/scala/zio/ftp/UnsecureDownloadFinalizeSpec.scala create mode 100644 zio-ftp/src/test/scala/zio/ftp/UnsecureFtpReadFileSpec.scala diff --git a/.github/workflows/site.yml b/.github/workflows/site.yml index 223121b5..810b6a10 100644 --- a/.github/workflows/site.yml +++ b/.github/workflows/site.yml @@ -28,11 +28,11 @@ jobs: java-version: 17 check-latest: true - name: Check if the README file is up to date - run: sbt docs/checkReadme + run: sbt docs/checkReadme - name: Check if the site workflow is up to date - run: sbt docs/checkGithubWorkflow + run: sbt docs/checkGithubWorkflow - name: Check artifacts build process - run: sbt +publishLocal + run: sbt +publishLocal - name: Check website build process run: sbt docs/clean; sbt docs/buildWebsite publish-docs: diff --git a/build.sbt b/build.sbt index 18fb6709..9e1b615d 100644 --- a/build.sbt +++ b/build.sbt @@ -27,7 +27,7 @@ addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt") addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck") addCommandAlias("fix", "; all compile:scalafix test:scalafix; all scalafmtSbt scalafmtAll") -val zioVersion = "2.0.21" +val zioVersion = "2.1.6" lazy val root = project.in(file(".")).settings(publish / skip := true).aggregate(`zio-ftp`, docs) @@ -41,8 +41,8 @@ lazy val `zio-ftp` = project "dev.zio" %% "zio-streams" % zioVersion, ("dev.zio" %% "zio-nio" % "2.0.2").exclude("org.scala-lang.modules", "scala-collection-compat_2.13"), "com.hierynomus" % "sshj" % "0.39.0", - "commons-net" % "commons-net" % "3.10.0", - "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0", + "commons-net" % "commons-net" % "3.11.0", + "org.scala-lang.modules" %% "scala-collection-compat" % "2.12.0", "org.apache.logging.log4j" % "log4j-api" % "2.24.0" % Test, "org.apache.logging.log4j" % "log4j-core" % "2.24.0" % Test, "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.24.0" % Test, diff --git a/project/build.properties b/project/build.properties index f344c148..23f7d979 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version = 1.8.2 +sbt.version = 1.10.2 diff --git a/project/plugins.sbt b/project/plugins.sbt index 155a6dbc..cae16489 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,6 +1,6 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.2.1") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6") -addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.10") +addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.7.0") addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.0") addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.29") addSbtPlugin("dev.zio" % "zio-sbt-website" % "0.3.5") diff --git a/zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala b/zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala index f7c045d1..04326a14 100644 --- a/zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala +++ b/zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala @@ -16,11 +16,11 @@ package zio.ftp import java.io.IOException -import org.apache.commons.net.ftp.{ FTP, FTPClient => JFTPClient, FTPSClient => JFTPSClient } +import org.apache.commons.net.ftp.{FTP, FTPClient => JFTPClient, FTPSClient => JFTPSClient} import zio.ftp.UnsecureFtp.Client import zio.stream.ZStream -import zio.{ Scope, ZIO } -import zio.ZIO.{ acquireRelease, attemptBlockingIO } +import zio.{Ref, Scope, UIO, ZIO} +import zio.ZIO.{acquireRelease, attemptBlockingIO} /** * Unsecure Ftp client wrapper @@ -34,19 +34,37 @@ final private class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Clien execute(c => Option(c.mlistFile(path))).map(_.map(FtpResource.fromFtpFile(_))) def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long): ZStream[Any, IOException, Byte] = { + def error(cause: Option[Exception] = None): Left[FileTransferIncompleteError, Unit] = Left(FileTransferIncompleteError( + s"Cannot finalize the file transfer and completely read the entire file $path. ${cause.fold("")(_.getMessage)}" + )) + val success: Either[IOException, Unit] = Right(()) + val initialize = execute(_.setRestartOffset(fileOffset)) - val terminate = ZIO - .fail( - FileTransferIncompleteError(s"Cannot finalize the file transfer and completely read the entire file $path.") - ) - .unlessZIO(execute(_.completePendingCommand())) + def terminate(state: Ref[Either[IOException, Unit]]): UIO[Unit] = + execute(_.completePendingCommand()) + .foldZIO( + err => state.set(error(Some(err))), + resp => if(resp) state.set(success) else state.set(error()) + ).ignore + + def propagate(state: Ref[Either[IOException, Unit]]) = + ZStream.fromZIO(state.get).flatMap { + case Left(ex) => ZStream.fail(ex) + case _ => ZStream.empty + } val inputStream = execute(c => Option(c.retrieveFileStream(path))).someOrFail(InvalidPathError(s"File does not exist $path")) - (ZStream.fromZIO(initialize) *> ZStream.empty) ++ ZStream.fromInputStreamZIO(inputStream, chunkSize) ++ (ZStream - .fromZIO(terminate) *> ZStream.empty) + ZStream.unwrap { + for { + state <- Ref.make(success) + is <- initialize *> inputStream + } yield ZStream + .fromInputStream(is, chunkSize) + .ensuring(terminate(state)) ++ propagate(state) + } } def rm(path: String): ZIO[Any, IOException, Unit] = diff --git a/zio-ftp/src/test/scala/zio/ftp/UnsecureDownloadFinalizeSpec.scala b/zio-ftp/src/test/scala/zio/ftp/UnsecureDownloadFinalizeSpec.scala deleted file mode 100644 index 689e401f..00000000 --- a/zio-ftp/src/test/scala/zio/ftp/UnsecureDownloadFinalizeSpec.scala +++ /dev/null @@ -1,49 +0,0 @@ -package zio.ftp - -import org.apache.commons.net.ftp.FTPClient -import zio.test.Assertion._ -import zio.test._ - -import java.io.InputStream -import scala.util.Random - -object UnsecureDownloadFinalizeSpec extends ZIOSpecDefault { - - private def createFtpclient(success: Boolean) = { - val client = new FTPClient { - override def retrieveFileStream(remote: String): InputStream = { - val it = Random.alphanumeric.take(5000).map(_.toByte).iterator - () => if (it.hasNext) it.next().toInt else -1 - } - - override def completePendingCommand(): Boolean = success - } - - new UnsecureFtp(client) - } - - private def hasIncompleteMsg(a: Assertion[String]) = - hasField("file transfer incomplete message", (e: FileTransferIncompleteError) => e.message, a) - - override def spec = - suite("Download finalizer")( - test("complete pending command gets called") { - val ftpClient = createFtpclient(true) - for { - bytes <- ftpClient.readFile("/a/b/c.txt").runCollect - } yield assert(bytes)(hasSize(equalTo(5000))) - }, - test("completion failure is exposed on error channel") { - val ftpClient = createFtpclient(false) - for { - exit <- ftpClient.readFile("/a/b/c.txt").runCollect.exit - } yield assert(exit)( - fails( - isSubtype[FileTransferIncompleteError]( - hasIncompleteMsg(startsWithString("Cannot finalize the file transfer") && containsString("/a/b/c.txt")) - ) - ) - ) - } - ) -} diff --git a/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpReadFileSpec.scala b/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpReadFileSpec.scala new file mode 100644 index 00000000..44e1caaf --- /dev/null +++ b/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpReadFileSpec.scala @@ -0,0 +1,62 @@ +package zio.ftp + +import org.apache.commons.net.ftp.FTPClient +import zio.test.Assertion._ +import zio.test._ + +import java.io.{ IOException, InputStream } +import scala.util.Random + +object UnsecureFtpReadFileSpec extends ZIOSpecDefault { + + private def createFtpclient(errorOrsuccess: Either[Exception, Boolean]) = { + val client = new FTPClient { + override def retrieveFileStream(remote: String): InputStream = { + val it = Random.alphanumeric.take(5000).map(_.toByte).iterator + () => if (it.hasNext) it.next().toInt else -1 + } + + override def completePendingCommand(): Boolean = + errorOrsuccess match { + case Left(err) => throw err + case Right(flag) => flag + } + } + new UnsecureFtp(client) + } + + private def hasIncompleteMsg(a: Assertion[String]) = + hasField("file transfer incomplete message", (e: Exception) => e.getMessage, a) + + override def spec = + suite("UnsecureFtp - ReadFile complete pending")( + test("succeed") { + val ftpClient = createFtpclient(Right(true)) + for { + bytes <- ftpClient.readFile("/a/b/c.txt").runCollect + } yield assert(bytes)(hasSize(equalTo(5000))) + }, + test("fail to complete") { + val ftpClient = createFtpclient(Right(false)) + for { + exit <- ftpClient.readFile("/a/b/c.txt").runCollect.exit + } yield assert(exit)( + fails( + isSubtype[FileTransferIncompleteError]( + hasIncompleteMsg(startsWithString("Cannot finalize the file transfer") && containsString("/a/b/c.txt")) + ) + ) + ) + }, + test("error occur") { + val ftpClient = createFtpclient(Left(new IOException("Boom"))) + for { + exit <- ftpClient.readFile("/a/b/c.txt").runCollect.exit + } yield assert(exit)( + fails(isSubtype[FileTransferIncompleteError]( + hasIncompleteMsg(startsWithString("Cannot finalize the file transfer") && containsString("/a/b/c.txt")) + )) + ) + } + ) +} diff --git a/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala b/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala index 11c948c7..3110b581 100644 --- a/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala +++ b/zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala @@ -14,21 +14,21 @@ import java.net.{ InetSocketAddress, Proxy } import scala.io.Source object UnsecureSslFtpSpec extends ZIOSpecDefault { - val settings = UnsecureFtpSettings.secure("127.0.0.1", 2121, FtpCredentials("username", "userpass")) + private val settings = UnsecureFtpSettings.secure("127.0.0.1", 2121, FtpCredentials("username", "userpass")) - override def spec = + override def spec: Spec[TestEnvironment & Scope, Any] = FtpSuite.spec("UnsecureSslFtpSpec", settings).provideSomeLayer[Scope](unsecure(settings)) @@ sequential } object UnsecureFtpSpec extends ZIOSpecDefault { - val settings = UnsecureFtpSettings("127.0.0.1", port = 2121, FtpCredentials("username", "userpass")) + private val settings = UnsecureFtpSettings("127.0.0.1", port = 2121, FtpCredentials("username", "userpass")) - override def spec = + override def spec: Spec[TestEnvironment & Scope, Any] = FtpSuite.spec("UnsecureFtpSpec", settings).provideSomeLayer[Scope](unsecure(settings)) @@ sequential } object FtpSuite { - val home = ZPath("ftp-home/ftp/home") + private val home = ZPath("ftp-home/ftp/home") def spec(labelSuite: String, settings: UnsecureFtpSettings) = suite(labelSuite)(