Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve async test checking KPI metrics #3347

Merged
merged 1 commit into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.LongStream;

import javax.inject.Inject;
Expand All @@ -33,8 +34,7 @@
import io.helidon.microprofile.tests.junit5.AddConfig;
import io.helidon.microprofile.tests.junit5.HelidonTest;

import io.helidon.webserver.KeyPerformanceIndicatorSupport;
import org.eclipse.microprofile.metrics.ConcurrentGauge;
import io.helidon.webserver.ServerResponse;
import org.eclipse.microprofile.metrics.MetricID;
import org.eclipse.microprofile.metrics.MetricRegistry;
import org.eclipse.microprofile.metrics.SimpleTimer;
Expand Down Expand Up @@ -80,7 +80,7 @@ public class HelloWorldAsyncResponseTest {
@Test
public void test() throws NoSuchMethodException {
MetricID metricID = MetricsCdiExtension.syntheticSimpleTimerMetricID(HelloWorldResource.class.getMethod("slowMessage",
AsyncResponse.class));
AsyncResponse.class, ServerResponse.class));

SortedMap<MetricID, SimpleTimer> simpleTimers = registry.getSimpleTimers();

Expand Down Expand Up @@ -117,7 +117,7 @@ public void test() throws NoSuchMethodException {
*/
assertThat("Mismatched string result", result, is(HelloWorldResource.SLOW_RESPONSE));

Duration minDuration = Duration.ofSeconds(HelloWorldResource.SLOW_DELAY_SECS);
Duration minDuration = Duration.ofMillis(HelloWorldResource.SLOW_DELAY_MS);

assertThat("Change in count for explicit SimpleTimer",
explicitSimpleTimer.getCount() - explicitSimpleTimerCountBefore, is(1L));
Expand Down Expand Up @@ -163,30 +163,44 @@ SimpleTimer getSyntheticSimpleTimer() {

@Test
void testInflightRequests() throws InterruptedException, ExecutionException {
Optional<ConcurrentGauge> inflightRequests =
vendorRegistry.getConcurrentGauges((metricID, metric) -> metricID.getName().endsWith("inFlight"))
.values()
.stream()
.findAny();
assertThat("In-flight requests ConcurrentGauge is present", inflightRequests.isPresent(),is(true));

long beforeRequest = inflightRequests.get().getCount();
HelloWorldResource.initSlowRequest();

Future<String> future =
webTarget
.path("helloworld/slow")
.request(MediaType.TEXT_PLAIN_TYPE)
.async()
.get(String.class);
HelloWorldResource.awaitSlowRequestStarted();
long duringRequest = inflightRequests.get().getCount();
String response = future.get();
long afterRequest = inflightRequests.get().getCount();

assertThat("Slow response", response, containsString(SLOW_RESPONSE));
assertThat("Change in inflight during slow request", duringRequest - beforeRequest, is(1L));
assertThat("Change in inflight after slow request completes", afterRequest - beforeRequest, is(0L));
assertThat("In-flight requests ConcurrentGauge is present", HelloWorldResource.inflightRequests(vendorRegistry).isPresent(),
is(true));

int testCount = Integer.getInteger("helidon.microprofile.metrics.asyncRepeatCount", 1);
for (int i = 0; i < testCount; i++) {
long beforeRequest = inflightRequestsCount();
HelloWorldResource.initSlowRequest();

// The request processing will start but then stall, waiting until after this test fetches the "duringRequest"
// value of inflightRequests.
Future<String> future =
webTarget
.path("helloworld/slow")
.request(MediaType.TEXT_PLAIN_TYPE)
.async()
.get(String.class);

HelloWorldResource.awaitSlowRequestStarted();
long duringRequest = inflightRequestsCount();

HelloWorldResource.reportDuringRequestFetched();

String response = future.get();

// The response might arrive here before the server-side code which updates the inflight metric has run. So wait.
HelloWorldResource.awaitResponseSent();
long afterRequest = inflightRequestsCount();

assertThat("Slow response", response, containsString(SLOW_RESPONSE));
assertThat("Change in inflight from before (" + beforeRequest + ") to during (" + duringRequest
+ ") the slow request", duringRequest - beforeRequest, is(1L));
assertThat("Change in inflight from before (" + beforeRequest + ") to after (" + afterRequest
+ ") the slow request", afterRequest - beforeRequest, is(0L));
}
}

private long inflightRequestsCount() {
return HelloWorldResource.inflightRequestsCount(vendorRegistry);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,34 @@
*/
package io.helidon.microprofile.metrics;

import io.helidon.webserver.ServerResponse;
import org.eclipse.microprofile.metrics.MetricRegistry;
import org.eclipse.microprofile.metrics.annotation.ConcurrentGauge;
import org.eclipse.microprofile.metrics.annotation.Counted;
import org.eclipse.microprofile.metrics.annotation.RegistryType;
import org.eclipse.microprofile.metrics.annotation.SimplyTimed;
import org.eclipse.microprofile.metrics.annotation.Timed;

import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;
import javax.json.Json;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* HelloWorldResource class.
Expand All @@ -53,10 +52,12 @@
@Counted
public class HelloWorldResource {

private static final Logger LOGGER = Logger.getLogger(HelloWorldResource.class.getName());

static final String SLOW_RESPONSE = "At last";

// In case pipeline runs need a different time
static final int SLOW_DELAY_SECS = Integer.getInteger("helidon.asyncSimplyTimedDelaySeconds", 2);
static final int SLOW_DELAY_MS = Integer.getInteger("helidon.microprofile.metrics.asyncSimplyTimedDelayMS", 2 * 1000);

static final String MESSAGE_SIMPLE_TIMER = "messageSimpleTimer";
static final String SLOW_MESSAGE_TIMER = "slowMessageTimer";
Expand All @@ -67,18 +68,45 @@ public class HelloWorldResource {
private static ExecutorService executorService = Executors.newSingleThreadExecutor();

static CountDownLatch slowRequestInProgress = null;
static CountDownLatch slowRequestInProgressDataCaptured = null;
static CountDownLatch slowRequestResponseSent = null;

static void initSlowRequest() {
slowRequestInProgress = new CountDownLatch(1);
slowRequestInProgressDataCaptured = new CountDownLatch(1);
slowRequestResponseSent = new CountDownLatch(1);
}

static void awaitSlowRequestStarted() throws InterruptedException {
slowRequestInProgress.await();
}

static void reportDuringRequestFetched() {
slowRequestInProgressDataCaptured.countDown();
}

static void awaitResponseSent() throws InterruptedException {
slowRequestResponseSent.await();
}

static Optional<org.eclipse.microprofile.metrics.ConcurrentGauge> inflightRequests(MetricRegistry metricRegistry) {
return metricRegistry.getConcurrentGauges((metricID, metric) -> metricID.getName().endsWith("inFlight"))
.values()
.stream()
.findAny();
}

static long inflightRequestsCount(MetricRegistry metricRegistry) {
return inflightRequests(metricRegistry).get().getCount();
}

@Inject
MetricRegistry metricRegistry;

@Inject
@RegistryType(type = MetricRegistry.Type.VENDOR)
private MetricRegistry vendorRegistry;

public HelloWorldResource() {

}
Expand All @@ -104,18 +132,33 @@ public String messageWithArg(@PathParam("name") String input){
@Produces(MediaType.TEXT_PLAIN)
@SimplyTimed(name = SLOW_MESSAGE_SIMPLE_TIMER, absolute = true)
@Timed(name = SLOW_MESSAGE_TIMER, absolute = true)
public void slowMessage(@Suspended AsyncResponse ar) {
if (slowRequestInProgress != null) {
slowRequestInProgress.countDown();
public void slowMessage(@Suspended AsyncResponse ar, @Context ServerResponse serverResponse) {
if (slowRequestInProgress == null) {
ar.resume(new RuntimeException("slowRequestInProgress was unexpectedly null"));
return;
}
serverResponse.whenSent()
.thenAccept(r -> slowRequestResponseSent.countDown());

long uponEntry = inflightRequestsCount();

slowRequestInProgress.countDown();
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(SLOW_DELAY_SECS);
long inAsyncExec = inflightRequestsCount();
TimeUnit.MILLISECONDS.sleep(SLOW_DELAY_MS);
slowRequestInProgressDataCaptured.await();
if (!ar.resume(SLOW_RESPONSE)) {
throw new RuntimeException("Error resuming asynchronous response: not in suspended state");
}
long afterResume = inflightRequestsCount();
LOGGER.log(Level.FINE,
"inAsyncExec: " + inAsyncExec + ", afterResume: " + afterResume);
} catch (InterruptedException e) {
// absorb silently
throw new RuntimeException("Async test /slow wait was interrupted", e);
}
ar.resume(SLOW_RESPONSE);
});
LOGGER.log(Level.FINE, "uponEntry: " + uponEntry + ", beforeReturn: " + inflightRequestsCount());
}

@GET
Expand All @@ -126,11 +169,13 @@ public void slowMessage(@Suspended AsyncResponse ar) {
public void slowMessageWithArg(@PathParam("name") String input, @Suspended AsyncResponse ar) {
executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(SLOW_DELAY_SECS);
TimeUnit.MILLISECONDS.sleep(SLOW_DELAY_MS);
if (!ar.resume(SLOW_RESPONSE + " " + input)) {
throw new RuntimeException("Error resuming asynchronous response: not in suspended state");
}
} catch (InterruptedException e) {
// absorb silently
throw new RuntimeException("Async test /slowWithArg{name} was interrupted", e);
}
ar.resume(SLOW_RESPONSE + " " + input);
});
}

Expand All @@ -140,4 +185,24 @@ public void slowMessageWithArg(@PathParam("name") String input, @Suspended Async
public String testDeletedMetric() {
return "Hello there";
}

@GET
@Path("/error")
@Produces(MediaType.TEXT_PLAIN)
public void triggerAsyncError(@Suspended AsyncResponse ar) {
executorService.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(SLOW_DELAY_MS);
if (!ar.resume(new Exception("Expected execption"))) {
throw new RuntimeException("Error resuming asynchronous response: not in suspended state");
}
} catch (InterruptedException e) {
throw new RuntimeException("Error test /error was interrupted");
}
});
}

private long inflightRequestsCount() {
return inflightRequestsCount(vendorRegistry);
}
}