Skip to content

Commit

Permalink
420 - improve readFile Stream finalization
Browse files Browse the repository at this point in the history
  • Loading branch information
regis-leray committed Oct 10, 2024
1 parent b19ce44 commit b5ff912
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 70 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/site.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ jobs:
java-version: 17
check-latest: true
- name: Check if the README file is up to date
run: sbt docs/checkReadme
run: sbt docs/checkReadme
- name: Check if the site workflow is up to date
run: sbt docs/checkGithubWorkflow
run: sbt docs/checkGithubWorkflow
- name: Check artifacts build process
run: sbt +publishLocal
run: sbt +publishLocal
- name: Check website build process
run: sbt docs/clean; sbt docs/buildWebsite
publish-docs:
Expand Down
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")
addCommandAlias("fix", "; all compile:scalafix test:scalafix; all scalafmtSbt scalafmtAll")

val zioVersion = "2.0.21"
val zioVersion = "2.1.6"

lazy val root =
project.in(file(".")).settings(publish / skip := true).aggregate(`zio-ftp`, docs)
Expand All @@ -41,8 +41,8 @@ lazy val `zio-ftp` = project
"dev.zio" %% "zio-streams" % zioVersion,
("dev.zio" %% "zio-nio" % "2.0.2").exclude("org.scala-lang.modules", "scala-collection-compat_2.13"),
"com.hierynomus" % "sshj" % "0.39.0",
"commons-net" % "commons-net" % "3.10.0",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0",
"commons-net" % "commons-net" % "3.11.0",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.12.0",
"org.apache.logging.log4j" % "log4j-api" % "2.24.0" % Test,
"org.apache.logging.log4j" % "log4j-core" % "2.24.0" % Test,
"org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.24.0" % Test,
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.8.2
sbt.version = 1.10.2
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.2.1")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.10")
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.7.0")
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.0")
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.29")
addSbtPlugin("dev.zio" % "zio-sbt-website" % "0.3.5")
40 changes: 32 additions & 8 deletions zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.io.IOException
import org.apache.commons.net.ftp.{ FTP, FTPClient => JFTPClient, FTPSClient => JFTPSClient }
import zio.ftp.UnsecureFtp.Client
import zio.stream.ZStream
import zio.{ Scope, ZIO }
import zio.{ Ref, Scope, UIO, ZIO }
import zio.ZIO.{ acquireRelease, attemptBlockingIO }

/**
Expand All @@ -34,19 +34,43 @@ final private class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Clien
execute(c => Option(c.mlistFile(path))).map(_.map(FtpResource.fromFtpFile(_)))

def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long): ZStream[Any, IOException, Byte] = {
def error(cause: Option[Exception] = None): Left[FileTransferIncompleteError, Unit] =
Left(
FileTransferIncompleteError(
s"Cannot finalize the file transfer and completely read the entire file $path. ${cause.fold("")(_.getMessage)}"
)
)
val success: Either[IOException, Unit] = Right(())

val initialize = execute(_.setRestartOffset(fileOffset))

val terminate = ZIO
.fail(
FileTransferIncompleteError(s"Cannot finalize the file transfer and completely read the entire file $path.")
)
.unlessZIO(execute(_.completePendingCommand()))
def terminate(state: Ref[Either[IOException, Unit]]): UIO[Unit] =
execute(_.completePendingCommand())
.foldZIO(
err => state.set(error(Some(err))),
resp =>
if (resp) state.set(success)
else state.set(error())
)
.ignore

def propagate(state: Ref[Either[IOException, Unit]]) =
ZStream.fromZIO(state.get).flatMap {
case Left(ex) => ZStream.fail(ex)
case _ => ZStream.empty
}

val inputStream =
execute(c => Option(c.retrieveFileStream(path))).someOrFail(InvalidPathError(s"File does not exist $path"))

(ZStream.fromZIO(initialize) *> ZStream.empty) ++ ZStream.fromInputStreamZIO(inputStream, chunkSize) ++ (ZStream
.fromZIO(terminate) *> ZStream.empty)
ZStream.unwrap {
for {
state <- Ref.make(success)
is <- initialize *> inputStream
} yield ZStream
.fromInputStream(is, chunkSize)
.ensuring(terminate(state)) ++ propagate(state)
}
}

def rm(path: String): ZIO[Any, IOException, Unit] =
Expand Down
49 changes: 0 additions & 49 deletions zio-ftp/src/test/scala/zio/ftp/UnsecureDownloadFinalizeSpec.scala

This file was deleted.

64 changes: 64 additions & 0 deletions zio-ftp/src/test/scala/zio/ftp/UnsecureFtpReadFileSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package zio.ftp

import org.apache.commons.net.ftp.FTPClient
import zio.test.Assertion._
import zio.test._

import java.io.{ IOException, InputStream }
import scala.util.Random

object UnsecureFtpReadFileSpec extends ZIOSpecDefault {

private def createFtpclient(errorOrsuccess: Either[Exception, Boolean]) = {
val client = new FTPClient {
override def retrieveFileStream(remote: String): InputStream = {
val it = Random.alphanumeric.take(5000).map(_.toByte).iterator
() => if (it.hasNext) it.next().toInt else -1
}

override def completePendingCommand(): Boolean =
errorOrsuccess match {
case Left(err) => throw err
case Right(flag) => flag
}
}
new UnsecureFtp(client)
}

private def hasIncompleteMsg(a: Assertion[String]) =
hasField("file transfer incomplete message", (e: Exception) => e.getMessage, a)

override def spec =
suite("UnsecureFtp - ReadFile complete pending")(
test("succeed") {
val ftpClient = createFtpclient(Right(true))
for {
bytes <- ftpClient.readFile("/a/b/c.txt").runCollect
} yield assert(bytes)(hasSize(equalTo(5000)))
},
test("fail to complete") {
val ftpClient = createFtpclient(Right(false))
for {
exit <- ftpClient.readFile("/a/b/c.txt").runCollect.exit
} yield assert(exit)(
fails(
isSubtype[FileTransferIncompleteError](
hasIncompleteMsg(startsWithString("Cannot finalize the file transfer") && containsString("/a/b/c.txt"))
)
)
)
},
test("error occur") {
val ftpClient = createFtpclient(Left(new IOException("Boom")))
for {
exit <- ftpClient.readFile("/a/b/c.txt").runCollect.exit
} yield assert(exit)(
fails(
isSubtype[FileTransferIncompleteError](
hasIncompleteMsg(startsWithString("Cannot finalize the file transfer") && containsString("/a/b/c.txt"))
)
)
)
}
)
}
10 changes: 5 additions & 5 deletions zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ import java.net.{ InetSocketAddress, Proxy }
import scala.io.Source

object UnsecureSslFtpSpec extends ZIOSpecDefault {
val settings = UnsecureFtpSettings.secure("127.0.0.1", 2121, FtpCredentials("username", "userpass"))
private val settings = UnsecureFtpSettings.secure("127.0.0.1", 2121, FtpCredentials("username", "userpass"))

override def spec =
override def spec: Spec[TestEnvironment & Scope, Any] =
FtpSuite.spec("UnsecureSslFtpSpec", settings).provideSomeLayer[Scope](unsecure(settings)) @@ sequential
}

object UnsecureFtpSpec extends ZIOSpecDefault {
val settings = UnsecureFtpSettings("127.0.0.1", port = 2121, FtpCredentials("username", "userpass"))
private val settings = UnsecureFtpSettings("127.0.0.1", port = 2121, FtpCredentials("username", "userpass"))

override def spec =
override def spec: Spec[TestEnvironment & Scope, Any] =
FtpSuite.spec("UnsecureFtpSpec", settings).provideSomeLayer[Scope](unsecure(settings)) @@ sequential
}

object FtpSuite {
val home = ZPath("ftp-home/ftp/home")
private val home = ZPath("ftp-home/ftp/home")

def spec(labelSuite: String, settings: UnsecureFtpSettings) =
suite(labelSuite)(
Expand Down

0 comments on commit b5ff912

Please sign in to comment.