1 /+++++++++++++++++++++++++++++
2  + This module is a submodule of rx.range.
 + It provides the basic 'takeLast' operation.
4  +/
5 module rx.range.takeLast;
6 
7 import rx.disposable;
8 import rx.observer;
9 import rx.observable;
10 import rx.util;
11 
12 import std.range : put;
13 
14 //####################
15 // TakeLast
16 //####################
/// Creates an observable that emits only the last element of the given source.
///
/// The returned observable remembers the most recent value put by the source
/// and forwards nothing until the source completes. On completion the
/// remembered value (if any) is put to the subscriber, the subscriber's
/// `completed` is called when it has one, and the subscription is disposed.
/// On failure the error is forwarded to the subscriber when it has a
/// `failure` handler, and the subscription is disposed.
///
/// Params:
///   observable = the source observable to take the last element from.
///
/// Returns: an observable whose `ElementType` is `TObservable.ElementType`.
auto takeLast(TObservable)(auto ref TObservable observable)
{
    static struct TakeLastObservable
    {
    public:
        alias ElementType = TObservable.ElementType;

    public:
        this(ref TObservable observable)
        {
            _observable = observable;
        }

    public:
        /// Subscribes `observer` to the source through a buffering observer
        /// and returns the disposable that controls the subscription.
        auto subscribe(TObserver)(auto ref TObserver observer)
        {
            static class TakeLastObserver
            {
            public:
                this(ref TObserver observer, SingleAssignmentDisposable disposable)
                {
                    _observer = observer;
                    _disposable = disposable;
                }

            public:
                /// Remembers the value; nothing is forwarded until completion.
                void put(ElementType obj)
                {
                    _current = obj;
                    _hasValue = true;
                }

                /// Forwards the remembered value (if any), signals completion
                /// downstream when supported, then releases the subscription.
                void completed()
                {
                    if (_hasValue)
                        .put(_observer, _current);

                    static if (hasCompleted!TObserver)
                    {
                        _observer.completed();
                    }
                    _disposable.dispose();
                }

                /// Forwards the error downstream when supported.
                /// This handler is always defined so that the subscription is
                /// released on the error path as well, mirroring `completed`.
                void failure(Exception e)
                {
                    static if (hasFailure!TObserver)
                    {
                        _observer.failure(e);
                    }
                    _disposable.dispose();
                }

            private:
                bool _hasValue = false;
                ElementType _current;
                TObserver _observer;
                SingleAssignmentDisposable _disposable;
            }

            // The disposable is created first so the inner observer can
            // dispose the subscription itself when the source terminates.
            auto d = new SingleAssignmentDisposable;
            d.setDisposable(disposableObject(doSubscribe(_observable,
                    new TakeLastObserver(observer, d))));
            return d;
        }

    private:
        TObservable _observable;
    }

    return TakeLastObservable(observable);
}
89 ///
unittest
{
    import rx.subject;

    // Counters updated by the observer below.
    int puts = 0;
    int completions = 0;

    struct CountingObserver
    {
        void put(int n)
        {
            ++puts;
        }

        void completed()
        {
            ++completions;
        }
    }

    auto subject = new SubjectObject!int;
    auto subscription = subject.takeLast.subscribe(CountingObserver());

    // Nothing is forwarded while the source is still producing values.
    assert(puts == 0);
    subject.put(1);
    assert(puts == 0);
    subject.put(10);
    assert(puts == 0);

    // Completion flushes exactly one value and signals completed once.
    subject.completed();
    assert(puts == 1);
    assert(completions == 1);

    // Values put after completion do not reach the observer.
    subject.put(100);
    assert(puts == 1);
    assert(completions == 1);
}
126 
unittest
{
    import rx.subject : SubjectObject;

    // The source carries arrays while the subscriber accepts single ints.
    auto arrays = new SubjectObject!(int[]);

    int emitted = 0;
    auto subscription = arrays.takeLast.subscribe((int value) { emitted++; });
    scope (exit)
        subscription.dispose();

    assert(emitted == 0);
    arrays.put([0]);
    assert(emitted == 0);
    arrays.put([1, 2]);
    assert(emitted == 0);

    // Only the last array ([1, 2]) is forwarded, one call per element.
    arrays.completed();
    assert(emitted == 2);
}
145 
unittest
{
    import rx : SubjectObject, merge;

    auto left = new SubjectObject!int;
    auto right = new SubjectObject!int;

    int[] received;
    auto lastOfMerged = merge(left, right).takeLast();
    lastOfMerged.doSubscribe!(n => received ~= n);

    .put(left, 0);
    .put(right, 1);
    left.completed();

    // One of the merged sources is still open, so nothing is emitted yet.
    assert(received.length == 0);

    .put(right, 2);
    right.completed();

    // Both sources completed: only the final value comes through.
    assert(received.length == 1);
    assert(received[0] == 2);
}