diff --git a/microprofile/metrics/src/test/java/io/helidon/microprofile/metrics/HelloWorldAsyncResponseTest.java b/microprofile/metrics/src/test/java/io/helidon/microprofile/metrics/HelloWorldAsyncResponseTest.java index 3ee46dbc442..fc367121a80 100644 --- a/microprofile/metrics/src/test/java/io/helidon/microprofile/metrics/HelloWorldAsyncResponseTest.java +++ b/microprofile/metrics/src/test/java/io/helidon/microprofile/metrics/HelloWorldAsyncResponseTest.java @@ -19,10 +19,11 @@ import java.time.Duration; import java.util.Map; -import java.util.Optional; import java.util.SortedMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.LongStream; import javax.inject.Inject; @@ -33,8 +34,7 @@ import io.helidon.microprofile.tests.junit5.AddConfig; import io.helidon.microprofile.tests.junit5.HelidonTest; -import io.helidon.webserver.KeyPerformanceIndicatorSupport; -import org.eclipse.microprofile.metrics.ConcurrentGauge; +import io.helidon.webserver.ServerResponse; import org.eclipse.microprofile.metrics.MetricID; import org.eclipse.microprofile.metrics.MetricRegistry; import org.eclipse.microprofile.metrics.SimpleTimer; @@ -80,7 +80,7 @@ public class HelloWorldAsyncResponseTest { @Test public void test() throws NoSuchMethodException { MetricID metricID = MetricsCdiExtension.syntheticSimpleTimerMetricID(HelloWorldResource.class.getMethod("slowMessage", - AsyncResponse.class)); + AsyncResponse.class, ServerResponse.class)); SortedMap simpleTimers = registry.getSimpleTimers(); @@ -117,7 +117,7 @@ public void test() throws NoSuchMethodException { */ assertThat("Mismatched string result", result, is(HelloWorldResource.SLOW_RESPONSE)); - Duration minDuration = Duration.ofSeconds(HelloWorldResource.SLOW_DELAY_SECS); + Duration minDuration = Duration.ofMillis(HelloWorldResource.SLOW_DELAY_MS); assertThat("Change in count for explicit SimpleTimer", explicitSimpleTimer.getCount() - explicitSimpleTimerCountBefore, is(1L)); @@ -163,30 +163,44 @@ SimpleTimer getSyntheticSimpleTimer() { @Test void testInflightRequests() throws InterruptedException, ExecutionException { - Optional inflightRequests = - vendorRegistry.getConcurrentGauges((metricID, metric) -> metricID.getName().endsWith("inFlight")) - .values() - .stream() - .findAny(); - assertThat("In-flight requests ConcurrentGauge is present", inflightRequests.isPresent(),is(true)); - - long beforeRequest = inflightRequests.get().getCount(); - HelloWorldResource.initSlowRequest(); - - Future future = - webTarget - .path("helloworld/slow") - .request(MediaType.TEXT_PLAIN_TYPE) - .async() - .get(String.class); - HelloWorldResource.awaitSlowRequestStarted(); - long duringRequest = inflightRequests.get().getCount(); - String response = future.get(); - long afterRequest = inflightRequests.get().getCount(); - - assertThat("Slow response", response, containsString(SLOW_RESPONSE)); - assertThat("Change in inflight during slow request", duringRequest - beforeRequest, is(1L)); - assertThat("Change in inflight after slow request completes", afterRequest - beforeRequest, is(0L)); + assertThat("In-flight requests ConcurrentGauge is present", HelloWorldResource.inflightRequests(vendorRegistry).isPresent(), + is(true)); + + int testCount = Integer.getInteger("helidon.microprofile.metrics.asyncRepeatCount", 1); + for (int i = 0; i < testCount; i++) { + long beforeRequest = inflightRequestsCount(); + HelloWorldResource.initSlowRequest(); + + // The request processing will start but then stall, waiting until after this test fetches the "duringRequest" + // value of inflightRequests. + Future future = + webTarget + .path("helloworld/slow") + .request(MediaType.TEXT_PLAIN_TYPE) + .async() + .get(String.class); + + HelloWorldResource.awaitSlowRequestStarted(); + long duringRequest = inflightRequestsCount(); + + HelloWorldResource.reportDuringRequestFetched(); + + String response = future.get(); + + // The response might arrive here before the server-side code which updates the inflight metric has run. So wait. + HelloWorldResource.awaitResponseSent(); + long afterRequest = inflightRequestsCount(); + + assertThat("Slow response", response, containsString(SLOW_RESPONSE)); + assertThat("Change in inflight from before (" + beforeRequest + ") to during (" + duringRequest + + ") the slow request", duringRequest - beforeRequest, is(1L)); + assertThat("Change in inflight from before (" + beforeRequest + ") to after (" + afterRequest + + ") the slow request", afterRequest - beforeRequest, is(0L)); + } + } + + private long inflightRequestsCount() { + return HelloWorldResource.inflightRequestsCount(vendorRegistry); } @Test diff --git a/microprofile/metrics/src/test/java/io/helidon/microprofile/metrics/HelloWorldResource.java b/microprofile/metrics/src/test/java/io/helidon/microprofile/metrics/HelloWorldResource.java index 4c17efd128c..fc81d4044b0 100644 --- a/microprofile/metrics/src/test/java/io/helidon/microprofile/metrics/HelloWorldResource.java +++ b/microprofile/metrics/src/test/java/io/helidon/microprofile/metrics/HelloWorldResource.java @@ -15,9 +15,10 @@ */ package io.helidon.microprofile.metrics; +import io.helidon.webserver.ServerResponse; import org.eclipse.microprofile.metrics.MetricRegistry; -import org.eclipse.microprofile.metrics.annotation.ConcurrentGauge; import org.eclipse.microprofile.metrics.annotation.Counted; +import org.eclipse.microprofile.metrics.annotation.RegistryType; import org.eclipse.microprofile.metrics.annotation.SimplyTimed; import org.eclipse.microprofile.metrics.annotation.Timed; @@ -25,25 +26,23 @@ import javax.inject.Inject; import javax.json.Json; import javax.json.JsonBuilderFactory; -import javax.json.JsonObject; -import javax.ws.rs.Consumes; import javax.ws.rs.GET; -import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.ext.ExceptionMapper; -import javax.ws.rs.ext.Provider; import java.util.Collections; +import java.util.Optional; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; /** * HelloWorldResource class. @@ -53,10 +52,12 @@ @Counted public class HelloWorldResource { + private static final Logger LOGGER = Logger.getLogger(HelloWorldResource.class.getName()); + static final String SLOW_RESPONSE = "At last"; // In case pipeline runs need a different time - static final int SLOW_DELAY_SECS = Integer.getInteger("helidon.asyncSimplyTimedDelaySeconds", 2); + static final int SLOW_DELAY_MS = Integer.getInteger("helidon.microprofile.metrics.asyncSimplyTimedDelayMS", 2 * 1000); static final String MESSAGE_SIMPLE_TIMER = "messageSimpleTimer"; static final String SLOW_MESSAGE_TIMER = "slowMessageTimer"; @@ -67,18 +68,45 @@ public class HelloWorldResource { private static ExecutorService executorService = Executors.newSingleThreadExecutor(); static CountDownLatch slowRequestInProgress = null; + static CountDownLatch slowRequestInProgressDataCaptured = null; + static CountDownLatch slowRequestResponseSent = null; static void initSlowRequest() { slowRequestInProgress = new CountDownLatch(1); + slowRequestInProgressDataCaptured = new CountDownLatch(1); + slowRequestResponseSent = new CountDownLatch(1); } static void awaitSlowRequestStarted() throws InterruptedException { slowRequestInProgress.await(); } + static void reportDuringRequestFetched() { + slowRequestInProgressDataCaptured.countDown(); + } + + static void awaitResponseSent() throws InterruptedException { + slowRequestResponseSent.await(); + } + + static Optional inflightRequests(MetricRegistry metricRegistry) { + return metricRegistry.getConcurrentGauges((metricID, metric) -> metricID.getName().endsWith("inFlight")) + .values() + .stream() + .findAny(); + } + + static long inflightRequestsCount(MetricRegistry metricRegistry) { + return inflightRequests(metricRegistry).get().getCount(); + } + @Inject MetricRegistry metricRegistry; + @Inject + @RegistryType(type = MetricRegistry.Type.VENDOR) + private MetricRegistry vendorRegistry; + public HelloWorldResource() { } @@ -104,18 +132,33 @@ public String messageWithArg(@PathParam("name") String input){ @Produces(MediaType.TEXT_PLAIN) @SimplyTimed(name = SLOW_MESSAGE_SIMPLE_TIMER, absolute = true) @Timed(name = SLOW_MESSAGE_TIMER, absolute = true) - public void slowMessage(@Suspended AsyncResponse ar) { - if (slowRequestInProgress != null) { - slowRequestInProgress.countDown(); + public void slowMessage(@Suspended AsyncResponse ar, @Context ServerResponse serverResponse) { + if (slowRequestInProgress == null) { + ar.resume(new RuntimeException("slowRequestInProgress was unexpectedly null")); + return; } + serverResponse.whenSent() + .thenAccept(r -> slowRequestResponseSent.countDown()); + + long uponEntry = inflightRequestsCount(); + + slowRequestInProgress.countDown(); executorService.execute(() -> { try { - TimeUnit.SECONDS.sleep(SLOW_DELAY_SECS); + long inAsyncExec = inflightRequestsCount(); + TimeUnit.MILLISECONDS.sleep(SLOW_DELAY_MS); + slowRequestInProgressDataCaptured.await(); + if (!ar.resume(SLOW_RESPONSE)) { + throw new RuntimeException("Error resuming asynchronous response: not in suspended state"); + } + long afterResume = inflightRequestsCount(); + LOGGER.log(Level.FINE, + "inAsyncExec: " + inAsyncExec + ", afterResume: " + afterResume); } catch (InterruptedException e) { - // absorb silently + throw new RuntimeException("Async test /slow wait was interrupted", e); } - ar.resume(SLOW_RESPONSE); }); + LOGGER.log(Level.FINE, "uponEntry: " + uponEntry + ", beforeReturn: " + inflightRequestsCount()); } @GET @@ -126,11 +169,13 @@ public void slowMessage(@Suspended AsyncResponse ar) { public void slowMessageWithArg(@PathParam("name") String input, @Suspended AsyncResponse ar) { executorService.execute(() -> { try { - TimeUnit.SECONDS.sleep(SLOW_DELAY_SECS); + TimeUnit.MILLISECONDS.sleep(SLOW_DELAY_MS); + if (!ar.resume(SLOW_RESPONSE + " " + input)) { + throw new RuntimeException("Error resuming asynchronous response: not in suspended state"); + } } catch (InterruptedException e) { - // absorb silently + throw new RuntimeException("Async test /slowWithArg{name} was interrupted", e); } - ar.resume(SLOW_RESPONSE + " " + input); }); } @@ -140,4 +185,24 @@ public void slowMessageWithArg(@PathParam("name") String input, @Suspended Async public String testDeletedMetric() { return "Hello there"; } + + @GET + @Path("/error") + @Produces(MediaType.TEXT_PLAIN) + public void triggerAsyncError(@Suspended AsyncResponse ar) { + executorService.execute(() -> { + try { + TimeUnit.MILLISECONDS.sleep(SLOW_DELAY_MS); + if (!ar.resume(new Exception("Expected execption"))) { + throw new RuntimeException("Error resuming asynchronous response: not in suspended state"); + } + } catch (InterruptedException e) { + throw new RuntimeException("Error test /error was interrupted"); + } + }); + } + + private long inflightRequestsCount() { + return inflightRequestsCount(vendorRegistry); + } }