1 module rx.range.drop; 2 3 import rx.disposable; 4 import rx.observer; 5 import rx.observable; 6 import rx.util; 7 8 import std.range : put; 9 10 //#################### 11 // Drop 12 //#################### 13 ///Creates the observable that results from discarding the first n elements from the given source. 14 auto drop(TObservable)(auto ref TObservable observable, size_t n) 15 { 16 static struct DropObservable 17 { 18 public: 19 alias ElementType = TObservable.ElementType; 20 21 public: 22 this(TObservable observable, size_t n) 23 { 24 _observable = observable; 25 _count = n; 26 } 27 28 public: 29 auto subscribe(TObserver)(TObserver observer) 30 { 31 static struct DropObserver 32 { 33 mixin SimpleObserverImpl!(TObserver, ElementType); 34 35 public: 36 this(TObserver observer, size_t count) 37 { 38 _observer = observer; 39 _counter = new shared(AtomicCounter)(count); 40 } 41 42 static if (hasCompleted!TObserver || hasFailure!TObserver) 43 { 44 this(TObserver observer, size_t count, Disposable disposable) 45 { 46 _observer = observer; 47 _counter = new shared(AtomicCounter)(count); 48 _disposable = disposable; 49 } 50 } 51 52 private: 53 void putImpl(ElementType obj) 54 { 55 if (_counter.tryUpdateCount()) 56 { 57 .put(_observer, obj); 58 } 59 } 60 61 private: 62 shared(AtomicCounter) _counter; 63 } 64 65 static if (hasCompleted!TObserver || hasFailure!TObserver) 66 { 67 auto disposable = new SingleAssignmentDisposable; 68 disposable.setDisposable(disposableObject(doSubscribe(_observable, 69 DropObserver(observer, _count, disposable)))); 70 return disposable; 71 } 72 else 73 { 74 return doSubscribe(_observable, DropObserver(observer, _count)); 75 } 76 } 77 78 private: 79 TObservable _observable; 80 size_t _count; 81 } 82 83 return DropObservable(observable, n); 84 } 85 /// 86 unittest 87 { 88 import rx.subject; 89 90 auto subject = new SubjectObject!int; 91 auto dropped = subject.drop(1); 92 static assert(isObservable!(typeof(dropped), int)); 93 94 import std.array : appender; 95 96 auto buf = appender!(int[]); 97 auto disposable = dropped.subscribe(buf); 98 99 subject.put(0); 100 assert(buf.data.length == 0); 101 subject.put(1); 102 assert(buf.data.length == 1); 103 104 auto buf2 = appender!(int[]); 105 dropped.subscribe(buf2); 106 assert(buf2.data.length == 0); 107 subject.put(2); 108 assert(buf2.data.length == 0); 109 assert(buf.data.length == 2); 110 subject.put(3); 111 assert(buf2.data.length == 1); 112 assert(buf.data.length == 3); 113 } 114 115 unittest 116 { 117 import rx.subject : SubjectObject; 118 119 auto sub = new SubjectObject!(int[]); 120 int count = 0; 121 auto d = sub.drop(1).subscribe((int) { count++; }); 122 scope (exit) 123 d.dispose(); 124 125 assert(count == 0); 126 sub.put([1, 2]); 127 assert(count == 0); 128 sub.put([2, 3]); 129 assert(count == 2); 130 } 131 132 unittest 133 { 134 import rx.subject : SubjectObject; 135 136 auto source1 = new SubjectObject!int; 137 auto source2 = new SubjectObject!int; 138 139 import rx.algorithm : merge; 140 141 auto source = merge(source1, source2).drop(2); 142 int[] result; 143 source.doSubscribe!(n => result ~= n); 144 145 .put(source1, 0); 146 .put(source2, 1); 147 .put(source1, 2); 148 .put(source2, 3); 149 150 assert(result.length == 2); 151 assert(result[0] == 2); 152 assert(result[1] == 3); 153 }