1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'all'
3  +/
4 module rx.algorithm.all;
5 
6 import rx.disposable;
7 import rx.observable;
8 import rx.observer;
9 import rx.util;
10 
11 import std.functional : unaryFun;
12 import std.range : isOutputRange, put;
13 
14 struct AllObserver(TObserver, E, alias pred = "true")
15 {
16     static assert(isOutputRange!(TObserver, bool), "TObserver must be OutputRange of bool.");
17 
18 public:
19     this() @disable;
20 
21     this(TObserver observer, Disposable cancel)
22     {
23         _observer = observer;
24         _cancel = cast(shared) cancel;
25         _ticket = new Ticket;
26         _hasValue = new Ticket;
27     }
28 
29 public:
30     void put(E obj)
31     {
32         _hasValue.stamp();
33 
34         alias fun = unaryFun!pred;
35         static if (hasFailure!TObserver)
36         {
37             bool res = false;
38             try
39             {
40                 res = fun(obj);
41             }
42             catch (Exception e)
43             {
44                 if (!_ticket.stamp())
45                     return;
46 
47                 _observer.failure(e);
48                 dispose();
49                 return;
50             }
51 
52             if (!res)
53             {
54                 if (!_ticket.stamp())
55                     return;
56 
57                 .put(_observer, false);
58                 static if (hasCompleted!TObserver)
59                 {
60                     _observer.completed();
61                 }
62 
63                 dispose();
64             }
65         }
66         else
67         {
68             if (!fun(obj))
69             {
70                 if (!_ticket.stamp())
71                     return;
72 
73                 .put(_observer, false);
74                 static if (hasCompleted!TObserver)
75                 {
76                     _observer.completed();
77                 }
78 
79                 dispose();
80             }
81         }
82     }
83 
84     void failure(Exception e)
85     {
86         if (!_ticket.stamp())
87             return;
88 
89         static if (hasFailure!TObserver)
90         {
91             _observer.failure(e);
92         }
93 
94         dispose();
95     }
96 
97     void completed()
98     {
99         if (!_ticket.stamp())
100             return;
101 
102         .put(_observer, _hasValue.isStamped);
103         static if (hasCompleted!TObserver)
104         {
105             _observer.completed();
106         }
107 
108         dispose();
109     }
110 
111     void dispose()
112     {
113         auto cancel = assumeThreadLocal(exchange(_cancel, null));
114         if (cancel !is null)
115             cancel.dispose();
116     }
117 
118 private:
119     TObserver _observer;
120     shared(Disposable) _cancel;
121     Ticket _ticket;
122     Ticket _hasValue;
123 }
124 
125 unittest
126 {
127     static assert(!__traits(compiles, {
128             AllObserver!(Observer!string, int) observer;
129         }));
130 }
131 
132 unittest
133 {
134     alias TObserver = AllObserver!(Observer!bool, string);
135 
136     static assert(isOutputRange!(TObserver, string));
137     static assert(hasFailure!(TObserver));
138     static assert(hasCompleted!(TObserver));
139 }
140 
141 unittest
142 {
143     alias TObserver = AllObserver!(Observer!bool, string);
144 
145     static class CounterObserver : Observer!bool
146     {
147         void put(bool obj)
148         {
149             putCount++;
150             lastValue = obj;
151         }
152 
153         void failure(Exception e)
154         {
155             failureCount++;
156             lastException = e;
157         }
158 
159         void completed()
160         {
161             completedCount++;
162         }
163 
164         size_t putCount = 0;
165         size_t failureCount = 0;
166         size_t completedCount = 0;
167         bool lastValue;
168         Exception lastException;
169     }
170 
171     static class CounterDisposable : Disposable
172     {
173         void dispose()
174         {
175             disposeCount++;
176         }
177 
178         size_t disposeCount = 0;
179     }
180 
181     {
182         auto counterObserver = new CounterObserver;
183         auto counterDisposable = new CounterDisposable;
184         auto observer = TObserver(counterObserver, counterDisposable);
185 
186         .put(observer, "TEST");
187         observer.completed();
188         assert(counterObserver.putCount == 1);
189         assert(counterObserver.lastValue == true);
190         assert(counterObserver.completedCount == 1);
191         assert(counterDisposable.disposeCount == 1);
192     }
193 
194     {
195         auto counterObserver = new CounterObserver;
196         auto counterDisposable = new CounterDisposable;
197         auto observer = TObserver(counterObserver, counterDisposable);
198 
199         observer.completed();
200         assert(counterObserver.putCount == 1);
201         assert(counterObserver.lastValue == false);
202         assert(counterObserver.completedCount == 1);
203         assert(counterDisposable.disposeCount == 1);
204     }
205 
206     {
207         auto counterObserver = new CounterObserver;
208         auto counterDisposable = new CounterDisposable;
209         auto observer = TObserver(counterObserver, counterDisposable);
210 
211         auto e = new Exception("MyException");
212         observer.failure(e);
213         assert(counterObserver.putCount == 0);
214         assert(counterObserver.failureCount == 1);
215         assert(counterObserver.lastException is e);
216         assert(counterDisposable.disposeCount == 1);
217     }
218 }
219 
220 unittest
221 {
222     alias TObserver = AllObserver!(Observer!bool, int, "a % 2 == 0");
223 
224     static class CounterObserver : Observer!bool
225     {
226         void put(bool obj)
227         {
228             putCount++;
229             lastValue = obj;
230         }
231 
232         void failure(Exception e)
233         {
234             failureCount++;
235             lastException = e;
236         }
237 
238         void completed()
239         {
240             completedCount++;
241         }
242 
243         size_t putCount = 0;
244         size_t failureCount = 0;
245         size_t completedCount = 0;
246         bool lastValue;
247         Exception lastException;
248     }
249 
250     static class CounterDisposable : Disposable
251     {
252         void dispose()
253         {
254             disposeCount++;
255         }
256 
257         size_t disposeCount = 0;
258     }
259 
260     {
261         auto counterObserver = new CounterObserver;
262         auto counterDisposable = new CounterDisposable;
263         auto observer = TObserver(counterObserver, counterDisposable);
264 
265         .put(observer, 0);
266         observer.completed();
267         assert(counterObserver.putCount == 1);
268         assert(counterObserver.lastValue == true);
269         assert(counterObserver.completedCount == 1);
270         assert(counterDisposable.disposeCount == 1);
271     }
272 
273     {
274         auto counterObserver = new CounterObserver;
275         auto counterDisposable = new CounterDisposable;
276         auto observer = TObserver(counterObserver, counterDisposable);
277 
278         .put(observer, 1);
279         observer.completed();
280         assert(counterObserver.putCount == 1);
281         assert(counterObserver.lastValue == false);
282         assert(counterObserver.completedCount == 1);
283         assert(counterDisposable.disposeCount == 1);
284     }
285 }
286 
287 unittest
288 {
289     bool testThrow(int)
290     {
291         throw new Exception("MyException");
292     }
293 
294     alias TObserver = AllObserver!(Observer!bool, int, testThrow);
295 
296     static class CounterObserver : Observer!bool
297     {
298         void put(bool obj)
299         {
300             putCount++;
301             lastValue = obj;
302         }
303 
304         void failure(Exception e)
305         {
306             failureCount++;
307             lastException = e;
308         }
309 
310         void completed()
311         {
312             completedCount++;
313         }
314 
315         size_t putCount = 0;
316         size_t failureCount = 0;
317         size_t completedCount = 0;
318         bool lastValue;
319         Exception lastException;
320     }
321 
322     static class CounterDisposable : Disposable
323     {
324         void dispose()
325         {
326             disposeCount++;
327         }
328 
329         size_t disposeCount = 0;
330     }
331 
332     {
333         auto counterObserver = new CounterObserver;
334         auto counterDisposable = new CounterDisposable;
335         auto observer = TObserver(counterObserver, counterDisposable);
336 
337         .put(observer, 0);
338         observer.completed();
339         assert(counterObserver.putCount == 0);
340         assert(counterObserver.failureCount == 1);
341         assert(counterObserver.completedCount == 0);
342         assert(counterObserver.lastException.msg == "MyException");
343         assert(counterDisposable.disposeCount == 1);
344     }
345 }
346 
347 unittest
348 {
349     import std.array : Appender, appender;
350 
351     alias TObserver = AllObserver!(Appender!(bool[]), int);
352 
353     auto buf = appender!(bool[]);
354     auto observer = TObserver(buf, NopDisposable.instance);
355 
356     .put(observer, 0);
357     observer.completed();
358 
359     assert(buf.data.length == 1);
360     assert(buf.data[0] == true);
361 }
362 
363 struct AllObservable(TObservable, alias pred = "true")
364 {
365     alias ElementType = bool;
366 
367 public:
368     this(TObservable observable)
369     {
370         _observable = observable;
371     }
372 
373 public:
374     Disposable subscribe(TObserver)(auto ref TObserver observer)
375     {
376         alias ObserverType = AllObserver!(TObserver, TObservable.ElementType, pred);
377 
378         auto subscription = new SingleAssignmentDisposable;
379         subscription.setDisposable(disposableObject(_observable.doSubscribe(ObserverType(observer,
380                 subscription))));
381         return subscription;
382     }
383 
384 private:
385     TObservable _observable;
386 }
387 
388 unittest
389 {
390     alias TObservable = AllObservable!(Observable!int);
391 
392     static assert(isObservable!(TObservable, bool));
393 
394     import rx.subject : SubjectObject;
395 
396     auto sub = new SubjectObject!int;
397 
398     import std.array : appender;
399 
400     auto buf = appender!(bool[]);
401 
402     auto observable = TObservable(sub);
403     auto d = observable.subscribe(buf);
404 
405     sub.put(0);
406     sub.completed();
407     assert(buf.data.length == 1);
408     assert(buf.data[0] == true);
409 }
410 
411 ///
412 template all(alias pred = "true")
413 {
414     AllObservable!(TObservable, pred) all(TObservable)(auto ref TObservable observable)
415     {
416         return typeof(return)(observable);
417     }
418 }
419 ///
420 unittest
421 {
422     import rx.subject : SubjectObject;
423 
424     auto sub = new SubjectObject!int;
425 
426     bool result = false;
427     sub.all!"a % 2 == 0"().doSubscribe((bool res) { result = res; });
428 
429     sub.put(0);
430     sub.completed();
431     assert(result);
432 }
433 
434 unittest
435 {
436     import rx.subject : SubjectObject;
437 
438     auto sub = new SubjectObject!int;
439 
440     bool result = false;
441     sub.all!().doSubscribe((bool res) { result = res; });
442 
443     sub.put(0);
444     sub.completed();
445     assert(result);
446 }
447 
448 ///
449 AllObservable!TObservable all(TObservable)(auto ref TObservable observable)
450 {
451     return typeof(return)(observable);
452 }
453 ///
454 unittest
455 {
456     import rx.subject : SubjectObject;
457 
458     auto sub = new SubjectObject!int;
459 
460     bool result = false;
461     sub.all().doSubscribe((bool res) { result = res; });
462 
463     sub.put(0);
464     sub.completed();
465     assert(result);
466 }
467 
468 unittest
469 {
470     import rx.subject : SubjectObject;
471 
472     auto sub = new SubjectObject!int;
473 
474     import std.array : appender;
475 
476     auto buf = appender!(bool[]);
477 
478     auto d = sub.all!(a => a % 2 == 0).doSubscribe(buf);
479 
480     assert(buf.data.length == 0);
481     sub.put(1);
482     assert(buf.data.length == 1);
483     assert(buf.data[0] == false);
484 }
485 
486 unittest
487 {
488     import rx.subject : SubjectObject;
489 
490     auto sub = new SubjectObject!int;
491 
492     bool[] result;
493     auto d = sub.all!(a => a % 2 == 0).doSubscribe!(b => result ~= b);
494 
495     assert(result.length == 0);
496     sub.put(1);
497     assert(result.length == 1);
498     assert(result[0] == false);
499 }