diff --git a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java index c02d57459..26e0e9014 100644 --- a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java +++ b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java @@ -482,11 +482,8 @@ void customizers( CXFRecorder recorder, CxfFixedConfig config, BuildProducer customizers) { - final HTTPConduitImpl factory = HTTPConduitImpl.fromOptional( - config.httpConduitFactory(), - hc5Present(), - "quarkus.cxf.http-conduit-impl", - HTTPConduitImpl.QuarkusCXFDefault); + final HTTPConduitImpl factory = config.httpConduitFactory() + .orElse(hc5Present() ? HTTPConduitImpl.CXFDefault : HTTPConduitImpl.QuarkusCXFDefault); switch (factory) { case CXFDefault: // nothing to do @@ -507,7 +504,7 @@ void customizers( static boolean hc5Present() { try { - Class.forName("io.quarkiverse.cxf.transport.http.hc5.QuarkusAsyncHTTPConduitFactory"); + Class.forName("io.quarkiverse.cxf.transport.http.hc5.QuarkusWorkQueueImpl"); return true; } catch (ClassNotFoundException e) { /* Fine, we can set the chosen ConduitFactory */ diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFClientInfo.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFClientInfo.java index e3055e8e9..dbfa7f86b 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFClientInfo.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFClientInfo.java @@ -274,8 +274,7 @@ public CXFClientInfo(CXFClientData other, CxfConfig cxfConfig, CxfClientConfig c * In that case, the HTTPConduitFactory set on the Bus based on quarkus.cxf.http-conduit-impl * should kick in. */ - this.httpConduitImpl = HTTPConduitImpl.fromOptional(config.httpConduitFactory(), CXFRecorder.isHc5Present(), - "quarkus.cxf.client." + configKey + ".http-conduit-impl", null); + this.httpConduitImpl = config.httpConduitFactory().orElse(null); this.configKey = configKey; } diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfClientConfig.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfClientConfig.java index 34da1f1c1..3ce33c8b7 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfClientConfig.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfClientConfig.java @@ -488,24 +488,6 @@ public enum HTTPConduitImpl { HttpClientHTTPConduitFactory, @ConfigDocEnumValue("URLConnectionHTTPConduitFactory") URLConnectionHTTPConduitFactory; - - public static HTTPConduitImpl fromOptional(Optional optional, boolean hc5Present, String key, - HTTPConduitImpl defaultValue) { - if (optional.isPresent() && optional.get() != HTTPConduitImpl.CXFDefault && hc5Present) { - /* - * This is bad: the user has to choose whether he wants URLConnectionHTTPConduitFactory or - * QuarkusAsyncHTTPConduitFactory - */ - throw new RuntimeException("You cannot use " + key + "=" + optional.get().name() - + " and io.quarkiverse.cxf:quarkus-cxf-rt-transports-http-hc5 simultaneously." + " Either remove " + key - + "=" + optional.get().name() + " from application.properties" - + " or remove the io.quarkiverse.cxf:quarkus-cxf-rt-transports-http-hc5 dependency"); - } else if (!optional.isPresent() && hc5Present) { - return HTTPConduitImpl.CXFDefault; - } else { - return optional.orElse(defaultValue); - } - } } public enum WellKnownHostnameVerifier { diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/QuarkusHTTPConduitFactory.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/QuarkusHTTPConduitFactory.java index 497ccdfec..41752eada 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/QuarkusHTTPConduitFactory.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/QuarkusHTTPConduitFactory.java @@ -66,7 +66,7 @@ public HTTPConduit createConduit(HTTPTransportFactory f, Bus b, EndpointInfo loc if (httpConduitImpl == null) { httpConduitImpl = cxFixedConfig.httpConduitFactory().orElse(null); } - if (httpConduitImpl == HTTPConduitImpl.CXFDefault && hc5Present && busHTTPConduitFactory != null) { + if (httpConduitImpl == null && hc5Present && busHTTPConduitFactory != null) { return configure( busHTTPConduitFactory.createConduit(f, b, localInfo, target), cxfClientInfo); diff --git a/extensions/transports-http-hc5/deployment/src/test/java/io/quarkiverse/cxf/transport/http/hc5/deployment/Hc5ConduitFactoryTest.java b/extensions/transports-http-hc5/deployment/src/test/java/io/quarkiverse/cxf/transport/http/hc5/deployment/Hc5ConduitFactoryTest.java index 0c008b5d6..3b424ba6f 100644 --- a/extensions/transports-http-hc5/deployment/src/test/java/io/quarkiverse/cxf/transport/http/hc5/deployment/Hc5ConduitFactoryTest.java +++ b/extensions/transports-http-hc5/deployment/src/test/java/io/quarkiverse/cxf/transport/http/hc5/deployment/Hc5ConduitFactoryTest.java @@ -9,6 +9,8 @@ import org.apache.cxf.endpoint.Client; import org.apache.cxf.frontend.ClientProxy; import org.apache.cxf.transport.http.HTTPConduitFactory; +import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduit; +import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduitFactory; import org.assertj.core.api.Assertions; import org.jboss.logging.Logger; import org.jboss.shrinkwrap.api.ShrinkWrap; @@ -17,8 +19,6 @@ import org.junit.jupiter.api.extension.RegisterExtension; import io.quarkiverse.cxf.annotation.CXFClient; -import io.quarkiverse.cxf.transport.http.hc5.QuarkusAsyncHTTPConduit; -import io.quarkiverse.cxf.transport.http.hc5.QuarkusAsyncHTTPConduitFactory; import io.quarkus.test.QuarkusUnitTest; public class Hc5ConduitFactoryTest { @@ -42,10 +42,10 @@ public class Hc5ConduitFactoryTest { void conduitFactory() { final Bus bus = BusFactory.getDefaultBus(); final HTTPConduitFactory factory = bus.getExtension(HTTPConduitFactory.class); - Assertions.assertThat(factory).isInstanceOf(QuarkusAsyncHTTPConduitFactory.class); + Assertions.assertThat(factory).isInstanceOf(AsyncHTTPConduitFactory.class); final Client client = ClientProxy.getClient(helloService); - Assertions.assertThat(client.getConduit()).isInstanceOf(QuarkusAsyncHTTPConduit.class); + Assertions.assertThat(client.getConduit()).isInstanceOf(AsyncHTTPConduit.class); /* ... and make sure that the alternative conduit works */ Assertions.assertThat(helloService.hello("Joe")).isEqualTo("Hello Joe"); diff --git a/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/Hc5Recorder.java b/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/Hc5Recorder.java index d8c88cf81..e9fa31d46 100644 --- a/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/Hc5Recorder.java +++ b/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/Hc5Recorder.java @@ -4,6 +4,7 @@ import org.apache.cxf.Bus; import org.apache.cxf.transport.http.HTTPConduitFactory; +import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHttpResponseWrapperFactory; import org.apache.cxf.workqueue.WorkQueueManager; import org.eclipse.microprofile.context.ManagedExecutor; @@ -31,7 +32,7 @@ public RuntimeValue> customizeBus() { } else { throw new IllegalStateException(ManagedExecutor.class.getName() + " not available in Arc"); } - bus.setExtension(new QuarkusAsyncHTTPConduitFactory(bus), HTTPConduitFactory.class); + bus.setExtension(new QuarkusAsyncHttpResponseWrapperFactory(), AsyncHttpResponseWrapperFactory.class); }); } } diff --git a/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHTTPConduit.java b/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHTTPConduit.java deleted file mode 100644 index 166eaf6ae..000000000 --- a/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHTTPConduit.java +++ /dev/null @@ -1,219 +0,0 @@ -package io.quarkiverse.cxf.transport.http.hc5; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.concurrent.Future; -import java.util.function.Consumer; - -import org.apache.cxf.Bus; -import org.apache.cxf.service.model.EndpointInfo; -import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduit; -import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduitFactory; -import org.apache.cxf.ws.addressing.EndpointReferenceType; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; -import org.apache.hc.core5.concurrent.FutureCallback; -import org.apache.hc.core5.function.Supplier; -import org.apache.hc.core5.http.EntityDetails; -import org.apache.hc.core5.http.Header; -import org.apache.hc.core5.http.HttpException; -import org.apache.hc.core5.http.HttpHost; -import org.apache.hc.core5.http.HttpResponse; -import org.apache.hc.core5.http.nio.AsyncPushConsumer; -import org.apache.hc.core5.http.nio.AsyncRequestProducer; -import org.apache.hc.core5.http.nio.AsyncResponseConsumer; -import org.apache.hc.core5.http.nio.CapacityChannel; -import org.apache.hc.core5.http.nio.HandlerFactory; -import org.apache.hc.core5.http.nio.ssl.TlsStrategy; -import org.apache.hc.core5.http.protocol.HttpContext; -import org.apache.hc.core5.io.CloseMode; -import org.apache.hc.core5.reactor.IOReactorStatus; -import org.apache.hc.core5.util.TimeValue; -import org.eclipse.microprofile.context.ThreadContext; - -import io.quarkus.arc.Arc; - -/** - * An {@link AsyncHTTPConduit} with custom {@link #getHttpAsyncClient(TlsStrategy)}. - */ -public class QuarkusAsyncHTTPConduit extends AsyncHTTPConduit { - - public QuarkusAsyncHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t, AsyncHTTPConduitFactory factory) - throws IOException { - super(b, ei, t, factory); - } - - /** - * @param tlsStrategy - * @return a Custom {@link CloseableHttpAsyncClient} whose {@code execute(*)} methods contextualize (see - * {@link ThreadContext}) the passed {@link AsyncResponseConsumer} - * @throws IOException - */ - @Override - public synchronized CloseableHttpAsyncClient getHttpAsyncClient(TlsStrategy tlsStrategy) throws IOException { - return new QuarkusAsyncClient(super.getHttpAsyncClient(tlsStrategy)); - } - - static class QuarkusAsyncClient extends CloseableHttpAsyncClient { - private final CloseableHttpAsyncClient delegate; - - public QuarkusAsyncClient(CloseableHttpAsyncClient delegate) { - super(); - this.delegate = delegate; - } - - @Override - public void close(CloseMode closeMode) { - delegate.close(closeMode); - } - - @Override - public void close() throws IOException { - delegate.close(); - } - - @Override - public int hashCode() { - return delegate.hashCode(); - } - - @Override - public void start() { - delegate.start(); - } - - @Override - public IOReactorStatus getStatus() { - return delegate.getStatus(); - } - - @Override - public void awaitShutdown(TimeValue waitTime) throws InterruptedException { - delegate.awaitShutdown(waitTime); - } - - @Override - public void initiateShutdown() { - delegate.initiateShutdown(); - } - - @Override - public boolean equals(Object obj) { - return delegate.equals(obj); - } - - @Override - public void register(String hostname, String uriPattern, Supplier supplier) { - delegate.register(hostname, uriPattern, supplier); - } - - @Override - public String toString() { - return delegate.toString(); - } - - @Override - protected Future doExecute( - final HttpHost target, - final AsyncRequestProducer requestProducer, - final AsyncResponseConsumer responseConsumer, - final HandlerFactory pushHandlerFactory, - final HttpContext context, - final FutureCallback callback) { - return delegate.execute( - target, - requestProducer, - new ContextualizedResponseConsumer(responseConsumer), - pushHandlerFactory, - context, - callback); - } - } - - /** - * Wraps the delegate in {@link ThreadContext#contextualConsumer(Consumer)} so that context propagation works for - * async clients - * - * @param - */ - static class ContextualizedResponseConsumer implements AsyncResponseConsumer { - private final AsyncResponseConsumer delegate; - private Consumer> contextualConsumer; - - public ContextualizedResponseConsumer(AsyncResponseConsumer delegate) { - super(); - this.delegate = delegate; - final ThreadContext threadContext = Arc.container().select(ThreadContext.class).get(); - /* - * We need to call this threadContext.contextualConsumer() here in the constructor to store the context - * because consumeResponse() is called from another thread where the context is not available anymore - */ - this.contextualConsumer = threadContext.contextualConsumer(args -> { - try { - delegate.consumeResponse( - args.response, - args.entityDetails, - args.context, - args.resultCallback); - } catch (HttpException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - } - - @Override - public void updateCapacity(CapacityChannel capacityChannel) throws IOException { - delegate.updateCapacity(capacityChannel); - } - - @Override - public void consumeResponse(HttpResponse response, EntityDetails entityDetails, HttpContext context, - FutureCallback resultCallback) throws HttpException, IOException { - contextualConsumer.accept(new ConsumeResponseArgs<>(response, entityDetails, context, resultCallback)); - } - - @Override - public void releaseResources() { - delegate.releaseResources(); - } - - @Override - public void consume(ByteBuffer src) throws IOException { - delegate.consume(src); - } - - @Override - public void informationResponse(HttpResponse response, HttpContext context) throws HttpException, IOException { - delegate.informationResponse(response, context); - } - - @Override - public void streamEnd(List trailers) throws HttpException, IOException { - delegate.streamEnd(trailers); - } - - @Override - public void failed(Exception cause) { - delegate.failed(cause); - } - - static class ConsumeResponseArgs { - public ConsumeResponseArgs(HttpResponse response, EntityDetails entityDetails, HttpContext context, - FutureCallback resultCallback) { - super(); - this.response = response; - this.entityDetails = entityDetails; - this.context = context; - this.resultCallback = resultCallback; - } - - private final HttpResponse response; - private final EntityDetails entityDetails; - private final HttpContext context; - private final FutureCallback resultCallback; - } - } - -} diff --git a/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHTTPConduitFactory.java b/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHTTPConduitFactory.java deleted file mode 100644 index 3c3af49a4..000000000 --- a/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHTTPConduitFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -package io.quarkiverse.cxf.transport.http.hc5; - -import java.io.IOException; -import java.util.Map; - -import org.apache.cxf.Bus; -import org.apache.cxf.service.model.EndpointInfo; -import org.apache.cxf.transport.http.HTTPConduit; -import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduitFactory; -import org.apache.cxf.ws.addressing.EndpointReferenceType; - -/** - * Returns {@link QuarkusAsyncHTTPConduit}s from {@link #createConduit(Bus, EndpointInfo, EndpointReferenceType)}. - */ -public class QuarkusAsyncHTTPConduitFactory extends AsyncHTTPConduitFactory { - - public QuarkusAsyncHTTPConduitFactory(Bus b) { - super(b); - } - - public QuarkusAsyncHTTPConduitFactory(Map conf) { - super(conf); - } - - @Override - public HTTPConduit createConduit(Bus bus, EndpointInfo localInfo, - EndpointReferenceType target) throws IOException { - return new QuarkusAsyncHTTPConduit(bus, localInfo, target, this); - } - -} diff --git a/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHttpResponseWrapperFactory.java b/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHttpResponseWrapperFactory.java new file mode 100644 index 000000000..ce3b535f0 --- /dev/null +++ b/extensions/transports-http-hc5/runtime/src/main/java/io/quarkiverse/cxf/transport/http/hc5/QuarkusAsyncHttpResponseWrapperFactory.java @@ -0,0 +1,41 @@ +package io.quarkiverse.cxf.transport.http.hc5; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHttpResponseWrapperFactory; +import org.apache.hc.core5.http.HttpResponse; +import org.eclipse.microprofile.context.ThreadContext; + +import io.quarkus.arc.Arc; + +public class QuarkusAsyncHttpResponseWrapperFactory implements AsyncHttpResponseWrapperFactory { + + @Override + public AsyncHttpResponseWrapper create() { + ThreadContext threadContext = Arc.container().select(ThreadContext.class).get(); + /* + * We need to call this threadContext.contextualConsumer() here in the constructor to store the context + * because consumeResponse() is called from another thread where the context is not available anymore + */ + final BiConsumer> contextualConsumer = threadContext.contextualConsumer( + (HttpResponse response, Consumer delegate) -> delegate.accept(response)); + return new MyAsyncResponseCallback(contextualConsumer); + } + + public static class MyAsyncResponseCallback implements AsyncHttpResponseWrapper { + + final BiConsumer> contextualConsumer; + + public MyAsyncResponseCallback(BiConsumer> contextualConsumer) { + super(); + this.contextualConsumer = contextualConsumer; + } + + @Override + public void responseReceived(HttpResponse response, Consumer delegate) { + contextualConsumer.accept(response, delegate); + } + + } +} diff --git a/integration-tests/hc5/src/test/java/io/quarkiverse/cxf/hc5/it/Hc5ContextlessTest.java b/integration-tests/hc5/src/test/java/io/quarkiverse/cxf/hc5/it/Hc5ContextlessTest.java index 92114ccd7..9d9ccce20 100644 --- a/integration-tests/hc5/src/test/java/io/quarkiverse/cxf/hc5/it/Hc5ContextlessTest.java +++ b/integration-tests/hc5/src/test/java/io/quarkiverse/cxf/hc5/it/Hc5ContextlessTest.java @@ -1,6 +1,8 @@ package io.quarkiverse.cxf.hc5.it; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import jakarta.inject.Inject; @@ -28,9 +30,12 @@ class Hc5ContextlessTest { @Test void contextless() throws InterruptedException { Assertions.assertThat(myCalculator.add(5, 5)).isEqualTo(10); + + final CountDownLatch latch = new CountDownLatch(1); myCalculator.addAsync(5, 7, response -> { try { Assertions.assertThat(response.get().getReturn()).isEqualTo(12); + latch.countDown(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); @@ -38,5 +43,6 @@ void contextless() throws InterruptedException { throw new RuntimeException(e); } }); + latch.await(10, TimeUnit.SECONDS); } }