import rx.observable : defer;

// Slot the subscriber callback writes into; stays 0 until a value is delivered.
int received = 0;

// Source observable: each subscription pushes the single value 100 to the
// observer and hands back the shared no-op disposable.
auto source = defer!int((Observer!int observer) {
    .put(observer, 100);
    return NopDisposable.instance;
});

// Route the act of subscribing through an ImmediateScheduler.
auto immediate = new ImmediateScheduler;
auto onImmediate = source.subscribeOn(immediate);

auto subscription = onImmediate.doSubscribe((int n) { received = n; });
scope (exit)
    subscription.dispose();

// No waiting is done here: the test expects the value to have been
// delivered by the time doSubscribe has returned.
assert(received == 100);
import rx.observable : defer;
import rx.util : EventSignal;

// Slot the subscriber callback writes into, plus the signal it raises once
// the value has been stored.
int received = 0;
auto delivered = new EventSignal;

// Source observable: each subscription pushes the single value 100 to the
// observer and hands back the shared no-op disposable.
auto source = defer!int((Observer!int observer) {
    .put(observer, 100);
    return NopDisposable.instance;
});

// Route the act of subscribing through a TaskPoolScheduler.
// NOTE(review): presumably this runs the subscription off the calling
// thread, which is why the test blocks on the signal below - confirm.
auto pool = new TaskPoolScheduler;
auto onPool = source.subscribeOn(pool);

// Store the value first, then raise the signal, so the waiter observes it.
auto subscription = onPool.subscribe((int n) {
    received = n;
    delivered.setSignal();
});
scope (exit)
    subscription.dispose();

// Block until the callback has fired before checking the result.
delivered.wait();
assert(received == 100);