1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'groupBy'
3  +/
4 module rx.algorithm.groupby;
5 
6 import rx.disposable;
7 import rx.observable;
8 import rx.observer;
9 import rx.subject;
10 import rx.util;
11 
12 import std.functional : unaryFun;
13 import std.range : put;
14 
15 //####################
16 // GroupBy
17 //####################
/++
 + An observable of `E` that additionally exposes the key of the group
 + it stands for.
 +/
interface GroupedObservable(TKey, E) : Observable!E
{
    /// The key associated with this group.
    @property TKey key() const pure nothrow @safe @nogc;
}
23 
/++
 + Concrete GroupedObservable: pairs a key with the observable that carries
 + the elements of that group.
 +/
private class GroupedObservableObject(TKey, E) : GroupedObservable!(TKey, E)
{
    alias ElementType = E;

public:
    /++
     + Params:
     +   key     = value reported by the `key` property
     +   subject = observable every subscription is forwarded to
     +   cancel  = optional ref-counted disposable; when non-null, each
     +             subscription also holds a disposable obtained from it
     +/
    this(TKey key, Observable!E subject, RefCountDisposable cancel = null)
    {
        this._key = key;
        this._subject = subject;
        this._cancel = cancel;
    }

    /// Returns the key given at construction.
    TKey key() const pure nothrow @safe @nogc @property
    {
        return _key;
    }

    /++
     + Subscribes `observer` to the wrapped observable.
     +
     + Returns: the inner subscription when no ref-counted disposable was
     + supplied; otherwise a CompositeDisposable combining a disposable taken
     + from the ref-counted one with the inner subscription.
     +/
    Disposable subscribe(Observer!E observer)
    {
        if (_cancel !is null)
        {
            // The ref-counted handle is acquired before subscribing.
            auto handle = _cancel.getDisposable();
            auto inner = _subject.subscribe(observer);
            return new CompositeDisposable(handle, inner);
        }

        return _subject.subscribe(observer);
    }

private:
    TKey _key;
    Observable!E _subject;
    RefCountDisposable _cancel;
}
62 
/++
 + Observer that splits incoming elements into groups.
 +
 + For every element the key is computed with `selector`. Each distinct key
 + owns one subject; the first time a key is seen a GroupedObservable wrapping
 + that subject is pushed to the downstream observer, and every element is
 + then forwarded to the subject of its key.
 +
 + Params:
 +   selector  = unary function mapping an `E` to its key
 +   TObserver = type of the downstream observer receiving the groups
 +   E         = element type of the source
 +/
private class GroupByObserver(alias selector, TObserver, E)
{
public:
    /// Key type: whatever `selector` yields for an `E`.
    alias TKey = typeof({ return unaryFun!(selector)(E.init); }());

private:
    // True when `null` can be assigned to a TKey. Null keys are detected with
    // `is null` and stored in the dedicated `_null` slot instead of `_map`.
    enum hasNullKey = __traits(compiles, { TKey unused = null; });

public:
    /++
     + Params:
     +   observer             = downstream observer that receives the groups
     +   disposable           = disposed when this observer completes or fails
     +   refCountedDisposable = handed to every emitted group
     +/
    this(TObserver observer, Disposable disposable, RefCountDisposable refCountedDisposable)
    {
        _observer = observer;
        _disposable = disposable;
        _refCountDisposable = refCountedDisposable;
    }

public:
    /++
     + Routes `obj` to the subject of its group, announcing the group to the
     + downstream observer first when the key has not been seen before.
     +
     + An Exception thrown while computing the key or locating the subject is
     + turned into a call to `failure`, and the element is dropped.
     +/
    void put(E obj)
    {
        alias keySelector = unaryFun!selector;

        TKey key;
        Subject!E writer;
        bool fireNewEntry;

        try
        {
            key = keySelector(obj);
            writer = subjectFor(key, fireNewEntry);
        }
        catch (Exception e)
        {
            failure(e);
            return;
        }

        // Publish the group before its first element so a subscriber attached
        // during the announcement receives that element too.
        if (fireNewEntry)
        {
            auto group = new GroupedObservableObject!(TKey, E)(key, writer, _refCountDisposable);
            .put(_observer, group);
        }

        .put(writer, obj);
    }

    /++
     + Completes every group subject, then the downstream observer (when it
     + supports completion), and finally disposes the held disposable.
     +/
    void completed()
    {
        static if (hasNullKey)
        {
            if (_null !is null)
            {
                _null.completed();
            }
        }
        foreach (sink; _map.values)
        {
            sink.completed();
        }
        static if (hasCompleted!TObserver)
        {
            _observer.completed();
        }
        _disposable.dispose();
    }

    /++
     + Propagates `e` to every group subject, then to the downstream observer
     + (when it supports failure), and finally disposes the held disposable.
     +/
    void failure(Exception e)
    {
        static if (hasNullKey)
        {
            if (_null !is null)
            {
                _null.failure(e);
            }
        }
        foreach (sink; _map.values)
        {
            sink.failure(e);
        }
        static if (hasFailure!TObserver)
        {
            _observer.failure(e);
        }
        _disposable.dispose();
    }

private:
    // Returns the subject for `key`, creating it on first use; `created` is
    // set when a new subject had to be made.
    Subject!E subjectFor(TKey key, out bool created)
    {
        static if (hasNullKey)
        {
            if (key is null)
            {
                if (_null is null)
                {
                    _null = new SubjectObject!E;
                    created = true;
                }
                return _null;
            }
            else
            {
                return mappedSubjectFor(key, created);
            }
        }
        else
        {
            return mappedSubjectFor(key, created);
        }
    }

    // Find-or-create in `_map`, reusing the pointer returned by `in` so an
    // existing key costs a single lookup.
    Subject!E mappedSubjectFor(TKey key, ref bool created)
    {
        if (auto found = key in _map)
        {
            return *found;
        }

        // Build the subject first, then store it under the key.
        Subject!E fresh = new SubjectObject!E;
        _map[key] = fresh;
        created = true;
        return fresh;
    }

private:
    TObserver _observer;
    Disposable _disposable;
    Subject!E[TKey] _map;
    static if (hasNullKey)
    {
        Subject!E _null;
    }
    RefCountDisposable _refCountDisposable;
}
192 
unittest
{
    // A group is announced only the first time its key (n % 10) shows up.
    alias Sink = GroupByObserver!(n => n % 10, Observer!(GroupedObservable!(int, int)), int);

    auto counter = new CounterObserver!(GroupedObservable!(int, int));
    auto sharedCancel = new RefCountDisposable(NopDisposable.instance);
    auto sink = new Sink(counter, NopDisposable.instance, sharedCancel);

    // Feeds one value and checks the number of announced groups and the key
    // of the most recently announced one.
    void feed(int value, int expectedGroups, int expectedKey)
    {
        sink.put(value);
        assert(counter.putCount == expectedGroups);
        assert(counter.lastValue.key == expectedKey);
    }

    assert(counter.putCount == 0);

    feed(0, 1, 0);
    feed(0, 1, 0);

    feed(1, 2, 1);
    feed(11, 2, 1);

    feed(3, 3, 3);
}
220 
unittest
{
    // Every element reaches the subscriber of its own group, tagged here
    // with that group's key.
    import rx.algorithm.map : map;
    import std.typecons : Tuple, tuple;

    alias Sink = GroupByObserver!(n => n % 2 == 0,
            Observer!(GroupedObservable!(bool, int)), int);

    auto tagged = new CounterObserver!(Tuple!(bool, int));
    auto downstream = observerObject!(GroupedObservable!(bool, int))(
            (GroupedObservable!(bool, int) observable) {
        observable.map!(n => tuple(observable.key, n)).doSubscribe(tagged);
    });

    auto sharedCancel = new RefCountDisposable(NopDisposable.instance);
    auto sink = new Sink(downstream, NopDisposable.instance, sharedCancel);

    sink.put(0);
    assert(tagged.putCount == 1);
    assert(tagged.lastValue == tuple(true, 0));

    sink.put(1);
    assert(tagged.putCount == 2);
    assert(tagged.lastValue == tuple(false, 1));

    sink.put(3);
    assert(tagged.putCount == 3);
    assert(tagged.lastValue == tuple(false, 3));
}
249 
unittest
{
    // Completing the grouping observer completes the group subscribers.
    alias Sink = GroupByObserver!(n => n % 2 == 0,
            Observer!(GroupedObservable!(bool, int)), int);

    auto counter = new CounterObserver!int;
    auto downstream = observerObject!(GroupedObservable!(bool, int))(
            (GroupedObservable!(bool, int) observable) {
        observable.doSubscribe(counter);
    });

    auto sharedCancel = new RefCountDisposable(NopDisposable.instance);
    auto sink = new Sink(downstream, NopDisposable.instance, sharedCancel);

    // Checks the three counters of the group subscriber in one go.
    void expect(int puts, int completions, int failures)
    {
        assert(counter.putCount == puts);
        assert(counter.completedCount == completions);
        assert(counter.failureCount == failures);
    }

    expect(0, 0, 0);

    sink.put(0);
    expect(1, 0, 0);

    sink.completed();
    expect(1, 1, 0);
}
281 
/++
 + Observable returned by `groupBy`: wraps a source observable and emits one
 + GroupedObservable per distinct key produced by `selector`.
 +/
private struct GroupByObservable(alias selector, TObservable)
{
    static assert(isObservable!TObservable);

    /// Key type: whatever `selector` yields for an element of the source.
    alias TKey = typeof({
        return unaryFun!(selector)(TObservable.ElementType.init);
    }());
    /// Elements of this observable are the groups themselves.
    alias ElementType = GroupedObservable!(TKey, TObservable.ElementType);

public:
    /// Wraps `observable` as the source to be grouped.
    this(TObservable observable)
    {
        this._observable = observable;
    }

public:
    /++
     + Subscribes `observer` to the stream of groups.
     +
     + Returns: a disposable that is assigned the source subscription.
     +/
    Disposable subscribe(TObserver)(TObserver observer)
    {
        alias Sink = GroupByObserver!(selector, TObserver, TObservable.ElementType);

        // `upstream` must exist before the source is subscribed because the
        // sink receives it up front; the real subscription is filled in after.
        auto upstream = new SingleAssignmentDisposable;
        auto sharedCancel = new RefCountDisposable(upstream);

        auto sourceSubscription = _observable.doSubscribe(new Sink(observer,
                upstream, sharedCancel));
        upstream.setDisposable(disposableObject(sourceSubscription));

        // NOTE(review): the handle returned is `upstream` itself, not the
        // ref-counted wrapper - confirm this is intended.
        return upstream;
    }

private:
    TObservable _observable;
}
314 
unittest
{
    // Grouping by the last decimal digit: only unseen digits add a group.
    alias Grouped = GroupByObservable!(n => n % 10, Observable!int);
    static assert(is(Grouped.TKey == int));
    static assert(is(Grouped.ElementType == GroupedObservable!(int, int)));

    auto source = new SubjectObject!int;
    auto grouped = Grouped(source);

    auto counter = new CounterObserver!(GroupedObservable!(int, int));
    auto subscription = grouped.subscribe(counter);

    // Pushes one value into the source and checks how many groups exist.
    void feed(int value, int expectedGroups)
    {
        source.put(value);
        assert(counter.putCount == expectedGroups);
    }

    feed(0, 1);
    feed(0, 1);
    feed(10, 1);

    feed(11, 2);

    feed(12, 3);

    feed(102, 3);
}
343 
/++
 + Groups the elements of an observable by the key `selector` computes for
 + each of them.
 +
 + Params:
 +   selector = unary function mapping an element to its key
 +
 + Returns: an observable whose elements are GroupedObservable values.
 +/
template groupBy(alias selector)
{
    GroupByObservable!(selector, TObservable) groupBy(TObservable)(auto ref TObservable observable)
    {
        static assert(isObservable!TObservable);

        alias Grouped = typeof(return);
        return Grouped(observable);
    }
}
354 
///
unittest
{
    auto source = new SubjectObject!int;

    // Group integers by their last decimal digit.
    auto grouped = source.groupBy!(n => n % 10);

    auto counter = new CounterObserver!(typeof(grouped).ElementType);
    auto subscription = grouped.subscribe(counter);

    // The first value with key 0 opens a group ...
    source.put(0);
    assert(counter.putCount == 1);
    assert(counter.lastValue.key == 0);

    // ... and a later value with the same key does not open another one.
    source.put(10);
    assert(counter.putCount == 1);
}
372 
///
unittest
{
    auto source = new SubjectObject!string;

    // Each distinct string is its own key.
    auto grouped = source.groupBy!(text => text);

    auto counter = new CounterObserver!(typeof(grouped).ElementType);
    auto subscription = grouped.subscribe(counter);

    source.put("A");
    assert(counter.putCount == 1);
    assert(counter.lastValue.key == "A");

    source.put("B");
    assert(counter.putCount == 2);
    assert(counter.lastValue.key == "B");

    source.put("XXX");
    assert(counter.putCount == 3);
    assert(counter.lastValue.key == "XXX");
}
395 
unittest
{
    // A throwing key selector is reported as a failure, not as a group.
    auto source = new SubjectObject!string;

    string delegate(string _) thrower = (test) { throw new Exception(""); };

    auto grouped = source.groupBy!(thrower);

    auto counter = new CounterObserver!(typeof(grouped).ElementType);
    auto subscription = grouped.subscribe(counter);

    source.put("A");
    assert(counter.putCount == 0);
    assert(counter.completedCount == 0);
    assert(counter.failureCount == 1);
}
412 
unittest
{
    // A null key still produces a group, and that group reports a null key.
    auto source = new SubjectObject!string;

    auto grouped = source.groupBy!(test => null);

    auto counter = new CounterObserver!(typeof(grouped).ElementType);
    auto subscription = grouped.subscribe(counter);

    source.put("A");
    assert(counter.putCount == 1);
    assert(counter.lastValue.key is null);
}
426 
unittest
{
    // Elements of the null-key group are delivered to its subscriber.
    auto source = new SubjectObject!string;

    auto grouped = source.groupBy!(test => null);

    auto counter = new CounterObserver!string;
    auto subscription = grouped.doSubscribe!((o) {
        o.doSubscribe(counter);
    });

    source.put("A");
    assert(counter.putCount == 1);
    assert(counter.lastValue == "A");
}
442 
unittest
{
    // Sum the odd and the even numbers separately by folding each group;
    // the sums only appear once the source completes.
    import rx;

    auto source = new SubjectObject!int;

    auto grouped = source.groupBy!(i => i % 2 == 0);

    auto evens = new CounterObserver!int;
    auto odds = new CounterObserver!int;

    auto bag = new CompositeDisposable();
    auto subscription = grouped.doSubscribe!((o) {
        bag.insert(o.fold!"a + b"(0).doSubscribe(o.key ? evens : odds));
    });
    bag.insert(subscription);

    scope (exit)
        bag.dispose();

    source.put(1);
    assert(odds.putCount == 0);
    source.put(2);
    assert(evens.putCount == 0);
    source.put(3);
    source.put(4);
    source.completed();

    assert(odds.putCount == 1);
    assert(odds.lastValue == 4);
    assert(evens.putCount == 1);
    assert(evens.lastValue == 6);
}