Skip to content

Commit de94ab3

Browse files
committed
Operation GroupByUntil v5
1 parent 0e4cd7b commit de94ab3

File tree

3 files changed

+565
-0
lines changed

3 files changed

+565
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

+27
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import rx.operators.OperationFinally;
5555
import rx.operators.OperationFirstOrDefault;
5656
import rx.operators.OperationGroupBy;
57+
import rx.operators.OperationGroupByUntil;
5758
import rx.operators.OperationInterval;
5859
import rx.operators.OperationJoin;
5960
import rx.operators.OperationJoinPatterns;
@@ -129,6 +130,7 @@
129130
import rx.util.functions.Func9;
130131
import rx.util.functions.FuncN;
131132
import rx.util.functions.Function;
133+
import rx.util.functions.Functions;
132134

133135
/**
134136
* The Observable interface that implements the Reactive Pattern.
@@ -6143,4 +6145,29 @@ public <K, V> Observable<Map<K, Collection<V>>> toMultimap(Func1<? super T, ? ex
61436145
public <U> Observable<T> skipUntil(Observable<U> other) {
61446146
return create(new OperationSkipUntil<T, U>(this, other));
61456147
}
6148+
6149+
/**
6150+
* Groups the elements of an observable sequence according to a specified key selector function until the duration observable expires for the key.
6151+
* @param keySelector A function to extract the key for each element.
6152+
* @param durationSelector A function to signal the expiration of a group.
6153+
* @return A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
6154+
*
6155+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211932.aspx'>MSDN: Observable.GroupByUntil</a>
6156+
*/
6157+
public <TKey, TDuration> Observable<GroupedObservable<TKey, T>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super GroupedObservable<TKey, T>, ? extends Observable<TDuration>> durationSelector) {
6158+
return groupByUntil(keySelector, Functions.<T>identity(), durationSelector);
6159+
}
6160+
6161+
/**
6162+
* Groups the elements of an observable sequence according to a specified key and value selector function until the duration observable expires for the key.
6163+
* @param keySelector A function to extract the key for each element.
6164+
* @param valueSelector A function to map each source element to an element in an onbservable group.
6165+
* @param durationSelector A function to signal the expiration of a group.
6166+
* @return A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
6167+
*
6168+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229433.aspx'>MSDN: Observable.GroupByUntil</a>
6169+
*/
6170+
public <TKey, TValue, TDuration> Observable<GroupedObservable<TKey, TValue>> groupByUntil(Func1<? super T, ? extends TKey> keySelector, Func1<? super T, ? extends TValue> valueSelector, Func1<? super GroupedObservable<TKey, TValue>, ? extends Observable<TDuration>> durationSelector) {
6171+
return create(new OperationGroupByUntil<T, TKey, TValue, TDuration>(this, keySelector, valueSelector, durationSelector));
6172+
}
61466173
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.ArrayList;
19+
import java.util.HashMap;
20+
import java.util.List;
21+
import java.util.Map;
22+
import rx.Observable;
23+
import rx.Observable.OnSubscribeFunc;
24+
import rx.Observer;
25+
import rx.Subscription;
26+
import rx.observables.GroupedObservable;
27+
import rx.subjects.PublishSubject;
28+
import rx.subjects.Subject;
29+
import rx.subscriptions.CompositeSubscription;
30+
import rx.subscriptions.SerialSubscription;
31+
import rx.subscriptions.Subscriptions;
32+
import rx.util.functions.Func1;
33+
34+
/**
35+
* Groups the elements of an observable sequence according to a specified key selector, value selector and duration selector function.
36+
*
37+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh211932.aspx'>MSDN: Observable.GroupByUntil</a>
38+
* @see <a href='http://msdn.microsoft.com/en-us/library/hh229433.aspx'>MSDN: Observable.GroupByUntil</a>
39+
*/
40+
public class OperationGroupByUntil<TSource, TKey, TResult, TDuration> implements OnSubscribeFunc<GroupedObservable<TKey, TResult>> {
41+
final Observable<TSource> source;
42+
final Func1<? super TSource, ? extends TKey> keySelector;
43+
final Func1<? super TSource, ? extends TResult> valueSelector;
44+
final Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<TDuration>> durationSelector;
45+
public OperationGroupByUntil(Observable<TSource> source,
46+
Func1<? super TSource, ? extends TKey> keySelector,
47+
Func1<? super TSource, ? extends TResult> valueSelector,
48+
Func1<? super GroupedObservable<TKey, TResult>, ? extends Observable<TDuration>> durationSelector) {
49+
this.source = source;
50+
this.keySelector = keySelector;
51+
this.valueSelector = valueSelector;
52+
this.durationSelector = durationSelector;
53+
}
54+
55+
@Override
56+
public Subscription onSubscribe(Observer<? super GroupedObservable<TKey, TResult>> t1) {
57+
SerialSubscription cancel = new SerialSubscription();
58+
ResultSink sink = new ResultSink(t1, cancel);
59+
cancel.setSubscription(sink.run());
60+
return cancel;
61+
}
62+
/** The source value sink and group manager. */
63+
class ResultSink implements Observer<TSource> {
64+
/** Guarded by gate. */
65+
protected final Observer<? super GroupedObservable<TKey, TResult>> observer;
66+
protected final Subscription cancel;
67+
protected final CompositeSubscription group = new CompositeSubscription();
68+
protected final Object gate = new Object();
69+
/** Guarded by gate. */
70+
protected final Map<TKey, GroupSubject<TKey, TResult>> map = new HashMap<TKey, GroupSubject<TKey, TResult>>();
71+
public ResultSink(Observer<? super GroupedObservable<TKey, TResult>> observer, Subscription cancel) {
72+
this.observer = observer;
73+
this.cancel = cancel;
74+
}
75+
/** Prepare the subscription tree. */
76+
public Subscription run() {
77+
SerialSubscription toSource = new SerialSubscription();
78+
group.add(toSource);
79+
80+
toSource.setSubscription(source.subscribe(this));
81+
82+
return group;
83+
}
84+
85+
@Override
86+
public void onNext(TSource args) {
87+
TKey key;
88+
TResult value;
89+
try {
90+
key = keySelector.call(args);
91+
value = valueSelector.call(args);
92+
} catch (Throwable t) {
93+
onError(t);
94+
return;
95+
}
96+
97+
GroupSubject<TKey, TResult> g;
98+
boolean newGroup = false;
99+
synchronized (key) {
100+
g = map.get(key);
101+
if (g == null) {
102+
g = create(key);
103+
map.put(key, g);
104+
newGroup = true;
105+
}
106+
}
107+
108+
if (newGroup) {
109+
Observable<TDuration> duration;
110+
try {
111+
duration = durationSelector.call(g);
112+
} catch (Throwable t) {
113+
onError(t);
114+
return;
115+
}
116+
117+
synchronized (gate) {
118+
observer.onNext(g);
119+
}
120+
121+
SerialSubscription durationHandle = new SerialSubscription();
122+
group.add(durationHandle);
123+
124+
DurationObserver durationObserver = new DurationObserver(key, durationHandle);
125+
durationHandle.setSubscription(duration.subscribe(durationObserver));
126+
127+
}
128+
129+
synchronized (gate) {
130+
g.onNext(value);
131+
}
132+
}
133+
134+
@Override
135+
public void onError(Throwable e) {
136+
synchronized (gate) {
137+
List<GroupSubject<TKey, TResult>> gs = new ArrayList<GroupSubject<TKey, TResult>>(map.values());
138+
map.clear();
139+
for (GroupSubject<TKey, TResult> g : gs) {
140+
g.onError(e);
141+
}
142+
observer.onError(e);
143+
}
144+
cancel.unsubscribe();
145+
}
146+
147+
@Override
148+
public void onCompleted() {
149+
synchronized (gate) {
150+
List<GroupSubject<TKey, TResult>> gs = new ArrayList<GroupSubject<TKey, TResult>>(map.values());
151+
map.clear();
152+
for (GroupSubject<TKey, TResult> g : gs) {
153+
g.onCompleted();
154+
}
155+
observer.onCompleted();
156+
}
157+
cancel.unsubscribe();
158+
}
159+
/** Create a new group. */
160+
public GroupSubject<TKey, TResult> create(TKey key) {
161+
PublishSubject<TResult> publish = PublishSubject.create();
162+
return new GroupSubject<TKey, TResult>(key, publish);
163+
}
164+
/** Terminate a group. */
165+
public void expire(TKey key, Subscription handle) {
166+
synchronized (gate) {
167+
GroupSubject<TKey, TResult> g = map.remove(key);
168+
if (g != null) {
169+
g.onCompleted();
170+
}
171+
}
172+
handle.unsubscribe();
173+
}
174+
/** Observe the completion of a group. */
175+
class DurationObserver implements Observer<TDuration> {
176+
final TKey key;
177+
final Subscription handle;
178+
public DurationObserver(TKey key, Subscription handle) {
179+
this.key = key;
180+
this.handle = handle;
181+
}
182+
@Override
183+
public void onNext(TDuration args) {
184+
expire(key, handle);
185+
}
186+
187+
@Override
188+
public void onError(Throwable e) {
189+
ResultSink.this.onError(e);
190+
}
191+
192+
@Override
193+
public void onCompleted() {
194+
expire(key, handle);
195+
}
196+
197+
}
198+
}
199+
protected static <T> OnSubscribeFunc<T> neverSubscribe() {
200+
return new OnSubscribeFunc<T>() {
201+
@Override
202+
public Subscription onSubscribe(Observer<? super T> t1) {
203+
return Subscriptions.empty();
204+
}
205+
};
206+
}
207+
/** A grouped observable with subject-like behavior. */
208+
public static class GroupSubject<K, V> extends GroupedObservable<K, V> implements Observer<V> {
209+
protected final Subject<V, V> publish;
210+
public GroupSubject(K key, Subject<V, V> publish) {
211+
super(key, OperationGroupByUntil.<V>neverSubscribe());
212+
this.publish = publish;
213+
}
214+
215+
@Override
216+
public Subscription subscribe(Observer<? super V> observer) {
217+
return publish.subscribe(observer);
218+
}
219+
220+
@Override
221+
public void onNext(V args) {
222+
publish.onNext(args);
223+
}
224+
225+
@Override
226+
public void onError(Throwable e) {
227+
publish.onError(e);
228+
}
229+
230+
@Override
231+
public void onCompleted() {
232+
publish.onCompleted();
233+
}
234+
235+
}
236+
}

0 commit comments

Comments
 (0)