/+++++++++++++++++++++++++++++
 + This module defines algorithm 'tee'
 +/
module rx.algorithm.tee;

import rx.disposable;
import rx.observable;
import rx.observer;
import rx.util;

import std.functional : unaryFun;
import std.range : put;

//####################
// Tee
//####################

/// Observer wrapper that calls `f` on every element and then forwards the
/// same element, unchanged, to the wrapped observer.
struct TeeObserver(alias f, TObserver, E)
{
    mixin SimpleObserverImpl!(TObserver, E);

public:
    /// Wraps `observer` without an associated disposable.
    this(TObserver observer)
    {
        _observer = observer;
    }

    // The two-argument constructor only exists when the wrapped observer
    // exposes a completion or failure handler.
    static if (hasCompleted!TObserver || hasFailure!TObserver)
    {
        /// Wraps `observer` and stores `disposable` alongside it.
        // NOTE(review): `_disposable` is presumably consumed by
        // SimpleObserverImpl - confirm in rx.observer.
        this(TObserver observer, Disposable disposable)
        {
            _observer = observer;
            _disposable = disposable;
        }
    }

private:
    /// Runs the side-effect function first, then hands the element downstream.
    void putImpl(E obj)
    {
        alias sideEffect = unaryFun!f;

        sideEffect(obj);
        .put(_observer, obj);
    }
}

/// Observable wrapper whose subscribers are decorated with `TeeObserver`,
/// so `f` is invoked for each element that flows through.
struct TeeObservable(alias f, TObservable, E)
{
    alias ElementType = E;

public:
    /// Stores the upstream observable to subscribe to later.
    this(TObservable observable)
    {
        _observable = observable;
    }

public:
    /// Subscribes `observer` to the upstream observable through a
    /// `TeeObserver`.
    ///
    /// Returns: a `SingleAssignmentDisposable` when `T` has a completion or
    /// failure handler, otherwise whatever `doSubscribe` returns.
    auto subscribe(T)(auto ref T observer)
    {
        alias ObserverType = TeeObserver!(f, T, E);
        enum bool needsDisposable = hasCompleted!T || hasFailure!T;

        static if (!needsDisposable)
        {
            return doSubscribe(_observable, ObserverType(observer));
        }
        else
        {
            // The holder is created first so it can be given to the observer
            // before the upstream subscription exists; the subscription is
            // attached to it afterwards.
            auto holder = new SingleAssignmentDisposable;
            auto upstream = doSubscribe(_observable, ObserverType(observer, holder));
            holder.setDisposable(disposableObject(upstream));
            return holder;
        }
    }

private:
    TObservable _observable;
}

///
template tee(alias f)
{
    TeeObservable!(f, TObservable, TObservable.ElementType) tee(TObservable)(
            auto ref TObservable observable)
    {
        alias Result = typeof(return);
        return Result(observable);
    }
}
///
unittest
{
    import rx.algorithm : map;
    import rx.subject : SubjectObject;
    import std.algorithm : equal;
    import std.array : appender;

    auto source = new SubjectObject!int;

    auto seen = appender!(int[]);
    auto doubled = appender!(int[]);

    // `tee` records each raw element; `map` then doubles it for the subscriber.
    auto subscription = source
        .tee!(i => seen.put(i))()
        .map!(i => i * 2)()
        .subscribe(doubled);

    source.put(1);
    source.put(2);
    subscription.dispose();
    source.put(3); // emitted after dispose: the assertions below expect it to be absent

    assert(seen.data.equal([1, 2]));
    assert(doubled.data.equal([2, 4]));
}

unittest
{
    import rx.subject : SubjectObject;

    auto source = new SubjectObject!int;

    int putCount = 0;
    int failureCount = 0;

    struct Test
    {
        void put(int)
        {
            ++putCount;
        }

        void failure(Exception)
        {
            ++failureCount;
        }
    }

    // Side-effect function that throws for zero.
    int foo(int n)
    {
        if (n == 0)
            throw new Exception("");
        return n * 2;
    }

    auto d = source.tee!foo().doSubscribe(Test());
    scope (exit)
        d.dispose();

    assert(putCount == 0);

    source.put(1);
    assert(putCount == 1);
    assert(failureCount == 0);

    // The throwing call is expected to reach `failure` and not `put`.
    source.put(0);
    assert(putCount == 1);
    assert(failureCount == 1);
}