1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'groupBy'
3  +/
4 module rx.algorithm.groupby;
5 
6 import rx.disposable;
7 import rx.observable;
8 import rx.observer;
9 import rx.subject;
10 import rx.util;
11 
12 import std.functional : unaryFun;
13 import std.range : put;
14 
15 //####################
16 // GroupBy
17 //####################
/++
 + An observable sequence of `E` that additionally exposes the key
 + shared by its elements.
 +/
interface GroupedObservable(TKey, E) : Observable!E
{
    /// Returns: the key associated with this group.
    @property TKey key() const pure nothrow @safe @nogc;
}
23 
/++
 + Class implementation of `GroupedObservable`: stores a key and forwards
 + every subscription to a wrapped observable.
 +
 + When a `RefCountDisposable` is supplied, each subscription additionally
 + obtains a disposable from it, and that disposable is bundled with the
 + subscription into the returned `CompositeDisposable`.
 +/
private class GroupedObservableObject(TKey, E) : GroupedObservable!(TKey, E)
{
    alias ElementType = E;

public:
    /++
     + Params:
     +   key     = key reported by `key`.
     +   subject = observable that receives the forwarded subscriptions.
     +   cancel  = optional ref-counted disposable; `null` disables the
     +             extra disposable taken per subscription.
     +/
    this(TKey key, Observable!E subject, RefCountDisposable cancel = null)
    {
        _key = key;
        _subject = subject;
        _cancel = cancel;
    }

    /// Returns: the key given at construction.
    TKey key() const pure nothrow @safe @nogc @property
    {
        return _key;
    }

    /++
     + Subscribes `observer` to the wrapped observable.
     +
     + Returns: the plain subscription when no ref-counted disposable was
     + given; otherwise a composite of the disposable obtained from it and
     + the subscription.
     +/
    Disposable subscribe(Observer!E observer)
    {
        // Without a ref-counted disposable there is nothing to bundle.
        if (_cancel is null)
            return _subject.subscribe(observer);

        // The disposable is taken before subscribing, as in the original order.
        auto canceler = _cancel.getDisposable();
        auto subscription = _subject.subscribe(observer);
        return new CompositeDisposable(canceler, subscription);
    }

private:
    TKey _key;
    Observable!E _subject;
    RefCountDisposable _cancel;
}
62 
/++
 + Observer that splits incoming elements into groups by the key computed
 + with `selector`.
 +
 + For every key seen for the first time a new subject is created, wrapped in
 + a `GroupedObservableObject` and put to the downstream observer; the element
 + itself is then put to the subject belonging to its key.
 +
 + Params:
 +   selector  = unary function (or string lambda) computing the key of an element.
 +   TObserver = type of the downstream observer receiving the groups.
 +   E         = element type of the source sequence.
 +/
private class GroupByObserver(alias selector, TObserver, E)
{
public:
    /// Key type: the type `selector` yields for an `E`.
    alias TKey = typeof({ return unaryFun!(selector)(E.init); }());

    // True when `null` can initialize a TKey. Such keys get a dedicated
    // subject (`_null`) instead of being stored in the associative array.
    private enum bool isNullableKey = __traits(compiles, { TKey unused = null; });

public:
    /++
     + Params:
     +   observer             = downstream observer that receives each new group.
     +   disposable           = disposed when this observer completes or fails.
     +   refCountedDisposable = handed to every created group.
     +/
    this(TObserver observer, Disposable disposable, RefCountDisposable refCountedDisposable)
    {
        _observer = observer;
        _disposable = disposable;
        _refCountDisposable = refCountedDisposable;
    }

public:
    /++
     + Routes `obj` to the group of its key, announcing the group first when
     + the key has not been seen before.
     +
     + If computing the key or looking up the group throws an `Exception`,
     + the error is forwarded through `failure` and `obj` is dropped.
     +/
    void put(E obj)
    {
        alias keySelector = unaryFun!selector;

        Subject!E writer;
        TKey key;
        bool fireNewEntry = false;

        try
        {
            key = keySelector(obj);
            writer = subjectFor(key, fireNewEntry);
        }
        catch (Exception e)
        {
            failure(e);
            // `writer` may still be null here; continuing would put to a null
            // subject after the stream has already been failed and disposed.
            return;
        }

        if (fireNewEntry)
        {
            auto group = new GroupedObservableObject!(TKey, E)(key, writer, _refCountDisposable);
            .put(_observer, group);
        }

        .put(writer, obj);
    }

    /++
     + Completes every group subject, then the downstream observer (when it
     + supports completion), and finally disposes the held disposable.
     +/
    void completed()
    {
        static if (isNullableKey)
        {
            if (_null !is null)
            {
                _null.completed();
            }
        }
        foreach (sink; _map.values)
        {
            sink.completed();
        }
        static if (hasCompleted!TObserver)
        {
            _observer.completed();
        }
        _disposable.dispose();
    }

    /++
     + Forwards `e` to every group subject, then to the downstream observer
     + (when it supports failure), and finally disposes the held disposable.
     +/
    void failure(Exception e)
    {
        static if (isNullableKey)
        {
            if (_null !is null)
            {
                _null.failure(e);
            }
        }
        foreach (sink; _map.values)
        {
            sink.failure(e);
        }
        static if (hasFailure!TObserver)
        {
            _observer.failure(e);
        }
        _disposable.dispose();
    }

private:
    // Returns the subject for `key`, creating it on first use.
    // `created` is set to true only when a new subject had to be made.
    Subject!E subjectFor(TKey key, out bool created)
    {
        static if (isNullableKey)
        {
            if (key is null)
            {
                if (_null is null)
                {
                    _null = new SubjectObject!E;
                    created = true;
                }
                return _null;
            }
        }

        // `in` yields a pointer to the stored value, so one lookup suffices.
        if (auto found = key in _map)
        {
            return *found;
        }

        Subject!E fresh = new SubjectObject!E;
        _map[key] = fresh;
        created = true;
        return fresh;
    }

private:
    TObserver _observer;
    Disposable _disposable;
    Subject!E[TKey] _map;
    static if (isNullableKey)
    {
        Subject!E _null;
    }
    RefCountDisposable _refCountDisposable;
}
191 
unittest
{
    // A group is emitted downstream only the first time its key appears.
    alias Group = GroupedObservable!(int, int);
    alias TObserver = GroupByObserver!(n => n % 10, Observer!Group, int);

    auto counter = new CounterObserver!Group;
    auto refCount = new RefCountDisposable(NopDisposable.instance);
    auto sut = new TObserver(counter, NopDisposable.instance, refCount);

    // Feeds one value, then checks the number of groups seen and the latest key.
    void feed(int value, size_t expectedGroups, int expectedKey)
    {
        sut.put(value);
        assert(counter.putCount == expectedGroups);
        assert(counter.lastValue.key == expectedKey);
    }

    assert(counter.putCount == 0);

    feed(0, 1, 0); // first key 0 -> new group
    feed(0, 1, 0); // same key -> no new group

    feed(1, 2, 1); // key 1 -> new group
    feed(11, 2, 1); // 11 % 10 == 1 -> existing group

    feed(3, 3, 3); // key 3 -> new group
}
219 
unittest
{
    // Each element reaches the group of its key: every group is mapped to
    // (key, value) pairs which are collected by one shared counter.
    import std.typecons : Tuple, tuple;
    import rx.algorithm.map : map;

    alias Group = GroupedObservable!(bool, int);
    alias TObserver = GroupByObserver!(n => n % 2 == 0, Observer!Group, int);

    auto pairs = new CounterObserver!(Tuple!(bool, int));
    auto downstream = observerObject!Group((Group g) {
        g.map!(v => tuple(g.key, v)).doSubscribe(pairs);
    });

    auto refCount = new RefCountDisposable(NopDisposable.instance);
    auto sut = new TObserver(downstream, NopDisposable.instance, refCount);

    // Feeds one value, then checks the pair count and the most recent pair.
    void feed(int value, size_t expectedCount, Tuple!(bool, int) expectedPair)
    {
        sut.put(value);
        assert(pairs.putCount == expectedCount);
        assert(pairs.lastValue == expectedPair);
    }

    feed(0, 1, tuple(true, 0));
    feed(1, 2, tuple(false, 1));
    feed(3, 3, tuple(false, 3));
}
248 
unittest
{
    // Completing the observer completes the subscribers of its groups.
    alias Group = GroupedObservable!(bool, int);
    alias TObserver = GroupByObserver!(n => n % 2 == 0, Observer!Group, int);

    auto inner = new CounterObserver!int;
    auto downstream = observerObject!Group((Group g) {
        g.doSubscribe(inner);
    });

    auto refCount = new RefCountDisposable(NopDisposable.instance);
    auto sut = new TObserver(downstream, NopDisposable.instance, refCount);

    // Checks all three counters of the inner observer at once.
    void expectCounts(size_t puts, size_t completions, size_t failures)
    {
        assert(inner.putCount == puts);
        assert(inner.completedCount == completions);
        assert(inner.failureCount == failures);
    }

    expectCounts(0, 0, 0);

    sut.put(0);
    expectCounts(1, 0, 0);

    sut.completed();
    expectCounts(1, 1, 0);
}
280 
/++
 + Observable produced by `groupBy`: wraps a source observable and emits one
 + `GroupedObservable` per distinct key computed by `selector`.
 +/
private struct GroupByObservable(alias selector, TObservable)
{
    static assert(isObservable!TObservable);

    /// Key type: the type `selector` yields for a source element.
    alias TKey = typeof({
        return unaryFun!(selector)(TObservable.ElementType.init);
    }());
    /// Elements of this observable are the groups themselves.
    alias ElementType = GroupedObservable!(TKey, TObservable.ElementType);

public:
    /// Wraps `observable` as the source to be grouped.
    this(TObservable observable)
    {
        _observable = observable;
    }

    /++
     + Subscribes `observer` to the stream of groups.
     +
     + A `GroupByObserver` is subscribed to the source; the resulting
     + subscription is stored in a `SingleAssignmentDisposable`, which is both
     + passed to the observer and returned to the caller. A
     + `RefCountDisposable` wrapping that same disposable is passed to the
     + observer as well.
     +/
    Disposable subscribe(TObserver)(TObserver observer)
    {
        alias SourceElement = TObservable.ElementType;
        alias Sink = GroupByObserver!(selector, TObserver, SourceElement);

        auto upstream = new SingleAssignmentDisposable;
        auto sharedLifetime = new RefCountDisposable(upstream);

        auto sink = new Sink(observer, upstream, sharedLifetime);
        auto subscription = _observable.doSubscribe(sink);
        upstream.setDisposable(disposableObject(subscription));
        return upstream;
    }

private:
    TObservable _observable;
}
313 
unittest
{
    // End to end through GroupByObservable: one group per distinct `n % 10`.
    alias TObservable = GroupByObservable!(n => n % 10, Observable!int);
    static assert(is(TObservable.TKey == int));
    static assert(is(TObservable.ElementType == GroupedObservable!(int, int)));

    auto source = new SubjectObject!int;
    auto grouped = TObservable(source);

    auto counter = new CounterObserver!(GroupedObservable!(int, int));
    auto disposable = grouped.subscribe(counter);

    // Pushes one value into the source and checks how many groups exist so far.
    void feed(int value, size_t expectedGroups)
    {
        source.put(value);
        assert(counter.putCount == expectedGroups);
    }

    feed(0, 1); // key 0 -> new group
    feed(0, 1);
    feed(10, 1); // 10 % 10 == 0 -> existing group

    feed(11, 2); // key 1 -> new group

    feed(12, 3); // key 2 -> new group

    feed(102, 3); // 102 % 10 == 2 -> existing group
}
342 
/++
 + Groups the elements of an observable by the key computed with `selector`.
 +
 + Params:
 +   selector   = unary function (or string lambda) computing an element's key.
 +   observable = source observable whose elements are grouped.
 +
 + Returns: an observable whose elements are `GroupedObservable`s, one per
 + distinct key.
 +/
template groupBy(alias selector)
{
    GroupByObservable!(selector, TObservable) groupBy(TObservable)(auto ref TObservable observable)
    {
        static assert(isObservable!TObservable);

        alias Result = GroupByObservable!(selector, TObservable);
        return Result(observable);
    }
}
353 
///
unittest
{
    // Group integers by their last decimal digit.
    auto source = new SubjectObject!int;
    auto grouped = source.groupBy!(n => n % 10);

    auto counter = new CounterObserver!(typeof(grouped).ElementType);
    auto disposable = grouped.subscribe(counter);

    // The first element with key 0 creates a group.
    source.put(0);
    assert(counter.putCount == 1);
    assert(counter.lastValue.key == 0);

    // 10 has the same key, so no further group is emitted.
    source.put(10);
    assert(counter.putCount == 1);
}
371 
///
unittest
{
    // Group strings by themselves: every distinct text forms its own group.
    auto source = new SubjectObject!string;
    auto grouped = source.groupBy!(text => text);

    auto counter = new CounterObserver!(typeof(grouped).ElementType);
    auto disposable = grouped.subscribe(counter);

    // Pushes one text and checks the group count and the newest group's key.
    void feed(string text, size_t expectedGroups)
    {
        source.put(text);
        assert(counter.putCount == expectedGroups);
        assert(counter.lastValue.key == text);
    }

    feed("A", 1);
    feed("B", 2);
    feed("XXX", 3);
}