diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/http/netty4/Netty4Http2IT.java b/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/http/netty4/Netty4Http2IT.java index 1424b392af8e7..c066f3edf6900 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/http/netty4/Netty4Http2IT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/opensearch/http/netty4/Netty4Http2IT.java @@ -9,17 +9,21 @@ package org.opensearch.http.netty4; import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.util.ReferenceCounted; import org.opensearch.OpenSearchNetty4IntegTestCase; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.transport.TransportAddress; import org.opensearch.http.HttpServerTransport; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; import org.opensearch.test.OpenSearchIntegTestCase.Scope; import java.util.Collection; +import java.util.List; import java.util.Locale; import java.util.stream.IntStream; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasSize; @@ -31,7 +35,7 @@ protected boolean addMockHttpTransport() { return false; // enable http } - public void testThatNettyHttpServerSupportsHttp2() throws Exception { + public void testThatNettyHttpServerSupportsHttp2GetUpgrades() throws Exception { String[] requests = new String[] { "/", "/_nodes/stats", "/", "/_cluster/state", "/" }; HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); @@ -44,19 +48,42 @@ public void testThatNettyHttpServerSupportsHttp2() throws Exception { assertThat(responses, hasSize(5)); Collection opaqueIds = Netty4HttpClient.returnOpaqueIds(responses); - assertOpaqueIdsInAnyOrder(opaqueIds); + assertOpaqueIdsInAnyOrder(5, opaqueIds); } finally { responses.forEach(ReferenceCounted::release); } } } - private void assertOpaqueIdsInAnyOrder(Collection opaqueIds) { + public void testThatNettyHttpServerSupportsHttp2PostUpgrades() throws Exception { + final List> requests = List.of(Tuple.tuple("/_search", "{\"query\":{ \"match_all\":{}}}")); + + HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); + TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses(); + TransportAddress transportAddress = randomFrom(boundAddresses); + + try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http2()) { + Collection responses = nettyHttpClient.post(transportAddress.address(), requests); + try { + assertThat(responses, hasSize(1)); + + for (FullHttpResponse response : responses) { + assertThat(response.getStatus(), equalTo(HttpResponseStatus.OK)); + } + + Collection opaqueIds = Netty4HttpClient.returnOpaqueIds(responses); + String msg = String.format(Locale.ROOT, "Expected opaque id [0], got [%s]", opaqueIds); + assertOpaqueIdsInAnyOrder(1, opaqueIds); + } finally { + responses.forEach(ReferenceCounted::release); + } + } + } + + private void assertOpaqueIdsInAnyOrder(int expected, Collection opaqueIds) { // check if opaque ids are present in any order, since for HTTP/2 we use streaming (no head of line blocking) // and responses may come back at any order - int i = 0; - String msg = String.format(Locale.ROOT, "Expected list of opaque ids to be in any order, got [%s]", opaqueIds); - assertThat(msg, opaqueIds, containsInAnyOrder(IntStream.range(0, 5).mapToObj(Integer::toString).toArray())); + assertThat(opaqueIds, containsInAnyOrder(IntStream.range(0, expected).mapToObj(Integer::toString).toArray())); } } diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java index 2074279a99f79..124bc02527bd1 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java @@ -397,7 +397,11 @@ public UpgradeCodec newUpgradeCodec(CharSequence protocol) { handlingSettings.getMaxChunkSize() ); - final HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler(sourceCodec, upgradeCodecFactory); + final HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler( + sourceCodec, + upgradeCodecFactory, + handlingSettings.getMaxContentLength() + ); final CleartextHttp2ServerUpgradeHandler cleartextUpgradeHandler = new CleartextHttp2ServerUpgradeHandler( sourceCodec, upgradeHandler, diff --git a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java index 6fdd698c117f2..cad2e50327023 100644 --- a/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java +++ b/modules/transport-netty4/src/test/java/org/opensearch/http/netty4/Netty4HttpClient.java @@ -176,13 +176,15 @@ private List processRequestsWithBody( List> urisAndBodies ) throws InterruptedException { List requests = new ArrayList<>(urisAndBodies.size()); - for (Tuple uriAndBody : urisAndBodies) { + for (int i = 0; i < urisAndBodies.size(); ++i) { + final Tuple uriAndBody = urisAndBodies.get(i); ByteBuf content = Unpooled.copiedBuffer(uriAndBody.v2(), StandardCharsets.UTF_8); HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uriAndBody.v1(), content); request.headers().add(HttpHeaderNames.HOST, "localhost"); request.headers().add(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); request.headers().add(HttpHeaderNames.CONTENT_TYPE, "application/json"); request.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), "http"); + request.headers().add("X-Opaque-ID", String.valueOf(i)); requests.add(request); } return sendRequests(remoteAddress, requests); @@ -211,7 +213,7 @@ private synchronized List sendRequests(final SocketAddress rem } finally { if (channelFuture != null) { - channelFuture.channel().close().sync(); + channelFuture.channel().close().awaitUninterruptibly(); } } @@ -368,7 +370,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { request.headers().add(HttpHeaderNames.HOST, "localhost"); request.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), "http"); - ctx.channel().attr(AttributeKey.newInstance("upgrade")).set(true); + ctx.channel().attr(AttributeKey.valueOf("upgrade")).set(true); ctx.writeAndFlush(request); ctx.fireChannelActive();