1 /+++++++++++++++++++++++++++++
2  + This module defines the concept of Observer.
3  +/
4 module rx.observer;
5 
6 import std.range;
7 import std.typetuple;
8 
///Evaluates to `true` when `completed()` can be called, without arguments, on a value of type `T`.
//dfmt off
enum bool hasCompleted(T) = is(typeof({
        // Only the type check matters; the literal is never executed.
        T candidate = void;
        candidate.completed();
    }()));
//dfmt on
///
unittest
{
    // Declares exactly the method the trait looks for.
    struct WithCompleted
    {
        void completed();
    }

    // A differently named method must not satisfy the trait.
    struct WithoutCompleted
    {
        void _completed();
    }

    static assert(hasCompleted!WithCompleted);
    static assert(!hasCompleted!WithoutCompleted);
}
35 
///Evaluates to `true` when `failure(e)` can be called on a value of type `T` with an `Exception` argument.
//dfmt off
enum bool hasFailure(T) = is(typeof({
        // Only the type check matters; the literal is never executed.
        T candidate = void;
        Exception error = void;
        candidate.failure(error);
    }()));
//dfmt on
///
unittest
{
    // Matches: failure takes an Exception.
    struct Accepts
    {
        void failure(Exception e);
    }

    // Does not match: the method has a different name.
    struct WrongName
    {
        void _failure(Exception e);
    }

    // Does not match: failure cannot be called with an Exception.
    struct WrongArity
    {
        void failure();
    }

    static assert(hasFailure!Accepts);
    static assert(!hasFailure!WrongName);
    static assert(!hasFailure!WrongArity);
}
69 
///Evaluates to `true` when `T` is an output range of `E` and also provides
///both `completed()` and `failure(Exception)`.
enum bool isObserver(T, E) = isOutputRange!(T, E) && hasCompleted!T && hasFailure!T;
///
unittest
{
    // Provides all three members the concept requires.
    struct FullObserver
    {
        void put(int n) {}
        void completed() {}
        void failure(Exception e) {}
    }

    static assert(isObserver!(FullObserver, int));
}
95 
///Interface form of the Observer concept: an `OutputRange!E` (which supplies `put`)
///extended with the `completed` and `failure` notifications as virtual functions.
interface Observer(E) : OutputRange!E
{
    ///Notification carrying no value; corresponds to the `completed()` member checked by `hasCompleted`.
    void completed();
    ///Notification carrying an `Exception`; corresponds to the `failure(e)` member checked by `hasFailure`.
    void failure(Exception e);
}
///
unittest
{
    // The interface satisfies the Observer concept for its own element type.
    static assert(isObserver!(Observer!byte, byte));
}
110 
///Class adapter that exposes a value of type `R` through `Observer!X` for every `X` in `E`.
///`put` is inherited from `OutputRangeObject!(R, E)`; `completed` and `failure` are
///forwarded to the wrapped value when `R` provides them and are no-ops otherwise.
class ObserverObject(R, E...) : OutputRangeObject!(R, E), staticMap!(Observer, E)
{
    ///Wraps `range`. The value is handed to the base class and also kept in this object.
    this(R range)
    {
        super(range);
        // NOTE(review): the base class receives its own `range` argument; when `R` is a
        // struct, `put` and `completed`/`failure` presumably act on separate copies — confirm.
        _range = range;
    }

    ///Forwards to `R.completed()` if `hasCompleted!R` holds; otherwise does nothing.
    void completed()
    {
        static if (hasCompleted!R)
            _range.completed();
    }

    ///Forwards to `R.failure(e)` if `hasFailure!R` holds; otherwise does nothing.
    void failure(Exception e)
    {
        static if (hasFailure!R)
            _range.failure(e);
    }

private:
    // The wrapped value that receives the forwarded completed/failure calls.
    R _range;
}
142 
///Wraps `range` in a new `ObserverObject!(R, E)` so it can be used as an `Observer!E`.
///The element type `E` is given explicitly; the wrapped type `R` is deduced from the argument.
template observerObject(E)
{
    ObserverObject!(R, E) observerObject(R)(R range)
    {
        alias Wrapper = ObserverObject!(R, E);
        return new Wrapper(range);
    }
}
///
unittest
{
    // Only `put` overloads are provided; completed/failure are still callable on the wrapper.
    struct PutOnly
    {
        void put(int n) {}
        void put(Object obj) {}
    }

    Observer!int wrapped = observerObject!int(PutOnly());
    wrapped.put(0);
    wrapped.completed();
    wrapped.failure(null);
    static assert(isObserver!(typeof(wrapped), int));
}
171 
unittest
{
    // Wrapping a class that already implements Observer must forward every call to it.
    int puts, completions, failures;

    class Counting : Observer!int
    {
        void put(int n) { ++puts; }
        void completed() { ++completions; }
        void failure(Exception e) { ++failures; }
    }

    static assert(isObserver!(Counting, int));

    auto target = new Counting;
    Observer!int wrapped = observerObject!int(target);

    assert(puts == 0);
    wrapped.put(0);
    assert(puts == 1);

    assert(completions == 0);
    wrapped.completed();
    assert(completions == 1);

    assert(failures == 0);
    wrapped.failure(null);
    assert(failures == 1);
}
210 
unittest
{
    // Same forwarding check as for classes, with a struct as the wrapped observer.
    int puts, completions, failures;

    struct Counting
    {
        void put(int n) { ++puts; }
        void completed() { ++completions; }
        void failure(Exception e) { ++failures; }
    }

    static assert(isObserver!(Counting, int));

    Counting target;
    Observer!int wrapped = observerObject!int(target);

    assert(puts == 0);
    wrapped.put(0);
    assert(puts == 1);

    assert(completions == 0);
    wrapped.completed();
    assert(completions == 1);

    assert(failures == 0);
    wrapped.failure(null);
    assert(failures == 1);
}
249 
unittest
{
    // Every combination of optional members must be wrappable and callable.
    struct PutOnly
    {
        void put(int n) {}
    }

    struct PutAndCompleted
    {
        void put(int n) {}
        void completed() {}
    }

    struct PutAndFailure
    {
        void put(int n) {}
        void failure(Exception e) {}
    }

    struct Full
    {
        void put(int n) {}
        void completed() {}
        void failure(Exception e) {}
    }

    Observer!int first = observerObject!int(PutOnly());
    Observer!int second = observerObject!int(PutAndCompleted());
    Observer!int third = observerObject!int(PutAndFailure());
    Observer!int fourth = observerObject!int(Full());

    // Same call sequence for each wrapper, in declaration order.
    foreach (wrapped; [first, second, third, fourth])
    {
        wrapped.put(0);
        wrapped.completed();
        wrapped.failure(null);
    }
}
308 
///Observer whose `put`, `completed` and `failure` all do nothing.
///The constructor is private; use `instance` to obtain the object.
final class NopObserver(E) : Observer!E
{
private:
    // Private so that construction is limited to this module.
    this()
    {
    }

public:
    ///Ignores the value.
    void put(E)
    {
    }

    ///Does nothing.
    void completed()
    {
    }

    ///Ignores the exception.
    void failure(Exception)
    {
    }

    ///Returns the `__gshared` instance for this `E`, creating it on first use via `initOnce`.
    static Observer!E instance()
    {
        import std.concurrency : initOnce;

        alias Self = NopObserver!E;
        static __gshared Self cached;
        return initOnce!cached(new Self);
    }
}
///
unittest
{
    // Repeated requests return one and the same object.
    Observer!int first = NopObserver!int.instance;
    Observer!int second = NopObserver!int.instance;
    assert(first !is null);
    assert(first is second);
}
348 
///Observer whose `put`, `completed` and `failure` all do nothing, and which can
///additionally hold an `Exception` readable and writable through `exception`.
final class DoneObserver(E) : Observer!E
{
private:
    // Holds the exception given at construction or through the setter; null by default.
    Exception _exception;

    // Private so that exception-less construction is limited to this module (see `instance`).
    this()
    {
    }

public:
    ///Creates an observer that stores `e`.
    this(Exception e)
    {
        _exception = e;
    }

    ///The stored exception.
    Exception exception() @property
    {
        return _exception;
    }

    ///Replaces the stored exception with `e`.
    void exception(Exception e) @property
    {
        _exception = e;
    }

    ///Ignores the value.
    void put(E)
    {
    }

    ///Does nothing.
    void completed()
    {
    }

    ///Ignores the exception; the stored exception is not modified.
    void failure(Exception)
    {
    }

    ///Returns the `__gshared` instance for this `E`, creating it on first use via `initOnce`.
    static Observer!E instance()
    {
        import std.concurrency : initOnce;

        alias Self = DoneObserver!E;
        static __gshared Self cached;
        return initOnce!cached(new Self);
    }
}
///
unittest
{
    // Repeated requests return one and the same object.
    Observer!int first = DoneObserver!int.instance;
    Observer!int second = DoneObserver!int.instance;
    assert(first !is null);
    assert(first is second);
}
405 
unittest
{
    // The exception passed to the constructor is returned unchanged by the getter.
    auto error = new Exception("test");
    auto done = new DoneObserver!int(error);
    assert(done.exception is error);
}
412 
///Observer that forwards every notification to each observer in a list.
///`add`, `remove` and `removeStrict` return their result and never reassign this object's list.
public class CompositeObserver(E) : Observer!E
{
private:
    // The observers notified by put/completed/failure, in notification order.
    Observer!E[] _observers;

    // Creates a composite without observers; private, so only usable inside this module.
    this()
    {
    }

public:
    ///Creates a composite that forwards to `observers`; the slice is stored as given.
    this(Observer!E[] observers)
    {
        _observers = observers;
    }

    ///The observers this composite forwards to.
    Observer!E[] observers() @property
    {
        return _observers;
    }

    ///Passes `obj` to each observer, in order, through the module-scope `put` function.
    void put(E obj)
    {
        foreach (target; _observers)
        {
            .put(target, obj);
        }
    }

    ///Calls `completed()` on each observer, in order.
    void completed()
    {
        foreach (target; _observers)
        {
            target.completed();
        }
    }

    ///Calls `failure(e)` on each observer, in order.
    void failure(Exception e)
    {
        foreach (target; _observers)
        {
            target.failure(e);
        }
    }

    ///Returns a new composite holding the current observers followed by `observer`.
    CompositeObserver!E add(Observer!E observer)
    {
        auto extended = _observers ~ observer;
        return new CompositeObserver!E(extended);
    }

    ///Returns an observer equivalent to this one without the first occurrence of `observer`:
    ///`this` when it is not present, `empty` when it was the only one, the remaining
    ///observer itself when two were held, and a new composite otherwise.
    Observer!E remove(Observer!E observer)
    {
        import std.algorithm : countUntil;

        immutable index = _observers.countUntil(observer);
        if (index < 0)
            return this;

        switch (_observers.length)
        {
        case 1:
            return CompositeObserver!E.empty;
        case 2:
            // index is 0 or 1 here, so this selects the other element.
            return _observers[1 - index];
        default:
            return new CompositeObserver!E(_observers[0 .. index] ~ _observers[index + 1 .. $]);
        }
    }

    ///Like `remove`, but the result is always a `CompositeObserver!E`: `this` when
    ///`observer` is not present, `empty` when it was the only one, otherwise a new
    ///composite holding the remaining observers.
    CompositeObserver!E removeStrict(Observer!E observer)
    {
        import std.algorithm : countUntil;

        immutable index = _observers.countUntil(observer);
        if (index < 0)
            return this;

        immutable count = _observers.length;
        if (count == 1)
            return CompositeObserver!E.empty;

        // With exactly two observers the survivor is a one-element slice of the
        // existing array; with more, the two sides are concatenated into a new array.
        auto remaining = count == 2
            ? _observers[1 - index .. 2 - index]
            : _observers[0 .. index] ~ _observers[index + 1 .. $];
        return new CompositeObserver!E(remaining);
    }

    ///Returns the `__gshared` composite without observers for this `E`, created on first use via `initOnce`.
    static CompositeObserver!E empty()
    {
        import std.concurrency : initOnce;

        alias Self = CompositeObserver!E;
        static __gshared Self cached;
        return initOnce!cached(new Self);
    }
}
///
unittest
{
    int hits = 0;
    struct Counting
    {
        void put(int n)
        {
            ++hits;
        }
    }

    // A composite without observers accepts values and forwards them nowhere.
    auto none = new CompositeObserver!int;
    none.put(0);

    // add() returns a new composite; the original stays without observers.
    auto firstObserver = observerObject!int(Counting());
    auto one = none.add(firstObserver);
    none.put(0);
    assert(hits == 0);
    one.put(0);
    assert(hits == 1);

    // Two observers: one put reaches both.
    auto two = one.add(observerObject!int(Counting()));
    two.put(0);
    assert(hits == 3);

    // After removing the first observer only one receiver is left.
    auto afterRemove = two.remove(firstObserver);
    afterRemove.put(0);
    assert(hits == 4);
}
unittest
{
    // With an array element type, putting [1, 2] reaches `put(int)` twice.
    int hits = 0;
    struct Counting
    {
        void put(int n)
        {
            ++hits;
        }
    }

    auto none = new CompositeObserver!(int[]);
    auto one = none.add(observerObject!(int[])(Counting()));

    assert(hits == 0);
    one.put([1, 2]);
    assert(hits == 2);
}
553 
///Builds an observer value from delegates: `put`, `completed` and `failure` on the
///result invoke the corresponding delegate. A `null` delegate turns that call into a no-op.
auto makeObserver(E)(void delegate(E) doPut, void delegate() doCompleted,
        void delegate(Exception) doFailure)
{
    static struct AnonymouseObserver
    {
    private:
        void delegate(E) _doPut;
        void delegate() _doCompleted;
        void delegate(Exception) _doFailure;

    public:
        void put(E obj)
        {
            if (_doPut is null)
                return;
            _doPut(obj);
        }

        void completed()
        {
            if (_doCompleted is null)
                return;
            _doCompleted();
        }

        void failure(Exception e)
        {
            if (_doFailure is null)
                return;
            _doFailure(e);
        }
    }

    // Fields are initialized positionally, in declaration order.
    return AnonymouseObserver(doPut, doCompleted, doFailure);
}
///ditto
auto makeObserver(E)(void delegate(E) doPut, void delegate() doCompleted)
{
    // Overload without a failure handler: the result has no `failure` member.
    static struct AnonymouseObserver
    {
    private:
        void delegate(E) _doPut;
        void delegate() _doCompleted;

    public:
        void put(E obj)
        {
            if (_doPut is null)
                return;
            _doPut(obj);
        }

        void completed()
        {
            if (_doCompleted is null)
                return;
            _doCompleted();
        }
    }

    // Fields are initialized positionally, in declaration order.
    return AnonymouseObserver(doPut, doCompleted);
}
///ditto
auto makeObserver(E)(void delegate(E) doPut, void delegate(Exception) doFailure)
{
    // Overload without a completion handler: the result has no `completed` member.
    static struct AnonymouseObserver
    {
    private:
        void delegate(E) _doPut;
        void delegate(Exception) _doFailure;

    public:
        void put(E obj)
        {
            if (_doPut is null)
                return;
            _doPut(obj);
        }

        void failure(Exception e)
        {
            if (_doFailure is null)
                return;
            _doFailure(e);
        }
    }

    // Fields are initialized positionally, in declaration order.
    return AnonymouseObserver(doPut, doFailure);
}
///
unittest
{
    int puts = 0;
    int completions = 0;
    int failures = 0;

    // One delegate per notification.
    void delegate(int) onPut = (int) { ++puts; };
    void delegate() onCompleted = () { ++completions; };
    void delegate(Exception) onFailure = (Exception) { ++failures; };

    auto observer = makeObserver(onPut, onCompleted, onFailure);

    .put(observer, 0);
    assert(puts == 1);

    observer.completed();
    assert(completions == 1);

    observer.failure(null);
    assert(failures == 1);
}
679 
unittest
{
    // put + completed overload: the result must not offer `failure`.
    int puts = 0;
    int completions = 0;

    void delegate(int) onPut = (int) { ++puts; };
    void delegate() onCompleted = () { ++completions; };

    auto observer = makeObserver(onPut, onCompleted);

    .put(observer, 0);
    assert(puts == 1);

    observer.completed();
    assert(completions == 1);

    static assert(!hasFailure!(typeof(observer)));
}
695 
unittest
{
    // put + failure overload: the result must not offer `completed`.
    int puts = 0;
    int failures = 0;

    void delegate(int) onPut = (int) { ++puts; };
    void delegate(Exception) onFailure = (Exception) { ++failures; };

    auto observer = makeObserver(onPut, onFailure);

    .put(observer, 0);
    assert(puts == 1);

    static assert(!hasCompleted!(typeof(observer)));

    observer.failure(null);
    assert(failures == 1);
}
713 
/+
 + Mixin that supplies `put`, and where supported `completed` and `failure`, for a
 + type that wraps an inner observer of type `TObserver` receiving elements of type `E`.
 +
 + This template does not define `putImpl(E)` or the `Disposable` type; both must be
 + available where the template is mixed in.
 +
 + Members added:
 +   put(E)        - calls `putImpl(obj)`. When `TObserver` has `failure`, an `Exception`
 +                   thrown by `putImpl` is passed to `_observer.failure` and `_disposable`
 +                   is disposed; otherwise the exception propagates to the caller.
 +   completed()   - only when `TObserver` has `completed`: forwards, then disposes.
 +   failure(e)    - only when `TObserver` has `failure`: forwards, then disposes.
 +   _observer     - the wrapped observer (private).
 +   _disposable   - only when `TObserver` has `completed` or `failure` (private).
 +
 + Disposal is registered with `scope (exit)` before the inner observer is notified,
 + so `_disposable.dispose()` still runs if the inner observer's handler throws.
 +/
package mixin template SimpleObserverImpl(TObserver, E)
{
public:
    void put(E obj)
    {
        static if (hasFailure!TObserver)
        {
            try
            {
                putImpl(obj);
            }
            catch (Exception e)
            {
                // Dispose even if the failure handler itself throws.
                scope (exit)
                    _disposable.dispose();
                _observer.failure(e);
            }
        }
        else
        {
            putImpl(obj);
        }
    }

    static if (hasCompleted!TObserver)
    {
        void completed()
        {
            // Dispose even if the completion handler throws.
            scope (exit)
                _disposable.dispose();
            _observer.completed();
        }
    }
    static if (hasFailure!TObserver)
    {
        void failure(Exception e)
        {
            // Dispose even if the failure handler throws.
            scope (exit)
                _disposable.dispose();
            _observer.failure(e);
        }
    }
private:
    TObserver _observer;
    static if (hasCompleted!TObserver || hasFailure!TObserver)
    {
        Disposable _disposable;
    }
}