Skip to content

Commit

Permalink
Make RedirectTest more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
ppalaga committed Jan 24, 2025
1 parent dd4e2f5 commit 89d9083
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.nio.file.Path;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import jakarta.jws.WebService;
Expand Down Expand Up @@ -53,57 +54,58 @@ public static String toString(final Properties props) {
}

public static Properties listTempFiles(int expectedFileCount, String retransmitCacheDir) {
final String prefix = "qcxf-TempStore-" + ProcessHandle.current().pid() + "-";
final int timeout = 3000;
final long deadline = System.currentTimeMillis() + timeout;
final Properties props = new Properties();
final Path dir = Path.of(retransmitCacheDir);
Log.infof("Listing %s/%s", expectedFileCount, prefix);
Log.infof("Expecting %d files in %s", expectedFileCount, retransmitCacheDir);
if (expectedFileCount == 0) {
sleep(500);
}
try {
while (!Files.isDirectory(dir) && Files.list(dir).count() != expectedFileCount) {
sleep(50);
}
} catch (IOException e) {
throw new RuntimeException(e);
}

if (Files.isDirectory(dir)) {
try (Stream<Path> dirFiles = Files.list(dir)) {
dirFiles
.filter(p -> {
String fn = p.getFileName().toString();

return fn.startsWith(prefix) // io.quarkiverse.cxf.vertx.http.client.TempStore
|| // org.apache.cxf.io.CachedOutputStream.createFileOutputStream()
(fn.startsWith("cos") && fn.endsWith("tmp"));

})
.forEach(path -> {
Log.infof("Found temp file %s", path);
String content;

/* We have to wait a bit till the event loop finishes writing to the file */
while (true) {
try {
content = Files.readString(path, StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException("Could not read " + path, e);
}
if (content.endsWith("</payload></ns2:retransmitCache></soap:Body></soap:Envelope>")) {
break;
final AtomicReference<Exception> lastException = new AtomicReference<>();
do {
if (Files.isDirectory(dir)) {
try (Stream<Path> dirFiles = Files.list(dir)) {
dirFiles
.filter(RetransmitCacheServiceImpl::isRetransmitFile)
.forEach(path -> {
Log.infof("Found temp file %s", path);
String content;

/* We have to wait a bit till the event loop finishes writing to the file */
while (true) {
try {
content = Files.readString(path, StandardCharsets.UTF_8);
if (content.endsWith("</payload></ns2:retransmitCache></soap:Body></soap:Envelope>")) {
break;
}
} catch (IOException e) {
lastException.set(e);
}
sleep(50);
}
sleep(50);
}
props.setProperty(path.toString(), content);
});
} catch (IOException e) {
throw new RuntimeException("Could not list " + expectedFileCount, e);
props.setProperty(path.toString(), content);
});
} catch (IOException e) {
lastException.set(e);
}
}
}
if (System.currentTimeMillis() > deadline) {
throw new IllegalStateException("" + expectedFileCount + " expected files in " + retransmitCacheDir
+ " did not appear within " + timeout + " ms; found: " + props.keySet(), lastException.get());
}
} while (props.size() < expectedFileCount);

return props;
}

public static boolean isRetransmitFile(Path p) {
final String fn = p.getFileName().toString();
return fn.startsWith("qcxf-TempStore-") // io.quarkiverse.cxf.vertx.http.client.TempStore
|| // org.apache.cxf.io.CachedOutputStream.createFileOutputStream()
(fn.startsWith("cos") && fn.endsWith("tmp"));
}

private static void sleep(long delay) {
try {
Thread.sleep(delay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@

import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.assertj.core.api.Condition;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.jboss.logging.Logger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import io.quarkiverse.cxf.HTTPConduitImpl;
import io.quarkiverse.cxf.it.large.slow.LargeSlowServiceImpl;
import io.quarkiverse.cxf.it.redirect.retransmitcache.RetransmitCacheServiceImpl;
import io.quarkus.runtime.configuration.MemorySizeConverter;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
Expand Down Expand Up @@ -188,99 +191,132 @@ static ValidatableResponse getResponse(String endpoint, int sizeBytes) {
.then();
}

@ParameterizedTest
@ValueSource(strings = { //
"retransmitCacheSync", //
"retransmitCacheAsyncBlocking" //
})
void retransmitCache(String endpoint) throws IOException {
/*
* 1k is smaller than 500K we set in quarkus.cxf.retransmit-cache.threshold
* Hence the file should not be cached on disk
*/
@Test
void retransmitCacheSync1k() throws IOException {
retransmitCache("retransmitCacheSync", "1k", 0);
}

@Test
void retransmitCacheAsyncBlocking1k() throws IOException {
retransmitCache("retransmitCacheAsyncBlocking", "1k", 0);
}

/*
* 9M is greater than the 500K we set in quarkus.cxf.retransmit-cache.threshold
* Hence the file should not be cached on disk
*/
@Test
void retransmitCacheSync9m() throws IOException {
retransmitCache("retransmitCacheSync", "9m", 1);
}

@Test
void retransmitCacheAsyncBlocking9m() throws IOException {
retransmitCache("retransmitCacheAsyncBlocking", "9m", 1);
}

/*
* Let server return 500
*/
@Test
void retransmitCacheSync500() throws IOException {
retransmitCache500("retransmitCacheSync");
}

@Test
void retransmitCacheAsyncBlocking500() throws IOException {
retransmitCache500("retransmitCacheAsyncBlocking");
}

private static void retransmitCache500(String endpoint) throws IOException {
if (endpoint.contains("Async")) {
/* URLConnectionHTTPConduitFactory does not support async */
Assumptions.assumeThat(HTTPConduitImpl.findDefaultHTTPConduitImpl())
.isNotEqualTo(HTTPConduitImpl.URLConnectionHTTPConduitFactory);
}

final MemorySizeConverter converter = new MemorySizeConverter();
{
/*
* 1k is smaller than 500K we set in quarkus.cxf.retransmit-cache.threshold
* Hence the file should not be cached on disk
*/
final int payloadLen = (int) converter.convert("1K").asLongValue();
final Properties props = retransmitCache(payloadLen, 0, endpoint);
Assertions.assertThat(props.size()).isEqualTo(1);
}

{
/*
* 9M is greater than the 500K we set in quarkus.cxf.retransmit-cache.threshold
* Hence the file should not be cached on disk
*/
final int payloadLen = (int) converter.convert("9M").asLongValue();
final Properties props = retransmitCache(payloadLen, 1, endpoint);
Assertions.assertThat(props.size()).isEqualTo(2);

for (Entry<Object, Object> en : props.entrySet()) {
String path = (String) en.getKey();
if (path.contains("qcxf-TempStore-")) {
Assertions.assertThat(Path.of(path)).doesNotExist();
Assertions.assertThat((String) en.getValue())
.startsWith("<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+ "<soap:Body><ns2:retransmitCache xmlns:ns2=\"https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test\">"
+ "<expectedFileCount>");
Assertions.assertThat((String) en.getValue())
.endsWith("</payload></ns2:retransmitCache></soap:Body></soap:Envelope>");
Assertions.assertThat((String) en.getValue())
.contains("<payload>" + LargeSlowServiceImpl.largeString(payloadLen) + "</payload>");
}
final int payloadLen = (int) converter.convert("501K").asLongValue();
final String reqId = UUID.randomUUID().toString();
RestAssured.given()
.header(RedirectRest.EXPECTED_FILE_COUNT_HEADER, "1")
.header(RedirectRest.REQUEST_ID_HEADER, reqId)
.header(RedirectRest.STATUS_CODE_HEADER, "500")
.body(LargeSlowServiceImpl.largeString(payloadLen))
.post("/RedirectRest/" + endpoint)
.then()
.statusCode(500);

final String propString = RestAssured.given()
.get("/RedirectRest/retransmitCache-tempFiles/" + reqId)
.then()
.statusCode(200)
.extract().body().asString();

Properties props = new Properties();
props.load(new StringReader(propString));

Assertions.assertThat(props.size()).isEqualTo(1);
for (Entry<Object, Object> en : props.entrySet()) {
final String path = (String) en.getKey();
final Path p = Path.of(path);
if (path.contains("qcxf-TempStore-")) {
Assertions.assertThat(p).doesNotExist();
}
Assertions.assertThat(p).satisfies(new Condition<Path>(RetransmitCacheServiceImpl::isRetransmitFile,
"a retransmit file matching 'qcxf-TempStore-*' or 'cos*tmp'", "fairy"));
assertContent((String) en.getValue(), payloadLen);
}

}

private static void retransmitCache(String endpoint, String payloadSize, int expectedFileCount) throws IOException {

if (endpoint.contains("Async")) {
/* URLConnectionHTTPConduitFactory does not support async */
Assumptions.assumeThat(HTTPConduitImpl.findDefaultHTTPConduitImpl())
.isNotEqualTo(HTTPConduitImpl.URLConnectionHTTPConduitFactory);
}
{
/*
* Let server return 500
*/
final int payloadLen = (int) converter.convert("501K").asLongValue();
final String reqId = UUID.randomUUID().toString();
RestAssured.given()
.header(RedirectRest.EXPECTED_FILE_COUNT_HEADER, "1")
.header(RedirectRest.REQUEST_ID_HEADER, reqId)
.header(RedirectRest.STATUS_CODE_HEADER, "500")
.body(LargeSlowServiceImpl.largeString(payloadLen))
.post("/RedirectRest/" + endpoint)
.then()
.statusCode(500);

final String propString = RestAssured.given()
.get("/RedirectRest/retransmitCache-tempFiles/" + reqId)
.then()
.statusCode(200)
.extract().body().asString();

Properties props = new Properties();
props.load(new StringReader(propString));

Assertions.assertThat(props.size()).isEqualTo(1);
for (Entry<Object, Object> en : props.entrySet()) {
String path = (String) en.getKey();
if (path.contains("qcxf-TempStore-")) {
Assertions.assertThat(Path.of(path)).doesNotExist();
Assertions.assertThat((String) en.getValue())
.startsWith("<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+ "<soap:Body><ns2:retransmitCache xmlns:ns2=\"https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test\">"
+ "<expectedFileCount>");
Assertions.assertThat((String) en.getValue())
.endsWith("</payload></ns2:retransmitCache></soap:Body></soap:Envelope>");
Assertions.assertThat((String) en.getValue())
.contains("<payload>" + LargeSlowServiceImpl.largeString(payloadLen) + "</payload>");
final MemorySizeConverter converter = new MemorySizeConverter();
{
final int payloadLen = (int) converter.convert(payloadSize).asLongValue();
final Properties props = retransmitCache(payloadLen, 0, endpoint);
Assertions.assertThat(props.size()).isEqualTo(expectedFileCount);

if (expectedFileCount >= 1) {
for (Entry<Object, Object> en : props.entrySet()) {
final String path = (String) en.getKey();
Path p = Path.of(path);
if (path.contains("qcxf-TempStore-")) {
Assertions.assertThat(p).doesNotExist();
}
Assertions.assertThat(p).satisfies(new Condition<Path>(RetransmitCacheServiceImpl::isRetransmitFile,
"a retransmit file matching 'qcxf-TempStore-*' or 'cos*tmp'", "fairy"));
assertContent((String) en.getValue(), payloadLen);
}
}

}
}

private Properties retransmitCache(final int payloadLen, int expectedFileCount, String syncAsync) throws IOException {
private static void assertContent(String content, int payloadLen) {
Assertions.assertThat(content)
.startsWith("<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+ "<soap:Body><ns2:retransmitCache xmlns:ns2=\"https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test\">"
+ "<expectedFileCount>");
Assertions.assertThat(content)
.endsWith("</payload></ns2:retransmitCache></soap:Body></soap:Envelope>");
Assertions.assertThat(content)
.contains("<payload>" + LargeSlowServiceImpl.largeString(payloadLen) + "</payload>");
}

private static Properties retransmitCache(final int payloadLen, int expectedFileCount, String syncAsync)
throws IOException {
String body = RestAssured.given()
.header(RedirectRest.EXPECTED_FILE_COUNT_HEADER, String.valueOf(expectedFileCount))
.body(LargeSlowServiceImpl.largeString(payloadLen))
Expand All @@ -292,6 +328,7 @@ private Properties retransmitCache(final int payloadLen, int expectedFileCount,
final Properties props = new Properties();
props.load(new StringReader(body));
Assertions.assertThat(props.get("payload.length")).isEqualTo(String.valueOf(payloadLen));
props.remove("payload.length");
return props;
}

Expand Down

0 comments on commit 89d9083

Please sign in to comment.