Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly handle unexhausted network responses #288

Merged
merged 3 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class ChuckerInterceptor internal constructor(
override fun onSuccess(file: File) {
val buffer = readResponseBuffer(file, response.isGzipped)
file.delete()
processResponseBody(response, buffer, transaction)
if (buffer != null) processResponseBody(response, buffer, transaction)
collector.onResponseReceived(transaction)
}

Expand All @@ -253,15 +253,17 @@ class ChuckerInterceptor internal constructor(
collector.onResponseReceived(transaction)
}

private fun readResponseBuffer(responseBody: File, isGzipped: Boolean): Buffer {
private fun readResponseBuffer(responseBody: File, isGzipped: Boolean): Buffer? {
val bufferedSource = Okio.buffer(Okio.source(responseBody))
val source = if (isGzipped) {
GzipSource(bufferedSource)
} else {
bufferedSource
}
return Buffer().apply {
writeAll(source)
return try {
Buffer().apply { writeAll(source) }
} catch (_: IOException) {
null
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import okio.Timeout
* to a [sideChannel] file. After the [upstream] is depleted or when a failure occurs
* an appropriate [callback] method is called.
*
* Failure is considered any [IOException] during reading the bytes or exceeding [readBytesLimit] length.
* Failure is considered any [IOException] during reading the bytes,
* exceeding [readBytesLimit] length or not reading the whole upstream.
*/
internal class TeeSource(
private val upstream: Source,
Expand All @@ -25,8 +26,9 @@ internal class TeeSource(
) : Source {
private val sideStream = Okio.buffer(Okio.sink(sideChannel))
private var totalBytesRead = 0L
private var reachedLimit = false
private var upstreamFailed = false
private var isReadLimitExceeded = false
private var isUpstreamExhausted = false
private var isFailure = false

override fun read(sink: Buffer, byteCount: Long): Long {
val bytesRead = try {
Expand All @@ -37,19 +39,20 @@ internal class TeeSource(
}

if (bytesRead == -1L) {
isUpstreamExhausted = true
sideStream.close()
return -1L
}

totalBytesRead += bytesRead
if (!reachedLimit && (totalBytesRead <= readBytesLimit)) {
if (!isReadLimitExceeded && (totalBytesRead <= readBytesLimit)) {
val offset = sink.size() - bytesRead
sink.copyTo(sideStream.buffer(), offset, bytesRead)
sideStream.emitCompleteSegments()
return bytesRead
}
if (!reachedLimit) {
reachedLimit = true
if (!isReadLimitExceeded) {
isReadLimitExceeded = true
sideStream.close()
callSideChannelFailure(IOException("Capacity of $readBytesLimit bytes exceeded"))
}
Expand All @@ -60,16 +63,18 @@ internal class TeeSource(
override fun close() {
sideStream.close()
upstream.close()
if (!upstreamFailed) {
if (isUpstreamExhausted) {
callback.onSuccess(sideChannel)
} else {
callSideChannelFailure(IOException("Upstream was not fully consumed"))
}
}

override fun timeout(): Timeout = upstream.timeout()

private fun callSideChannelFailure(exception: IOException) {
if (!upstreamFailed) {
upstreamFailed = true
if (!isFailure) {
isFailure = true
callback.onFailure(exception, sideChannel)
}
}
Expand All @@ -83,7 +88,10 @@ internal class TeeSource(
/**
* Called when there was an issue while copying bytes to the [file].
*
* It might occur due to an exception thrown while reading bytes or due to exceeding capacity limit.
* It might occur due to one of the following reasons:
* - an exception was thrown while reading bytes
* - capacity limit was exceeded
* - upstream was not fully consumed
*/
fun onFailure(exception: IOException, file: File)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,36 @@ class TeeSourceTest {
.isEqualTo("Hello there!")
}

@Test
fun notConsumedUpstream_isNotConsideredSuccess(@TempDir tempDir: File) {
val testFile = File(tempDir, "testFile")
// Okio uses 8KiB as a single size read.
val testSource = TestSource(8_192 * 2)

val teeSource = TeeSource(testSource, testFile, teeCallback)
Okio.buffer(teeSource).use { source ->
source.readByteString(8_192)
}

assertThat(teeCallback.exception)
.hasMessageThat()
.isEqualTo("Upstream was not fully consumed")
}

@Test
fun partiallyReadBytesFromUpstream_areAvailableToSideChannel(@TempDir tempDir: File) {
val testFile = File(tempDir, "testFile")
// Okio uses 8KiB as a single size read.
val testSource = TestSource(8_192 * 2)

val teeSource = TeeSource(testSource, testFile, teeCallback)
Okio.buffer(teeSource).use { source ->
source.readByteString(8_192)
}

assertThat(teeCallback.fileContent).isEqualTo(testSource.content.substring(0, 8_192))
}

private class TestSource(contentLength: Int = 1_000) : Source {
val content: ByteString = ByteString.of(*Random.nextBytes(contentLength))
private val buffer = Buffer().apply { write(content) }
Expand Down