1 module rx.range.take;
2 
3 import rx.disposable;
4 import rx.observer;
5 import rx.observable;
6 import rx.util;
7 
8 import std.range : put;
9 
10 //####################
11 // Take
12 //####################
/**
Creates a sub-observable consisting of only up to the first `n` elements of the given source.

Once the n-th element has been forwarded, the downstream observer is completed
(when it defines `completed`) and the subscription to the source is disposed.
A terminal notification (`completed` or `failure`) is delivered to the
downstream observer at most once per subscription, even if the source keeps
signalling after the limit has been reached. When `n` is 0 the observer is
completed immediately and the source is never subscribed.

Params:
    observable = the source observable
    n = the maximum number of elements to forward

Returns:
    An observable with the same `ElementType` as the source.
*/
auto take(TObservable)(auto ref TObservable observable, size_t n)
{
    static struct TakeObservable
    {
    public:
        alias ElementType = TObservable.ElementType;

    public:
        this(TObservable observable, size_t n)
        {
            _observable = observable;
            _count = n;
        }

    public:
        /// Subscribes `observer` to at most `_count` elements of the source.
        auto subscribe(TObserver)(TObserver observer)
        {
            static struct TakeObserver
            {
            public:
                this(TObserver observer, size_t count, Disposable disposable)
                {
                    _observer = observer;
                    _counter = new shared(AtomicCounter)(count);
                    // One-shot gate: the first successful decrement wins the right
                    // to deliver the terminal notification. It is a class reference,
                    // so every copy of this struct shares the same gate.
                    _terminated = new shared(AtomicCounter)(1);
                    _disposable = disposable;
                }

            public:
                void put(ElementType obj)
                {
                    auto result = _counter.tryDecrement();
                    if (result.success)
                    {
                        .put(_observer, obj);
                        // The element that exhausts the quota also finishes the sequence.
                        if (result.count == 0 && _terminated.tryDecrement().success)
                        {
                            static if (hasCompleted!TObserver)
                            {
                                _observer.completed();
                            }
                            _disposable.dispose();
                        }
                    }
                }

                void completed()
                {
                    // Ignore the source's completion if a terminal notification
                    // has already been delivered (e.g. the quota was reached while
                    // the source was still emitting synchronously).
                    if (_terminated.tryDecrement().success)
                    {
                        static if (hasCompleted!TObserver)
                        {
                            _observer.completed();
                        }
                        _disposable.dispose();
                    }
                }

                void failure(Exception e)
                {
                    // Same one-shot rule as completed(): never report a failure
                    // after the observer has already been terminated.
                    if (_terminated.tryDecrement().success)
                    {
                        static if (hasFailure!TObserver)
                        {
                            _observer.failure(e);
                        }
                        _disposable.dispose();
                    }
                }

            private:
                TObserver _observer;
                shared(AtomicCounter) _counter;
                shared(AtomicCounter) _terminated;
                Disposable _disposable;
            }

            auto disposable = new SingleAssignmentDisposable;
            if (_count == 0)
            {
                // Nothing may ever be forwarded: finish right away instead of
                // holding a subscription that could never complete on its own.
                static if (hasCompleted!TObserver)
                {
                    observer.completed();
                }
                disposable.dispose();
                return disposable;
            }

            disposable.setDisposable(disposableObject(doSubscribe(_observable,
                    TakeObserver(observer, _count, disposable))));
            return disposable;
        }

    private:
        TObservable _observable;
        size_t _count;
    }

    return TakeObservable(observable, n);
}
///
unittest
{
    import std.algorithm : equal;
    import std.array : appender;
    import rx.subject : SubjectObject;

    auto source = new SubjectObject!int;
    auto received = appender!(int[]);

    // Only the first two published values may reach the appender.
    auto subscription = source.take(2).subscribe(received);
    for (int value = 0; value < 10; ++value)
    {
        source.put(value);
    }

    assert(received.data.equal([0, 1]));
}
115 
unittest
{
    import std.array : appender;
    import rx.subject;

    auto source = new SubjectObject!int;
    auto firstOnly = source.take(1);
    static assert(isObservable!(typeof(firstOnly), int));

    // First subscriber: accepts one value, then ignores everything else.
    auto early = appender!(int[]);
    auto subscription = firstOnly.subscribe(early);

    source.put(0);
    assert(early.data.length == 1);
    source.put(1);
    assert(early.data.length == 1);

    // A later subscriber gets its own quota, independent of the first one.
    auto late = appender!(int[]);
    firstOnly.subscribe(late);
    assert(late.data.length == 0);

    source.put(2);
    assert(late.data.length == 1);
    assert(early.data.length == 1);

    source.put(3);
    assert(late.data.length == 1);
    assert(early.data.length == 1);
}
144 
unittest
{
    import rx.subject : SubjectObject;

    auto source = new SubjectObject!int;
    auto limited = source.take(2);

    int putCalls;
    int completedCalls;

    // Observer that only tallies the notifications it receives.
    struct CountingObserver
    {
        void put(int)
        {
            ++putCalls;
        }

        void completed()
        {
            ++completedCalls;
        }
    }

    auto subscription = limited.doSubscribe(CountingObserver());
    assert(putCalls == 0);

    source.put(1);
    assert(putCalls == 1);
    assert(completedCalls == 0);

    // The second value uses up the quota, so completion is signalled with it.
    source.put(2);
    assert(putCalls == 2);
    assert(completedCalls == 1);
}
176 
unittest
{
    import rx.algorithm : merge;
    import rx.subject : SubjectObject;

    auto left = new SubjectObject!int;
    auto right = new SubjectObject!int;

    // The limit applies to the merged stream, not to each input separately.
    auto firstTwo = merge(left, right).take(2);
    int[] collected;
    firstTwo.doSubscribe!(value => collected ~= value);

    .put(left, 0);
    .put(right, 1);
    .put(left, 2);
    .put(right, 3);

    assert(collected == [0, 1]);
}