/+++++++++++++++++++++++++++++
 + This module defines range-like operators for observables:
 + drop, take and takeLast.
 +/
module rx.range;

import rx.disposable;
import rx.observer;
import rx.observable;
import rx.util;

import core.atomic : cas, atomicLoad;
import std.range : put;

/+++++++++++++++++++++++++++++
 + Overview
 +/
unittest
{
    import rx.subject;
    import std.algorithm : equal;
    import std.array : appender;
    import std.conv : to;

    // Skip the first two values, then let at most three through.
    auto subject = new SubjectObject!int;
    auto pub = subject.drop(2).take(3);

    auto buf = appender!(int[]);
    auto disposable = pub.subscribe(observerObject!int(buf));

    foreach (i; 0 .. 10)
    {
        subject.put(i);
    }

    auto result = buf.data;
    assert(equal(result, [2, 3, 4]));
}

//####################
// Drop
//####################
/++
 + Creates the observable that results from discarding the first n elements
 + from the given source.
 +
 + Every call to `subscribe` on the returned observable creates its own
 + counter initialised with `n`, so each subscriber skips `n` elements
 + independently of the others.
 +
 + Params:
 +   observable = the source observable; its `ElementType` is reused.
 +   n = number of leading elements to discard per subscription.
 +
 + Returns: a struct exposing `ElementType` and a templated `subscribe`.
 +/
auto drop(TObservable)(auto ref TObservable observable, size_t n)
{
    // Holds a copy of the source and the number of elements to skip.
    static struct DropObservable
    {
    public:
        alias ElementType = TObservable.ElementType;

        this(TObservable source, size_t skipCount)
        {
            _observable = source;
            _count = skipCount;
        }

        auto subscribe(TObserver)(TObserver observer)
        {
            // Observer placed between the source and the caller's observer.
            // put/completed/failure come from the SimpleObserverImpl mixin,
            // which is expected to route elements through putImpl below.
            static struct DropObserver
            {
                mixin SimpleObserverImpl!(TObserver, ElementType);

            public:
                this(TObserver downstream, size_t skipCount)
                {
                    _observer = downstream;
                    _counter = new shared(AtomicCounter)(skipCount);
                }

                // Only available when the downstream observer can be notified
                // of termination; `_disposable` is presumably declared by the
                // mixin under the same condition -- confirm in rx.observer.
                static if (hasCompleted!TObserver || hasFailure!TObserver)
                {
                    this(TObserver downstream, size_t skipCount, Disposable subscription)
                    {
                        _observer = downstream;
                        _counter = new shared(AtomicCounter)(skipCount);
                        _disposable = subscription;
                    }
                }

            private:
                void putImpl(ElementType obj)
                {
                    // NOTE(review): tryUpdateCount() presumably reports false
                    // while elements are still being skipped -- see rx.util.
                    if (!_counter.tryUpdateCount())
                        return;

                    .put(_observer, obj);
                }

                shared(AtomicCounter) _counter;
            }

            static if (!hasCompleted!TObserver && !hasFailure!TObserver)
            {
                // No termination handling needed: hand back whatever the
                // source subscription returns.
                return doSubscribe(_observable, DropObserver(observer, _count));
            }
            else
            {
                // The observer needs a handle to its own subscription, so the
                // handle is created first and filled in afterwards.
                auto subscription = new SingleAssignmentDisposable;
                subscription.setDisposable(disposableObject(doSubscribe(_observable,
                        DropObserver(observer, _count, subscription))));
                return subscription;
            }
        }

    private:
        TObservable _observable;
        size_t _count;
    }

    return DropObservable(observable, n);
}
///
unittest
{
    import rx.subject;

    auto subject = new SubjectObject!int;
    auto dropped = subject.drop(1);
    static assert(isObservable!(typeof(dropped), int));

    import std.array : appender;

    auto buf = appender!(int[]);
    auto disposable = dropped.subscribe(buf);

    // The first element is discarded, the second one arrives.
    subject.put(0);
    assert(buf.data.length == 0);
    subject.put(1);
    assert(buf.data.length == 1);

    // A second subscriber starts with its own skip count.
    auto buf2 = appender!(int[]);
    dropped.subscribe(buf2);
    assert(buf2.data.length == 0);
    subject.put(2);
    assert(buf2.data.length == 0);
    assert(buf.data.length == 2);
    subject.put(3);
    assert(buf2.data.length == 1);
    assert(buf.data.length == 3);
}

unittest
{
    import rx.subject : SubjectObject;

    // Elements are int[]; the delegate receives the individual ints.
    auto sub = new SubjectObject!(int[]);
    int count = 0;
    auto d = sub.drop(1).subscribe((int) { count++; });
    scope (exit)
        d.dispose();

    assert(count == 0);
    sub.put([1, 2]);
    assert(count == 0);
    sub.put([2, 3]);
    assert(count == 2);
}

//####################
// Take
//####################
///Creates a sub-observable consisting of only up to the first n elements of the given source.
/++
 + Each subscription owns its own remaining-element counter. When the counter
 + reaches zero the downstream observer is completed (if it supports
 + `completed`) and the source subscription is disposed.
 +
 + The downstream observer receives at most one terminal notification
 + (`completed` or `failure`) per subscription, even if the source keeps
 + signalling after the quota has been used up.
 +
 + With `n == 0` no element is ever forwarded; completion is forwarded only
 + when the source itself completes.
 +
 + Params:
 +   observable = the source observable; its `ElementType` is reused.
 +   n = maximum number of elements forwarded per subscription.
 +
 + Returns: a struct exposing `ElementType` and a templated `subscribe`
 +          that returns a `SingleAssignmentDisposable`.
 +/
auto take(TObservable)(auto ref TObservable observable, size_t n)
{
    static struct TakeObservable
    {
    public:
        alias ElementType = TObservable.ElementType;

    public:
        this(TObservable observable, size_t n)
        {
            _observable = observable;
            _count = n;
        }

    public:
        auto subscribe(TObserver)(TObserver observer)
        {
            static struct TakeObserver
            {
            public:
                this(TObserver observer, size_t count, Disposable disposable)
                {
                    _observer = observer;
                    _count = count;
                    _disposable = disposable;
                }

            public:
                void put(ElementType obj)
                {
                    // Atomically claim one slot of the remaining quota.
                    size_t oldValue = void;
                    size_t newValue = void;
                    do
                    {
                        oldValue = atomicLoad(_count);
                        if (oldValue == 0)
                            return; // quota used up: ignore the element

                        newValue = oldValue - 1;
                    }
                    while (!cas(&_count, oldValue, newValue));

                    .put(_observer, obj);

                    // The element just delivered was the last one allowed.
                    if (newValue == 0 && tryTerminate())
                    {
                        static if (hasCompleted!TObserver)
                        {
                            _observer.completed();
                        }
                        _disposable.dispose();
                    }
                }

                void completed()
                {
                    // Already terminated (e.g. quota reached earlier):
                    // do not notify the downstream observer a second time.
                    if (!tryTerminate())
                        return;

                    static if (hasCompleted!TObserver)
                    {
                        _observer.completed();
                    }
                    _disposable.dispose();
                }

                void failure(Exception e)
                {
                    // Same guard as completed(): one terminal event only.
                    if (!tryTerminate())
                        return;

                    static if (hasFailure!TObserver)
                    {
                        _observer.failure(e);
                    }
                    _disposable.dispose();
                }

            private:
                // Returns true for exactly one caller: the one that moves the
                // observer into its terminated state.
                bool tryTerminate()
                {
                    return cas(&_terminated, false, true);
                }

            private:
                TObserver _observer;
                shared(size_t) _count;
                shared(bool) _terminated;
                Disposable _disposable;
            }

            // The observer needs a handle to its own subscription, so the
            // handle is created first and filled in afterwards.
            auto disposable = new SingleAssignmentDisposable;
            disposable.setDisposable(disposableObject(doSubscribe(_observable,
                    TakeObserver(observer, _count, disposable))));
            return disposable;
        }

    private:
        TObservable _observable;
        size_t _count;
    }

    return TakeObservable(observable, n);
}
///
unittest
{
    import std.array;
    import rx.subject;

    auto pub = new SubjectObject!int;
    auto sub = appender!(int[]);

    auto d = pub.take(2).subscribe(sub);
    foreach (i; 0 .. 10)
    {
        pub.put(i);
    }

    import std.algorithm;

    assert(equal(sub.data, [0, 1]));
}

unittest
{
    import rx.subject;

    auto subject = new SubjectObject!int;
    auto taken = subject.take(1);
    static assert(isObservable!(typeof(taken), int));

    import std.array : appender;

    auto buf = appender!(int[]);
    auto disposable = taken.subscribe(buf);

    subject.put(0);
    assert(buf.data.length == 1);
    subject.put(1);
    assert(buf.data.length == 1);

    // A second subscriber gets its own quota.
    auto buf2 = appender!(int[]);
    taken.subscribe(buf2);
    assert(buf2.data.length == 0);
    subject.put(2);
    assert(buf2.data.length == 1);
    assert(buf.data.length == 1);
    subject.put(3);
    assert(buf2.data.length == 1);
    assert(buf.data.length == 1);
}

unittest
{
    import rx.subject;

    auto sub = new SubjectObject!int;
    auto taken = sub.take(2);

    int countPut = 0;
    int countCompleted = 0;
    struct TestObserver
    {
        void put(int n)
        {
            countPut++;
        }

        void completed()
        {
            countCompleted++;
        }
    }

    auto d = taken.doSubscribe(TestObserver());
    assert(countPut == 0);
    sub.put(1);
    assert(countPut == 1);
    assert(countCompleted == 0);
    // The second element exhausts the quota and completes the observer.
    sub.put(2);
    assert(countPut == 2);
    assert(countCompleted == 1);
}

//####################
// TakeLast
//####################
///Creates an observable that takes only the last element of the given source.
/++
 + Elements from the source are buffered one at a time; nothing is forwarded
 + until the source completes. On completion the most recent element (if
 + any) is delivered, the downstream observer is completed (if it supports
 + `completed`) and the source subscription is disposed.
 +
 + On failure the buffered element is not delivered; the error is forwarded
 + (if the downstream observer supports `failure`) and the source
 + subscription is disposed.
 +
 + Params:
 +   observable = the source observable; its `ElementType` is reused.
 +
 + Returns: a struct exposing `ElementType` and a templated `subscribe`
 +          that returns a `SingleAssignmentDisposable`.
 +/
auto takeLast(TObservable)(auto ref TObservable observable)
{
    static struct TakeLastObservable
    {
    public:
        alias ElementType = TObservable.ElementType;

    public:
        this(ref TObservable observable)
        {
            _observable = observable;
        }

    public:
        auto subscribe(TObserver)(auto ref TObserver observer)
        {
            static class TakeLastObserver
            {
            public:
                this(ref TObserver observer, SingleAssignmentDisposable disposable)
                {
                    _observer = observer;
                    _disposable = disposable;
                }

            public:
                void put(ElementType obj)
                {
                    // Remember only the newest element.
                    _current = obj;
                    _hasValue = true;
                }

                void completed()
                {
                    // Deliver the buffered element, if the source produced one.
                    if (_hasValue)
                        .put(_observer, _current);

                    static if (hasCompleted!TObserver)
                    {
                        _observer.completed();
                    }
                    _disposable.dispose();
                }

                // Always defined so that a failing source releases the
                // subscription even when the downstream observer has no
                // failure handler of its own.
                void failure(Exception e)
                {
                    static if (hasFailure!TObserver)
                    {
                        _observer.failure(e);
                    }
                    _disposable.dispose();
                }

            private:
                bool _hasValue = false;
                ElementType _current;
                TObserver _observer;
                SingleAssignmentDisposable _disposable;
            }

            // The observer needs a handle to its own subscription, so the
            // handle is created first and filled in afterwards.
            auto d = new SingleAssignmentDisposable;
            d.setDisposable(disposableObject(doSubscribe(_observable,
                    new TakeLastObserver(observer, d))));
            return d;
        }

    private:
        TObservable _observable;
    }

    return TakeLastObservable(observable);
}
///
unittest
{
    import rx.subject;

    auto sub = new SubjectObject!int;

    int putCount = 0;
    int completedCount = 0;
    struct TestObserver
    {
        void put(int n)
        {
            putCount++;
        }

        void completed()
        {
            completedCount++;
        }
    }

    auto d = sub.takeLast.subscribe(TestObserver());

    // Nothing is forwarded before the source completes.
    assert(putCount == 0);
    sub.put(1);
    assert(putCount == 0);
    sub.put(10);
    assert(putCount == 0);
    sub.completed();
    assert(putCount == 1);
    assert(completedCount == 1);

    // Values put after completion are not observed.
    sub.put(100);
    assert(putCount == 1);
    assert(completedCount == 1);
}

unittest
{
    import rx.subject : SubjectObject;

    // Elements are int[]; the delegate receives the individual ints of the
    // last array only.
    auto sub = new SubjectObject!(int[]);

    int count = 0;
    auto d = sub.takeLast.subscribe((int) { count++; });
    scope (exit)
        d.dispose();

    assert(count == 0);
    sub.put([0]);
    assert(count == 0);
    sub.put([1, 2]);
    assert(count == 0);
    sub.completed();
    assert(count == 2);
}