Skip to content

Commit

Permalink
Merge pull request #1436 from gliptak/genericswarnings
Browse files Browse the repository at this point in the history
Correct warnings
  • Loading branch information
benjchristensen committed Jul 21, 2014
2 parents 0b9c1fb + 583412e commit 10e5b12
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 38 deletions.
22 changes: 11 additions & 11 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void onStart() {
@Override
public void onNext(Observable<? extends T> t) {
if (t instanceof ScalarSynchronousObservable) {
handleScalarSynchronousObservable(t);
handleScalarSynchronousObservable((ScalarSynchronousObservable)t);
} else {
if (t == null || isUnsubscribed()) {
return;
Expand Down Expand Up @@ -191,7 +191,7 @@ private void handleNewSource(Observable<? extends T> t) {
request(1);
}

private void handleScalarSynchronousObservable(Observable<? extends T> t) {
private void handleScalarSynchronousObservable(ScalarSynchronousObservable<? extends T> t) {
// fast-path for scalar, synchronous values such as Observable.from(int)
/**
* Without this optimization:
Expand All @@ -217,8 +217,8 @@ private void handleScalarSynchronousObservable(Observable<? extends T> t) {
}
}

private void handleScalarSynchronousObservableWithoutRequestLimits(Observable<? extends T> t) {
T value = ((ScalarSynchronousObservable<T>) t).get();
private void handleScalarSynchronousObservableWithoutRequestLimits(ScalarSynchronousObservable<? extends T> t) {
T value = t.get();
if (getEmitLock()) {
try {
actual.onNext(value);
Expand All @@ -240,15 +240,15 @@ private void handleScalarSynchronousObservableWithoutRequestLimits(Observable<?
}
}

private void handleScalarSynchronousObservableWithRequestLimits(Observable<? extends T> t) {
private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronousObservable<? extends T> t) {
if (getEmitLock()) {
boolean emitted = false;
try {
long r = mergeProducer.requested;
if (r > 0) {
emitted = true;
actual.onNext(((ScalarSynchronousObservable<T>) t).get());
mergeProducer.REQUESTED.decrementAndGet(mergeProducer);
actual.onNext(t.get());
MergeProducer.REQUESTED.decrementAndGet(mergeProducer);
// we handle this Observable without ever incrementing the wip or touching other machinery so just return here
return;
}
Expand All @@ -266,7 +266,7 @@ private void handleScalarSynchronousObservableWithRequestLimits(Observable<? ext
// enqueue the values for later delivery
initScalarValueQueueIfNeeded();
try {
scalarValueQueue.onNext(((ScalarSynchronousObservable<T>) t).get());
scalarValueQueue.onNext(t.get());
} catch (MissingBackpressureException e) {
onError(e);
}
Expand Down Expand Up @@ -359,7 +359,7 @@ private int drainScalarValueQueue() {
}
}
// decrement the number we emitted from outstanding requests
mergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining);
MergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining);
}
return emittedWhileDraining;
}
Expand Down Expand Up @@ -617,7 +617,7 @@ private void emit(T t, boolean complete) {
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
emitted++;
producer.REQUESTED.decrementAndGet(producer);
MergeProducer.REQUESTED.decrementAndGet(producer);
}
} else {
// no requests available, so enqueue it
Expand Down Expand Up @@ -709,7 +709,7 @@ private int drainRequested() {
}

// decrement the number we emitted from outstanding requests
producer.REQUESTED.getAndAdd(producer, -emitted);
MergeProducer.REQUESTED.getAndAdd(producer, -emitted);
return emitted;
}

Expand Down
10 changes: 5 additions & 5 deletions rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public static RxRingBuffer getSpmcInstance() {

@Override
protected SpscArrayQueue<Object> createObject() {
return new SpscArrayQueue(SIZE);
return new SpscArrayQueue<Object>(SIZE);
}

};
Expand All @@ -182,12 +182,12 @@ protected SpscArrayQueue<Object> createObject() {

@Override
protected SpmcArrayQueue<Object> createObject() {
return new SpmcArrayQueue(SIZE);
return new SpmcArrayQueue<Object>(SIZE);
}

};

private RxRingBuffer(Queue queue, int size) {
private RxRingBuffer(Queue<Object> queue, int size) {
this.queue = queue;
this.pool = null;
this.size = size;
Expand All @@ -201,7 +201,7 @@ private RxRingBuffer(ObjectPool<Queue<Object>> pool, int size) {

public void release() {
if (pool != null) {
Queue q = queue;
Queue<Object> q = queue;
q.clear();
queue = null;
pool.returnObject(q);
Expand All @@ -214,7 +214,7 @@ public void unsubscribe() {
}

/* for unit tests */RxRingBuffer() {
this(new SynchronizedQueue<Queue>(SIZE), SIZE);
this(new SynchronizedQueue<Object>(SIZE), SIZE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ public void clear() {
}

public void forEach(Action1<T> action) {
Object[] ss;
T[] ss=null;
synchronized (this) {
if (unsubscribed || subscriptions == null) {
return;
}
ss = subscriptions.toArray();
ss = subscriptions.toArray(ss);
}
for (Object t : ss) {
action.call((T) t);
for (T t : ss) {
action.call(t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,13 @@ public void testForEachAcrossSections() {
buffer.add(i);
}

final ArrayList<String> list = new ArrayList<String>();
final ArrayList<Integer> list = new ArrayList<Integer>();
int nextIndex = buffer.forEach(accumulate(list), 5000);
assertEquals(10000, list.size());
assertEquals(5000, list.get(0));
assertEquals(9999, list.get(4999));
assertEquals(0, list.get(5000));
assertEquals(4999, list.get(9999));
assertEquals(Integer.valueOf(5000), list.get(0));
assertEquals(Integer.valueOf(9999), list.get(4999));
assertEquals(Integer.valueOf(0), list.get(5000));
assertEquals(Integer.valueOf(4999), list.get(9999));
assertEquals(5000, nextIndex);
}

Expand Down Expand Up @@ -364,11 +364,11 @@ public void call() {
assertEquals(0, exceptions.size());
}

private Func1<Object, Boolean> accumulate(final ArrayList list) {
return new Func1<Object, Boolean>() {
private <T> Func1<T, Boolean> accumulate(final ArrayList<T> list) {
return new Func1<T, Boolean>() {

@Override
public Boolean call(Object t1) {
public Boolean call(T t1) {
list.add(t1);
return true;
}
Expand Down
10 changes: 0 additions & 10 deletions rxjava-core/src/test/java/rx/internal/util/RxRingBufferBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,15 @@
*/
package rx.internal.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public abstract class RxRingBufferBase {

Expand Down Expand Up @@ -59,7 +51,6 @@ public void addAndPollFailBackpressure() throws MissingBackpressureException {

RxRingBuffer b = createRingBuffer();

TestSubscriber<Object> s = new TestSubscriber<Object>();
try {
for (int i = 0; i < RxRingBuffer.SIZE; i++) {
// System.out.println("Add: " + i);
Expand All @@ -82,7 +73,6 @@ public void addAndPollFailBackpressure() throws MissingBackpressureException {
@Test
public void addAndPoll() throws MissingBackpressureException {
RxRingBuffer b = createRingBuffer();
TestSubscriber<Object> s = new TestSubscriber<Object>();
b.onNext("o");
b.onNext("o");
b.poll();
Expand Down

0 comments on commit 10e5b12

Please sign in to comment.