1 module rx.range.takeLast; 2 3 import rx.disposable; 4 import rx.observer; 5 import rx.observable; 6 import rx.util; 7 8 import std.range : put; 9 10 //#################### 11 // TakeLast 12 //#################### 13 ///Creates a observable that take only a last element of the given source. 14 auto takeLast(TObservable)(auto ref TObservable observable) 15 { 16 static struct TakeLastObservable 17 { 18 public: 19 alias ElementType = TObservable.ElementType; 20 21 public: 22 this(ref TObservable observable) 23 { 24 _observable = observable; 25 } 26 27 public: 28 auto subscribe(TObserver)(auto ref TObserver observer) 29 { 30 static class TakeLastObserver 31 { 32 public: 33 this(ref TObserver observer, SingleAssignmentDisposable disposable) 34 { 35 _observer = observer; 36 _disposable = disposable; 37 } 38 39 public: 40 void put(ElementType obj) 41 { 42 _current = obj; 43 _hasValue = true; 44 } 45 46 void completed() 47 { 48 if (_hasValue) 49 .put(_observer, _current); 50 51 static if (hasCompleted!TObserver) 52 { 53 _observer.completed(); 54 } 55 _disposable.dispose(); 56 } 57 58 static if (hasFailure!TObserver) 59 { 60 void failure(Exception e) 61 { 62 _observer.failure(e); 63 } 64 } 65 66 private: 67 bool _hasValue = false; 68 ElementType _current; 69 TObserver _observer; 70 SingleAssignmentDisposable _disposable; 71 } 72 73 auto d = new SingleAssignmentDisposable; 74 d.setDisposable(disposableObject(doSubscribe(_observable, 75 new TakeLastObserver(observer, d)))); 76 return d; 77 } 78 79 private: 80 TObservable _observable; 81 } 82 83 return TakeLastObservable(observable); 84 } 85 /// 86 unittest 87 { 88 import rx.subject; 89 90 auto sub = new SubjectObject!int; 91 92 int putCount = 0; 93 int completedCount = 0; 94 struct TestObserver 95 { 96 void put(int n) 97 { 98 putCount++; 99 } 100 101 void completed() 102 { 103 completedCount++; 104 } 105 } 106 107 auto d = sub.takeLast.subscribe(TestObserver()); 108 109 assert(putCount == 0); 110 sub.put(1); 111 assert(putCount == 0); 112 sub.put(10); 113 assert(putCount == 0); 114 sub.completed(); 115 assert(putCount == 1); 116 assert(completedCount == 1); 117 118 sub.put(100); 119 assert(putCount == 1); 120 assert(completedCount == 1); 121 } 122 123 unittest 124 { 125 import rx.subject : SubjectObject; 126 127 auto sub = new SubjectObject!(int[]); 128 129 int count = 0; 130 auto d = sub.takeLast.subscribe((int) { count++; }); 131 scope(exit) d.dispose(); 132 133 assert(count == 0); 134 sub.put([0]); 135 assert(count == 0); 136 sub.put([1, 2]); 137 assert(count == 0); 138 sub.completed(); 139 assert(count == 2); 140 } 141 142 unittest 143 { 144 import rx : SubjectObject, merge; 145 146 auto source1 = new SubjectObject!int; 147 auto source2 = new SubjectObject!int; 148 149 auto source = merge(source1, source2).takeLast(); 150 int[] result; 151 source.doSubscribe!(n => result ~= n); 152 153 .put(source1, 0); 154 .put(source2, 1); 155 source1.completed(); 156 157 assert(result.length == 0); 158 159 .put(source2, 2); 160 source2.completed(); 161 162 assert(result.length == 1); 163 assert(result[0] == 2); 164 }