1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'combineLatest'
3  +/
4 module rx.algorithm.combineLatest;
5 
6 import rx.disposable;
7 import rx.observable;
8 import rx.observer;
9 import rx.util;
10 
11 import std.range : put;
12 import std.meta : staticMap, allSatisfy;
13 import std.typecons : Tuple, tuple;
14 
15 ///
16 template combineLatest(alias f = tuple)
17 {
18     CombineLatestObservable!(f, TObservables) combineLatest(TObservables...)(TObservables observables)
19         if (allSatisfy!(isObservable, TObservables))
20     {
21         return typeof(return)(observables);
22     }
23 }
24 
25 ///
26 unittest
27 {
28     import rx : SubjectObject, CounterObserver;
29 
30     auto hello = new SubjectObject!string;
31     auto world = new SubjectObject!string;
32 
33     auto message = combineLatest!((a, b) => a ~ ", " ~ b ~ "!")(hello, world);
34     
35     auto observer = new CounterObserver!string;
36     message.doSubscribe(observer);
37 
38     .put(hello, "Hello");
39     .put(world, "world");
40 
41     assert(observer.putCount == 1);
42     assert(observer.lastValue == "Hello, world!");
43 
44     .put(world, "D-man");
45     assert(observer.putCount == 2);
46     assert(observer.lastValue == "Hello, D-man!");
47 }
48 
49 ///
50 unittest
51 {
52     import rx : SubjectObject, CounterObserver, uniq;
53 
54     auto count1 = new SubjectObject!int;
55     auto count2 = new SubjectObject!int;
56     auto count3 = new SubjectObject!int;
57 
58     import std.algorithm : max;
59 
60     alias pickMax = combineLatest!max;
61     auto observable = pickMax(count1, count2, count3).uniq();
62     
63     auto observer = new CounterObserver!int;
64     observable.doSubscribe(observer);
65 
66     .put(count1, 0);
67     .put(count2, 0);
68     .put(count3, 0);
69     
70     assert(observer.putCount == 1);
71     assert(observer.lastValue == 0);
72 
73     .put(count1, 10);
74     assert(observer.putCount == 2);
75     assert(observer.lastValue == 10);
76 
77     .put(count2, 10);
78     assert(observer.putCount == 2);
79 
80     .put(count3, 11);
81     assert(observer.putCount == 3);
82     assert(observer.lastValue == 11);
83 }
84 
85 unittest
86 {
87     import rx : SubjectObject;
88 
89     auto s1 = new SubjectObject!int;
90     auto s2 = new SubjectObject!int;
91 
92     auto comb = combineLatest(s1, s2);
93 
94     Tuple!(int, int)[] result;
95     comb.doSubscribe!(t => result ~= t);
96 
97     .put(s1, 0);
98     .put(s2, 100);
99     assert(result.length == 1);
100     assert(result[0] == tuple(0, 100));
101 
102     .put(s1, 1);
103     assert(result.length == 2);
104     assert(result[1] == tuple(1, 100));
105 
106     .put(s2, 101);
107     assert(result.length == 3);
108     assert(result[2] == tuple(1, 101));
109 }
110 
111 unittest
112 {
113     import rx : SubjectObject, CounterObserver, map;
114 
115     auto s1 = new SubjectObject!int;
116     auto s2 = new SubjectObject!int;
117 
118     auto sum = combineLatest(s1, s2).map!"a[0] + a[1]"();
119 
120     auto observer = new CounterObserver!int;
121     sum.doSubscribe(observer);
122 
123     .put(s1, 1);
124     .put(s2, 2);
125     assert(observer.putCount == 1);
126     assert(observer.lastValue == 3);
127 }
128 
129 unittest
130 {
131     import rx : SubjectObject, CounterObserver;
132 
133     auto s1 = new SubjectObject!int;
134     auto s2 = new SubjectObject!int;
135 
136     auto observable = combineLatest(s1, s2);
137     auto observer = new CounterObserver!(Tuple!(int, int));
138     auto disposable = observable.doSubscribe(observer);
139 
140     disposable.dispose();
141 
142     .put(s1, 1);
143     .put(s2, 2);
144     assert(observer.putCount == 0);
145 }
146 
147 unittest
148 {
149     import rx : SubjectObject, CounterObserver;
150 
151     auto s1 = new SubjectObject!int;
152     auto s2 = new SubjectObject!int;
153 
154     auto observable = combineLatest(s1, s2);
155     auto observer = new CounterObserver!(Tuple!(int, int));
156     auto disposable = observable.doSubscribe(observer);
157 
158     .put(s1, 1);
159     .put(s2, 2);
160     assert(observer.putCount == 1);
161 
162     disposable.dispose();
163     .put(s1, 10);
164     .put(s2, 100);
165     assert(observer.putCount == 1);
166 }
167 
168 unittest
169 {
170     import rx : SubjectObject, CounterObserver;
171 
172     auto s1 = new SubjectObject!int;
173     auto s2 = new SubjectObject!int;
174 
175     auto observable = combineLatest(s1, s2);
176     auto observer = new CounterObserver!(Tuple!(int, int));
177     auto disposable = observable.doSubscribe(observer);
178 
179     s1.completed();
180     assert(observer.completedCount == 0);
181     s2.completed();
182     assert(observer.completedCount == 1);
183 }
184 
185 unittest
186 {
187     import rx : SubjectObject, CounterObserver;
188 
189     auto s1 = new SubjectObject!int;
190     auto s2 = new SubjectObject!int;
191 
192     auto observable = combineLatest(s1, s2);
193     auto observer = new CounterObserver!(Tuple!(int, int));
194     auto disposable = observable.doSubscribe(observer);
195 
196     auto ex = new Exception("message");
197     s1.failure(ex);
198     assert(observer.completedCount == 0);
199     assert(observer.failureCount == 1);
200     assert(observer.lastException is ex);
201 
202     .put(s2, 10);
203     assert(observer.putCount == 0);
204 }
205 
206 unittest
207 {
208     import rx : SubjectObject, CounterObserver;
209 
210     auto s1 = new SubjectObject!int;
211     auto s2 = new SubjectObject!int;
212 
213     auto observable = combineLatest(s1, s2);
214     auto observer = new CounterObserver!(Tuple!(int, int));
215     auto disposable = observable.doSubscribe(observer);
216 
217     s1.completed();
218     assert(observer.completedCount == 0);
219     assert(observer.failureCount == 0);
220 
221     auto ex = new Exception("message");
222     s2.failure(ex);
223     assert(observer.completedCount == 0);
224     assert(observer.failureCount == 1);
225     assert(observer.lastException is ex);
226 }
227 
228 unittest
229 {
230     import rx : SubjectObject, CounterObserver;
231 
232     auto s1 = new SubjectObject!int;
233     auto s2 = new SubjectObject!int;
234     auto s3 = new SubjectObject!int;
235     auto s4 = new SubjectObject!int;
236     auto s5 = new SubjectObject!int;
237 
238     auto observable = combineLatest(s1, s2, s3, s4, s5);
239     auto observer = new CounterObserver!(Tuple!(int, int, int, int, int));
240     auto disposable = observable.doSubscribe(observer);
241 
242     .put(s1, 0);
243     .put(s2, 0);
244     .put(s3, 0);
245     .put(s4, 0);
246     .put(s5, 0);
247     assert(observer.putCount == 1);
248     assert(observer.lastValue == tuple(0, 0, 0, 0, 0));
249 }
250 
251 
252 private template GetElementType(T) {
253     alias GetElementType = T.ElementType;
254 }
255 
256 struct CombineLatestObservable(alias f, TObservables...)
257 {
258     alias ElementType = typeof(({
259         alias ElementTypes = staticMap!(GetElementType, TObservables);
260         return f(ElementTypes.init);
261     })());
262 
263     TObservables _observables;
264 
265     auto subscribe(TObserver)(TObserver observer)
266     {
267         alias CombCoordinator = CombineLatestCoordinator!(f, TObserver, staticMap!(GetElementType, TObservables));
268 
269         auto subscription = new SingleAssignmentDisposable;
270         auto coordinator = new CombCoordinator(observer, subscription);
271 
272         Disposable[TObservables.length] innerSubscriptions;
273         foreach(i, T; TObservables)
274         {
275             alias CombObserver = CombineLatestObserver!(CombCoordinator, TObservables[i].ElementType, i);
276             innerSubscriptions[i] = _observables[i].doSubscribe(CombObserver(coordinator)).disposableObject();
277         }
278         subscription.setDisposable(new CompositeDisposable(innerSubscriptions));
279 
280         return subscription;
281     }
282 }
283 
284 class CombineLatestCoordinator(alias f, TObserver, ElementTypes...)
285 {
286 public:
287     this(TObserver observer, Disposable subscription)
288     {
289         _gate = new Object;
290         _counter = new shared(AtomicCounter)(ElementTypes.length);
291         _observer = observer;
292         _subscription = subscription;
293     }
294 
295 public:
296     void innerPut(size_t index)(ElementTypes[index] obj)
297     {
298         if (_counter.isZero) return;
299 
300         synchronized (_gate)
301         {
302             _values[index] = obj;
303             _hasValues[index] = true;
304 
305             foreach (hasValue; _hasValues)
306             {
307                 if (!hasValue) return;
308             }
309             
310             .put(_observer, f(_values));
311         }
312     }
313 
314     void innerCompleted()
315     {
316         auto res = _counter.tryDecrement();
317         if (res.success && res.count == 0)
318         {
319             static if (hasCompleted!TObserver)
320             {
321                 _observer.completed();
322             }
323             _subscription.dispose();
324         }
325     }
326 
327     void innerFailure(Exception e)
328     {
329         if (_counter.trySetZero())
330         {
331             static if (hasFailure!TObserver)
332             {
333                 _observer.failure(e);
334             }
335             _subscription.dispose();
336         }
337     }
338     
339 public:
340     Object _gate;
341     shared(AtomicCounter) _counter;
342     bool[ElementTypes.length] _hasValues;
343     ElementTypes _values;
344 
345     TObserver _observer;
346     Disposable _subscription;
347 }
348 
349 struct CombineLatestObserver(TCoordinator, E, size_t index)
350 {
351     TCoordinator _parent;
352 
353     void put(E obj)
354     {
355         _parent.innerPut!index(obj);
356     }
357 
358     void completed()
359     {
360         _parent.innerCompleted();
361     }
362 
363     void failure(Exception e)
364     {
365         _parent.innerFailure(e);
366     }
367 }
368 
369 unittest
370 {
371     import rx : CounterObserver;
372 
373     alias CombCoordinator = CombineLatestCoordinator!(tuple, Observer!(Tuple!int), int);
374     alias CombObserver = CombineLatestObserver!(CombCoordinator, int, 0);
375 
376     auto subscription = new SingleAssignmentDisposable;
377     auto observer = new CounterObserver!(Tuple!int);
378     auto coordinator = new CombCoordinator(observer, subscription);
379     auto co = CombObserver(coordinator);
380 }
381 
382 unittest
383 {
384     alias CombObservable = CombineLatestObservable!(tuple, Observable!string, Observable!int);
385     static assert(is(CombObservable.ElementType == Tuple!(string, int)));
386 
387     import rx : SubjectObject, CounterObserver;
388 
389     auto name = new SubjectObject!string;
390     auto age = new SubjectObject!int;
391 
392     auto observable = CombObservable(name, age);
393     auto observer = new CounterObserver!(Tuple!(string, int));
394     auto subscription = observable.subscribe(observer);
395 
396     subscription.dispose();
397 }