Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

420 - improve readFile Stream finalization #421

Merged
merged 1 commit into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,4 @@ jobs:
PGP_PASSPHRASE: ${{ secrets.PGP_PASSPHRASE }}
PGP_SECRET: ${{ secrets.PGP_SECRET }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}
SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}
20 changes: 10 additions & 10 deletions .github/workflows/site.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ name: Website
jobs:
build:
name: Build and Test
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
if: ${{ github.event_name == 'pull_request' }}
steps:
- name: Git Checkout
Expand All @@ -28,16 +28,16 @@ jobs:
java-version: 17
check-latest: true
- name: Check if the README file is up to date
run: sbt docs/checkReadme
- name: Check if the site workflow is up to date
run: sbt docs/checkGithubWorkflow
run: sbt docs/checkReadme
# - name: Check if the site workflow is up to date
# run: sbt docs/checkGithubWorkflow
- name: Check artifacts build process
run: sbt +publishLocal
run: sbt +publishLocal
- name: Check website build process
run: sbt docs/clean; sbt docs/buildWebsite
publish-docs:
name: Publish Docs
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
if: ${{ ((github.event_name == 'release') && (github.event.action == 'published')) || (github.event_name == 'workflow_dispatch') }}
steps:
- name: Git Checkout
Expand All @@ -56,13 +56,13 @@ jobs:
node-version: 16.x
registry-url: https://registry.npmjs.org
- name: Publish Docs to NPM Registry
run: sbt docs/publishToNpm
run: sbt docs/publishToNpm
env:
NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}
generate-readme:
name: Generate README
runs-on: ubuntu-latest
if: ${{ (github.event_name == 'push') || ((github.event_name == 'release') && (github.event_name == 'published')) }}
runs-on: ubuntu-22.04
if: ${{ (github.event_name == 'push') || ((github.event_name == 'release') && (github.event.action == 'published')) }}
steps:
- name: Git Checkout
uses: actions/[email protected]
Expand All @@ -76,7 +76,7 @@ jobs:
java-version: 17
check-latest: true
- name: Generate Readme
run: sbt docs/generateReadme
run: sbt docs/generateReadme
- name: Commit Changes
run: |
git config --local user.email "github-actions[bot]@users.noreply.github.com"
Expand Down
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")
addCommandAlias("fix", "; all compile:scalafix test:scalafix; all scalafmtSbt scalafmtAll")

val zioVersion = "2.0.21"
val zioVersion = "2.1.6"

lazy val root =
project.in(file(".")).settings(publish / skip := true).aggregate(`zio-ftp`, docs)
Expand All @@ -41,8 +41,8 @@ lazy val `zio-ftp` = project
"dev.zio" %% "zio-streams" % zioVersion,
("dev.zio" %% "zio-nio" % "2.0.2").exclude("org.scala-lang.modules", "scala-collection-compat_2.13"),
"com.hierynomus" % "sshj" % "0.39.0",
"commons-net" % "commons-net" % "3.10.0",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0",
"commons-net" % "commons-net" % "3.11.0",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.12.0",
"org.apache.logging.log4j" % "log4j-api" % "2.24.0" % Test,
"org.apache.logging.log4j" % "log4j-core" % "2.24.0" % Test,
"org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.24.0" % Test,
Expand Down
2 changes: 1 addition & 1 deletion project/BuildHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ object BuildHelper {
)

final val Scala212 = "2.12.20"
final val Scala213 = "2.13.13"
final val Scala213 = "2.13.15"
final val Scala3 = "3.3.3" // LTS

final private val stdOptions = Seq(
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.8.2
sbt.version = 1.10.2
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.2.1")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.10")
addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.7.0")
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.0")
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.29")
addSbtPlugin("dev.zio" % "zio-sbt-website" % "0.3.5")
addSbtPlugin("dev.zio" % "zio-sbt-website" % "0.3.10")
7 changes: 5 additions & 2 deletions zio-ftp/src/main/scala/zio/ftp/SecureFtp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import zio.ZIO.{ acquireRelease, attemptBlockingIO, fromAutoCloseable, scoped }
* All ftp methods exposed are lift into ZIO or ZStream, which required a Blocking Environment
* since the underlying java client only provide blocking methods.
*/
final private class SecureFtp(unsafeClient: Client) extends FtpAccessors[Client] {
sealed abstract class SecureFtp(unsafeClient: Client) extends FtpAccessors[Client] {

def stat(path: String): ZIO[Any, IOException, Option[FtpResource]] =
execute(c => Option(c.statExistence(path)).map(FtpResource(path, _)))
Expand Down Expand Up @@ -116,6 +116,9 @@ final private class SecureFtp(unsafeClient: Client) extends FtpAccessors[Client]
object SecureFtp {
type Client = SFTPClient

def unsafe(c: Client): SecureFtp =
new SecureFtp(c) {}

def connect(settings: SecureFtpSettings): ZIO[Scope, ConnectionError, FtpAccessors[Client]] = {
val ssh = new SSHClient(settings.sshConfig)
import settings._
Expand All @@ -136,7 +139,7 @@ object SecureFtp {
setIdentity(_, credentials.username)(ssh)
)

new SecureFtp(ssh.newSFTPClient())
new SecureFtp(ssh.newSFTPClient()) {}
}.mapError(ConnectionError(s"Fail to connect to server ${settings.host}:${settings.port}", _))
)(
_.execute(_.close()).ignore
Expand Down
46 changes: 36 additions & 10 deletions zio-ftp/src/main/scala/zio/ftp/UnsecureFtp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.io.IOException
import org.apache.commons.net.ftp.{ FTP, FTPClient => JFTPClient, FTPSClient => JFTPSClient }
import zio.ftp.UnsecureFtp.Client
import zio.stream.ZStream
import zio.{ Scope, ZIO }
import zio.{ Ref, Scope, UIO, ZIO }
import zio.ZIO.{ acquireRelease, attemptBlockingIO }

/**
Expand All @@ -28,25 +28,48 @@ import zio.ZIO.{ acquireRelease, attemptBlockingIO }
* All ftp methods exposed are lift into ZIO or ZStream
* The underlying java client only provide blocking methods.
*/
final private class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Client] {
sealed abstract class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Client] {

def stat(path: String): ZIO[Any, IOException, Option[FtpResource]] =
execute(c => Option(c.mlistFile(path))).map(_.map(FtpResource.fromFtpFile(_)))

def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long): ZStream[Any, IOException, Byte] = {
def error(cause: Option[Exception] = None): Left[FileTransferIncompleteError, Unit] =
Left(
FileTransferIncompleteError(
s"Cannot finalize the file transfer and completely read the entire file $path. ${cause.fold("")(_.getMessage)}"
)
)
val success: Either[IOException, Unit] = Right(())

val initialize = execute(_.setRestartOffset(fileOffset))

val terminate = ZIO
.fail(
FileTransferIncompleteError(s"Cannot finalize the file transfer and completely read the entire file $path.")
)
.unlessZIO(execute(_.completePendingCommand()))
def terminate(state: Ref[Either[IOException, Unit]]): UIO[Unit] =
execute(_.completePendingCommand())
.foldZIO(
err => state.set(error(Some(err))),
resp =>
if (resp) state.set(success)
else state.set(error())
)

def propagate(state: Ref[Either[IOException, Unit]]) =
ZStream.fromZIO(state.get).flatMap {
case Left(ex) => ZStream.fail(ex)
case _ => ZStream.empty
}

val inputStream =
execute(c => Option(c.retrieveFileStream(path))).someOrFail(InvalidPathError(s"File does not exist $path"))

(ZStream.fromZIO(initialize) *> ZStream.empty) ++ ZStream.fromInputStreamZIO(inputStream, chunkSize) ++ (ZStream
.fromZIO(terminate) *> ZStream.empty)
ZStream.unwrap {
for {
state <- Ref.make(success)
is <- initialize *> inputStream
} yield ZStream
.fromInputStream(is, chunkSize)
.ensuring(terminate(state)) ++ propagate(state)
}
}

def rm(path: String): ZIO[Any, IOException, Unit] =
Expand Down Expand Up @@ -105,6 +128,9 @@ final private class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Clien
object UnsecureFtp {
type Client = JFTPClient

def unsafe(c: Client): UnsecureFtp =
new UnsecureFtp(c) {}

def connect(settings: UnsecureFtpSettings): ZIO[Scope, ConnectionError, FtpAccessors[Client]] =
acquireRelease(
attemptBlockingIO {
Expand Down Expand Up @@ -141,7 +167,7 @@ object UnsecureFtp {

settings.dataTimeout.foreach(ftpClient.setDataTimeout)

new UnsecureFtp(ftpClient) -> success
new UnsecureFtp(ftpClient) {} -> success
}.mapError(e => ConnectionError(e.getMessage, e))
.filterOrFail(_._2)(ConnectionError(s"Fail to connect to server ${settings.host}:${settings.port}"))
.map(_._1)
Expand Down

This file was deleted.

63 changes: 63 additions & 0 deletions zio-ftp/src/test/scala/zio/ftp/UnsecureFtpReadFileSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package zio.ftp

import org.apache.commons.net.ftp.FTPClient
import zio.test.Assertion._
import zio.test._

import java.io.{ IOException, InputStream }
import scala.util.Random

object UnsecureFtpReadFileSpec extends ZIOSpecDefault {

private def createFtpclient(errorOrsuccess: Either[Exception, Boolean]) =
UnsecureFtp.unsafe(new FTPClient {

override def retrieveFileStream(remote: String): InputStream = {
val it = Random.alphanumeric.take(5000).map(_.toByte).iterator
() => if (it.hasNext) it.next().toInt else -1
}

override def completePendingCommand(): Boolean =
errorOrsuccess match {
case Left(err) => throw err
case Right(flag) => flag
}
})

private def hasIncompleteMsg(a: Assertion[String]) =
hasField("file transfer incomplete message", (e: Exception) => e.getMessage, a)

override def spec =
suite("UnsecureFtp - ReadFile complete pending")(
test("succeed") {
val ftpClient = createFtpclient(Right(true))
for {
bytes <- ftpClient.readFile("/a/b/c.txt").runCollect
} yield assert(bytes)(hasSize(equalTo(5000)))
},
test("fail to complete") {
val ftpClient = createFtpclient(Right(false))
for {
exit <- ftpClient.readFile("/a/b/c.txt").runCollect.exit
} yield assert(exit)(
fails(
isSubtype[FileTransferIncompleteError](
hasIncompleteMsg(startsWithString("Cannot finalize the file transfer") && containsString("/a/b/c.txt"))
)
)
)
},
test("error occur") {
val ftpClient = createFtpclient(Left(new IOException("Boom")))
for {
exit <- ftpClient.readFile("/a/b/c.txt").runCollect.exit
} yield assert(exit)(
fails(
isSubtype[FileTransferIncompleteError](
hasIncompleteMsg(startsWithString("Cannot finalize the file transfer") && containsString("/a/b/c.txt"))
)
)
)
}
)
}
10 changes: 5 additions & 5 deletions zio-ftp/src/test/scala/zio/ftp/UnsecureFtpSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ import java.net.{ InetSocketAddress, Proxy }
import scala.io.Source

object UnsecureSslFtpSpec extends ZIOSpecDefault {
val settings = UnsecureFtpSettings.secure("127.0.0.1", 2121, FtpCredentials("username", "userpass"))
private val settings = UnsecureFtpSettings.secure("127.0.0.1", 2121, FtpCredentials("username", "userpass"))

override def spec =
override def spec: Spec[TestEnvironment & Scope, Any] =
FtpSuite.spec("UnsecureSslFtpSpec", settings).provideSomeLayer[Scope](unsecure(settings)) @@ sequential
}

object UnsecureFtpSpec extends ZIOSpecDefault {
val settings = UnsecureFtpSettings("127.0.0.1", port = 2121, FtpCredentials("username", "userpass"))
private val settings = UnsecureFtpSettings("127.0.0.1", port = 2121, FtpCredentials("username", "userpass"))

override def spec =
override def spec: Spec[TestEnvironment & Scope, Any] =
FtpSuite.spec("UnsecureFtpSpec", settings).provideSomeLayer[Scope](unsecure(settings)) @@ sequential
}

object FtpSuite {
val home = ZPath("ftp-home/ftp/home")
private val home = ZPath("ftp-home/ftp/home")

def spec(labelSuite: String, settings: UnsecureFtpSettings) =
suite(labelSuite)(
Expand Down
Loading