import rx.subject : SubjectObject; auto sub = new SubjectObject!int; auto sum = sub.fold!"a+b"(0); int result = 0; auto disposable = sum.doSubscribe((int n) { result = n; }); scope (exit) disposable.dispose(); foreach (i; 1 .. 11) sub.put(i); assert(result == 0); sub.completed(); assert(result == 55);