1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'any'
3  +/
4 module rx.algorithm.any;
5 
6 import rx.disposable;
7 import rx.observable;
8 import rx.observer;
9 import rx.util;
10 
11 import std.functional : unaryFun;
12 import std.range : put;
13 
14 struct AnyObserver(TObserver, E, alias pred = "true")
15 {
16     this() @disable;
17     this(TObserver observer, Disposable cancel)
18     {
19         _observer = observer;
20         _cancel = cancel;
21         _ticket = new Ticket;
22     }
23 
24     void put(E obj)
25     {
26         alias fun = unaryFun!pred;
27         if (fun(obj))
28         {
29             if (!_ticket.stamp())
30                 return;
31 
32             _observer.put(true);
33             _cancel.dispose();
34         }
35     }
36 
37     void failure(Exception)
38     {
39         if (!_ticket.stamp())
40             return;
41 
42         _observer.put(false);
43 
44         static if (hasCompleted!TObserver)
45         {
46             _observer.completed();
47         }
48         _cancel.dispose();
49     }
50 
51     void completed()
52     {
53         if (!_ticket.stamp())
54             return;
55 
56         _observer.put(false);
57         static if (hasCompleted!TObserver)
58         {
59             _observer.completed();
60         }
61         _cancel.dispose();
62     }
63 
64 private:
65     TObserver _observer;
66     Disposable _cancel;
67     Ticket _ticket;
68 }
69 
70 unittest
71 {
72     import std.array : Appender;
73 
74     alias Buffer = Appender!(bool[]);
75     alias TObserver = AnyObserver!(Buffer, int);
76     assert(isObserver!(TObserver, int));
77 }
78 
79 unittest
80 {
81     import std.array : Appender, appender;
82 
83     alias TObserver = AnyObserver!(Appender!(bool[]), int);
84     auto buf = appender!(bool[]);
85     auto a = TObserver(buf, NopDisposable.instance);
86 
87     assert(buf.data.length == 0);
88     a.put(0);
89     assert(buf.data.length == 1);
90     assert(buf.data[0] == true);
91     a.put(0);
92     assert(buf.data.length == 1);
93 
94     auto b = a;
95     b.put(1);
96     assert(buf.data.length == 1);
97 }
98 
99 unittest
100 {
101     import std.array : Appender, appender;
102 
103     alias TObserver = AnyObserver!(Appender!(bool[]), int);
104 
105     auto buf = appender!(bool[]);
106     {
107         auto a = TObserver(buf, NopDisposable.instance);
108         assert(buf.data.length == 0);
109         a.failure(null);
110         assert(buf.data.length == 1);
111         assert(buf.data[0] == false);
112     }
113     buf.clear();
114     {
115         auto a = TObserver(buf, NopDisposable.instance);
116         assert(buf.data.length == 0);
117         a.completed();
118         assert(buf.data.length == 1);
119         assert(buf.data[0] == false);
120     }
121 }
122 
123 unittest
124 {
125     import std.array : Appender, appender;
126 
127     alias TObserver = AnyObserver!(Appender!(bool[]), int, "a % 2 == 0");
128 
129     auto buf = appender!(bool[]);
130     auto a = TObserver(buf, NopDisposable.instance);
131 
132     assert(buf.data.length == 0);
133     a.put(1);
134     assert(buf.data.length == 0);
135     a.put(2);
136     assert(buf.data.length == 1);
137     assert(buf.data[0] == true);
138 }
139 
140 struct AnyObservable(TObservable, alias pred = "true")
141 {
142     alias ElementType = bool;
143 
144     this(TObservable observable)
145     {
146         _observable = observable;
147     }
148 
149     Disposable subscribe(TObserver)(auto ref TObserver observer)
150     {
151         alias ObserverType = AnyObserver!(TObserver, TObservable.ElementType, pred);
152 
153         auto subscription = new SingleAssignmentDisposable;
154         subscription.setDisposable(disposableObject(_observable.doSubscribe(ObserverType(observer,
155                 subscription))));
156         return subscription;
157     }
158 
159 private:
160     TObservable _observable;
161 }
162 
163 unittest
164 {
165     alias TObservable = AnyObservable!(Observable!int);
166 
167     import rx.subject : SubjectObject;
168 
169     auto sub = new SubjectObject!int;
170     auto o1 = TObservable(sub);
171 
172     import std.array : appender;
173 
174     auto buf = appender!(bool[]);
175     auto d = o1.subscribe(buf);
176 
177     assert(buf.data.length == 0);
178     sub.put(1);
179     assert(buf.data.length == 1);
180     assert(buf.data[0] == true);
181 
182     d.dispose();
183 }
184 
185 unittest
186 {
187     alias TObservable = AnyObservable!(Observable!int, "a % 2 == 0");
188 
189     import rx.subject : SubjectObject;
190 
191     auto sub = new SubjectObject!int;
192     auto o1 = TObservable(sub);
193 
194     import std.array : appender;
195 
196     auto buf = appender!(bool[]);
197     auto d = o1.subscribe(buf);
198 
199     assert(buf.data.length == 0);
200     sub.put(1);
201     assert(buf.data.length == 0);
202     sub.put(2);
203     assert(buf.data.length == 1);
204     assert(buf.data[0] == true);
205 
206     d.dispose();
207 }
208 
209 unittest
210 {
211     alias TObservable = AnyObservable!(Observable!int, a => a % 3 == 0);
212 
213     import rx.subject : SubjectObject;
214 
215     auto sub = new SubjectObject!int;
216     auto o1 = TObservable(sub);
217 
218     import std.array : appender;
219 
220     auto buf = appender!(bool[]);
221     auto d = o1.subscribe(buf);
222 
223     assert(buf.data.length == 0);
224     sub.put(1);
225     assert(buf.data.length == 0);
226     sub.put(3);
227     assert(buf.data.length == 1);
228     assert(buf.data[0] == true);
229 
230     d.dispose();
231 }
232 
233 unittest
234 {
235     alias TObservable = AnyObservable!(Observable!int);
236 
237     import rx.subject : SubjectObject;
238 
239     auto sub = new SubjectObject!int;
240     auto o1 = TObservable(sub);
241 
242     import std.array : appender;
243 
244     auto buf = appender!(bool[]);
245     auto d = o1.subscribe(buf);
246 
247     assert(buf.data.length == 0);
248     sub.failure(null);
249     assert(buf.data.length == 1);
250     assert(buf.data[0] == false);
251 
252     sub.put(0);
253     assert(buf.data.length == 1);
254 
255     d.dispose();
256 }
257 
258 unittest
259 {
260     alias TObservable = AnyObservable!(Observable!int);
261 
262     import rx.subject : SubjectObject;
263 
264     auto sub = new SubjectObject!int;
265     auto o1 = TObservable(sub);
266 
267     import std.array : appender;
268 
269     auto buf = appender!(bool[]);
270     auto d = o1.subscribe(buf);
271 
272     assert(buf.data.length == 0);
273     sub.completed();
274     assert(buf.data.length == 1);
275     assert(buf.data[0] == false);
276 
277     sub.put(0);
278     assert(buf.data.length == 1);
279 
280     d.dispose();
281 }
282 
283 unittest
284 {
285     alias TObservable = AnyObservable!(Observable!int);
286 
287     import rx.subject : SubjectObject;
288 
289     auto sub = new SubjectObject!int;
290     auto observable = TObservable(sub);
291 
292     import std.array : appender;
293 
294     auto buf = appender!(bool[]);
295     auto d = observable.subscribe(buf);
296 
297     d.dispose();
298 
299     assert(buf.data.length == 0);
300     sub.put(0);
301     assert(buf.data.length == 0);
302 }
303 
304 ///
305 template any(alias pred = "true")
306 {
307     AnyObservable!(TObservable, pred) any(TObservable)(auto ref TObservable observable)
308     {
309         return typeof(return)(observable);
310     }
311 }
312 ///
313 unittest
314 {
315     import rx.subject : SubjectObject;
316 
317     auto sub = new SubjectObject!int;
318 
319     bool result = false;
320     sub.any!("a % 2 == 0").doSubscribe((bool) { result = true; });
321 
322     assert(result == false);
323     sub.put(1);
324     assert(result == false);
325     sub.put(0);
326     assert(result == true);
327 }
328 
329 unittest
330 {
331     import rx.subject : SubjectObject;
332 
333     auto sub = new SubjectObject!int;
334 
335     bool result = false;
336     sub.any!().doSubscribe((bool) { result = true; });
337 
338     assert(result == false);
339     sub.put(1);
340     assert(result == true);
341 }
342 
343 ///
344 AnyObservable!TObservable any(TObservable)(auto ref TObservable observable)
345 {
346     return typeof(return)(observable);
347 }
348 ///
349 unittest
350 {
351     import rx.subject : SubjectObject;
352 
353     auto sub = new SubjectObject!int;
354 
355     bool result = false;
356     sub.any().doSubscribe((bool) { result = true; });
357 
358     assert(result == false);
359     sub.put(1);
360     assert(result == true);
361 }
362 
363 unittest
364 {
365     import rx.algorithm : filter;
366     import rx.subject : SubjectObject;
367 
368     auto sub = new SubjectObject!int;
369 
370     bool result = true;
371     sub.filter!"a % 2 == 0"().any().doSubscribe((bool t) { result = t; });
372 
373     assert(result == true);
374     sub.completed();
375     assert(result == false);
376 }
377 
378 unittest
379 {
380     import rx.algorithm : filter;
381     import rx.subject : SubjectObject;
382 
383     auto sub = new SubjectObject!int;
384 
385     bool result = true;
386     sub.filter!"a % 2 == 0"().any().doSubscribe!(t => result = t);
387 
388     assert(result == true);
389     sub.completed();
390     assert(result == false);
391 }