Skip to content

Commit

Permalink
Parameterize on the HTTP server
Browse files Browse the repository at this point in the history
  • Loading branch information
vy committed Jan 22, 2025
1 parent 8716e71 commit 8332442
Showing 1 changed file with 95 additions and 94 deletions.
189 changes: 95 additions & 94 deletions test/jdk/java/net/httpclient/HttpResponseLimitingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,27 @@
/*
* @test
* @bug 8328919
* @summary verifies `limiting()` behaviour in `HttpResponse.Body{Handlers,Subscribers}`
* @summary tests `limiting()` in `HttpResponse.Body{Handlers,Subscribers}`
* @library /test/lib
* /test/jdk/java/net/httpclient/lib
* @build jdk.test.lib.net.SimpleSSLContext
* jdk.httpclient.test.lib.common.HttpServerAdapters
* @run junit HttpResponseLimitingTest
*/

import org.junit.jupiter.api.AfterAll;
import jdk.httpclient.test.lib.common.HttpServerAdapters;
import jdk.test.lib.net.SimpleSSLContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.BodySubscribers;
Expand All @@ -59,6 +59,7 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow.Subscription;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Arrays.copyOfRange;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
Expand All @@ -73,66 +74,115 @@ class HttpResponseLimitingTest {

private static final Charset CHARSET = StandardCharsets.UTF_8;

private static final HttpServer SERVER = new HttpServer();

private static final HttpRequest REQUEST = HttpRequest
.newBuilder(URI.create("http://localhost:" + SERVER.socket.getLocalPort()))
.timeout(Duration.ofSeconds(5))
.build();

private static final HttpClient CLIENT = HttpClient
.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.build();

@AfterAll
static void tearDown() throws Exception {
CLIENT.close();
SERVER.close();
}
private static final byte[] RESPONSE_BODY = "random non-empty body".getBytes(CHARSET);

@ParameterizedTest
@MethodSource("sufficientCapacities")
void testSuccessOnSufficientCapacity(long sufficientCapacity) throws Exception {
BodyHandler<byte[]> handler =
BodyHandlers.limiting(BodyHandlers.ofByteArray(), sufficientCapacity);
HttpResponse<byte[]> response = CLIENT.send(REQUEST, handler);
assertArrayEquals(HttpServer.RESPONSE_BODY, response.body());
void testSuccessOnSufficientCapacity(HttpClient.Version version, boolean secure, long sufficientCapacity) throws Exception {
HttpResponse<byte[]> response = requestBytes(version, secure, sufficientCapacity);
assertArrayEquals(RESPONSE_BODY, response.body());
}

static long[] sufficientCapacities() {
return new long[]{Long.MAX_VALUE, HttpServer.RESPONSE_BODY.length};
static Arguments[] sufficientCapacities() {
return capacityArgs(Long.MAX_VALUE, RESPONSE_BODY.length);
}

@ParameterizedTest
@MethodSource("insufficientCapacities")
void testFailureOnInsufficientCapacity(long insufficientCapacity) {
void testFailureOnInsufficientCapacity(HttpClient.Version version, boolean secure, long insufficientCapacity) {
assertThrows(
IOException.class,
() -> {
BodyHandler<byte[]> handler =
BodyHandlers.limiting(BodyHandlers.ofByteArray(), insufficientCapacity);
CLIENT.send(REQUEST, handler);
() -> requestBytes(version, secure, insufficientCapacity),
"body exceeds capacity: " + RESPONSE_BODY.length);
}

static Arguments[] insufficientCapacities() {
return capacityArgs(0, RESPONSE_BODY.length - 1);
}

private static Arguments[] capacityArgs(long... capacities) {
return Stream
.of(HttpClient.Version.HTTP_1_1, HttpClient.Version.HTTP_2)
.flatMap(version -> Stream
.of(true, false)
.flatMap(secure -> Arrays
.stream(capacities)
.mapToObj(capacity -> Arguments.of(version, secure, capacity))))
.toArray(Arguments[]::new);
}

private static HttpResponse<byte[]> requestBytes(
HttpClient.Version version,
boolean secure,
long capacity)
throws Exception {

// Create the server and the request URI
SSLContext sslContext;
HttpServerAdapters.HttpTestServer server;
String handlerPath = "/";
URI requestUri;
if (secure) {
sslContext = new SimpleSSLContext().get();
server = HttpServerAdapters.HttpTestServer.create(version, sslContext);
requestUri = URI.create("https://" + server.serverAuthority() + handlerPath);
} else {
sslContext = null;
server = HttpServerAdapters.HttpTestServer.create(version);
requestUri = URI.create("http://" + server.serverAuthority() + handlerPath);
}

// Register the request handler
server.addHandler(
(exchange) -> {
exchange.sendResponseHeaders(200, RESPONSE_BODY.length);
try (var outputStream = exchange.getResponseBody()) {
outputStream.write(RESPONSE_BODY);
}
exchange.close();
},
"body exceeds capacity: " + HttpServer.RESPONSE_BODY.length);
handlerPath);

// Start the server and the client
server.start();
try (var client = createClient(sslContext)) {

// Issue the request
var request = HttpRequest
.newBuilder(requestUri)
.timeout(Duration.ofSeconds(5))
.build();
var handler = BodyHandlers.limiting(BodyHandlers.ofByteArray(), capacity);
return client.send(request, handler);

} finally {
server.stop();
}

}

static long[] insufficientCapacities() {
return new long[]{0, HttpServer.RESPONSE_BODY.length - 1};
private static HttpClient createClient(SSLContext sslContext) {
HttpClient.Builder builder = HttpClient
.newBuilder()
.connectTimeout(Duration.ofSeconds(5));
if (sslContext != null) {
builder.sslContext(sslContext);
}
return builder.build();
}

@Test
void testSubscriberForCompleteConsumption() {

// Create the subscriber (with sufficient capacity)
ObserverSubscriber downstreamSubscriber = new ObserverSubscriber();
int sufficientCapacity = HttpServer.RESPONSE_BODY.length;
int sufficientCapacity = RESPONSE_BODY.length;
BodySubscriber<String> subscriber = BodySubscribers.limiting(downstreamSubscriber, sufficientCapacity);

// Emit values
subscriber.onSubscribe(DummySubscription.INSTANCE);
byte[] responseBodyPart1 = {HttpServer.RESPONSE_BODY[0]};
byte[] responseBodyPart2 = copyOfRange(HttpServer.RESPONSE_BODY, 1, HttpServer.RESPONSE_BODY.length);
byte[] responseBodyPart1 = {RESPONSE_BODY[0]};
byte[] responseBodyPart2 = copyOfRange(RESPONSE_BODY, 1, RESPONSE_BODY.length);
List<ByteBuffer> buffers = toByteBuffers(responseBodyPart1, responseBodyPart2);
subscriber.onNext(buffers);

Expand All @@ -153,8 +203,8 @@ void testSubscriberForFailureOnExcess() {

// Emit values
subscriber.onSubscribe(DummySubscription.INSTANCE);
byte[] responseBodyPart1 = {HttpServer.RESPONSE_BODY[0]};
byte[] responseBodyPart2 = copyOfRange(HttpServer.RESPONSE_BODY, 1, HttpServer.RESPONSE_BODY.length);
byte[] responseBodyPart1 = {RESPONSE_BODY[0]};
byte[] responseBodyPart2 = copyOfRange(RESPONSE_BODY, 1, RESPONSE_BODY.length);
List<ByteBuffer> buffers = toByteBuffers(responseBodyPart1, responseBodyPart2);
subscriber.onNext(buffers);

Expand All @@ -172,55 +222,6 @@ private static List<ByteBuffer> toByteBuffers(byte[]... buffers) {
return Arrays.stream(buffers).map(ByteBuffer::wrap).collect(Collectors.toList());
}

/**
* An HTTP server always returning a fixed response containing a non-empty body.
*/
private static final class HttpServer implements Runnable, AutoCloseable {

private static final byte[] RESPONSE_BODY = "random non-empty body".getBytes(CHARSET);

private static final byte[] RESPONSE = (
"HTTP/1.1 200 OK\r\n" +
"Content-Length: " + RESPONSE_BODY.length + "\r\n" +
"\r\n" +
new String(RESPONSE_BODY, CHARSET))
.getBytes(CHARSET);

private final ServerSocket socket;

private final Thread thread;

private HttpServer() {
try {
this.socket = new ServerSocket(0, 0, InetAddress.getLoopbackAddress());
} catch (IOException exception) {
throw new UncheckedIOException(exception);
}
this.thread = new Thread(this);
thread.setDaemon(true); // Avoid blocking JVM exit
thread.start();
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try (Socket clientSocket = socket.accept();
OutputStream outputStream = clientSocket.getOutputStream()) {
outputStream.write(RESPONSE);
} catch (IOException _) {
// Do nothing
}
}
}

@Override
public void close() throws Exception {
socket.close();
thread.interrupt();
}

}

private static final class ObserverSubscriber implements BodySubscriber<String> {

private List<ByteBuffer> lastBuffers;
Expand Down

0 comments on commit 8332442

Please sign in to comment.