1 /+++++++++++++++++++++++++++++
2  + This module defines the concept of Observable.
3  +/
4 module rx.observable;
5 
6 import std.functional : unaryFun;
7 import std.range : put, isInputRange, isOutputRange, ElementType;
8 
9 import rx.disposable;
10 import rx.observer;
11 import rx.util;
12 
13 ///Tests if something is a Observable.
14 template isObservable(T, E)
15 {
16     enum bool isObservable = is(T.ElementType : E) && is(typeof({
17                 T observable = void;
18                 Observer!E observer = void;
19                 auto d = observable.subscribe(observer);
20                 static assert(isDisposable!(typeof(d)));
21             }()));
22 }
23 ///
24 unittest
25 {
26     struct TestObservable
27     {
28         alias ElementType = int;
29 
30         Disposable subscribe(T)(T observer)
31         {
32             static assert(isObserver!(T, int));
33             return null;
34         }
35     }
36 
37     static assert(isObservable!(TestObservable, int));
38     static assert(!isObservable!(TestObservable, Object));
39 }
40 
41 ///Test if the observer can subscribe to the observable.
42 template isSubscribable(TObservable, TObserver)
43 {
44     enum bool isSubscribable = is(typeof({
45                 static assert(isOutputRange!(TObserver, TObservable.ElementType));
46 
47                 TObservable observable = void;
48                 TObserver observer = void;
49                 auto d = observable.subscribe(observer);
50                 static assert(isDisposable!(typeof(d)));
51             }()));
52 }
53 ///
54 unittest
55 {
56     struct TestDisposable
57     {
58         void dispose()
59         {
60         }
61     }
62 
63     struct TestObserver
64     {
65         void put(int n)
66         {
67         }
68 
69         void completed()
70         {
71         }
72 
73         void failure(Exception e)
74         {
75         }
76     }
77 
78     struct TestObservable
79     {
80         alias ElementType = int;
81 
82         TestDisposable subscribe(TestObserver observer)
83         {
84             return TestDisposable();
85         }
86     }
87 
88     static assert(isSubscribable!(TestObservable, TestObserver));
89 }
90 
91 ///The helper for subscribe easier.
92 auto doSubscribe(TObservable, E)(auto ref TObservable observable, void delegate(E) doPut,
93         void delegate() doCompleted, void delegate(Exception) doFailure)
94 {
95     return doSubscribe(observable, makeObserver(doPut, doCompleted, doFailure));
96 }
97 ///ditto
98 auto doSubscribe(TObservable, E)(auto ref TObservable observable,
99         void delegate(E) doPut, void delegate() doCompleted)
100 {
101     return doSubscribe(observable, makeObserver(doPut, doCompleted));
102 }
103 ///ditto
104 auto doSubscribe(TObservable, E)(auto ref TObservable observable,
105         void delegate(E) doPut, void delegate(Exception) doFailure)
106 {
107     return doSubscribe(observable, makeObserver(doPut, doFailure));
108 }
109 ///ditto
110 auto doSubscribe(alias f, TObservable)(auto ref TObservable observable)
111 {
112     alias fun = unaryFun!f;
113     return doSubscribe(observable, (TObservable.ElementType obj) {
114         static if (__traits(compiles, { fun(obj); }))
115         {
116             fun(obj);
117         }
118         else
119         {
120             struct DoSubscribeObserver
121             {
122                 void put(T)(T obj)
123                 {
124                     fun(obj);
125                 }
126             }
127 
128             DoSubscribeObserver observer;
129             .put(observer, obj);
130         }
131     });
132 }
133 ///ditto
134 auto doSubscribe(TObservable, TObserver)(auto ref TObservable observable, auto ref TObserver observer)
135 {
136     import std.format : format;
137 
138     static assert(isObservable!(TObservable, TObservable.ElementType),
139             format!"%s is invalid as an Observable"(TObservable.stringof));
140 
141     alias ElementType = TObservable.ElementType;
142     static if (isSubscribable!(TObservable, TObserver))
143         return observable.subscribe(observer);
144     else static if (isSubscribable!(TObservable, Observer!ElementType))
145         return observable.subscribe(observerObject!ElementType(observer));
146     else
147     {
148         static assert(false, format!"%s can not subscribe '%s', it published by %s"(
149                 TObserver.stringof, ElementType.stringof, TObservable.stringof));
150     }
151 }
152 ///
153 unittest
154 {
155     struct TestObservable
156     {
157         alias ElementType = int;
158 
159         auto subscribe(TObserver)(TObserver observer)
160         {
161             .put(observer, [0, 1, 2]);
162             return NopDisposable.instance;
163         }
164     }
165 
166     TestObservable observable;
167     int[] result;
168     observable.doSubscribe!(n => result ~= n);
169     assert(result.length == 3);
170 }
171 
172 ///
173 unittest
174 {
175     struct TestObserver
176     {
177         void put(int n)
178         {
179         }
180     }
181 
182     struct TestObservable1
183     {
184         alias ElementType = int;
185         Disposable subscribe(Observer!int observer)
186         {
187             return null;
188         }
189     }
190 
191     struct TestObservable2
192     {
193         alias ElementType = int;
194         Disposable subscribe(T)(T observer)
195         {
196             return null;
197         }
198     }
199 
200     TestObservable1 o1;
201     auto d0 = o1.doSubscribe((int n) {  }, () {  }, (Exception e) {  });
202     auto d1 = o1.doSubscribe((int n) {  }, () {  });
203     auto d2 = o1.doSubscribe((int n) {  }, (Exception e) {  });
204     auto d3 = o1.doSubscribe((int n) {  });
205     auto d4 = o1.doSubscribe(TestObserver());
206     TestObservable2 o2;
207     auto d5 = o2.doSubscribe((int n) {  }, () {  }, (Exception e) {  });
208     auto d6 = o2.doSubscribe((int n) {  }, () {  });
209     auto d7 = o2.doSubscribe((int n) {  }, (Exception e) {  });
210     auto d8 = o2.doSubscribe((int n) {  });
211     auto d9 = o2.doSubscribe(TestObserver());
212 }
213 
214 unittest
215 {
216     static struct TestObservable
217     {
218         alias ElementType = int[];
219 
220         Disposable subscribe(TObserver)(TObserver observer)
221         {
222             .put(observer, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
223             return null;
224         }
225     }
226 
227     TestObservable observable;
228 
229     int sum1 = 0;
230 
231     void add(int n)
232     {
233         sum1 += n;
234     }
235 
236     observable.doSubscribe!(add); // by value
237     assert(sum1 == 55);
238 
239     int sum2 = 0;
240     observable.doSubscribe!(n => sum2 += n); // by value
241     assert(sum2 == 55);
242 
243     int size = 0;
244     observable.doSubscribe!(arr => size += arr.length); // by array
245     assert(size == 10);
246 }
247 
248 ///Wrapper for Observable objects.
249 interface Observable(E)
250 {
251     alias ElementType = E;
252 
253     Disposable subscribe(Observer!E observer);
254 }
255 
256 unittest
257 {
258     static assert(isObservable!(Observable!int, int));
259 }
260 
261 unittest
262 {
263     static struct TestNoObservable
264     {
265         Disposable subscribe(Observer!int observer)
266         {
267             return null;
268         }
269     }
270 
271     static assert(!isObservable!(TestNoObservable, int));
272 }
273 
274 ///Class that implements Observable interface and wraps the subscribe method in virtual function.
275 class ObservableObject(R, E) : Observable!E
276 {
277 public:
278     this(R observable)
279     {
280         _observable = observable;
281     }
282 
283 public:
284     Disposable subscribe(Observer!E observer)
285     {
286         return disposableObject(_observable.subscribe(observer));
287     }
288 
289 private:
290     R _observable;
291 }
292 
293 ///Wraps subscribe method in virtual function.
294 template observableObject(E)
295 {
296     Observable!E observableObject(R)(auto ref R observable)
297     {
298         static if (is(R : Observable!E))
299         {
300             return observable;
301         }
302         else
303         {
304             return new ObservableObject!(R, E)(observable);
305         }
306     }
307 }
308 ///
309 unittest
310 {
311     int subscribeCount = 0;
312     class TestObservable : Observable!int
313     {
314         Disposable subscribe(Observer!int observer)
315         {
316             subscribeCount++;
317             return NopDisposable.instance;
318         }
319     }
320 
321     auto test = new TestObservable;
322     auto observable = observableObject!int(test);
323     assert(observable is test);
324     assert(subscribeCount == 0);
325     auto d = observable.subscribe(null);
326     assert(subscribeCount == 1);
327 }
328 
329 unittest
330 {
331     int disposeCount = 0;
332     int subscribeCount = 0;
333 
334     struct TestDisposable
335     {
336         void dispose()
337         {
338             disposeCount++;
339         }
340     }
341 
342     struct TestObservable
343     {
344         TestDisposable subscribe(Observer!int observer)
345         {
346             subscribeCount++;
347             return TestDisposable();
348         }
349     }
350 
351     Observable!int observable = observableObject!int(TestObservable());
352     assert(subscribeCount == 0);
353     Disposable disposable = observable.subscribe(null);
354     assert(subscribeCount == 1);
355     assert(disposeCount == 0);
356     disposable.dispose();
357     assert(disposeCount == 1);
358 }
359 
360 //#########################
361 // Defer
362 //#########################
363 ///Create observable by function that template parameter.
364 auto defer(E, alias f)()
365 {
366     static struct DeferObservable
367     {
368         alias ElementType = E;
369 
370     public:
371         auto subscribe(TObserver)(TObserver observer)
372         {
373             static struct DeferObserver
374             {
375             public:
376                 this(TObserver observer, EventSignal signal)
377                 {
378                     _observer = observer;
379                     _signal = signal;
380                 }
381 
382             public:
383                 void put(E obj)
384                 {
385                     if (_signal.signal)
386                         return;
387 
388                     static if (hasFailure!TObserver)
389                     {
390                         try
391                         {
392                             .put(_observer, obj);
393                         }
394                         catch (Exception e)
395                         {
396                             _observer.failure(e);
397                         }
398                     }
399                     else
400                     {
401                         .put(_observer, obj);
402                     }
403                 }
404 
405                 void completed()
406                 {
407                     if (_signal.signal)
408                         return;
409                     _signal.setSignal();
410 
411                     static if (hasCompleted!TObserver)
412                     {
413                         _observer.completed();
414                     }
415                 }
416 
417                 void failure(Exception e)
418                 {
419                     if (_signal.signal)
420                         return;
421                     _signal.setSignal();
422 
423                     static if (hasFailure!TObserver)
424                     {
425                         _observer.failure(e);
426                     }
427                 }
428 
429             private:
430                 TObserver _observer;
431                 EventSignal _signal;
432             }
433 
434             alias fun = unaryFun!f;
435             auto d = new SignalDisposable;
436             fun(DeferObserver(observer, d.signal));
437             return d;
438         }
439     }
440 
441     return DeferObservable();
442 }
443 ///
444 unittest
445 {
446     auto sub = defer!(int, (observer) {
447         observer.put(1);
448         observer.put(2);
449         observer.put(3);
450         observer.completed();
451     });
452 
453     int countPut = 0;
454     int countCompleted = 0;
455     struct A
456     {
457         void put(int n)
458         {
459             countPut++;
460         }
461 
462         void completed()
463         {
464             countCompleted++;
465         }
466     }
467 
468     assert(countPut == 0);
469     assert(countCompleted == 0);
470     auto d = sub.doSubscribe(A());
471     assert(countPut == 3);
472     assert(countCompleted == 1);
473 }
474 
475 unittest
476 {
477     auto sub = defer!(int, (observer) {
478         observer.put(0);
479         observer.failure(new Exception(""));
480         observer.put(1);
481     });
482 
483     int countPut = 0;
484     int countFailure = 0;
485     struct A
486     {
487         void put(int n)
488         {
489             countPut++;
490         }
491 
492         void failure(Exception e)
493         {
494             countFailure++;
495         }
496     }
497 
498     assert(countPut == 0);
499     assert(countFailure == 0);
500     auto d = sub.doSubscribe(A());
501     assert(countPut == 1);
502     assert(countFailure == 1);
503 }
504 
505 unittest
506 {
507     auto sub = defer!(int, (observer) {
508         observer.put(0);
509         observer.failure(new Exception(""));
510         observer.put(1);
511     });
512 
513     int countPut = 0;
514     struct A
515     {
516         void put(int n)
517         {
518             countPut++;
519         }
520     }
521 
522     assert(countPut == 0);
523     auto d = sub.doSubscribe(A());
524     assert(countPut == 1);
525 }
526 
527 unittest
528 {
529     Disposable subscribeImpl(Observer!int observer)
530     {
531         .put(observer, 1);
532         return null;
533     }
534 
535     import std.array : appender;
536 
537     auto buf = appender!(int[]);
538 
539     auto put1 = defer!int(&subscribeImpl);
540     auto d = put1.doSubscribe(buf);
541 
542     assert(buf.data.length == 1);
543     assert(buf.data[0] == 1);
544     assert(d is null);
545 }
546 
547 auto defer(E, TSubscribe)(auto ref TSubscribe subscribeImpl)
548 {
549     struct DeferObservable
550     {
551         alias ElementType = E;
552 
553         TSubscribe _subscribeImpl;
554 
555         this(ref TSubscribe subscribeImpl)
556         {
557             _subscribeImpl = subscribeImpl;
558         }
559 
560         auto subscribe(TObserver)(auto ref TObserver observer)
561         {
562             return _subscribeImpl(observer);
563         }
564     }
565 
566     return DeferObservable(subscribeImpl);
567 }
568 
569 unittest
570 {
571     import std.array : appender;
572 
573     auto buf = appender!(int[]);
574 
575     auto put12 = defer!int((Observer!int observer) {
576         .put(observer, 1);
577         .put(observer, 2);
578         return NopDisposable.instance;
579     });
580     auto d = put12.doSubscribe(buf);
581 
582     assert(buf.data.length == 2);
583     assert(buf.data[0] == 1);
584     assert(buf.data[1] == 2);
585 }
586 
587 auto empty(E)()
588 {
589     static struct EmptyObservable
590     {
591         alias ElementType = E;
592 
593         Disposable subscribe(TObserver)(auto ref TObserver observer)
594         {
595             static if (hasCompleted!TObserver)
596             {
597                 observer.completed();
598             }
599             return NopDisposable.instance;
600         }
601     }
602 
603     return EmptyObservable();
604 }
605 
606 unittest
607 {
608     auto completed = false;
609     auto o = empty!int();
610 
611     assert(!completed);
612     auto d = o.doSubscribe((int n) {  }, () { completed = true; });
613     assert(completed);
614 }
615 
616 auto never(E)()
617 {
618     static struct NeverObservable
619     {
620         alias ElementType = E;
621 
622         Disposable subscribe(TObserver)(auto ref TObserver observer)
623         {
624             return NopDisposable.instance;
625         }
626     }
627 
628     return NeverObservable();
629 }
630 
631 unittest
632 {
633     auto o = never!int();
634     auto d = o.doSubscribe((int) {  });
635     d.dispose();
636 }
637 
638 auto error(E)(auto ref Exception e)
639 {
640     static struct ErrorObservable
641     {
642         alias ElementType = E;
643 
644         Exception _e;
645 
646         this(ref Exception e)
647         {
648             _e = e;
649         }
650 
651         Disposable subscribe(TObserver)(auto ref TObserver observer)
652         {
653             static if (hasFailure!TObserver)
654             {
655                 observer.failure(_e);
656             }
657             return NopDisposable.instance;
658         }
659     }
660 
661     return ErrorObservable(e);
662 }
663 
664 unittest
665 {
666     auto expected = new Exception("TEST");
667     auto o = error!int(expected);
668 
669     Exception actual = null;
670     o.doSubscribe((int n) {  }, (Exception e) { actual = e; });
671     assert(actual is expected);
672 }
673 
674 ///
675 auto from(R)(auto ref R input) if (isInputRange!R)
676 {
677     alias E = ElementType!R;
678 
679     static struct FromObservable
680     {
681         alias ElementType = E;
682 
683         this(ref R input)
684         {
685             this.input = input;
686         }
687 
688         Disposable subscribe(TObserver)(auto ref TObserver observer)
689                 if (isOutputRange!(TObserver, ElementType))
690         {
691             .put(observer, input);
692             return NopDisposable.instance;
693         }
694 
695         R input;
696     }
697 
698     return FromObservable(input);
699 }
700 ///
701 alias asObservable = from;
702 
703 ///
704 unittest
705 {
706     import std.range : iota;
707 
708     auto obs = from(iota(10));
709     auto res = new int[10];
710     auto d = obs.subscribe(res[]);
711     scope (exit)
712         d.dispose();
713 
714     assert(res.length == 10);
715     assert(res[0] == 0);
716     assert(res[9] == 9);
717 }
718 
719 ///
720 unittest
721 {
722     import std.range : iota;
723 
724     auto obs = iota(10).asObservable();
725     auto res = new int[10];
726     auto d = obs.subscribe(res[]);
727     scope (exit)
728         d.dispose();
729 
730     assert(res.length == 10);
731     assert(res[0] == 0);
732     assert(res[9] == 9);
733 }