1 /+++++++++++++++++++++++++++++
2  + This module is a submodule of rx.range.
 + It provides the basic 'take' operation.
4  +/
5  module rx.range.take;
6 
7 import rx.disposable;
8 import rx.observer;
9 import rx.observable;
10 import rx.util;
11 
12 import std.range : put;
13 
14 //####################
15 // Take
16 //####################
/++
 + Creates a sub-observable that forwards at most the first `n` elements of
 + the given source and then completes.
 +
 + Params:
 +   observable = the source observable to take elements from.
 +   n = the maximum number of elements forwarded to each subscriber.
 +
 + Returns:
 +   An observable with the same `ElementType` as the source. Each call to
 +   `subscribe` gets its own element counter. When `n` is 0 the subscriber is
 +   completed immediately and the source is never subscribed.
 +/
auto take(TObservable)(auto ref TObservable observable, size_t n)
{
    static struct TakeObservable
    {
    public:
        alias ElementType = TObservable.ElementType;

    public:
        this(TObservable observable, size_t n)
        {
            _observable = observable;
            _count = n;
        }

    public:
        /// Subscribes `observer` to the source, limited to the configured
        /// number of elements. Returns the disposable for this subscription.
        auto subscribe(TObserver)(TObserver observer)
        {
            static struct TakeObserver
            {
            public:
                this(TObserver observer, size_t count, Disposable disposable)
                {
                    _observer = observer;
                    _counter = new shared(AtomicCounter)(count);
                    // One-shot latch: its single decrement is consumed by the
                    // first terminal signal (completed or failure).
                    _terminated = new shared(AtomicCounter)(1);
                    _disposable = disposable;
                }

            public:
                /// Forwards `obj` while elements remain; the element that
                /// exhausts the counter also completes the observer.
                void put(ElementType obj)
                {
                    auto result = _counter.tryDecrement();
                    if (!result.success)
                    {
                        // The limit was already reached; drop the element.
                        return;
                    }

                    // The leading dot selects the module-scope `put`
                    // (imported from std.range) instead of this method.
                    .put(_observer, obj);
                    if (result.count == 0)
                    {
                        finish();
                    }
                }

                /// Called when the source completes before the limit is hit.
                void completed()
                {
                    finish();
                }

                /// Forwards a source failure unless a terminal signal was
                /// already delivered, then releases the subscription.
                void failure(Exception e)
                {
                    if (_terminated.tryDecrement().success)
                    {
                        static if (hasFailure!TObserver)
                        {
                            _observer.failure(e);
                        }
                    }
                    _disposable.dispose();
                }

            private:
                // Delivers `completed` at most once and releases the
                // subscription. Without the latch, a source that signals
                // termination after the limit-triggered completion would
                // complete the observer a second time.
                void finish()
                {
                    if (_terminated.tryDecrement().success)
                    {
                        static if (hasCompleted!TObserver)
                        {
                            _observer.completed();
                        }
                    }
                    _disposable.dispose();
                }

            private:
                TObserver _observer;
                shared(AtomicCounter) _counter;
                shared(AtomicCounter) _terminated;
                Disposable _disposable;
            }

            auto disposable = new SingleAssignmentDisposable;

            // Nothing can ever be forwarded for a zero-length take, so
            // complete right away instead of holding a subscription on the
            // source that would never emit or complete downstream.
            if (_count == 0)
            {
                static if (hasCompleted!TObserver)
                {
                    observer.completed();
                }
                disposable.dispose();
                return disposable;
            }

            // The handle is created first and passed to the observer so the
            // observer can release the subscription once it has finished.
            disposable.setDisposable(disposableObject(doSubscribe(_observable,
                    TakeObserver(observer, _count, disposable))));
            return disposable;
        }

    private:
        TObservable _observable;
        size_t _count;
    }

    return TakeObservable(observable, n);
}
///
unittest
{
    import std.algorithm : equal;
    import std.array : appender;
    import rx.subject : SubjectObject;

    // A subject as the source, and an appender collecting what gets through.
    auto source = new SubjectObject!int;
    auto sink = appender!(int[]);

    // Only the first two published values may reach the sink.
    auto subscription = source.take(2).subscribe(sink);
    for (int value = 0; value < 10; ++value)
    {
        source.put(value);
    }

    assert(equal(sink.data, [0, 1]));
}
119 
// Each subscription to the same taken observable counts its elements on its own.
unittest
{
    import std.array : appender;
    import rx.subject : SubjectObject;

    auto source = new SubjectObject!int;
    auto firstOnly = source.take(1);
    static assert(isObservable!(typeof(firstOnly), int));

    // The first subscriber receives exactly one element and nothing after it.
    auto early = appender!(int[]);
    auto earlySubscription = firstOnly.subscribe(early);

    source.put(0);
    assert(early.data.length == 1);
    source.put(1);
    assert(early.data.length == 1);

    // A later subscriber starts with a fresh allowance of one element.
    auto late = appender!(int[]);
    firstOnly.subscribe(late);
    assert(late.data.length == 0);

    source.put(2);
    assert(early.data.length == 1);
    assert(late.data.length == 1);

    source.put(3);
    assert(early.data.length == 1);
    assert(late.data.length == 1);
}
148 
// The observer is completed as soon as the last allowed element is delivered.
unittest
{
    import rx.subject : SubjectObject;

    auto source = new SubjectObject!int;
    auto firstTwo = source.take(2);

    int received;
    int finished;

    // Observer that only tallies the notifications it gets.
    struct CountingObserver
    {
        void put(int value)
        {
            ++received;
        }

        void completed()
        {
            ++finished;
        }
    }

    auto subscription = firstTwo.doSubscribe(CountingObserver());
    assert(received == 0);

    source.put(1);
    assert(received == 1);
    assert(finished == 0);

    source.put(2);
    assert(received == 2);
    assert(finished == 1);
}
180 
// take applied to a merged stream keeps the first two values in arrival order.
unittest
{
    import rx.algorithm : merge;
    import rx.subject : SubjectObject;

    auto left = new SubjectObject!int;
    auto right = new SubjectObject!int;

    auto firstTwo = merge(left, right).take(2);
    int[] received;
    firstTwo.doSubscribe!(value => received ~= value);

    // Alternate between both inputs; only 0 and 1 fall within the limit.
    .put(left, 0);
    .put(right, 1);
    .put(left, 2);
    .put(right, 3);

    assert(received == [0, 1]);
}