From 356cfac297587136d01f015dfd7f77a6fd5fb1da Mon Sep 17 00:00:00 2001 From: Peter Palaga Date: Fri, 24 Jan 2025 11:28:16 +0100 Subject: [PATCH 1/2] Improve the error message thrown when a blocking call is called on Vert.x event loop thread --- .../deployment/test/Client3xx4xx5xxTest.java | 4 ++-- .../http/client/VertxHttpClientHTTPConduit.java | 17 ++++++++++------- .../it/vertx/async/AsyncVertxClientTest.java | 2 +- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/Client3xx4xx5xxTest.java b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/Client3xx4xx5xxTest.java index aca85946a..12c3f43ef 100644 --- a/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/Client3xx4xx5xxTest.java +++ b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/Client3xx4xx5xxTest.java @@ -187,7 +187,7 @@ void wsdlUri200OnEventLoop() throws InterruptedException { .then() .statusCode(500) .body(CoreMatchers.containsString( - "java.lang.IllegalStateException You have attempted to perform a blocking operation on an IO thread.")); + "java.lang.IllegalStateException You have attempted to perform a blocking service method call on Vert.x event loop thread with CXF client wsdlUri200.")); } } @@ -223,7 +223,7 @@ void endpointUri404OnEventLoop() throws InterruptedException { .then() .statusCode(500) .body(CoreMatchers.containsString( - "java.lang.IllegalStateException You have attempted to perform a blocking operation on an IO thread.")); + "java.lang.IllegalStateException You have attempted to perform a blocking service method call on Vert.x event loop thread with CXF client endpointUri404.")); } diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java index f1096e70c..3f7d4af0a 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/vertx/http/client/VertxHttpClientHTTPConduit.java @@ -143,13 +143,16 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic final boolean isAsync = useAsync.isAsync(message); message.put(USE_ASYNC, isAsync); - if (!isAsync && !BlockingOperationControl.isBlockingAllowed()) { - throw new IllegalStateException("You have attempted to perform a blocking operation on an IO thread." - + " This is not allowed, as blocking the IO thread will cause major performance issues with your application." - + " You need to offload the blocking CXF client call to a worker thread," - + " e.g. by using the @io.smallrye.common.annotation.Blocking annotation on a caller method" - + " where it is supported by the underlying Quarkus extension, such as quarkus-rest, quarkus-vertx," - + " quarkus-reactive-routes, quarkus-grpc, quarkus-messaging-* and possibly others."); + final boolean blockingAllowed = BlockingOperationControl.isBlockingAllowed(); + if (!isAsync && !blockingAllowed) { + throw new IllegalStateException( + "You have attempted to perform a blocking service method call on Vert.x event loop thread with CXF client " + + clientInfo.getConfigKey() + "." + + " This is not allowed, as blocking the IO thread will cause major performance issues with your application." + + " You need to offload the blocking CXF client call to a worker thread," + + " e.g. by using the @io.smallrye.common.annotation.Blocking annotation on a caller method" + + " where it is supported by the underlying Quarkus extension, such as quarkus-rest, quarkus-vertx," + + " quarkus-reactive-routes, quarkus-grpc, quarkus-messaging-* and possibly others."); } final HttpVersion version = getVersion(message, csPolicy); diff --git a/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientTest.java b/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientTest.java index c2d0f7344..bb341b8c0 100644 --- a/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientTest.java +++ b/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/vertx/async/AsyncVertxClientTest.java @@ -33,7 +33,7 @@ void helloWithWsdl(String payloadSize) { .then() .statusCode(500) .body(CoreMatchers.containsString( - "You have attempted to perform a blocking operation on an IO thread.")); + "You have attempted to perform a blocking service method call on Vert.x event loop thread")); } From b0df31b9ab5da6ee9d4d11ab5bfff4b3eb746398 Mon Sep 17 00:00:00 2001 From: Peter Palaga Date: Fri, 24 Jan 2025 11:28:44 +0100 Subject: [PATCH 2/2] Make RedirectTest more robust --- .../RetransmitCacheServiceImpl.java | 84 ++++---- .../cxf/it/redirect/RedirectTest.java | 187 +++++++++++------- 2 files changed, 155 insertions(+), 116 deletions(-) diff --git a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheServiceImpl.java b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheServiceImpl.java index 3fa39b3c8..fd110640d 100644 --- a/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheServiceImpl.java +++ b/integration-tests/client-server/src/main/java/io/quarkiverse/cxf/it/redirect/retransmitcache/RetransmitCacheServiceImpl.java @@ -7,6 +7,7 @@ import java.nio.file.Path; import java.util.Properties; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import jakarta.jws.WebService; @@ -53,57 +54,58 @@ public static String toString(final Properties props) { } public static Properties listTempFiles(int expectedFileCount, String retransmitCacheDir) { - final String prefix = "qcxf-TempStore-" + ProcessHandle.current().pid() + "-"; + final int timeout = 3000; + final long deadline = System.currentTimeMillis() + timeout; final Properties props = new Properties(); final Path dir = Path.of(retransmitCacheDir); - Log.infof("Listing %s/%s", expectedFileCount, prefix); + Log.infof("Expecting %d files in %s", expectedFileCount, retransmitCacheDir); if (expectedFileCount == 0) { sleep(500); } - try { - while (!Files.isDirectory(dir) && Files.list(dir).count() != expectedFileCount) { - sleep(50); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - - if (Files.isDirectory(dir)) { - try (Stream dirFiles = Files.list(dir)) { - dirFiles - .filter(p -> { - String fn = p.getFileName().toString(); - - return fn.startsWith(prefix) // io.quarkiverse.cxf.vertx.http.client.TempStore - || // org.apache.cxf.io.CachedOutputStream.createFileOutputStream() - (fn.startsWith("cos") && fn.endsWith("tmp")); - - }) - .forEach(path -> { - Log.infof("Found temp file %s", path); - String content; - - /* We have to wait a bit till the event loop finishes writing to the file */ - while (true) { - try { - content = Files.readString(path, StandardCharsets.UTF_8); - } catch (IOException e) { - throw new RuntimeException("Could not read " + path, e); - } - if (content.endsWith("")) { - break; + final AtomicReference lastException = new AtomicReference<>(); + do { + if (Files.isDirectory(dir)) { + try (Stream dirFiles = Files.list(dir)) { + dirFiles + .filter(RetransmitCacheServiceImpl::isRetransmitFile) + .forEach(path -> { + Log.infof("Found temp file %s", path); + String content; + + /* We have to wait a bit till the event loop finishes writing to the file */ + while (true) { + try { + content = Files.readString(path, StandardCharsets.UTF_8); + if (content.endsWith("")) { + break; + } + } catch (IOException e) { + lastException.set(e); + } + sleep(50); } - sleep(50); - } - props.setProperty(path.toString(), content); - }); - } catch (IOException e) { - throw new RuntimeException("Could not list " + expectedFileCount, e); + props.setProperty(path.toString(), content); + }); + } catch (IOException e) { + lastException.set(e); + } } - } + if (System.currentTimeMillis() > deadline) { + throw new IllegalStateException("" + expectedFileCount + " expected files in " + retransmitCacheDir + + " did not appear within " + timeout + " ms; found: " + props.keySet(), lastException.get()); + } + } while (props.size() < expectedFileCount); + return props; } + public static boolean isRetransmitFile(Path p) { + final String fn = p.getFileName().toString(); + return fn.startsWith("qcxf-TempStore-") // io.quarkiverse.cxf.vertx.http.client.TempStore + || // org.apache.cxf.io.CachedOutputStream.createFileOutputStream() + (fn.startsWith("cos") && fn.endsWith("tmp")); + } + private static void sleep(long delay) { try { Thread.sleep(delay); diff --git a/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/redirect/RedirectTest.java b/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/redirect/RedirectTest.java index 8a2400222..82d2a5d43 100644 --- a/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/redirect/RedirectTest.java +++ b/integration-tests/client-server/src/test/java/io/quarkiverse/cxf/it/redirect/RedirectTest.java @@ -10,15 +10,18 @@ import org.assertj.core.api.Assertions; import org.assertj.core.api.Assumptions; +import org.assertj.core.api.Condition; import org.hamcrest.CoreMatchers; import org.hamcrest.Matchers; import org.jboss.logging.Logger; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import io.quarkiverse.cxf.HTTPConduitImpl; import io.quarkiverse.cxf.it.large.slow.LargeSlowServiceImpl; +import io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCacheServiceImpl; import io.quarkus.runtime.configuration.MemorySizeConverter; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; @@ -188,13 +191,48 @@ static ValidatableResponse getResponse(String endpoint, int sizeBytes) { .then(); } - @ParameterizedTest - @ValueSource(strings = { // - "retransmitCacheSync", // - "retransmitCacheAsyncBlocking" // - }) - void retransmitCache(String endpoint) throws IOException { + /* + * 1k is smaller than 500K we set in quarkus.cxf.retransmit-cache.threshold + * Hence the file should not be cached on disk + */ + @Test + void retransmitCacheSync1k() throws IOException { + retransmitCache("retransmitCacheSync", "1k", 0); + } + + @Test + void retransmitCacheAsyncBlocking1k() throws IOException { + retransmitCache("retransmitCacheAsyncBlocking", "1k", 0); + } + + /* + * 9M is greater than the 500K we set in quarkus.cxf.retransmit-cache.threshold + * Hence the file should not be cached on disk + */ + @Test + void retransmitCacheSync9m() throws IOException { + retransmitCache("retransmitCacheSync", "9m", 1); + } + + @Test + void retransmitCacheAsyncBlocking9m() throws IOException { + retransmitCache("retransmitCacheAsyncBlocking", "9m", 1); + } + + /* + * Let server return 500 + */ + @Test + void retransmitCacheSync500() throws IOException { + retransmitCache500("retransmitCacheSync"); + } + @Test + void retransmitCacheAsyncBlocking500() throws IOException { + retransmitCache500("retransmitCacheAsyncBlocking"); + } + + private static void retransmitCache500(String endpoint) throws IOException { if (endpoint.contains("Async")) { /* URLConnectionHTTPConduitFactory does not support async */ Assumptions.assumeThat(HTTPConduitImpl.findDefaultHTTPConduitImpl()) @@ -202,85 +240,83 @@ void retransmitCache(String endpoint) throws IOException { } final MemorySizeConverter converter = new MemorySizeConverter(); - { - /* - * 1k is smaller than 500K we set in quarkus.cxf.retransmit-cache.threshold - * Hence the file should not be cached on disk - */ - final int payloadLen = (int) converter.convert("1K").asLongValue(); - final Properties props = retransmitCache(payloadLen, 0, endpoint); - Assertions.assertThat(props.size()).isEqualTo(1); - } - { - /* - * 9M is greater than the 500K we set in quarkus.cxf.retransmit-cache.threshold - * Hence the file should not be cached on disk - */ - final int payloadLen = (int) converter.convert("9M").asLongValue(); - final Properties props = retransmitCache(payloadLen, 1, endpoint); - Assertions.assertThat(props.size()).isEqualTo(2); - - for (Entry en : props.entrySet()) { - String path = (String) en.getKey(); - if (path.contains("qcxf-TempStore-")) { - Assertions.assertThat(Path.of(path)).doesNotExist(); - Assertions.assertThat((String) en.getValue()) - .startsWith("" - + "" - + ""); - Assertions.assertThat((String) en.getValue()) - .endsWith(""); - Assertions.assertThat((String) en.getValue()) - .contains("" + LargeSlowServiceImpl.largeString(payloadLen) + ""); - } + final int payloadLen = (int) converter.convert("501K").asLongValue(); + final String reqId = UUID.randomUUID().toString(); + RestAssured.given() + .header(RedirectRest.EXPECTED_FILE_COUNT_HEADER, "1") + .header(RedirectRest.REQUEST_ID_HEADER, reqId) + .header(RedirectRest.STATUS_CODE_HEADER, "500") + .body(LargeSlowServiceImpl.largeString(payloadLen)) + .post("/RedirectRest/" + endpoint) + .then() + .statusCode(500); + + final String propString = RestAssured.given() + .get("/RedirectRest/retransmitCache-tempFiles/" + reqId) + .then() + .statusCode(200) + .extract().body().asString(); + + Properties props = new Properties(); + props.load(new StringReader(propString)); + + Assertions.assertThat(props.size()).isEqualTo(1); + for (Entry en : props.entrySet()) { + final String path = (String) en.getKey(); + final Path p = Path.of(path); + if (path.contains("qcxf-TempStore-")) { + Assertions.assertThat(p).doesNotExist(); } + Assertions.assertThat(p).satisfies(new Condition(RetransmitCacheServiceImpl::isRetransmitFile, + "a retransmit file matching 'qcxf-TempStore-*' or 'cos*tmp'", "fairy")); + assertContent((String) en.getValue(), payloadLen); + } + + } + + private static void retransmitCache(String endpoint, String payloadSize, int expectedFileCount) throws IOException { + if (endpoint.contains("Async")) { + /* URLConnectionHTTPConduitFactory does not support async */ + Assumptions.assumeThat(HTTPConduitImpl.findDefaultHTTPConduitImpl()) + .isNotEqualTo(HTTPConduitImpl.URLConnectionHTTPConduitFactory); } - { - /* - * Let server return 500 - */ - final int payloadLen = (int) converter.convert("501K").asLongValue(); - final String reqId = UUID.randomUUID().toString(); - RestAssured.given() - .header(RedirectRest.EXPECTED_FILE_COUNT_HEADER, "1") - .header(RedirectRest.REQUEST_ID_HEADER, reqId) - .header(RedirectRest.STATUS_CODE_HEADER, "500") - .body(LargeSlowServiceImpl.largeString(payloadLen)) - .post("/RedirectRest/" + endpoint) - .then() - .statusCode(500); - final String propString = RestAssured.given() - .get("/RedirectRest/retransmitCache-tempFiles/" + reqId) - .then() - .statusCode(200) - .extract().body().asString(); - - Properties props = new Properties(); - props.load(new StringReader(propString)); - - Assertions.assertThat(props.size()).isEqualTo(1); - for (Entry en : props.entrySet()) { - String path = (String) en.getKey(); - if (path.contains("qcxf-TempStore-")) { - Assertions.assertThat(Path.of(path)).doesNotExist(); - Assertions.assertThat((String) en.getValue()) - .startsWith("" - + "" - + ""); - Assertions.assertThat((String) en.getValue()) - .endsWith(""); - Assertions.assertThat((String) en.getValue()) - .contains("" + LargeSlowServiceImpl.largeString(payloadLen) + ""); + final MemorySizeConverter converter = new MemorySizeConverter(); + { + final int payloadLen = (int) converter.convert(payloadSize).asLongValue(); + final Properties props = retransmitCache(payloadLen, 0, endpoint); + Assertions.assertThat(props.size()).isEqualTo(expectedFileCount); + + if (expectedFileCount >= 1) { + for (Entry en : props.entrySet()) { + final String path = (String) en.getKey(); + Path p = Path.of(path); + if (path.contains("qcxf-TempStore-")) { + Assertions.assertThat(p).doesNotExist(); + } + Assertions.assertThat(p).satisfies(new Condition(RetransmitCacheServiceImpl::isRetransmitFile, + "a retransmit file matching 'qcxf-TempStore-*' or 'cos*tmp'", "fairy")); + assertContent((String) en.getValue(), payloadLen); } } - } } - private Properties retransmitCache(final int payloadLen, int expectedFileCount, String syncAsync) throws IOException { + private static void assertContent(String content, int payloadLen) { + Assertions.assertThat(content) + .startsWith("" + + "" + + ""); + Assertions.assertThat(content) + .endsWith(""); + Assertions.assertThat(content) + .contains("" + LargeSlowServiceImpl.largeString(payloadLen) + ""); + } + + private static Properties retransmitCache(final int payloadLen, int expectedFileCount, String syncAsync) + throws IOException { String body = RestAssured.given() .header(RedirectRest.EXPECTED_FILE_COUNT_HEADER, String.valueOf(expectedFileCount)) .body(LargeSlowServiceImpl.largeString(payloadLen)) @@ -292,6 +328,7 @@ private Properties retransmitCache(final int payloadLen, int expectedFileCount, final Properties props = new Properties(); props.load(new StringReader(body)); Assertions.assertThat(props.get("payload.length")).isEqualTo(String.valueOf(payloadLen)); + props.remove("payload.length"); return props; }