Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GroupBy/GroupByUntil Changes #1727

Merged

Conversation

benjchristensen
Copy link
Member

This is a proposed change to groupBy and groupByUntil that does the following:

  1. Eliminate groupByUntil and rolls that functionality into groupBy
  2. Adds support for reactive pull backpressure to groupBy
  3. Child GroupedObservables can now be unsubscribed and they will be cleaned up and then new instances for the same key can be emitted, like groupByUntil worked, except that now instead of passing in a special durationSelector function, the child can be composed using take/takeUntil/etc to cause an unsubscribe.

If the previous non-obvious groupBy behavior is wanted, then instead of unsubscribing, it can be filtered to ignore all further data, which is what the old groupBy used to do when a child was unsubscribed.

The reason for these changes are:

  • Adding backpressure to groupByUntil was very difficult with its existing signature as the duration selector function effectively required a GroupedObservable being a PublishSubject which meant multicasting. In this specific case it may have been possible to do backpressure with the multicasting, but it would be difficult and non-obvious as generally multicasting means the stream is "hot" and reactive pull backpressure can't be applied.
  • The existing groupBy almost always confused people as to what would happen when they unsubscribed a child using take or takeUntil. It always surprised people that it meant all further data would be dropped but the key and group would not be garbage collected. Almost always on an infinite stream people determined they needed groupByUntil, but only after stumbling around. After speaking with @headinthebox about this briefly we had enough agreement to submit this proposal and discuss further.

Now groupBy can behave by default as people expect with unsubscribe and support infinite streams, garbage collection, etc as well as backpressure.

I will provide usage examples below.

neerajrj and others added 2 commits October 5, 2014 22:23
This collapses groupByUntil and groupBy into a single groupBy operator. The new implementation has 2 major changes:

1) It supports reactive pull backpressure.
2) Child GroupedObservables can be unsubscribed and they will be cleaned up and then new instances for the same key can be emitted, like groupByUntil, except that now instead of passing in a special durationSelector function, the child can be composed using take/takeUntil/etc to cause an unsubscribe.

If the previous non-obvious groupBy behavior is wanted, then instead of unsubscribing, it can be filtered to ignore all further data, which is what the old groupBy used to do when a child was unsubscribed.
@benjchristensen
Copy link
Member Author

// odd/even into 2 lists
Observable.range(1, 100)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.toList();
        }).forEach(System.out::println);
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49, 51, 53, 55, 57, 59, 61, 63, 65, 67, 69, 71, 73, 75, 77, 79, 81, 83, 85, 87, 89, 91, 93, 95, 97, 99]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100]

@benjchristensen
Copy link
Member Author

This one unsubscribes each group after 10 but then emits a new group with the same odd/even keys as new values come in:

// odd/even into lists of 10
Observable.range(1, 100)
        .groupBy(n -> n % 2 == 0)
        .flatMap(g -> {
            return g.take(10).toList();
        }).forEach(System.out::println);
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
[21, 23, 25, 27, 29, 31, 33, 35, 37, 39]
[22, 24, 26, 28, 30, 32, 34, 36, 38, 40]
[41, 43, 45, 47, 49, 51, 53, 55, 57, 59]
[42, 44, 46, 48, 50, 52, 54, 56, 58, 60]
[61, 63, 65, 67, 69, 71, 73, 75, 77, 79]
[62, 64, 66, 68, 70, 72, 74, 76, 78, 80]
[81, 83, 85, 87, 89, 91, 93, 95, 97, 99]
[82, 84, 86, 88, 90, 92, 94, 96, 98, 100]

@benjchristensen
Copy link
Member Author

//odd/even into lists of 20 but only take the first 2 groups
Observable.range(1, 100)
     .groupBy(n -> n % 2 == 0)
     .flatMap(g -> {
         return g.take(20).toList();
     }).take(2).forEach(System.out::println);
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40]

@benjchristensen
Copy link
Member Author

//odd/even into 2 lists with numbers less than 30
Observable.range(1, 100)
   .groupBy(n -> n % 2 == 0)
   .flatMap(g -> {
       return g.takeWhile(i -> i < 30).toList();
   }).filter(l -> !l.isEmpty()).forEach(System.out::println);
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28]
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29]

@benjchristensen
Copy link
Member Author

Observable.from(Arrays.asList("a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c", "a", "b", "c"))
.groupBy(n -> n)
.flatMap(g -> {
    return g.take(3).reduce((s, s2) -> s + s2);
}).forEach(System.out::println);
aaa
bbb
ccc
aaa
bbb
ccc

@neerajrj
Copy link
Contributor

neerajrj commented Oct 6, 2014

Looks good to me, the semantics are more clearer now.

@benjchristensen
Copy link
Member Author

Thanks Neeraj for the review.

I also confirmed the signature and semantic changes with @headinthebox so am proceeding with this.

benjchristensen added a commit that referenced this pull request Oct 6, 2014
@benjchristensen benjchristensen merged commit 0d74d98 into ReactiveX:1.x Oct 6, 2014
@benjchristensen benjchristensen deleted the groupByWithBackpressure branch October 6, 2014 17:50
@benjchristensen benjchristensen changed the title Proposed groupBy/groupByUntil Changes GroupBy/GroupByUntil Changes Oct 7, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants