import rx.subject : SubjectObject;
import std.array : appender;
// Example: a buffer of size 2 delivers items in pairs and hands over the
// leftover item once the source completes (as pinned by the asserts below).
auto source = new SubjectObject!int;
auto sink = appender!(int[]);
auto subscription = source.buffered(2).doSubscribe(sink);

// Two items are pushed; both are expected in the sink, in order.
source.put(0);
source.put(1);
assert(sink.data == [0, 1]);

// A third item alone must not reach the sink yet.
source.put(2);
assert(sink.data.length == 2);

// Completing the source is expected to deliver the pending item.
source.completed();
assert(sink.data.length == 3);
assert(sink.data[2] == 2);
import rx.subject : SubjectObject;
import std.array : appender;
import std.parallelism : taskPool, task;
// Example: three pool tasks push into the same subject concurrently; after
// completion every pushed item is expected to have reached the sink.
import std.range : iota;

auto source = new SubjectObject!int;
auto sink = appender!(int[]);
auto subscription = source.buffered(100).doSubscribe(sink);

// Each worker feeds the values 0 .. 99 into the shared subject.
auto producer = { .put(source, iota(100)); };
auto workers = [task(producer), task(producer), task(producer)];

// Submit every worker first, then wait for each of them in turn.
foreach (worker; workers)
    taskPool.put(worker);
foreach (worker; workers)
    worker.workForce;

// 3 workers x 100 items each.
source.completed();
assert(sink.data.length == 300);