Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make RedirectTest more robust #1693

Merged
merged 2 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ void wsdlUri200OnEventLoop() throws InterruptedException {
.then()
.statusCode(500)
.body(CoreMatchers.containsString(
"java.lang.IllegalStateException You have attempted to perform a blocking operation on an IO thread."));
"java.lang.IllegalStateException You have attempted to perform a blocking service method call on Vert.x event loop thread with CXF client wsdlUri200."));
}

}
Expand Down Expand Up @@ -223,7 +223,7 @@ void endpointUri404OnEventLoop() throws InterruptedException {
.then()
.statusCode(500)
.body(CoreMatchers.containsString(
"java.lang.IllegalStateException You have attempted to perform a blocking operation on an IO thread."));
"java.lang.IllegalStateException You have attempted to perform a blocking service method call on Vert.x event loop thread with CXF client endpointUri404."));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,16 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic
final boolean isAsync = useAsync.isAsync(message);
message.put(USE_ASYNC, isAsync);

if (!isAsync && !BlockingOperationControl.isBlockingAllowed()) {
throw new IllegalStateException("You have attempted to perform a blocking operation on an IO thread."
+ " This is not allowed, as blocking the IO thread will cause major performance issues with your application."
+ " You need to offload the blocking CXF client call to a worker thread,"
+ " e.g. by using the @io.smallrye.common.annotation.Blocking annotation on a caller method"
+ " where it is supported by the underlying Quarkus extension, such as quarkus-rest, quarkus-vertx,"
+ " quarkus-reactive-routes, quarkus-grpc, quarkus-messaging-* and possibly others.");
final boolean blockingAllowed = BlockingOperationControl.isBlockingAllowed();
if (!isAsync && !blockingAllowed) {
throw new IllegalStateException(
"You have attempted to perform a blocking service method call on Vert.x event loop thread with CXF client "
+ clientInfo.getConfigKey() + "."
+ " This is not allowed, as blocking the IO thread will cause major performance issues with your application."
+ " You need to offload the blocking CXF client call to a worker thread,"
+ " e.g. by using the @io.smallrye.common.annotation.Blocking annotation on a caller method"
+ " where it is supported by the underlying Quarkus extension, such as quarkus-rest, quarkus-vertx,"
+ " quarkus-reactive-routes, quarkus-grpc, quarkus-messaging-* and possibly others.");
}

final HttpVersion version = getVersion(message, csPolicy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.nio.file.Path;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import jakarta.jws.WebService;
Expand Down Expand Up @@ -53,57 +54,58 @@ public static String toString(final Properties props) {
}

public static Properties listTempFiles(int expectedFileCount, String retransmitCacheDir) {
final String prefix = "qcxf-TempStore-" + ProcessHandle.current().pid() + "-";
final int timeout = 3000;
final long deadline = System.currentTimeMillis() + timeout;
final Properties props = new Properties();
final Path dir = Path.of(retransmitCacheDir);
Log.infof("Listing %s/%s", expectedFileCount, prefix);
Log.infof("Expecting %d files in %s", expectedFileCount, retransmitCacheDir);
if (expectedFileCount == 0) {
sleep(500);
}
try {
while (!Files.isDirectory(dir) && Files.list(dir).count() != expectedFileCount) {
sleep(50);
}
} catch (IOException e) {
throw new RuntimeException(e);
}

if (Files.isDirectory(dir)) {
try (Stream<Path> dirFiles = Files.list(dir)) {
dirFiles
.filter(p -> {
String fn = p.getFileName().toString();

return fn.startsWith(prefix) // io.quarkiverse.cxf.vertx.http.client.TempStore
|| // org.apache.cxf.io.CachedOutputStream.createFileOutputStream()
(fn.startsWith("cos") && fn.endsWith("tmp"));

})
.forEach(path -> {
Log.infof("Found temp file %s", path);
String content;

/* We have to wait a bit till the event loop finishes writing to the file */
while (true) {
try {
content = Files.readString(path, StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException("Could not read " + path, e);
}
if (content.endsWith("</payload></ns2:retransmitCache></soap:Body></soap:Envelope>")) {
break;
final AtomicReference<Exception> lastException = new AtomicReference<>();
do {
if (Files.isDirectory(dir)) {
try (Stream<Path> dirFiles = Files.list(dir)) {
dirFiles
.filter(RetransmitCacheServiceImpl::isRetransmitFile)
.forEach(path -> {
Log.infof("Found temp file %s", path);
String content;

/* We have to wait a bit till the event loop finishes writing to the file */
while (true) {
try {
content = Files.readString(path, StandardCharsets.UTF_8);
if (content.endsWith("</payload></ns2:retransmitCache></soap:Body></soap:Envelope>")) {
break;
}
} catch (IOException e) {
lastException.set(e);
}
sleep(50);
}
sleep(50);
}
props.setProperty(path.toString(), content);
});
} catch (IOException e) {
throw new RuntimeException("Could not list " + expectedFileCount, e);
props.setProperty(path.toString(), content);
});
} catch (IOException e) {
lastException.set(e);
}
}
}
if (System.currentTimeMillis() > deadline) {
throw new IllegalStateException("" + expectedFileCount + " expected files in " + retransmitCacheDir
+ " did not appear within " + timeout + " ms; found: " + props.keySet(), lastException.get());
}
} while (props.size() < expectedFileCount);

return props;
}

public static boolean isRetransmitFile(Path p) {
final String fn = p.getFileName().toString();
return fn.startsWith("qcxf-TempStore-") // io.quarkiverse.cxf.vertx.http.client.TempStore
|| // org.apache.cxf.io.CachedOutputStream.createFileOutputStream()
(fn.startsWith("cos") && fn.endsWith("tmp"));
}

private static void sleep(long delay) {
try {
Thread.sleep(delay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@

import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.assertj.core.api.Condition;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import io.quarkiverse.cxf.HTTPConduitImpl;
import io.quarkiverse.cxf.it.large.slow.LargeSlowServiceImpl;
import io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCacheServiceImpl;
import io.quarkus.runtime.configuration.MemorySizeConverter;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
Expand Down Expand Up @@ -188,99 +191,132 @@ static ValidatableResponse getResponse(String endpoint, int sizeBytes) {
.then();
}

@ParameterizedTest
@ValueSource(strings = { //
"retransmitCacheSync", //
"retransmitCacheAsyncBlocking" //
})
void retransmitCache(String endpoint) throws IOException {
/*
* 1k is smaller than 500K we set in quarkus.cxf.retransmit-cache.threshold
* Hence the file should not be cached on disk
*/
@Test
void retransmitCacheSync1k() throws IOException {
retransmitCache("retransmitCacheSync", "1k", 0);
}

@Test
void retransmitCacheAsyncBlocking1k() throws IOException {
retransmitCache("retransmitCacheAsyncBlocking", "1k", 0);
}

/*
* 9M is greater than the 500K we set in quarkus.cxf.retransmit-cache.threshold
* Hence the file should not be cached on disk
*/
@Test
void retransmitCacheSync9m() throws IOException {
retransmitCache("retransmitCacheSync", "9m", 1);
}

@Test
void retransmitCacheAsyncBlocking9m() throws IOException {
retransmitCache("retransmitCacheAsyncBlocking", "9m", 1);
}

/*
* Let server return 500
*/
@Test
void retransmitCacheSync500() throws IOException {
retransmitCache500("retransmitCacheSync");
}

@Test
void retransmitCacheAsyncBlocking500() throws IOException {
retransmitCache500("retransmitCacheAsyncBlocking");
}

private static void retransmitCache500(String endpoint) throws IOException {
if (endpoint.contains("Async")) {
/* URLConnectionHTTPConduitFactory does not support async */
Assumptions.assumeThat(HTTPConduitImpl.findDefaultHTTPConduitImpl())
.isNotEqualTo(HTTPConduitImpl.URLConnectionHTTPConduitFactory);
}

final MemorySizeConverter converter = new MemorySizeConverter();
{
/*
* 1k is smaller than 500K we set in quarkus.cxf.retransmit-cache.threshold
* Hence the file should not be cached on disk
*/
final int payloadLen = (int) converter.convert("1K").asLongValue();
final Properties props = retransmitCache(payloadLen, 0, endpoint);
Assertions.assertThat(props.size()).isEqualTo(1);
}

{
/*
* 9M is greater than the 500K we set in quarkus.cxf.retransmit-cache.threshold
* Hence the file should not be cached on disk
*/
final int payloadLen = (int) converter.convert("9M").asLongValue();
final Properties props = retransmitCache(payloadLen, 1, endpoint);
Assertions.assertThat(props.size()).isEqualTo(2);

for (Entry<Object, Object> en : props.entrySet()) {
String path = (String) en.getKey();
if (path.contains("qcxf-TempStore-")) {
Assertions.assertThat(Path.of(path)).doesNotExist();
Assertions.assertThat((String) en.getValue())
.startsWith("<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+ "<soap:Body><ns2:retransmitCache xmlns:ns2=\"https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test\">"
+ "<expectedFileCount>");
Assertions.assertThat((String) en.getValue())
.endsWith("</payload></ns2:retransmitCache></soap:Body></soap:Envelope>");
Assertions.assertThat((String) en.getValue())
.contains("<payload>" + LargeSlowServiceImpl.largeString(payloadLen) + "</payload>");
}
final int payloadLen = (int) converter.convert("501K").asLongValue();
final String reqId = UUID.randomUUID().toString();
RestAssured.given()
.header(RedirectRest.EXPECTED_FILE_COUNT_HEADER, "1")
.header(RedirectRest.REQUEST_ID_HEADER, reqId)
.header(RedirectRest.STATUS_CODE_HEADER, "500")
.body(LargeSlowServiceImpl.largeString(payloadLen))
.post("/RedirectRest/" + endpoint)
.then()
.statusCode(500);

final String propString = RestAssured.given()
.get("/RedirectRest/retransmitCache-tempFiles/" + reqId)
.then()
.statusCode(200)
.extract().body().asString();

Properties props = new Properties();
props.load(new StringReader(propString));

Assertions.assertThat(props.size()).isEqualTo(1);
for (Entry<Object, Object> en : props.entrySet()) {
final String path = (String) en.getKey();
final Path p = Path.of(path);
if (path.contains("qcxf-TempStore-")) {
Assertions.assertThat(p).doesNotExist();
}
Assertions.assertThat(p).satisfies(new Condition<Path>(RetransmitCacheServiceImpl::isRetransmitFile,
"a retransmit file matching 'qcxf-TempStore-*' or 'cos*tmp'", "fairy"));
assertContent((String) en.getValue(), payloadLen);
}

}

private static void retransmitCache(String endpoint, String payloadSize, int expectedFileCount) throws IOException {

if (endpoint.contains("Async")) {
/* URLConnectionHTTPConduitFactory does not support async */
Assumptions.assumeThat(HTTPConduitImpl.findDefaultHTTPConduitImpl())
.isNotEqualTo(HTTPConduitImpl.URLConnectionHTTPConduitFactory);
}
{
/*
* Let server return 500
*/
final int payloadLen = (int) converter.convert("501K").asLongValue();
final String reqId = UUID.randomUUID().toString();
RestAssured.given()
.header(RedirectRest.EXPECTED_FILE_COUNT_HEADER, "1")
.header(RedirectRest.REQUEST_ID_HEADER, reqId)
.header(RedirectRest.STATUS_CODE_HEADER, "500")
.body(LargeSlowServiceImpl.largeString(payloadLen))
.post("/RedirectRest/" + endpoint)
.then()
.statusCode(500);

final String propString = RestAssured.given()
.get("/RedirectRest/retransmitCache-tempFiles/" + reqId)
.then()
.statusCode(200)
.extract().body().asString();

Properties props = new Properties();
props.load(new StringReader(propString));

Assertions.assertThat(props.size()).isEqualTo(1);
for (Entry<Object, Object> en : props.entrySet()) {
String path = (String) en.getKey();
if (path.contains("qcxf-TempStore-")) {
Assertions.assertThat(Path.of(path)).doesNotExist();
Assertions.assertThat((String) en.getValue())
.startsWith("<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+ "<soap:Body><ns2:retransmitCache xmlns:ns2=\"https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test\">"
+ "<expectedFileCount>");
Assertions.assertThat((String) en.getValue())
.endsWith("</payload></ns2:retransmitCache></soap:Body></soap:Envelope>");
Assertions.assertThat((String) en.getValue())
.contains("<payload>" + LargeSlowServiceImpl.largeString(payloadLen) + "</payload>");
final MemorySizeConverter converter = new MemorySizeConverter();
{
final int payloadLen = (int) converter.convert(payloadSize).asLongValue();
final Properties props = retransmitCache(payloadLen, 0, endpoint);
Assertions.assertThat(props.size()).isEqualTo(expectedFileCount);

if (expectedFileCount >= 1) {
for (Entry<Object, Object> en : props.entrySet()) {
final String path = (String) en.getKey();
Path p = Path.of(path);
if (path.contains("qcxf-TempStore-")) {
Assertions.assertThat(p).doesNotExist();
}
Assertions.assertThat(p).satisfies(new Condition<Path>(RetransmitCacheServiceImpl::isRetransmitFile,
"a retransmit file matching 'qcxf-TempStore-*' or 'cos*tmp'", "fairy"));
assertContent((String) en.getValue(), payloadLen);
}
}

}
}

private Properties retransmitCache(final int payloadLen, int expectedFileCount, String syncAsync) throws IOException {
private static void assertContent(String content, int payloadLen) {
Assertions.assertThat(content)
.startsWith("<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+ "<soap:Body><ns2:retransmitCache xmlns:ns2=\"https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test\">"
+ "<expectedFileCount>");
Assertions.assertThat(content)
.endsWith("</payload></ns2:retransmitCache></soap:Body></soap:Envelope>");
Assertions.assertThat(content)
.contains("<payload>" + LargeSlowServiceImpl.largeString(payloadLen) + "</payload>");
}

private static Properties retransmitCache(final int payloadLen, int expectedFileCount, String syncAsync)
throws IOException {
String body = RestAssured.given()
.header(RedirectRest.EXPECTED_FILE_COUNT_HEADER, String.valueOf(expectedFileCount))
.body(LargeSlowServiceImpl.largeString(payloadLen))
Expand All @@ -292,6 +328,7 @@ private Properties retransmitCache(final int payloadLen, int expectedFileCount,
final Properties props = new Properties();
props.load(new StringReader(body));
Assertions.assertThat(props.get("payload.length")).isEqualTo(String.valueOf(payloadLen));
props.remove("payload.length");
return props;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ void helloWithWsdl(String payloadSize) {
.then()
.statusCode(500)
.body(CoreMatchers.containsString(
"You have attempted to perform a blocking operation on an IO thread."));
"You have attempted to perform a blocking service method call on Vert.x event loop thread"));

}

Expand Down
Loading