1 module rx.range.drop;
2 
3 import rx.disposable;
4 import rx.observer;
5 import rx.observable;
6 import rx.util;
7 
8 import std.range : put;
9 
10 //####################
11 // Drop
12 //####################
/// Creates an observable that discards the first `n` elements published by
/// `observable` and forwards the elements that follow.
///
/// Every call to `subscribe` on the result wraps the given observer together
/// with a newly allocated counter, so the skip count is tracked per subscription.
///
/// Params:
///     observable = the source observable
///     n = the number of leading elements to discard
///
/// Returns: A struct with the same `ElementType` as `TObservable` and a
/// `subscribe` method.
auto drop(TObservable)(auto ref TObservable observable, size_t n)
{
    static struct DropObservable
    {
        alias ElementType = TObservable.ElementType;

        this(TObservable source, size_t skip)
        {
            _observable = source;
            _count = skip;
        }

        /// Subscribes `observer` to the source through a counting wrapper.
        auto subscribe(TObserver)(TObserver observer)
        {
            // True when the observer declares `completed` or `failure`.
            enum bool hasTerminalHandler = hasCompleted!TObserver || hasFailure!TObserver;

            static struct DropObserver
            {
                mixin SimpleObserverImpl!(TObserver, ElementType);

            public:
                this(TObserver downstream, size_t skip)
                {
                    _observer = downstream;
                    _counter = new shared(AtomicCounter)(skip);
                }

                static if (hasTerminalHandler)
                {
                    // Variant that also stores the disposable of the subscription.
                    this(TObserver downstream, size_t skip, Disposable subscription)
                    {
                        _observer = downstream;
                        _counter = new shared(AtomicCounter)(skip);
                        _disposable = subscription;
                    }
                }

            private:
                void putImpl(ElementType obj)
                {
                    // tryUpdateCount() is defined elsewhere in the project; an element
                    // is forwarded only when it returns true (presumably once the skip
                    // count has been used up -- TODO confirm against AtomicCounter).
                    if (!_counter.tryUpdateCount())
                        return;

                    .put(_observer, obj);
                }

                shared(AtomicCounter) _counter;
            }

            static if (hasTerminalHandler)
            {
                // The wrapper receives the disposable before the source subscription
                // is assigned to it.
                auto disposable = new SingleAssignmentDisposable;
                auto upstream = doSubscribe(_observable,
                        DropObserver(observer, _count, disposable));
                disposable.setDisposable(disposableObject(upstream));
                return disposable;
            }
            else
            {
                return doSubscribe(_observable, DropObserver(observer, _count));
            }
        }

    private:
        TObservable _observable;
        size_t _count;
    }

    return DropObservable(observable, n);
}
///
unittest
{
    import rx.subject;
    import std.array : appender;

    auto source = new SubjectObject!int;
    auto skipFirst = source.drop(1);
    static assert(isObservable!(typeof(skipFirst), int));

    // First subscriber: the first element it would receive is discarded.
    auto earlyBuf = appender!(int[]);
    auto disposable = skipFirst.subscribe(earlyBuf);

    source.put(0);
    assert(earlyBuf.data.length == 0);
    source.put(1);
    assert(earlyBuf.data.length == 1);

    // A later subscriber skips one element of its own, while the first
    // subscriber keeps receiving every element.
    auto lateBuf = appender!(int[]);
    skipFirst.subscribe(lateBuf);
    assert(lateBuf.data.length == 0);
    source.put(2);
    assert(lateBuf.data.length == 0);
    assert(earlyBuf.data.length == 2);
    source.put(3);
    assert(lateBuf.data.length == 1);
    assert(earlyBuf.data.length == 3);
}
114 
unittest
{
    import rx.subject : SubjectObject;

    // The source publishes int[] values while the observer accepts single ints.
    auto subject = new SubjectObject!(int[]);
    int received = 0;
    auto subscription = subject.drop(1).subscribe((int) { ++received; });
    scope (exit)
        subscription.dispose();

    assert(received == 0);

    // The first published array is discarded as a whole.
    subject.put([1, 2]);
    assert(received == 0);

    // The second array reaches the observer as two separate ints.
    subject.put([2, 3]);
    assert(received == 2);
}
131 
unittest
{
    import rx.algorithm : merge;
    import rx.subject : SubjectObject;

    auto left = new SubjectObject!int;
    auto right = new SubjectObject!int;

    // The skip count applies to the merged stream, not to each source separately.
    auto merged = merge(left, right).drop(2);
    int[] received;
    merged.doSubscribe!(value => received ~= value);

    .put(left, 0);
    .put(right, 1);
    .put(left, 2);
    .put(right, 3);

    assert(received == [2, 3]);
}