1 /+++++++++++++++++++++++++++++
2  + This module defines the concept of Observable.
3  +/
4 module rx.observable;
5 
6 import std.functional : unaryFun;
7 import std.range : put, isInputRange, isOutputRange, ElementType;
8 
9 import rx.disposable;
10 import rx.observer;
11 import rx.util;
12 
13 ///Tests if something is a Observable.
14 template isObservable(T, E)
15 {
16     enum bool isObservable = is(T.ElementType : E) && is(typeof({
17                 T observable = void;
18                 Observer!E observer = void;
19                 auto d = observable.subscribe(observer);
20                 static assert(isDisposable!(typeof(d)));
21             }()));
22 }
23 ///ditto
24 template isObservable(TObservable)
25 {
26     enum bool isObservable = __traits(compiles, {
27             static assert(isObservable!(TObservable, TObservable.ElementType));
28         });
29 
30 }
31 
32 ///
33 unittest
34 {
35     struct TestObservable
36     {
37         alias ElementType = int;
38 
39         Disposable subscribe(T)(T observer)
40         {
41             static assert(isObserver!(T, int));
42             return null;
43         }
44     }
45 
46     static assert(isObservable!(TestObservable));
47     static assert(isObservable!(TestObservable, int));
48     static assert(!isObservable!(TestObservable, Object));
49 }
50 
51 ///
52 unittest
53 {
54     static assert(isObservable!(Observable!int));
55     static assert(!isObservable!(Observer!int));
56     static assert(!isObservable!(string));
57     static assert(!isObservable!(Object));
58 }
59 
60 ///Test if the observer can subscribe to the observable.
61 template isSubscribable(TObservable, TObserver)
62 {
63     enum bool isSubscribable = is(typeof({
64                 static assert(isOutputRange!(TObserver, TObservable.ElementType));
65 
66                 TObservable observable = void;
67                 TObserver observer = void;
68                 auto d = observable.subscribe(observer);
69                 static assert(isDisposable!(typeof(d)));
70             }()));
71 }
72 ///
73 unittest
74 {
75     struct TestDisposable
76     {
77         void dispose()
78         {
79         }
80     }
81 
82     struct TestObserver
83     {
84         void put(int n)
85         {
86         }
87 
88         void completed()
89         {
90         }
91 
92         void failure(Exception e)
93         {
94         }
95     }
96 
97     struct TestObservable
98     {
99         alias ElementType = int;
100 
101         TestDisposable subscribe(TestObserver observer)
102         {
103             return TestDisposable();
104         }
105     }
106 
107     static assert(isSubscribable!(TestObservable, TestObserver));
108 }
109 
110 ///The helper for subscribe easier.
111 auto doSubscribe(TObservable, E)(auto ref TObservable observable, void delegate(E) doPut,
112         void delegate() doCompleted, void delegate(Exception) doFailure)
113 {
114     return doSubscribe(observable, makeObserver(doPut, doCompleted, doFailure));
115 }
116 ///ditto
117 auto doSubscribe(TObservable, E)(auto ref TObservable observable,
118         void delegate(E) doPut, void delegate() doCompleted)
119 {
120     return doSubscribe(observable, makeObserver(doPut, doCompleted));
121 }
122 ///ditto
123 auto doSubscribe(TObservable, E)(auto ref TObservable observable,
124         void delegate(E) doPut, void delegate(Exception) doFailure)
125 {
126     return doSubscribe(observable, makeObserver(doPut, doFailure));
127 }
128 ///ditto
129 auto doSubscribe(alias f, TObservable)(auto ref TObservable observable)
130 {
131     alias fun = unaryFun!f;
132     return doSubscribe(observable, (TObservable.ElementType obj) {
133         static if (__traits(compiles, { fun(obj); }))
134         {
135             fun(obj);
136         }
137         else
138         {
139             struct DoSubscribeObserver
140             {
141                 void put(T)(T obj)
142                 {
143                     fun(obj);
144                 }
145             }
146 
147             DoSubscribeObserver observer;
148             .put(observer, obj);
149         }
150     });
151 }
152 ///ditto
153 auto doSubscribe(TObservable, TObserver)(auto ref TObservable observable, auto ref TObserver observer)
154 {
155     import std.format : format;
156 
157     static assert(isObservable!(TObservable, TObservable.ElementType),
158             format!"%s is invalid as an Observable"(TObservable.stringof));
159 
160     alias ElementType = TObservable.ElementType;
161     static if (isSubscribable!(TObservable, TObserver))
162         return observable.subscribe(observer);
163     else static if (isSubscribable!(TObservable, Observer!ElementType))
164         return observable.subscribe(observerObject!ElementType(observer));
165     else
166     {
167         static assert(false, format!"%s can not subscribe '%s', it published by %s"(
168                 TObserver.stringof, ElementType.stringof, TObservable.stringof));
169     }
170 }
171 ///
172 unittest
173 {
174     struct TestObservable
175     {
176         alias ElementType = int;
177 
178         auto subscribe(TObserver)(TObserver observer)
179         {
180             .put(observer, [0, 1, 2]);
181             return NopDisposable.instance;
182         }
183     }
184 
185     TestObservable observable;
186     int[] result;
187     observable.doSubscribe!(n => result ~= n);
188     assert(result.length == 3);
189 }
190 
191 ///
192 unittest
193 {
194     struct TestObserver
195     {
196         void put(int n)
197         {
198         }
199     }
200 
201     struct TestObservable1
202     {
203         alias ElementType = int;
204         Disposable subscribe(Observer!int observer)
205         {
206             return null;
207         }
208     }
209 
210     struct TestObservable2
211     {
212         alias ElementType = int;
213         Disposable subscribe(T)(T observer)
214         {
215             return null;
216         }
217     }
218 
219     TestObservable1 o1;
220     auto d0 = o1.doSubscribe((int n) {}, () {}, (Exception e) {});
221     auto d1 = o1.doSubscribe((int n) {}, () {});
222     auto d2 = o1.doSubscribe((int n) {}, (Exception e) {});
223     auto d3 = o1.doSubscribe((int n) {});
224     auto d4 = o1.doSubscribe(TestObserver());
225     TestObservable2 o2;
226     auto d5 = o2.doSubscribe((int n) {}, () {}, (Exception e) {});
227     auto d6 = o2.doSubscribe((int n) {}, () {});
228     auto d7 = o2.doSubscribe((int n) {}, (Exception e) {});
229     auto d8 = o2.doSubscribe((int n) {});
230     auto d9 = o2.doSubscribe(TestObserver());
231 }
232 
233 unittest
234 {
235     static struct TestObservable
236     {
237         alias ElementType = int[];
238 
239         Disposable subscribe(TObserver)(TObserver observer)
240         {
241             .put(observer, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
242             return null;
243         }
244     }
245 
246     TestObservable observable;
247 
248     int sum1 = 0;
249 
250     void add(int n)
251     {
252         sum1 += n;
253     }
254 
255     observable.doSubscribe!(add); // by value
256     assert(sum1 == 55);
257 
258     int sum2 = 0;
259     observable.doSubscribe!(n => sum2 += n); // by value
260     assert(sum2 == 55);
261 
262     int size = 0;
263     observable.doSubscribe!(arr => size += arr.length); // by array
264     assert(size == 10);
265 }
266 
267 ///Wrapper for Observable objects.
268 interface Observable(E)
269 {
270     alias ElementType = E;
271 
272     Disposable subscribe(Observer!E observer);
273 }
274 
275 unittest
276 {
277     static assert(isObservable!(Observable!int, int));
278 }
279 
280 unittest
281 {
282     static struct TestNoObservable
283     {
284         Disposable subscribe(Observer!int observer)
285         {
286             return null;
287         }
288     }
289 
290     static assert(!isObservable!(TestNoObservable, int));
291 }
292 
293 ///Class that implements Observable interface and wraps the subscribe method in virtual function.
294 class ObservableObject(R, E) : Observable!E
295 {
296 public:
297     this(R observable)
298     {
299         _observable = observable;
300     }
301 
302 public:
303     Disposable subscribe(Observer!E observer)
304     {
305         return disposableObject(_observable.subscribe(observer));
306     }
307 
308 private:
309     R _observable;
310 }
311 
312 ///Wraps subscribe method in virtual function.
313 template observableObject(E)
314 {
315     Observable!E observableObject(R)(auto ref R observable)
316     {
317         static if (is(R : Observable!E))
318         {
319             return observable;
320         }
321         else
322         {
323             return new ObservableObject!(R, E)(observable);
324         }
325     }
326 }
327 ///
328 unittest
329 {
330     int subscribeCount = 0;
331     class TestObservable : Observable!int
332     {
333         Disposable subscribe(Observer!int observer)
334         {
335             subscribeCount++;
336             return NopDisposable.instance;
337         }
338     }
339 
340     auto test = new TestObservable;
341     auto observable = observableObject!int(test);
342     assert(observable is test);
343     assert(subscribeCount == 0);
344     auto d = observable.subscribe(null);
345     assert(subscribeCount == 1);
346 }
347 
348 unittest
349 {
350     int disposeCount = 0;
351     int subscribeCount = 0;
352 
353     struct TestDisposable
354     {
355         void dispose()
356         {
357             disposeCount++;
358         }
359     }
360 
361     struct TestObservable
362     {
363         TestDisposable subscribe(Observer!int observer)
364         {
365             subscribeCount++;
366             return TestDisposable();
367         }
368     }
369 
370     Observable!int observable = observableObject!int(TestObservable());
371     assert(subscribeCount == 0);
372     Disposable disposable = observable.subscribe(null);
373     assert(subscribeCount == 1);
374     assert(disposeCount == 0);
375     disposable.dispose();
376     assert(disposeCount == 1);
377 }
378 
379 //#########################
380 // Defer
381 //#########################
382 private struct DeferObserver(TObserver, E)
383 {
384 public:
385     this(TObserver observer, EventSignal signal)
386     {
387         _observer = observer;
388         _signal = signal;
389     }
390 
391 public:
392     void put(E obj)
393     {
394         if (_signal.signal)
395             return;
396 
397         static if (hasFailure!TObserver)
398         {
399             try
400             {
401                 .put(_observer, obj);
402             }
403             catch (Exception e)
404             {
405                 _observer.failure(e);
406             }
407         }
408         else
409         {
410             .put(_observer, obj);
411         }
412     }
413 
414     void completed()
415     {
416         if (_signal.signal)
417             return;
418         _signal.setSignal();
419 
420         static if (hasCompleted!TObserver)
421         {
422             _observer.completed();
423         }
424     }
425 
426     void failure(Exception e)
427     {
428         if (_signal.signal)
429             return;
430         _signal.setSignal();
431 
432         static if (hasFailure!TObserver)
433         {
434             _observer.failure(e);
435         }
436     }
437 
438 private:
439     TObserver _observer;
440     EventSignal _signal;
441 }
442 
443 ///Create observable by function that template parameter.
444 auto defer(E, alias f)()
445 {
446     static struct DeferObservable
447     {
448         alias ElementType = E;
449 
450     public:
451         auto subscribe(TObserver)(TObserver observer)
452         {
453             alias ObserverType = DeferObserver!(TObserver, E);
454             alias fun = unaryFun!f;
455             auto d = new SignalDisposable;
456             static if (__traits(compiles, {
457                     auto disposable = fun(ObserverType(observer, d.signal));
458                     static assert(isDisposable!(typeof(disposable)));
459                 }))
460             {
461                 auto subscription = fun(ObserverType(observer, d.signal));
462                 return new CompositeDisposable(subscription, d);
463             }
464             else
465             {
466                 fun(ObserverType(observer, d.signal));
467                 return d;
468             }
469         }
470     }
471 
472     return DeferObservable();
473 }
474 ///
475 unittest
476 {
477     auto sub = defer!(int, (observer) {
478         observer.put(1);
479         observer.put(2);
480         observer.put(3);
481         observer.completed();
482     });
483 
484     int countPut = 0;
485     int countCompleted = 0;
486     struct A
487     {
488         void put(int n)
489         {
490             countPut++;
491         }
492 
493         void completed()
494         {
495             countCompleted++;
496         }
497     }
498 
499     assert(countPut == 0);
500     assert(countCompleted == 0);
501     auto d = sub.doSubscribe(A());
502     assert(countPut == 3);
503     assert(countCompleted == 1);
504 }
505 
506 unittest
507 {
508     auto sub = defer!(int, (observer) {
509         observer.put(0);
510         observer.failure(new Exception(""));
511         observer.put(1);
512     });
513 
514     int countPut = 0;
515     int countFailure = 0;
516     struct A
517     {
518         void put(int n)
519         {
520             countPut++;
521         }
522 
523         void failure(Exception e)
524         {
525             countFailure++;
526         }
527     }
528 
529     assert(countPut == 0);
530     assert(countFailure == 0);
531     auto d = sub.doSubscribe(A());
532     assert(countPut == 1);
533     assert(countFailure == 1);
534 }
535 
536 unittest
537 {
538     auto sub = defer!(int, (observer) {
539         observer.put(0);
540         observer.failure(new Exception(""));
541         observer.put(1);
542     });
543 
544     int countPut = 0;
545     struct A
546     {
547         void put(int n)
548         {
549             countPut++;
550         }
551     }
552 
553     assert(countPut == 0);
554     auto d = sub.doSubscribe(A());
555     assert(countPut == 1);
556 }
557 
558 unittest
559 {
560     Disposable subscribeImpl(Observer!int observer)
561     {
562         .put(observer, 1);
563         return null;
564     }
565 
566     import std.array : appender;
567 
568     auto buf = appender!(int[]);
569 
570     auto put1 = defer!int(&subscribeImpl);
571     auto d = put1.subscribe(buf);
572 
573     assert(buf.data.length == 1);
574     assert(buf.data[0] == 1);
575     assert(d !is null);
576 }
577 
578 unittest
579 {
580     auto sub = defer!(int, (observer) {
581         observer.put(1);
582         return NopDisposable.instance;
583     });
584 
585     auto called = false;
586     sub.doSubscribe((int _) { called = true; });
587     assert(called);
588 }
589 
590 auto defer(E, TSubscribe)(auto ref TSubscribe subscribeImpl)
591 {
592     struct DeferObservable
593     {
594         alias ElementType = E;
595 
596         TSubscribe _subscribeImpl;
597 
598         this(ref TSubscribe subscribeImpl)
599         {
600             _subscribeImpl = subscribeImpl;
601         }
602 
603         auto subscribe(TObserver)(auto ref TObserver observer)
604         {
605             alias ObserverType = DeferObserver!(TObserver, E);
606 
607             auto cancel = new SignalDisposable;
608             static if (__traits(compiles, {
609                     auto d = _subscribeImpl(ObserverType.init);
610                     static assert(isDisposable!(typeof(d)));
611                 }))
612             {
613                 auto subscription = disposableObject(_subscribeImpl(ObserverType(observer,
614                         cancel.signal)));
615                 return new CompositeDisposable(cancel, subscription);
616             }
617             else static if (__traits(compiles, {
618                     _subscribeImpl(ObserverType.init);
619                 }))
620             {
621                 _subscribeImpl(ObserverType(observer, cancel.signal));
622                 return cancel;
623             }
624             else static if (__traits(compiles, {
625                     auto d = _subscribeImpl(Observer!E.init);
626                     static assert(isDisposable!(typeof(d)));
627                 }))
628             {
629                 auto subscription = _subscribeImpl(observerObject!E(ObserverType(observer,
630                         cancel.signal)));
631                 return new CompositeDisposable(cancel, subscription);
632             }
633             else static if (__traits(compiles, {
634                     _subscribeImpl(Observer!E.init);
635                 }))
636             {
637                 _subscribeImpl(observerObject!E(ObserverType(observer, cancel.signal)));
638                 return cancel;
639             }
640             else
641             {
642                 static assert(false);
643             }
644         }
645     }
646 
647     return DeferObservable(subscribeImpl);
648 }
649 
650 unittest
651 {
652     import std.array : appender;
653 
654     auto buf = appender!(int[]);
655 
656     auto put12 = defer!int((Observer!int observer) {
657         .put(observer, 1);
658         .put(observer, 2);
659         return NopDisposable.instance;
660     });
661     auto d = put12.subscribe(buf);
662 
663     assert(buf.data.length == 2);
664     assert(buf.data[0] == 1);
665     assert(buf.data[1] == 2);
666 }
667 
668 unittest
669 {
670     int last = 0;
671     struct TestObserver
672     {
673         void put(int n)
674         {
675             last = n;
676         }
677     }
678 
679     auto disposed1 = false;
680     auto sub1 = defer!int((Observer!int observer) {
681         observer.put(1);
682         return new AnonymousDisposable({ disposed1 = true; });
683     });
684 
685     auto sub2 = defer!int((Observer!int observer) { observer.put(100); });
686 
687     TestObserver observer;
688     last = 0;
689     auto disposable1 = sub1.subscribe(observer);
690     assert(last == 1);
691     assert(!disposed1);
692     disposable1.dispose();
693     assert(disposed1);
694 
695     last = 0;
696     auto disposable2 = sub2.subscribe(observer);
697     assert(last == 100);
698     disposable2.dispose();
699 }
700 
701 unittest
702 {
703     size_t hasBeenCalled1 = 0;
704     struct SubscribeImpl1
705     {
706         Disposable opCall(TObserver)(TObserver observer)
707         {
708             hasBeenCalled1++;
709             return NopDisposable.instance;
710         }
711     }
712 
713     size_t hasBeenCalled2 = 0;
714     struct SubscribeImpl2
715     {
716         void opCall(TObserver)(TObserver observer)
717         {
718             hasBeenCalled2++;
719         }
720     }
721 
722     struct TestObserver
723     {
724         void put(int _)
725         {
726         }
727     }
728 
729     SubscribeImpl1 impl1;
730     auto sub1 = defer!int(impl1);
731 
732     assert(hasBeenCalled1 == 0);
733     auto disposable1 = sub1.subscribe(TestObserver());
734     assert(hasBeenCalled1 == 1);
735     disposable1.dispose();
736 
737     SubscribeImpl2 impl2;
738     auto sub2 = defer!int(impl2);
739 
740     assert(hasBeenCalled2 == 0);
741     auto disposable2 = sub2.subscribe(TestObserver());
742     assert(hasBeenCalled2 == 1);
743     disposable2.dispose();
744 }
745 
746 unittest
747 {
748     class SubscribeImpl1
749     {
750     public:
751         void publish()
752         {
753             .put(_observer, 1);
754         }
755 
756     public:
757         Disposable opCall(TObserver)(TObserver observer)
758         {
759             _observer = observerObject!int(observer);
760             return NopDisposable.instance;
761         }
762 
763     private:
764         Observer!int _observer;
765     }
766 
767     import rx.subject : CounterObserver;
768 
769     auto observer1 = new CounterObserver!int;
770     auto impl1 = new SubscribeImpl1;
771     auto sub1 = defer!int(impl1);
772     auto disposable1 = sub1.subscribe(observer1);
773     disposable1.dispose();
774 
775     assert(observer1.putCount == 0);
776     impl1.publish();
777     assert(observer1.putCount == 0);
778 
779     auto observer2 = new CounterObserver!int;
780     auto impl2 = new SubscribeImpl1;
781     auto sub2 = defer!int(impl2);
782     auto disposable2 = sub2.subscribe(observer2);
783     assert(observer2.putCount == 0);
784     impl2.publish();
785     assert(observer2.putCount == 1);
786     disposable2.dispose();
787     impl2.publish();
788     assert(observer2.putCount == 1);
789 }
790 
791 ///
792 auto empty(E)()
793 {
794     static struct EmptyObservable
795     {
796         alias ElementType = E;
797 
798         Disposable subscribe(TObserver)(auto ref TObserver observer)
799         {
800             static if (hasCompleted!TObserver)
801             {
802                 observer.completed();
803             }
804             return NopDisposable.instance;
805         }
806     }
807 
808     return EmptyObservable();
809 }
810 
811 unittest
812 {
813     auto completed = false;
814     auto o = empty!int();
815 
816     assert(!completed);
817     auto d = o.doSubscribe((int n) {}, () { completed = true; });
818     assert(completed);
819 }
820 
821 ///
822 auto never(E)()
823 {
824     static struct NeverObservable
825     {
826         alias ElementType = E;
827 
828         Disposable subscribe(TObserver)(auto ref TObserver observer)
829         {
830             return NopDisposable.instance;
831         }
832     }
833 
834     return NeverObservable();
835 }
836 
837 unittest
838 {
839     auto o = never!int();
840     auto d = o.doSubscribe((int) {});
841     d.dispose();
842 }
843 
844 ///
845 auto error(E)(auto ref Exception e)
846 {
847     static struct ErrorObservable
848     {
849         alias ElementType = E;
850 
851         Exception _e;
852 
853         this(ref Exception e)
854         {
855             _e = e;
856         }
857 
858         Disposable subscribe(TObserver)(auto ref TObserver observer)
859         {
860             static if (hasFailure!TObserver)
861             {
862                 observer.failure(_e);
863             }
864             return NopDisposable.instance;
865         }
866     }
867 
868     return ErrorObservable(e);
869 }
870 
871 unittest
872 {
873     auto expected = new Exception("TEST");
874     auto o = error!int(expected);
875 
876     Exception actual = null;
877     o.doSubscribe((int n) {}, (Exception e) { actual = e; });
878     assert(actual is expected);
879 }
880 
881 ///
882 auto from(R)(auto ref R input) if (isInputRange!R)
883 {
884     alias E = ElementType!R;
885 
886     static struct FromObservable
887     {
888         alias ElementType = E;
889 
890         this(ref R input)
891         {
892             this.input = input;
893         }
894 
895         Disposable subscribe(TObserver)(auto ref TObserver observer)
896                 if (isOutputRange!(TObserver, ElementType))
897         {
898             .put(observer, input);
899             static if (hasCompleted!TObserver)
900                 observer.completed();
901 
902             return NopDisposable.instance;
903         }
904 
905         R input;
906     }
907 
908     return FromObservable(input);
909 }
910 ///
911 alias asObservable = from;
912 
913 ///
914 unittest
915 {
916     import std.range : iota;
917 
918     auto obs = from(iota(10));
919     auto res = new int[10];
920     auto d = obs.subscribe(res[]);
921     scope (exit)
922         d.dispose();
923 
924     assert(res.length == 10);
925     assert(res[0] == 0);
926     assert(res[9] == 9);
927 }
928 
929 ///
930 unittest
931 {
932     import std.range : iota;
933 
934     auto obs = iota(10).asObservable();
935     auto res = new int[10];
936     auto d = obs.subscribe(res[]);
937     scope (exit)
938         d.dispose();
939 
940     assert(res.length == 10);
941     assert(res[0] == 0);
942     assert(res[9] == 9);
943 }
944 
945 ///
946 unittest
947 {
948     import rx;
949     import std.range : iota;
950 
951     auto observable = iota(10).asObservable();
952     auto observer = new CounterObserver!int;
953 
954     auto disposable = observable.subscribe(observer);
955     scope (exit)
956         disposable.dispose();
957 
958     assert(observer.putCount == 10);
959     assert(observer.completedCount == 1);
960 }