import rx;
// Example: a late subscriber to a replay subject built from a source that
// emits 10, 20, 30 and then signals completion.
auto source = defer!(int, (sink) {
    // Emits 10, 20 and 30, in that order.
    foreach (step; 1 .. 4)
        sink.put(step * 10);
    sink.completed();
    return NopDisposable.instance;
});
// NOTE(review): the argument 4 is presumably the replay buffer capacity —
// confirm against the asReplaySubject declaration.
ReplaySubject!int replayed = source.asReplaySubject(4);
int[] received;
replayed.doSubscribe!(item => received ~= item);
// Every emitted value is expected to reach the subscriber.
const int[] expected = [10, 20, 30];
assert(received == expected);
import rx;
// Example: a late subscriber to a replay subject built from a source that
// emits 10, 20, 30 and then reports a failure (with a null error).
auto source = defer!(int, (sink) {
    // Emits 10, 20 and 30, in that order.
    foreach (step; 1 .. 4)
        sink.put(step * 10);
    sink.failure(null);
    return NopDisposable.instance;
});
// NOTE(review): the argument 2 is presumably the replay buffer capacity —
// confirm against the asReplaySubject declaration.
ReplaySubject!int replayed = source.asReplaySubject(2);
int[] received;
replayed.doSubscribe!(item => received ~= item);
// Only the two most recent values are expected to reach the subscriber.
const int[] expected = [20, 30];
assert(received == expected);