1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'filter'
3  +/
4 module rx.algorithm.filter;
5 
6 import rx.disposable;
7 import rx.observable;
8 import rx.observer;
9 import rx.util;
10 
11 import std.functional : unaryFun;
12 import std.range : put;
13 
14 //####################
15 // Filter
16 //####################
17 ///Implements the higher order filter function. The predicate is passed to std.functional.unaryFun, and can either accept a string, or any callable that can be executed via pred(element).
18 template filter(alias pred)
19 {
20     auto filter(TObservable)(auto ref TObservable observable)
21     {
22         return FilterObservable!(pred, TObservable)(observable);
23     }
24 }
25 
26 ///
27 unittest
28 {
29     import rx.subject : Subject, SubjectObject;
30     import std.array : appender;
31 
32     Subject!int sub = new SubjectObject!int;
33     auto filtered = sub.filter!(n => n % 2 == 0);
34     auto buffer = appender!(int[])();
35     auto disposable = filtered.subscribe(buffer);
36     scope (exit)
37         disposable.dispose();
38 
39     sub.put(0);
40     sub.put(1);
41     sub.put(2);
42     sub.put(3);
43 
44     import std.algorithm : equal;
45 
46     assert(equal(buffer.data, [0, 2][]));
47 }
48 
49 unittest
50 {
51     import rx.subject : Subject, SubjectObject;
52     import std.array : appender;
53 
54     Subject!int sub = new SubjectObject!int;
55     auto filtered = sub.filter!"a % 2 == 0";
56     auto buffer = appender!(int[])();
57     auto disposable = filtered.subscribe(buffer);
58     scope (exit)
59         disposable.dispose();
60 
61     sub.put(0);
62     sub.put(1);
63     sub.put(2);
64     sub.put(3);
65 
66     import std.algorithm : equal;
67 
68     assert(equal(buffer.data, [0, 2][]));
69 }
70 
71 unittest
72 {
73     import rx.subject : SubjectObject;
74 
75     auto sub = new SubjectObject!(int[]);
76 
77     auto sum = 0;
78     auto observer = (int n) { sum += n; };
79 
80     auto d = sub.filter!(a => a.length > 0).subscribe(observer);
81     scope (exit)
82         d.dispose();
83 
84     assert(sum == 0);
85 
86     sub.put([]);
87     sub.put([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
88 
89     assert(sum == 55);
90 }
91 
92 unittest
93 {
94     static assert(!__traits(compiles, {
95             import rx.subject : SubjectObject;
96 
97             auto sub = new SubjectObject!int;
98             auto sum = 0;
99             auto d = sub.filter!(a => a.length > 0)
100             .doSubscribe!(n => sum += n); //a.length can not compile
101         }));
102 }
103 
104 unittest
105 {
106     import rx.subject : SubjectObject;
107 
108     auto sub = new SubjectObject!int;
109 
110     auto sum = 0;
111     auto d = sub.filter!(a => a > 0)
112         .doSubscribe!(n => sum += n);
113     scope (exit)
114         d.dispose();
115 
116     assert(sum == 0);
117 
118     .put(sub, [-1, -2, -3]);
119     .put(sub, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
120 
121     assert(sum == 55);
122 }
123 
124 ///
125 struct FilterObservable(alias pred, TObservable)
126 {
127     alias ElementType = TObservable.ElementType;
128 
129 public:
130     ///
131     this(TObservable observable)
132     {
133         _observable = observable;
134     }
135 
136 public:
137     ///
138     auto subscribe(TObserver)(TObserver observer)
139     {
140         alias ObserverType = FilterObserver!(pred, TObserver, ElementType);
141         static if (hasCompleted!TObserver || hasFailure!TObserver)
142         {
143             auto disposable = new SingleAssignmentDisposable;
144             disposable.setDisposable(disposableObject(doSubscribe(_observable,
145                     ObserverType(observer, disposable))));
146             return disposable;
147         }
148         else
149         {
150             return doSubscribe(_observable, ObserverType(observer));
151         }
152     }
153 
154 private:
155     TObservable _observable;
156 }
157 
158 struct FilterObserver(alias pred, TObserver, E)
159 {
160     mixin SimpleObserverImpl!(TObserver, E);
161 
162 public:
163     this(TObserver observer)
164     {
165         _observer = observer;
166     }
167 
168     static if (hasCompleted!TObserver || hasFailure!TObserver)
169     {
170         this(TObserver observer, Disposable disposable)
171         {
172             _observer = observer;
173             _disposable = disposable;
174         }
175     }
176 
177 private:
178     void putImpl(E obj)
179     {
180         alias fun = unaryFun!pred;
181         if (fun(obj))
182             .put(_observer, obj);
183     }
184 }