1
+ /**
2
+ * Copyright 2013 Netflix, Inc.
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License");
5
+ * you may not use this file except in compliance with the License.
6
+ * You may obtain a copy of the License at
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ * See the License for the specific language governing permissions and
14
+ * limitations under the License.
15
+ */
16
+ package rx .operators ;
17
+
18
+ import java .util .ArrayList ;
19
+ import java .util .HashMap ;
20
+ import java .util .List ;
21
+ import java .util .Map ;
22
+ import rx .Observable ;
23
+ import rx .Observable .OnSubscribeFunc ;
24
+ import rx .Observer ;
25
+ import rx .Subscription ;
26
+ import rx .observables .GroupedObservable ;
27
+ import rx .subjects .PublishSubject ;
28
+ import rx .subjects .Subject ;
29
+ import rx .subscriptions .CompositeSubscription ;
30
+ import rx .subscriptions .SerialSubscription ;
31
+ import rx .subscriptions .Subscriptions ;
32
+ import rx .util .functions .Func1 ;
33
+
34
+ /**
35
+ * Groups the elements of an observable sequence according to a specified key selector, value selector and duration selector function.
36
+ *
37
+ * @see <a href='http://msdn.microsoft.com/en-us/library/hh211932.aspx'>MSDN: Observable.GroupByUntil</a>
38
+ * @see <a href='http://msdn.microsoft.com/en-us/library/hh229433.aspx'>MSDN: Observable.GroupByUntil</a>
39
+ */
40
+ public class OperationGroupByUntil <TSource , TKey , TResult , TDuration > implements OnSubscribeFunc <GroupedObservable <TKey , TResult >> {
41
+ final Observable <TSource > source ;
42
+ final Func1 <? super TSource , ? extends TKey > keySelector ;
43
+ final Func1 <? super TSource , ? extends TResult > valueSelector ;
44
+ final Func1 <? super GroupedObservable <TKey , TResult >, ? extends Observable <TDuration >> durationSelector ;
45
+ public OperationGroupByUntil (Observable <TSource > source ,
46
+ Func1 <? super TSource , ? extends TKey > keySelector ,
47
+ Func1 <? super TSource , ? extends TResult > valueSelector ,
48
+ Func1 <? super GroupedObservable <TKey , TResult >, ? extends Observable <TDuration >> durationSelector ) {
49
+ this .source = source ;
50
+ this .keySelector = keySelector ;
51
+ this .valueSelector = valueSelector ;
52
+ this .durationSelector = durationSelector ;
53
+ }
54
+
55
+ @ Override
56
+ public Subscription onSubscribe (Observer <? super GroupedObservable <TKey , TResult >> t1 ) {
57
+ SerialSubscription cancel = new SerialSubscription ();
58
+ ResultSink sink = new ResultSink (t1 , cancel );
59
+ cancel .setSubscription (sink .run ());
60
+ return cancel ;
61
+ }
62
+ /** The source value sink and group manager. */
63
+ class ResultSink implements Observer <TSource > {
64
+ /** Guarded by gate. */
65
+ protected final Observer <? super GroupedObservable <TKey , TResult >> observer ;
66
+ protected final Subscription cancel ;
67
+ protected final CompositeSubscription group = new CompositeSubscription ();
68
+ protected final Object gate = new Object ();
69
+ /** Guarded by gate. */
70
+ protected final Map <TKey , GroupSubject <TKey , TResult >> map = new HashMap <TKey , GroupSubject <TKey , TResult >>();
71
+ public ResultSink (Observer <? super GroupedObservable <TKey , TResult >> observer , Subscription cancel ) {
72
+ this .observer = observer ;
73
+ this .cancel = cancel ;
74
+ }
75
+ /** Prepare the subscription tree. */
76
+ public Subscription run () {
77
+ SerialSubscription toSource = new SerialSubscription ();
78
+ group .add (toSource );
79
+
80
+ toSource .setSubscription (source .subscribe (this ));
81
+
82
+ return group ;
83
+ }
84
+
85
+ @ Override
86
+ public void onNext (TSource args ) {
87
+ TKey key ;
88
+ TResult value ;
89
+ try {
90
+ key = keySelector .call (args );
91
+ value = valueSelector .call (args );
92
+ } catch (Throwable t ) {
93
+ onError (t );
94
+ return ;
95
+ }
96
+
97
+ GroupSubject <TKey , TResult > g ;
98
+ boolean newGroup = false ;
99
+ synchronized (key ) {
100
+ g = map .get (key );
101
+ if (g == null ) {
102
+ g = create (key );
103
+ map .put (key , g );
104
+ newGroup = true ;
105
+ }
106
+ }
107
+
108
+ if (newGroup ) {
109
+ Observable <TDuration > duration ;
110
+ try {
111
+ duration = durationSelector .call (g );
112
+ } catch (Throwable t ) {
113
+ onError (t );
114
+ return ;
115
+ }
116
+
117
+ synchronized (gate ) {
118
+ observer .onNext (g );
119
+ }
120
+
121
+ SerialSubscription durationHandle = new SerialSubscription ();
122
+ group .add (durationHandle );
123
+
124
+ DurationObserver durationObserver = new DurationObserver (key , durationHandle );
125
+ durationHandle .setSubscription (duration .subscribe (durationObserver ));
126
+
127
+ }
128
+
129
+ synchronized (gate ) {
130
+ g .onNext (value );
131
+ }
132
+ }
133
+
134
+ @ Override
135
+ public void onError (Throwable e ) {
136
+ synchronized (gate ) {
137
+ List <GroupSubject <TKey , TResult >> gs = new ArrayList <GroupSubject <TKey , TResult >>(map .values ());
138
+ map .clear ();
139
+ for (GroupSubject <TKey , TResult > g : gs ) {
140
+ g .onError (e );
141
+ }
142
+ observer .onError (e );
143
+ }
144
+ cancel .unsubscribe ();
145
+ }
146
+
147
+ @ Override
148
+ public void onCompleted () {
149
+ synchronized (gate ) {
150
+ List <GroupSubject <TKey , TResult >> gs = new ArrayList <GroupSubject <TKey , TResult >>(map .values ());
151
+ map .clear ();
152
+ for (GroupSubject <TKey , TResult > g : gs ) {
153
+ g .onCompleted ();
154
+ }
155
+ observer .onCompleted ();
156
+ }
157
+ cancel .unsubscribe ();
158
+ }
159
+ /** Create a new group. */
160
+ public GroupSubject <TKey , TResult > create (TKey key ) {
161
+ PublishSubject <TResult > publish = PublishSubject .create ();
162
+ return new GroupSubject <TKey , TResult >(key , publish );
163
+ }
164
+ /** Terminate a group. */
165
+ public void expire (TKey key , Subscription handle ) {
166
+ synchronized (gate ) {
167
+ GroupSubject <TKey , TResult > g = map .remove (key );
168
+ if (g != null ) {
169
+ g .onCompleted ();
170
+ }
171
+ }
172
+ handle .unsubscribe ();
173
+ }
174
+ /** Observe the completion of a group. */
175
+ class DurationObserver implements Observer <TDuration > {
176
+ final TKey key ;
177
+ final Subscription handle ;
178
+ public DurationObserver (TKey key , Subscription handle ) {
179
+ this .key = key ;
180
+ this .handle = handle ;
181
+ }
182
+ @ Override
183
+ public void onNext (TDuration args ) {
184
+ expire (key , handle );
185
+ }
186
+
187
+ @ Override
188
+ public void onError (Throwable e ) {
189
+ ResultSink .this .onError (e );
190
+ }
191
+
192
+ @ Override
193
+ public void onCompleted () {
194
+ expire (key , handle );
195
+ }
196
+
197
+ }
198
+ }
199
+ protected static <T > OnSubscribeFunc <T > neverSubscribe () {
200
+ return new OnSubscribeFunc <T >() {
201
+ @ Override
202
+ public Subscription onSubscribe (Observer <? super T > t1 ) {
203
+ return Subscriptions .empty ();
204
+ }
205
+ };
206
+ }
207
+ /** A grouped observable with subject-like behavior. */
208
+ public static class GroupSubject <K , V > extends GroupedObservable <K , V > implements Observer <V > {
209
+ protected final Subject <V , V > publish ;
210
+ public GroupSubject (K key , Subject <V , V > publish ) {
211
+ super (key , OperationGroupByUntil .<V >neverSubscribe ());
212
+ this .publish = publish ;
213
+ }
214
+
215
+ @ Override
216
+ public Subscription subscribe (Observer <? super V > observer ) {
217
+ return publish .subscribe (observer );
218
+ }
219
+
220
+ @ Override
221
+ public void onNext (V args ) {
222
+ publish .onNext (args );
223
+ }
224
+
225
+ @ Override
226
+ public void onError (Throwable e ) {
227
+ publish .onError (e );
228
+ }
229
+
230
+ @ Override
231
+ public void onCompleted () {
232
+ publish .onCompleted ();
233
+ }
234
+
235
+ }
236
+ }
0 commit comments