/+++++++++++++++++++++++++++++
 + This module is a submodule of rx.range.
 + It provides the basic operation 'takeLast'.
 +/
module rx.range.takeLast;

import rx.disposable;
import rx.observer;
import rx.observable;
import rx.util;

import std.range : put;

//####################
// TakeLast
//####################
/++
 + Creates an observable that emits only the last element of the given source.
 +
 + Every value pushed by the source overwrites the previously remembered one;
 + nothing is forwarded until the source completes. On completion the
 + remembered value (if any) is handed to the observer through
 + `std.range.put`, after which completion is forwarded when the observer
 + supports it. When the source fails, the failure is forwarded when the
 + observer supports it and no value is emitted.
 +
 + In both terminal cases the disposable returned by `subscribe` is disposed.
 +
 + Params:
 +   observable = the source observable whose last element is wanted.
 +
 + Returns:
 +   A struct exposing `ElementType` and a `subscribe` method; `subscribe`
 +   returns a `SingleAssignmentDisposable` wrapping the subscription made
 +   on the source via `doSubscribe`.
 +/
auto takeLast(TObservable)(auto ref TObservable observable)
{
    static struct TakeLastObservable
    {
    public:
        alias ElementType = TObservable.ElementType;

    public:
        this(ref TObservable observable)
        {
            _observable = observable;
        }

    public:
        /// Subscribes `observer` to the last element of the wrapped source.
        auto subscribe(TObserver)(auto ref TObserver observer)
        {
            static class TakeLastObserver
            {
            public:
                this(ref TObserver observer, SingleAssignmentDisposable disposable)
                {
                    _observer = observer;
                    _disposable = disposable;
                }

            public:
                /// Remembers `obj` as the latest value; nothing is forwarded yet.
                void put(ElementType obj)
                {
                    _current = obj;
                    _hasValue = true;
                }

                /// Emits the remembered value (if any), then forwards completion.
                void completed()
                {
                    // Dispose even when the downstream observer throws, so the
                    // subscription is not left dangling.
                    scope (exit)
                        _disposable.dispose();

                    if (_hasValue)
                        .put(_observer, _current);

                    static if (hasCompleted!TObserver)
                    {
                        _observer.completed();
                    }
                }

                /// Forwards the failure when the observer supports it.
                void failure(Exception e)
                {
                    // Failure is a terminal notification just like completion,
                    // so the subscription is disposed here as well. The method
                    // is always defined so disposal happens even for observers
                    // that have no failure handler of their own.
                    scope (exit)
                        _disposable.dispose();

                    static if (hasFailure!TObserver)
                    {
                        _observer.failure(e);
                    }
                }

            private:
                bool _hasValue = false;
                ElementType _current;
                TObserver _observer;
                SingleAssignmentDisposable _disposable;
            }

            auto d = new SingleAssignmentDisposable;
            d.setDisposable(disposableObject(doSubscribe(_observable,
                    new TakeLastObserver(observer, d))));
            return d;
        }

    private:
        TObservable _observable;
    }

    return TakeLastObservable(observable);
}
///
unittest
{
    import rx.subject;

    auto sub = new SubjectObject!int;

    int putCount = 0;
    int completedCount = 0;
    struct TestObserver
    {
        void put(int n)
        {
            putCount++;
        }

        void completed()
        {
            completedCount++;
        }
    }

    auto d = sub.takeLast.subscribe(TestObserver());

    assert(putCount == 0);
    sub.put(1);
    assert(putCount == 0);
    sub.put(10);
    assert(putCount == 0);
    sub.completed();
    assert(putCount == 1);
    assert(completedCount == 1);

    sub.put(100);
    assert(putCount == 1);
    assert(completedCount == 1);
}

unittest
{
    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!(int[]);

    int count = 0;
    // The observer accepts `int` while the element type is `int[]`; the last
    // array is expected to reach it one element at a time (count becomes 2).
    auto d = sub.takeLast.subscribe((int) { count++; });
    scope(exit) d.dispose();

    assert(count == 0);
    sub.put([0]);
    assert(count == 0);
    sub.put([1, 2]);
    assert(count == 0);
    sub.completed();
    assert(count == 2);
}

unittest
{
    import rx : SubjectObject, merge;

    auto source1 = new SubjectObject!int;
    auto source2 = new SubjectObject!int;

    auto source = merge(source1, source2).takeLast();
    int[] result;
    source.doSubscribe!(n => result ~= n);

    .put(source1, 0);
    .put(source2, 1);
    source1.completed();

    assert(result.length == 0);

    .put(source2, 2);
    source2.completed();

    assert(result.length == 1);
    assert(result[0] == 2);
}