1 /+++++++++++++++++++++++++++++
 + This module defines the 'filter' algorithm.
3 +/
4 module rx.algorithm.filter;
5
6 import rx.disposable;
7 import rx.observable;
8 import rx.observer;
9 import rx.util;
10
11 import std.functional : unaryFun;
12 import std.range : put;
13
14 //####################
15 // Filter
16 //####################
/++
 + Higher-order filter for observables.
 +
 + `pred` is handed to `std.functional.unaryFun`, so it may be either a string
 + expression (for example `"a % 2 == 0"`) or any callable that can be invoked
 + as `pred(element)`.
 +
 + Params:
 +   observable = the source that the returned `FilterObservable` is built from.
 +
 + Returns: a `FilterObservable!(pred, TObservable)` constructed from `observable`.
 +/
template filter(alias pred)
{
    auto filter(TObservable)(auto ref TObservable observable)
    {
        alias Filtered = FilterObservable!(pred, TObservable);
        return Filtered(observable);
    }
}
25
26 ///
unittest
{
    import rx.subject : Subject, SubjectObject;
    import std.array : appender;

    // Lambda predicate: only the even values are expected in the sink.
    Subject!int subject = new SubjectObject!int;
    auto evens = subject.filter!(n => n % 2 == 0);
    auto sink = appender!(int[])();
    auto subscription = evens.subscribe(sink);
    scope (exit)
        subscription.dispose();

    foreach (value; [0, 1, 2, 3])
        subject.put(value);

    assert(sink.data == [0, 2]);
}
48
unittest
{
    import rx.subject : Subject, SubjectObject;
    import std.algorithm : equal;
    import std.array : appender;

    // Same scenario as the lambda case, with the predicate given as a string.
    Subject!int source = new SubjectObject!int;
    auto collected = appender!(int[])();
    auto handle = source.filter!"a % 2 == 0".subscribe(collected);
    scope (exit)
        handle.dispose();

    source.put(0);
    source.put(1);
    source.put(2);
    source.put(3);

    assert(equal(collected.data, [0, 2]));
}
70
unittest
{
    import rx.subject : SubjectObject;

    // The subject emits int[] while the observer is a delegate taking int;
    // the final assertion shows every element of a non-empty array is added.
    auto subject = new SubjectObject!(int[]);

    int total = 0;
    auto addToTotal = (int n) { total += n; };

    auto subscription = subject.filter!(a => a.length > 0).subscribe(addToTotal);
    scope (exit)
        subscription.dispose();

    // Nothing has been emitted yet.
    assert(total == 0);

    subject.put([]);
    subject.put([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

    // 1 + 2 + ... + 10
    assert(total == 55);
}
91
unittest
{
    // Negative compile-time check: the element type is int, which has no
    // `.length`, so subscribing through this predicate must fail to compile.
    static assert(!__traits(compiles, {
            import rx.subject : SubjectObject;

            auto intSubject = new SubjectObject!int;
            auto total = 0;
            auto subscription = intSubject.filter!(a => a.length > 0)
                .doSubscribe!(n => total += n);
        }));
}
103
unittest
{
    import rx.subject : SubjectObject;

    auto subject = new SubjectObject!int;

    int total = 0;
    auto subscription = subject.filter!(a => a > 0)
        .doSubscribe!(n => total += n);
    scope (exit)
        subscription.dispose();

    assert(total == 0);

    // The leading dot selects the module-scope `put` (imported from std.range),
    // which is handed a whole array here rather than a single int.
    .put(subject, [-1, -2, -3]);
    .put(subject, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

    // Only the positive values contribute: 1 + 2 + ... + 10.
    assert(total == 55);
}
123
/++
 + Observable returned by `filter`: wraps a source observable and subscribes
 + observers to it through a `FilterObserver` parameterised on `pred`.
 +/
struct FilterObservable(alias pred, TObservable)
{
    /// Element type, taken unchanged from the wrapped observable.
    alias ElementType = TObservable.ElementType;

public:
    /// Stores `observable` as the source for later subscriptions.
    this(TObservable observable)
    {
        _observable = observable;
    }

    /++
     + Subscribes `observer` to the source via a `FilterObserver` wrapper.
     +
     + When `TObserver` has neither a completion nor a failure handler, the
     + result of `doSubscribe` is returned directly. Otherwise a
     + `SingleAssignmentDisposable` is created, handed to the wrapper, assigned
     + the underlying subscription, and returned.
     +/
    auto subscribe(TObserver)(TObserver observer)
    {
        alias Wrapper = FilterObserver!(pred, TObserver, ElementType);
        static if (!(hasCompleted!TObserver || hasFailure!TObserver))
        {
            return doSubscribe(_observable, Wrapper(observer));
        }
        else
        {
            // The wrapper receives the disposable before the subscription
            // exists; the subscription is assigned to it right afterwards.
            auto guard = new SingleAssignmentDisposable;
            guard.setDisposable(disposableObject(doSubscribe(_observable,
                    Wrapper(observer, guard))));
            return guard;
        }
    }

private:
    TObservable _observable;
}
157
// Observer wrapper used by FilterObservable: an element is passed on to the
// wrapped observer only when `pred` (via std.functional.unaryFun) accepts it.
// `_observer` and `_disposable` are not declared in this struct; presumably
// they come from the SimpleObserverImpl mixin -- confirm in rx.observer.
struct FilterObserver(alias pred, TObserver, E)
{
    mixin SimpleObserverImpl!(TObserver, E);

public:
    // Only available when the wrapped observer has a completion or failure
    // handler; additionally stores the disposable supplied by the caller.
    static if (hasCompleted!TObserver || hasFailure!TObserver)
    {
        this(TObserver observer, Disposable disposable)
        {
            _observer = observer;
            _disposable = disposable;
        }
    }

    // Wraps `observer` without an associated disposable.
    this(TObserver observer)
    {
        _observer = observer;
    }

private:
    // Drops `obj` when the predicate rejects it; otherwise forwards it using
    // the module-scope `put`.
    void putImpl(E obj)
    {
        alias accepts = unaryFun!pred;
        if (!accepts(obj))
            return;
        .put(_observer, obj);
    }
}