/+++++++++++++++++++++++++++++
 + This module defines algorithm 'scan'
 +/
module rx.algorithm.scan;

import rx.disposable;
import rx.observer;
import rx.observable;
import rx.util;

import std.functional;
import std.range;
import std.typecons;

//####################
// Scan
//####################

// Observer that folds every incoming element into a running accumulator
// with `f` and forwards each intermediate accumulator value downstream.
//
// Template parameters:
//   f           - accumulator function, wrapped with std.functional.binaryFun;
//                 called as f(accumulator, element)
//   TObserver   - type of the downstream observer that receives the results
//   E           - element type accepted by putImpl
//   TAccumulate - type of the seed and of the running accumulator
//
// NOTE(review): `_observer` and `_disposable` are assigned in the
// constructors but not declared in this struct; presumably they are
// introduced by the SimpleObserverImpl mixin - confirm in rx.observer.
struct ScanObserver(alias f, TObserver, E, TAccumulate)
{
    mixin SimpleObserverImpl!(TObserver, E);

public:
    // Creates an observer that forwards to `observer`, starting from `seed`.
    this(TObserver observer, TAccumulate seed)
    {
        _observer = observer;
        _current = refCounted(seed);
    }

    static if (hasCompleted!TObserver || hasFailure!TObserver)
    {
        // Same as above, and additionally stores `disposable` in `_disposable`.
        // Only available when the downstream observer has a completion or
        // failure handler.
        this(TObserver observer, TAccumulate seed, Disposable disposable)
        {
            _observer = observer;
            _current = refCounted(seed);
            _disposable = disposable;
        }
    }

public:
    // Combines the current accumulator with `obj`, stores the result as the
    // new accumulator, and puts that result into the downstream observer.
    void putImpl(E obj)
    {
        alias fun = binaryFun!f;
        // Hand the plain TAccumulate payload (not the RefCounted wrapper) to
        // both `f` and the downstream observer: generic callees then see the
        // advertised element type and never receive a handle that aliases the
        // mutable accumulator.
        _current = fun(_current.refCountedPayload, obj);
        .put(_observer, _current.refCountedPayload);
    }

private:
    // Running accumulator. It is held through RefCounted; presumably so that
    // copies of this observer share one accumulator - TODO confirm intent.
    RefCounted!TAccumulate _current;
}

unittest
{
    import std.array : appender;

    auto buf = appender!(int[]);
    alias TObserver = ScanObserver!((a, b) => a + b, typeof(buf), int, int);
    auto observer = TObserver(buf, 0);
    foreach (i; 1 .. 6)
    {
        .put(observer, i);
    }
    auto result = buf.data;
    assert(result.length == 5);
    assert(result[0] == 1);
    assert(result[1] == 3);
    assert(result[2] == 6);
    assert(result[3] == 10);
    assert(result[4] == 15);
}

// Observable wrapper that applies `f` as a running fold over the elements of
// the wrapped observable. Its element type is TAccumulate.
//
// Template parameters:
//   f           - accumulator function forwarded to ScanObserver
//   TObservable - type of the wrapped source; must expose `ElementType`
//   TAccumulate - type of the seed and of the emitted values
struct ScanObservable(alias f, TObservable, TAccumulate)
{
    alias ElementType = TAccumulate;

public:
    // Stores the source observable and the seed used for each subscription.
    this(TObservable observable, TAccumulate seed)
    {
        _observable = observable;
        _seed = seed;
    }

public:
    // Subscribes `observer` to the source through a new ScanObserver that
    // starts from the stored seed.
    //
    // When the observer has a completion or failure handler, a
    // SingleAssignmentDisposable is created, passed to the ScanObserver, then
    // assigned the source subscription and returned. Otherwise the result of
    // doSubscribe is returned directly.
    auto subscribe(TObserver)(TObserver observer)
    {
        alias ObserverType = ScanObserver!(f, TObserver, TObservable.ElementType, TAccumulate);
        static if (hasCompleted!TObserver || hasFailure!TObserver)
        {
            auto disposable = new SingleAssignmentDisposable;
            disposable.setDisposable(disposableObject(doSubscribe(_observable,
                    ObserverType(observer, _seed, disposable))));
            return disposable;
        }
        else
        {
            return doSubscribe(_observable, ObserverType(observer, _seed));
        }
    }

private:
    TObservable _observable;
    TAccumulate _seed;
}

unittest
{
    alias Scan = ScanObservable!((a, b) => a + b, Observable!int, int);
    static assert(isObservable!(Scan, int));
}

unittest
{
    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;

    alias Scan = ScanObservable!((a, b) => a + b, Observable!int, int);
    auto s = Scan(sub, 0);

    import std.stdio : writeln;

    auto disposable = s.subscribe((int i) => writeln(i));
    // Release the subscription when the test ends, like the other tests do.
    scope (exit)
        disposable.dispose();
    static assert(isDisposable!(typeof(disposable)));
}

/++
 + Wraps an observable in a ScanObservable that applies `f` as a running
 + accumulation over the source elements, starting from `seed`.
 +
 + Params:
 +   f = accumulator function (lambda or string lambda), called as
 +       f(accumulator, element)
 +   observable = the source observable
 +   seed = initial accumulator value
 +
 + Returns:
 +   A ScanObservable!(f, TObservable, TAccumulate) whose ElementType is
 +   TAccumulate.
 +/
template scan(alias f)
{
    auto scan(TObservable, TAccumulate)(auto ref TObservable observable, TAccumulate seed)
    {
        return ScanObservable!(f, TObservable, TAccumulate)(observable, seed);
    }
}
///
unittest
{
    import rx.subject : SubjectObject;

    auto subject = new SubjectObject!int;

    auto sum = subject.scan!((a, b) => a + b)(0);
    static assert(isObservable!(typeof(sum), int));

    import std.array : appender;

    auto buf = appender!(int[]);
    auto disposable = sum.subscribe(buf);
    scope (exit)
        disposable.dispose();

    foreach (_; 0 .. 5)
    {
        subject.put(1);
    }

    auto result = buf.data;
    assert(result.length == 5);
    import std.algorithm : equal;

    assert(equal(result, [1, 2, 3, 4, 5]));
}

unittest
{
    import rx.subject : SubjectObject;

    auto subject1 = new SubjectObject!int;
    auto subject2 = new SubjectObject!int;

    import rx.algorithm : merge;

    auto sum = merge(subject1, subject2).scan!"a + b"(0);
    static assert(isObservable!(typeof(sum), int));

    import std.array : appender;

    auto buf = appender!(int[]);
    auto disposable = sum.subscribe(buf);
    scope (exit)
        disposable.dispose();

    subject1.put(1); // 1
    subject2.put(1); // 2
    subject1.put(-1); // 1
    subject2.put(-1); // 0
    subject2.put(1); // 1
    subject2.put(-1); // 0

    auto result = buf.data;
    assert(result.length == 6);
    import std.algorithm : equal;

    assert(equal(result, [1, 2, 1, 0, 1, 0]));
}