diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java index 68f846b885..507a8a867d 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java @@ -98,7 +98,7 @@ public void onStart() { @Override public void onNext(Observable t) { if (t instanceof ScalarSynchronousObservable) { - handleScalarSynchronousObservable(t); + handleScalarSynchronousObservable((ScalarSynchronousObservable)t); } else { if (t == null || isUnsubscribed()) { return; @@ -128,7 +128,7 @@ private void handleNewSource(Observable t) { request(1); } - private void handleScalarSynchronousObservable(Observable t) { + private void handleScalarSynchronousObservable(ScalarSynchronousObservable t) { // fast-path for scalar, synchronous values such as Observable.from(int) /** * Without this optimization: @@ -154,8 +154,8 @@ private void handleScalarSynchronousObservable(Observable t) { } } - private void handleScalarSynchronousObservableWithoutRequestLimits(Observable t) { - T value = ((ScalarSynchronousObservable) t).get(); + private void handleScalarSynchronousObservableWithoutRequestLimits(ScalarSynchronousObservable t) { + T value = t.get(); if (getEmitLock()) { try { actual.onNext(value); @@ -177,15 +177,15 @@ private void handleScalarSynchronousObservableWithoutRequestLimits(Observable t) { + private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronousObservable t) { if (getEmitLock()) { boolean emitted = false; try { long r = mergeProducer.requested; if (r > 0) { emitted = true; - actual.onNext(((ScalarSynchronousObservable) t).get()); - mergeProducer.REQUESTED.decrementAndGet(mergeProducer); + actual.onNext(t.get()); + MergeProducer.REQUESTED.decrementAndGet(mergeProducer); // we handle this Observable without ever incrementing the wip or touching other machinery so just return here return; } @@ -203,7 +203,7 @@ private void handleScalarSynchronousObservableWithRequestLimits(Observable) t).get()); + scalarValueQueue.onNext(t.get()); } catch (MissingBackpressureException e) { onError(e); } @@ -295,7 +295,7 @@ private int drainScalarValueQueue() { } } // decrement the number we emitted from outstanding requests - mergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining); + MergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining); } return emittedWhileDraining; } @@ -500,7 +500,7 @@ private void emit(T t, boolean complete) { } else { parentSubscriber.actual.onNext(t); emitted++; - producer.REQUESTED.decrementAndGet(producer); + MergeProducer.REQUESTED.decrementAndGet(producer); } } else { // no requests available, so enqueue it @@ -587,7 +587,7 @@ private int drainRequested() { } // decrement the number we emitted from outstanding requests - producer.REQUESTED.getAndAdd(producer, -emitted); + MergeProducer.REQUESTED.getAndAdd(producer, -emitted); return emitted; } diff --git a/rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java b/rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java index dabb857d7f..04c377906c 100644 --- a/rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java +++ b/rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java @@ -173,7 +173,7 @@ public static RxRingBuffer getSpmcInstance() { @Override protected SpscArrayQueue createObject() { - return new SpscArrayQueue(SIZE); + return new SpscArrayQueue(SIZE); } }; @@ -182,12 +182,12 @@ protected SpscArrayQueue createObject() { @Override protected SpmcArrayQueue createObject() { - return new SpmcArrayQueue(SIZE); + return new SpmcArrayQueue(SIZE); } }; - private RxRingBuffer(Queue queue, int size) { + private RxRingBuffer(Queue queue, int size) { this.queue = queue; this.pool = null; this.size = size; @@ -201,7 +201,7 @@ private RxRingBuffer(ObjectPool> pool, int size) { public void release() { if (pool != null) { - Queue q = queue; + Queue q = queue; q.clear(); queue = null; pool.returnObject(q); @@ -214,7 +214,7 @@ public void unsubscribe() { } /* for unit tests */RxRingBuffer() { - this(new SynchronizedQueue(SIZE), SIZE); + this(new SynchronizedQueue(SIZE), SIZE); } /** diff --git a/rxjava-core/src/main/java/rx/internal/util/SubscriptionRandomList.java b/rxjava-core/src/main/java/rx/internal/util/SubscriptionRandomList.java index 39d40730bc..c694508076 100644 --- a/rxjava-core/src/main/java/rx/internal/util/SubscriptionRandomList.java +++ b/rxjava-core/src/main/java/rx/internal/util/SubscriptionRandomList.java @@ -113,15 +113,15 @@ public void clear() { } public void forEach(Action1 action) { - Object[] ss; + T[] ss=null; synchronized (this) { if (unsubscribed || subscriptions == null) { return; } - ss = subscriptions.toArray(); + ss = subscriptions.toArray(ss); } - for (Object t : ss) { - action.call((T) t); + for (T t : ss) { + action.call(t); } } diff --git a/rxjava-core/src/test/java/rx/internal/util/IndexedRingBufferTest.java b/rxjava-core/src/test/java/rx/internal/util/IndexedRingBufferTest.java index 100e9e8d98..6eade7d795 100644 --- a/rxjava-core/src/test/java/rx/internal/util/IndexedRingBufferTest.java +++ b/rxjava-core/src/test/java/rx/internal/util/IndexedRingBufferTest.java @@ -211,13 +211,13 @@ public void testForEachAcrossSections() { buffer.add(i); } - final ArrayList list = new ArrayList(); + final ArrayList list = new ArrayList(); int nextIndex = buffer.forEach(accumulate(list), 5000); assertEquals(10000, list.size()); - assertEquals(5000, list.get(0)); - assertEquals(9999, list.get(4999)); - assertEquals(0, list.get(5000)); - assertEquals(4999, list.get(9999)); + assertEquals(Integer.valueOf(5000), list.get(0)); + assertEquals(Integer.valueOf(9999), list.get(4999)); + assertEquals(Integer.valueOf(0), list.get(5000)); + assertEquals(Integer.valueOf(4999), list.get(9999)); assertEquals(5000, nextIndex); } @@ -364,11 +364,11 @@ public void call() { assertEquals(0, exceptions.size()); } - private Func1 accumulate(final ArrayList list) { - return new Func1() { + private Func1 accumulate(final ArrayList list) { + return new Func1() { @Override - public Boolean call(Object t1) { + public Boolean call(T t1) { list.add(t1); return true; } diff --git a/rxjava-core/src/test/java/rx/internal/util/RxRingBufferBase.java b/rxjava-core/src/test/java/rx/internal/util/RxRingBufferBase.java index bc5d08c3ad..0b29f8da91 100644 --- a/rxjava-core/src/test/java/rx/internal/util/RxRingBufferBase.java +++ b/rxjava-core/src/test/java/rx/internal/util/RxRingBufferBase.java @@ -15,23 +15,15 @@ */ package rx.internal.util; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.Arrays; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; -import rx.Producer; -import rx.Scheduler; -import rx.Subscriber; import rx.exceptions.MissingBackpressureException; -import rx.functions.Action0; import rx.observers.TestSubscriber; -import rx.schedulers.Schedulers; public abstract class RxRingBufferBase { @@ -59,7 +51,6 @@ public void addAndPollFailBackpressure() throws MissingBackpressureException { RxRingBuffer b = createRingBuffer(); - TestSubscriber s = new TestSubscriber(); try { for (int i = 0; i < RxRingBuffer.SIZE; i++) { // System.out.println("Add: " + i); @@ -82,7 +73,6 @@ public void addAndPollFailBackpressure() throws MissingBackpressureException { @Test public void addAndPoll() throws MissingBackpressureException { RxRingBuffer b = createRingBuffer(); - TestSubscriber s = new TestSubscriber(); b.onNext("o"); b.onNext("o"); b.poll();