Skip to content

Commit

Permalink
Merge pull request #477 from benjchristensen/subscription-bugfixes
Browse files Browse the repository at this point in the history
CompositeSubscription bugfixes
  • Loading branch information
benjchristensen committed Nov 7, 2013
2 parents 442292c + 1d5991c commit 8222607
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,36 @@ public CompositeSubscription(Subscription... subscriptions) {
}
}

/**
* Remove and unsubscribe all subscriptions but do not unsubscribe the outer CompositeSubscription.
*/
public void clear() {
Collection<Throwable> es = null;
for (Subscription s : subscriptions.keySet()) {
try {
s.unsubscribe();
this.subscriptions.remove(s);
} catch (Throwable e) {
if (es == null) {
es = new ArrayList<Throwable>();
}
es.add(e);
}
}
if (es != null) {
throw new CompositeException("Failed to unsubscribe to 1 or more subscriptions.", es);
}
}

/**
* Remove the {@link Subscription} and unsubscribe it.
*
* @param s
*/
public void remove(Subscription s) {
this.subscriptions.remove(s);
// also unsubscribe from it: http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
s.unsubscribe();
}

public boolean isUnsubscribed() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down Expand Up @@ -82,4 +82,66 @@ public void unsubscribe() {
// we should still have unsubscribed to the second one
assertEquals(1, counter.get());
}

@Test
public void testRemoveUnsubscribes() {
BooleanSubscription s1 = new BooleanSubscription();
BooleanSubscription s2 = new BooleanSubscription();

CompositeSubscription s = new CompositeSubscription();
s.add(s1);
s.add(s2);

s.remove(s1);

assertTrue(s1.isUnsubscribed());
assertFalse(s2.isUnsubscribed());
}

@Test
public void testClear() {
BooleanSubscription s1 = new BooleanSubscription();
BooleanSubscription s2 = new BooleanSubscription();

CompositeSubscription s = new CompositeSubscription();
s.add(s1);
s.add(s2);

assertFalse(s1.isUnsubscribed());
assertFalse(s2.isUnsubscribed());

s.clear();

assertTrue(s1.isUnsubscribed());
assertTrue(s1.isUnsubscribed());
assertFalse(s.isUnsubscribed());

BooleanSubscription s3 = new BooleanSubscription();

s.add(s3);
s.unsubscribe();

assertTrue(s3.isUnsubscribed());
assertTrue(s.isUnsubscribed());
}

@Test
public void testUnsubscribeIdempotence() {
final AtomicInteger counter = new AtomicInteger();
CompositeSubscription s = new CompositeSubscription();
s.add(new Subscription() {

@Override
public void unsubscribe() {
counter.incrementAndGet();
}
});

s.unsubscribe();
s.unsubscribe();
s.unsubscribe();

// we should have only unsubscribed once
assertEquals(1, counter.get());
}
}

0 comments on commit 8222607

Please sign in to comment.