1 /+++++++++++++++++++++++++++++
2  + This module defines the concept of Observable.
3  +/
4 module rx.observable;
5 
6 import std.functional : unaryFun;
7 import std.range : put, isInputRange, isOutputRange, ElementType;
8 
9 import rx.disposable;
10 import rx.observer;
11 import rx.util;
12 
///Tests whether `T` is an Observable that publishes elements of type `E`.
///
///`T` qualifies when `T.ElementType` implicitly converts to `E` and calling
///`subscribe` on a `T` with an `Observer!E` yields a value whose type
///satisfies `isDisposable`.
template isObservable(T, E)
{
    static if (is(T.ElementType : E))
    {
        // The literal is never executed; it only has to type-check.
        enum bool isObservable = is(typeof({
                    T source = void;
                    Observer!E sink = void;
                    auto subscription = source.subscribe(sink);
                    static assert(isDisposable!(typeof(subscription)));
                }()));
    }
    else
    {
        // No ElementType member, or it does not convert to E.
        enum bool isObservable = false;
    }
}
23 ///
unittest
{
    // Minimal observable of int: an ElementType alias plus a templated subscribe.
    struct TestObservable
    {
        alias ElementType = int;

        Disposable subscribe(TObserver)(TObserver observer)
        {
            static assert(isObserver!(TObserver, int));
            return null;
        }
    }

    // Accepted for its own element type ...
    static assert(isObservable!(TestObservable, int));
    // ... and rejected for Object.
    static assert(!isObservable!(TestObservable, Object));
}
40 
///Tests whether a `TObserver` can be passed to `TObservable.subscribe`.
///
///The check succeeds when the call compiles and the type of its result
///satisfies `isDisposable`.
enum bool isSubscribable(TObservable, TObserver) = is(typeof({
            // Never executed; only used to see whether the statements type-check.
            TObservable source = void;
            TObserver sink = void;
            auto subscription = source.subscribe(sink);
            static assert(isDisposable!(typeof(subscription)));
        }()));
51 ///
unittest
{
    // Duck-typed disposable: only needs a dispose() method.
    struct TestDisposable
    {
        void dispose() {}
    }

    // Duck-typed observer of int.
    struct TestObserver
    {
        void put(int n) {}

        void completed() {}

        void failure(Exception e) {}
    }

    // Observable that accepts exactly TestObserver and hands back a TestDisposable.
    struct TestObservable
    {
        TestDisposable subscribe(TestObserver observer)
        {
            return TestDisposable();
        }
    }

    static assert(isSubscribable!(TestObservable, TestObserver));
}
86 
///Helper that makes subscribing easier.
///
///The delegate overloads build an observer with `makeObserver` from the given
///callbacks and subscribe it; the `alias f` overload subscribes a unary
///function as the element handler; the observer overload subscribes
///`observer` directly, or wrapped by `observerObject` when a direct
///subscription does not compile.
///
///Params:
///  observable  = the observable to subscribe to
///  doPut       = called with each element
///  doCompleted = called on completion
///  doFailure   = called with the exception on failure
///Returns: whatever the underlying `subscribe` call returns.
auto doSubscribe(TObservable, E)(auto ref TObservable observable, void delegate(E) doPut,
        void delegate() doCompleted, void delegate(Exception) doFailure)
{
    auto observer = makeObserver(doPut, doCompleted, doFailure);
    return doSubscribe(observable, observer);
}
///ditto
auto doSubscribe(TObservable, E)(auto ref TObservable observable,
        void delegate(E) doPut, void delegate() doCompleted)
{
    // Observer built from an element handler and a completion handler only.
    auto observer = makeObserver(doPut, doCompleted);
    return doSubscribe(observable, observer);
}
///ditto
auto doSubscribe(TObservable, E)(auto ref TObservable observable,
        void delegate(E) doPut, void delegate(Exception) doFailure)
{
    // Observer built from an element handler and a failure handler only.
    auto observer = makeObserver(doPut, doFailure);
    return doSubscribe(observable, observer);
}
///ditto
auto doSubscribe(alias f, TObservable)(auto ref TObservable observable)
{
    alias TElement = TObservable.ElementType;

    // Wrap `f` in a literal that takes one element and discards f's result.
    auto handler = (TElement obj) { unaryFun!f(obj); };
    return doSubscribe(observable, handler);
}
///ditto
auto doSubscribe(TObservable, TObserver)(auto ref TObservable observable, auto ref TObserver observer)
{
    import std.format : format;

    static assert(isObservable!(TObservable, TObservable.ElementType),
            format!"%s is invalid as an Observable"(TObservable.stringof));

    alias TElement = TObservable.ElementType;

    static if (isSubscribable!(TObservable, TObserver))
    {
        // The observable accepts this observer type as-is.
        return observable.subscribe(observer);
    }
    else
    {
        static if (isSubscribable!(TObservable, Observer!TElement))
        {
            // Fall back to the Observer interface by wrapping the observer.
            return observable.subscribe(observerObject!TElement(observer));
        }
        else
        {
            static assert(false, format!"%s can not subscribe '%s', it published by %s"(
                    TObserver.stringof, TElement.stringof, TObservable.stringof));
        }
    }
}
130 ///
unittest
{
    // Observable that pushes 0, 1, 2 into the observer as soon as it subscribes.
    struct TestObservable
    {
        alias ElementType = int;

        auto subscribe(TObserver)(TObserver observer)
        {
            .put(observer, [0, 1, 2]);
            return NopDisposable.instance;
        }
    }

    TestObservable observable;
    int[] received;

    // Subscribe with a lambda given as a template argument.
    observable.doSubscribe!(n => received ~= n);
    assert(received.length == 3);
}
149 
150 ///
unittest
{
    // Observer that only implements put.
    struct TestObserver
    {
        void put(int n) {}
    }

    // Observable whose subscribe takes the Observer!int interface.
    struct TestObservable1
    {
        alias ElementType = int;

        Disposable subscribe(Observer!int observer)
        {
            return null;
        }
    }

    // Observable whose subscribe is a template accepting any observer type.
    struct TestObservable2
    {
        alias ElementType = int;

        Disposable subscribe(T)(T observer)
        {
            return null;
        }
    }

    // Every doSubscribe overload must compile against the interface-based observable ...
    TestObservable1 o1;
    auto d0 = o1.doSubscribe((int n) {}, () {}, (Exception e) {});
    auto d1 = o1.doSubscribe((int n) {}, () {});
    auto d2 = o1.doSubscribe((int n) {}, (Exception e) {});
    auto d3 = o1.doSubscribe((int n) {});
    auto d4 = o1.doSubscribe(TestObserver());

    // ... and against the template-based one.
    TestObservable2 o2;
    auto d5 = o2.doSubscribe((int n) {}, () {}, (Exception e) {});
    auto d6 = o2.doSubscribe((int n) {}, () {});
    auto d7 = o2.doSubscribe((int n) {}, (Exception e) {});
    auto d8 = o2.doSubscribe((int n) {});
    auto d9 = o2.doSubscribe(TestObserver());
}
191 
///Interface form of an Observable publishing elements of type `E`.
///
///It fixes the `subscribe` signature to `Observer!E` -> `Disposable`, so
///observables of different concrete types can be handled through one type.
interface Observable(E)
{
    ///Subscribes `observer` and returns the `Disposable` for that subscription.
    Disposable subscribe(Observer!E observer);

    ///Type of the published elements.
    alias ElementType = E;
}
199 
unittest
{
    // The interface itself must satisfy the isObservable concept.
    alias IntObservable = Observable!int;
    static assert(isObservable!(IntObservable, int));
}
204 
unittest
{
    // Has a suitable subscribe method but no ElementType alias.
    static struct TestNoObservable
    {
        Disposable subscribe(Observer!int observer)
        {
            return null;
        }
    }

    // Without ElementType the type is not considered an Observable.
    static assert(!isObservable!(TestNoObservable, int));
}
217 
///Class that implements the `Observable!E` interface on top of a value of
///type `R`, turning `R`'s `subscribe` into a virtual function.
class ObservableObject(R, E) : Observable!E
{
    ///Stores `observable`; every later `subscribe` call is forwarded to it.
    this(R observable)
    {
        _observable = observable;
    }

    ///Subscribes `observer` to the stored observable and returns the result
    ///passed through `disposableObject`.
    Disposable subscribe(Observer!E observer)
    {
        return disposableObject(_observable.subscribe(observer));
    }

    private R _observable;
}
236 
///Converts `observable` into an `Observable!E`.
///
///A value that already converts to `Observable!E` is returned unchanged;
///anything else is wrapped in a new `ObservableObject!(R, E)`.
template observableObject(E)
{
    Observable!E observableObject(R)(auto ref R observable)
    {
        static if (!is(R : Observable!E))
        {
            // Not usable as the interface yet: allocate an adapter.
            return new ObservableObject!(R, E)(observable);
        }
        else
        {
            // Already usable as the interface: no wrapping needed.
            return observable;
        }
    }
}
252 ///
unittest
{
    int subscribeCount = 0;

    // Already implements the interface, so observableObject must not wrap it.
    class TestObservable : Observable!int
    {
        Disposable subscribe(Observer!int observer)
        {
            subscribeCount++;
            return NopDisposable.instance;
        }
    }

    auto test = new TestObservable;
    auto observable = observableObject!int(test);

    // The very same instance comes back, and nothing has been subscribed yet.
    assert(observable is test);
    assert(subscribeCount == 0);

    auto d = observable.subscribe(null);
    assert(subscribeCount == 1);
}
272 
unittest
{
    int disposeCount = 0;
    int subscribeCount = 0;

    // Struct disposable that records every dispose() call.
    struct TestDisposable
    {
        void dispose()
        {
            disposeCount++;
        }
    }

    // Struct observable that records every subscribe() call.
    struct TestObservable
    {
        TestDisposable subscribe(Observer!int observer)
        {
            subscribeCount++;
            return TestDisposable();
        }
    }

    // Wrapping a struct observable must not subscribe by itself.
    Observable!int observable = observableObject!int(TestObservable());
    assert(subscribeCount == 0);

    // subscribe is forwarded once; nothing is disposed yet.
    Disposable disposable = observable.subscribe(null);
    assert(subscribeCount == 1);
    assert(disposeCount == 0);

    // dispose reaches the wrapped TestDisposable.
    disposable.dispose();
    assert(disposeCount == 1);
}
303 
304 //#########################
305 // Defer
306 //#########################
///Creates an observable whose subscription logic is the function `f` passed
///as a template parameter.
///
///On every `subscribe`, `f` is called with a guarded wrapper around the
///observer, and a new `SignalDisposable` is returned. The wrapper forwards
///nothing once the disposable's signal is set. The signal is set by the first
///`completed` or `failure`, including a failure caused by the observer's own
///`put` throwing. (Presumably disposing the returned object sets it too;
///confirm in rx.disposable.)
///
///Params:
///  E = element type published by the observable
///  f = unary function that receives the guarded observer
auto defer(E, alias f)()
{
    static struct DeferObservable
    {
        alias ElementType = E;

    public:
        auto subscribe(TObserver)(TObserver observer)
        {
            static struct DeferObserver
            {
            public:
                this(TObserver observer, EventSignal signal)
                {
                    _observer = observer;
                    _signal = signal;
                }

            public:
                // Forwards an element unless the sequence has already terminated.
                void put(E obj)
                {
                    if (_signal.signal)
                        return;

                    static if (hasFailure!TObserver)
                    {
                        try
                        {
                            .put(_observer, obj);
                        }
                        catch (Exception e)
                        {
                            // Go through our own failure() so the signal is set
                            // and nothing more is forwarded after this failure.
                            failure(e);
                        }
                    }
                    else
                    {
                        // The observer cannot receive failures: let the exception propagate.
                        .put(_observer, obj);
                    }
                }

                // Terminates the sequence normally; only the first termination is forwarded.
                void completed()
                {
                    if (_signal.signal)
                        return;
                    _signal.setSignal();

                    static if (hasCompleted!TObserver)
                    {
                        _observer.completed();
                    }
                }

                // Terminates the sequence with an error; only the first termination is forwarded.
                void failure(Exception e)
                {
                    if (_signal.signal)
                        return;
                    _signal.setSignal();

                    static if (hasFailure!TObserver)
                    {
                        _observer.failure(e);
                    }
                }

            private:
                TObserver _observer;
                EventSignal _signal;
            }

            alias fun = unaryFun!f;
            auto d = new SignalDisposable;
            // The wrapper shares the disposable's signal.
            fun(DeferObserver(observer, d.signal));
            return d;
        }
    }

    return DeferObservable();
}
387 ///
unittest
{
    // Publishes 1, 2, 3 and then completes.
    auto source = defer!(int, (observer) {
        observer.put(1);
        observer.put(2);
        observer.put(3);
        observer.completed();
    });

    int countPut = 0;
    int countCompleted = 0;

    // Observer that counts elements and completions.
    struct CountingObserver
    {
        void put(int n)
        {
            countPut++;
        }

        void completed()
        {
            countCompleted++;
        }
    }

    // Nothing runs before subscription.
    assert(countPut == 0);
    assert(countCompleted == 0);

    auto d = source.doSubscribe(CountingObserver());
    assert(countPut == 3);
    assert(countCompleted == 1);
}
418 
unittest
{
    // Publishes one element, fails, then tries to publish once more.
    auto source = defer!(int, (observer) {
        observer.put(0);
        observer.failure(new Exception(""));
        observer.put(1);
    });

    int countPut = 0;
    int countFailure = 0;

    // Observer that counts elements and failures.
    struct CountingObserver
    {
        void put(int n)
        {
            countPut++;
        }

        void failure(Exception e)
        {
            countFailure++;
        }
    }

    assert(countPut == 0);
    assert(countFailure == 0);

    auto d = source.doSubscribe(CountingObserver());

    // The element published after the failure must be dropped.
    assert(countPut == 1);
    assert(countFailure == 1);
}
448 
unittest
{
    // Same sequence as before, observed by a type without a failure method.
    auto source = defer!(int, (observer) {
        observer.put(0);
        observer.failure(new Exception(""));
        observer.put(1);
    });

    int countPut = 0;

    // Observer that only counts elements.
    struct PutOnlyObserver
    {
        void put(int n)
        {
            countPut++;
        }
    }

    assert(countPut == 0);

    auto d = source.doSubscribe(PutOnlyObserver());

    // The failure still ends the sequence, so the second put is dropped.
    assert(countPut == 1);
}
470 
unittest
{
    import std.array : appender;

    // Subscription logic as a nested function: publish 1, return no disposable.
    Disposable subscribeImpl(Observer!int observer)
    {
        .put(observer, 1);
        return null;
    }

    auto buf = appender!(int[]);

    auto put1 = defer!int(&subscribeImpl);
    auto d = put1.doSubscribe(buf);

    // The single element reached the appender ...
    assert(buf.data.length == 1);
    assert(buf.data[0] == 1);
    // ... and the callable's return value is passed through unchanged.
    assert(d is null);
}
490 
///Creates an observable whose `subscribe` forwards to the callable
///`subscribeImpl`.
///
///Params:
///  E             = element type published by the observable
///  subscribeImpl = callable invoked with the observer on every `subscribe`
///Returns: an observable whose `subscribe` returns whatever `subscribeImpl`
///returns.
auto defer(E, TSubscribe)(auto ref TSubscribe subscribeImpl)
{
    // static: the struct uses nothing from this function's frame, and this
    // matches the other factory functions in this module.
    static struct DeferObservable
    {
        alias ElementType = E;

        TSubscribe _subscribeImpl;

        this(ref TSubscribe subscribeImpl)
        {
            _subscribeImpl = subscribeImpl;
        }

        // Hands the observer straight to the stored callable.
        auto subscribe(TObserver)(auto ref TObserver observer)
        {
            return _subscribeImpl(observer);
        }
    }

    return DeferObservable(subscribeImpl);
}
512 
unittest
{
    import std.array : appender;

    auto buf = appender!(int[]);

    // Subscription logic as a literal: publish 1 and 2.
    auto put12 = defer!int((Observer!int observer) {
        .put(observer, 1);
        .put(observer, 2);
        return NopDisposable.instance;
    });

    auto d = put12.doSubscribe(buf);

    // Both elements arrived, in order.
    assert(buf.data.length == 2);
    assert(buf.data[0] == 1);
    assert(buf.data[1] == 2);
}
530 
///Creates an observable of `E` that publishes no elements.
///
///On subscription it calls `completed` on the observer, if the observer type
///has one, and returns `NopDisposable.instance`.
auto empty(E)()
{
    static struct EmptyObservable
    {
        alias ElementType = E;

        Disposable subscribe(TObserver)(auto ref TObserver observer)
        {
            static if (hasCompleted!TObserver)
                observer.completed();

            return NopDisposable.instance;
        }
    }

    EmptyObservable result;
    return result;
}
549 
unittest
{
    bool completed = false;
    auto o = empty!int();

    // Creating the observable alone does not complete anything.
    assert(!completed);

    auto d = o.doSubscribe((int n) {}, () { completed = true; });

    // Subscribing triggers the completion callback.
    assert(completed);
}
559 
///Creates an observable of `E` that never notifies its observers.
///
///`subscribe` ignores the observer and returns `NopDisposable.instance`.
auto never(E)()
{
    static struct NeverObservable
    {
        alias ElementType = E;

        Disposable subscribe(TObserver)(auto ref TObserver observer)
        {
            // Deliberately no interaction with the observer.
            return NopDisposable.instance;
        }
    }

    NeverObservable result;
    return result;
}
574 
unittest
{
    // Subscribing to never and disposing the subscription must both work.
    auto o = never!int();
    auto d = o.doSubscribe((int) {});
    d.dispose();
}
581 
///Creates an observable of `E` that publishes no elements and reports the
///exception `e`.
///
///On subscription it calls `failure(e)` on the observer, if the observer type
///has one, and returns `NopDisposable.instance`.
auto error(E)(auto ref Exception e)
{
    static struct ErrorObservable
    {
        alias ElementType = E;

        this(ref Exception cause)
        {
            _e = cause;
        }

        Disposable subscribe(TObserver)(auto ref TObserver observer)
        {
            static if (hasFailure!TObserver)
                observer.failure(_e);

            return NopDisposable.instance;
        }

        // The exception handed to every subscriber.
        Exception _e;
    }

    return ErrorObservable(e);
}
607 
unittest
{
    auto expected = new Exception("TEST");
    auto o = error!int(expected);

    Exception actual = null;
    o.doSubscribe((int n) {}, (Exception e) { actual = e; });

    // The failure handler receives the very same exception object.
    assert(actual is expected);
}
617 
///Creates an observable from the input range `input`.
///
///On subscription the stored range is written to the observer with
///`std.range.put`, and `NopDisposable.instance` is returned. `subscribe`
///itself calls neither `completed` nor `failure` on the observer.
auto from(R)(auto ref R input) if (isInputRange!R)
{
    alias E = ElementType!R;

    static struct FromObservable
    {
        alias ElementType = E;

        // The range that is published to each subscriber.
        R input;

        this(ref R source)
        {
            this.input = source;
        }

        Disposable subscribe(TObserver)(auto ref TObserver observer)
                if (isOutputRange!(TObserver, ElementType))
        {
            .put(observer, input);
            return NopDisposable.instance;
        }
    }

    return FromObservable(input);
}
///Alias of `from`, intended for UFCS use such as `range.asObservable()`.
alias asObservable = from;
646 
647 ///
unittest
{
    import std.range : iota;

    // Publish 0 .. 9 into a pre-sized array used as the output range.
    auto obs = from(iota(10));
    auto res = new int[10];

    auto d = obs.subscribe(res[]);
    scope (exit)
        d.dispose();

    assert(res.length == 10);
    assert(res[0] == 0);
    assert(res[9] == 9);
}
662 
663 ///
unittest
{
    import std.range : iota;

    // Same as the `from` example, written with the UFCS-friendly alias.
    auto obs = iota(10).asObservable();
    auto res = new int[10];

    auto d = obs.subscribe(res[]);
    scope (exit)
        d.dispose();

    assert(res.length == 10);
    assert(res[0] == 0);
    assert(res[9] == 9);
}