module rx.range.take;

import rx.disposable;
import rx.observer;
import rx.observable;
import rx.util;

import std.range : put;

//####################
// Take
//####################
/**
Creates a sub-observable consisting of only up to the first n elements of the given source.

Every subscription gets its own element budget of `n`. Once the n-th element has
been forwarded, the downstream observer is completed (if it supports `completed`)
and the subscription to the source is disposed.

When `n` is 0 the downstream observer is completed immediately on subscription
and the source is not subscribed at all.

Params:
    observable = the source observable
    n = the maximum number of elements to forward per subscription

Returns:
    An observable whose `ElementType` is that of the source.
*/
auto take(TObservable)(auto ref TObservable observable, size_t n)
{
    static struct TakeObservable
    {
    public:
        alias ElementType = TObservable.ElementType;

    public:
        this(TObservable observable, size_t n)
        {
            _observable = observable;
            _count = n;
        }

    public:
        /// Subscribes `observer`, forwarding at most `_count` elements to it.
        auto subscribe(TObserver)(TObserver observer)
        {
            // Observer handed to the source; it enforces the element budget.
            static struct TakeObserver
            {
            public:
                this(TObserver observer, size_t count, Disposable disposable)
                {
                    _observer = observer;
                    // A fresh counter per subscription, so every subscriber
                    // gets its own budget of `count` elements.
                    _counter = new shared(AtomicCounter)(count);
                    _disposable = disposable;
                }

            public:
                void put(ElementType obj)
                {
                    // NOTE(review): tryDecrement is declared in rx.util; it is
                    // assumed to report failure once the counter is already 0.
                    auto result = _counter.tryDecrement();
                    if (result.success)
                    {
                        .put(_observer, obj);
                        if (result.count == 0)
                        {
                            // That was the last permitted element: finish the
                            // downstream and detach from the source.
                            static if (hasCompleted!TObserver)
                            {
                                _observer.completed();
                            }
                            _disposable.dispose();
                        }
                    }
                }

                void completed()
                {
                    static if (hasCompleted!TObserver)
                    {
                        _observer.completed();
                    }
                    _disposable.dispose();
                }

                void failure(Exception e)
                {
                    static if (hasFailure!TObserver)
                    {
                        _observer.failure(e);
                    }
                    _disposable.dispose();
                }

            private:
                TObserver _observer;
                shared(AtomicCounter) _counter;
                Disposable _disposable;
            }

            auto disposable = new SingleAssignmentDisposable;

            // With a budget of zero, `put` above could never reach its
            // completion branch, so the observer would never be completed and
            // the source subscription would stay alive. Complete right away
            // instead and hand back an already-disposed handle.
            if (_count == 0)
            {
                static if (hasCompleted!TObserver)
                {
                    observer.completed();
                }
                disposable.dispose();
                return disposable;
            }

            // The handle is created before subscribing so that TakeObserver can
            // hold a reference to it and dispose it when the budget is used up.
            disposable.setDisposable(disposableObject(doSubscribe(_observable,
                    TakeObserver(observer, _count, disposable))));
            return disposable;
        }

    private:
        TObservable _observable;
        size_t _count;
    }

    return TakeObservable(observable, n);
}
///
unittest
{
    import std.array;
    import rx.subject;

    auto pub = new SubjectObject!int;
    auto sub = appender!(int[]);

    auto d = pub.take(2).subscribe(sub);
    foreach (i; 0 .. 10)
    {
        pub.put(i);
    }

    import std.algorithm;

    assert(equal(sub.data, [0, 1]));
}

unittest
{
    import rx.subject;

    auto subject = new SubjectObject!int;
    auto taken = subject.take(1);
    static assert(isObservable!(typeof(taken), int));

    import std.array : appender;

    auto buf = appender!(int[]);
    auto disposable = taken.subscribe(buf);

    subject.put(0);
    assert(buf.data.length == 1);
    subject.put(1);
    assert(buf.data.length == 1);

    auto buf2 = appender!(int[]);
    taken.subscribe(buf2);
    assert(buf2.data.length == 0);
    subject.put(2);
    assert(buf2.data.length == 1);
    assert(buf.data.length == 1);
    subject.put(3);
    assert(buf2.data.length == 1);
    assert(buf.data.length == 1);
}

unittest
{
    import rx.subject;

    auto sub = new SubjectObject!int;
    auto taken = sub.take(2);

    int countPut = 0;
    int countCompleted = 0;
    struct TestObserver
    {
        void put(int n)
        {
            countPut++;
        }

        void completed()
        {
            countCompleted++;
        }
    }

    auto d = taken.doSubscribe(TestObserver());
    assert(countPut == 0);
    sub.put(1);
    assert(countPut == 1);
    assert(countCompleted == 0);
    sub.put(2);
    assert(countPut == 2);
    assert(countCompleted == 1);
}

unittest
{
    import rx.subject : SubjectObject;

    auto source1 = new SubjectObject!int;
    auto source2 = new SubjectObject!int;

    import rx.algorithm : merge;

    auto source = merge(source1, source2).take(2);
    int[] result;
    source.doSubscribe!(n => result ~= n);

    .put(source1, 0);
    .put(source2, 1);
    .put(source1, 2);
    .put(source2, 3);

    assert(result.length == 2);
    assert(result[0] == 0);
    assert(result[1] == 1);
}

// take(0) completes the observer on subscription and forwards nothing.
unittest
{
    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;
    auto taken = sub.take(0);

    int countPut = 0;
    int countCompleted = 0;
    struct TestObserver
    {
        void put(int n)
        {
            countPut++;
        }

        void completed()
        {
            countCompleted++;
        }
    }

    auto d = taken.doSubscribe(TestObserver());
    assert(countPut == 0);
    assert(countCompleted == 1);
    sub.put(1);
    assert(countPut == 0);
    assert(countCompleted == 1);
}