1 /+++++++++++++++++++++++++++++ 2 + This module is a submodule of rx.range. 3 + It provides basic operation a 'drop' 4 +/ 5 module rx.range.drop; 6 7 import rx.disposable; 8 import rx.observer; 9 import rx.observable; 10 import rx.util; 11 12 import std.range : put; 13 14 //#################### 15 // Drop 16 //#################### 17 ///Creates the observable that results from discarding the first n elements from the given source. 18 auto drop(TObservable)(auto ref TObservable observable, size_t n) 19 { 20 static struct DropObservable 21 { 22 public: 23 alias ElementType = TObservable.ElementType; 24 25 public: 26 this(TObservable observable, size_t n) 27 { 28 _observable = observable; 29 _count = n; 30 } 31 32 public: 33 auto subscribe(TObserver)(TObserver observer) 34 { 35 static struct DropObserver 36 { 37 mixin SimpleObserverImpl!(TObserver, ElementType); 38 39 public: 40 this(TObserver observer, size_t count) 41 { 42 _observer = observer; 43 _counter = new shared(AtomicCounter)(count); 44 } 45 46 static if (hasCompleted!TObserver || hasFailure!TObserver) 47 { 48 this(TObserver observer, size_t count, Disposable disposable) 49 { 50 _observer = observer; 51 _counter = new shared(AtomicCounter)(count); 52 _disposable = disposable; 53 } 54 } 55 56 private: 57 void putImpl(ElementType obj) 58 { 59 if (_counter.tryUpdateCount()) 60 { 61 .put(_observer, obj); 62 } 63 } 64 65 private: 66 shared(AtomicCounter) _counter; 67 } 68 69 static if (hasCompleted!TObserver || hasFailure!TObserver) 70 { 71 auto disposable = new SingleAssignmentDisposable; 72 disposable.setDisposable(disposableObject(doSubscribe(_observable, 73 DropObserver(observer, _count, disposable)))); 74 return disposable; 75 } 76 else 77 { 78 return doSubscribe(_observable, DropObserver(observer, _count)); 79 } 80 } 81 82 private: 83 TObservable _observable; 84 size_t _count; 85 } 86 87 return DropObservable(observable, n); 88 } 89 /// 90 unittest 91 { 92 import rx.subject; 93 94 auto subject = new SubjectObject!int; 95 auto dropped = subject.drop(1); 96 static assert(isObservable!(typeof(dropped), int)); 97 98 import std.array : appender; 99 100 auto buf = appender!(int[]); 101 auto disposable = dropped.subscribe(buf); 102 103 subject.put(0); 104 assert(buf.data.length == 0); 105 subject.put(1); 106 assert(buf.data.length == 1); 107 108 auto buf2 = appender!(int[]); 109 dropped.subscribe(buf2); 110 assert(buf2.data.length == 0); 111 subject.put(2); 112 assert(buf2.data.length == 0); 113 assert(buf.data.length == 2); 114 subject.put(3); 115 assert(buf2.data.length == 1); 116 assert(buf.data.length == 3); 117 } 118 119 unittest 120 { 121 import rx.subject : SubjectObject; 122 123 auto sub = new SubjectObject!(int[]); 124 int count = 0; 125 auto d = sub.drop(1).subscribe((int) { count++; }); 126 scope (exit) 127 d.dispose(); 128 129 assert(count == 0); 130 sub.put([1, 2]); 131 assert(count == 0); 132 sub.put([2, 3]); 133 assert(count == 2); 134 } 135 136 unittest 137 { 138 import rx.subject : SubjectObject; 139 140 auto source1 = new SubjectObject!int; 141 auto source2 = new SubjectObject!int; 142 143 import rx.algorithm : merge; 144 145 auto source = merge(source1, source2).drop(2); 146 int[] result; 147 source.doSubscribe!(n => result ~= n); 148 149 .put(source1, 0); 150 .put(source2, 1); 151 .put(source1, 2); 152 .put(source2, 3); 153 154 assert(result.length == 2); 155 assert(result[0] == 2); 156 assert(result[1] == 3); 157 }