diff --git a/astrix-bom/pom.xml b/astrix-bom/pom.xml index ec35c3479..dd58a3dec 100644 --- a/astrix-bom/pom.xml +++ b/astrix-bom/pom.xml @@ -98,6 +98,11 @@ astrix-versioning ${project.version} + + ${project.groupId} + astrix-reactor + ${project.version} + ${project.groupId} astrix-remoting diff --git a/astrix-context/src/main/java/com/avanza/astrix/beans/core/CompletableFutureTypeHandlerPlugin.java b/astrix-context/src/main/java/com/avanza/astrix/beans/core/CompletableFutureTypeHandlerPlugin.java index ff26f37d2..0f6a3aa9f 100644 --- a/astrix-context/src/main/java/com/avanza/astrix/beans/core/CompletableFutureTypeHandlerPlugin.java +++ b/astrix-context/src/main/java/com/avanza/astrix/beans/core/CompletableFutureTypeHandlerPlugin.java @@ -17,32 +17,29 @@ import java.util.concurrent.CompletableFuture; -public class CompletableFutureTypeHandlerPlugin implements ReactiveTypeHandlerPlugin> { +import rx.Observable; - @Override - public void subscribe(ReactiveExecutionListener listener, CompletableFuture reactiveType) { - reactiveType.whenComplete((result, throwable) -> { - if (throwable != null) { - listener.onError(throwable); - } else { - listener.onResult(result); - } - }); - } +public class CompletableFutureTypeHandlerPlugin implements ReactiveTypeHandlerPlugin> { @Override - public void completeExceptionally(Throwable error, CompletableFuture reactiveType) { - reactiveType.completeExceptionally(error); + public Observable toObservable(CompletableFuture reactiveType) { + return Observable.unsafeCreate( + subscriber -> reactiveType.whenComplete((result, throwable) -> { + if (throwable == null) { + subscriber.onNext(result); + subscriber.onCompleted(); + } else { + subscriber.onError(throwable); + } + }) + ); } @Override - public void complete(Object result, CompletableFuture reactiveType) { - reactiveType.complete(result); - } - - @Override - public CompletableFuture newReactiveType() { - return new CompletableFuture(); + public CompletableFuture toReactiveType(Observable observable) { + CompletableFuture reactiveType = new CompletableFuture<>(); + observable.subscribe(reactiveType::complete, reactiveType::completeExceptionally); + return reactiveType; } @Override diff --git a/astrix-context/src/main/java/com/avanza/astrix/beans/core/FutureResult.java b/astrix-context/src/main/java/com/avanza/astrix/beans/core/FutureResult.java deleted file mode 100644 index e2c326cc5..000000000 --- a/astrix-context/src/main/java/com/avanza/astrix/beans/core/FutureResult.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2014 Avanza Bank AB - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.avanza.astrix.beans.core; -/** - * Holds the result from the completion of - * a ListenableFutureAdapter. - * - * @author Elias Lindholm - * - * @param - */ -public final class FutureResult { - - private final T result; - private final Throwable exception; - - public FutureResult(T result, Throwable exception) { - this.result = result; - this.exception = exception; - } - - /** - * If the underlying computation ended with an error, then - * this method returns the given Throwable. - * - * @return A Throwable if the underlying computation ended with an error, null otherwise - */ - public Throwable getException() { - return exception; - } - - /** - * If the underlying computation ended successfully, then - * this method returns the result (possibly null). - * - * @return - */ - public T getResult() { - return result; - } - -} diff --git a/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeConverterImpl.java b/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeConverterImpl.java index 0f515d533..7a4c87d9e 100644 --- a/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeConverterImpl.java +++ b/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeConverterImpl.java @@ -15,75 +15,41 @@ */ package com.avanza.astrix.beans.core; +import rx.Observable; + import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -import rx.Completable; -import rx.Observable; -import rx.Single; public final class ReactiveTypeConverterImpl implements ReactiveTypeConverter { - private final Map, RxTypeConverter>> rxTypeConverters = Map.of( - Single.class, rxTypeConverter(Single::toObservable, Observable::toSingle), - Completable.class, rxTypeConverter(Completable::toObservable, Observable::toCompletable) + private static final List> wellKnownTypes = List.of( + new CompletableFutureTypeHandlerPlugin(), + new RxCompletableTypeHandlerPlugin(), + new RxSingleTypeHandlerPlugin() ); private final Map, ReactiveTypeHandlerPlugin>> pluginByReactiveType; public ReactiveTypeConverterImpl(List> typeConverterPlugins) { - Map, ReactiveTypeHandlerPlugin>> pluginByReactiveType = new LinkedHashMap<>(typeConverterPlugins.size() + 1); + Map, ReactiveTypeHandlerPlugin>> pluginByReactiveType = new LinkedHashMap<>(typeConverterPlugins.size() + wellKnownTypes.size()); for (ReactiveTypeHandlerPlugin> asyncTypeConverterPlugin : typeConverterPlugins) { pluginByReactiveType.put(asyncTypeConverterPlugin.reactiveTypeHandled(), asyncTypeConverterPlugin); } - pluginByReactiveType.putIfAbsent(CompletableFuture.class, new CompletableFutureTypeHandlerPlugin()); + wellKnownTypes.forEach(it -> pluginByReactiveType.putIfAbsent(it.reactiveTypeHandled(), it)); this.pluginByReactiveType = Map.copyOf(pluginByReactiveType); } @Override public Observable toObservable(Class fromType, T reactiveType) { - RxTypeConverter rxTypeConverter = findRxTypeConverter(fromType); - if (rxTypeConverter != null) { - return rxTypeConverter.toObservable(reactiveType); - } - ReactiveTypeHandlerPlugin plugin = getPlugin(fromType); - return Observable.unsafeCreate((s) -> { - plugin.subscribe(new ReactiveExecutionListener() { - @Override - public void onResult(Object result) { - s.onNext(result); - s.onCompleted(); - } - @Override - public void onError(Throwable t) { - s.onError(t); - } - }, reactiveType); - }); + return plugin.toObservable(reactiveType); } @Override public T toCustomReactiveType(Class targetType, Observable observable) { - RxTypeConverter rxTypeConverter = findRxTypeConverter(targetType); - if (rxTypeConverter != null) { - return rxTypeConverter.toRxType(observable); - } - ReactiveTypeHandlerPlugin plugin = getPlugin(targetType); - T reactiveType = plugin.newReactiveType(); - // Eagerly subscribe to the given observable - observable.subscribe((next) -> plugin.complete(next, reactiveType), (error) -> plugin.completeExceptionally(error, reactiveType)); - return reactiveType; - } - - private RxTypeConverter findRxTypeConverter(Class rxType) { - @SuppressWarnings("unchecked") - RxTypeConverter rxTypeConverter = (RxTypeConverter) rxTypeConverters.get(rxType); - return rxTypeConverter; + return plugin.toReactiveType(observable); } private ReactiveTypeHandlerPlugin getPlugin(Class type) { @@ -97,29 +63,7 @@ private ReactiveTypeHandlerPlugin getPlugin(Class type) { @Override public boolean isReactiveType(Class> type) { - return rxTypeConverters.containsKey(type) || pluginByReactiveType.containsKey(type); - } - - private static RxTypeConverter rxTypeConverter(Function> toObservable, Function, T> toType) { - return new RxTypeConverter<>() { - @Override - public Observable toObservable(T rxTypeInstance) { - return toObservable.apply(rxTypeInstance); - } - - @Override - public T toRxType(Observable observable) { - return toType.apply(observable); - } - }; - } - - private interface RxTypeConverter { - - Observable toObservable(T rxTypeInstance); - - T toRxType(Observable observable); - + return pluginByReactiveType.containsKey(type); } } diff --git a/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeHandlerPlugin.java b/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeHandlerPlugin.java index 39a016625..006b5f1bc 100644 --- a/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeHandlerPlugin.java +++ b/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeHandlerPlugin.java @@ -15,37 +15,30 @@ */ package com.avanza.astrix.beans.core; +import rx.Observable; + +/** + * Plugin for converting between {@link Observable} and a reactive type + * + * @param a reactive or asynchronous type, such as {@link rx.Single Single} or {@link java.util.concurrent.CompletableFuture CompletableFuture} + */ public interface ReactiveTypeHandlerPlugin { + /** - * Subscribes a ReactiveExecutionListener to a reactive type. - * - * @param listener - * @param reactiveType - */ - void subscribe(ReactiveExecutionListener listener, T reactiveType); - - /** - * Completes a reactive execution created using {@link #newReactiveType()} with an error. - * - * This method will only be invoked with instances created using {@link #newReactiveType()}, - * so its safe to downcast the reactiveType argument to the type returned by - * {@link #newReactiveType()} - * - * @param error - * @param reactiveType + * Convert from reactive type {@code T} to {@link Observable} + * + * The returned {@link Observable} should not be already subscribed by the method. + * */ - void completeExceptionally(Throwable error, T reactiveType); - + Observable toObservable(T reactiveType); + /** - * Successfully completes a reactive execution created using newReactiveType with - * a given result. - * - * @param result - * @param reactiveType + * Convert from {@link Observable} to reactive type {@code T} + * + * The incoming {@link Observable} should get subscribed by this method. + * */ - void complete(Object result, T reactiveType); - - T newReactiveType(); - + T toReactiveType(Observable observable); + Class reactiveTypeHandled(); } diff --git a/astrix-context/src/main/java/com/avanza/astrix/beans/core/RxCompletableTypeHandlerPlugin.java b/astrix-context/src/main/java/com/avanza/astrix/beans/core/RxCompletableTypeHandlerPlugin.java new file mode 100644 index 000000000..07de056fd --- /dev/null +++ b/astrix-context/src/main/java/com/avanza/astrix/beans/core/RxCompletableTypeHandlerPlugin.java @@ -0,0 +1,41 @@ +/* + * Copyright 2014 Avanza Bank AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.avanza.astrix.beans.core; + +import rx.Completable; +import rx.Observable; +import rx.subjects.ReplaySubject; + +public class RxCompletableTypeHandlerPlugin implements ReactiveTypeHandlerPlugin { + + @Override + public Observable toObservable(Completable reactiveType) { + return reactiveType.toObservable(); + } + + @Override + public Completable toReactiveType(Observable observable) { + ReplaySubject subject = ReplaySubject.createWithSize(1); + observable.subscribe(subject); + return subject.toCompletable(); + } + + @Override + public Class reactiveTypeHandled() { + return Completable.class; + } + +} diff --git a/astrix-context/src/main/java/com/avanza/astrix/beans/core/RxSingleTypeHandlerPlugin.java b/astrix-context/src/main/java/com/avanza/astrix/beans/core/RxSingleTypeHandlerPlugin.java new file mode 100644 index 000000000..52b6afd28 --- /dev/null +++ b/astrix-context/src/main/java/com/avanza/astrix/beans/core/RxSingleTypeHandlerPlugin.java @@ -0,0 +1,42 @@ +/* + * Copyright 2014 Avanza Bank AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.avanza.astrix.beans.core; + +import rx.Observable; +import rx.Single; +import rx.subjects.ReplaySubject; + +public class RxSingleTypeHandlerPlugin implements ReactiveTypeHandlerPlugin> { + + @Override + public Observable toObservable(Single reactiveType) { + return reactiveType.toObservable(); + } + + @Override + public Single toReactiveType(Observable observable) { + ReplaySubject subject = ReplaySubject.createWithSize(1); + observable.subscribe(subject); + return subject.toSingle(); + } + + @Override + @SuppressWarnings("unchecked") + public Class> reactiveTypeHandled() { + Class> type = Single.class; + return (Class>) type; + } +} diff --git a/astrix-contracts/pom.xml b/astrix-contracts/pom.xml index db9849c70..1df4539ce 100644 --- a/astrix-contracts/pom.xml +++ b/astrix-contracts/pom.xml @@ -13,11 +13,6 @@ astrix-context ${project.version} - - ${project.groupId} - astrix-gs - ${project.version} - junit junit diff --git a/astrix-contracts/src/main/java/com/avanza/astrix/contracts/ReactiveTypeHandlerContract.java b/astrix-contracts/src/main/java/com/avanza/astrix/contracts/ReactiveTypeHandlerContract.java index 002c30740..7c25784bc 100644 --- a/astrix-contracts/src/main/java/com/avanza/astrix/contracts/ReactiveTypeHandlerContract.java +++ b/astrix-contracts/src/main/java/com/avanza/astrix/contracts/ReactiveTypeHandlerContract.java @@ -15,87 +15,87 @@ */ package com.avanza.astrix.contracts; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.util.Arrays; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Before; -import org.junit.Test; - -import com.avanza.astrix.beans.core.ReactiveExecutionListener; import com.avanza.astrix.beans.core.ReactiveTypeConverter; import com.avanza.astrix.beans.core.ReactiveTypeConverterImpl; import com.avanza.astrix.beans.core.ReactiveTypeHandlerPlugin; - +import org.junit.Test; import rx.Observable; -import rx.Subscriber; +import rx.observers.TestSubscriber; +import rx.subjects.ReplaySubject; + +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Collections.singletonList; +import static org.junit.Assert.assertEquals; public abstract class ReactiveTypeHandlerContract { - - private ReactiveTypeConverter reactiveTypeConverter; - private ReactiveTypeHandlerPlugin reactiveTypeHandler; - - @Before - public void setup() { - reactiveTypeHandler = newReactiveTypeHandler(); - reactiveTypeConverter = new ReactiveTypeConverterImpl(Arrays.asList(reactiveTypeHandler)); + + private final ReactiveTypeConverter reactiveTypeConverter; + private final Class reactiveTypeHandled; + + protected ReactiveTypeHandlerContract(ReactiveTypeHandlerPlugin reactiveTypeHandler) { + this.reactiveTypeConverter = new ReactiveTypeConverterImpl(singletonList(reactiveTypeHandler)); + this.reactiveTypeHandled = reactiveTypeHandler.reactiveTypeHandled(); } - - protected abstract ReactiveTypeHandlerPlugin newReactiveTypeHandler(); - @Test(timeout=2000) + @SuppressWarnings("SameParameterValue") + protected void assertValue(TestSubscriber testSubscriber, V value) { + testSubscriber.assertValue(value); + } + + @Test(timeout = 2000) public final void reactiveTypeListenerIsNotifiedAsynchronouslyWhenReactiveExecutionCompletes() throws Exception { - T reactiveType = reactiveTypeHandler.newReactiveType(); - - - ReactiveResultSpy resultSpy = new ReactiveResultSpy(); - reactiveTypeHandler.subscribe(resultSpy, reactiveType); // Subscribe after execution completes - assertFalse(resultSpy.isDone()); - assertNull(resultSpy.error); - assertNull(resultSpy.lastElement); - - reactiveTypeHandler.complete("foo", reactiveType); // Complete reactive execution - assertTrue(resultSpy.isDone()); - assertEquals("foo", resultSpy.lastElement); - assertNull(resultSpy.error); + ReplaySubject subject = ReplaySubject.createWithSize(1); + T reactiveType = toCustomReactiveType(subject); + Observable observable = toObservable(reactiveType); + + TestSubscriber resultSpy = TestSubscriber.create(); + observable.subscribe(resultSpy); // Subscribe after execution completes + resultSpy.assertNotCompleted(); + resultSpy.assertNoErrors(); + resultSpy.assertNoValues(); + + subject.onNext("foo"); + subject.onCompleted(); // Complete reactive execution + resultSpy.assertCompleted(); + assertValue(resultSpy, "foo"); + resultSpy.assertNoErrors(); } @Test(timeout=2000) public final void reactiveTypeListenerIsNotifiedSynchronouslyIfReactiveExecutionAlreadyCompleted() throws Exception { - T reactiveType = reactiveTypeHandler.newReactiveType(); + ReplaySubject subject = ReplaySubject.createWithSize(1); + T reactiveType = toCustomReactiveType(subject); + Observable observable = toObservable(reactiveType); - reactiveTypeHandler.complete("foo", reactiveType); // Completes reactive execution - - ReactiveResultSpy resultSpy = new ReactiveResultSpy(); - reactiveTypeHandler.subscribe(resultSpy, reactiveType); // Subscribe after execution completes + subject.onNext("foo"); + subject.onCompleted(); // Completes reactive execution + + TestSubscriber resultSpy = TestSubscriber.create(); + observable.subscribe(resultSpy); // Subscribe after execution completes - assertTrue(resultSpy.isDone()); - assertEquals("foo", resultSpy.lastElement); - assertNull(resultSpy.error); + resultSpy.assertCompleted(); + assertValue(resultSpy, "foo"); + resultSpy.assertNoErrors(); } @Test(timeout=2000) public final void reactiveTypeToObservableShouldNotBlock() throws Exception { - T reactiveType = reactiveTypeHandler.newReactiveType(); + ReplaySubject subject = ReplaySubject.createWithSize(1); + T reactiveType = toCustomReactiveType(subject); + Observable observable = toObservable(reactiveType); - Observable observable = reactiveTypeConverter.toObservable(reactiveTypeHandler.reactiveTypeHandled(), reactiveType); + TestSubscriber resultSpy = TestSubscriber.create(); + observable.subscribe(resultSpy); - ObservableSpy reactiveResultListener = new ObservableSpy(); - observable.subscribe(reactiveResultListener); + resultSpy.assertNotCompleted(); - assertFalse(reactiveResultListener.isDone()); - - reactiveTypeHandler.complete("foo", reactiveType); - - assertTrue(reactiveResultListener.isDone()); - assertEquals("foo", reactiveResultListener.lastElement); - assertNull(reactiveResultListener.error); + subject.onNext("foo"); + subject.onCompleted(); + + resultSpy.assertCompleted(); + assertValue(resultSpy, "foo"); + resultSpy.assertNoErrors(); } @Test(timeout=1000) @@ -110,11 +110,11 @@ public final void reactiveTypeToObservable_CreatedObserverIsSubscribedInConversi assertEquals(0, sourceSubscriptionCount.get()); T reactiveType = toCustomReactiveType(emitsFoo); assertEquals(1, sourceSubscriptionCount.get()); + + TestSubscriber resultSpy = TestSubscriber.create(); + toObservable(reactiveType).subscribe(resultSpy); - ReactiveResultSpy resultSpy = new ReactiveResultSpy(); - reactiveTypeHandler.subscribe(resultSpy, reactiveType); - - assertEquals("foo", resultSpy.lastElement); + assertValue(resultSpy, "foo"); } @Test @@ -127,90 +127,43 @@ public final void onlySubscribesOneTimeToSourceObservableIfConvertingBackAndFort }); T reactiveType = toCustomReactiveType(emitsFoo); - Observable reconstructedObservable = (Observable) reactiveTypeConverter.toObservable(reactiveTypeHandler.reactiveTypeHandled(), reactiveType); + Observable reconstructedObservable = toObservable(reactiveType); reactiveType = toCustomReactiveType(reconstructedObservable); assertEquals(1, sourceSubscriptionCount.get()); - ReactiveResultSpy resultSpy = new ReactiveResultSpy(); - reactiveTypeHandler.subscribe(resultSpy, reactiveType); + TestSubscriber resultSpy = TestSubscriber.create(); + toObservable(reactiveType).subscribe(resultSpy); - assertEquals("foo", resultSpy.lastElement); + assertValue(resultSpy, "foo"); } @Test public final void notifiesExceptionalResults() throws Exception { - T reactiveType = reactiveTypeHandler.newReactiveType(); + ReplaySubject subject = ReplaySubject.createWithSize(1); + T reactiveType = toCustomReactiveType(subject); + Observable observable = toObservable(reactiveType); + + TestSubscriber resultSpy = TestSubscriber.create(); + observable.subscribe(resultSpy); // Subscribe after execution completes + resultSpy.assertNotCompleted(); + resultSpy.assertNoErrors(); + resultSpy.assertNoValues(); + + RuntimeException error = new RuntimeException("foo"); + subject.onError(error); // Complete reactive execution + resultSpy.assertTerminalEvent(); + resultSpy.assertNoValues(); + resultSpy.assertError(error); + } - ReactiveResultSpy resultSpy = new ReactiveResultSpy(); - reactiveTypeHandler.subscribe(resultSpy, reactiveType); // Subscribe after execution completes - assertFalse(resultSpy.isDone()); - assertNull(resultSpy.error); - assertNull(resultSpy.lastElement); - - reactiveTypeHandler.completeExceptionally(new RuntimeException("foo"), reactiveType); // Complete reactive execution - assertTrue(resultSpy.isDone()); - assertNull(resultSpy.lastElement); - assertNotNull(resultSpy.error); - assertEquals("foo", resultSpy.error.getMessage()); + private Observable toObservable(T reactiveType) { + return reactiveTypeConverter.toObservable(reactiveTypeHandled, reactiveType); } - @SuppressWarnings("unchecked") private T toCustomReactiveType(Observable emitsFoo) { - return (T) reactiveTypeConverter.toCustomReactiveType(reactiveType(), emitsFoo); + return reactiveTypeConverter.toCustomReactiveType(reactiveTypeHandled, emitsFoo); } - private Class super T> reactiveType() { - return this.reactiveTypeHandler.reactiveTypeHandled(); - } - - - private static class ObservableSpy extends Subscriber { - - private final CountDownLatch done = new CountDownLatch(1); - private Throwable error; - private Object lastElement; - - @Override - public void onCompleted() { - done.countDown(); - } - - @Override - public void onError(Throwable e) { - this.error = e; - } - - @Override - public void onNext(Object next) { - this.lastElement = next; - } - - public boolean isDone() { - return done.getCount() == 0; - } - } - private static class ReactiveResultSpy implements ReactiveExecutionListener { - - private final CountDownLatch done = new CountDownLatch(1); - private Throwable error; - private Object lastElement; - - @Override - public void onError(Throwable e) { - this.error = e; - done.countDown(); - } - - @Override - public void onResult(Object result) { - this.lastElement = result; - done.countDown(); - } - - public boolean isDone() { - return done.getCount() == 0; - } - } } diff --git a/astrix-contracts/src/test/java/com/avanza/astrix/contracts/CompletableFutureReactiveTypeHandlerTest.java b/astrix-contracts/src/test/java/com/avanza/astrix/contracts/CompletableFutureTypeHandlerPluginTest.java similarity index 69% rename from astrix-contracts/src/test/java/com/avanza/astrix/contracts/CompletableFutureReactiveTypeHandlerTest.java rename to astrix-contracts/src/test/java/com/avanza/astrix/contracts/CompletableFutureTypeHandlerPluginTest.java index 69c1faf3a..1335459ff 100644 --- a/astrix-contracts/src/test/java/com/avanza/astrix/contracts/CompletableFutureReactiveTypeHandlerTest.java +++ b/astrix-contracts/src/test/java/com/avanza/astrix/contracts/CompletableFutureTypeHandlerPluginTest.java @@ -15,15 +15,14 @@ */ package com.avanza.astrix.contracts; +import com.avanza.astrix.beans.core.CompletableFutureTypeHandlerPlugin; + import java.util.concurrent.CompletableFuture; -import com.avanza.astrix.beans.core.CompletableFutureTypeHandlerPlugin; -import com.avanza.astrix.beans.core.ReactiveTypeHandlerPlugin; +public class CompletableFutureTypeHandlerPluginTest extends ReactiveTypeHandlerContract> { -public class CompletableFutureReactiveTypeHandlerTest extends ReactiveTypeHandlerContract> { - @Override - protected ReactiveTypeHandlerPlugin> newReactiveTypeHandler() { - return new CompletableFutureTypeHandlerPlugin(); + public CompletableFutureTypeHandlerPluginTest() { + super(new CompletableFutureTypeHandlerPlugin()); } - + } diff --git a/astrix-contracts/src/test/java/com/avanza/astrix/contracts/RxCompletableTypeHandlerPluginTest.java b/astrix-contracts/src/test/java/com/avanza/astrix/contracts/RxCompletableTypeHandlerPluginTest.java new file mode 100644 index 000000000..6a2b78c04 --- /dev/null +++ b/astrix-contracts/src/test/java/com/avanza/astrix/contracts/RxCompletableTypeHandlerPluginTest.java @@ -0,0 +1,33 @@ +/* + * Copyright 2014 Avanza Bank AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.avanza.astrix.contracts; + +import com.avanza.astrix.beans.core.RxCompletableTypeHandlerPlugin; +import rx.Completable; +import rx.observers.TestSubscriber; + +public class RxCompletableTypeHandlerPluginTest extends ReactiveTypeHandlerContract { + + public RxCompletableTypeHandlerPluginTest() { + super(new RxCompletableTypeHandlerPlugin()); + } + + @Override + protected void assertValue(TestSubscriber testSubscriber, V value) { + testSubscriber.assertNoValues(); + } + +} diff --git a/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveExecutionListener.java b/astrix-contracts/src/test/java/com/avanza/astrix/contracts/RxSingleTypeHandlerPluginTest.java similarity index 66% rename from astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveExecutionListener.java rename to astrix-contracts/src/test/java/com/avanza/astrix/contracts/RxSingleTypeHandlerPluginTest.java index a33cdd112..587d94d5a 100644 --- a/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveExecutionListener.java +++ b/astrix-contracts/src/test/java/com/avanza/astrix/contracts/RxSingleTypeHandlerPluginTest.java @@ -13,9 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.avanza.astrix.beans.core; +package com.avanza.astrix.contracts; -public interface ReactiveExecutionListener { - void onError(Throwable t); - void onResult(Object result); -} \ No newline at end of file +import com.avanza.astrix.beans.core.RxSingleTypeHandlerPlugin; +import rx.Single; + +public class RxSingleTypeHandlerPluginTest extends ReactiveTypeHandlerContract> { + + public RxSingleTypeHandlerPluginTest() { + super(new RxSingleTypeHandlerPlugin()); + } + +} diff --git a/astrix-gs/pom.xml b/astrix-gs/pom.xml index defda5208..dcd89f896 100644 --- a/astrix-gs/pom.xml +++ b/astrix-gs/pom.xml @@ -91,5 +91,11 @@ log4j-slf4j-impl test + + ${project.groupId} + astrix-contracts + ${project.version} + test + \ No newline at end of file diff --git a/astrix-gs/src/main/java/com/avanza/astrix/gs/AsyncFutureTypeHandler.java b/astrix-gs/src/main/java/com/avanza/astrix/gs/AsyncFutureTypeHandler.java index 3896ff883..ac830ea60 100644 --- a/astrix-gs/src/main/java/com/avanza/astrix/gs/AsyncFutureTypeHandler.java +++ b/astrix-gs/src/main/java/com/avanza/astrix/gs/AsyncFutureTypeHandler.java @@ -15,17 +15,12 @@ */ package com.avanza.astrix.gs; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -import com.avanza.astrix.beans.core.ReactiveExecutionListener; import com.avanza.astrix.beans.core.ReactiveTypeHandlerPlugin; import com.gigaspaces.async.AsyncFuture; -import com.gigaspaces.async.AsyncFutureListener; -import com.gigaspaces.async.internal.DefaultAsyncResult; +import com.gigaspaces.async.SettableFuture; + +import rx.Observable; + /** * * @author Elias Lindholm @@ -34,151 +29,32 @@ public class AsyncFutureTypeHandler implements ReactiveTypeHandlerPlugin> { @Override - public void subscribe(ReactiveExecutionListener listener, AsyncFuture reactiveType) { - reactiveType.setListener(result -> { - if (result.getException() != null) { - listener.onError(result.getException()); - } else { - listener.onResult(result.getResult()); - } - }); - } - - @Override - public void completeExceptionally(Throwable error, AsyncFuture reactiveType) { - AsyncFutureImpl.class.cast(reactiveType).setError(error); - } - - @SuppressWarnings("unchecked") - @Override - public void complete(Object result, AsyncFuture reactiveType) { - AsyncFutureImpl.class.cast(reactiveType).setResult(result); + public Observable toObservable(AsyncFuture reactiveType) { + return Observable.unsafeCreate( + subscriber -> reactiveType.setListener(result -> { + Exception exception = result.getException(); + if (exception == null) { + subscriber.onNext(result.getResult()); + subscriber.onCompleted(); + } else { + subscriber.onError(exception); + } + }) + ); } @Override - public AsyncFuture newReactiveType() { - return new AsyncFutureImpl<>(); + public AsyncFuture toReactiveType(Observable observable) { + SettableFuture reactiveType = new SettableFuture<>(); + observable.subscribe(reactiveType::setResult, reactiveType::setResult); + return reactiveType; } @SuppressWarnings("unchecked") @Override public Class> reactiveTypeHandled() { - Class> class1 = AsyncFuture.class; - return (Class>) class1; - } - - public static class AsyncFutureImpl implements AsyncFuture { - - private final CountDownLatch done = new CountDownLatch(1); - private volatile T result; - private volatile Throwable error; - private volatile FutureListenerNotifier futureListener; - - private class FutureListenerNotifier { - - private final AsyncFutureListener futureListener; - private final AtomicBoolean notified = new AtomicBoolean(false); - - public FutureListenerNotifier(AsyncFutureListener futureListener) { - this.futureListener = futureListener; - } - - public void ensureNotified() { - boolean doNotify = notified.compareAndSet(false, true); - if (doNotify) { - futureListener.onResult(new DefaultAsyncResult(result, asException(error))); - } - } - - private Exception asException(Throwable error) { - if (error == null) { - return null; - } - if (error instanceof Exception) { - return (Exception) error; - } - return new RuntimeException(error); - } - - } - - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return done.getCount() == 0; - } - - @Override - public T get() throws InterruptedException, ExecutionException { - done.await(); - return getResult(); - } - - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - if (!done.await(timeout, unit)) { - throw new TimeoutException(); - } - return getResult(); - } - - - public void setError(Throwable t1) { - error = t1; - done.countDown(); - notifyListener(); - } - - public void setResult(T result) { - this.result = result; - done.countDown(); - notifyListener(); - } - - private void notifyListener() { - if (this.futureListener != null) { - this.futureListener.ensureNotified(); - } - } - - private T getResult() throws ExecutionException { - if (error != null) { - throw new ExecutionException(error); - } - return result; - } - - /** - * Sets a listener for the result of this future. The listener is guaranteed to be invoked exactly - * one time when this future completes. - * - * If this Future is already completed then the listener will be invoked on the same thread - * that invokes this method. Otherwise the listener will be invoked on the - * thread that completes the computation of this future. - * - * If this method is invoked multiple times, then only the last listener set is guaranteed - * to receive a callback with the result. - * - * - * @param futureListener - */ - @Override - public void setListener(AsyncFutureListener futureListener) { - this.futureListener = new FutureListenerNotifier(futureListener); - if (isDone()) { - this.futureListener.ensureNotified(); - } - } + Class> type = AsyncFuture.class; + return (Class>) type; } } diff --git a/astrix-contracts/src/test/java/com/avanza/astrix/contracts/AsyncFutureReactiveTypeHandlerTest.java b/astrix-gs/src/test/java/com/avanza/astrix/gs/AsyncFutureTypeHandlerTest.java similarity index 62% rename from astrix-contracts/src/test/java/com/avanza/astrix/contracts/AsyncFutureReactiveTypeHandlerTest.java rename to astrix-gs/src/test/java/com/avanza/astrix/gs/AsyncFutureTypeHandlerTest.java index 9e4132e1f..ef8ee8f9d 100644 --- a/astrix-contracts/src/test/java/com/avanza/astrix/contracts/AsyncFutureReactiveTypeHandlerTest.java +++ b/astrix-gs/src/test/java/com/avanza/astrix/gs/AsyncFutureTypeHandlerTest.java @@ -13,17 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.avanza.astrix.contracts; +package com.avanza.astrix.gs; -import com.avanza.astrix.beans.core.ReactiveTypeHandlerPlugin; -import com.avanza.astrix.gs.AsyncFutureTypeHandler; +import com.avanza.astrix.contracts.ReactiveTypeHandlerContract; import com.gigaspaces.async.AsyncFuture; -public class AsyncFutureReactiveTypeHandlerTest extends ReactiveTypeHandlerContract> { +public class AsyncFutureTypeHandlerTest extends ReactiveTypeHandlerContract> { - @Override - protected ReactiveTypeHandlerPlugin> newReactiveTypeHandler() { - return new AsyncFutureTypeHandler(); + public AsyncFutureTypeHandlerTest() { + super(new AsyncFutureTypeHandler()); } } diff --git a/astrix-integration-tests/src/test/java/com/avanza/astrix/integration/tests/ClusteredProxyLibraryTest.java b/astrix-integration-tests/src/test/java/com/avanza/astrix/integration/tests/ClusteredProxyLibraryTest.java index eddfa32d2..90bd7622c 100644 --- a/astrix-integration-tests/src/test/java/com/avanza/astrix/integration/tests/ClusteredProxyLibraryTest.java +++ b/astrix-integration-tests/src/test/java/com/avanza/astrix/integration/tests/ClusteredProxyLibraryTest.java @@ -15,27 +15,12 @@ */ package com.avanza.astrix.integration.tests; -import static com.avanza.astrix.integration.tests.TestLunchRestaurantBuilder.lunchRestaurant; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.hamcrest.Description; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.openspaces.core.GigaSpace; -import org.openspaces.core.executor.Task; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.avanza.astrix.beans.core.AstrixSettings; import com.avanza.astrix.beans.registry.InMemoryServiceRegistry; import com.avanza.astrix.config.DynamicConfig; import com.avanza.astrix.context.AstrixConfigurer; import com.avanza.astrix.context.AstrixContext; import com.avanza.astrix.core.ServiceUnavailableException; -import com.avanza.astrix.gs.AsyncFutureTypeHandler.AsyncFutureImpl; import com.avanza.astrix.integration.tests.domain.api.LunchRestaurant; import com.avanza.astrix.integration.tests.domain.api.LunchService; import com.avanza.astrix.integration.tests.domain.api.LunchStatistics; @@ -45,6 +30,19 @@ import com.avanza.gs.test.PuConfigurers; import com.avanza.gs.test.RunningPu; import com.gigaspaces.async.AsyncFuture; +import org.hamcrest.Description; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.openspaces.core.GigaSpace; +import org.openspaces.core.executor.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.avanza.astrix.integration.tests.TestLunchRestaurantBuilder.lunchRestaurant; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class ClusteredProxyLibraryTest { @@ -97,8 +95,7 @@ public void itsPossibleToInvokeMethodsReturningAsyncFutureWithFaultTolerance() t configurer.enableFaultTolerance(true); astrix = autoClosables.add(configurer.configure()); GigaSpace gigaSpace = astrix.waitForBean(GigaSpace.class, "lunch-space", 10000); - AsyncFuture future = gigaSpace.asyncRead(LunchRestaurant.template()); - assertEquals(AsyncFutureImpl.class, future.getClass()); + gigaSpace.asyncRead(LunchRestaurant.template()); } @Test(expected = ServiceUnavailableException.class) diff --git a/astrix-reactor/pom.xml b/astrix-reactor/pom.xml new file mode 100644 index 000000000..dcd1cba81 --- /dev/null +++ b/astrix-reactor/pom.xml @@ -0,0 +1,36 @@ + + + 4.0.0 + + + com.avanza.astrix + astrix-parent + 2.0.5-SNAPSHOT + + + astrix-reactor + + + + ${project.groupId} + astrix-context + ${project.version} + + + + io.projectreactor + reactor-core + + + + ${project.groupId} + astrix-contracts + ${project.version} + test + + + + + \ No newline at end of file diff --git a/astrix-reactor/src/main/java/com/avanza/astrix/reactor/FluxModule.java b/astrix-reactor/src/main/java/com/avanza/astrix/reactor/FluxModule.java new file mode 100644 index 000000000..3db3decc1 --- /dev/null +++ b/astrix-reactor/src/main/java/com/avanza/astrix/reactor/FluxModule.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 Avanza Bank AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.avanza.astrix.reactor; + +import com.avanza.astrix.beans.core.ReactiveTypeHandlerPlugin; +import com.avanza.astrix.context.AstrixContextPlugin; +import com.avanza.astrix.modules.ModuleContext; + +public class FluxModule implements AstrixContextPlugin { + + @Override + public void prepare(ModuleContext moduleContext) { + moduleContext.bind(ReactiveTypeHandlerPlugin.class, FluxTypeHandlerPlugin.class); + + moduleContext.export(ReactiveTypeHandlerPlugin.class); + } + +} diff --git a/astrix-reactor/src/main/java/com/avanza/astrix/reactor/FluxTypeHandlerPlugin.java b/astrix-reactor/src/main/java/com/avanza/astrix/reactor/FluxTypeHandlerPlugin.java new file mode 100644 index 000000000..c15cf765f --- /dev/null +++ b/astrix-reactor/src/main/java/com/avanza/astrix/reactor/FluxTypeHandlerPlugin.java @@ -0,0 +1,47 @@ +/* + * Copyright 2014 Avanza Bank AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.avanza.astrix.reactor; + +import com.avanza.astrix.beans.core.ReactiveTypeHandlerPlugin; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Many; +import rx.Observable; + +public class FluxTypeHandlerPlugin implements ReactiveTypeHandlerPlugin> { + + @Override + public Observable toObservable(Flux reactiveType) { + return Observable.unsafeCreate( + subscriber -> reactiveType.subscribe(subscriber::onNext, subscriber::onError, subscriber::onCompleted) + ); + } + + @Override + public Flux toReactiveType(Observable observable) { + Many sink = Sinks.many().replay().all(); + observable.subscribe(sink::tryEmitNext, sink::tryEmitError, sink::tryEmitComplete); + return sink.asFlux(); + } + + @Override + @SuppressWarnings("unchecked") + public Class> reactiveTypeHandled() { + Class> type = Flux.class; + return (Class>) type; + } +} diff --git a/astrix-reactor/src/main/java/com/avanza/astrix/reactor/MonoModule.java b/astrix-reactor/src/main/java/com/avanza/astrix/reactor/MonoModule.java new file mode 100644 index 000000000..df98fa4d9 --- /dev/null +++ b/astrix-reactor/src/main/java/com/avanza/astrix/reactor/MonoModule.java @@ -0,0 +1,31 @@ +/* + * Copyright 2014 Avanza Bank AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.avanza.astrix.reactor; + +import com.avanza.astrix.beans.core.ReactiveTypeHandlerPlugin; +import com.avanza.astrix.context.AstrixContextPlugin; +import com.avanza.astrix.modules.ModuleContext; + +public class MonoModule implements AstrixContextPlugin { + + @Override + public void prepare(ModuleContext moduleContext) { + moduleContext.bind(ReactiveTypeHandlerPlugin.class, MonoTypeHandlerPlugin.class); + + moduleContext.export(ReactiveTypeHandlerPlugin.class); + } + +} diff --git a/astrix-reactor/src/main/java/com/avanza/astrix/reactor/MonoTypeHandlerPlugin.java b/astrix-reactor/src/main/java/com/avanza/astrix/reactor/MonoTypeHandlerPlugin.java new file mode 100644 index 000000000..ebff2ce18 --- /dev/null +++ b/astrix-reactor/src/main/java/com/avanza/astrix/reactor/MonoTypeHandlerPlugin.java @@ -0,0 +1,47 @@ +/* + * Copyright 2014 Avanza Bank AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.avanza.astrix.reactor; + +import com.avanza.astrix.beans.core.ReactiveTypeHandlerPlugin; + +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.One; +import rx.Observable; + +public class MonoTypeHandlerPlugin implements ReactiveTypeHandlerPlugin> { + + @Override + public Observable toObservable(Mono reactiveType) { + return Observable.unsafeCreate( + subscriber -> reactiveType.subscribe(subscriber::onNext, subscriber::onError, subscriber::onCompleted) + ); + } + + @Override + public Mono toReactiveType(Observable observable) { + One sink = Sinks.one(); + observable.subscribe(sink::tryEmitValue, sink::tryEmitError, sink::tryEmitEmpty); + return sink.asMono(); + } + + @Override + @SuppressWarnings("unchecked") + public Class> reactiveTypeHandled() { + Class> type = Mono.class; + return (Class>) type; + } +} diff --git a/astrix-reactor/src/main/resources/META-INF/services/com.avanza.astrix.context.AstrixContextPlugin b/astrix-reactor/src/main/resources/META-INF/services/com.avanza.astrix.context.AstrixContextPlugin new file mode 100644 index 000000000..056eec166 --- /dev/null +++ b/astrix-reactor/src/main/resources/META-INF/services/com.avanza.astrix.context.AstrixContextPlugin @@ -0,0 +1,2 @@ +com.avanza.astrix.reactor.FluxModule +com.avanza.astrix.reactor.MonoModule \ No newline at end of file diff --git a/astrix-reactor/src/test/java/com/avanza/astrix/reactor/FluxTypeHandlerPluginTest.java b/astrix-reactor/src/test/java/com/avanza/astrix/reactor/FluxTypeHandlerPluginTest.java new file mode 100644 index 000000000..9c7ba8a3d --- /dev/null +++ b/astrix-reactor/src/test/java/com/avanza/astrix/reactor/FluxTypeHandlerPluginTest.java @@ -0,0 +1,27 @@ +/* + * Copyright 2014 Avanza Bank AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.avanza.astrix.reactor; + +import com.avanza.astrix.contracts.ReactiveTypeHandlerContract; +import reactor.core.publisher.Flux; + +public class FluxTypeHandlerPluginTest extends ReactiveTypeHandlerContract> { + + public FluxTypeHandlerPluginTest() { + super(new FluxTypeHandlerPlugin()); + } + +} diff --git a/astrix-reactor/src/test/java/com/avanza/astrix/reactor/MonoTypeHandlerPluginTest.java b/astrix-reactor/src/test/java/com/avanza/astrix/reactor/MonoTypeHandlerPluginTest.java new file mode 100644 index 000000000..e0acf6c97 --- /dev/null +++ b/astrix-reactor/src/test/java/com/avanza/astrix/reactor/MonoTypeHandlerPluginTest.java @@ -0,0 +1,27 @@ +/* + * Copyright 2014 Avanza Bank AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.avanza.astrix.reactor; + +import com.avanza.astrix.contracts.ReactiveTypeHandlerContract; +import reactor.core.publisher.Mono; + +public class MonoTypeHandlerPluginTest extends ReactiveTypeHandlerContract> { + + public MonoTypeHandlerPluginTest() { + super(new MonoTypeHandlerPlugin()); + } + +} diff --git a/pom.xml b/pom.xml index f0a9821ff..6322f0223 100644 --- a/pom.xml +++ b/pom.xml @@ -19,6 +19,7 @@ astrix-test-util astrix-spring astrix-http + astrix-reactor astrix-remoting examples doc-snippets @@ -142,6 +143,7 @@ 1.7.32 16.1.1 1.3.8 + 2020.0.18 1.5.18 0.4.1 2.13.2.1 @@ -210,6 +212,14 @@ import + + io.projectreactor + reactor-bom + ${reactor-bom.version} + pom + import + + com.google.code.findbugs jsr305
- * - * @author Elias Lindholm - * - * @param - */ -public final class FutureResult { - - private final T result; - private final Throwable exception; - - public FutureResult(T result, Throwable exception) { - this.result = result; - this.exception = exception; - } - - /** - * If the underlying computation ended with an error, then - * this method returns the given Throwable. - * - * @return A Throwable if the underlying computation ended with an error, null otherwise - */ - public Throwable getException() { - return exception; - } - - /** - * If the underlying computation ended successfully, then - * this method returns the result (possibly null). - * - * @return - */ - public T getResult() { - return result; - } - -} diff --git a/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeConverterImpl.java b/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeConverterImpl.java index 0f515d533..7a4c87d9e 100644 --- a/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeConverterImpl.java +++ b/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeConverterImpl.java @@ -15,75 +15,41 @@ */ package com.avanza.astrix.beans.core; +import rx.Observable; + import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -import rx.Completable; -import rx.Observable; -import rx.Single; public final class ReactiveTypeConverterImpl implements ReactiveTypeConverter { - private final Map, RxTypeConverter>> rxTypeConverters = Map.of( - Single.class, rxTypeConverter(Single::toObservable, Observable::toSingle), - Completable.class, rxTypeConverter(Completable::toObservable, Observable::toCompletable) + private static final List> wellKnownTypes = List.of( + new CompletableFutureTypeHandlerPlugin(), + new RxCompletableTypeHandlerPlugin(), + new RxSingleTypeHandlerPlugin() ); private final Map, ReactiveTypeHandlerPlugin>> pluginByReactiveType; public ReactiveTypeConverterImpl(List> typeConverterPlugins) { - Map, ReactiveTypeHandlerPlugin>> pluginByReactiveType = new LinkedHashMap<>(typeConverterPlugins.size() + 1); + Map, ReactiveTypeHandlerPlugin>> pluginByReactiveType = new LinkedHashMap<>(typeConverterPlugins.size() + wellKnownTypes.size()); for (ReactiveTypeHandlerPlugin> asyncTypeConverterPlugin : typeConverterPlugins) { pluginByReactiveType.put(asyncTypeConverterPlugin.reactiveTypeHandled(), asyncTypeConverterPlugin); } - pluginByReactiveType.putIfAbsent(CompletableFuture.class, new CompletableFutureTypeHandlerPlugin()); + wellKnownTypes.forEach(it -> pluginByReactiveType.putIfAbsent(it.reactiveTypeHandled(), it)); this.pluginByReactiveType = Map.copyOf(pluginByReactiveType); } @Override public Observable toObservable(Class fromType, T reactiveType) { - RxTypeConverter rxTypeConverter = findRxTypeConverter(fromType); - if (rxTypeConverter != null) { - return rxTypeConverter.toObservable(reactiveType); - } - ReactiveTypeHandlerPlugin plugin = getPlugin(fromType); - return Observable.unsafeCreate((s) -> { - plugin.subscribe(new ReactiveExecutionListener() { - @Override - public void onResult(Object result) { - s.onNext(result); - s.onCompleted(); - } - @Override - public void onError(Throwable t) { - s.onError(t); - } - }, reactiveType); - }); + return plugin.toObservable(reactiveType); } @Override public T toCustomReactiveType(Class targetType, Observable observable) { - RxTypeConverter rxTypeConverter = findRxTypeConverter(targetType); - if (rxTypeConverter != null) { - return rxTypeConverter.toRxType(observable); - } - ReactiveTypeHandlerPlugin plugin = getPlugin(targetType); - T reactiveType = plugin.newReactiveType(); - // Eagerly subscribe to the given observable - observable.subscribe((next) -> plugin.complete(next, reactiveType), (error) -> plugin.completeExceptionally(error, reactiveType)); - return reactiveType; - } - - private RxTypeConverter findRxTypeConverter(Class rxType) { - @SuppressWarnings("unchecked") - RxTypeConverter rxTypeConverter = (RxTypeConverter) rxTypeConverters.get(rxType); - return rxTypeConverter; + return plugin.toReactiveType(observable); } private ReactiveTypeHandlerPlugin getPlugin(Class type) { @@ -97,29 +63,7 @@ private ReactiveTypeHandlerPlugin getPlugin(Class type) { @Override public boolean isReactiveType(Class> type) { - return rxTypeConverters.containsKey(type) || pluginByReactiveType.containsKey(type); - } - - private static RxTypeConverter rxTypeConverter(Function> toObservable, Function, T> toType) { - return new RxTypeConverter<>() { - @Override - public Observable toObservable(T rxTypeInstance) { - return toObservable.apply(rxTypeInstance); - } - - @Override - public T toRxType(Observable observable) { - return toType.apply(observable); - } - }; - } - - private interface RxTypeConverter { - - Observable toObservable(T rxTypeInstance); - - T toRxType(Observable observable); - + return pluginByReactiveType.containsKey(type); } } diff --git a/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeHandlerPlugin.java b/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeHandlerPlugin.java index 39a016625..006b5f1bc 100644 --- a/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeHandlerPlugin.java +++ b/astrix-context/src/main/java/com/avanza/astrix/beans/core/ReactiveTypeHandlerPlugin.java @@ -15,37 +15,30 @@ */ package com.avanza.astrix.beans.core; +import rx.Observable; + +/** + * Plugin for converting between {@link Observable} and a reactive type + * + * @param a reactive or asynchronous type, such as {@link rx.Single Single} or {@link java.util.concurrent.CompletableFuture CompletableFuture} + */ public interface ReactiveTypeHandlerPlugin { + /** - * Subscribes a ReactiveExecutionListener to a reactive type. - * - * @param listener - * @param reactiveType - */ - void subscribe(ReactiveExecutionListener listener, T reactiveType); - - /** - * Completes a reactive execution created using {@link #newReactiveType()} with an error. - * - * This method will only be invoked with instances created using {@link #newReactiveType()}, - * so its safe to downcast the reactiveType argument to the type returned by - * {@link #newReactiveType()} - * - * @param error - * @param reactiveType + * Convert from reactive type {@code T} to {@link Observable} + * + * The returned {@link Observable} should not be already subscribed by the method. + * */ - void completeExceptionally(Throwable error, T reactiveType); - + Observable toObservable(T reactiveType); + /** - * Successfully completes a reactive execution created using newReactiveType with - * a given result. - * - * @param result - * @param reactiveType + * Convert from {@link Observable} to reactive type {@code T} + * + * The incoming {@link Observable} should get subscribed by this method. + * */ - void complete(Object result, T reactiveType); - - T newReactiveType(); - + T toReactiveType(Observable observable); + Class reactiveTypeHandled(); } diff --git a/astrix-context/src/main/java/com/avanza/astrix/beans/core/RxCompletableTypeHandlerPlugin.java b/astrix-context/src/main/java/com/avanza/astrix/beans/core/RxCompletableTypeHandlerPlugin.java new file mode 100644 index 000000000..07de056fd --- /dev/null +++ b/astrix-context/src/main/java/com/avanza/astrix/beans/core/RxCompletableTypeHandlerPlugin.java @@ -0,0 +1,41 @@ +/* + * Copyright 2014 Avanza Bank AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.avanza.astrix.beans.core; + +import rx.Completable; +import rx.Observable; +import rx.subjects.ReplaySubject; + +public class RxCompletableTypeHandlerPlugin implements ReactiveTypeHandlerPlugin { + + @Override + public Observable toObservable(Completable reactiveType) { + return reactiveType.toObservable(); + } + + @Override + public Completable toReactiveType(Observable observable) { + ReplaySubject subject = ReplaySubject.createWithSize(1); + observable.subscribe(subject); + return subject.toCompletable(); + } + + @Override + public Class reactiveTypeHandled() { + return Completable.class; + } + +} diff --git a/astrix-context/src/main/java/com/avanza/astrix/beans/core/RxSingleTypeHandlerPlugin.java b/astrix-context/src/main/java/com/avanza/astrix/beans/core/RxSingleTypeHandlerPlugin.java new file mode 100644 index 000000000..52b6afd28 --- /dev/null +++ b/astrix-context/src/main/java/com/avanza/astrix/beans/core/RxSingleTypeHandlerPlugin.java @@ -0,0 +1,42 @@ +/* + * Copyright 2014 Avanza Bank AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.avanza.astrix.beans.core; + +import rx.Observable; +import rx.Single; +import rx.subjects.ReplaySubject; + +public class RxSingleTypeHandlerPlugin implements ReactiveTypeHandlerPlugin> { + + @Override + public Observable toObservable(Single reactiveType) { + return reactiveType.toObservable(); + } + + @Override + public Single toReactiveType(Observable observable) { + ReplaySubject subject = ReplaySubject.createWithSize(1); + observable.subscribe(subject); + return subject.toSingle(); + } + + @Override + @SuppressWarnings("unchecked") + public Class> reactiveTypeHandled() { + Class> type = Single.class; + return (Class>) type; + } +} diff --git a/astrix-contracts/pom.xml b/astrix-contracts/pom.xml index db9849c70..1df4539ce 100644 --- a/astrix-contracts/pom.xml +++ b/astrix-contracts/pom.xml @@ -13,11 +13,6 @@ astrix-context ${project.version}
+ * The returned {@link Observable} should not be already subscribed by the method. + *
+ * The incoming {@link Observable} should get subscribed by this method. + *