1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'merge'
3  +/
4 module rx.algorithm.merge;
5 
6 import rx.disposable;
7 import rx.observable;
8 import rx.observer;
9 import rx.util;
10 import std.range : put;
11 
12 //####################
13 // Merge
14 //####################
15 struct MergeObservable(TObservable1, TObservable2)
16 {
17     import std.traits : CommonType;
18 
19     alias ElementType = CommonType!(TObservable1.ElementType, TObservable2.ElementType);
20 
21 public:
22     this(TObservable1 o1, TObservable2 o2)
23     {
24         _observable1 = o1;
25         _observable2 = o2;
26     }
27 
28 public:
29     auto subscribe(T)(T observer)
30     {
31         static struct MergeObserver
32         {
33             T _observer;
34             shared(AtomicCounter) _counter;
35             Disposable _subscription;
36 
37             void put(ElementType obj)
38             {
39                 if (_counter.isZero) return;
40 
41                 .put(_observer, obj);
42             }
43 
44             void completed()
45             {
46                 auto result = _counter.tryDecrement();
47                 if (result.success && result.count == 0)
48                 {
49                     static if (hasCompleted!T)
50                     {
51                         _observer.completed();
52                     }
53                     _subscription.dispose();
54                 }
55             }
56 
57             void failure(Exception e)
58             {
59                 if (_counter.trySetZero())
60                 {
61                     static if (hasFailure!T)
62                     {
63                         _observer.failure(e);
64                     }
65                     _subscription.dispose();
66                 }
67             }
68         }
69 
70         auto subscription = new SingleAssignmentDisposable;
71         auto counter = new shared(AtomicCounter)(2);
72         auto mergeObserver = MergeObserver(observer, counter, subscription);
73         auto d1 = _observable1.doSubscribe(mergeObserver);
74         auto d2 = _observable2.doSubscribe(mergeObserver);
75         subscription.setDisposable(new CompositeDisposable(disposableObject(d1), disposableObject(d2)));
76         return subscription;
77     }
78 
79 private:
80     TObservable1 _observable1;
81     TObservable2 _observable2;
82 }
83 
84 ///
85 MergeObservable!(T1, T2) merge(T1, T2)(auto ref T1 observable1, auto ref T2 observable2)
86 {
87     return typeof(return)(observable1, observable2);
88 }
89 ///
90 unittest
91 {
92     import rx.subject : SubjectObject;
93 
94     auto s1 = new SubjectObject!int;
95     auto s2 = new SubjectObject!short;
96 
97     auto merged = s1.merge(s2);
98 
99     int count = 0;
100     auto d = merged.doSubscribe((int n) { count++; });
101 
102     assert(count == 0);
103     s1.put(1);
104     assert(count == 1);
105     s2.put(2);
106     assert(count == 2);
107 
108     d.dispose();
109 
110     s1.put(10);
111     assert(count == 2);
112     s2.put(100);
113     assert(count == 2);
114 }
115 
116 unittest
117 {
118     import rx : SubjectObject, CounterObserver;
119 
120     auto s1 = new SubjectObject!int;
121     auto s2 = new SubjectObject!int;
122 
123     auto merged = merge(s1, s2);
124     auto observer = new CounterObserver!int;
125 
126     auto disposable = merged.doSubscribe(observer);
127     scope(exit) disposable.dispose();
128 
129     s1.put(0);
130     assert(observer.putCount == 1);
131     s2.put(1);
132     assert(observer.putCount == 2);
133     s1.completed();
134     assert(observer.completedCount == 0);
135     s2.completed();
136     assert(observer.completedCount == 1);
137 }
138 
139 unittest
140 {
141     import rx : SubjectObject, CounterObserver;
142 
143     auto source1 = new SubjectObject!int;
144     auto source2 = new SubjectObject!int;
145     auto subject = merge(source1, source2);
146 
147     auto counter = new CounterObserver!int;
148     subject.subscribe(counter);
149 
150     source1.put(0);
151     assert(counter.putCount == 1);
152     assert(counter.lastValue == 0);
153     source1.completed();
154     assert(counter.completedCount == 0);
155 
156     source2.put(1);
157     assert(counter.putCount == 2);
158     assert(counter.lastValue == 1);
159 
160     assert(counter.completedCount == 0);
161     source2.completed();
162     assert(counter.completedCount == 1);
163 }
164 
165 unittest
166 {
167     import rx : SubjectObject, CounterObserver;
168 
169     auto s1 = new SubjectObject!int;
170     auto s2 = new SubjectObject!int;
171 
172     auto merged = merge(s1, s2);
173     auto observer = new CounterObserver!int;
174 
175     auto disposable = merged.doSubscribe(observer);
176     scope(exit) disposable.dispose();
177 
178     s1.put(0);
179     assert(observer.putCount == 1);
180     s2.put(1);
181     assert(observer.putCount == 2);
182 
183     auto ex = new Exception("TEST");
184     s1.failure(ex);
185     assert(observer.failureCount == 1);
186     assert(observer.lastException == ex);
187 
188     s2.put(2);
189     assert(observer.putCount == 2);
190     
191     s2.completed();
192     assert(observer.completedCount == 0);
193 }
194 
195 unittest
196 {
197     import rx : SubjectObject, CounterObserver;
198 
199     auto s1 = new SubjectObject!int;
200     auto s2 = new SubjectObject!int;
201 
202     auto merged = merge(s1, s2);
203     auto observer = new CounterObserver!int;
204 
205     auto disposable = merged.doSubscribe(observer);
206 
207     s1.put(0);
208     s2.put(1);
209     assert(observer.putCount == 2);
210 
211     disposable.dispose();
212 
213     s1.put(2);
214     s1.completed();
215 
216     s2.put(3);
217     s2.completed();
218     // no effect
219     assert(observer.putCount == 2);
220     assert(observer.completedCount == 0);
221     assert(observer.failureCount == 0);
222 }
223 
224 unittest
225 {
226     import rx : SubjectObject;
227 
228     auto s1 = new SubjectObject!int;
229     auto s2 = new SubjectObject!int;
230 
231     int result = -1;
232     auto disposable = merge(s1, s2).doSubscribe((int n) { result = n; });
233     
234     s1.put(0);
235     assert(result == 0);
236     s2.put(1);
237     assert(result == 1);
238 
239     s1.failure(null);
240     s2.put(2);
241     assert(result == 1);
242 }
243 
244 ///
245 auto merge(TObservable)(auto ref TObservable observable)
246         if (isObservable!TObservable && isObservable!(TObservable.ElementType))
247 {
248     import rx.subject : SubjectObject;
249 
250     static struct MergeObservable_Flat
251     {
252         alias ElementType = TObservable.ElementType.ElementType;
253 
254         this(TObservable observable)
255         {
256             _observable = observable;
257         }
258 
259         auto subscribe(TObserver)(TObserver observer)
260         {
261             auto subject = new SubjectObject!ElementType;
262             auto groupSubscription = new CompositeDisposable;
263             auto innerSubscription = subject.doSubscribe(observer);
264             auto outerSubscription = _observable.doSubscribe((TObservable.ElementType obj) {
265                 auto subscription = obj.doSubscribe(subject);
266                 groupSubscription.insert(disposableObject(subscription));
267             }, { subject.completed(); }, (Exception e) { subject.failure(e); });
268             return new CompositeDisposable(groupSubscription, innerSubscription, outerSubscription);
269         }
270 
271         TObservable _observable;
272     }
273 
274     return MergeObservable_Flat(observable);
275 }
276 
277 ///
278 unittest
279 {
280     import rx.algorithm.groupby : groupBy;
281     import rx.algorithm.map : map;
282     import rx.algorithm.fold : fold;
283     import rx.subject : SubjectObject, CounterObserver;
284 
285     auto subject = new SubjectObject!int;
286     auto counted = subject.groupBy!(n => n % 10).map!(o => o.fold!((a, b) => a + 1)(0)).merge();
287 
288     auto counter = new CounterObserver!int;
289 
290     auto disposable = counted.subscribe(counter);
291 
292     subject.put(0);
293     subject.put(0);
294     assert(counter.putCount == 0);
295     subject.completed();
296     assert(counter.putCount == 1);
297     assert(counter.lastValue == 2);
298 }