Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observer + Subscriber #793

Merged
merged 6 commits into from
Jan 31, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import groovy.lang.Closure;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;

public class GroovyCreateWrapper<T> implements OnSubscribe<T> {
Expand All @@ -29,7 +29,7 @@ public GroovyCreateWrapper(Closure<Void> closure) {
}

@Override
public void call(Observer<? super T> op) {
public void call(Subscriber<? super T> op) {
Object o = closure.call(op);
/*
* If the new signature is being used, we will get NULL back.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,21 @@
*/
package rx.lang.groovy

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;

import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.observables.GroupedObservable;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;
import static org.junit.Assert.*
import static org.mockito.Matchers.*
import static org.mockito.Mockito.*

import org.junit.Before
import org.junit.Test
import org.mockito.Mock
import org.mockito.MockitoAnnotations

import rx.Notification
import rx.Observable
import rx.Observer
import rx.Subscription
import rx.Observable.OnSubscribeFunc
import rx.subscriptions.Subscriptions

def class ObservableTests {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
*/
package rx.lang.jruby;

import org.jruby.RubyProc;
import org.jruby.Ruby;
import org.jruby.RubyProc;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.javasupport.JavaUtil;

import rx.util.functions.Action;
import rx.util.functions.Action0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
*/
package rx.lang.jruby;

import org.jruby.RubyProc;
import org.jruby.Ruby;
import org.jruby.RubyProc;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.javasupport.JavaUtil;

import rx.util.functions.Func0;
import rx.util.functions.Func1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package rx.lang.scala
trait Subject[T] extends Observable[T] with Observer[T] {
private [scala] val asJavaSubject: rx.subjects.Subject[_ >: T, _<: T]

val asJavaObservable: rx.Observable[_ <: T] = asJavaSubject.toObservable()
val asJavaObservable: rx.Observable[_ <: T] = asJavaSubject

override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubject
override def onNext(value: T): Unit = { asJavaObserver.onNext(value)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
package rx.android.observables;

import rx.Observable;
import rx.Observer;
import rx.operators.OperationObserveFromAndroidComponent;

import android.app.Activity;
import android.app.Fragment;
import android.os.Build;
import android.support.v4.app.FragmentActivity;


public final class AndroidObservable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package rx.android.schedulers;

import rx.Scheduler;
import android.os.Handler;
import android.os.Looper;
import rx.Scheduler;

/**
* Schedulers that have Android specific functionality
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
*/
package rx.android.schedulers;

import android.os.Handler;
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
import rx.util.functions.Func2;

import java.util.concurrent.TimeUnit;
import android.os.Handler;

/**
* Schedules actions to run on an Android Handler thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,10 @@
import rx.Observer;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.subjects.PublishSubject;
import android.app.Activity;
import android.app.Fragment;
import android.os.Looper;
import android.util.Log;

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class OperationObserveFromAndroidComponent {

public static <T> Observable<T> observeFromAndroidComponent(Observable<T> source, android.app.Fragment fragment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package rx.android.observables;

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -29,12 +29,9 @@
import rx.Observable;
import rx.Observer;
import rx.observers.TestObserver;
import rx.operators.OperationObserveFromAndroidComponent;
import android.app.Activity;
import android.app.Fragment;
import android.os.Build;
import android.support.v4.app.FragmentActivity;
import rx.android.observables.AndroidObservable;


@RunWith(RobolectricTestRunner.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.observers.TestObserver;
import rx.observers.TestSubscriber;
import rx.operators.OperationObserveFromAndroidComponent;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
Expand Down Expand Up @@ -149,8 +150,8 @@ public void itForwardsOnErrorToTargetObserver() {
public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone() throws Throwable {
PublishSubject<Integer> source = PublishSubject.create();

final Observable.OnSubscribeFunc<Integer> operator = newOnSubscribeFragmentInstance(source.toObservable(), mockFragment);
operator.onSubscribe(mockObserver);
final Observable.OnSubscribeFunc<Integer> operator = newOnSubscribeFragmentInstance(source, mockFragment);
operator.onSubscribe(new TestSubscriber<Integer>(mockObserver));

source.onNext(1);
releaseComponentRef(operator);
Expand All @@ -167,8 +168,8 @@ public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone() throws Thr
public void itDropsOnErrorIfTargetComponentIsGone() throws Throwable {
PublishSubject<Integer> source = PublishSubject.create();

final Observable.OnSubscribeFunc<Integer> operator = newOnSubscribeFragmentInstance(source.toObservable(), mockFragment);
operator.onSubscribe(mockObserver);
final Observable.OnSubscribeFunc<Integer> operator = newOnSubscribeFragmentInstance(source, mockFragment);
operator.onSubscribe(new TestSubscriber<Integer>(mockObserver));

source.onNext(1);
releaseComponentRef(operator);
Expand Down Expand Up @@ -203,7 +204,7 @@ private void releaseComponentRef(Observable.OnSubscribeFunc<Integer> operator) t
@Test
public void itDoesNotForwardOnNextOnCompletedSequenceIfFragmentIsDetached() {
PublishSubject<Integer> source = PublishSubject.create();
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source.toObservable(), mockFragment).subscribe(new TestObserver<Integer>(mockObserver));
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(new TestObserver<Integer>(mockObserver));

source.onNext(1);

Expand All @@ -219,7 +220,7 @@ public void itDoesNotForwardOnNextOnCompletedSequenceIfFragmentIsDetached() {
@Test
public void itDoesNotForwardOnErrorIfFragmentIsDetached() {
PublishSubject<Integer> source = PublishSubject.create();
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source.toObservable(), mockFragment).subscribe(new TestObserver<Integer>(mockObserver));
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(new TestObserver<Integer>(mockObserver));

source.onNext(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected void onResponseReceived(HttpResponse response) throws HttpException, I

@Override
public Subscription onSubscribe(Observer<? super byte[]> observer) {
parentSubscription.add(contentSubject.toObservable().subscribe(observer));
parentSubscription.add(contentSubject.subscribe(observer));
return parentSubscription;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -605,7 +606,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -662,7 +663,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -721,7 +722,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -782,7 +783,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -845,7 +846,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -910,7 +911,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -977,7 +978,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -1046,7 +1047,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -1117,7 +1118,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -1190,7 +1191,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down Expand Up @@ -1243,7 +1244,7 @@ public void call() {
subject.onCompleted();
}
});
return subject.toObservable();
return subject;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observer;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
Expand All @@ -28,7 +29,7 @@
* a terminal state has been reached.
* @param <T> the observed value type
*/
abstract class LatchedObserver<T> extends Observer<T> {
abstract class LatchedObserver<T> implements Observer<T> {
/** The CountDownLatch to count-down on a terminal state. */
protected final CountDownLatch latch;
/** Contains the error. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
package rx.util.async.operators;

import java.util.concurrent.Future;

import rx.Observable;
import rx.Scheduler;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

/**
* Defer the execution of a factory method which produces an observable sequence.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

import rx.Observable;
import rx.Subscription;
import rx.util.Exceptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.util.async.operators;

import java.util.concurrent.Callable;

import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
Expand Down
Loading