1 /+++++++++++++++++++++++++++++
2  + This module defines the concept of Observer.
3  +/
4 module rx.observer;
5 
6 import std.range;
7 import std.typetuple;
8 
///Compile-time trait: `true` when a value of type `T` can be called as
///`completed()` with no arguments, `false` otherwise.
//dfmt off
enum bool hasCompleted(T) = is(typeof({
        T probe = void; // only type-checked, never executed
        probe.completed();
    }()));
//dfmt on
///
unittest
{
    // Declares exactly the member the trait probes for.
    struct Matching
    {
        void completed();
    }

    // Similar name, but not `completed` itself.
    struct Mismatched
    {
        void _completed();
    }

    static assert(hasCompleted!Matching);
    static assert(!hasCompleted!Mismatched);
}
35 
///Compile-time trait: `true` when a value of type `T` can be called as
///`failure(e)` with an `Exception` argument, `false` otherwise.
//dfmt off
enum bool hasFailure(T) = is(typeof({
        T probe = void;         // only type-checked, never executed
        Exception error = void;
        probe.failure(error);
    }()));
//dfmt on
///
unittest
{
    // Has the expected `failure(Exception)` member.
    struct Matching
    {
        void failure(Exception e);
    }

    // Right parameter list, wrong name.
    struct WrongName
    {
        void _failure(Exception e);
    }

    // Right name, but it cannot receive the exception.
    struct WrongArity
    {
        void failure();
    }

    static assert(hasFailure!Matching);
    static assert(!hasFailure!WrongName);
    static assert(!hasFailure!WrongArity);
}
69 
///Compile-time trait: `true` when `T` is an output range of `E` and also
///satisfies both `hasCompleted` and `hasFailure`.
enum bool isObserver(T, E) = isOutputRange!(T, E) && hasCompleted!T && hasFailure!T;
///
unittest
{
    // Supplies all three members required of an observer of int.
    struct FullObserver
    {
        void put(int value)
        {
        }

        void completed()
        {
        }

        void failure(Exception error)
        {
        }
    }

    static assert(isObserver!(FullObserver, int));
}
95 
///Interface form of an observer of `E`: extends `OutputRange!E` (which
///supplies `put`) with the `completed` and `failure` methods, so that all
///three can be called through virtual dispatch.
interface Observer(E) : OutputRange!E
{
    ///Notification without arguments; presumably signals that no further
    ///values will be put (the interface itself does not enforce this).
    void completed();
    ///Notification carrying an `Exception`. The unittests in this module
    ///also pass `null` here.
    void failure(Exception e);
}
///
unittest
{
    // The interface itself must satisfy the observer trait.
    alias ByteObserver = Observer!byte;
    static assert(isObserver!(ByteObserver, byte));
}
110 
///Class that implements the `Observer` interface for every type in `E` and
///wraps the `put`, `completed` and `failure` methods of `R` in virtual
///functions. This class extends the OutputRangeObject.
///
///`completed` and `failure` are forwarded only when `R` provides them (see
///`hasCompleted` / `hasFailure`); otherwise they do nothing.
class ObserverObject(R, E...) : OutputRangeObject!(R, E), staticMap!(Observer, E)
{
public:
    ///Wraps `range`. The value is handed to the base class and also stored
    ///in this class; all notifications are delivered to the copy stored here.
    this(R range)
    {
        super(range);
        _range = range;
    }

public:
    // The base class keeps its own private copy of `range`. When `R` is a
    // value type that copy is distinct from `_range`, so the inherited `put`
    // would update one object while `completed`/`failure` below act on
    // another. Overriding every `put` overload keeps all three notifications
    // on the same instance. For reference types both copies alias the same
    // object, so nothing changes there.
    //dfmt off
    mixin({
        import std.conv : to;

        string code;
        foreach (i, Unused; E)
        {
            code ~= "override void put(E[" ~ to!string(i)
                ~ "] obj) { .put(_range, obj); }\n";
        }
        return code;
    }());
    //dfmt on

    ///Forwards to `R.completed` when `R` has such a method; no-op otherwise.
    void completed()
    {
        static if (hasCompleted!R)
        {
            _range.completed();
        }
    }
    ///Forwards `e` to `R.failure` when `R` has such a method; no-op otherwise.
    void failure(Exception e)
    {
        static if (hasFailure!R)
        {
            _range.failure(e);
        }
    }

private:
    R _range;
}
142 
///Convenience constructor: wraps `range` in a new `ObserverObject` for the
///element type `E`, deducing the wrapped type `R` from the argument.
template observerObject(E)
{
    ObserverObject!(R, E) observerObject(R)(R range)
    {
        alias Wrapper = ObserverObject!(R, E);
        return new Wrapper(range);
    }
}
///
unittest
{
    // Only provides `put`; `completed` and `failure` are absent on purpose.
    struct PutOnly
    {
        void put(int value)
        {
        }

        void put(Object value)
        {
        }
    }

    Observer!int wrapped = observerObject!int(PutOnly());
    wrapped.put(0);
    wrapped.completed();
    wrapped.failure(null);
    static assert(isObserver!(typeof(wrapped), int));
}
171 
unittest
{
    int puts = 0;
    int completions = 0;
    int failures = 0;

    // Class-based observer that records how often each member is invoked.
    class CountingObserver : Observer!int
    {
        void put(int value)
        {
            ++puts;
        }

        void completed()
        {
            ++completions;
        }

        void failure(Exception error)
        {
            ++failures;
        }
    }

    static assert(isObserver!(CountingObserver, int));

    auto source = new CountingObserver;
    Observer!int wrapped = observerObject!int(source);

    // Each notification must reach the wrapped object exactly once.
    assert(puts == 0);
    wrapped.put(0);
    assert(puts == 1);

    assert(completions == 0);
    wrapped.completed();
    assert(completions == 1);

    assert(failures == 0);
    wrapped.failure(null);
    assert(failures == 1);
}
210 
unittest
{
    int puts = 0;
    int completions = 0;
    int failures = 0;

    // Struct-based observer that records how often each member is invoked.
    struct CountingObserver
    {
        void put(int value)
        {
            ++puts;
        }

        void completed()
        {
            ++completions;
        }

        void failure(Exception error)
        {
            ++failures;
        }
    }

    static assert(isObserver!(CountingObserver, int));

    CountingObserver source;
    Observer!int wrapped = observerObject!int(source);

    // Each notification must reach the wrapped value exactly once.
    assert(puts == 0);
    wrapped.put(0);
    assert(puts == 1);

    assert(completions == 0);
    wrapped.completed();
    assert(completions == 1);

    assert(failures == 0);
    wrapped.failure(null);
    assert(failures == 1);
}
249 
unittest
{
    // Only `put`.
    struct PutOnly
    {
        void put(int value)
        {
        }
    }

    // `put` and `completed`.
    struct PutAndCompleted
    {
        void put(int value)
        {
        }

        void completed()
        {
        }
    }

    // `put` and `failure`.
    struct PutAndFailure
    {
        void put(int value)
        {
        }

        void failure(Exception error)
        {
        }
    }

    // All three members.
    struct PutCompletedFailure
    {
        void put(int value)
        {
        }

        void completed()
        {
        }

        void failure(Exception error)
        {
        }
    }

    // Whatever subset of members the wrapped type has, the wrapper must
    // accept all three notifications.
    foreach (Wrapped; TypeTuple!(PutOnly, PutAndCompleted, PutAndFailure, PutCompletedFailure))
    {
        Observer!int wrapper = observerObject!int(Wrapped());
        wrapper.put(0);
        wrapper.completed();
        wrapper.failure(null);
    }
}
308 
///Observer of `E` whose `put`, `completed` and `failure` all do nothing.
///Obtain it through `instance`; the constructor is private to this module.
final class NopObserver(E) : Observer!E
{
private:
    this()
    {
    }

public:
    ///Ignores the value.
    void put(E)
    {
    }

    ///Does nothing.
    void completed()
    {
    }

    ///Ignores the exception.
    void failure(Exception)
    {
    }

public:
    ///Returns the shared instance for this `E`; it is created on first use
    ///via `initOnce`, and later calls return the same object.
    static Observer!E instance()
    {
        import std.concurrency : initOnce;

        static __gshared NopObserver!E singleton;
        return initOnce!singleton(new NopObserver!E);
    }
}
///
unittest
{
    // Two lookups must yield the very same non-null object.
    Observer!int first = NopObserver!int.instance;
    Observer!int second = NopObserver!int.instance;
    assert(first !is null);
    assert(first is second);
}
348 
///Observer of `E` whose `put`, `completed` and `failure` all do nothing, and
///which carries an `Exception` exposed through the `exception` property.
final class DoneObserver(E) : Observer!E
{
private:
    // Private to this module; `instance` uses it. The stored exception
    // keeps its default value (null).
    this()
    {
    }

public:
    ///Creates an observer that stores `e`.
    this(Exception e)
    {
        _exception = e;
    }

public:
    ///The stored exception.
    @property Exception exception()
    {
        return _exception;
    }
    ///Replaces the stored exception with `e`.
    @property void exception(Exception e)
    {
        _exception = e;
    }

public:
    ///Ignores the value.
    void put(E)
    {
    }

    ///Does nothing.
    void completed()
    {
    }

    ///Ignores the exception; the stored one is left untouched.
    void failure(Exception)
    {
    }

private:
    Exception _exception;

public:
    ///Returns the shared instance for this `E`; it is created on first use
    ///via `initOnce`, and later calls return the same object.
    static Observer!E instance()
    {
        import std.concurrency : initOnce;

        static __gshared DoneObserver!E singleton;
        return initOnce!singleton(new DoneObserver!E);
    }
}
///
unittest
{
    // Two lookups must yield the very same non-null object.
    Observer!int first = DoneObserver!int.instance;
    Observer!int second = DoneObserver!int.instance;
    assert(first !is null);
    assert(first is second);
}
405 
unittest
{
    // The exception given to the constructor is returned by the property.
    auto cause = new Exception("test");
    auto done = new DoneObserver!int(cause);
    assert(done.exception is cause);
}
412 
///Observer of `E` that forwards every notification to each observer in a
///list, in list order. `add` and `remove` never modify the receiver; they
///return another observer that reflects the change.
public class CompositeObserver(E) : Observer!E
{
private:
    // Private to this module; yields a composite with no observers.
    this()
    {
    }

public:
    ///Creates a composite over `observers`; the slice is stored as given.
    this(Observer!E[] observers)
    {
        _observers = observers;
    }

public:
    ///Puts `obj` into every contained observer.
    void put(E obj)
    {
        foreach (target; _observers)
        {
            .put(target, obj);
        }
    }
    ///Calls `completed` on every contained observer.
    void completed()
    {
        foreach (target; _observers)
        {
            target.completed();
        }
    }
    ///Calls `failure(e)` on every contained observer.
    void failure(Exception e)
    {
        foreach (target; _observers)
        {
            target.failure(e);
        }
    }
    ///Returns a new composite holding the current observers followed by
    ///`observer`.
    CompositeObserver!E add(Observer!E observer)
    {
        auto extended = _observers ~ observer;
        return new CompositeObserver!E(extended);
    }
    ///Returns an observer equivalent to this one without the first
    ///occurrence of `observer`:
    ///this object itself when `observer` is not contained, `empty` when it
    ///was the only element, the remaining observer when there were two,
    ///and a new composite otherwise.
    Observer!E remove(Observer!E observer)
    {
        import std.algorithm : countUntil;

        immutable position = _observers.countUntil(observer);
        if (position < 0)
            return this;

        immutable total = _observers.length;
        if (total == 1)
            return CompositeObserver!E.empty;
        if (total == 2)
            return _observers[position == 0 ? 1 : 0];

        auto before = _observers[0 .. position];
        auto after = _observers[position + 1 .. $];
        return new CompositeObserver!E(before ~ after);
    }

public:
    ///Returns the shared composite without observers for this `E`; it is
    ///created on first use via `initOnce`.
    static CompositeObserver!E empty()
    {
        import std.concurrency : initOnce;

        static __gshared CompositeObserver!E singleton;
        return initOnce!singleton(new CompositeObserver!E);
    }

private:
    Observer!E[] _observers;
}
///
unittest
{
    int hits = 0;
    // Counts every value it receives.
    struct Counter
    {
        void put(int value)
        {
            hits++;
        }
    }

    // A composite without observers swallows values.
    auto none = new CompositeObserver!int;
    none.put(0);

    // `add` leaves the receiver untouched and returns a larger composite.
    auto first = observerObject!int(Counter());
    auto one = none.add(first);
    none.put(0);
    assert(hits == 0);
    one.put(0);
    assert(hits == 1);

    auto two = one.add(observerObject!int(Counter()));
    two.put(0);
    assert(hits == 3);

    // Removing one of two observers leaves a single receiver.
    auto remaining = two.remove(first);
    remaining.put(0);
    assert(hits == 4);
}
unittest
{
    int hits = 0;
    // Accepts single ints only.
    struct Counter
    {
        void put(int value)
        {
            hits++;
        }
    }

    auto none = new CompositeObserver!(int[]);
    auto one = none.add(observerObject!(int[])(Counter()));

    // Putting an int[] reaches the int-only observer once per element.
    assert(hits == 0);
    one.put([1, 2]);
    assert(hits == 2);
}
525 
///Builds an observer value out of delegates. `doPut` receives every value
///that is put; `doCompleted` and `doFailure` back the `completed` and
///`failure` members. An overload that does not take one of the latter two
///delegates returns a type without the corresponding member. Any delegate
///may be null, in which case the matching notification is ignored.
auto makeObserver(E)(void delegate(E) doPut, void delegate() doCompleted,
        void delegate(Exception) doFailure)
{
    static struct AnonymousObserver
    {
    public:
        void put(E obj)
        {
            if (_onPut is null)
                return;
            _onPut(obj);
        }

        void completed()
        {
            if (_onCompleted is null)
                return;
            _onCompleted();
        }

        void failure(Exception e)
        {
            if (_onFailure is null)
                return;
            _onFailure(e);
        }

    private:
        // Field order matters: the struct is built positionally below.
        void delegate(E) _onPut;
        void delegate() _onCompleted;
        void delegate(Exception) _onFailure;
    }

    return AnonymousObserver(doPut, doCompleted, doFailure);
}
///ditto
auto makeObserver(E)(void delegate(E) doPut, void delegate() doCompleted)
{
    // No `failure` member is declared here, so the returned type does not
    // satisfy `hasFailure`.
    static struct AnonymousObserver
    {
    public:
        void put(E obj)
        {
            if (_onPut is null)
                return;
            _onPut(obj);
        }

        void completed()
        {
            if (_onCompleted is null)
                return;
            _onCompleted();
        }

    private:
        // Field order matters: the struct is built positionally below.
        void delegate(E) _onPut;
        void delegate() _onCompleted;
    }

    return AnonymousObserver(doPut, doCompleted);
}
///ditto
auto makeObserver(E)(void delegate(E) doPut, void delegate(Exception) doFailure)
{
    // No `completed` member is declared here, so the returned type does not
    // satisfy `hasCompleted`.
    static struct AnonymousObserver
    {
    public:
        void put(E obj)
        {
            if (_onPut is null)
                return;
            _onPut(obj);
        }

        void failure(Exception e)
        {
            if (_onFailure is null)
                return;
            _onFailure(e);
        }

    private:
        // Field order matters: the struct is built positionally below.
        void delegate(E) _onPut;
        void delegate(Exception) _onFailure;
    }

    return AnonymousObserver(doPut, doFailure);
}
///
unittest
{
    int puts = 0;
    int completions = 0;
    int failures = 0;

    // All three delegates supplied: every notification reaches its counter.
    auto made = makeObserver((int) { puts++; }, () { completions++; }, (Exception) {
        failures++;
    });

    .put(made, 0);
    assert(puts == 1);

    made.completed();
    assert(completions == 1);

    made.failure(null);
    assert(failures == 1);
}
651 
unittest
{
    int puts = 0;
    int completions = 0;

    // Without a failure delegate the result has no `failure` member.
    auto made = makeObserver((int) { puts++; }, () { completions++; });

    .put(made, 0);
    assert(puts == 1);

    made.completed();
    assert(completions == 1);

    static assert(!hasFailure!(typeof(made)));
}
667 
unittest
{
    int puts = 0;
    int failures = 0;

    // Without a completed delegate the result has no `completed` member.
    auto made = makeObserver((int) { puts++; }, (Exception) {
        failures++;
    });

    .put(made, 0);
    assert(puts == 1);

    static assert(!hasCompleted!(typeof(made)));

    made.failure(null);
    assert(failures == 1);
}
685 
///Shared observer plumbing for types that wrap a `TObserver`.
///
///The mixing-in type must provide `putImpl(E)`, which `put` calls. When
///`TObserver` has `failure`, an `Exception` thrown by `putImpl` is passed to
///the wrapped observer and `_disposable` is disposed instead of letting the
///exception propagate. `completed` and `failure` are declared only when
///`TObserver` has the matching member; each forwards to the wrapped observer
///and then disposes `_disposable`.
///
///`Disposable` is not declared in this template, so it has to be visible
///where the template is mixed in.
package mixin template SimpleObserverImpl(TObserver, E)
{
public:
    void put(E obj)
    {
        static if (!hasFailure!TObserver)
        {
            // Nobody to report to: let any exception reach the caller.
            putImpl(obj);
        }
        else
        {
            try
            {
                putImpl(obj);
            }
            catch (Exception error)
            {
                _observer.failure(error);
                _disposable.dispose();
            }
        }
    }

    static if (hasCompleted!TObserver)
    {
        void completed()
        {
            _observer.completed();
            _disposable.dispose();
        }
    }
    static if (hasFailure!TObserver)
    {
        void failure(Exception e)
        {
            _observer.failure(e);
            _disposable.dispose();
        }
    }
private:
    TObserver _observer;
    // Only needed when at least one of the members above disposes it.
    static if (hasFailure!TObserver || hasCompleted!TObserver)
    {
        Disposable _disposable;
    }
}