Skip to content

Commit

Permalink
Fix handling of timeout in SseEmitter
Browse files Browse the repository at this point in the history
Closes gh-34426
  • Loading branch information
rstoyanchev committed Feb 27, 2025
1 parent 2b38c00 commit f92f9c1
Showing 1 changed file with 70 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.springframework.http.MediaType;
Expand Down Expand Up @@ -73,21 +73,20 @@ public class ResponseBodyEmitter {
@Nullable
private Handler handler;

private final AtomicReference<State> state = new AtomicReference<>(State.START);

/** Store send data before handler is initialized. */
private final Set<DataWithMediaType> earlySendAttempts = new LinkedHashSet<>(8);

/** Store successful completion before the handler is initialized. */
private final AtomicBoolean complete = new AtomicBoolean();

/** Store an error before the handler is initialized. */
@Nullable
private Throwable failure;

private final DefaultCallback timeoutCallback = new DefaultCallback();
private final TimeoutCallback timeoutCallback = new TimeoutCallback();

private final ErrorCallback errorCallback = new ErrorCallback();

private final DefaultCallback completionCallback = new DefaultCallback();
private final CompletionCallback completionCallback = new CompletionCallback();


/**
Expand Down Expand Up @@ -128,7 +127,7 @@ synchronized void initialize(Handler handler) throws IOException {
this.earlySendAttempts.clear();
}

if (this.complete.get()) {
if (this.state.get() == State.COMPLETE) {
if (this.failure != null) {
this.handler.completeWithError(this.failure);
}
Expand All @@ -144,7 +143,7 @@ synchronized void initialize(Handler handler) throws IOException {
}

void initializeWithError(Throwable ex) {
if (this.complete.compareAndSet(false, true)) {
if (this.state.compareAndSet(State.START, State.COMPLETE)) {
this.failure = ex;
this.earlySendAttempts.clear();
this.errorCallback.accept(ex);
Expand Down Expand Up @@ -186,8 +185,7 @@ public void send(Object object) throws IOException {
* @throws java.lang.IllegalStateException wraps any other errors
*/
public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException {
Assert.state(!this.complete.get(), () -> "ResponseBodyEmitter has already completed" +
(this.failure != null ? " with error: " + this.failure : ""));
assertNotComplete();
if (this.handler != null) {
try {
this.handler.send(object, mediaType);
Expand All @@ -214,11 +212,15 @@ public synchronized void send(Object object, @Nullable MediaType mediaType) thro
* @since 6.0.12
*/
public synchronized void send(Set<DataWithMediaType> items) throws IOException {
Assert.state(!this.complete.get(), () -> "ResponseBodyEmitter has already completed" +
(this.failure != null ? " with error: " + this.failure : ""));
assertNotComplete();
sendInternal(items);
}

private void assertNotComplete() {
Assert.state(this.state.get() == State.START, () -> "ResponseBodyEmitter has already completed" +
(this.failure != null ? " with error: " + this.failure : ""));
}

private void sendInternal(Set<DataWithMediaType> items) throws IOException {
if (items.isEmpty()) {
return;
Expand Down Expand Up @@ -248,7 +250,7 @@ private void sendInternal(Set<DataWithMediaType> items) throws IOException {
* related events such as an error while {@link #send(Object) sending}.
*/
public void complete() {
if (this.complete.compareAndSet(false, true) && this.handler != null) {
if (trySetComplete() && this.handler != null) {
this.handler.complete();
}
}
Expand All @@ -265,14 +267,19 @@ public void complete() {
* {@link #send(Object) sending}.
*/
public void completeWithError(Throwable ex) {
if (this.complete.compareAndSet(false, true)) {
if (trySetComplete()) {
this.failure = ex;
if (this.handler != null) {
this.handler.completeWithError(ex);
}
}
}

private boolean trySetComplete() {
return (this.state.compareAndSet(State.START, State.COMPLETE) ||
(this.state.compareAndSet(State.TIMEOUT, State.COMPLETE)));
}

/**
* Register code to invoke when the async request times out. This method is
* called from a container thread when an async request times out.
Expand Down Expand Up @@ -369,7 +376,7 @@ public MediaType getMediaType() {
}


private class DefaultCallback implements Runnable {
private class TimeoutCallback implements Runnable {

private final List<Runnable> delegates = new ArrayList<>(1);

Expand All @@ -379,9 +386,10 @@ public synchronized void addDelegate(Runnable delegate) {

@Override
public void run() {
ResponseBodyEmitter.this.complete.compareAndSet(false, true);
for (Runnable delegate : this.delegates) {
delegate.run();
if (ResponseBodyEmitter.this.state.compareAndSet(State.START, State.TIMEOUT)) {
for (Runnable delegate : this.delegates) {
delegate.run();
}
}
}
}
Expand All @@ -397,11 +405,51 @@ public synchronized void addDelegate(Consumer<Throwable> callback) {

@Override
public void accept(Throwable t) {
ResponseBodyEmitter.this.complete.compareAndSet(false, true);
for(Consumer<Throwable> delegate : this.delegates) {
delegate.accept(t);
if (ResponseBodyEmitter.this.state.compareAndSet(State.START, State.COMPLETE)) {
for (Consumer<Throwable> delegate : this.delegates) {
delegate.accept(t);
}
}
}
}


private class CompletionCallback implements Runnable {

private final List<Runnable> delegates = new ArrayList<>(1);

public synchronized void addDelegate(Runnable delegate) {
this.delegates.add(delegate);
}

@Override
public void run() {
if (ResponseBodyEmitter.this.state.compareAndSet(State.START, State.COMPLETE)) {
for (Runnable delegate : this.delegates) {
delegate.run();
}
}
}
}


/**
* Represents a state for {@link ResponseBodyEmitter}.
* <p><pre>
* START ----+
* | |
* v |
* TIMEOUT |
* | |
* v |
* COMPLETE <--+
* </pre>
* @since 6.2.4
*/
private enum State {
START,
TIMEOUT, // handling a timeout
COMPLETE
}

}

0 comments on commit f92f9c1

Please sign in to comment.