import std.algorithm; import rx; auto source = new SubjectObject!int; auto stopper = new SubjectObject!int; int[] buf; auto disposable = source.takeUntil(stopper).doSubscribe!((n) { buf ~= n; }); source.put(0); source.put(1); source.put(2); stopper.put(0); source.put(3); source.put(4); assert(equal(buf, [0, 1, 2]));