/+++++++++++++++++++++++++++++
 + This module defines algorithm 'merge'
 +/
module rx.algorithm.merge;

import rx.disposable;
import rx.observable;
import rx.observer;
import rx.util;

//####################
// Merge
//####################

/++
 + Observable that pairs two source observables and subscribes a single
 + observer to both of them.
 +
 + `ElementType` is the common type of the two sources' element types, as
 + computed by `std.traits.CommonType`.
 +
 + NOTE(review): the very same observer instance is handed to both sources,
 + so any notification either source delivers reaches it directly. Whether a
 + completion from only one source should end the merged sequence is not
 + decided here - confirm against the intended semantics.
 +/
struct MergeObservable(TObservable1, TObservable2)
{
    import std.traits : CommonType;

    /// Element type exposed by the merged observable.
    alias ElementType = CommonType!(TObservable1.ElementType, TObservable2.ElementType);

public:
    /++
     + Stores the two source observables.
     +
     + Params:
     +   o1 = first source; subscribed first by `subscribe`
     +   o2 = second source; subscribed second by `subscribe`
     +/
    this(TObservable1 o1, TObservable2 o2)
    {
        _observable1 = o1;
        _observable2 = o2;
    }

public:
    /++
     + Subscribes `observer` to both sources, first `_observable1` and then
     + `_observable2`.
     +
     + Params:
     +   observer = receiver passed unchanged to `doSubscribe` for each source
     +
     + Returns:
     +   A `CompositeDisposable` built from the two subscriptions' disposables.
     +/
    auto subscribe(T)(T observer)
    {
        auto d1 = _observable1.doSubscribe(observer);
        auto first = disposableObject(d1);

        // If subscribing to the second source throws, release the first
        // subscription instead of leaking it; the exception still propagates.
        scope (failure)
            first.dispose();

        auto d2 = _observable2.doSubscribe(observer);
        auto second = disposableObject(d2);

        return new CompositeDisposable(first, second);
    }

private:
    TObservable1 _observable1;
    TObservable2 _observable2;
}

/++
 + Combines two observables into a single `MergeObservable`.
 +
 + Params:
 +   observable1 = first source observable
 +   observable2 = second source observable
 +
 + Returns:
 +   A `MergeObservable!(T1, T2)` constructed from the two arguments.
 +/
MergeObservable!(T1, T2) merge(T1, T2)(auto ref T1 observable1, auto ref T2 observable2)
{
    return typeof(return)(observable1, observable2);
}
///
unittest
{
    import rx.subject : SubjectObject;

    auto s1 = new SubjectObject!int;
    auto s2 = new SubjectObject!short;

    auto merged = s1.merge(s2);

    int count = 0;
    auto d = merged.doSubscribe((int n) { count++; });

    // Values from either subject reach the single subscriber.
    assert(count == 0);
    s1.put(1);
    assert(count == 1);
    s2.put(2);
    assert(count == 2);

    d.dispose();

    // After disposal neither subject notifies the subscriber any more.
    s1.put(10);
    assert(count == 2);
    s2.put(100);
    assert(count == 2);
}