1 /+++++++++++++++++++++++++++++
 + This module defines the 'filter' algorithm.
3  +/
4 module rx.algorithm.filter;
5 
6 import rx.disposable;
7 import rx.observable;
8 import rx.observer;
9 import rx.util;
10 
11 import std.functional : unaryFun;
12 import std.range : put;
13 
14 //####################
15 // Filter
16 //####################
17 ///Implements the higher order filter function. The predicate is passed to std.functional.unaryFun, and can either accept a string, or any callable that can be executed via pred(element).
18 template filter(alias pred)
19 {
20     auto filter(TObservable)(auto ref TObservable observable)
21     {
22         static struct FilterObservable
23         {
24             alias ElementType = TObservable.ElementType;
25 
26         public:
27             this(TObservable observable)
28             {
29                 _observable = observable;
30             }
31 
32         public:
33             auto subscribe(TObserver)(TObserver observer)
34             {
35                 static struct FilterObserver
36                 {
37                     mixin SimpleObserverImpl!(TObserver, ElementType);
38 
39                 public:
40                     this(TObserver observer)
41                     {
42                         _observer = observer;
43                     }
44 
45                     static if (hasCompleted!TObserver || hasFailure!TObserver)
46                     {
47                         this(TObserver observer, Disposable disposable)
48                         {
49                             _observer = observer;
50                             _disposable = disposable;
51                         }
52                     }
53 
54                 private:
55                     void putImpl(ElementType obj)
56                     {
57                         alias fun = unaryFun!pred;
58                         if (fun(obj))
59                             .put(_observer, obj);
60                     }
61                 }
62 
63                 alias ObserverType = FilterObserver;
64                 static if (hasCompleted!TObserver || hasFailure!TObserver)
65                 {
66                     auto disposable = new SingleAssignmentDisposable;
67                     disposable.setDisposable(disposableObject(doSubscribe(_observable,
68                             FilterObserver(observer, disposable))));
69                     return disposable;
70                 }
71                 else
72                 {
73                     return doSubscribe(_observable, FilterObserver(observer));
74                 }
75             }
76 
77         private:
78             TObservable _observable;
79         }
80 
81         return FilterObservable(observable);
82     }
83 }
84 
85 ///
86 unittest
87 {
88     import rx.subject : Subject, SubjectObject;
89     import std.array : appender;
90 
91     Subject!int sub = new SubjectObject!int;
92     auto filtered = sub.filter!(n => n % 2 == 0);
93     auto buffer = appender!(int[])();
94     auto disposable = filtered.subscribe(buffer);
95     scope (exit)
96         disposable.dispose();
97 
98     sub.put(0);
99     sub.put(1);
100     sub.put(2);
101     sub.put(3);
102 
103     import std.algorithm : equal;
104 
105     assert(equal(buffer.data, [0, 2][]));
106 }
107 
108 unittest
109 {
110     import rx.subject : Subject, SubjectObject;
111     import std.array : appender;
112 
113     Subject!int sub = new SubjectObject!int;
114     auto filtered = sub.filter!"a % 2 == 0";
115     auto buffer = appender!(int[])();
116     auto disposable = filtered.subscribe(buffer);
117     scope (exit)
118         disposable.dispose();
119 
120     sub.put(0);
121     sub.put(1);
122     sub.put(2);
123     sub.put(3);
124 
125     import std.algorithm : equal;
126 
127     assert(equal(buffer.data, [0, 2][]));
128 }
129 
130 unittest
131 {
132     import rx.subject : SubjectObject;
133 
134     auto sub = new SubjectObject!(int[]);
135 
136     auto sum = 0;
137     auto observer = (int n) { sum += n; };
138 
139     auto d = sub.filter!(a => a.length > 0).subscribe(observer);
140     scope (exit)
141         d.dispose();
142 
143     assert(sum == 0);
144 
145     sub.put([]);
146     sub.put([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
147 
148     assert(sum == 55);
149 }
150 
151 unittest
152 {
153     static assert(!__traits(compiles, {
154             import rx.subject : SubjectObject;
155 
156             auto sub = new SubjectObject!int;
157             auto sum = 0;
158             auto d = sub.filter!(a => a.length > 0).doSubscribe!(n => sum += n); //a.length can not compile
159         }));
160 }
161 
162 unittest
163 {
164     import rx.subject : SubjectObject;
165 
166     auto sub = new SubjectObject!int;
167 
168     auto sum = 0;
169     auto d = sub.filter!(a => a > 0).doSubscribe!(n => sum += n);
170     scope (exit)
171         d.dispose();
172 
173     assert(sum == 0);
174 
175     .put(sub, [-1, -2, -3]);
176     .put(sub, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
177 
178     assert(sum == 55);
179 }