-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
UnsecureFtp: improve readFile
Stream finalization
#410
Conversation
There's something wrong with the CI setup (docker-compose seems to be missing), but |
i have a PR to fix it |
@@ -30,24 +32,58 @@ import zio.ZIO.{ acquireRelease, attemptBlockingIO } | |||
*/ | |||
final private class UnsecureFtp(unsafeClient: Client) extends FtpAccessors[Client] { | |||
|
|||
private var pendingExit: Option[Exit[IOException, Unit]] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i dont think it is safe to have a mutable share state
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @regis-leray, thanks for the review.
Could you be more specific about what kind of safety issue you see here? Because the Client
class already contains a ton of mutable state. I think this particular use of mutable state increases safety because now it yells at you when you try to use the FTP Client at a time you shouldn't (i. e. before the stream finalization). Until we have capture checking with Caprese, this is the best that can be done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can put it into an atomic reference--just to make it thread safe, mind you.
As long as the only access to this is through methods returning ZIO
, it's still as 'purely functional' as anything else and won't inhibit retries, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @jdegoes,
thanks for commenting. Maybe I'm missing something here, but this thing being essentially just a wrapper around a socket connection, isn't it completely thread unsafe anyway?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks again i was able to take some time and to read your changes,
But it leads to another problem: finalizers aren't allowed to throw errors, but completePendingCommand
can fail. I could die in that situation, but that doesn't seem right, because die indicates a defect in the
program, and when the FTP server decides to be weird, that's not the program's fault. Therefore, I decided
to not throw the error from the finalizer but instead delay the error until the next execute call.
For the error propagation of the finaliser, as you mention it can be a server side issue. If it fails, we cannot recover, or propagate the error, the only remaining option is at least to print the error
ZIO.logError()
Also delay the error propagation is not intuitive and it would lead to more confusion, since you will not know that a previous error failed. So i would suggest to remove
private var pendingExit: Option[Exit[IOException, Unit]] = None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this version could fix your issue
def readFile(path: String, chunkSize: Int = 2048, fileOffset: Long): ZStream[Any, IOException, Byte] = {
val initialize = execute(_.setRestartOffset(fileOffset))
val terminate: UIO[Unit] = execute(_.completePendingCommand())
.tapError(err => ZIO.logError(s"Cannot finalize the file transfer and completely read the entire file $path due to : $err"))
.ignore
val inputStream =
execute(c => Option(c.retrieveFileStream(path))).someOrFail(InvalidPathError(s"File does not exist $path"))
ZStream.unwrap(
for {
_ <- initialize
is <- inputStream
} yield ZStream.fromInputStream(is, chunkSize).ensuring(terminate)
)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @regis-leray ,
I understand your concerns about delaying error propagation, I agree it's not ideal. I did it this way because I wanted to keep the API kind-of-compatible while still exposing the error if it occurs.
But I feel uneasy about the .option.unit
thing too because then there is no possibility at all to programmatically detect that an error has happened.
Maybe we could try a loan-pattern style API here?
def readFile[R, E >: IOException, A](
f: ZStream[Any, IOException, Byte] => ZIO[R, E, A]
): ZIO[R, E, A]
The implementation would work like this:
- create the stream
- process it with the
f
function - if the stream isn't completely processed by
f
, raise an error - call
completePendingCommand
- if either
f
orcompletePendingCommand
fails, the error is raised to the caller - if they both fail, the error raised by
f
is returned to the caller and thecompletePendingCommand
error is logged and ignored
What do you think?
Also, I'd like to retain at least some amount of mutable state in order to detect the situation where the user is trying to stuff with the FTP client before the stream is finalized, because that is never going to work and it currently leads to errors that are just wrong, like the "file not found" error I've had.
CI build is fixed, please rebase your branch |
Hi there,
I recently noticed a problem with zio-ftp. I was trying to read a zip file from the server using
java.util.zip.ZipInputStream
.So I was just iterating over the entries in the zip file, reading the data from the entries that I'm interested in and it all worked beautifully… until I tried to read another file from the FTP server, which resulted in a file not found error, even though I knew the file was there.
It turns out that iterating over a zip archive with
ZipInputStream
never results in an attempt to read beyond the end of the file. But due to the way that finalization of the stream is implemented (using the++
operator on a stream), this means thatcompletePendingCommand
isn't called, resulting in the problem I've seen.This might also be the reason for bug #227.
The solution is to use
.ensuring
for the finalizer. This has the added benefit that it's now possible to read only part of the file and stop processing once you've found the bit you're interested in.But it leads to another problem: finalizers aren't allowed to throw errors, but
completePendingCommand
can fail. I coulddie
in that situation, but that doesn't seem right, becausedie
indicates a defect in the program, and when the FTP server decides to be weird, that's not the program's fault. Therefore, I decided to not throw the error from the finalizer but instead delay the error until the nextexecute
call.I've also improved the error handling so that it doesn't just assume a "File doesn't exist" error but instead exposes the error message given by the FTP server to make it more transparent what's going on.