import rx;

// A subject whose items are themselves observables of int, flattened via merge().
auto source = new SubjectObject!(Observable!int);
Observable!int merged = source.merge().observableObject!int();

// Collect every value that reaches the subscriber, in arrival order.
int[] received;
auto subscription = merged.doSubscribe((int value) { received ~= value; });
scope (exit) subscription.dispose();

auto first = new SubjectObject!int;
auto second = new SubjectObject!int;

// Values put into `first` after it was handed to `source` are expected downstream.
.put(source, first);
.put(first, 0);
assert(received == [0]);
.put(first, 1);
assert(received == [0, 1]);

// After `second` is handed over as well, both inner subjects are expected to
// keep delivering values.
.put(source, second);
.put(first, 2);
assert(received == [0, 1, 2]);
.put(second, 3);
assert(received == [0, 1, 2, 3]);
.put(second, 4);
assert(received == [0, 1, 2, 3, 4]);
import rx;

// Flatten an observable-of-observables and count the notifications it produces.
auto source = new SubjectObject!(Observable!int);
Observable!int merged = source.merge().observableObject!int();

auto counter = new CounterObserver!int;
auto subscription = merged.doSubscribe(counter);
scope (exit) subscription.dispose();

auto child = new SubjectObject!int;
.put(source, child);
.put(child, 0);

// Completing only the inner subject: the test expects no completion downstream yet.
child.completed();
assert(counter.completedCount == 0);

// Completing the outer subject: the test expects exactly one completion downstream.
source.completed();
assert(counter.completedCount == 1);
import rx;
import std : format;

// Flatten an observable-of-observables and count the notifications it produces.
auto source = new SubjectObject!(Observable!int);
Observable!int merged = source.merge().observableObject!int();

auto counter = new CounterObserver!int;
auto subscription = merged.doSubscribe(counter);
scope (exit) subscription.dispose();

auto child = new SubjectObject!int;
.put(source, child);
.put(child, 0);

// A failure on the outer subject is expected to reach the observer once.
source.failure(new Exception("TEST"));
assert(counter.failureCount == 1);

// A value put into the inner subject after the failure is expected NOT to be
// delivered: putCount stays at the single value sent before the failure.
.put(child, 1);
assert(counter.putCount == 1, format!"putCount: %d"(counter.putCount));
import rx.algorithm.groupby : groupBy;
import rx.algorithm.map : map;
import rx.algorithm.fold : fold;
import rx.subject : SubjectObject, CounterObserver;

auto subject = new SubjectObject!int;

// Group values by `n % 10`, fold each group with `(a, b) => a + 1` starting at 0
// (i.e. count the group's elements), then merge the per-group results.
auto counted = subject.groupBy!(n => n % 10)
    .map!(o => o.fold!((a, b) => a + 1)(0))
    .merge();

auto counter = new CounterObserver!int;
auto disposable = counted.subscribe(counter);
// Release the subscription when the test scope ends, as the other tests in
// this file do; previously the subscription was never disposed.
scope (exit) disposable.dispose();

// Two values landing in the same group (key 0).
subject.put(0);
subject.put(0);
// The test expects nothing downstream until the source completes.
assert(counter.putCount == 0);

subject.completed();
// After completion: one result, equal to the number of values in the group.
assert(counter.putCount == 1);
assert(counter.lastValue == 2);
import std.format : format;
import rx;

// Parity test: the two-argument merge(a, b) and the flattening outer.merge()
// are driven with the same sequence of events, and their observers are
// expected to report identical counters at every step.
// (Stray line-number prefixes from extraction were removed from every line.)

auto outer = new SubjectObject!(Observable!int);
auto inner_pair1 = new SubjectObject!int;
auto inner_pair2 = new SubjectObject!int;
auto inner_flat1 = new SubjectObject!int;
auto inner_flat2 = new SubjectObject!int;

auto mergePair = merge(inner_pair1, inner_pair2);
auto mergeFlat = outer.merge();

auto counter1 = new CounterObserver!int;
auto counter2 = new CounterObserver!int;

auto disposable1 = mergePair.doSubscribe(counter1);
auto disposable2 = mergeFlat.doSubscribe(counter2);
// Hand both inner subjects to the outer subject so the flattened side mirrors
// the pairwise side.
.put(outer, inner_flat1);
.put(outer, inner_flat2);

// Step 1: one value through the first inner subject of each side.
.put(inner_pair1, 0);
.put(inner_flat1, 0);

// Step 2: one value through the second inner subject of each side.
.put(inner_pair2, 1);
.put(inner_flat2, 1);

assert(counter1.putCount == counter2.putCount);
assert(counter1.lastValue == counter2.lastValue);
assert(counter1.completedCount == counter2.completedCount);
assert(counter1.failureCount == counter2.failureCount);

// Step 3: complete the first inner subject on each side.
inner_pair1.completed();
inner_flat1.completed();

assert(counter1.putCount == counter2.putCount);
assert(counter1.lastValue == counter2.lastValue);
assert(counter1.completedCount == counter2.completedCount,
    format!"%d == %d"(counter1.completedCount, counter2.completedCount));
assert(counter1.failureCount == counter2.failureCount);

// Step 4: the remaining inner subject keeps emitting on each side.
.put(inner_pair2, 10);
.put(inner_flat2, 10);

assert(counter1.putCount == counter2.putCount);
assert(counter1.lastValue == counter2.lastValue);
assert(counter1.completedCount == counter2.completedCount);
assert(counter1.failureCount == counter2.failureCount);

// Step 5: dispose both subscriptions.
disposable1.dispose();
disposable2.dispose();

assert(counter1.putCount == counter2.putCount);
assert(counter1.lastValue == counter2.lastValue);
assert(counter1.completedCount == counter2.completedCount);
assert(counter1.failureCount == counter2.failureCount);

// Step 6: emit after disposal; both sides must still agree.
.put(inner_pair2, 100);
.put(inner_flat2, 100);

assert(counter1.putCount == counter2.putCount);
assert(counter1.lastValue == counter2.lastValue);
assert(counter1.completedCount == counter2.completedCount);
assert(counter1.failureCount == counter2.failureCount);