// Basic usage: zip two subjects and multiply each emitted pair.
import rx;

auto left = new SubjectObject!int;
auto right = new SubjectObject!int;
auto pairs = zip(left, right);

int[] products;
auto subscription = pairs.doSubscribe!(pair => products ~= (pair[0] * pair[1]));
scope (exit)
{
    subscription.dispose();
}

// Feeding only one side must not produce any output yet.
.put(left, [0, 1, 2, 3]);
assert(products.length == 0);

// Each value pushed to the other side is expected to yield exactly one
// product, combined with the value `left` received at the same position.
int[] expected;
foreach (n; 0 .. 4)
{
    .put(right, n);
    expected ~= n * n;
    assert(products == expected);
}
// Completion: the zipped stream reports completion once, after the last source completes.
import rx;
import std.typecons;

auto hundreds = new SubjectObject!int;
auto tens = new SubjectObject!int;
auto ones = new SubjectObject!int;

auto counter = new CounterObserver!(Tuple!(int, int, int));
auto subscription = zip(hundreds, tens, ones)
    .doSubscribe(counter);
scope (exit)
{
    subscription.dispose();
}

// One value per source is expected to give exactly one combined tuple.
.put(hundreds, 100);
.put(tens, 10);
.put(ones, 1);
assert(counter.putCount == 1);
assert(counter.lastValue == tuple(100, 10, 1));

// Completing the first two sources must not be reported downstream.
hundreds.completed();
assert(counter.completedCount == 0);
tens.completed();
assert(counter.completedCount == 0);

// Completing the final source is expected to report completion once.
ones.completed();
assert(counter.completedCount == 1);
// Selector usage: combine each pair with a custom function instead of a tuple.
import rx;

auto first = new SubjectObject!int;
auto second = new SubjectObject!int;

int[] sums;
auto summed = zip!((x, y) => x + y)(first, second);
auto subscription = summed.doSubscribe!(total => sums ~= total);
scope (exit)
{
    subscription.dispose();
}

// Both values of `first` arrive before any value of `second`; the result
// is expected to pair them by position.
.put(first, 100);
.put(first, 200);
.put(second, 10);
.put(second, 20);
assert(sums == [110, 220]);