Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(client): don't leak responses when retrying #103

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,17 @@ private constructor(
}

response
} catch (t: Throwable) {
if (++retries > maxRetries || !shouldRetry(t)) {
throw t
} catch (throwable: Throwable) {
if (++retries > maxRetries || !shouldRetry(throwable)) {
throw throwable
}

null
}

val backoffMillis = getRetryBackoffMillis(retries, response)
// All responses must be closed, so close the failed one before retrying.
response?.close()
Thread.sleep(backoffMillis.toMillis())
}
}
Expand Down Expand Up @@ -113,6 +115,8 @@ private constructor(
}

val backoffMillis = getRetryBackoffMillis(retries, response)
// All responses must be closed, so close the failed one before retrying.
response?.close()
return sleepAsync(backoffMillis.toMillis()).thenCompose {
executeWithRetries(requestWithRetryCount, requestOptions)
}
Expand Down Expand Up @@ -223,23 +227,23 @@ private constructor(
return Duration.ofNanos((TimeUnit.SECONDS.toNanos(1) * backoffSeconds * jitter).toLong())
}

private fun sleepAsync(millis: Long): CompletableFuture<Void> {
val future = CompletableFuture<Void>()
TIMER.schedule(
object : TimerTask() {
override fun run() {
future.complete(null)
}
},
millis
)
return future
}

companion object {

private val TIMER = Timer("RetryingHttpClient", true)

private fun sleepAsync(millis: Long): CompletableFuture<Void> {
val future = CompletableFuture<Void>()
TIMER.schedule(
object : TimerTask() {
override fun run() {
future.complete(null)
}
},
millis
)
return future
}

@JvmStatic fun builder() = Builder()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package com.anthropic.core.http

import com.anthropic.client.okhttp.OkHttpClient
import com.anthropic.core.RequestOptions
import com.github.tomakehurst.wiremock.client.WireMock.*
import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo
import com.github.tomakehurst.wiremock.junit5.WireMockTest
import com.github.tomakehurst.wiremock.stubbing.Scenario
import java.io.InputStream
import java.util.concurrent.CompletableFuture
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.params.ParameterizedTest
Expand All @@ -13,11 +16,49 @@ import org.junit.jupiter.params.provider.ValueSource
@WireMockTest
internal class RetryingHttpClientTest {

private var openResponseCount = 0
private lateinit var httpClient: HttpClient

@BeforeEach
fun beforeEach(wmRuntimeInfo: WireMockRuntimeInfo) {
httpClient = OkHttpClient.builder().baseUrl(wmRuntimeInfo.httpBaseUrl).build()
val okHttpClient = OkHttpClient.builder().baseUrl(wmRuntimeInfo.httpBaseUrl).build()
httpClient =
object : HttpClient {
override fun execute(
request: HttpRequest,
requestOptions: RequestOptions
): HttpResponse = trackClose(okHttpClient.execute(request, requestOptions))

override fun executeAsync(
request: HttpRequest,
requestOptions: RequestOptions
): CompletableFuture<HttpResponse> =
okHttpClient.executeAsync(request, requestOptions).thenApply { trackClose(it) }

override fun close() = okHttpClient.close()

private fun trackClose(response: HttpResponse): HttpResponse {
openResponseCount++
return object : HttpResponse {
private var isClosed = false

override fun statusCode(): Int = response.statusCode()

override fun headers(): Headers = response.headers()

override fun body(): InputStream = response.body()

override fun close() {
response.close()
if (isClosed) {
return
}
openResponseCount--
isClosed = true
}
}
}
}
resetAllScenarios()
}

Expand All @@ -35,6 +76,7 @@ internal class RetryingHttpClientTest {

assertThat(response.statusCode()).isEqualTo(200)
verify(1, postRequestedFor(urlPathEqualTo("/something")))
assertNoResponseLeaks()
}

@ParameterizedTest
Expand All @@ -60,6 +102,7 @@ internal class RetryingHttpClientTest {

assertThat(response.statusCode()).isEqualTo(200)
verify(1, postRequestedFor(urlPathEqualTo("/something")))
assertNoResponseLeaks()
}

@ParameterizedTest
Expand Down Expand Up @@ -116,6 +159,7 @@ internal class RetryingHttpClientTest {
postRequestedFor(urlPathEqualTo("/something"))
.withHeader("x-stainless-retry-count", equalTo("2"))
)
assertNoResponseLeaks()
}

@ParameterizedTest
Expand Down Expand Up @@ -156,6 +200,7 @@ internal class RetryingHttpClientTest {
postRequestedFor(urlPathEqualTo("/something"))
.withHeader("x-stainless-retry-count", equalTo("42"))
)
assertNoResponseLeaks()
}

@ParameterizedTest
Expand Down Expand Up @@ -186,8 +231,13 @@ internal class RetryingHttpClientTest {

assertThat(response.statusCode()).isEqualTo(200)
verify(2, postRequestedFor(urlPathEqualTo("/something")))
assertNoResponseLeaks()
}

private fun HttpClient.execute(request: HttpRequest, async: Boolean): HttpResponse =
if (async) executeAsync(request).get() else execute(request)

// When retrying, all failed responses should be closed. Only the final returned response should
// be open.
private fun assertNoResponseLeaks() = assertThat(openResponseCount).isEqualTo(1)
}