From d56dc2edfd3781754022620e39b56b3fb46cca50 Mon Sep 17 00:00:00 2001 From: Niklas Baudy Date: Tue, 27 Aug 2019 16:49:33 +0200 Subject: [PATCH] 3.x: Remove vararg overloads for combineLatest in Observable + Flowable Fix tests. --- .../io/reactivex/rxjava3/core/Flowable.java | 171 ++---------------- .../io/reactivex/rxjava3/core/Observable.java | 104 +---------- .../rxjava3/flowable/FlowableNullTests.java | 76 -------- .../flowable/FlowableCombineLatestTest.java | 10 +- .../ObservableCombineLatestTest.java | 10 +- .../observable/ObservableNullTests.java | 76 -------- .../CombineLatestArrayDelayErrorTckTest.java | 5 +- .../tck/CombineLatestArrayTckTest.java | 5 +- 8 files changed, 36 insertions(+), 421 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index f16fa2cf03..cb7fbd726f 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -129,9 +129,9 @@ * }, BackpressureStrategy.BUFFER); * * System.out.println("Subscribe!"); - * + * * source.subscribe(System.out::println); - * + * * System.out.println("Done!"); * *

@@ -277,50 +277,6 @@ public static Flowable combineLatest(Publisher[] sources, return combineLatest(sources, combiner, bufferSize()); } - /** - * Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of - * the source Publishers each time an item is received from any of the source Publishers, where this - * aggregation is defined by a specified function. - *

- * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the - * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a - * {@code Function} passed to the method would trigger a {@code ClassCastException}. - *

- * If any of the sources never produces an item but only terminates (normally or with an error), the - * resulting sequence terminates immediately (normally or with all the errors accumulated until that point). - * If that input source is also synchronous, other sources after it will not be subscribed to. - *

- * If there are no source Publishers provided, the resulting sequence completes immediately without emitting - * any items and without any calls to the combiner function. - * - *

- *
Backpressure:
- *
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s - * are requested in a bounded manner, however, their backpressure is not enforced (the operator won't signal - * {@code MissingBackpressureException}) and may lead to {@code OutOfMemoryError} due to internal buffer bloat.
- *
Scheduler:
- *
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param - * the common base type of source values - * @param - * the result type - * @param sources - * the collection of source Publishers - * @param combiner - * the aggregation function used to combine the items emitted by the source Publishers - * @return a Flowable that emits items that are the result of combining the items emitted by the source - * Publishers by means of the given aggregation function - * @see ReactiveX operators documentation: CombineLatest - */ - @SchedulerSupport(SchedulerSupport.NONE) - @CheckReturnValue - @BackpressureSupport(BackpressureKind.FULL) - public static Flowable combineLatest(Function combiner, Publisher... sources) { - return combineLatest(sources, combiner, bufferSize()); - } - /** * Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of * the source Publishers each time an item is received from any of the source Publishers, where this @@ -515,100 +471,6 @@ public static Flowable combineLatestDelayError(Publisher[ return combineLatestDelayError(sources, combiner, bufferSize()); } - /** - * Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of - * the source Publishers each time an item is received from any of the source Publishers, where this - * aggregation is defined by a specified function and delays any error from the sources until - * all source Publishers terminate. - *

- * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the - * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a - * {@code Function} passed to the method would trigger a {@code ClassCastException}. - *

- * If any of the sources never produces an item but only terminates (normally or with an error), the - * resulting sequence terminates immediately (normally or with all the errors accumulated until that point). - * If that input source is also synchronous, other sources after it will not be subscribed to. - *

- * If there are no source Publishers provided, the resulting sequence completes immediately without emitting - * any items and without any calls to the combiner function. - * - *

- *
Backpressure:
- *
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s - * are requested in a bounded manner, however, their backpressure is not enforced (the operator won't signal - * {@code MissingBackpressureException}) and may lead to {@code OutOfMemoryError} due to internal buffer bloat.
- *
Scheduler:
- *
{@code combineLatestDelayError} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param - * the common base type of source values - * @param - * the result type - * @param sources - * the collection of source Publishers - * @param combiner - * the aggregation function used to combine the items emitted by the source Publishers - * @return a Flowable that emits items that are the result of combining the items emitted by the source - * Publishers by means of the given aggregation function - * @see ReactiveX operators documentation: CombineLatest - */ - @SchedulerSupport(SchedulerSupport.NONE) - @CheckReturnValue - @BackpressureSupport(BackpressureKind.FULL) - public static Flowable combineLatestDelayError(Function combiner, - Publisher... sources) { - return combineLatestDelayError(sources, combiner, bufferSize()); - } - - /** - * Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of - * the source Publishers each time an item is received from any of the source Publisher, where this - * aggregation is defined by a specified function and delays any error from the sources until - * all source Publishers terminate. - *

- * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the - * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a - * {@code Function} passed to the method would trigger a {@code ClassCastException}. - *

- * If any of the sources never produces an item but only terminates (normally or with an error), the - * resulting sequence terminates immediately (normally or with all the errors accumulated until that point). - * If that input source is also synchronous, other sources after it will not be subscribed to. - *

- * If there are no source Publishers provided, the resulting sequence completes immediately without emitting - * any items and without any calls to the combiner function. - * - *

- *
Backpressure:
- *
The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s - * are requested in a bounded manner, however, their backpressure is not enforced (the operator won't signal - * {@code MissingBackpressureException}) and may lead to {@code OutOfMemoryError} due to internal buffer bloat.
- *
Scheduler:
- *
{@code combineLatestDelayError} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param - * the common base type of source values - * @param - * the result type - * @param sources - * the collection of source Publishers - * @param combiner - * the aggregation function used to combine the items emitted by the source Publishers - * @param bufferSize - * the internal buffer size and prefetch amount applied to every source Publisher - * @return a Flowable that emits items that are the result of combining the items emitted by the source - * Publishers by means of the given aggregation function - * @see ReactiveX operators documentation: CombineLatest - */ - @SchedulerSupport(SchedulerSupport.NONE) - @CheckReturnValue - @BackpressureSupport(BackpressureKind.FULL) - public static Flowable combineLatestDelayError(Function combiner, - int bufferSize, Publisher... sources) { - return combineLatestDelayError(sources, combiner, bufferSize); - } - /** * Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of * the source Publishers each time an item is received from any of the source Publishers, where this @@ -802,8 +664,7 @@ public static Flowable combineLatest( BiFunction combiner) { ObjectHelper.requireNonNull(source1, "source1 is null"); ObjectHelper.requireNonNull(source2, "source2 is null"); - Function f = Functions.toFunction(combiner); - return combineLatest(f, source1, source2); + return combineLatest(new Publisher[] { source1, source2 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -853,7 +714,7 @@ public static Flowable combineLatest( ObjectHelper.requireNonNull(source1, "source1 is null"); ObjectHelper.requireNonNull(source2, "source2 is null"); ObjectHelper.requireNonNull(source3, "source3 is null"); - return combineLatest(Functions.toFunction(combiner), source1, source2, source3); + return combineLatest(new Publisher[] { source1, source2, source3 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -907,7 +768,7 @@ public static Flowable combineLatest( ObjectHelper.requireNonNull(source2, "source2 is null"); ObjectHelper.requireNonNull(source3, "source3 is null"); ObjectHelper.requireNonNull(source4, "source4 is null"); - return combineLatest(Functions.toFunction(combiner), source1, source2, source3, source4); + return combineLatest(new Publisher[] { source1, source2, source3, source4 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -966,7 +827,7 @@ public static Flowable combineLatest( ObjectHelper.requireNonNull(source3, "source3 is null"); ObjectHelper.requireNonNull(source4, "source4 is null"); ObjectHelper.requireNonNull(source5, "source5 is null"); - return combineLatest(Functions.toFunction(combiner), source1, source2, source3, source4, source5); + return combineLatest(new Publisher[] { source1, source2, source3, source4, source5 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -1029,7 +890,7 @@ public static Flowable combineLatest( ObjectHelper.requireNonNull(source4, "source4 is null"); ObjectHelper.requireNonNull(source5, "source5 is null"); ObjectHelper.requireNonNull(source6, "source6 is null"); - return combineLatest(Functions.toFunction(combiner), source1, source2, source3, source4, source5, source6); + return combineLatest(new Publisher[] { source1, source2, source3, source4, source5, source6 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -1097,7 +958,7 @@ public static Flowable combineLatest( ObjectHelper.requireNonNull(source5, "source5 is null"); ObjectHelper.requireNonNull(source6, "source6 is null"); ObjectHelper.requireNonNull(source7, "source7 is null"); - return combineLatest(Functions.toFunction(combiner), source1, source2, source3, source4, source5, source6, source7); + return combineLatest(new Publisher[] { source1, source2, source3, source4, source5, source6, source7 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -1169,7 +1030,7 @@ public static Flowable combineLatest( ObjectHelper.requireNonNull(source6, "source6 is null"); ObjectHelper.requireNonNull(source7, "source7 is null"); ObjectHelper.requireNonNull(source8, "source8 is null"); - return combineLatest(Functions.toFunction(combiner), source1, source2, source3, source4, source5, source6, source7, source8); + return combineLatest(new Publisher[] { source1, source2, source3, source4, source5, source6, source7, source8 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -1246,7 +1107,7 @@ public static Flowable combineLatest( ObjectHelper.requireNonNull(source7, "source7 is null"); ObjectHelper.requireNonNull(source8, "source8 is null"); ObjectHelper.requireNonNull(source9, "source9 is null"); - return combineLatest(Functions.toFunction(combiner), source1, source2, source3, source4, source5, source6, source7, source8, source9); + return combineLatest(new Publisher[] { source1, source2, source3, source4, source5, source6, source7, source8, source9 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -10870,15 +10731,15 @@ public final Flowable> groupBy(FunctionA use case for specifying an {@code evictingMapFactory} is where the source is infinite and fast and * over time the number of keys grows enough to be a concern in terms of the memory footprint of the * internal hash map containing the {@link GroupedFlowable}s. - * + * *

The map created by an {@code evictingMapFactory} must be thread-safe. - * + * *

An example of an {@code evictingMapFactory} using CacheBuilder from the Guava library is below: - * + * *


      * Function<Consumer<Object>, Map<Integer, Object>> evictingMapFactory =
      *   notify ->
@@ -10905,7 +10766,7 @@ public final  Flowable> groupBy(Function
- * + * *

* *

@@ -11238,7 +11099,7 @@ public final Single lastOrError() { * Example: *


      * // Step 1: Create the consumer type that will be returned by the FlowableOperator.apply():
-     * 
+     *
      * public final class CustomSubscriber<T> implements FlowableSubscriber<T>, Subscription {
      *
      *     // The downstream's Subscriber that will receive the onXXX events
diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java
index bcc013a10f..dc003e1f0a 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Observable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java
@@ -168,49 +168,6 @@ public static int bufferSize() {
         return Flowable.bufferSize();
     }
 
-    /**
-     * Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of
-     * the source ObservableSources each time an item is received from any of the source ObservableSources, where this
-     * aggregation is defined by a specified function.
-     * 

- * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the - * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a - * {@code Function} passed to the method would trigger a {@code ClassCastException}. - *

- * If any of the sources never produces an item but only terminates (normally or with an error), the - * resulting sequence terminates immediately (normally or with all the errors accumulated till that point). - * If that input source is also synchronous, other sources after it will not be subscribed to. - *

- * If there are no ObservableSources provided, the resulting sequence completes immediately without emitting - * any items and without any calls to the combiner function. - * - *

- * - *

- *
Scheduler:
- *
{@code combineLatest} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param - * the common base type of source values - * @param - * the result type - * @param sources - * the collection of source ObservableSources - * @param combiner - * the aggregation function used to combine the items emitted by the source ObservableSources - * @param bufferSize - * the internal buffer size and prefetch amount applied to every source Observable - * @return an Observable that emits items that are the result of combining the items emitted by the source - * ObservableSources by means of the given aggregation function - * @see ReactiveX operators documentation: CombineLatest - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - public static Observable combineLatest(Function combiner, int bufferSize, ObservableSource... sources) { - return combineLatest(sources, combiner, bufferSize); - } - /** * Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of * the source ObservableSources each time an item is received from any of the source ObservableSources, where this @@ -437,7 +394,7 @@ public static Observable combineLatest( BiFunction combiner) { ObjectHelper.requireNonNull(source1, "source1 is null"); ObjectHelper.requireNonNull(source2, "source2 is null"); - return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2); + return combineLatest(new ObservableSource[] { source1, source2 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -482,7 +439,7 @@ public static Observable combineLatest( ObjectHelper.requireNonNull(source1, "source1 is null"); ObjectHelper.requireNonNull(source2, "source2 is null"); ObjectHelper.requireNonNull(source3, "source3 is null"); - return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3); + return combineLatest(new ObservableSource[] { source1, source2, source3 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -531,7 +488,7 @@ public static Observable combineLatest( ObjectHelper.requireNonNull(source2, "source2 is null"); ObjectHelper.requireNonNull(source3, "source3 is null"); ObjectHelper.requireNonNull(source4, "source4 is null"); - return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4); + return combineLatest(new ObservableSource[] { source1, source2, source3, source4 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -585,7 +542,7 @@ public static Observable combineLatest( ObjectHelper.requireNonNull(source3, "source3 is null"); ObjectHelper.requireNonNull(source4, "source4 is null"); ObjectHelper.requireNonNull(source5, "source5 is null"); - return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5); + return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -643,7 +600,7 @@ public static Observable combineLatest( ObjectHelper.requireNonNull(source4, "source4 is null"); ObjectHelper.requireNonNull(source5, "source5 is null"); ObjectHelper.requireNonNull(source6, "source6 is null"); - return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6); + return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -706,7 +663,7 @@ public static Observable combineLatest( ObjectHelper.requireNonNull(source5, "source5 is null"); ObjectHelper.requireNonNull(source6, "source6 is null"); ObjectHelper.requireNonNull(source7, "source7 is null"); - return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7); + return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6, source7 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -773,7 +730,7 @@ public static Observable combineLatest( ObjectHelper.requireNonNull(source6, "source6 is null"); ObjectHelper.requireNonNull(source7, "source7 is null"); ObjectHelper.requireNonNull(source8, "source8 is null"); - return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7, source8); + return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6, source7, source8 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -845,7 +802,7 @@ public static Observable combineLates ObjectHelper.requireNonNull(source7, "source7 is null"); ObjectHelper.requireNonNull(source8, "source8 is null"); ObjectHelper.requireNonNull(source9, "source9 is null"); - return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7, source8, source9); + return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6, source7, source8, source9 }, Functions.toFunction(combiner), bufferSize()); } /** @@ -890,51 +847,6 @@ public static Observable combineLatestDelayError(ObservableSource - * - *

- * Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the - * implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a - * {@code Function} passed to the method would trigger a {@code ClassCastException}. - *

- * If any of the sources never produces an item but only terminates (normally or with an error), the - * resulting sequence terminates immediately (normally or with all the errors accumulated till that point). - * If that input source is also synchronous, other sources after it will not be subscribed to. - *

- * If there are no ObservableSources provided, the resulting sequence completes immediately without emitting - * any items and without any calls to the combiner function. - * - *

- *
Scheduler:
- *
{@code combineLatestDelayError} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param - * the common base type of source values - * @param - * the result type - * @param sources - * the collection of source ObservableSources - * @param combiner - * the aggregation function used to combine the items emitted by the source ObservableSources - * @param bufferSize - * the internal buffer size and prefetch amount applied to every source Observable - * @return an Observable that emits items that are the result of combining the items emitted by the source - * ObservableSources by means of the given aggregation function - * @see ReactiveX operators documentation: CombineLatest - */ - @CheckReturnValue - @SchedulerSupport(SchedulerSupport.NONE) - public static Observable combineLatestDelayError(Function combiner, - int bufferSize, ObservableSource... sources) { - return combineLatestDelayError(sources, combiner, bufferSize); - } - /** * Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of * the source ObservableSources each time an item is received from any of the source ObservableSources, where this diff --git a/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java index 1e279f64c1..dffc68f230 100644 --- a/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java +++ b/src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java @@ -76,27 +76,6 @@ public void ambIterableOneIsNull() { .assertError(NullPointerException.class); } - @Test(expected = NullPointerException.class) - public void combineLatestVarargsNull() { - Flowable.combineLatestDelayError(new Function() { - @Override - public Object apply(Object[] v) { - return 1; - } - }, (Publisher[])null); - } - - @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) - public void combineLatestVarargsOneIsNull() { - Flowable.combineLatestDelayError(new Function() { - @Override - public Object apply(Object[] v) { - return 1; - } - }, Flowable.never(), null).blockingLast(); - } - @Test(expected = NullPointerException.class) public void combineLatestIterableNull() { Flowable.combineLatestDelayError((Iterable>)null, new Function() { @@ -133,23 +112,6 @@ public Object apply(Object[] v) { }).blockingLast(); } - @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) - public void combineLatestVarargsFunctionNull() { - Flowable.combineLatestDelayError(null, Flowable.never()); - } - - @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) - public void combineLatestVarargsFunctionReturnsNull() { - Flowable.combineLatestDelayError(new Function() { - @Override - public Object apply(Object[] v) { - return null; - } - }, just1).blockingLast(); - } - @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) public void combineLatestIterableFunctionNull() { @@ -2759,12 +2721,6 @@ public void combineLatestDelayErrorIterableFunctionNull() { Flowable.combineLatestDelayError(Arrays.asList(just1), null, 128); } - @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) - public void combineLatestDelayErrorVarargsFunctionNull() { - Flowable.combineLatestDelayError(null, 128, Flowable.never()); - } - @Test(expected = NullPointerException.class) public void zipFlowableNull() { Flowable.zip((Flowable>)null, new Function() { @@ -2795,27 +2751,6 @@ public void concatFlowableNull() { Flowable.concat((Flowable>)null); } - @Test(expected = NullPointerException.class) - public void combineLatestDelayErrorVarargsNull() { - Flowable.combineLatestDelayError(new Function() { - @Override - public Object apply(Object[] v) { - return 1; - } - }, 128, (Flowable[])null); - } - - @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) - public void combineLatestDelayErrorVarargsOneIsNull() { - Flowable.combineLatestDelayError(new Function() { - @Override - public Object apply(Object[] v) { - return 1; - } - }, 128, Flowable.never(), null).blockingLast(); - } - @Test(expected = NullPointerException.class) public void combineLatestDelayErrorIterableNull() { Flowable.combineLatestDelayError((Iterable>)null, new Function() { @@ -2876,15 +2811,4 @@ public void delaySubscriptionOtherNull() { public void sampleFlowableNull() { just1.sample(null); } - - @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) - public void combineLatestDelayErrorVarargsFunctionReturnsNull() { - Flowable.combineLatestDelayError(new Function() { - @Override - public Object apply(Object[] v) { - return null; - } - }, 128, just1).blockingLast(); - } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java index bbffd97ee4..0a07b99d81 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java @@ -1305,15 +1305,14 @@ public Object apply(Object a, Object b) throws Exception { @Test public void errorDelayed() { Flowable.combineLatestDelayError( + new Publisher[] { Flowable.error(new TestException()), Flowable.just(1) }, new Function() { @Override public Object apply(Object[] a) throws Exception { return a; } }, - 128, - Flowable.error(new TestException()), - Flowable.just(1) + 128 ) .test() .assertFailure(TestException.class); @@ -1323,15 +1322,14 @@ public Object apply(Object[] a) throws Exception { @Test public void errorDelayed2() { Flowable.combineLatestDelayError( + new Publisher[] { Flowable.error(new TestException()).startWithItem(1), Flowable.empty() }, new Function() { @Override public Object apply(Object[] a) throws Exception { return a; } }, - 128, - Flowable.error(new TestException()).startWithItem(1), - Flowable.empty() + 128 ) .test() .assertFailure(TestException.class); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java index bb27338d84..e20e96892d 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java @@ -926,15 +926,14 @@ public Object apply(Object a, Object b) throws Exception { @Test public void errorDelayed() { Observable.combineLatestDelayError( + new ObservableSource[] { Observable.error(new TestException()), Observable.just(1) }, new Function() { @Override public Object apply(Object[] a) throws Exception { return a; } }, - 128, - Observable.error(new TestException()), - Observable.just(1) + 128 ) .test() .assertFailure(TestException.class); @@ -944,15 +943,14 @@ public Object apply(Object[] a) throws Exception { @Test public void errorDelayed2() { Observable.combineLatestDelayError( + new ObservableSource[] { Observable.error(new TestException()).startWithItem(1), Observable.empty() }, new Function() { @Override public Object apply(Object[] a) throws Exception { return a; } }, - 128, - Observable.error(new TestException()).startWithItem(1), - Observable.empty() + 128 ) .test() .assertFailure(TestException.class); diff --git a/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java b/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java index 41cfb91bb2..76952c0904 100644 --- a/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java +++ b/src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java @@ -74,27 +74,6 @@ public void ambIterableOneIsNull() { .assertError(NullPointerException.class); } - @Test(expected = NullPointerException.class) - public void combineLatestVarargsNull() { - Observable.combineLatest(new Function() { - @Override - public Object apply(Object[] v) { - return 1; - } - }, 128, (Observable[])null); - } - - @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) - public void combineLatestVarargsOneIsNull() { - Observable.combineLatest(new Function() { - @Override - public Object apply(Object[] v) { - return 1; - } - }, 128, Observable.never(), null).blockingLast(); - } - @Test(expected = NullPointerException.class) public void combineLatestIterableNull() { Observable.combineLatest((Iterable>)null, new Function() { @@ -131,23 +110,6 @@ public Object apply(Object[] v) { }, 128).blockingLast(); } - @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) - public void combineLatestVarargsFunctionNull() { - Observable.combineLatest(null, 128, Observable.never()); - } - - @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) - public void combineLatestVarargsFunctionReturnsNull() { - Observable.combineLatest(new Function() { - @Override - public Object apply(Object[] v) { - return null; - } - }, 128, just1).blockingLast(); - } - @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) public void combineLatestIterableFunctionNull() { @@ -165,27 +127,6 @@ public Object apply(Object[] v) { }, 128).blockingLast(); } - @Test(expected = NullPointerException.class) - public void combineLatestDelayErrorVarargsNull() { - Observable.combineLatestDelayError(new Function() { - @Override - public Object apply(Object[] v) { - return 1; - } - }, 128, (Observable[])null); - } - - @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) - public void combineLatestDelayErrorVarargsOneIsNull() { - Observable.combineLatestDelayError(new Function() { - @Override - public Object apply(Object[] v) { - return 1; - } - }, 128, Observable.never(), null).blockingLast(); - } - @Test(expected = NullPointerException.class) public void combineLatestDelayErrorIterableNull() { Observable.combineLatestDelayError((Iterable>)null, new Function() { @@ -222,23 +163,6 @@ public Object apply(Object[] v) { }, 128).blockingLast(); } - @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) - public void combineLatestDelayErrorVarargsFunctionNull() { - Observable.combineLatestDelayError(null, 128, Observable.never()); - } - - @SuppressWarnings("unchecked") - @Test(expected = NullPointerException.class) - public void combineLatestDelayErrorVarargsFunctionReturnsNull() { - Observable.combineLatestDelayError(new Function() { - @Override - public Object apply(Object[] v) { - return null; - } - }, 128, just1).blockingLast(); - } - @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) public void combineLatestDelayErrorIterableFunctionNull() { diff --git a/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayDelayErrorTckTest.java b/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayDelayErrorTckTest.java index eb662c8b90..dcb5a94fe6 100644 --- a/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayDelayErrorTckTest.java +++ b/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayDelayErrorTckTest.java @@ -27,14 +27,13 @@ public class CombineLatestArrayDelayErrorTckTest extends BaseTck { public Publisher createPublisher(long elements) { return Flowable.combineLatestDelayError( + new Publisher[] { Flowable.just(1L), Flowable.fromIterable(iterate(elements)) }, new Function() { @Override public Long apply(Object[] a) throws Exception { return (Long)a[0]; } - }, - Flowable.just(1L), - Flowable.fromIterable(iterate(elements)) + } ) ; } diff --git a/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayTckTest.java b/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayTckTest.java index 7b57576d6e..a776e28e71 100644 --- a/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayTckTest.java +++ b/src/test/java/io/reactivex/rxjava3/tck/CombineLatestArrayTckTest.java @@ -27,14 +27,13 @@ public class CombineLatestArrayTckTest extends BaseTck { public Publisher createPublisher(long elements) { return Flowable.combineLatest( + new Publisher[] { Flowable.just(1L), Flowable.fromIterable(iterate(elements)) }, new Function() { @Override public Long apply(Object[] a) throws Exception { return (Long)a[0]; } - }, - Flowable.just(1L), - Flowable.fromIterable(iterate(elements)) + } ) ; }