1 /+++++++++++++++++++++++++++++
2  + This module is a submodule of rx.range.
 + It provides the basic operation 'drop'.
4  +/
5 module rx.range.drop;
6 
7 import rx.disposable;
8 import rx.observer;
9 import rx.observable;
10 import rx.util;
11 
12 import std.range : put;
13 
14 //####################
15 // Drop
16 //####################
/++
 + Creates the observable that results from discarding the first n elements from the given source.
 +
 + Params:
 +   observable = the source observable to wrap; a copy is stored in the result
 +   n = the number of leading elements each subscriber should not receive
 +
 + Returns:
 +   A struct exposing the source's `ElementType` and a `subscribe` method.
 +   A new counter initialised to `n` is allocated for every subscription,
 +   so each subscriber skips its own first `n` elements.
 +/
auto drop(TObservable)(auto ref TObservable observable, size_t n)
{
    static struct DropObservable
    {
    public:
        alias ElementType = TObservable.ElementType;

        this(TObservable observable, size_t n)
        {
            _count = n;
            _observable = observable;
        }

        /// Subscribes `observer` to the source through a counting wrapper.
        auto subscribe(TObserver)(TObserver observer)
        {
            // True when the downstream observer handles completion or failure;
            // only then does the wrapper carry a disposable.
            enum handlesTermination = hasCompleted!TObserver || hasFailure!TObserver;

            static struct DropObserver
            {
                // The mixin comes from rx.observer (not shown here). It has to
                // declare `_observer` (and `_disposable` where applicable), since
                // the constructors below assign them; presumably it also routes
                // incoming values to `putImpl` - confirm against rx.observer.
                mixin SimpleObserverImpl!(TObserver, ElementType);

            public:
                this(TObserver observer, size_t count)
                {
                    _counter = new shared(AtomicCounter)(count);
                    _observer = observer;
                }

                static if (handlesTermination)
                {
                    this(TObserver observer, size_t count, Disposable disposable)
                    {
                        _counter = new shared(AtomicCounter)(count);
                        _observer = observer;
                        _disposable = disposable;
                    }
                }

            private:
                void putImpl(ElementType obj)
                {
                    // NOTE(review): tryUpdateCount is defined in rx.util; it
                    // presumably reports true only once the skip budget is used
                    // up - verify there.
                    if (!_counter.tryUpdateCount())
                        return;

                    .put(_observer, obj);
                }

            private:
                shared(AtomicCounter) _counter;
            }

            static if (!handlesTermination)
            {
                return doSubscribe(_observable, DropObserver(observer, _count));
            }
            else
            {
                // The wrapper receives the disposable before the upstream
                // subscription exists; the subscription is attached afterwards.
                auto guard = new SingleAssignmentDisposable;
                auto upstream = doSubscribe(_observable, DropObserver(observer, _count, guard));
                guard.setDisposable(disposableObject(upstream));
                return guard;
            }
        }

    private:
        TObservable _observable;
        size_t _count;
    }

    return DropObservable(observable, n);
}
///
unittest
{
    import rx.subject;
    import std.array : appender;

    auto source = new SubjectObject!int;
    auto dropped = source.drop(1);
    static assert(isObservable!(typeof(dropped), int));

    // First subscriber: its first element is discarded.
    auto first = appender!(int[]);
    auto subscription = dropped.subscribe(first);

    source.put(0);
    assert(first.data.length == 0);
    source.put(1);
    assert(first.data.length == 1);

    // Second subscriber: it skips one element of its own, independently of the first.
    auto second = appender!(int[]);
    dropped.subscribe(second);
    assert(second.data.length == 0);

    source.put(2);
    assert(second.data.length == 0);
    assert(first.data.length == 2);

    source.put(3);
    assert(second.data.length == 1);
    assert(first.data.length == 3);
}
118 
unittest
{
    import rx.subject : SubjectObject;

    // The subject carries int[] values while the observer accepts single ints.
    auto arrays = new SubjectObject!(int[]);
    int received = 0;
    auto subscription = arrays.drop(1).subscribe((int) { ++received; });
    scope (exit)
        subscription.dispose();

    assert(received == 0);

    // The first array is the one dropped element.
    arrays.put([1, 2]);
    assert(received == 0);

    // The second array reaches the observer, once per contained int.
    arrays.put([2, 3]);
    assert(received == 2);
}
135 
unittest
{
    import rx.algorithm : merge;
    import rx.subject : SubjectObject;

    auto left = new SubjectObject!int;
    auto right = new SubjectObject!int;

    // Drop the first two elements of the merged stream, whichever source emits them.
    auto merged = merge(left, right).drop(2);
    int[] collected;
    merged.doSubscribe!(n => collected ~= n);

    .put(left, 0);
    .put(right, 1);
    .put(left, 2);
    .put(right, 3);

    // Only the third and fourth emissions get through, in order.
    assert(collected == [2, 3]);
}