1 /+++++++++++++++++++++++++++++ 2 + This module defines the Subject and some implements. 3 +/ 4 module rx.subject; 5 6 import rx.disposable; 7 import rx.observer; 8 import rx.observable; 9 import rx.util : assumeThreadLocal; 10 11 import core.atomic : atomicLoad, cas; 12 import std.range : put; 13 14 ///Represents an object that is both an observable sequence as well as an observer. 15 interface Subject(E) : Observer!E, Observable!E 16 { 17 } 18 19 ///Represents an object that is both an observable sequence as well as an observer. Each notification is broadcasted to all subscribed observers. 20 class SubjectObject(E) : Subject!E 21 { 22 alias ElementType = E; 23 24 public: 25 /// 26 this() 27 { 28 _observer = cast(shared) NopObserver!E.instance; 29 } 30 31 public: 32 /// 33 void put(E obj) 34 { 35 auto temp = assumeThreadLocal(atomicLoad(_observer)); 36 .put(temp, obj); 37 } 38 /// 39 void completed() 40 { 41 shared(Observer!E) oldObserver = void; 42 shared(Observer!E) newObserver = cast(shared) DoneObserver!E.instance; 43 Observer!E temp = void; 44 do 45 { 46 oldObserver = _observer; 47 temp = assumeThreadLocal(atomicLoad(oldObserver)); 48 if (cast(DoneObserver!E) temp) 49 break; 50 } 51 while (!cas(&_observer, oldObserver, newObserver)); 52 temp.completed(); 53 } 54 /// 55 void failure(Exception error) 56 { 57 shared(Observer!E) oldObserver = void; 58 shared(Observer!E) newObserver = cast(shared) new DoneObserver!E(error); 59 Observer!E temp = void; 60 do 61 { 62 oldObserver = _observer; 63 temp = assumeThreadLocal(atomicLoad(oldObserver)); 64 if (cast(DoneObserver!E) temp) 65 break; 66 } 67 while (!cas(&_observer, oldObserver, newObserver)); 68 temp.failure(error); 69 } 70 71 /// 72 Disposable subscribe(T)(T observer) 73 { 74 return subscribe(observerObject!E(observer)); 75 } 76 /// 77 Disposable subscribe(Observer!E observer) 78 { 79 shared(Observer!E) oldObserver = void; 80 shared(Observer!E) newObserver = void; 81 do 82 { 83 oldObserver = _observer; 84 auto temp = assumeThreadLocal(atomicLoad(oldObserver)); 85 86 if (temp is DoneObserver!E.instance) 87 { 88 observer.completed(); 89 return NopDisposable.instance; 90 } 91 92 if (auto fail = cast(DoneObserver!E) temp) 93 { 94 observer.failure(fail.exception); 95 return NopDisposable.instance; 96 } 97 98 if (auto composite = cast(CompositeObserver!E) temp) 99 { 100 newObserver = cast(shared) composite.add(observer); 101 } 102 else if (auto nop = cast(NopObserver!E) temp) 103 { 104 newObserver = cast(shared) observer; 105 } 106 else 107 { 108 newObserver = cast(shared)(new CompositeObserver!E([temp, observer])); 109 } 110 } 111 while (!cas(&_observer, oldObserver, newObserver)); 112 113 return subscription(this, observer); 114 } 115 116 /// 117 void unsubscribe(Observer!E observer) 118 { 119 shared(Observer!E) oldObserver = void; 120 shared(Observer!E) newObserver = void; 121 do 122 { 123 oldObserver = _observer; 124 125 import rx.util : assumeThreadLocal; 126 127 auto temp = assumeThreadLocal(atomicLoad(oldObserver)); 128 if (auto composite = cast(CompositeObserver!E) temp) 129 { 130 newObserver = cast(shared) composite.remove(observer); 131 } 132 else 133 { 134 if (temp !is observer) 135 return; 136 137 newObserver = cast(shared) NopObserver!E.instance; 138 } 139 } 140 while (!cas(&_observer, oldObserver, newObserver)); 141 } 142 143 private: 144 shared(Observer!E) _observer; 145 } 146 147 /// 148 unittest 149 { 150 import std.array : appender; 151 152 auto data = appender!(int[])(); 153 auto subject = new SubjectObject!int; 154 auto disposable = subject.subscribe(observerObject!(int)(data)); 155 assert(disposable !is null); 156 subject.put(0); 157 subject.put(1); 158 159 import std.algorithm : equal; 160 161 assert(equal(data.data, [0, 1])); 162 163 disposable.dispose(); 164 subject.put(2); 165 assert(equal(data.data, [0, 1])); 166 } 167 168 unittest 169 { 170 static assert(isObserver!(SubjectObject!int, int)); 171 static assert(isObservable!(SubjectObject!int, int)); 172 static assert(!isObservable!(SubjectObject!int, string)); 173 static assert(!isObservable!(SubjectObject!int, string)); 174 } 175 176 unittest 177 { 178 auto subject = new SubjectObject!int; 179 auto observer = new CounterObserver!int; 180 auto disposable = subject.subscribe(observer); 181 scope (exit) 182 disposable.dispose(); 183 184 subject.put(0); 185 subject.put(1); 186 187 assert(observer.putCount == 2); 188 subject.completed(); 189 subject.put(2); 190 assert(observer.putCount == 2); 191 assert(observer.completedCount == 1); 192 } 193 194 unittest 195 { 196 auto subject = new SubjectObject!int; 197 auto observer = new CounterObserver!int; 198 auto disposable = subject.subscribe(observer); 199 scope (exit) 200 disposable.dispose(); 201 202 subject.put(0); 203 subject.put(1); 204 205 assert(observer.putCount == 2); 206 auto ex = new Exception("Exception"); 207 subject.failure(ex); 208 subject.put(2); 209 assert(observer.putCount == 2); 210 assert(observer.failureCount == 1); 211 assert(observer.lastException is ex); 212 } 213 214 unittest 215 { 216 import std.array : appender; 217 218 auto buf1 = appender!(int[]); 219 auto buf2 = appender!(int[]); 220 auto subject = new SubjectObject!int; 221 subject.subscribe(observerObject!(int)(buf1)); 222 subject.doSubscribe((int n) => buf2.put(n)); 223 224 assert(buf1.data.length == 0); 225 assert(buf2.data.length == 0); 226 subject.put(0); 227 assert(buf1.data.length == 1); 228 assert(buf2.data.length == 1); 229 assert(buf1.data[0] == buf2.data[0]); 230 } 231 232 unittest 233 { 234 auto sub = new SubjectObject!int; 235 sub.completed(); 236 237 auto observer = new CounterObserver!int; 238 assert(observer.putCount == 0); 239 assert(observer.completedCount == 0); 240 assert(observer.failureCount == 0); 241 sub.subscribe(observer); 242 assert(observer.putCount == 0); 243 assert(observer.completedCount == 1); 244 assert(observer.failureCount == 0); 245 } 246 247 unittest 248 { 249 auto sub = new SubjectObject!int; 250 auto ex = new Exception("Exception"); 251 sub.failure(ex); 252 253 auto observer = new CounterObserver!int; 254 assert(observer.putCount == 0); 255 assert(observer.completedCount == 0); 256 assert(observer.failureCount == 0); 257 sub.subscribe(observer); 258 assert(observer.putCount == 0); 259 assert(observer.completedCount == 0); 260 assert(observer.failureCount == 1); 261 assert(observer.lastException is ex); 262 } 263 264 private class Subscription(TSubject, TObserver) : Disposable 265 { 266 public: 267 this(TSubject subject, TObserver observer) 268 { 269 _subject = subject; 270 _observer = observer; 271 } 272 273 public: 274 void dispose() 275 { 276 if (_subject !is null) 277 { 278 _subject.unsubscribe(_observer); 279 _subject = null; 280 } 281 } 282 283 private: 284 TSubject _subject; 285 TObserver _observer; 286 } 287 288 private Subscription!(TSubject, TObserver) subscription(TSubject, TObserver)( 289 TSubject subject, TObserver observer) 290 { 291 return new typeof(return)(subject, observer); 292 } 293 294 /// 295 class AsyncSubject(E) : Subject!E 296 { 297 public: 298 /// 299 Disposable subscribe(Observer!E observer) 300 { 301 Exception ex = null; 302 E value; 303 bool hasValue = false; 304 305 synchronized (this) 306 { 307 if (!_isStopped) 308 { 309 _observers ~= observer; 310 return subscription(this, observer); 311 } 312 313 ex = _exception; 314 hasValue = _hasValue; 315 value = _value; 316 } 317 318 if (ex !is null) 319 { 320 observer.failure(ex); 321 } 322 else if (hasValue) 323 { 324 .put(observer, value); 325 observer.completed(); 326 } 327 else 328 { 329 observer.completed(); 330 } 331 332 return NopDisposable.instance; 333 } 334 335 /// 336 auto subscribe(T)(T observer) 337 { 338 return subscribe(observerObject!E(observer)); 339 } 340 341 /// 342 void unsubscribe(Observer!E observer) 343 { 344 if (observer is null) 345 return; 346 347 synchronized (this) 348 { 349 import std.algorithm : remove, countUntil; 350 351 auto index = countUntil(_observers, observer); 352 if (index != -1) 353 { 354 _observers = remove(_observers, index); 355 } 356 } 357 } 358 359 public: 360 /// 361 void put(E value) 362 { 363 synchronized (this) 364 { 365 if (!_isStopped) 366 { 367 _value = value; 368 _hasValue = true; 369 } 370 } 371 } 372 373 /// 374 void completed() 375 { 376 Observer!E[] os = null; 377 378 E value; 379 bool hasValue = false; 380 381 synchronized (this) 382 { 383 if (!_isStopped) 384 { 385 os = _observers; 386 _observers.length = 0; 387 _isStopped = true; 388 value = _value; 389 hasValue = _hasValue; 390 } 391 } 392 393 if (os) 394 { 395 if (hasValue) 396 { 397 foreach (observer; os) 398 { 399 .put(observer, value); 400 observer.completed(); 401 } 402 } 403 else 404 { 405 foreach (observer; os) 406 { 407 observer.completed(); 408 } 409 } 410 } 411 } 412 413 /// 414 void failure(Exception e) 415 { 416 assert(e !is null); 417 418 Observer!E[] os = null; 419 synchronized (this) 420 { 421 if (!_isStopped) 422 { 423 os = _observers; 424 _observers.length = 0; 425 _isStopped = true; 426 _exception = e; 427 } 428 } 429 430 if (os) 431 { 432 foreach (observer; os) 433 { 434 observer.failure(e); 435 } 436 } 437 } 438 439 private: 440 Observer!E[] _observers; 441 bool _isStopped; 442 E _value; 443 bool _hasValue; 444 Exception _exception; 445 } 446 447 unittest 448 { 449 auto sub = new AsyncSubject!int; 450 451 .put(sub, 1); 452 sub.completed(); 453 454 auto observer = new CounterObserver!int; 455 456 assert(observer.hasNotBeenCalled); 457 458 sub.subscribe(observer); 459 460 assert(observer.putCount == 1); 461 assert(observer.completedCount == 1); 462 assert(observer.failureCount == 0); 463 assert(observer.lastValue == 1); 464 } 465 466 unittest 467 { 468 auto sub = new AsyncSubject!int; 469 auto observer = new CounterObserver!int; 470 471 auto d = sub.subscribe(observer); 472 scope (exit) 473 d.dispose(); 474 475 assert(observer.hasNotBeenCalled); 476 477 sub.put(100); 478 479 assert(observer.hasNotBeenCalled); 480 481 assert(sub._hasValue); 482 assert(sub._value == 100); 483 484 sub.completed(); 485 486 assert(observer.putCount == 1); 487 assert(observer.completedCount == 1); 488 assert(observer.failureCount == 0); 489 assert(observer.lastValue == 100); 490 } 491 492 unittest 493 { 494 auto sub = new AsyncSubject!int; 495 auto observer = new CounterObserver!int; 496 497 sub.put(100); 498 499 assert(sub._hasValue); 500 assert(sub._value == 100); 501 502 auto d = sub.subscribe(observer); 503 scope (exit) 504 d.dispose(); 505 506 assert(observer.hasNotBeenCalled); 507 508 sub.completed(); 509 510 assert(observer.putCount == 1); 511 assert(observer.completedCount == 1); 512 assert(observer.failureCount == 0); 513 assert(observer.lastValue == 100); 514 } 515 516 unittest 517 { 518 auto sub = new AsyncSubject!int; 519 auto observer = new CounterObserver!int; 520 521 auto d = sub.subscribe(observer); 522 523 d.dispose(); 524 assert(observer.hasNotBeenCalled); 525 526 sub.put(100); 527 assert(observer.hasNotBeenCalled); 528 529 sub.completed(); 530 assert(observer.hasNotBeenCalled); 531 } 532 533 unittest 534 { 535 auto sub = new AsyncSubject!int; 536 auto observer = new CounterObserver!int; 537 538 auto d = sub.subscribe(observer); 539 assert(observer.hasNotBeenCalled); 540 541 sub.put(100); 542 assert(observer.hasNotBeenCalled); 543 544 d.dispose(); 545 assert(observer.hasNotBeenCalled); 546 547 sub.completed(); 548 assert(observer.hasNotBeenCalled); 549 } 550 551 unittest 552 { 553 554 auto sub = new AsyncSubject!int; 555 auto observer = new CounterObserver!int; 556 557 sub.put(100); 558 assert(observer.hasNotBeenCalled); 559 560 auto d = sub.subscribe(observer); 561 assert(observer.hasNotBeenCalled); 562 563 d.dispose(); 564 assert(observer.hasNotBeenCalled); 565 566 sub.completed(); 567 assert(observer.hasNotBeenCalled); 568 } 569 570 unittest 571 { 572 auto sub = new AsyncSubject!int; 573 auto observer = new CounterObserver!int; 574 575 auto d = sub.subscribe(observer); 576 scope (exit) 577 d.dispose(); 578 579 assert(observer.hasNotBeenCalled); 580 581 sub.completed(); 582 583 assert(observer.putCount == 0); 584 assert(observer.completedCount == 1); 585 assert(observer.failureCount == 0); 586 } 587 588 unittest 589 { 590 auto sub = new AsyncSubject!int; 591 auto observer = new CounterObserver!int; 592 593 auto d = sub.subscribe(observer); 594 scope (exit) 595 d.dispose(); 596 597 assert(observer.hasNotBeenCalled); 598 599 auto ex = new Exception("TEST"); 600 sub.failure(ex); 601 602 assert(observer.putCount == 0); 603 assert(observer.completedCount == 0); 604 assert(observer.failureCount == 1); 605 assert(observer.lastException is ex); 606 } 607 608 unittest 609 { 610 auto sub = new AsyncSubject!int; 611 auto ex = new Exception("TEST"); 612 sub.failure(ex); 613 614 auto observer = new CounterObserver!int; 615 616 auto d = sub.subscribe(observer); 617 scope (exit) 618 d.dispose(); 619 620 assert(observer.putCount == 0); 621 assert(observer.completedCount == 0); 622 assert(observer.failureCount == 1); 623 assert(observer.lastException is ex); 624 } 625 626 unittest 627 { 628 auto sub = new AsyncSubject!int; 629 auto observer = new CounterObserver!int; 630 631 sub.completed(); 632 assert(observer.hasNotBeenCalled); 633 634 sub.subscribe(observer); 635 assert(observer.putCount == 0); 636 assert(observer.completedCount == 1); 637 assert(observer.failureCount == 0); 638 } 639 640 version (unittest) 641 { 642 class CounterObserver(T) : Observer!T 643 { 644 public: 645 size_t putCount; 646 size_t completedCount; 647 size_t failureCount; 648 T lastValue; 649 Exception lastException; 650 651 public: 652 bool hasNotBeenCalled() const pure nothrow @nogc @safe @property 653 { 654 return putCount == 0 && completedCount == 0 && failureCount == 0; 655 } 656 657 public: 658 void put(T obj) 659 { 660 putCount++; 661 lastValue = obj; 662 } 663 664 void completed() 665 { 666 completedCount++; 667 } 668 669 void failure(Exception e) 670 { 671 failureCount++; 672 lastException = e; 673 } 674 } 675 } 676 677 /// 678 class BehaviorSubject(E) : Subject!E 679 { 680 public: 681 /// 682 this() 683 { 684 this(E.init); 685 } 686 687 /// 688 this(E value) 689 { 690 _subject = new SubjectObject!E; 691 _value = value; 692 } 693 694 public: 695 /// 696 inout(E) value() inout @property 697 { 698 return _value; 699 } 700 701 /// 702 void value(E value) @property 703 { 704 if (_value != value) 705 { 706 _value = value; 707 .put(_subject, value); 708 } 709 } 710 711 public: 712 /// 713 auto subscribe(TObserver)(auto ref TObserver observer) 714 { 715 .put(observer, value); 716 return _subject.doSubscribe(observer); 717 } 718 719 /// 720 Disposable subscribe(Observer!E observer) 721 { 722 .put(observer, value); 723 return disposableObject(_subject.doSubscribe(observer)); 724 } 725 726 /// 727 void put(E obj) 728 { 729 value = obj; 730 } 731 732 /// 733 void completed() 734 { 735 _subject.completed(); 736 } 737 738 /// 739 void failure(Exception e) 740 { 741 _subject.failure(e); 742 } 743 744 private: 745 SubjectObject!E _subject; 746 E _value; 747 } 748 749 unittest 750 { 751 static assert(isObservable!(BehaviorSubject!int, int)); 752 static assert(is(BehaviorSubject!int.ElementType == int)); 753 } 754 755 unittest 756 { 757 int num = 0; 758 auto subject = new BehaviorSubject!int(100); 759 760 auto d = subject.doSubscribe((int n) { num = n; }); 761 assert(num == 100); 762 763 .put(subject, 1); 764 assert(num == 1); 765 766 d.dispose(); 767 .put(subject, 10); 768 assert(num == 1); 769 } 770 771 /// 772 auto asBehaviorSubject(TObservable)(auto ref TObservable observable) 773 { 774 alias E = TObservable.ElementType; 775 auto subject = new BehaviorSubject!E; 776 observable.doSubscribe(subject); 777 return subject; 778 } 779 780 /// 781 unittest 782 { 783 import rx; 784 785 auto num1 = new BehaviorSubject!int; 786 auto num2 = new BehaviorSubject!int; 787 788 BehaviorSubject!int sum = combineLatest!((l, r) => l + r)(num1, num2).asBehaviorSubject(); 789 790 assert(sum.value == 0); 791 num1.value = 10; 792 assert(sum.value == 10); 793 num2.value = 20; 794 assert(sum.value == 30); 795 } 796 797 /// 798 class ReplaySubject(E) : Subject!E 799 { 800 private: 801 RingBuffer!E _buffer; 802 SubjectObject!E _subject; 803 bool _completed; 804 805 public: 806 /// 807 this(size_t bufferSize) 808 { 809 _buffer = RingBuffer!E(bufferSize); 810 _subject = new SubjectObject!E; 811 } 812 813 public: 814 /// 815 Disposable subscribe(TObserver)(auto ref TObserver observer) 816 { 817 .put(observer, _buffer[]); 818 if (_completed) 819 return NopDisposable.instance; 820 else 821 return _subject.doSubscribe(observer).disposableObject(); 822 } 823 824 /// 825 Disposable subscribe(Observer!E observer) 826 { 827 .put(observer, _buffer[]); 828 if (_completed) 829 return NopDisposable.instance; 830 else 831 return disposableObject(_subject.doSubscribe(observer)); 832 } 833 834 /// 835 void put(E obj) 836 { 837 if (_completed) return; 838 .put(_buffer, obj); 839 .put(_subject, obj); 840 } 841 842 /// 843 void completed() 844 { 845 _completed = true; 846 _subject.completed(); 847 } 848 849 /// 850 void failure(Exception e) 851 { 852 _completed = true; 853 _subject.failure(e); 854 } 855 } 856 857 /// 858 unittest 859 { 860 auto sub = new ReplaySubject!int(1); 861 .put(sub, 1); 862 863 int[] buf; 864 auto d = sub.doSubscribe!(v => buf ~= v); 865 scope (exit) 866 d.dispose(); 867 868 assert(buf.length == 1); 869 assert(buf[0] == 1); 870 } 871 872 /// 873 unittest 874 { 875 auto sub = new ReplaySubject!int(1); 876 .put(sub, 1); 877 .put(sub, 2); 878 879 int[] buf; 880 auto d = sub.doSubscribe!(v => buf ~= v); 881 scope (exit) 882 d.dispose(); 883 884 assert(buf == [2]); 885 } 886 887 /// 888 unittest 889 { 890 auto sub = new ReplaySubject!int(2); 891 .put(sub, 1); 892 .put(sub, 2); 893 .put(sub, 3); 894 895 int[] buf; 896 auto d = sub.doSubscribe!(v => buf ~= v); 897 scope (exit) 898 d.dispose(); 899 900 assert(buf == [2, 3]); 901 } 902 903 unittest 904 { 905 auto sub = new ReplaySubject!int(2); 906 .put(sub, 1); 907 908 int[] buf; 909 auto d = sub.doSubscribe!(v => buf ~= v); 910 scope (exit) 911 d.dispose(); 912 913 .put(sub, 2); 914 915 assert(buf.length == 2); 916 assert(buf[0] == 1); 917 assert(buf[1] == 2); 918 } 919 920 unittest 921 { 922 auto sub = new ReplaySubject!int(2); 923 .put(sub, 1); 924 sub.completed(); 925 .put(sub, 2); 926 927 int[] buf; 928 sub.doSubscribe!(v => buf ~= v); 929 930 assert(buf == [1]); 931 } 932 933 unittest 934 { 935 auto sub = new ReplaySubject!int(2); 936 .put(sub, 1); 937 .put(sub, 2); 938 .put(sub, 3); 939 sub.completed(); 940 .put(sub, 4); 941 942 int[] buf; 943 sub.doSubscribe!(v => buf ~= v); 944 945 assert(buf == [2, 3]); 946 } 947 948 unittest 949 { 950 auto sub = new ReplaySubject!int(2); 951 .put(sub, 1); 952 .put(sub, 2); 953 .put(sub, 3); 954 sub.failure(null); 955 .put(sub, 4); 956 957 int[] buf; 958 sub.doSubscribe!(v => buf ~= v); 959 960 assert(buf == [2, 3]); 961 } 962 963 private struct RingBuffer(T) 964 { 965 T[] buffer; 966 size_t pos; 967 size_t count; 968 969 this(size_t n) 970 { 971 buffer.length = n; 972 } 973 974 void put(T obj) 975 { 976 import std.algorithm : min; 977 978 buffer[pos] = obj; 979 pos = (pos + 1) % buffer.length; 980 count = min(count + 1, buffer.length); 981 } 982 983 RingBufferRange!T opSlice() 984 { 985 return RingBufferRange!T(buffer, buffer.length - (count - pos), 0, count); 986 } 987 } 988 989 unittest 990 { 991 import std.algorithm : equal; 992 import std.range : walkLength; 993 994 auto buf = RingBuffer!int(4); 995 996 assert(walkLength(buf[]) == 0); 997 998 buf.put(0); 999 assert(buf.buffer.length == 4); 1000 assert(buf.pos == 1); 1001 assert(buf.count == 1); 1002 assert(buf[][0] == 0); 1003 assert(equal(buf[], [0])); 1004 1005 buf.put(1); 1006 assert(buf.buffer.length == 4); 1007 assert(equal(buf.buffer, [0, 1, 0, 0])); 1008 assert(buf.pos == 2); 1009 assert(buf.count == 2); 1010 assert(buf[][0] == 0); 1011 assert(buf[][1] == 1); 1012 assert(equal(buf[], [0, 1])); 1013 1014 buf.put(2); 1015 assert(equal(buf[], [0, 1, 2])); 1016 1017 buf.put(3); 1018 assert(equal(buf[], [0, 1, 2, 3])); 1019 1020 buf.put(4); 1021 assert(equal(buf[], [1, 2, 3, 4])); 1022 } 1023 1024 private struct RingBufferRange(T) 1025 { 1026 T[] buffer; 1027 size_t offset; 1028 size_t pos; 1029 size_t count; 1030 1031 bool empty() const @property 1032 { 1033 return count == 0 || pos == count; 1034 } 1035 1036 inout(T) front() inout @property 1037 { 1038 return buffer[(offset + pos) % buffer.length]; 1039 } 1040 1041 void popFront() 1042 { 1043 pos++; 1044 } 1045 1046 T opIndex(size_t n) 1047 { 1048 return buffer[(offset + pos + n) % buffer.length]; 1049 } 1050 } 1051 1052 unittest 1053 { 1054 import std.algorithm : equal; 1055 1056 // no offset 1057 auto r0 = RingBufferRange!int([0, 1, 2], 0, 0, 2); 1058 assert(equal(r0, [0, 1])); 1059 1060 auto r1 = RingBufferRange!int([0, 1, 2], 0, 0, 3); 1061 assert(equal(r1, [0, 1, 2])); 1062 1063 auto r2 = RingBufferRange!int([0, 1, 2, 3], 0, 0, 4); 1064 assert(equal(r2, [0, 1, 2, 3])); 1065 1066 auto r3 = RingBufferRange!int([0, 1, 2, 3, 4], 0, 0, 5); 1067 assert(equal(r3, [0, 1, 2, 3, 4])); 1068 1069 // has offset 1070 auto r4 = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4); 1071 assert(!r4.empty); 1072 assert(r4.front == 1); 1073 r4.popFront(); 1074 assert(!r4.empty); 1075 assert(r4.front == 2); 1076 r4.popFront(); 1077 assert(!r4.empty); 1078 assert(r4.front == 3); 1079 r4.popFront(); 1080 assert(!r4.empty); 1081 assert(r4.front == 0); 1082 r4.popFront(); 1083 assert(r4.empty); 1084 1085 auto r5 = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4); 1086 assert(equal(r5, [1, 2, 3, 0])); 1087 1088 auto r6 = RingBufferRange!int([0, 1, 2, 3], 2, 0, 4); 1089 assert(equal(r6, [2, 3, 0, 1])); 1090 } 1091 1092 unittest 1093 { 1094 import std.algorithm : equal; 1095 1096 // empty 1097 auto rempty = RingBufferRange!int([0, 0, 0, 0], 0, 0, 0); 1098 assert(rempty.empty); 1099 1100 auto r1 = RingBufferRange!int([1, 0, 0, 0], 0, 0, 1); 1101 assert(equal(r1, [1])); 1102 1103 auto r2 = RingBufferRange!int([1, 2, 0, 0], 0, 0, 2); 1104 assert(equal(r2, [1, 2])); 1105 } 1106 1107 unittest 1108 { 1109 import std.algorithm : equal; 1110 1111 // empty 1112 auto r = RingBufferRange!int([0, 1, 2, 3], 0, 0, 4); 1113 assert(r[0] == 0); 1114 assert(r[1] == 1); 1115 assert(r[2] == 2); 1116 assert(r[3] == 3); 1117 } 1118 1119 unittest 1120 { 1121 import std.algorithm : equal; 1122 1123 // empty 1124 auto r = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4); 1125 assert(r[0] == 1); 1126 assert(r[1] == 2); 1127 assert(r[2] == 3); 1128 assert(r[3] == 0); 1129 } 1130 1131 unittest 1132 { 1133 import std.algorithm : equal; 1134 1135 // empty 1136 auto r = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4); 1137 r.popFront(); 1138 assert(r[0] == 2); 1139 assert(r[1] == 3); 1140 assert(r[2] == 0); 1141 } 1142 1143 /// 1144 auto asReplaySubject(TObservable)(auto ref TObservable observable, size_t bufferSize) 1145 { 1146 alias E = TObservable.ElementType; 1147 auto subject = new ReplaySubject!E(bufferSize); 1148 observable.doSubscribe(subject); 1149 return subject; 1150 } 1151 1152 /// 1153 unittest 1154 { 1155 import rx; 1156 1157 auto sub = defer!(int, (observer) { 1158 observer.put(10); 1159 observer.put(20); 1160 observer.put(30); 1161 observer.completed(); 1162 return NopDisposable.instance; 1163 }); 1164 1165 ReplaySubject!int nums = sub.asReplaySubject(4); 1166 1167 int[] data; 1168 nums.doSubscribe!(x => data ~= x); 1169 1170 assert(data == [10, 20, 30]); 1171 } 1172 1173 /// 1174 unittest 1175 { 1176 import rx; 1177 1178 auto sub = defer!(int, (observer) { 1179 observer.put(10); 1180 observer.put(20); 1181 observer.put(30); 1182 observer.failure(null); 1183 return NopDisposable.instance; 1184 }); 1185 1186 ReplaySubject!int nums = sub.asReplaySubject(2); 1187 1188 int[] data; 1189 nums.doSubscribe!(x => data ~= x); 1190 1191 assert(data == [20, 30]); 1192 }