Skip to content

Commit

Permalink
fix(client): disallow reusing stream response
Browse files Browse the repository at this point in the history
chore: unknown commit message
  • Loading branch information
stainless-bot authored and Stainless Bot committed Nov 8, 2024
1 parent a99e891 commit 6eb1979
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,44 +20,50 @@ internal fun sseHandler(jsonMapper: JsonMapper): Handler<StreamResponse<SseMessa

override fun handle(response: HttpResponse): StreamResponse<SseMessage> {
val reader = response.body().bufferedReader()
val sequence = sequence {
reader.useLines { lines ->
val state = SseState(jsonMapper)
var done = false
for (line in lines) {
// Stop emitting messages, but iterate through the full stream.
if (done) {
continue
}
val message = state.decode(line) ?: continue

if (message.data.startsWith("[DONE]")) {
// In this case we don't break because we still want to iterate through
// the full stream.
done = true
continue
}

if (message.event == null) {
val error =
message.json<JsonValue>().asObject().getOrNull()?.get("error")
if (error != null) {
val errorMessage =
error.asString().getOrNull()
?: error
val sequence =
sequence {
reader.useLines { lines ->
val state = SseState(jsonMapper)
var done = false
for (line in lines) {
// Stop emitting messages, but iterate through the full stream.
if (done) {
continue
}
val message = state.decode(line) ?: continue

if (message.data.startsWith("[DONE]")) {
// In this case we don't break because we still want to iterate
// through the full stream.
done = true
continue
}

if (message.event == null) {
val error =
message
.json<JsonValue>()
.asObject()
.getOrNull()
?.get("message")
?.asString()
?.getOrNull()
?: "An error occurred during streaming"
throw OpenAIException(errorMessage)
?.get("error")
if (error != null) {
val errorMessage =
error.asString().getOrNull()
?: error
.asObject()
.getOrNull()
?.get("message")
?.asString()
?.getOrNull()
?: "An error occurred during streaming"
throw OpenAIException(errorMessage)
}
yield(message)
}
}
yield(message)
}
}
}
}
.constrainOnce()

return PhantomReachableClosingStreamResponse(
object : StreamResponse<SseMessage> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import java.io.InputStream
import java.nio.charset.StandardCharsets
import java.util.stream.Collectors.toList
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.catchThrowable
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource

Expand Down Expand Up @@ -105,17 +107,7 @@ class SseHandlerTest {
@ParameterizedTest
@EnumSource
fun handle(testCase: TestCase) {
val response =
object : HttpResponse {
override fun statusCode(): Int = 0

override fun headers(): Headers = Headers.builder().build()

override fun body(): InputStream =
ByteArrayInputStream(testCase.body.toByteArray(StandardCharsets.UTF_8))

override fun close() {}
}
val response = httpResponse(testCase.body)
var messages: List<SseMessage>? = null
var exception: Exception? = null

Expand All @@ -134,6 +126,32 @@ class SseHandlerTest {
assertThat(exception).hasMessage(testCase.expectedException.message)
}
}

@Test
fun cannotReuseStream() {
val response = httpResponse("body")
val streamResponse = sseHandler(jsonMapper()).handle(response)

val throwable =
streamResponse.use {
it.stream().collect(toList())
catchThrowable { it.stream().collect(toList()) }
}

assertThat(throwable).isInstanceOf(IllegalStateException::class.java)
}
}

private fun httpResponse(body: String): HttpResponse =
object : HttpResponse {
override fun statusCode(): Int = 0

override fun headers(): Headers = Headers.builder().build()

override fun body(): InputStream =
ByteArrayInputStream(body.toByteArray(StandardCharsets.UTF_8))

override fun close() {}
}

private fun sseMessageBuilder() = SseMessage.builder().jsonMapper(jsonMapper())

0 comments on commit 6eb1979

Please sign in to comment.