/+++++++++++++++++++++++++++++
 + This module is a submodule of rx.range.
 + It provides the basic 'drop' operation.
 +/
5 module rx.range.drop;
6
7 import rx.disposable;
8 import rx.observer;
9 import rx.observable;
10 import rx.util;
11
12 import std.range : put;
13
14 //####################
15 // Drop
16 //####################
/++
 + Creates an observable that discards the first `n` elements coming from
 + `observable` and passes the following ones on to the subscriber.
 +
 + Every call to `subscribe` on the result builds a fresh observer with its own
 + counter initialised to `n`, so each subscription skips independently.
 +
 + Params:
 +   observable = the source observable to wrap
 +   n = the number of leading elements to discard per subscription
 +
 + Returns: an observable whose `ElementType` is `TObservable.ElementType`.
 +/
auto drop(TObservable)(auto ref TObservable observable, size_t n)
{
    static struct DropObservable
    {
    public:
        alias ElementType = TObservable.ElementType;

        this(TObservable source, size_t skipCount)
        {
            _source = source;
            _skipCount = skipCount;
        }

        /// Subscribes `observer` through a wrapper that filters the leading elements.
        auto subscribe(TObserver)(TObserver observer)
        {
            static struct DropObserver
            {
                // Supplies `_observer` (and `_disposable` when the observer
                // handles completion/failure) and routes elements to `putImpl`.
                mixin SimpleObserverImpl!(TObserver, ElementType);

            public:
                this(TObserver target, size_t skip)
                {
                    _observer = target;
                    _remaining = new shared(AtomicCounter)(skip);
                }

                static if (hasCompleted!TObserver || hasFailure!TObserver)
                {
                    this(TObserver target, size_t skip, Disposable handle)
                    {
                        this(target, skip);
                        _disposable = handle;
                    }
                }

            private:
                void putImpl(ElementType obj)
                {
                    // Forward only when the counter reports that the element may pass.
                    if (!_remaining.tryUpdateCount())
                        return;

                    .put(_observer, obj);
                }

                shared(AtomicCounter) _remaining;
            }

            static if (!(hasCompleted!TObserver || hasFailure!TObserver))
            {
                // The observer only receives elements: no disposable has to be handed to it.
                return doSubscribe(_source, DropObserver(observer, _skipCount));
            }
            else
            {
                // The wrapper receives the handle before the subscription exists,
                // hence the single-assignment disposable that is filled in afterwards.
                auto handle = new SingleAssignmentDisposable;
                auto inner = doSubscribe(_source, DropObserver(observer, _skipCount, handle));
                handle.setDisposable(disposableObject(inner));
                return handle;
            }
        }

    private:
        TObservable _source;
        size_t _skipCount;
    }

    return DropObservable(observable, n);
}
///
unittest
{
    import rx.subject;
    import std.array : appender;

    auto source = new SubjectObject!int;
    auto skippingOne = source.drop(1);
    static assert(isObservable!(typeof(skippingOne), int));

    // First subscriber: its first element is discarded.
    auto firstSink = appender!(int[]);
    auto subscription = skippingOne.subscribe(firstSink);

    source.put(0);
    assert(firstSink.data.length == 0);
    source.put(1);
    assert(firstSink.data.length == 1);

    // A later subscriber skips its own first element, independently of the first one.
    auto secondSink = appender!(int[]);
    skippingOne.subscribe(secondSink);
    assert(secondSink.data.length == 0);

    source.put(2);
    assert(secondSink.data.length == 0);
    assert(firstSink.data.length == 2);

    source.put(3);
    assert(secondSink.data.length == 1);
    assert(firstSink.data.length == 3);
}
118
unittest
{
    import rx.subject : SubjectObject;

    // The subject emits whole arrays while the observer consumes single ints.
    auto arrays = new SubjectObject!(int[]);
    int received = 0;
    auto subscription = arrays.drop(1).subscribe((int) { received++; });
    scope (exit)
        subscription.dispose();

    assert(received == 0);

    // The first array is dropped as a single element.
    arrays.put([1, 2]);
    assert(received == 0);

    // The second array passes and reaches the observer once per contained int.
    arrays.put([2, 3]);
    assert(received == 2);
}
135
unittest
{
    import rx.algorithm : merge;
    import rx.subject : SubjectObject;

    auto left = new SubjectObject!int;
    auto right = new SubjectObject!int;

    // The drop count applies to the merged stream, not to each input separately.
    auto merged = merge(left, right).drop(2);
    int[] collected;
    merged.doSubscribe!(n => collected ~= n);

    .put(left, 0);
    .put(right, 1);
    .put(left, 2);
    .put(right, 3);

    // Only the third and fourth elements of the merged stream arrive.
    assert(collected == [2, 3]);
}