/+++++++++++++++++++++++++++++
 + This module defines algorithm 'buffer'
 +/
module rx.algorithm.buffer;

import rx.disposable;
import rx.observable;
import rx.observer;

import std.range : put;

/++
 + Observer that collects incoming elements into chunks of a fixed size and
 + forwards each full chunk to the wrapped observer.
 +
 + Elements are appended under a monitor (`_gate`); once the number of
 + buffered elements equals the configured size, the chunk is handed to the
 + wrapped observer through `std.range.put` and a new chunk is started.
 + On completion any partially filled chunk is forwarded first.
 +
 + Params:
 +   E         = element type received by this observer
 +   TObserver = type of the wrapped (downstream) observer
 +/
class BufferedObserver(E, TObserver)
{
    /++
     + Params:
     +   observer   = downstream observer that receives the chunks
     +   disposable = disposed when the sequence completes or fails
     +   size       = number of elements per chunk; must be greater than zero
     +/
    this(TObserver observer, Disposable disposable, size_t size)
    {
        assert(size > 0);

        _observer = observer;
        _disposable = disposable;

        _gate = new Object;
        _bufferSize = size;
        // The buffer starts out empty (null); its capacity is reserved on the
        // first put. Allocating `new E[](size)` and then shrinking the length
        // to zero would leave the slice without appendable capacity, so the
        // very first append would allocate again and waste that block.
    }

    /// Appends `obj` to the current chunk and forwards the chunk once it is full.
    void put(E obj)
    {
        synchronized (_gate)
        {
            // Starting a new chunk: reserve the whole capacity at once so the
            // following appends do not reallocate while the chunk fills up.
            if (_buffer.length == 0)
            {
                _buffer.reserve(_bufferSize);
            }

            _buffer ~= obj;
            if (_buffer.length == _bufferSize)
            {
                flush();
            }
        }
    }

    /++
     + Forwards the remaining buffered elements (if any), notifies the wrapped
     + observer of completion when it supports it, and disposes the disposable
     + given to the constructor.
     +/
    void completed()
    {
        synchronized (_gate)
        {
            if (_buffer.length > 0)
            {
                flush();
            }
        }

        static if (hasCompleted!TObserver)
        {
            _observer.completed();
        }
        _disposable.dispose();
    }

    static if (hasFailure!TObserver)
    {
        /++
         + Forwards the failure to the wrapped observer and disposes the
         + disposable given to the constructor. Buffered elements are not
         + forwarded.
         +/
        void failure(Exception e)
        {
            _observer.failure(e);
            _disposable.dispose();
        }
    }

private:
    // Hands the current chunk to the wrapped observer and starts a new one.
    // Must be called while holding `_gate`.
    void flush()
    {
        .put(_observer, _buffer);
        // Drop the reference instead of reusing the block: the wrapped
        // observer may have kept the slice it was given, so the next chunk
        // has to live in its own memory.
        _buffer = null;
    }

    TObserver _observer;
    Object _gate;
    size_t _bufferSize;
    E[] _buffer;
    Disposable _disposable;
}

unittest
{
    alias Bufd = BufferedObserver!(int, Observer!int);

    size_t putCount, completedCount;
    auto observer = observerObject!int(makeObserver((int n) { putCount++; }, {
            completedCount++;
        }));
    auto bufd = new Bufd(observer, NopDisposable.instance, 2);

    // Nothing is forwarded until the chunk is full.
    bufd.put(1);
    assert(putCount == 0);
    bufd.put(1);
    assert(putCount == 2);
    bufd.put(1);
    assert(putCount == 2);
    assert(completedCount == 0);
    // Completion forwards the partially filled chunk first.
    bufd.completed();
    assert(putCount == 3);
    assert(completedCount == 1);
}

/++
 + Observable wrapper whose subscribers receive the elements of the source
 + observable grouped into chunks of `bufferSize` elements.
 +/
struct BufferedObservable(TObservable)
{
    /// Subscribers are handed arrays of the source element type.
    alias ElementType = TObservable.ElementType[];

    /++
     + Params:
     +   observable = source observable
     +   bufferSize = number of elements per chunk; checked by the assertion
     +                in `BufferedObserver`'s constructor when subscribing
     +/
    this(TObservable observable, size_t bufferSize)
    {
        _observable = observable;
        _bufferSize = bufferSize;
    }

    /++
     + Subscribes `observer` to the source through a `BufferedObserver`.
     +
     + Returns: the disposable that holds the subscription to the source; the
     + same object is passed to the `BufferedObserver`, which disposes it on
     + completion or failure.
     +/
    auto subscribe(TObserver)(auto ref TObserver observer)
    {
        alias ObserverType = BufferedObserver!(TObservable.ElementType, TObserver);

        auto subscription = new SingleAssignmentDisposable;
        auto buffering = new ObserverType(observer, subscription, _bufferSize);
        subscription.setDisposable(disposableObject(_observable.doSubscribe(buffering)));
        return subscription;
    }

private:
    TObservable _observable;
    size_t _bufferSize;
}

unittest
{
    alias TObservable = BufferedObservable!(Observable!int);
    static assert(isObservable!(TObservable, int[]));

    import rx.subject : SubjectObject;
    import std.array : appender;

    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto observable = TObservable(sub, 2);
    auto d = observable.subscribe(buf);

    sub.put(0);
    sub.put(1);
    assert(buf.data.length == 2);
    assert(buf.data[0] == 0);
    assert(buf.data[1] == 1);
    sub.put(2);
    assert(buf.data.length == 2);
    sub.completed();
    assert(buf.data.length == 3);
    assert(buf.data[2] == 2);
}

/++
 + Wraps `observable` so that its elements are delivered in chunks of
 + `bufferSize` elements; a final, possibly shorter, chunk is delivered on
 + completion.
 +/
BufferedObservable!(TObservable) buffered(TObservable)(
        auto ref TObservable observable, size_t bufferSize)
{
    return typeof(return)(observable, bufferSize);
}

///
unittest
{
    import rx.subject : SubjectObject;
    import std.array : appender;

    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto d = sub.buffered(2).doSubscribe(buf);

    sub.put(0);
    sub.put(1);
    assert(buf.data.length == 2);
    assert(buf.data[0] == 0);
    assert(buf.data[1] == 1);
    sub.put(2);
    assert(buf.data.length == 2);
    sub.completed();
    assert(buf.data.length == 3);
    assert(buf.data[2] == 2);
}

///
unittest
{
    import rx.subject : SubjectObject;
    import std.array : appender;
    import std.parallelism : taskPool, task;

    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);
    auto d = sub.buffered(100).doSubscribe(buf);

    import std.range : iota;

    // Three tasks push 100 elements each into the same subject.
    auto t1 = task({ .put(sub, iota(100)); });
    auto t2 = task({ .put(sub, iota(100)); });
    auto t3 = task({ .put(sub, iota(100)); });
    taskPool.put(t1);
    taskPool.put(t2);
    taskPool.put(t3);

    t1.workForce;
    t2.workForce;
    t3.workForce;

    sub.completed();

    assert(buf.data.length == 300);
}