/+++++++++++++++++++++++++++++
 + This module defines algorithm 'merge'
 +/
module rx.algorithm.merge;

import rx.disposable;
import rx.observable;
import rx.observer;
import rx.util;
import std.range : put;

//####################
// Merge
//####################

/**
 * Observable that forwards the notifications of two source observables
 * to a single observer.
 *
 * The element type is the common type of both sources' element types.
 */
struct MergeObservable(TObservable1, TObservable2)
{
    import std.traits : CommonType;

    alias ElementType = CommonType!(TObservable1.ElementType, TObservable2.ElementType);

public:
    /// Stores the two observables that will be subscribed on `subscribe`.
    this(TObservable1 o1, TObservable2 o2)
    {
        _observable1 = o1;
        _observable2 = o2;
    }

public:
    /**
     * Subscribes `observer` to both sources.
     *
     * Params:
     *   observer = receiver of the merged notifications
     *
     * Returns: a disposable that disposes both source subscriptions.
     */
    auto subscribe(T)(T observer)
    {
        // Observer handed to both sources. The counter starts at 2
        // (one per source) and gates every notification.
        static struct MergeObserver
        {
            T _observer;
            shared(AtomicCounter) _counter;
            Disposable _subscription;

            void put(ElementType obj)
            {
                // Counter at zero: the merged stream already completed or failed.
                if (_counter.isZero)
                    return;

                .put(_observer, obj);
            }

            void completed()
            {
                // Forward completion only when the last source completes.
                auto result = _counter.tryDecrement();
                if (result.success && result.count == 0)
                {
                    static if (hasCompleted!T)
                    {
                        _observer.completed();
                    }
                    _subscription.dispose();
                }
            }

            void failure(Exception e)
            {
                // The first failure zeroes the counter, so later
                // notifications from either source are ignored.
                if (_counter.trySetZero())
                {
                    static if (hasFailure!T)
                    {
                        _observer.failure(e);
                    }
                    _subscription.dispose();
                }
            }
        }

        auto subscription = new SingleAssignmentDisposable;
        auto counter = new shared(AtomicCounter)(2);
        auto mergeObserver = MergeObserver(observer, counter, subscription);
        auto d1 = _observable1.doSubscribe(mergeObserver);
        auto d2 = _observable2.doSubscribe(mergeObserver);
        subscription.setDisposable(new CompositeDisposable(disposableObject(d1),
                disposableObject(d2)));
        return subscription;
    }

private:
    TObservable1 _observable1;
    TObservable2 _observable2;
}

///
MergeObservable!(T1, T2) merge(T1, T2)(auto ref T1 observable1, auto ref T2 observable2)
{
    return typeof(return)(observable1, observable2);
}
///
unittest
{
    import rx.subject : SubjectObject;

    auto s1 = new SubjectObject!int;
    auto s2 = new SubjectObject!short;

    auto merged = s1.merge(s2);

    int count = 0;
    auto d = merged.doSubscribe((int n) { count++; });

    assert(count == 0);
    s1.put(1);
    assert(count == 1);
    s2.put(2);
    assert(count == 2);

    d.dispose();

    s1.put(10);
    assert(count == 2);
    s2.put(100);
    assert(count == 2);
}

unittest
{
    import rx : SubjectObject, CounterObserver;

    auto s1 = new SubjectObject!int;
    auto s2 = new SubjectObject!int;

    auto merged = merge(s1, s2);
    auto observer = new CounterObserver!int;

    auto disposable = merged.doSubscribe(observer);
    scope(exit) disposable.dispose();

    s1.put(0);
    assert(observer.putCount == 1);
    s2.put(1);
    assert(observer.putCount == 2);
    s1.completed();
    assert(observer.completedCount == 0);
    s2.completed();
    assert(observer.completedCount == 1);
}

unittest
{
    import rx : SubjectObject, CounterObserver;

    auto source1 = new SubjectObject!int;
    auto source2 = new SubjectObject!int;
    auto subject = merge(source1, source2);

    auto counter = new CounterObserver!int;
    subject.subscribe(counter);

    source1.put(0);
    assert(counter.putCount == 1);
    assert(counter.lastValue == 0);
    source1.completed();
    assert(counter.completedCount == 0);

    source2.put(1);
    assert(counter.putCount == 2);
    assert(counter.lastValue == 1);

    assert(counter.completedCount == 0);
    source2.completed();
    assert(counter.completedCount == 1);
}

unittest
{
    import rx : SubjectObject, CounterObserver;

    auto s1 = new SubjectObject!int;
    auto s2 = new SubjectObject!int;

    auto merged = merge(s1, s2);
    auto observer = new CounterObserver!int;

    auto disposable = merged.doSubscribe(observer);
    scope(exit) disposable.dispose();

    s1.put(0);
    assert(observer.putCount == 1);
    s2.put(1);
    assert(observer.putCount == 2);

    auto ex = new Exception("TEST");
    s1.failure(ex);
    assert(observer.failureCount == 1);
    assert(observer.lastException == ex);

    s2.put(2);
    assert(observer.putCount == 2);

    s2.completed();
    assert(observer.completedCount == 0);
}

unittest
{
    import rx : SubjectObject, CounterObserver;

    auto s1 = new SubjectObject!int;
    auto s2 = new SubjectObject!int;

    auto merged = merge(s1, s2);
    auto observer = new CounterObserver!int;

    auto disposable = merged.doSubscribe(observer);

    s1.put(0);
    s2.put(1);
    assert(observer.putCount == 2);

    disposable.dispose();

    s1.put(2);
    s1.completed();

    s2.put(3);
    s2.completed();
    // no effect
    assert(observer.putCount == 2);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 0);
}

unittest
{
    import rx : SubjectObject;

    auto s1 = new SubjectObject!int;
    auto s2 = new SubjectObject!int;

    int result = -1;
    auto disposable = merge(s1, s2).doSubscribe((int n) { result = n; });

    s1.put(0);
    assert(result == 0);
    s2.put(1);
    assert(result == 1);

    s1.failure(null);
    s2.put(2);
    assert(result == 1);
}

/**
 * Flattens an observable of observables into a single observable.
 *
 * Every inner observable emitted by `observable` is subscribed, and its
 * values are forwarded to the downstream observer. The merged stream
 * completes once the outer observable and every inner observable
 * subscribed so far have completed. A failure from the outer or any
 * inner observable is forwarded as a failure.
 */
auto merge(TObservable)(auto ref TObservable observable)
        if (isObservable!TObservable && isObservable!(TObservable.ElementType))
{
    import rx.subject : SubjectObject;

    static struct MergeObservable_Flat
    {
        alias ElementType = TObservable.ElementType.ElementType;

        this(TObservable observable)
        {
            _observable = observable;
        }

        auto subscribe(TObserver)(TObserver observer)
        {
            import core.atomic : atomicOp;

            auto subject = new SubjectObject!ElementType;
            auto groupSubscription = new CompositeDisposable;
            auto innerSubscription = subject.doSubscribe(observer);

            // Number of sources that have not completed yet. Starts at 1
            // for the outer observable; each inner observable adds 1.
            shared size_t active = 1;

            // Called when the outer or any inner observable completes.
            // Only the last completion is forwarded downstream.
            void sourceCompleted()
            {
                if (atomicOp!"-="(active, 1) == 0)
                    subject.completed();
            }

            auto outerSubscription = _observable.doSubscribe((TObservable.ElementType obj) {
                // Count the inner source before subscribing, so that an inner
                // observable completing during subscription is accounted for.
                atomicOp!"+="(active, 1);
                auto subscription = obj.doSubscribe((ElementType value) {
                    subject.put(value);
                }, { sourceCompleted(); }, (Exception e) { subject.failure(e); });
                groupSubscription.insert(disposableObject(subscription));
            }, { sourceCompleted(); }, (Exception e) { subject.failure(e); });
            return new CompositeDisposable(groupSubscription, innerSubscription, outerSubscription);
        }

        TObservable _observable;
    }

    return MergeObservable_Flat(observable);
}

///
unittest
{
    import rx.algorithm.groupby : groupBy;
    import rx.algorithm.map : map;
    import rx.algorithm.fold : fold;
    import rx.subject : SubjectObject, CounterObserver;

    auto subject = new SubjectObject!int;
    auto counted = subject.groupBy!(n => n % 10).map!(o => o.fold!((a, b) => a + 1)(0)).merge();

    auto counter = new CounterObserver!int;

    auto disposable = counted.subscribe(counter);

    subject.put(0);
    subject.put(0);
    assert(counter.putCount == 0);
    subject.completed();
    assert(counter.putCount == 1);
    assert(counter.lastValue == 2);
}

// Regression: with several inner observables, the completion of one of them
// must not complete the merged stream before the others have emitted.
unittest
{
    import rx.algorithm.groupby : groupBy;
    import rx.algorithm.map : map;
    import rx.algorithm.fold : fold;
    import rx.subject : SubjectObject, CounterObserver;

    auto subject = new SubjectObject!int;
    auto counted = subject.groupBy!(n => n % 10).map!(o => o.fold!((a, b) => a + 1)(0)).merge();

    auto counter = new CounterObserver!int;

    auto disposable = counted.subscribe(counter);

    subject.put(0);
    subject.put(1);
    subject.put(0);
    assert(counter.putCount == 0);
    subject.completed();
    // One count per group (keys 0 and 1), then a single completion.
    assert(counter.putCount == 2);
    assert(counter.completedCount == 1);
}