Skip to content

Commit

Permalink
Merge pull request #931 from soundcloud/android-operator-improvements
Browse files Browse the repository at this point in the history
A number of improvements to OperatorObserveFromAndroidComponent
  • Loading branch information
benjchristensen committed Mar 6, 2014
2 parents 18916d1 + c298d5e commit 51ecaed
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ private AndroidObservable() {}
* @return a new observable sequence that will emit notifications on the main UI thread
*/
public static <T> Observable<T> fromActivity(Activity activity, Observable<T> sourceObservable) {
Assertions.assertUiThread();
return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, activity);
}

Expand Down Expand Up @@ -87,6 +88,7 @@ public static <T> Observable<T> fromActivity(Activity activity, Observable<T> so
* @return a new observable sequence that will emit notifications on the main UI thread
*/
public static <T> Observable<T> fromFragment(Object fragment, Observable<T> sourceObservable) {
Assertions.assertUiThread();
if (USES_SUPPORT_FRAGMENTS && fragment instanceof android.support.v4.app.Fragment) {
return OperatorObserveFromAndroidComponent.observeFromAndroidComponent(sourceObservable, (android.support.v4.app.Fragment) fragment);
} else if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.HONEYCOMB && fragment instanceof Fragment) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package rx.android.observables;

import android.os.Looper;

public class Assertions {
public static void assertUiThread() {
if (Looper.getMainLooper() != Looper.myLooper()) {
throw new IllegalStateException("Observers must subscribe from the main UI thread, but was " + Thread.currentThread());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
*/
package rx.android.observables;

import android.os.Looper;
import android.view.View;
import android.widget.CompoundButton;
import android.widget.EditText;
import rx.Observable;
import rx.operators.OperatorCompoundButtonInput;
import rx.operators.OperatorEditTextInput;
import rx.operators.OperatorViewClick;

import android.view.View;
import android.widget.CompoundButton;
import android.widget.EditText;

public class ViewObservable {

public static Observable<View> clicks(final View view, final boolean emitInitialValue) {
Expand All @@ -38,10 +38,5 @@ public static Observable<Boolean> input(final CompoundButton button, final boole
return Observable.create(new OperatorCompoundButtonInput(button, emitInitialValue));
}

public static void assertUiThread() {
if (Looper.getMainLooper() != Looper.myLooper()) {
throw new IllegalStateException("Observers must subscribe from the main UI thread, but was " + Thread.currentThread());
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.android.observables.ViewObservable;
import rx.android.observables.Assertions;
import rx.android.subscriptions.AndroidSubscriptions;
import rx.functions.Action0;
import android.view.View;
Expand All @@ -40,7 +40,7 @@ public OperatorCompoundButtonInput(final CompoundButton button, final boolean em

@Override
public void call(final Subscriber<? super Boolean> observer) {
ViewObservable.assertUiThread();
Assertions.assertUiThread();
final CompositeOnCheckedChangeListener composite = CachedListeners.getFromViewOrCreate(button);

final CompoundButton.OnCheckedChangeListener listener = new CompoundButton.OnCheckedChangeListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.android.observables.ViewObservable;
import rx.android.observables.Assertions;
import rx.android.subscriptions.AndroidSubscriptions;
import rx.functions.Action0;
import android.text.Editable;
Expand All @@ -36,7 +36,7 @@ public OperatorEditTextInput(final EditText input, final boolean emitInitialValu

@Override
public void call(final Subscriber<? super String> observer) {
ViewObservable.assertUiThread();
Assertions.assertUiThread();
final TextWatcher watcher = new SimpleTextWatcher() {
@Override
public void afterTextChanged(final Editable editable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import rx.Observer;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.android.subscriptions.AndroidSubscriptions;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;

import android.app.Activity;
import android.os.Looper;
import android.util.Log;

public class OperatorObserveFromAndroidComponent {
Expand All @@ -44,8 +44,8 @@ private static class OnSubscribeBase<T, AndroidComponent> implements Observable.
private static final String LOG_TAG = "AndroidObserver";

private final Observable<T> source;
private AndroidComponent componentRef;
private Observer<? super T> observerRef;
private volatile AndroidComponent componentRef;
private volatile Observer<? super T> observerRef;

private OnSubscribeBase(Observable<T> source, AndroidComponent component) {
this.source = source;
Expand All @@ -54,9 +54,9 @@ private OnSubscribeBase(Observable<T> source, AndroidComponent component) {

private void log(String message) {
if (Log.isLoggable(LOG_TAG, Log.DEBUG)) {
Log.d(LOG_TAG, "componentRef = " + componentRef);
Log.d(LOG_TAG, "observerRef = " + observerRef);
Log.d(LOG_TAG, message);
String thread = Thread.currentThread().getName();
Log.d(LOG_TAG, "[" + thread + "] componentRef = " + componentRef + "; observerRef = " + observerRef);
Log.d(LOG_TAG, "[" + thread + "]" + message);
}
}

Expand All @@ -65,15 +65,15 @@ protected boolean isComponentValid(AndroidComponent component) {
}

@Override
public void call(Subscriber<? super T> subscriber) {
assertUiThread();
public void call(final Subscriber<? super T> subscriber) {
observerRef = subscriber;
source.observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<T>(subscriber) {
@Override
public void onCompleted() {
if (componentRef != null && isComponentValid(componentRef)) {
observerRef.onCompleted();
} else {
unsubscribe();
log("onComplete: target component released or detached; dropping message");
}
}
Expand All @@ -83,6 +83,7 @@ public void onError(Throwable e) {
if (componentRef != null && isComponentValid(componentRef)) {
observerRef.onError(e);
} else {
unsubscribe();
log("onError: target component released or detached; dropping message");
}
}
Expand All @@ -92,11 +93,12 @@ public void onNext(T args) {
if (componentRef != null && isComponentValid(componentRef)) {
observerRef.onNext(args);
} else {
unsubscribe();
log("onNext: target component released or detached; dropping message");
}
}
});
subscriber.add(AndroidSubscriptions.unsubscribeInUiThread(new Action0() {
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
log("unsubscribing from source sequence");
Expand All @@ -109,12 +111,6 @@ private void releaseReferences() {
observerRef = null;
componentRef = null;
}

private void assertUiThread() {
if (Looper.getMainLooper() != Looper.myLooper()) {
throw new IllegalStateException("Observers must subscribe from the main UI thread, but was " + Thread.currentThread());
}
}
}

private static final class OnSubscribeFragment<T> extends OnSubscribeBase<T, android.app.Fragment> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.android.observables.ViewObservable;
import rx.android.observables.Assertions;
import rx.android.subscriptions.AndroidSubscriptions;
import rx.functions.Action0;
import android.view.View;
Expand All @@ -39,7 +39,7 @@ public OperatorViewClick(final View view, final boolean emitInitialValue) {

@Override
public void call(final Subscriber<? super View> observer) {
ViewObservable.assertUiThread();
Assertions.assertUiThread();
final CompositeOnClickListener composite = CachedListeners.getFromViewOrCreate(view);

final View.OnClickListener listener = new View.OnClickListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package rx.android.observables;

import static org.mockito.Mockito.*;
import static org.mockito.Mockito.verify;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -25,14 +25,20 @@
import org.robolectric.Robolectric;
import org.robolectric.RobolectricTestRunner;
import org.robolectric.annotation.Config;

import rx.Observable;
import rx.Observer;
import rx.observers.TestObserver;

import android.app.Activity;
import android.app.Fragment;
import android.support.v4.app.FragmentActivity;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;


@RunWith(RobolectricTestRunner.class)
@Config(manifest = Config.NONE)
Expand Down Expand Up @@ -79,4 +85,36 @@ public void itSupportsNativeFragments() {
public void itThrowsIfObjectPassedIsNotAFragment() {
AndroidObservable.fromFragment("not a fragment", Observable.never());
}

@Test(expected = IllegalStateException.class)
public void itThrowsIfObserverCallsFromFragmentFromBackgroundThread() throws Throwable {
final Future<Object> future = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
AndroidObservable.fromFragment(fragment, Observable.empty());
return null;
}
});
try {
future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw e.getCause();
}
}

@Test(expected = IllegalStateException.class)
public void itThrowsIfObserverCallsFromActivityFromBackgroundThread() throws Throwable {
final Future<Object> future = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
AndroidObservable.fromActivity(activity, Observable.empty());
return null;
}
});
try {
future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException e) {
throw e.getCause();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,6 @@ public void setupMocks() {
when(mockFragment.isAdded()).thenReturn(true);
}

@Test
public void itThrowsIfObserverSubscribesFromBackgroundThread() throws Exception {
final Observable<Integer> testObservable = Observable.from(1);
final Future<Object> future = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
OperatorObserveFromAndroidComponent.observeFromAndroidComponent(
testObservable, mockFragment).subscribe(mockObserver);
return null;
}
});
future.get(1, TimeUnit.SECONDS);
verify(mockObserver).onError(any(IllegalStateException.class));
verifyNoMoreInteractions(mockObserver);
}

// TODO needs to be fixed, see comments inline below
@Ignore
public void itObservesTheSourceSequenceOnTheMainUIThread() {
Expand Down

0 comments on commit 51ecaed

Please sign in to comment.