1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'merge'
3  +/
4 module rx.algorithm.merge;
5 
6 import rx.disposable;
7 import rx.observable;
8 import rx.observer;
9 import rx.util;
10 import std.range : put;
11 
12 //####################
13 // Merge
14 //####################
15 struct MergeObservable(TObservable1, TObservable2)
16 {
17     import std.traits : CommonType;
18 
19     alias ElementType = CommonType!(TObservable1.ElementType, TObservable2.ElementType);
20 
21 public:
22     this(TObservable1 o1, TObservable2 o2)
23     {
24         _observable1 = o1;
25         _observable2 = o2;
26     }
27 
28 public:
29     auto subscribe(T)(T observer)
30     {
31         static struct MergeObserver
32         {
33             T _observer;
34             shared(AtomicCounter) _counter;
35             Disposable _subscription;
36 
37             void put(ElementType obj)
38             {
39                 if (_counter.isZero)
40                     return;
41 
42                 .put(_observer, obj);
43             }
44 
45             void completed()
46             {
47                 auto result = _counter.tryDecrement();
48                 if (result.success && result.count == 0)
49                 {
50                     static if (hasCompleted!T)
51                     {
52                         _observer.completed();
53                     }
54                     _subscription.dispose();
55                 }
56             }
57 
58             void failure(Exception e)
59             {
60                 if (_counter.trySetZero())
61                 {
62                     static if (hasFailure!T)
63                     {
64                         _observer.failure(e);
65                     }
66                     _subscription.dispose();
67                 }
68             }
69         }
70 
71         auto subscription = new SingleAssignmentDisposable;
72         auto counter = new shared(AtomicCounter)(2);
73         auto mergeObserver = MergeObserver(observer, counter, subscription);
74         auto d1 = _observable1.doSubscribe(mergeObserver);
75         auto d2 = _observable2.doSubscribe(mergeObserver);
76         subscription.setDisposable(new CompositeDisposable(disposableObject(d1),
77                 disposableObject(d2)));
78         return subscription;
79     }
80 
81 private:
82     TObservable1 _observable1;
83     TObservable2 _observable2;
84 }
85 
86 ///
87 MergeObservable!(T1, T2) merge(T1, T2)(auto ref T1 observable1, auto ref T2 observable2)
88 {
89     return typeof(return)(observable1, observable2);
90 }
91 ///
92 unittest
93 {
94     import rx.subject : SubjectObject;
95 
96     auto s1 = new SubjectObject!int;
97     auto s2 = new SubjectObject!short;
98 
99     auto merged = s1.merge(s2);
100 
101     int count = 0;
102     auto d = merged.doSubscribe((int n) { count++; });
103 
104     assert(count == 0);
105     s1.put(1);
106     assert(count == 1);
107     s2.put(2);
108     assert(count == 2);
109 
110     d.dispose();
111 
112     s1.put(10);
113     assert(count == 2);
114     s2.put(100);
115     assert(count == 2);
116 }
117 
118 unittest
119 {
120     import rx : SubjectObject, CounterObserver;
121 
122     auto s1 = new SubjectObject!int;
123     auto s2 = new SubjectObject!int;
124 
125     auto merged = merge(s1, s2);
126     auto observer = new CounterObserver!int;
127 
128     auto disposable = merged.doSubscribe(observer);
129     scope (exit)
130         disposable.dispose();
131 
132     s1.put(0);
133     assert(observer.putCount == 1);
134     s2.put(1);
135     assert(observer.putCount == 2);
136     s1.completed();
137     assert(observer.completedCount == 0);
138     s2.completed();
139     assert(observer.completedCount == 1);
140 }
141 
142 unittest
143 {
144     import rx : SubjectObject, CounterObserver;
145 
146     auto source1 = new SubjectObject!int;
147     auto source2 = new SubjectObject!int;
148     auto subject = merge(source1, source2);
149 
150     auto counter = new CounterObserver!int;
151     subject.subscribe(counter);
152 
153     source1.put(0);
154     assert(counter.putCount == 1);
155     assert(counter.lastValue == 0);
156     source1.completed();
157     assert(counter.completedCount == 0);
158 
159     source2.put(1);
160     assert(counter.putCount == 2);
161     assert(counter.lastValue == 1);
162 
163     assert(counter.completedCount == 0);
164     source2.completed();
165     assert(counter.completedCount == 1);
166 }
167 
168 unittest
169 {
170     import rx : SubjectObject, CounterObserver;
171 
172     auto s1 = new SubjectObject!int;
173     auto s2 = new SubjectObject!int;
174 
175     auto merged = merge(s1, s2);
176     auto observer = new CounterObserver!int;
177 
178     auto disposable = merged.doSubscribe(observer);
179     scope (exit)
180         disposable.dispose();
181 
182     s1.put(0);
183     assert(observer.putCount == 1);
184     s2.put(1);
185     assert(observer.putCount == 2);
186 
187     auto ex = new Exception("TEST");
188     s1.failure(ex);
189     assert(observer.failureCount == 1);
190     assert(observer.lastException == ex);
191 
192     s2.put(2);
193     assert(observer.putCount == 2);
194 
195     s2.completed();
196     assert(observer.completedCount == 0);
197 }
198 
199 unittest
200 {
201     import rx : SubjectObject, CounterObserver;
202 
203     auto s1 = new SubjectObject!int;
204     auto s2 = new SubjectObject!int;
205 
206     auto merged = merge(s1, s2);
207     auto observer = new CounterObserver!int;
208 
209     auto disposable = merged.doSubscribe(observer);
210 
211     s1.put(0);
212     s2.put(1);
213     assert(observer.putCount == 2);
214 
215     disposable.dispose();
216 
217     s1.put(2);
218     s1.completed();
219 
220     s2.put(3);
221     s2.completed();
222     // no effect
223     assert(observer.putCount == 2);
224     assert(observer.completedCount == 0);
225     assert(observer.failureCount == 0);
226 }
227 
228 unittest
229 {
230     import rx : SubjectObject;
231 
232     auto s1 = new SubjectObject!int;
233     auto s2 = new SubjectObject!int;
234 
235     int result = -1;
236     auto disposable = merge(s1, s2).doSubscribe((int n) { result = n; });
237 
238     s1.put(0);
239     assert(result == 0);
240     s2.put(1);
241     assert(result == 1);
242 
243     s1.failure(null);
244     s2.put(2);
245     assert(result == 1);
246 }
247 
248 ///
249 auto merge(TObservable)(auto ref TObservable observable)
250         if (isObservable!TObservable && isObservable!(TObservable.ElementType))
251 {
252     import rx.subject : SubjectObject;
253 
254     static struct MergeObservable_Flat
255     {
256         alias ElementType = TObservable.ElementType.ElementType;
257 
258         this(TObservable observable)
259         {
260             _observable = observable;
261         }
262 
263         auto subscribe(TObserver)(auto ref TObserver observer)
264         {
265             auto sink = new MergeSink!(TObservable.ElementType, TObserver, ElementType)(observer);
266             sink._upstream = _observable.doSubscribe(sink).disposableObject();
267             return sink;
268         }
269 
270         TObservable _observable;
271     }
272 
273     return MergeObservable_Flat(observable);
274 }
275 
276 ///
277 unittest
278 {
279     import rx;
280 
281     auto outer = new SubjectObject!(Observable!int);
282 
283     Observable!int flatten = outer.merge().observableObject!int();
284 
285     int[] xs;
286     auto disposable = flatten.doSubscribe((int n) { xs ~= n; });
287     scope (exit)
288         disposable.dispose();
289 
290     auto inner1 = new SubjectObject!int;
291     auto inner2 = new SubjectObject!int;
292 
293     .put(outer, inner1);
294     .put(inner1, 0);
295     assert(xs == [0]);
296     .put(inner1, 1);
297     assert(xs == [0, 1]);
298 
299     .put(outer, inner2);
300     .put(inner1, 2);
301     assert(xs == [0, 1, 2]);
302     .put(inner2, 3);
303     assert(xs == [0, 1, 2, 3]);
304     .put(inner2, 4);
305     assert(xs == [0, 1, 2, 3, 4]);
306 }
307 
308 ///
309 unittest
310 {
311     import rx;
312 
313     auto outer = new SubjectObject!(Observable!int);
314 
315     Observable!int flatten = outer.merge().observableObject!int();
316 
317     auto observer = new CounterObserver!int;
318     auto disposable = flatten.doSubscribe(observer);
319     scope (exit)
320         disposable.dispose();
321 
322     auto inner = new SubjectObject!int;
323 
324     .put(outer, inner);
325     .put(inner, 0);
326 
327     inner.completed();
328     assert(observer.completedCount == 0);
329     outer.completed();
330     assert(observer.completedCount == 1);
331 }
332 
333 ///
334 unittest
335 {
336     import rx;
337 
338     auto outer = new SubjectObject!(Observable!int);
339 
340     Observable!int flatten = outer.merge().observableObject!int();
341 
342     auto observer = new CounterObserver!int;
343     auto disposable = flatten.doSubscribe(observer);
344     scope (exit)
345         disposable.dispose();
346 
347     auto inner = new SubjectObject!int;
348 
349     .put(outer, inner);
350     .put(inner, 0);
351 
352     outer.failure(new Exception("TEST"));
353     assert(observer.failureCount == 1);
354     .put(inner, 1);
355     import std : format;
356 
357     assert(observer.putCount == 1, format!"putCount: %d"(observer.putCount));
358 }
359 
360 ///
361 unittest
362 {
363     import rx.algorithm.groupby : groupBy;
364     import rx.algorithm.map : map;
365     import rx.algorithm.fold : fold;
366     import rx.subject : SubjectObject, CounterObserver;
367 
368     auto subject = new SubjectObject!int;
369     auto counted = subject.groupBy!(n => n % 10)
370         .map!(o => o.fold!((a, b) => a + 1)(0))
371         .merge();
372 
373     auto counter = new CounterObserver!int;
374 
375     auto disposable = counted.subscribe(counter);
376 
377     subject.put(0);
378     subject.put(0);
379     assert(counter.putCount == 0);
380     subject.completed();
381     assert(counter.putCount == 1);
382     assert(counter.lastValue == 2);
383 }
384 
385 ///
386 unittest
387 {
388     import std.format : format;
389     import rx;
390 
391     auto outer = new SubjectObject!(Observable!int);
392     auto inner_pair1 = new SubjectObject!int;
393     auto inner_pair2 = new SubjectObject!int;
394     auto inner_flat1 = new SubjectObject!int;
395     auto inner_flat2 = new SubjectObject!int;
396 
397     auto mergePair = merge(inner_pair1, inner_pair2);
398     auto mergeFlat = outer.merge();
399 
400     auto counter1 = new CounterObserver!int;
401     auto counter2 = new CounterObserver!int;
402 
403     auto disposable1 = mergePair.doSubscribe(counter1);
404     auto disposable2 = mergeFlat.doSubscribe(counter2);
405     .put(outer, inner_flat1);
406     .put(outer, inner_flat2);
407 
408     .put(inner_pair1, 0);
409     .put(inner_flat1, 0);
410 
411     .put(inner_pair2, 1);
412     .put(inner_flat2, 1);
413 
414     assert(counter1.putCount == counter2.putCount);
415     assert(counter1.lastValue == counter2.lastValue);
416     assert(counter1.completedCount == counter2.completedCount);
417     assert(counter1.failureCount == counter2.failureCount);
418 
419     inner_pair1.completed();
420     inner_flat1.completed();
421 
422     assert(counter1.putCount == counter2.putCount);
423     assert(counter1.lastValue == counter2.lastValue);
424     assert(counter1.completedCount == counter2.completedCount,
425             format!"%d == %d"(counter1.completedCount, counter2.completedCount));
426     assert(counter1.failureCount == counter2.failureCount);
427 
428     .put(inner_pair2, 10);
429     .put(inner_flat2, 10);
430 
431     assert(counter1.putCount == counter2.putCount);
432     assert(counter1.lastValue == counter2.lastValue);
433     assert(counter1.completedCount == counter2.completedCount);
434     assert(counter1.failureCount == counter2.failureCount);
435 
436     disposable1.dispose();
437     disposable2.dispose();
438 
439     assert(counter1.putCount == counter2.putCount);
440     assert(counter1.lastValue == counter2.lastValue);
441     assert(counter1.completedCount == counter2.completedCount);
442     assert(counter1.failureCount == counter2.failureCount);
443 
444     .put(inner_pair2, 100);
445     .put(inner_flat2, 100);
446 
447     assert(counter1.putCount == counter2.putCount);
448     assert(counter1.lastValue == counter2.lastValue);
449     assert(counter1.completedCount == counter2.completedCount);
450     assert(counter1.failureCount == counter2.failureCount);
451 }
452 
453 class MergeSink(TObservable, TObserver, E) : Observer!TObservable, Disposable
454 {
455     private TObserver _observer;
456     private Disposable _upstream;
457     private Object _gate;
458     private shared(bool) _disposed;
459     private shared(bool) _isStopped;
460     private CompositeDisposable _group;
461 
462     this(TObserver observer)
463     {
464         _observer = observer;
465         _gate = new Object;
466         _group = new CompositeDisposable;
467     }
468 
469     void dispose()
470     {
471         import core.atomic : atomicStore;
472 
473         atomicStore(_disposed, true);
474         tryDispose(_upstream);
475         _group.dispose();
476     }
477 
478     void put(TObservable obj)
479     {
480         auto inner = new InnerObserver(this);
481         _group.insert(inner);
482         inner._upstream = obj.doSubscribe(inner).disposableObject();
483     }
484 
485     void completed()
486     {
487         import core.atomic;
488 
489         atomicStore(_isStopped, true);
490         if (_group.count == 0)
491         {
492             forwardCompleted();
493         }
494         else
495         {
496             dispose();
497         }
498     }
499 
500     void failure(Exception e)
501     {
502         forwardFailure(e);
503         dispose();
504     }
505 
506     private void forwardPut(E obj)
507     {
508         if (_disposed)
509             return;
510         synchronized (_gate)
511         {
512             .put(_observer, obj);
513         }
514     }
515 
516     private void forwardCompleted()
517     {
518         if (_disposed)
519             return;
520         synchronized (_gate)
521         {
522             static if (hasCompleted!TObserver)
523             {
524                 _observer.completed();
525             }
526             tryDispose(_upstream);
527         }
528     }
529 
530     private void forwardFailure(Exception e)
531     {
532         if (_disposed)
533             return;
534         synchronized (_gate)
535         {
536             static if (hasFailure!TObserver)
537             {
538                 _observer.failure(e);
539             }
540             tryDispose(_upstream);
541         }
542     }
543 
544     private static final class InnerObserver : Observer!E, Disposable
545     {
546         private MergeSink _parent;
547         private Disposable _upstream;
548 
549         this(MergeSink parent)
550         {
551             assert(parent !is null);
552 
553             _parent = parent;
554         }
555 
556         void dispose()
557         {
558             tryDispose(_upstream);
559         }
560 
561         void put(E obj)
562         {
563             scope (failure)
564                 dispose();
565             _parent.forwardPut(obj);
566         }
567 
568         void completed()
569         {
570             scope (exit)
571                 dispose();
572             _parent._group.remove(this);
573             if (_parent._isStopped && _parent._group.count == 0)
574             {
575                 _parent.forwardCompleted();
576             }
577         }
578 
579         void failure(Exception e)
580         {
581             scope (exit)
582                 dispose();
583             _parent.forwardFailure(e);
584         }
585     }
586 }
587 
588 unittest
589 {
590     import rx;
591 
592     auto sub = new SubjectObject!(Observable!int);
593 
594     auto counter = new CounterObserver!int;
595     auto sink = new MergeSink!(Observable!int, Observer!int, int)(counter);
596 
597     auto d = sub.subscribe(sink.observerObject!(Observable!int)());
598     sink._upstream = d.disposableObject();
599 
600     auto inner1 = new SubjectObject!int;
601     sub.put(inner1);
602 
603     assert(counter.putCount == 0);
604     inner1.put(1);
605     assert(counter.putCount == 1);
606     inner1.put(2);
607     assert(counter.putCount == 2);
608     inner1.put(3);
609     assert(counter.putCount == 3);
610 
611     auto inner2 = new SubjectObject!int;
612     sub.put(inner2);
613 
614     inner2.put(10);
615     assert(counter.putCount == 4);
616     inner2.put(11);
617     assert(counter.putCount == 5);
618 
619     inner1.put(4);
620     assert(counter.putCount == 6);
621 
622     inner1.completed();
623     assert(counter.completedCount == 0);
624     inner2.completed();
625     assert(counter.completedCount == 0);
626 
627     sub.completed();
628     assert(counter.completedCount == 1);
629 }