1 /+++++++++++++++++++++++++++++ 2 + This module defines algorithm 'map' 3 +/ 4 module rx.algorithm.map; 5 6 import rx.disposable; 7 import rx.observer; 8 import rx.observable; 9 import rx.util; 10 11 import std.functional; 12 import std.range; 13 14 //#################### 15 // Map 16 //#################### 17 struct MapObserver(alias f, TObserver, E) 18 { 19 mixin SimpleObserverImpl!(TObserver, E); 20 21 public: 22 this(TObserver observer) 23 { 24 _observer = observer; 25 } 26 27 static if (hasCompleted!TObserver || hasFailure!TObserver) 28 { 29 this(TObserver observer, Disposable disposable) 30 { 31 _observer = observer; 32 _disposable = disposable; 33 } 34 } 35 private: 36 void putImpl(E obj) 37 { 38 alias fun = unaryFun!f; 39 .put(_observer, fun(obj)); 40 } 41 } 42 43 unittest 44 { 45 import std.conv : to; 46 47 alias TObserver = MapObserver!(o => to!string(o), Observer!string, int); 48 49 static assert(isObserver!(TObserver, int)); 50 } 51 52 struct MapObservable(alias f, TObservable) 53 { 54 alias ElementType = typeof({ 55 return unaryFun!(f)(TObservable.ElementType.init); 56 }()); 57 58 public: 59 this(TObservable observable) 60 { 61 _observable = observable; 62 } 63 64 public: 65 auto subscribe(TObserver)(TObserver observer) 66 { 67 alias ObserverType = MapObserver!(f, TObserver, TObservable.ElementType); 68 static if (hasCompleted!TObserver || hasFailure!TObserver) 69 { 70 auto disposable = new SingleAssignmentDisposable; 71 disposable.setDisposable(disposableObject(doSubscribe(_observable, 72 ObserverType(observer, disposable)))); 73 return disposable; 74 } 75 else 76 { 77 return doSubscribe(_observable, ObserverType(observer)); 78 } 79 } 80 81 private: 82 TObservable _observable; 83 } 84 85 unittest 86 { 87 import rx.subject; 88 import std.conv : to; 89 90 alias TObservable = MapObservable!(n => to!string(n), Subject!int); 91 static assert(is(TObservable.ElementType : string)); 92 static assert(isSubscribable!(TObservable, Observer!string)); 93 94 int putCount = 0; 95 int completedCount = 0; 96 int failureCount = 0; 97 struct TestObserver 98 { 99 void put(string n) 100 { 101 putCount++; 102 } 103 104 void completed() 105 { 106 completedCount++; 107 } 108 109 void failure(Exception) 110 { 111 failureCount++; 112 } 113 } 114 115 auto sub = new SubjectObject!int; 116 auto observable = TObservable(sub); 117 auto disposable = observable.subscribe(TestObserver()); 118 assert(putCount == 0); 119 sub.put(0); 120 assert(putCount == 1); 121 sub.put(1); 122 assert(putCount == 2); 123 disposable.dispose(); 124 sub.put(2); 125 assert(putCount == 2); 126 } 127 128 /// 129 template map(alias f) 130 { 131 MapObservable!(f, TObservable) map(TObservable)(auto ref TObservable observable) 132 { 133 return typeof(return)(observable); 134 } 135 } 136 /// 137 unittest 138 { 139 import rx.subject; 140 import std.array : appender; 141 import std.conv : to; 142 143 Subject!int sub = new SubjectObject!int; 144 auto mapped = sub.map!(n => to!string(n)); 145 static assert(isObservable!(typeof(mapped), string)); 146 static assert(isSubscribable!(typeof(mapped), Observer!string)); 147 148 auto buffer = appender!(string[])(); 149 auto disposable = mapped.subscribe(buffer); 150 scope (exit) 151 disposable.dispose(); 152 153 sub.put(0); 154 sub.put(1); 155 sub.put(2); 156 157 import std.algorithm : equal; 158 159 assert(equal(buffer.data, ["0", "1", "2"][])); 160 } 161 /// 162 unittest 163 { 164 import rx.subject; 165 import std.array : appender; 166 import std.conv : to; 167 168 Subject!int sub = new SubjectObject!int; 169 auto mapped = sub.map!"a * 2"; 170 static assert(isObservable!(typeof(mapped), int)); 171 static assert(isSubscribable!(typeof(mapped), Observer!int)); 172 173 auto buffer = appender!(int[])(); 174 auto disposable = mapped.subscribe(buffer); 175 scope (exit) 176 disposable.dispose(); 177 178 sub.put(0); 179 sub.put(1); 180 sub.put(2); 181 182 import std.algorithm : equal; 183 184 assert(equal(buffer.data, [0, 2, 4][])); 185 } 186 187 unittest 188 { 189 import rx.observable : asObservable; 190 191 auto data = [1, 2, 3, 4]; 192 int[] result; 193 data.asObservable().map!"a + 2"().doSubscribe!(n => result ~= n); 194 195 assert(result == [3, 4, 5, 6]); 196 }