1 /+++++++++++++++++++++++++++++
 + This module defines range-like operators for observables, such as drop, take and takeLast.
3  +/
4 module rx.range;
5 
6 import rx.disposable;
7 import rx.observer;
8 import rx.observable;
9 import rx.util;
10 
11 import core.atomic : cas, atomicLoad;
12 import std.range : put;
13 
14 /+++++++++++++++++++++++++++++
15  + Overview
16  +/
unittest
{
    import rx.subject;
    import std.array : appender;

    // Skip the first two values, then keep only the next three.
    auto source = new SubjectObject!int;
    auto window = source.drop(2).take(3);

    auto received = appender!(int[]);
    auto subscription = window.subscribe(observerObject!int(received));

    for (int value = 0; value < 10; ++value)
    {
        source.put(value);
    }

    assert(received.data == [2, 3, 4]);
}
38 
39 //####################
40 // Drop
41 //####################
/++
 + Creates the observable that results from discarding the first n elements
 + from the given source.
 +
 + Every call to subscribe creates a fresh counter initialised with n, so each
 + subscription skips its own leading elements.
 +
 + Params:
 +   observable = the source observable
 +   n = how many leading elements each subscription discards
 +
 + Returns: an observable struct whose ElementType is that of the source.
 +/
auto drop(TObservable)(auto ref TObservable observable, size_t n)
{
    static struct DropObservable
    {
    public:
        alias ElementType = TObservable.ElementType;

        this(TObservable source, size_t skipCount)
        {
            _count = skipCount;
            _observable = source;
        }

        auto subscribe(TObserver)(TObserver observer)
        {
            // True when the downstream observer exposes completed() and/or failure();
            // only then is a disposable handed to the wrapping observer.
            enum bool tracksTermination = hasCompleted!TObserver || hasFailure!TObserver;

            static struct DropObserver
            {
                mixin SimpleObserverImpl!(TObserver, ElementType);

            public:
                this(TObserver downstream, size_t skipCount)
                {
                    _counter = new shared(AtomicCounter)(skipCount);
                    _observer = downstream;
                }

                static if (tracksTermination)
                {
                    this(TObserver downstream, size_t skipCount, Disposable subscription)
                    {
                        _counter = new shared(AtomicCounter)(skipCount);
                        _disposable = subscription;
                        _observer = downstream;
                    }
                }

            private:
                // Forwards the element only when the counter reports success.
                void putImpl(ElementType obj)
                {
                    if (!_counter.tryUpdateCount())
                        return;

                    .put(_observer, obj);
                }

                shared(AtomicCounter) _counter;
            }

            static if (!tracksTermination)
            {
                return doSubscribe(_observable, DropObserver(observer, _count));
            }
            else
            {
                auto subscription = new SingleAssignmentDisposable;
                auto upstream = doSubscribe(_observable,
                        DropObserver(observer, _count, subscription));
                subscription.setDisposable(disposableObject(upstream));
                return subscription;
            }
        }

    private:
        TObservable _observable;
        size_t _count;
    }

    return DropObservable(observable, n);
}
///
unittest
{
    import rx.subject;
    import std.array : appender;

    auto source = new SubjectObject!int;
    auto skipFirst = source.drop(1);
    static assert(isObservable!(typeof(skipFirst), int));

    // First subscriber: its first value is discarded, the second arrives.
    auto early = appender!(int[]);
    auto subscription = skipFirst.subscribe(early);

    source.put(0);
    assert(!early.data.length);
    source.put(1);
    assert(early.data.length == 1);

    // Second subscriber: it skips one value of its own, independently of the first.
    auto late = appender!(int[]);
    skipFirst.subscribe(late);
    assert(!late.data.length);

    source.put(2);
    assert(!late.data.length);
    assert(early.data.length == 2);

    source.put(3);
    assert(late.data.length == 1);
    assert(early.data.length == 3);
}
143 
unittest
{
    import rx.subject : SubjectObject;

    // The source emits int[]; the subscriber is a delegate taking a single int.
    auto source = new SubjectObject!(int[]);
    int calls = 0;
    auto subscription = source.drop(1).subscribe((int _) { ++calls; });
    scope (exit)
        subscription.dispose();

    assert(calls == 0);

    source.put([1, 2]);
    assert(calls == 0);

    source.put([2, 3]);
    assert(calls == 2);
}
160 
161 //####################
162 // Take
163 //####################
/++
 + Creates a sub-observable consisting of only up to the first n elements of the given source.
 +
 + Every call to subscribe gets its own remaining-element counter initialised with n.
 + After the n-th element has been forwarded, the observer's completed() is called
 + (when it has one) and the subscription is disposed.
 +
 + With n == 0 no element is ever forwarded; completion or failure is then only
 + signalled when the source itself completes or fails.
 +
 + Params:
 +   observable = the source observable
 +   n = maximum number of elements forwarded to each subscriber
 +
 + Returns: an observable struct whose ElementType is that of the source.
 +/
auto take(TObservable)(auto ref TObservable observable, size_t n)
{
    static struct TakeObservable
    {
    public:
        alias ElementType = TObservable.ElementType;

    public:
        this(TObservable observable, size_t n)
        {
            _observable = observable;
            _count = n;
        }

    public:
        auto subscribe(TObserver)(TObserver observer)
        {
            static struct TakeObserver
            {
            public:
                this(TObserver observer, size_t count, Disposable disposable)
                {
                    _observer = observer;
                    _count = count;
                    _disposable = disposable;
                }

            public:
                /// Forwards obj while elements remain; the last one also completes and disposes.
                void put(ElementType obj)
                {
                    size_t oldValue = void;
                    size_t newValue = void;
                    do
                    {
                        // Read the shared counter atomically; a plain read of a
                        // shared field is not an atomic operation.
                        oldValue = atomicLoad(_count);
                        if (oldValue == 0)
                            return;

                        newValue = oldValue - 1;
                    }
                    // Retry when another caller changed the counter in between.
                    while (!cas(&_count, oldValue, newValue));

                    .put(_observer, obj);
                    if (newValue == 0)
                    {
                        // This call consumed the last remaining slot.
                        static if (hasCompleted!TObserver)
                        {
                            _observer.completed();
                        }
                        _disposable.dispose();
                    }
                }

                /// Forwards completion (when supported) and disposes the subscription.
                void completed()
                {
                    static if (hasCompleted!TObserver)
                    {
                        _observer.completed();
                    }
                    _disposable.dispose();
                }

                /// Forwards the failure (when supported) and disposes the subscription.
                void failure(Exception e)
                {
                    static if (hasFailure!TObserver)
                    {
                        _observer.failure(e);
                    }
                    _disposable.dispose();
                }

            private:
                TObserver _observer;
                shared(size_t) _count; // elements still allowed through
                Disposable _disposable;
            }

            // The handle is created first so the observer can dispose the
            // subscription from inside put/completed/failure.
            auto disposable = new SingleAssignmentDisposable;
            disposable.setDisposable(disposableObject(doSubscribe(_observable,
                    TakeObserver(observer, _count, disposable))));
            return disposable;
        }

    private:
        TObservable _observable;
        size_t _count;
    }

    return TakeObservable(observable, n);
}
///
unittest
{
    import rx.subject;
    import std.array : appender;

    auto source = new SubjectObject!int;
    auto received = appender!(int[]);

    // Only the first two of the ten published values are recorded.
    auto subscription = source.take(2).subscribe(received);
    for (int value = 0; value < 10; ++value)
    {
        source.put(value);
    }

    assert(received.data == [0, 1]);
}
274 
unittest
{
    import rx.subject;
    import std.array : appender;

    auto source = new SubjectObject!int;
    auto firstOnly = source.take(1);
    static assert(isObservable!(typeof(firstOnly), int));

    // First subscriber: receives one value and nothing afterwards.
    auto early = appender!(int[]);
    auto subscription = firstOnly.subscribe(early);

    source.put(0);
    assert(early.data.length == 1);
    source.put(1);
    assert(early.data.length == 1);

    // Second subscriber: gets its own quota of one value.
    auto late = appender!(int[]);
    firstOnly.subscribe(late);
    assert(!late.data.length);

    source.put(2);
    assert(late.data.length == 1);
    assert(early.data.length == 1);

    source.put(3);
    assert(late.data.length == 1);
    assert(early.data.length == 1);
}
303 
unittest
{
    import rx.subject;

    auto source = new SubjectObject!int;
    auto firstTwo = source.take(2);

    int puts = 0;
    int completions = 0;

    // Observer that only counts the notifications it receives.
    struct CountingObserver
    {
        void put(int value)
        {
            ++puts;
        }

        void completed()
        {
            ++completions;
        }
    }

    auto subscription = firstTwo.doSubscribe(CountingObserver());
    assert(puts == 0);

    source.put(1);
    assert(puts == 1);
    assert(completions == 0);

    // The second value exhausts the quota, so completion follows immediately.
    source.put(2);
    assert(puts == 2);
    assert(completions == 1);
}
335 
336 //####################
337 // TakeLast
338 //####################
/++
 + Creates an observable that emits only the last element of the given source.
 +
 + The most recent element is remembered while the source is running. When the
 + source completes, that element (if any was received) is forwarded, completion
 + is signalled to the observer (when it has completed()), and the subscription
 + is disposed. When the source fails, the failure is forwarded (when the
 + observer has failure()) without emitting the remembered element, and the
 + subscription is disposed.
 +
 + Params:
 +   observable = the source observable
 +
 + Returns: an observable struct whose ElementType is that of the source.
 +/
auto takeLast(TObservable)(auto ref TObservable observable)
{
    static struct TakeLastObservable
    {
    public:
        alias ElementType = TObservable.ElementType;

    public:
        this(ref TObservable observable)
        {
            _observable = observable;
        }

    public:
        auto subscribe(TObserver)(auto ref TObserver observer)
        {
            static class TakeLastObserver
            {
            public:
                this(ref TObserver observer, SingleAssignmentDisposable disposable)
                {
                    _observer = observer;
                    _disposable = disposable;
                }

            public:
                /// Remembers obj as the latest element; nothing is forwarded yet.
                void put(ElementType obj)
                {
                    _current = obj;
                    _hasValue = true;
                }

                /// Emits the remembered element (if any), then completes and disposes.
                void completed()
                {
                    if (_hasValue)
                        .put(_observer, _current);

                    static if (hasCompleted!TObserver)
                    {
                        _observer.completed();
                    }
                    _disposable.dispose();
                }

                /// Forwards the failure (when supported) and disposes the subscription.
                void failure(Exception e)
                {
                    static if (hasFailure!TObserver)
                    {
                        _observer.failure(e);
                    }
                    // Release the subscription on failure as well, mirroring completed().
                    _disposable.dispose();
                }

            private:
                bool _hasValue = false;
                ElementType _current;
                TObserver _observer;
                SingleAssignmentDisposable _disposable;
            }

            // The handle is created first so the observer can dispose the
            // subscription from inside completed/failure.
            auto d = new SingleAssignmentDisposable;
            d.setDisposable(disposableObject(doSubscribe(_observable,
                    new TakeLastObserver(observer, d))));
            return d;
        }

    private:
        TObservable _observable;
    }

    return TakeLastObservable(observable);
}
///
unittest
{
    import rx.subject;

    auto source = new SubjectObject!int;

    int puts = 0;
    int completions = 0;

    // Observer that only counts the notifications it receives.
    struct CountingObserver
    {
        void put(int value)
        {
            ++puts;
        }

        void completed()
        {
            ++completions;
        }
    }

    auto subscription = source.takeLast.subscribe(CountingObserver());

    // Nothing is delivered while the source is still running.
    assert(puts == 0);
    source.put(1);
    assert(puts == 0);
    source.put(10);
    assert(puts == 0);

    // Completion delivers exactly one value followed by completed().
    source.completed();
    assert(puts == 1);
    assert(completions == 1);

    // Values published afterwards are not delivered.
    source.put(100);
    assert(puts == 1);
    assert(completions == 1);
}
448 
unittest
{
    import rx.subject : SubjectObject;

    // The source emits int[]; the subscriber is a delegate taking a single int.
    auto source = new SubjectObject!(int[]);

    int calls = 0;
    auto subscription = source.takeLast.subscribe((int _) { ++calls; });
    scope (exit)
        subscription.dispose();

    assert(calls == 0);

    source.put([0]);
    assert(calls == 0);

    source.put([1, 2]);
    assert(calls == 0);

    // Only the last array, [1, 2], is delivered: one call per int.
    source.completed();
    assert(calls == 2);
}