1 /+++++++++++++++++++++++++++++
2  + This module defines the concept of Observable.
3  +/
4 module rx.observable;
5 
6 import std.functional : unaryFun;
7 import std.range : put, isInputRange, isOutputRange, ElementType;
8 
9 import rx.disposable;
10 import rx.observer;
11 import rx.util;
12 
13 ///Tests if something is a Observable.
14 template isObservable(T, E)
15 {
16     enum bool isObservable = is(T.ElementType : E) && is(typeof({
17                 T observable = void;
18                 Observer!E observer = void;
19                 auto d = observable.subscribe(observer);
20                 static assert(isDisposable!(typeof(d)));
21             }()));
22 }
23 ///
24 unittest
25 {
26     struct TestObservable
27     {
28         alias ElementType = int;
29 
30         Disposable subscribe(T)(T observer)
31         {
32             static assert(isObserver!(T, int));
33             return null;
34         }
35     }
36 
37     static assert(isObservable!(TestObservable, int));
38     static assert(!isObservable!(TestObservable, Object));
39 }
40 
41 ///Test if the observer can subscribe to the observable.
42 template isSubscribable(TObservable, TObserver)
43 {
44     enum bool isSubscribable = is(typeof({
45                 TObservable observable = void;
46                 TObserver observer = void;
47                 auto d = observable.subscribe(observer);
48                 static assert(isDisposable!(typeof(d)));
49             }()));
50 }
51 ///
52 unittest
53 {
54     struct TestDisposable
55     {
56         void dispose()
57         {
58         }
59     }
60 
61     struct TestObserver
62     {
63         void put(int n)
64         {
65         }
66 
67         void completed()
68         {
69         }
70 
71         void failure(Exception e)
72         {
73         }
74     }
75 
76     struct TestObservable
77     {
78         TestDisposable subscribe(TestObserver observer)
79         {
80             return TestDisposable();
81         }
82     }
83 
84     static assert(isSubscribable!(TestObservable, TestObserver));
85 }
86 
87 ///The helper for subscribe easier.
88 auto doSubscribe(TObservable, E)(auto ref TObservable observable, void delegate(E) doPut,
89         void delegate() doCompleted, void delegate(Exception) doFailure)
90 {
91     return doSubscribe(observable, makeObserver(doPut, doCompleted, doFailure));
92 }
93 ///ditto
94 auto doSubscribe(TObservable, E)(auto ref TObservable observable,
95         void delegate(E) doPut, void delegate() doCompleted)
96 {
97     return doSubscribe(observable, makeObserver(doPut, doCompleted));
98 }
99 ///ditto
100 auto doSubscribe(TObservable, E)(auto ref TObservable observable,
101         void delegate(E) doPut, void delegate(Exception) doFailure)
102 {
103     return doSubscribe(observable, makeObserver(doPut, doFailure));
104 }
105 ///ditto
106 auto doSubscribe(TObservable, TObserver)(auto ref TObservable observable, auto ref TObserver observer)
107 {
108     alias ElementType = TObservable.ElementType;
109     static if (isSubscribable!(TObservable, TObserver))
110         return observable.subscribe(observer);
111     else static if (isSubscribable!(TObservable, Observer!ElementType))
112         return observable.subscribe(observerObject!ElementType(observer));
113     else
114         static assert(false);
115 }
116 ///
117 unittest
118 {
119     struct TestObserver
120     {
121         void put(int n)
122         {
123         }
124     }
125 
126     struct TestObservable1
127     {
128         alias ElementType = int;
129         Disposable subscribe(Observer!int observer)
130         {
131             return null;
132         }
133     }
134 
135     struct TestObservable2
136     {
137         alias ElementType = int;
138         Disposable subscribe(T)(T observer)
139         {
140             return null;
141         }
142     }
143 
144     TestObservable1 o1;
145     auto d0 = o1.doSubscribe((int n) {  }, () {  }, (Exception e) {  });
146     auto d1 = o1.doSubscribe((int n) {  }, () {  });
147     auto d2 = o1.doSubscribe((int n) {  }, (Exception e) {  });
148     auto d3 = o1.doSubscribe((int n) {  });
149     auto d4 = o1.doSubscribe(TestObserver());
150     TestObservable2 o2;
151     auto d5 = o2.doSubscribe((int n) {  }, () {  }, (Exception e) {  });
152     auto d6 = o2.doSubscribe((int n) {  }, () {  });
153     auto d7 = o2.doSubscribe((int n) {  }, (Exception e) {  });
154     auto d8 = o2.doSubscribe((int n) {  });
155     auto d9 = o2.doSubscribe(TestObserver());
156 }
157 
158 ///Wrapper for Observable objects.
159 interface Observable(E)
160 {
161     alias ElementType = E;
162 
163     Disposable subscribe(Observer!E observer);
164 }
165 
166 unittest
167 {
168     static assert(isObservable!(Observable!int, int));
169 }
170 
171 ///Class that implements Observable interface and wraps the subscribe method in virtual function.
172 class ObservableObject(R, E) : Observable!E
173 {
174 public:
175     this(R observable)
176     {
177         _observable = observable;
178     }
179 
180 public:
181     Disposable subscribe(Observer!E observer)
182     {
183         return disposableObject(_observable.subscribe(observer));
184     }
185 
186 private:
187     R _observable;
188 }
189 
190 ///Wraps subscribe method in virtual function.
191 template observableObject(E)
192 {
193     Observable!E observableObject(R)(auto ref R observable)
194     {
195         static if (is(R : Observable!E))
196         {
197             return observable;
198         }
199         else
200         {
201             return new ObservableObject!(R, E)(observable);
202         }
203     }
204 }
205 ///
206 unittest
207 {
208     int subscribeCount = 0;
209     class TestObservable : Observable!int
210     {
211         Disposable subscribe(Observer!int observer)
212         {
213             subscribeCount++;
214             return NopDisposable.instance;
215         }
216     }
217 
218     auto test = new TestObservable;
219     auto observable = observableObject!int(test);
220     assert(observable is test);
221     assert(subscribeCount == 0);
222     auto d = observable.subscribe(null);
223     assert(subscribeCount == 1);
224 }
225 
226 unittest
227 {
228     int disposeCount = 0;
229     int subscribeCount = 0;
230 
231     struct TestDisposable
232     {
233         void dispose()
234         {
235             disposeCount++;
236         }
237     }
238 
239     struct TestObservable
240     {
241         TestDisposable subscribe(Observer!int observer)
242         {
243             subscribeCount++;
244             return TestDisposable();
245         }
246     }
247 
248     Observable!int observable = observableObject!int(TestObservable());
249     assert(subscribeCount == 0);
250     Disposable disposable = observable.subscribe(null);
251     assert(subscribeCount == 1);
252     assert(disposeCount == 0);
253     disposable.dispose();
254     assert(disposeCount == 1);
255 }
256 
257 //#########################
258 // Defer
259 //#########################
260 ///Create observable by function that template parameter.
261 auto defer(E, alias f)()
262 {
263     static struct DeferObservable
264     {
265         alias ElementType = E;
266 
267     public:
268         auto subscribe(TObserver)(TObserver observer)
269         {
270             static struct DeferObserver
271             {
272             public:
273                 this(TObserver observer, EventSignal signal)
274                 {
275                     _observer = observer;
276                     _signal = signal;
277                 }
278 
279             public:
280                 void put(E obj)
281                 {
282                     if (_signal.signal)
283                         return;
284 
285                     static if (hasFailure!TObserver)
286                     {
287                         try
288                         {
289                             .put(_observer, obj);
290                         }
291                         catch (Exception e)
292                         {
293                             _observer.failure(e);
294                         }
295                     }
296                     else
297                     {
298                         .put(_observer, obj);
299                     }
300                 }
301 
302                 void completed()
303                 {
304                     if (_signal.signal)
305                         return;
306                     _signal.setSignal();
307 
308                     static if (hasCompleted!TObserver)
309                     {
310                         _observer.completed();
311                     }
312                 }
313 
314                 void failure(Exception e)
315                 {
316                     if (_signal.signal)
317                         return;
318                     _signal.setSignal();
319 
320                     static if (hasFailure!TObserver)
321                     {
322                         _observer.failure(e);
323                     }
324                 }
325 
326             private:
327                 TObserver _observer;
328                 EventSignal _signal;
329             }
330 
331             alias fun = unaryFun!f;
332             auto d = new SignalDisposable;
333             fun(DeferObserver(observer, d.signal));
334             return d;
335         }
336     }
337 
338     return DeferObservable();
339 }
340 ///
341 unittest
342 {
343     auto sub = defer!(int, (observer) {
344         observer.put(1);
345         observer.put(2);
346         observer.put(3);
347         observer.completed();
348     });
349 
350     int countPut = 0;
351     int countCompleted = 0;
352     struct A
353     {
354         void put(int n)
355         {
356             countPut++;
357         }
358 
359         void completed()
360         {
361             countCompleted++;
362         }
363     }
364 
365     assert(countPut == 0);
366     assert(countCompleted == 0);
367     auto d = sub.doSubscribe(A());
368     assert(countPut == 3);
369     assert(countCompleted == 1);
370 }
371 
372 unittest
373 {
374     auto sub = defer!(int, (observer) {
375         observer.put(0);
376         observer.failure(new Exception(""));
377         observer.put(1);
378     });
379 
380     int countPut = 0;
381     int countFailure = 0;
382     struct A
383     {
384         void put(int n)
385         {
386             countPut++;
387         }
388 
389         void failure(Exception e)
390         {
391             countFailure++;
392         }
393     }
394 
395     assert(countPut == 0);
396     assert(countFailure == 0);
397     auto d = sub.doSubscribe(A());
398     assert(countPut == 1);
399     assert(countFailure == 1);
400 }
401 
402 unittest
403 {
404     auto sub = defer!(int, (observer) {
405         observer.put(0);
406         observer.failure(new Exception(""));
407         observer.put(1);
408     });
409 
410     int countPut = 0;
411     struct A
412     {
413         void put(int n)
414         {
415             countPut++;
416         }
417     }
418 
419     assert(countPut == 0);
420     auto d = sub.doSubscribe(A());
421     assert(countPut == 1);
422 }
423 
424 unittest
425 {
426     Disposable subscribeImpl(Observer!int observer)
427     {
428         .put(observer, 1);
429         return null;
430     }
431 
432     import std.array : appender;
433 
434     auto buf = appender!(int[]);
435 
436     auto put1 = defer!int(&subscribeImpl);
437     auto d = put1.doSubscribe(buf);
438 
439     assert(buf.data.length == 1);
440     assert(buf.data[0] == 1);
441     assert(d is null);
442 }
443 
444 auto defer(E, TSubscribe)(auto ref TSubscribe subscribeImpl)
445 {
446     struct DeferObservable
447     {
448         alias ElementType = E;
449 
450         TSubscribe _subscribeImpl;
451 
452         this(ref TSubscribe subscribeImpl)
453         {
454             _subscribeImpl = subscribeImpl;
455         }
456 
457         auto subscribe(TObserver)(auto ref TObserver observer)
458         {
459             return _subscribeImpl(observer);
460         }
461     }
462 
463     return DeferObservable(subscribeImpl);
464 }
465 
466 unittest
467 {
468     import std.array : appender;
469 
470     auto buf = appender!(int[]);
471 
472     auto put12 = defer!int((Observer!int observer) {
473         .put(observer, 1);
474         .put(observer, 2);
475         return NopDisposable.instance;
476     });
477     auto d = put12.doSubscribe(buf);
478 
479     assert(buf.data.length == 2);
480     assert(buf.data[0] == 1);
481     assert(buf.data[1] == 2);
482 }
483 
484 auto empty(E)()
485 {
486     static struct EmptyObservable
487     {
488         alias ElementType = E;
489 
490         Disposable subscribe(TObserver)(auto ref TObserver observer)
491         {
492             static if (hasCompleted!TObserver)
493             {
494                 observer.completed();
495             }
496             return NopDisposable.instance;
497         }
498     }
499 
500     return EmptyObservable();
501 }
502 
503 unittest
504 {
505     auto completed = false;
506     auto o = empty!int();
507 
508     assert(!completed);
509     auto d = o.doSubscribe((int n) {  }, () { completed = true; });
510     assert(completed);
511 }
512 
513 auto never(E)()
514 {
515     static struct NeverObservable
516     {
517         alias ElementType = E;
518 
519         Disposable subscribe(TObserver)(auto ref TObserver observer)
520         {
521             return NopDisposable.instance;
522         }
523     }
524 
525     return NeverObservable();
526 }
527 
528 unittest
529 {
530     auto o = never!int();
531     auto d = o.doSubscribe((int) {  });
532     d.dispose();
533 }
534 
535 auto error(E)(auto ref Exception e)
536 {
537     static struct ErrorObservable
538     {
539         alias ElementType = E;
540 
541         Exception _e;
542 
543         this(ref Exception e)
544         {
545             _e = e;
546         }
547 
548         Disposable subscribe(TObserver)(auto ref TObserver observer)
549         {
550             static if (hasFailure!TObserver)
551             {
552                 observer.failure(_e);
553             }
554             return NopDisposable.instance;
555         }
556     }
557 
558     return ErrorObservable(e);
559 }
560 
561 unittest
562 {
563     auto expected = new Exception("TEST");
564     auto o = error!int(expected);
565 
566     Exception actual = null;
567     o.doSubscribe((int n) {  }, (Exception e) { actual = e; });
568     assert(actual is expected);
569 }
570 
571 ///
572 auto from(R)(auto ref R input) if (isInputRange!R)
573 {
574     alias E = ElementType!R;
575 
576     static struct FromObservable
577     {
578         alias ElementType = E;
579 
580         this(ref R input)
581         {
582             this.input = input;
583         }
584 
585         Disposable subscribe(TObserver)(auto ref TObserver observer)
586                 if (isOutputRange!(TObserver, ElementType))
587         {
588             .put(observer, input);
589             return NopDisposable.instance;
590         }
591 
592         R input;
593     }
594 
595     return FromObservable(input);
596 }
597 ///
598 alias asObservable = from;
599 
600 ///
601 unittest
602 {
603     import std.range : iota;
604 
605     auto obs = from(iota(10));
606     auto res = new int[10];
607     auto d = obs.subscribe(res[]);
608     scope (exit)
609         d.dispose();
610 
611     assert(res.length == 10);
612     assert(res[0] == 0);
613     assert(res[9] == 9);
614 }
615 
616 ///
617 unittest
618 {
619     import std.range : iota;
620 
621     auto obs = iota(10).asObservable();
622     auto res = new int[10];
623     auto d = obs.subscribe(res[]);
624     scope (exit)
625         d.dispose();
626 
627     assert(res.length == 10);
628     assert(res[0] == 0);
629     assert(res[9] == 9);
630 }