merge

  1. MergeObservable!(T1, T2) merge(T1 observable1, T2 observable2)
  2. auto merge(TObservable observable)
    auto merge(TObservable)(auto ref TObservable observable)
    if (isObservable!TObservable && isObservable!(TObservable.ElementType))

Examples

// Example: flattening an observable of observables with merge().
import rx;

// `outer` emits observables of int rather than plain ints.
auto outer = new SubjectObject!(Observable!int);

// merge() on `outer` is wrapped as a single Observable!int.
Observable!int flatten = outer.merge().observableObject!int();

// Collect every value delivered by the flattened stream.
int[] xs;
auto disposable = flatten.doSubscribe((int n) { xs ~= n; });
scope (exit)
    disposable.dispose();

auto inner1 = new SubjectObject!int;
auto inner2 = new SubjectObject!int;

// Once inner1 has been put into outer, values put into inner1 show up in xs.
.put(outer, inner1);
.put(inner1, 0);
assert(xs == [0]);
.put(inner1, 1);
assert(xs == [0, 1]);

// After inner2 is added as well, values from both inner subjects arrive
// in the order they were put.
.put(outer, inner2);
.put(inner1, 2);
assert(xs == [0, 1, 2]);
.put(inner2, 3);
assert(xs == [0, 1, 2, 3]);
.put(inner2, 4);
assert(xs == [0, 1, 2, 3, 4]);
// Example: completion of the flattened stream.
import rx;

auto outer = new SubjectObject!(Observable!int);

Observable!int flatten = outer.merge().observableObject!int();

// CounterObserver records how many put/completed/failure calls it receives.
auto observer = new CounterObserver!int;
auto disposable = flatten.doSubscribe(observer);
scope (exit)
    disposable.dispose();

auto inner = new SubjectObject!int;

.put(outer, inner);
.put(inner, 0);

// Completing the inner subject alone does not notify the observer ...
inner.completed();
assert(observer.completedCount == 0);
// ... the observer sees completion once the outer subject completes too.
outer.completed();
assert(observer.completedCount == 1);
// Example: failure of the outer subject.
import rx;

auto outer = new SubjectObject!(Observable!int);

Observable!int flatten = outer.merge().observableObject!int();

auto observer = new CounterObserver!int;
auto disposable = flatten.doSubscribe(observer);
scope (exit)
    disposable.dispose();

auto inner = new SubjectObject!int;

.put(outer, inner);
.put(inner, 0);

// A failure on the outer subject is reported to the observer.
outer.failure(new Exception("TEST"));
assert(observer.failureCount == 1);
// A value put into the inner subject after the failure is not counted:
// putCount stays at 1 (the value 0 put before the failure).
.put(inner, 1);
import std : format;

assert(observer.putCount == 1, format!"putCount: %d"(observer.putCount));
// Example: merging the per-group results of groupBy + fold.
import rx.algorithm.groupby : groupBy;
import rx.algorithm.map : map;
import rx.algorithm.fold : fold;
import rx.subject : SubjectObject, CounterObserver;

auto subject = new SubjectObject!int;
// Group values by `n % 10`, map each group through a fold that starts at 0
// and adds 1 per element (the element value `b` is ignored, so the fold
// counts elements), then merge the mapped groups into one stream.
auto counted = subject.groupBy!(n => n % 10)
    .map!(o => o.fold!((a, b) => a + 1)(0))
    .merge();

auto counter = new CounterObserver!int;

auto disposable = counted.subscribe(counter);

// Both values share the key 0; nothing reaches the counter before the
// source completes.
subject.put(0);
subject.put(0);
assert(counter.putCount == 0);
// After completion the counter has received exactly one value: the count 2.
subject.completed();
assert(counter.putCount == 1);
assert(counter.lastValue == 2);
// Example: the two-argument overload merge(a, b) and the flattening overload
// outer.merge() are driven with the same sequence of events, and their
// observers are asserted to report identical counters at every step.
import std.format : format;
import rx;

auto outer = new SubjectObject!(Observable!int);
auto inner_pair1 = new SubjectObject!int;
auto inner_pair2 = new SubjectObject!int;
auto inner_flat1 = new SubjectObject!int;
auto inner_flat2 = new SubjectObject!int;

// mergePair uses the two-argument overload; mergeFlat flattens `outer`.
auto mergePair = merge(inner_pair1, inner_pair2);
auto mergeFlat = outer.merge();

auto counter1 = new CounterObserver!int;
auto counter2 = new CounterObserver!int;

auto disposable1 = mergePair.doSubscribe(counter1);
auto disposable2 = mergeFlat.doSubscribe(counter2);
// Feed the flattening side its two inner subjects.
.put(outer, inner_flat1);
.put(outer, inner_flat2);

// Step 1: one value through each first and each second inner subject.
.put(inner_pair1, 0);
.put(inner_flat1, 0);

.put(inner_pair2, 1);
.put(inner_flat2, 1);

assert(counter1.putCount == counter2.putCount);
assert(counter1.lastValue == counter2.lastValue);
assert(counter1.completedCount == counter2.completedCount);
assert(counter1.failureCount == counter2.failureCount);

// Step 2: complete the first inner subject on both sides.
inner_pair1.completed();
inner_flat1.completed();

assert(counter1.putCount == counter2.putCount);
assert(counter1.lastValue == counter2.lastValue);
assert(counter1.completedCount == counter2.completedCount,
        format!"%d == %d"(counter1.completedCount, counter2.completedCount));
assert(counter1.failureCount == counter2.failureCount);

// Step 3: put through the remaining inner subject on both sides.
.put(inner_pair2, 10);
.put(inner_flat2, 10);

assert(counter1.putCount == counter2.putCount);
assert(counter1.lastValue == counter2.lastValue);
assert(counter1.completedCount == counter2.completedCount);
assert(counter1.failureCount == counter2.failureCount);

// Step 4: dispose both subscriptions.
disposable1.dispose();
disposable2.dispose();

assert(counter1.putCount == counter2.putCount);
assert(counter1.lastValue == counter2.lastValue);
assert(counter1.completedCount == counter2.completedCount);
assert(counter1.failureCount == counter2.failureCount);

// Step 5: put after disposal; the two observers must still agree.
.put(inner_pair2, 100);
.put(inner_flat2, 100);

assert(counter1.putCount == counter2.putCount);
assert(counter1.lastValue == counter2.lastValue);
assert(counter1.completedCount == counter2.completedCount);
assert(counter1.failureCount == counter2.failureCount);

Meta