1 ///
2 module rx.range.zip;
3 
4 import rx.disposable;
5 import rx.observable;
6 import rx.observer;
7 
8 import std.range : put;
9 import std.typecons : tuple;
10 import std.meta;
11 import std.container.dlist : DList;
12 
13 ///
14 class ZipNObservable(alias selector, TObservables...)
15 {
16     ///
17     alias ElementType = typeof({
18         GetElementsTuple!TObservables values = void;
19         return selector(values);
20     }());
21 
22     TObservables sources;
23 
24     ///
25     this(TObservables observables)
26     {
27         sources = observables;
28     }
29 
30     ///
31     auto subscribe(TObserver)(auto ref TObserver observer)
32     {
33         auto sink = new ZipNSink!(selector, ElementType, TObserver, TObservables)(sources, observer);
34         return sink.run();
35     }
36 }
37 
38 class ZipNSink(alias selector, E, TObserver, TObservables...)
39 {
40     alias ItemTypes = GetElementsTuple!TObservables;
41     alias Store = GetZipStoreType!(TObserver, TObservables);
42 
43     TObservables sources;
44     TObserver observer;
45     Store store;
46     Object gate;
47     Disposable cancel;
48 
49     this(TObservables sources, TObserver observer)
50     {
51         this.sources = sources;
52         this.observer = observer;
53         this.gate = new Object;
54     }
55 
56     auto run()
57     {
58         auto disposable = new SingleAssignmentDisposable;
59         this.cancel = disposable;
60 
61         Disposable[] disposables;
62 
63         static foreach (i; 0 .. TObservables.length)
64         {
65             disposables ~= disposableObject(sources[i].doSubscribe(new ZipChildObserver!(i,
66                     TObserver, ItemTypes[i])));
67             disposables ~= new AnonymousDisposable({ store[i].queue.clear(); });
68         }
69         disposables ~= new AnonymousDisposable({
70             static foreach (i; 0 .. TObservables.length)
71             {
72                 store[i].queue.clear();
73             }
74         });
75         disposable.setDisposable(new CompositeDisposable(disposables));
76 
77         return disposable;
78     }
79 
80     bool hasElements()
81     {
82         static foreach (i; 0 .. TObservables.length)
83         {
84             if (store[i].empty)
85                 return false;
86         }
87         return true;
88     }
89 
90     static if (hasCompleted!TObserver)
91     {
92         bool isCompleted()
93         {
94             static foreach (i; 0 .. TObservables.length)
95             {
96                 if (!store[i].isCompleted)
97                     return false;
98             }
99             return true;
100         }
101     }
102 
103     void enqueue()
104     {
105 
106         if (hasElements())
107         {
108             ItemTypes items;
109             static foreach (i; 0 .. TObservables.length)
110             {
111                 items[i] = store[i].dequeue();
112             }
113 
114             static if (hasFailure!TObserver)
115             {
116                 E result = void;
117                 try
118                 {
119                     result = selector(items);
120                 }
121                 catch (Exception e)
122                 {
123                     observer.failure(e);
124                     return;
125                 }
126                 .put(observer, result);
127             }
128             else
129             {
130                 .put(observer, selector(items));
131             }
132             return;
133         }
134 
135         static if (hasCompleted!TObserver)
136         {
137             if (isCompleted())
138             {
139                 observer.completed();
140             }
141         }
142     }
143 
144     static if (hasCompleted!TObserver)
145     {
146         void checkCompleted()
147         {
148             if (isCompleted())
149             {
150                 observer.completed();
151             }
152         }
153     }
154 
155     void failure(Exception e)
156     {
157         observer.failure(e);
158     }
159 
160     class ZipChildObserver(size_t index, TObserver, E)
161     {
162         void put(E obj)
163         {
164             synchronized (gate)
165             {
166                 store[index].enqueue(obj);
167                 this.outer.enqueue();
168             }
169         }
170 
171         static if (hasCompleted!TObserver)
172         {
173             void completed()
174             {
175                 synchronized (gate)
176                 {
177                     store[index].isCompleted = true;
178                     this.outer.checkCompleted();
179                 }
180             }
181         }
182 
183         void failure(Exception e)
184         {
185             scope(exit) cancel.dispose();
186             static if (hasFailure!TObserver)
187             {
188                 store[index].isCompleted = true;
189                 this.outer.observer.failure(e);
190             }
191         }
192     }
193 }
194 
195 ///
196 template GetElementsTuple(Ts...)
197 {
198     alias GetElementType(TObservable) = TObservable.ElementType;
199     alias GetElementsTuple = AliasSeq!(staticMap!(GetElementType, Ts));
200 }
201 
202 ///
203 alias GetZipStoreType(TObserver, TObservables...) = AliasSeq!(
204         staticMap!(GetZipStoreTypeImpl!TObserver, GetElementsTuple!TObservables));
205 
206 ///
207 template GetZipStoreTypeImpl(TObserver)
208 {
209     ///
210     alias GetZipStoreTypeImpl(E) = ZipStore!(E, hasCompleted!TObserver);
211 }
212 
213 unittest
214 {
215     import rx;
216 
217     alias GetStore = GetZipStoreTypeImpl!(Observer!int);
218     alias Store = GetStore!int;
219 
220     static assert(is(Store == ZipStore!(int, true)));
221 }
222 
223 ///
224 struct ZipStore(E, bool useCompleted)
225 {
226     DList!E queue;
227     static if (useCompleted)
228     {
229         bool isCompleted;
230     }
231 
232     bool empty() @property
233     {
234         return queue.empty;
235     }
236 
237     void enqueue(E obj)
238     {
239         queue.insertBack(obj);
240     }
241 
242     E dequeue()
243     {
244         scope (success)
245         {
246             queue.removeFront();
247         }
248 
249         return queue.front;
250     }
251 }
252 
253 unittest
254 {
255     import rx;
256     import std.conv : to;
257     import std.range : iota;
258 
259     alias A = SubjectObject!int;
260     alias B = SubjectObject!int;
261     alias C = SubjectObject!int;
262 
263     alias concatAsStrings = (a, b, c) => to!string(a) ~ to!string(b) ~ to!string(c);
264 
265     alias Zip = ZipNObservable!(concatAsStrings, A, B, C);
266 
267     auto a = new A;
268     auto b = new B;
269     auto c = new C;
270     auto zipped = new Zip(a, b, c);
271 
272     string s;
273     auto observer = ((string text) { s = text; }).observerObject!string();
274     auto disposable = zipped.subscribe(observer);
275     scope (exit)
276         disposable.dispose();
277 
278     .put(a, iota(3));
279     .put(b, iota(3));
280 
281     assert(s == null);
282     .put(c, 0);
283     assert(s == "000");
284     .put(c, 1);
285     assert(s == "111");
286     .put(c, 2);
287     assert(s == "222");
288     .put(c, 3);
289     assert(s == "222");
290     .put(a, 3);
291     assert(s == "222");
292     .put(b, 3);
293     assert(s == "333");
294 }
295 
296 ///
297 template zip(alias selector = tuple)
298 {
299     ZipNObservable!(selector, TObservables) zip(TObservables...)(TObservables observables)
300     {
301         return new typeof(return)(observables);
302     }
303 }
304 
305 ///
306 unittest
307 {
308     // use simple
309     import rx;
310 
311     auto s0 = new SubjectObject!int;
312     auto s1 = new SubjectObject!int;
313 
314     auto zipped = zip(s0, s1);
315 
316     int[] buf;
317     auto disposable = zipped.doSubscribe!(t => buf ~= (t[0] * t[1]));
318     scope (exit)
319         disposable.dispose();
320 
321     .put(s0, [0, 1, 2, 3]);
322     assert(buf.length == 0);
323 
324     .put(s1, 0);
325     assert(buf == [0]);
326     .put(s1, 1);
327     assert(buf == [0, 1]);
328     .put(s1, 2);
329     assert(buf == [0, 1, 4]);
330     .put(s1, 3);
331     assert(buf == [0, 1, 4, 9]);
332 }
333 
334 ///
335 unittest
336 {
337     // call completed
338     import rx;
339     import std.typecons;
340 
341     auto s0 = new SubjectObject!int;
342     auto s1 = new SubjectObject!int;
343     auto s2 = new SubjectObject!int;
344 
345     auto observer = new CounterObserver!(Tuple!(int, int, int));
346     auto disposable = zip(s0, s1, s2).doSubscribe(observer);
347     scope (exit)
348         disposable.dispose();
349 
350     .put(s0, 100);
351     .put(s1, 10);
352     .put(s2, 1);
353     assert(observer.putCount == 1);
354     assert(observer.lastValue == tuple(100, 10, 1));
355 
356     s0.completed();
357     assert(observer.completedCount == 0);
358     s1.completed();
359     assert(observer.completedCount == 0);
360     s2.completed();
361     assert(observer.completedCount == 1);
362 }
363 
364 ///
365 unittest
366 {
367     // use selector
368     import rx;
369 
370     auto s0 = new SubjectObject!int;
371     auto s1 = new SubjectObject!int;
372 
373     int[] buf;
374     auto disposable = zip!((a, b) => a + b)(s0, s1).doSubscribe!(n => buf ~= n);
375     scope (exit)
376         disposable.dispose();
377 
378     .put(s0, 100);
379     .put(s0, 200);
380     .put(s1, 10);
381     .put(s1, 20);
382 
383     assert(buf == [110, 220]);
384 }
385 
386 unittest
387 {
388     // advanced
389     import rx;
390     import std.typecons : Tuple;
391 
392     auto sub = new SubjectObject!int;
393     auto observer = new CounterObserver!(Tuple!(int, int));
394 
395     auto disposable = zip(sub, sub.drop(1)).doSubscribe(observer);
396     scope (exit)
397         disposable.dispose();
398 
399     .put(sub, 0);
400     .put(sub, 1);
401     assert(observer.putCount == 1);
402     assert(observer.lastValue == tuple(0, 1));
403     .put(sub, 2);
404     assert(observer.putCount == 2);
405     assert(observer.lastValue == tuple(1, 2));
406 }
407 
408 unittest
409 {
410     // dispose all
411     import rx;
412     import std.typecons : Tuple;
413 
414     alias InnerObserver = CounterObserver!(Tuple!(int, int));
415 
416     auto s0 = new TestingSubject!int;
417     auto s1 = new TestingSubject!int;
418     auto observer = new InnerObserver;
419 
420     auto disposable = zip(s0, s1).subscribe(observer);
421 
422     import std.conv;
423 
424     assert(s0.observerCount == 1, "Error: " ~ to!string(s0.observerCount));
425     assert(s1.observerCount == 1, "Error: " ~ to!string(s1.observerCount));
426     disposable.dispose();
427     assert(s0.observerCount == 0, "Error: " ~ to!string(s0.observerCount));
428     assert(s1.observerCount == 0, "Error: " ~ to!string(s1.observerCount));
429 }
430 
431 unittest
432 {
433     // unsbscribe when completed
434     import rx;
435     import std.typecons : Tuple;
436 
437     alias InnerObserver = CounterObserver!(Tuple!(int, int));
438 
439     auto s0 = new TestingSubject!int;
440     auto s1 = new TestingSubject!int;
441     auto observer = new InnerObserver;
442 
443     auto disposable = zip(s0, s1).subscribe(observer);
444     scope (exit)
445         disposable.dispose();
446 
447     s0.completed();
448     assert(s0.observerCount == 0);
449     assert(s1.observerCount == 1);
450     s1.completed();
451     assert(s0.observerCount == 0);
452     assert(s1.observerCount == 0);
453 }
454 
455 unittest
456 {
457     // if any subject failured then call observer.failure
458     import rx;
459     import std.typecons : Tuple;
460 
461     auto s0 = new TestingSubject!int;
462     auto s1 = new TestingSubject!int;
463     auto observer = new CounterObserver!(Tuple!(int, int));
464     auto disposable = zip(s0, s1).doSubscribe(observer);
465     scope (exit)
466         disposable.dispose();
467 
468     auto e = new Exception("TEST");
469     s0.failure(e);
470     assert(observer.failureCount == 1);
471     assert(observer.lastException is e);
472     s1.failure(new Exception("test"));
473     assert(observer.failureCount == 1);
474     assert(observer.lastException is e);
475 }