1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'merge'
3  +/
4 module rx.algorithm.merge;
5 
6 import rx.disposable;
7 import rx.observable;
8 import rx.observer;
9 import rx.util;
10 
11 //####################
12 // Merge
13 //####################
14 struct MergeObservable(TObservable1, TObservable2)
15 {
16     import std.traits : CommonType;
17 
18     alias ElementType = CommonType!(TObservable1.ElementType, TObservable2.ElementType);
19 
20 public:
21     this(TObservable1 o1, TObservable2 o2)
22     {
23         _observable1 = o1;
24         _observable2 = o2;
25     }
26 
27 public:
28     auto subscribe(T)(T observer)
29     {
30         auto d1 = _observable1.doSubscribe(observer);
31         auto d2 = _observable2.doSubscribe(observer);
32         return new CompositeDisposable(disposableObject(d1), disposableObject(d2));
33     }
34 
35 private:
36     TObservable1 _observable1;
37     TObservable2 _observable2;
38 }
39 
40 ///
41 MergeObservable!(T1, T2) merge(T1, T2)(auto ref T1 observable1, auto ref T2 observable2)
42 {
43     return typeof(return)(observable1, observable2);
44 }
45 ///
46 unittest
47 {
48     import rx.subject : SubjectObject;
49 
50     auto s1 = new SubjectObject!int;
51     auto s2 = new SubjectObject!short;
52 
53     auto merged = s1.merge(s2);
54 
55     int count = 0;
56     auto d = merged.doSubscribe((int n) { count++; });
57 
58     assert(count == 0);
59     s1.put(1);
60     assert(count == 1);
61     s2.put(2);
62     assert(count == 2);
63 
64     d.dispose();
65 
66     s1.put(10);
67     assert(count == 2);
68     s2.put(100);
69     assert(count == 2);
70 }