1 module rx.range.takeLast;
2 
3 import rx.disposable;
4 import rx.observer;
5 import rx.observable;
6 import rx.util;
7 
8 import std.range : put;
9 
10 //####################
11 // TakeLast
12 //####################
13 ///Creates a observable that take only a last element of the given source.
14 auto takeLast(TObservable)(auto ref TObservable observable)
15 {
16     static struct TakeLastObservable
17     {
18     public:
19         alias ElementType = TObservable.ElementType;
20 
21     public:
22         this(ref TObservable observable)
23         {
24             _observable = observable;
25         }
26 
27     public:
28         auto subscribe(TObserver)(auto ref TObserver observer)
29         {
30             static class TakeLastObserver
31             {
32             public:
33                 this(ref TObserver observer, SingleAssignmentDisposable disposable)
34                 {
35                     _observer = observer;
36                     _disposable = disposable;
37                 }
38 
39             public:
40                 void put(ElementType obj)
41                 {
42                     _current = obj;
43                     _hasValue = true;
44                 }
45 
46                 void completed()
47                 {
48                     if (_hasValue)
49                         .put(_observer, _current);
50 
51                     static if (hasCompleted!TObserver)
52                     {
53                         _observer.completed();
54                     }
55                     _disposable.dispose();
56                 }
57 
58                 static if (hasFailure!TObserver)
59                 {
60                     void failure(Exception e)
61                     {
62                         _observer.failure(e);
63                     }
64                 }
65 
66             private:
67                 bool _hasValue = false;
68                 ElementType _current;
69                 TObserver _observer;
70                 SingleAssignmentDisposable _disposable;
71             }
72 
73             auto d = new SingleAssignmentDisposable;
74             d.setDisposable(disposableObject(doSubscribe(_observable,
75                     new TakeLastObserver(observer, d))));
76             return d;
77         }
78 
79     private:
80         TObservable _observable;
81     }
82 
83     return TakeLastObservable(observable);
84 }
85 ///
86 unittest
87 {
88     import rx.subject;
89 
90     auto sub = new SubjectObject!int;
91 
92     int putCount = 0;
93     int completedCount = 0;
94     struct TestObserver
95     {
96         void put(int n)
97         {
98             putCount++;
99         }
100 
101         void completed()
102         {
103             completedCount++;
104         }
105     }
106 
107     auto d = sub.takeLast.subscribe(TestObserver());
108 
109     assert(putCount == 0);
110     sub.put(1);
111     assert(putCount == 0);
112     sub.put(10);
113     assert(putCount == 0);
114     sub.completed();
115     assert(putCount == 1);
116     assert(completedCount == 1);
117 
118     sub.put(100);
119     assert(putCount == 1);
120     assert(completedCount == 1);
121 }
122 
123 unittest
124 {
125     import rx.subject : SubjectObject;
126 
127     auto sub = new SubjectObject!(int[]);
128 
129     int count = 0;
130     auto d = sub.takeLast.subscribe((int) { count++; });
131     scope(exit) d.dispose();
132 
133     assert(count == 0);
134     sub.put([0]);
135     assert(count == 0);
136     sub.put([1, 2]);
137     assert(count == 0);
138     sub.completed();
139     assert(count == 2);
140 }
141 
142 unittest
143 {
144     import rx : SubjectObject, merge;
145 
146     auto source1 = new SubjectObject!int;
147     auto source2 = new SubjectObject!int;
148 
149     auto source = merge(source1, source2).takeLast();
150     int[] result;
151     source.doSubscribe!(n => result ~= n);
152 
153     .put(source1, 0);
154     .put(source2, 1);
155     source1.completed();
156     
157     assert(result.length == 0);
158 
159     .put(source2, 2);
160     source2.completed();
161 
162     assert(result.length == 1);
163     assert(result[0] == 2);
164 }