1 /+++++++++++++++++++++++++++++ 2 + This module defines algorithm 'scan' 3 +/ 4 module rx.algorithm.scan; 5 6 import rx.disposable; 7 import rx.observer; 8 import rx.observable; 9 import rx.util; 10 11 import std.functional; 12 import std.range; 13 14 //#################### 15 // Scan 16 //#################### 17 struct ScanObserver(alias f, TObserver, E, TAccumulate) 18 { 19 mixin SimpleObserverImpl!(TObserver, E); 20 21 public: 22 this(TObserver observer, TAccumulate seed) 23 { 24 _observer = observer; 25 _current = seed; 26 } 27 28 static if (hasCompleted!TObserver || hasFailure!TObserver) 29 { 30 this(TObserver observer, TAccumulate seed, Disposable disposable) 31 { 32 _observer = observer; 33 _current = seed; 34 _disposable = disposable; 35 } 36 } 37 38 public: 39 void putImpl(E obj) 40 { 41 alias fun = binaryFun!f; 42 _current = fun(_current, obj); 43 .put(_observer, _current); 44 } 45 46 private: 47 TAccumulate _current; 48 } 49 50 unittest 51 { 52 import std.array : appender; 53 54 auto buf = appender!(int[]); 55 alias TObserver = ScanObserver!((a, b) => a + b, typeof(buf), int, int); 56 auto observer = TObserver(buf, 0); 57 foreach (i; 1 .. 6) 58 { 59 .put(observer, i); 60 } 61 auto result = buf.data; 62 assert(result.length == 5); 63 assert(result[0] == 1); 64 assert(result[1] == 3); 65 assert(result[2] == 6); 66 assert(result[3] == 10); 67 assert(result[4] == 15); 68 } 69 70 struct ScanObservable(alias f, TObservable, TAccumulate) 71 { 72 alias ElementType = TAccumulate; 73 74 public: 75 this(TObservable observable, TAccumulate seed) 76 { 77 _observable = observable; 78 _seed = seed; 79 } 80 81 public: 82 auto subscribe(TObserver)(TObserver observer) 83 { 84 alias ObserverType = ScanObserver!(f, TObserver, TObservable.ElementType, TAccumulate); 85 static if (hasCompleted!TObserver || hasFailure!TObserver) 86 { 87 auto disposable = new SingleAssignmentDisposable; 88 disposable.setDisposable(disposableObject(doSubscribe(_observable, 89 ObserverType(observer, _seed, disposable)))); 90 return disposable; 91 } 92 else 93 { 94 return doSubscribe(_observable, ObserverType(observer, _seed)); 95 } 96 } 97 98 private: 99 TObservable _observable; 100 TAccumulate _seed; 101 } 102 103 unittest 104 { 105 alias Scan = ScanObservable!((a, b) => a + b, Observable!int, int); 106 static assert(isObservable!(Scan, int)); 107 } 108 109 unittest 110 { 111 import rx.subject : SubjectObject; 112 113 auto sub = new SubjectObject!int; 114 115 alias Scan = ScanObservable!((a, b) => a + b, Observable!int, int); 116 auto s = Scan(sub, 0); 117 118 import std.stdio : writeln; 119 120 auto disposable = s.subscribe((int i) => writeln(i)); 121 static assert(isDisposable!(typeof(disposable))); 122 } 123 124 /// 125 template scan(alias f) 126 { 127 auto scan(TObservable, TAccumulate)(auto ref TObservable observable, TAccumulate seed) 128 { 129 return ScanObservable!(f, TObservable, TAccumulate)(observable, seed); 130 } 131 } 132 /// 133 unittest 134 { 135 import rx.subject : SubjectObject; 136 137 auto subject = new SubjectObject!int; 138 139 auto sum = subject.scan!((a, b) => a + b)(0); 140 static assert(isObservable!(typeof(sum), int)); 141 142 import std.array : appender; 143 144 auto buf = appender!(int[]); 145 auto disposable = sum.subscribe(buf); 146 scope (exit) 147 disposable.dispose(); 148 149 foreach (_; 0 .. 5) 150 { 151 subject.put(1); 152 } 153 154 auto result = buf.data; 155 assert(result.length == 5); 156 import std.algorithm : equal; 157 158 assert(equal(result, [1, 2, 3, 4, 5])); 159 }