import rx.subject : SubjectObject;
import std.algorithm : equal;
import std.array : appender;

// Source of ints that this example feeds by hand.
auto source = new SubjectObject!int;

// Fold each pushed value into an accumulator that is seeded with 0.
auto runningTotal = source.scan!((acc, value) => acc + value)(0);
static assert(isObservable!(typeof(runningTotal), int));

// Collect everything the scanned observable emits into an int[] appender.
auto sink = appender!(int[]);
auto subscription = runningTotal.subscribe(sink);
scope (exit)
    subscription.dispose(); // runs when the enclosing scope is left

// Push the value 1 five times.
for (int i = 0; i < 5; ++i)
    source.put(1);

// Expect one emission per pushed value, each equal to the sum so far.
auto collected = sink.data;
assert(collected.length == 5);
assert(equal(collected, [1, 2, 3, 4, 5]));