1 /+++++++++++++++++++++++++++++ 2 + This module defines algorithm 'filter' 3 +/ 4 module rx.algorithm.filter; 5 6 import rx.disposable; 7 import rx.observable; 8 import rx.observer; 9 import rx.util; 10 11 import std.functional : unaryFun; 12 import std.range : put; 13 14 //#################### 15 // Filter 16 //#################### 17 ///Implements the higher order filter function. The predicate is passed to std.functional.unaryFun, and can either accept a string, or any callable that can be executed via pred(element). 18 template filter(alias pred) 19 { 20 auto filter(TObservable)(auto ref TObservable observable) 21 { 22 return FilterObservable!(pred, TObservable)(observable); 23 } 24 } 25 26 /// 27 unittest 28 { 29 import rx.subject : Subject, SubjectObject; 30 import std.array : appender; 31 32 Subject!int sub = new SubjectObject!int; 33 auto filtered = sub.filter!(n => n % 2 == 0); 34 auto buffer = appender!(int[])(); 35 auto disposable = filtered.subscribe(buffer); 36 scope (exit) 37 disposable.dispose(); 38 39 sub.put(0); 40 sub.put(1); 41 sub.put(2); 42 sub.put(3); 43 44 import std.algorithm : equal; 45 46 assert(equal(buffer.data, [0, 2][])); 47 } 48 49 unittest 50 { 51 import rx.subject : Subject, SubjectObject; 52 import std.array : appender; 53 54 Subject!int sub = new SubjectObject!int; 55 auto filtered = sub.filter!"a % 2 == 0"; 56 auto buffer = appender!(int[])(); 57 auto disposable = filtered.subscribe(buffer); 58 scope (exit) 59 disposable.dispose(); 60 61 sub.put(0); 62 sub.put(1); 63 sub.put(2); 64 sub.put(3); 65 66 import std.algorithm : equal; 67 68 assert(equal(buffer.data, [0, 2][])); 69 } 70 71 unittest 72 { 73 import rx.subject : SubjectObject; 74 75 auto sub = new SubjectObject!(int[]); 76 77 auto sum = 0; 78 auto observer = (int n) { sum += n; }; 79 80 auto d = sub.filter!(a => a.length > 0).subscribe(observer); 81 scope (exit) 82 d.dispose(); 83 84 assert(sum == 0); 85 86 sub.put([]); 87 sub.put([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); 88 89 assert(sum == 55); 90 } 91 92 unittest 93 { 94 static assert(!__traits(compiles, { 95 import rx.subject : SubjectObject; 96 97 auto sub = new SubjectObject!int; 98 auto sum = 0; 99 auto d = sub.filter!(a => a.length > 0) 100 .doSubscribe!(n => sum += n); //a.length can not compile 101 })); 102 } 103 104 unittest 105 { 106 import rx.subject : SubjectObject; 107 108 auto sub = new SubjectObject!int; 109 110 auto sum = 0; 111 auto d = sub.filter!(a => a > 0) 112 .doSubscribe!(n => sum += n); 113 scope (exit) 114 d.dispose(); 115 116 assert(sum == 0); 117 118 .put(sub, [-1, -2, -3]); 119 .put(sub, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); 120 121 assert(sum == 55); 122 } 123 124 /// 125 struct FilterObservable(alias pred, TObservable) 126 { 127 alias ElementType = TObservable.ElementType; 128 129 public: 130 /// 131 this(TObservable observable) 132 { 133 _observable = observable; 134 } 135 136 public: 137 /// 138 auto subscribe(TObserver)(TObserver observer) 139 { 140 alias ObserverType = FilterObserver!(pred, TObserver, ElementType); 141 static if (hasCompleted!TObserver || hasFailure!TObserver) 142 { 143 auto disposable = new SingleAssignmentDisposable; 144 disposable.setDisposable(disposableObject(doSubscribe(_observable, 145 ObserverType(observer, disposable)))); 146 return disposable; 147 } 148 else 149 { 150 return doSubscribe(_observable, ObserverType(observer)); 151 } 152 } 153 154 private: 155 TObservable _observable; 156 } 157 158 struct FilterObserver(alias pred, TObserver, E) 159 { 160 mixin SimpleObserverImpl!(TObserver, E); 161 162 public: 163 this(TObserver observer) 164 { 165 _observer = observer; 166 } 167 168 static if (hasCompleted!TObserver || hasFailure!TObserver) 169 { 170 this(TObserver observer, Disposable disposable) 171 { 172 _observer = observer; 173 _disposable = disposable; 174 } 175 } 176 177 private: 178 void putImpl(E obj) 179 { 180 alias fun = unaryFun!pred; 181 if (fun(obj)) 182 .put(_observer, obj); 183 } 184 }