1 /+++++++++++++++++++++++++++++ 2 + This module is a submodule of rx.range. 3 + It provides basic operation a 'take' 4 +/ 5 module rx.range.take; 6 7 import rx.disposable; 8 import rx.observer; 9 import rx.observable; 10 import rx.util; 11 12 import std.range : put; 13 14 //#################### 15 // Take 16 //#################### 17 ///Creates a sub-observable consisting of only up to the first n elements of the given source. 18 auto take(TObservable)(auto ref TObservable observable, size_t n) 19 { 20 static struct TakeObservable 21 { 22 public: 23 alias ElementType = TObservable.ElementType; 24 25 public: 26 this(TObservable observable, size_t n) 27 { 28 _observable = observable; 29 _count = n; 30 } 31 32 public: 33 auto subscribe(TObserver)(TObserver observer) 34 { 35 static struct TakeObserver 36 { 37 public: 38 this(TObserver observer, size_t count, Disposable disposable) 39 { 40 _observer = observer; 41 _counter = new shared(AtomicCounter)(count); 42 _disposable = disposable; 43 } 44 45 public: 46 void put(ElementType obj) 47 { 48 auto result = _counter.tryDecrement(); 49 if (result.success) 50 { 51 .put(_observer, obj); 52 if (result.count == 0) 53 { 54 static if (hasCompleted!TObserver) 55 { 56 _observer.completed(); 57 } 58 _disposable.dispose(); 59 } 60 } 61 } 62 63 void completed() 64 { 65 static if (hasCompleted!TObserver) 66 { 67 _observer.completed(); 68 } 69 _disposable.dispose(); 70 } 71 72 void failure(Exception e) 73 { 74 static if (hasFailure!TObserver) 75 { 76 _observer.failure(e); 77 } 78 _disposable.dispose(); 79 } 80 81 private: 82 TObserver _observer; 83 shared(AtomicCounter) _counter; 84 Disposable _disposable; 85 } 86 87 auto disposable = new SingleAssignmentDisposable; 88 disposable.setDisposable(disposableObject(doSubscribe(_observable, 89 TakeObserver(observer, _count, disposable)))); 90 return disposable; 91 } 92 93 private: 94 TObservable _observable; 95 size_t _count; 96 } 97 98 return TakeObservable(observable, n); 99 } 100 /// 101 unittest 102 { 103 import std.array; 104 import rx.subject; 105 106 auto pub = new SubjectObject!int; 107 auto sub = appender!(int[]); 108 109 auto d = pub.take(2).subscribe(sub); 110 foreach (i; 0 .. 10) 111 { 112 pub.put(i); 113 } 114 115 import std.algorithm; 116 117 assert(equal(sub.data, [0, 1])); 118 } 119 120 unittest 121 { 122 import rx.subject; 123 124 auto subject = new SubjectObject!int; 125 auto taken = subject.take(1); 126 static assert(isObservable!(typeof(taken), int)); 127 128 import std.array : appender; 129 130 auto buf = appender!(int[]); 131 auto disposable = taken.subscribe(buf); 132 133 subject.put(0); 134 assert(buf.data.length == 1); 135 subject.put(1); 136 assert(buf.data.length == 1); 137 138 auto buf2 = appender!(int[]); 139 taken.subscribe(buf2); 140 assert(buf2.data.length == 0); 141 subject.put(2); 142 assert(buf2.data.length == 1); 143 assert(buf.data.length == 1); 144 subject.put(3); 145 assert(buf2.data.length == 1); 146 assert(buf.data.length == 1); 147 } 148 149 unittest 150 { 151 import rx.subject; 152 153 auto sub = new SubjectObject!int; 154 auto taken = sub.take(2); 155 156 int countPut = 0; 157 int countCompleted = 0; 158 struct TestObserver 159 { 160 void put(int n) 161 { 162 countPut++; 163 } 164 165 void completed() 166 { 167 countCompleted++; 168 } 169 } 170 171 auto d = taken.doSubscribe(TestObserver()); 172 assert(countPut == 0); 173 sub.put(1); 174 assert(countPut == 1); 175 assert(countCompleted == 0); 176 sub.put(2); 177 assert(countPut == 2); 178 assert(countCompleted == 1); 179 } 180 181 unittest 182 { 183 import rx.subject : SubjectObject; 184 185 auto source1 = new SubjectObject!int; 186 auto source2 = new SubjectObject!int; 187 188 import rx.algorithm : merge; 189 190 auto source = merge(source1, source2).take(2); 191 int[] result; 192 source.doSubscribe!(n => result ~= n); 193 194 .put(source1, 0); 195 .put(source2, 1); 196 .put(source1, 2); 197 .put(source2, 3); 198 199 assert(result.length == 2); 200 assert(result[0] == 0); 201 assert(result[1] == 1); 202 }