1 /+++++++++++++++++++++++++++++
2  + This module is a submodule of rx.range.
3  + It provides basic operation a 'takeUntil'
4  +/
5 module rx.range.takeUntil;
6 
7 import rx.disposable;
8 import rx.observer;
9 import rx.observable;
10 import rx.util;
11 
12 import std.range : put;
13 
14 //####################
15 // TakeUntil
16 //####################
17 ///
18 auto takeUntil(TObservable1, TObservable2)(auto ref TObservable1 source, auto ref TObservable2 stopper)
19         if (isObservable!TObservable1 && isObservable!TObservable2)
20 {
21     static struct TakeUntilObservable
22     {
23         alias ElementType = TObservable1.ElementType;
24 
25         TObservable1 source;
26         TObservable2 stopper;
27 
28         auto subscribe(TObserver)(auto ref TObserver observer)
29         {
30             auto sourceDisposable = source.doSubscribe(observer);
31             auto stopperDisposable = stopper.doSubscribe((TObservable2.ElementType _) {
32                 static if (hasCompleted!TObserver)
33                 {
34                     observer.completed();
35                 }
36                 sourceDisposable.dispose();
37             });
38 
39             return new CompositeDisposable(sourceDisposable, stopperDisposable);
40         }
41     }
42 
43     return TakeUntilObservable(source, stopper);
44 }
45 
46 ///
47 unittest
48 {
49     import std.algorithm;
50     import rx;
51 
52     auto source = new SubjectObject!int;
53     auto stopper = new SubjectObject!int;
54 
55     int[] buf;
56     auto disposable = source.takeUntil(stopper).doSubscribe!((n) { buf ~= n; });
57 
58     source.put(0);
59     source.put(1);
60     source.put(2);
61 
62     stopper.put(0);
63 
64     source.put(3);
65     source.put(4);
66 
67     assert(equal(buf, [0, 1, 2]));
68 }
69 
70 unittest
71 {
72     import std.algorithm;
73     import rx;
74 
75     auto sub1 = new SubjectObject!int;
76     auto sub2 = new SubjectObject!int;
77 
78     int[] buf;
79     auto disposable = sub1.takeUntil(sub2).subscribe((int n) { buf ~= n; });
80 
81     sub1.put(0);
82 
83     disposable.dispose();
84 
85     sub1.put(1);
86     
87     assert(equal(buf, [0]));
88 }