import rx.algorithm.groupby : groupBy; import rx.algorithm.map : map; import rx.algorithm.fold : fold; import rx.subject : SubjectObject, CounterObserver; auto subject = new SubjectObject!int; auto counted = subject.groupBy!(n => n % 10).map!(o => o.fold!((a, b) => a + 1)(0)).merge(); auto counter = new CounterObserver!int; auto disposable = counted.subscribe(counter); subject.put(0); subject.put(0); assert(counter.putCount == 0); subject.completed(); assert(counter.putCount == 1); assert(counter.lastValue == 2);