Skip to content

Commit

Permalink
feat(client): add an AsyncStreamResponse#onCompleteFuture() method (#…
Browse files Browse the repository at this point in the history
…239)

docs: add more documentation to `AsyncStreamResponse`
  • Loading branch information
stainless-app[bot] authored Feb 20, 2025
1 parent d261919 commit 81c2f4e
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 3 deletions.
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,25 @@ client.async().chat().completions().createStreaming(params).subscribe(new AsyncS
System.out.println("Something went wrong!");
throw new RuntimeException(error.get());
} else {
System.out.println("Something went wrong!");
System.out.println("No more chunks!");
}
}
});

// Or use futures
client.async().chat().completions().createStreaming(params)
.subscribe(chunk -> {
System.out.println(chunk);
})
.onCompleteFuture();
.whenComplete((unused, error) -> {
if (error != null) {
System.out.println("Something went wrong!");
throw new RuntimeException(error);
} else {
System.out.println("No more chunks!");
}
});
```

Async streaming uses a dedicated per-client cached thread pool `Executor` to stream without blocking the current thread. This default is suitable for most purposes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,37 @@ import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicReference

/**
* A class providing access to an API response as an asynchronous stream of chunks of type [T],
* where each chunk can be individually processed as soon as it arrives instead of waiting on the
* full response.
*/
interface AsyncStreamResponse<T> {

/**
* Registers [handler] to be called for events of this stream.
*
* [handler]'s methods will be called in the client's configured or default thread pool.
*
* @throws IllegalStateException if [subscribe] has already been called.
*/
fun subscribe(handler: Handler<T>): AsyncStreamResponse<T>

/**
* Registers [handler] to be called for events of this stream.
*
* [handler]'s methods will be called in the given [executor].
*
* @throws IllegalStateException if [subscribe] has already been called.
*/
fun subscribe(handler: Handler<T>, executor: Executor): AsyncStreamResponse<T>

/**
* Returns a future that completes when a stream is fully consumed, errors, or gets closed
* early.
*/
fun onCompleteFuture(): CompletableFuture<Void?>

/**
* Closes this resource, relinquishing any underlying resources.
*
Expand All @@ -20,10 +45,19 @@ interface AsyncStreamResponse<T> {
*/
fun close()

/** A class for handling streaming events. */
fun interface Handler<in T> {

/** Called whenever a chunk is received. */
fun onNext(value: T)

/**
* Called when a stream is fully consumed, errors, or gets closed early.
*
* [onNext] will not be called once this method is called.
*
* @param error Non-empty if the stream completed due to an error.
*/
fun onComplete(error: Optional<Throwable>) {}
}
}
Expand All @@ -33,8 +67,17 @@ internal fun <T> CompletableFuture<StreamResponse<T>>.toAsync(streamHandlerExecu
PhantomReachableClosingAsyncStreamResponse(
object : AsyncStreamResponse<T> {

private val onCompleteFuture = CompletableFuture<Void?>()
private val state = AtomicReference(State.NEW)

init {
this@toAsync.whenComplete { _, error ->
// If an error occurs from the original future, then we should resolve the
// `onCompleteFuture` even if `subscribe` has not been called.
error?.let(onCompleteFuture::completeExceptionally)
}
}

override fun subscribe(handler: Handler<T>): AsyncStreamResponse<T> =
subscribe(handler, streamHandlerExecutor)

Expand Down Expand Up @@ -72,20 +115,37 @@ internal fun <T> CompletableFuture<StreamResponse<T>>.toAsync(streamHandlerExecu
try {
handler.onComplete(Optional.ofNullable(streamError))
} finally {
close()
try {
// Notify completion via the `onCompleteFuture` as well. This is in
// a separate `try-finally` block so that we still complete the
// future if `handler.onComplete` throws.
if (streamError == null) {
onCompleteFuture.complete(null)
} else {
onCompleteFuture.completeExceptionally(streamError)
}
} finally {
close()
}
}
},
executor,
)
}

override fun onCompleteFuture(): CompletableFuture<Void?> = onCompleteFuture

override fun close() {
val previousState = state.getAndSet(State.CLOSED)
if (previousState == State.CLOSED) {
return
}

this@toAsync.whenComplete { streamResponse, _ -> streamResponse?.close() }
this@toAsync.whenComplete { streamResponse, error -> streamResponse?.close() }
// When the stream is closed, we should always consider it closed. If it closed due
// to an error, then we will have already completed the future earlier, and this
// will be a no-op.
onCompleteFuture.complete(null)
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.openai.core.http
import com.openai.core.closeWhenPhantomReachable
import com.openai.core.http.AsyncStreamResponse.Handler
import java.util.Optional
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor

/**
Expand Down Expand Up @@ -33,6 +34,9 @@ internal class PhantomReachableClosingAsyncStreamResponse<T>(
asyncStreamResponse.subscribe(TrackedHandler(handler, reachabilityTracker), executor)
}

override fun onCompleteFuture(): CompletableFuture<Void?> =
asyncStreamResponse.onCompleteFuture()

override fun close() = asyncStreamResponse.close()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,109 @@ internal class AsyncStreamResponseTest {
verify(executor, times(1)).execute(any())
}

@Test
fun onCompleteFuture_whenStreamResponseFutureNotCompleted_onCompleteFutureNotCompleted() {
val future = CompletableFuture<StreamResponse<String>>()
val asyncStreamResponse = future.toAsync(executor)

val onCompletableFuture = asyncStreamResponse.onCompleteFuture()

assertThat(onCompletableFuture).isNotCompleted
}

@Test
fun onCompleteFuture_whenStreamResponseFutureErrors_onCompleteFutureCompletedExceptionally() {
val future = CompletableFuture<StreamResponse<String>>()
val asyncStreamResponse = future.toAsync(executor)
future.completeExceptionally(ERROR)

val onCompletableFuture = asyncStreamResponse.onCompleteFuture()

assertThat(onCompletableFuture).isCompletedExceptionally
}

@Test
fun onCompleteFuture_whenStreamResponseFutureCompletedButStillStreaming_onCompleteFutureNotCompleted() {
val future = CompletableFuture<StreamResponse<String>>()
val asyncStreamResponse = future.toAsync(executor)
future.complete(streamResponse)

val onCompletableFuture = asyncStreamResponse.onCompleteFuture()

assertThat(onCompletableFuture).isNotCompleted
}

@Test
fun onCompleteFuture_whenStreamResponseFutureCompletedAndStreamErrors_onCompleteFutureCompletedExceptionally() {
val future = CompletableFuture<StreamResponse<String>>()
val asyncStreamResponse = future.toAsync(executor)
asyncStreamResponse.subscribe(handler)
future.complete(erroringStreamResponse)

val onCompletableFuture = asyncStreamResponse.onCompleteFuture()

assertThat(onCompletableFuture).isCompletedExceptionally
}

@Test
fun onCompleteFuture_whenStreamResponseFutureCompletedAndStreamCompleted_onCompleteFutureCompleted() {
val future = CompletableFuture<StreamResponse<String>>()
val asyncStreamResponse = future.toAsync(executor)
asyncStreamResponse.subscribe(handler)
future.complete(streamResponse)

val onCompletableFuture = asyncStreamResponse.onCompleteFuture()

assertThat(onCompletableFuture).isCompleted
}

@Test
fun onCompleteFuture_whenHandlerOnCompleteWithoutThrowableThrows_onCompleteFutureCompleted() {
val future = CompletableFuture<StreamResponse<String>>()
val asyncStreamResponse = future.toAsync(executor)
asyncStreamResponse.subscribe(
object : AsyncStreamResponse.Handler<String> {
override fun onNext(value: String) {}

override fun onComplete(error: Optional<Throwable>) = throw ERROR
}
)
future.complete(streamResponse)

val onCompletableFuture = asyncStreamResponse.onCompleteFuture()

assertThat(onCompletableFuture).isCompleted
}

@Test
fun onCompleteFuture_whenHandlerOnCompleteWithThrowableThrows_onCompleteFutureCompletedExceptionally() {
val future = CompletableFuture<StreamResponse<String>>()
val asyncStreamResponse = future.toAsync(executor)
asyncStreamResponse.subscribe(
object : AsyncStreamResponse.Handler<String> {
override fun onNext(value: String) {}

override fun onComplete(error: Optional<Throwable>) = throw ERROR
}
)
future.complete(erroringStreamResponse)

val onCompletableFuture = asyncStreamResponse.onCompleteFuture()

assertThat(onCompletableFuture).isCompletedExceptionally
}

@Test
fun onCompleteFuture_whenClosed_onCompleteFutureCompleted() {
val future = CompletableFuture<StreamResponse<String>>()
val asyncStreamResponse = future.toAsync(executor)
asyncStreamResponse.close()

val onCompletableFuture = asyncStreamResponse.onCompleteFuture()

assertThat(onCompletableFuture).isCompleted
}

@Test
fun close_whenNotClosed_closesStreamResponse() {
val future = CompletableFuture<StreamResponse<String>>()
Expand Down

0 comments on commit 81c2f4e

Please sign in to comment.