Skip to content

Commit

Permalink
4.x: Http2 OutputStream redirect support (#7637)
Browse files Browse the repository at this point in the history
HTTP2 OutputStream redirect support

Signed-off-by: David Kral <[email protected]>
  • Loading branch information
Verdent authored Sep 27, 2023
1 parent 2c9b4cc commit aa1e1d8
Show file tree
Hide file tree
Showing 16 changed files with 782 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ private static Http2StreamState checkHeaders(Http2StreamState current,
if (current == HALF_CLOSED_LOCAL) {
return HALF_CLOSED_LOCAL;
}

if (current == OPEN) {
return endOfStream ? HALF_CLOSED_REMOTE : OPEN;
}
throw new Http2Exception(Http2ErrorCode.PROTOCOL, "Received " + type + " in invalid state: " + current);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,33 @@ abstract class Http2CallChainBase implements WebClientService.Chain {
this.http1EntityHandler = http1EntityHandler;
}

static WebClientServiceResponse createServiceResponse(WebClientServiceRequest serviceRequest,
HttpClientConfig clientConfig,
Http2ClientStream stream,
CompletableFuture<WebClientServiceResponse> whenComplete,
Status responseStatus,
ClientResponseHeaders clientResponseHeaders) {
WebClientServiceResponse.Builder builder = WebClientServiceResponse.builder();

// we need an instance to create it, so let's just use a reference
AtomicReference<WebClientServiceResponse> response = new AtomicReference<>();
if (stream.hasEntity()) {
ContentDecoder decoder = contentDecoder(clientResponseHeaders, clientConfig);
builder.inputStream(decoder.apply(new RequestingInputStream(stream, whenComplete, response)));
}
WebClientServiceResponse serviceResponse = builder
.serviceRequest(serviceRequest)
.whenComplete(whenComplete)
.connection(stream)
.status(responseStatus)
.headers(clientResponseHeaders)
.connection(stream)
.build();

response.set(serviceResponse);
return serviceResponse;
}

@Override
public WebClientServiceResponse proceed(WebClientServiceRequest serviceRequest) {
ClientUri uri = serviceRequest.uri();
Expand All @@ -100,7 +127,11 @@ public WebClientServiceResponse proceed(WebClientServiceRequest serviceRequest)
return doProceed(serviceRequest, result.response());
}
} catch (StreamTimeoutException e){
http2Client.connectionCache().remove(connectionKey);
//This request was waiting for 100 Continue, but it was very likely not supported by the server.
//Do not remove connection from the cache in that case.
if (!clientRequest().outputStreamRedirect()) {
http2Client.connectionCache().remove(connectionKey);
}
throw e;
}
}
Expand All @@ -113,6 +144,10 @@ Status responseStatus() {
return responseStatus;
}

CompletableFuture<WebClientServiceResponse> whenComplete() {
return whenComplete;
}

/**
* HTTP/2.
*
Expand Down Expand Up @@ -159,7 +194,7 @@ protected WebClientServiceResponse readResponse(WebClientServiceRequest serviceR
// we need an instance to create it, so let's just use a reference
AtomicReference<WebClientServiceResponse> response = new AtomicReference<>();
if (stream.hasEntity()) {
ContentDecoder decoder = contentDecoder(responseHeaders);
ContentDecoder decoder = contentDecoder(responseHeaders, clientConfig);
builder.inputStream(decoder.apply(new RequestingInputStream(stream, whenComplete, response)));
}

Expand All @@ -181,10 +216,10 @@ protected WebClientServiceResponse readResponse(WebClientServiceRequest serviceR
return serviceResponse;
}

private ContentDecoder contentDecoder(ClientResponseHeaders responseHeaders) {
private static ContentDecoder contentDecoder(ClientResponseHeaders responseHeaders, HttpClientConfig clientConfig) {
ContentEncodingContext encodingSupport = clientConfig.contentEncoding();
if (encodingSupport.contentDecodingEnabled() && responseHeaders.contains(CONTENT_ENCODING)) {
String contentEncoding = responseHeaders.get(CONTENT_ENCODING).value();
String contentEncoding = responseHeaders.get(CONTENT_ENCODING).get();
if (encodingSupport.contentDecodingSupported(contentEncoding)) {
return encodingSupport.decoder(contentEncoding);
} else {
Expand All @@ -196,7 +231,7 @@ private ContentDecoder contentDecoder(ClientResponseHeaders responseHeaders) {
return ContentDecoder.NO_OP;
}

protected Http2Headers prepareHeaders(Method method, ClientRequestHeaders headers, ClientUri uri) {
protected static Http2Headers prepareHeaders(Method method, ClientRequestHeaders headers, ClientUri uri) {
Http2Headers h2Headers = Http2Headers.create(headers);
h2Headers.method(method);
h2Headers.path(uri.pathWithQueryAndFragment());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,15 @@ protected WebClientServiceResponse doProceed(WebClientServiceRequest serviceRequ
entityBytes = entityBytes(entity, headers);
}

headers.set(HeaderValues.create(HeaderNames.CONTENT_LENGTH, entityBytes.length));
if (!clientRequest().outputStreamRedirect()) {
headers.set(HeaderValues.create(HeaderNames.CONTENT_LENGTH, entityBytes.length));
}

ClientUri uri = serviceRequest.uri();

Http2Headers http2Headers = prepareHeaders(serviceRequest.method(), headers, uri);

stream.writeHeaders(http2Headers, entityBytes.length == 0);
stream.writeHeaders(http2Headers, !clientRequest().outputStreamRedirect() && entityBytes.length == 0);
stream.flowControl().inbound().incrementWindowSize(clientRequest().requestPrefetch());
whenSent.complete(serviceRequest);

Expand Down
Loading

0 comments on commit aa1e1d8

Please sign in to comment.