/+++++++++++++++++++++++++++++
 + This module defines algorithm 'filter'
 +/
module rx.algorithm.filter;

import rx.disposable;
import rx.observable;
import rx.observer;
import rx.util;

import std.functional : unaryFun;
import std.range : put;

//####################
// Filter
//####################
/++
 + Higher-order `filter` for observables.
 +
 + `pred` is handed to `std.functional.unaryFun`, so it may be written either as
 + a string expression (for example `"a % 2 == 0"`) or as any callable that can
 + be invoked as `pred(element)`. The returned wrapper keeps the source's
 + `ElementType` and forwards to its subscriber only the elements for which the
 + predicate evaluates to true.
 +/
template filter(alias pred)
{
    auto filter(TObservable)(auto ref TObservable observable)
    {
        // Wrapper around the source observable. The element type is taken
        // unchanged from the source.
        static struct FilterObservable
        {
            alias ElementType = TObservable.ElementType;

        public:
            this(TObservable source)
            {
                _observable = source;
            }

            // Subscribes `observer` to the source through a filtering adapter.
            //
            // When the observer satisfies hasCompleted or hasFailure, the
            // upstream subscription is held in a SingleAssignmentDisposable that
            // is also handed to the adapter, and that disposable is returned.
            // Otherwise the result of doSubscribe is returned directly.
            auto subscribe(TObserver)(TObserver observer)
            {
                // Single source of truth for the two compile-time branches below.
                enum bool tracksTermination = hasCompleted!TObserver || hasFailure!TObserver;

                // Adapter placed between the source and `observer`.
                // NOTE(review): `_observer`, `_disposable` and the public entry
                // points that call `putImpl` are not declared here, so they
                // presumably come from SimpleObserverImpl (rx.util) -- confirm
                // there before renaming any of them.
                static struct FilterObserver
                {
                    mixin SimpleObserverImpl!(TObserver, ElementType);

                public:
                    this(TObserver downstream)
                    {
                        _observer = downstream;
                    }

                    static if (tracksTermination)
                    {
                        this(TObserver downstream, Disposable subscription)
                        {
                            _observer = downstream;
                            _disposable = subscription;
                        }
                    }

                private:
                    // Forwards `obj` downstream only when the predicate accepts it.
                    void putImpl(ElementType obj)
                    {
                        alias accepts = unaryFun!pred;
                        if (accepts(obj))
                        {
                            // Leading dot selects the module-level `put`
                            // (std.range), not a member named `put`.
                            .put(_observer, obj);
                        }
                    }
                }

                static if (tracksTermination)
                {
                    // Created first so the adapter can be constructed with it;
                    // the real upstream subscription is assigned afterwards.
                    auto handle = new SingleAssignmentDisposable;
                    handle.setDisposable(
                            disposableObject(
                                doSubscribe(_observable, FilterObserver(observer, handle))));
                    return handle;
                }
                else
                {
                    return doSubscribe(_observable, FilterObserver(observer));
                }
            }

        private:
            TObservable _observable;
        }

        return FilterObservable(observable);
    }
}

///
unittest
{
    import rx.subject : Subject, SubjectObject;
    import std.algorithm : equal;
    import std.array : appender;

    Subject!int sub = new SubjectObject!int;
    auto evens = sub.filter!(n => n % 2 == 0);
    auto received = appender!(int[])();
    auto subscription = evens.subscribe(received);
    scope (exit)
        subscription.dispose();

    // Emit 0, 1, 2, 3 in order.
    foreach (value; 0 .. 4)
        sub.put(value);

    assert(equal(received.data, [0, 2][]));
}

// Same scenario as above, with the predicate given as a string expression.
unittest
{
    import rx.subject : Subject, SubjectObject;
    import std.algorithm : equal;
    import std.array : appender;

    Subject!int sub = new SubjectObject!int;
    auto evens = sub.filter!"a % 2 == 0";
    auto received = appender!(int[])();
    auto subscription = evens.subscribe(received);
    scope (exit)
        subscription.dispose();

    // Emit 0, 1, 2, 3 in order.
    foreach (value; 0 .. 4)
        sub.put(value);

    assert(equal(received.data, [0, 2][]));
}

// Array-valued source with an int-taking delegate as the observer: the empty
// array is rejected by the predicate and the second array contributes 55.
unittest
{
    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!(int[]);

    auto total = 0;
    auto addToTotal = (int n) { total += n; };

    auto subscription = sub.filter!(a => a.length > 0).subscribe(addToTotal);
    scope (exit)
        subscription.dispose();

    assert(total == 0);

    sub.put([]);
    sub.put([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

    assert(total == 55);
}

// Negative compile check: the source's element type is int, so the
// predicate's `a.length` cannot compile.
unittest
{
    static assert(!__traits(compiles, {
            import rx.subject : SubjectObject;

            auto sub = new SubjectObject!int;
            auto total = 0;
            auto subscription = sub.filter!(a => a.length > 0)
                .doSubscribe!(n => total += n);
        }));
}

// Feeding whole arrays through the module-level `put`: the negative values
// are filtered out and the positive ones sum to 55.
unittest
{
    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;

    auto total = 0;
    auto subscription = sub.filter!(a => a > 0).doSubscribe!(n => total += n);
    scope (exit)
        subscription.dispose();

    assert(total == 0);

    .put(sub, [-1, -2, -3]);
    .put(sub, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

    assert(total == 55);
}