1 /+++++++++++++++++++++++++++++ 2 + This module defines the Subject and some implements. 3 +/ 4 module rx.subject; 5 6 import rx.disposable; 7 import rx.observer; 8 import rx.observable; 9 import rx.util : assumeThreadLocal; 10 11 import core.atomic : atomicLoad, cas; 12 import std.range : put; 13 14 ///Represents an object that is both an observable sequence as well as an observer. 15 interface Subject(E) : Observer!E, Observable!E 16 { 17 } 18 19 ///Represents an object that is both an observable sequence as well as an observer. Each notification is broadcasted to all subscribed observers. 20 class SubjectObject(E) : Subject!E 21 { 22 alias ElementType = E; 23 24 public: 25 /// 26 this() 27 { 28 _observer = cast(shared) NopObserver!E.instance; 29 } 30 31 public: 32 /// 33 void put(E obj) 34 { 35 auto temp = assumeThreadLocal(atomicLoad(_observer)); 36 .put(temp, obj); 37 } 38 /// 39 void completed() 40 { 41 shared(Observer!E) oldObserver = void; 42 shared(Observer!E) newObserver = cast(shared) DoneObserver!E.instance; 43 Observer!E temp = void; 44 do 45 { 46 oldObserver = _observer; 47 temp = assumeThreadLocal(atomicLoad(oldObserver)); 48 if (cast(DoneObserver!E) temp) 49 break; 50 } 51 while (!cas(&_observer, oldObserver, newObserver)); 52 temp.completed(); 53 } 54 /// 55 void failure(Exception error) 56 { 57 shared(Observer!E) oldObserver = void; 58 shared(Observer!E) newObserver = cast(shared) new DoneObserver!E(error); 59 Observer!E temp = void; 60 do 61 { 62 oldObserver = _observer; 63 temp = assumeThreadLocal(atomicLoad(oldObserver)); 64 if (cast(DoneObserver!E) temp) 65 break; 66 } 67 while (!cas(&_observer, oldObserver, newObserver)); 68 temp.failure(error); 69 } 70 71 /// 72 Disposable subscribe(T)(T observer) 73 { 74 return subscribe(observerObject!E(observer)); 75 } 76 /// 77 Disposable subscribe(Observer!E observer) 78 { 79 shared(Observer!E) oldObserver = void; 80 shared(Observer!E) newObserver = void; 81 do 82 { 83 oldObserver = _observer; 84 auto temp = assumeThreadLocal(atomicLoad(oldObserver)); 85 86 if (temp is DoneObserver!E.instance) 87 { 88 observer.completed(); 89 return NopDisposable.instance; 90 } 91 92 if (auto fail = cast(DoneObserver!E) temp) 93 { 94 observer.failure(fail.exception); 95 return NopDisposable.instance; 96 } 97 98 if (auto composite = cast(CompositeObserver!E) temp) 99 { 100 newObserver = cast(shared) composite.add(observer); 101 } 102 else if (auto nop = cast(NopObserver!E) temp) 103 { 104 newObserver = cast(shared) observer; 105 } 106 else 107 { 108 newObserver = cast(shared)(new CompositeObserver!E([temp, observer])); 109 } 110 } 111 while (!cas(&_observer, oldObserver, newObserver)); 112 113 return subscription(this, observer); 114 } 115 116 /// 117 void unsubscribe(Observer!E observer) 118 { 119 shared(Observer!E) oldObserver = void; 120 shared(Observer!E) newObserver = void; 121 do 122 { 123 oldObserver = _observer; 124 125 import rx.util : assumeThreadLocal; 126 127 auto temp = assumeThreadLocal(atomicLoad(oldObserver)); 128 if (auto composite = cast(CompositeObserver!E) temp) 129 { 130 newObserver = cast(shared) composite.remove(observer); 131 } 132 else 133 { 134 if (temp !is observer) 135 return; 136 137 newObserver = cast(shared) NopObserver!E.instance; 138 } 139 } 140 while (!cas(&_observer, oldObserver, newObserver)); 141 } 142 143 protected: 144 Observer!E currentObserver() @property 145 { 146 return assumeThreadLocal(atomicLoad(_observer)); 147 } 148 149 private: 150 shared(Observer!E) _observer; 151 } 152 153 /// 154 unittest 155 { 156 import std.array : appender; 157 158 auto data = appender!(int[])(); 159 auto subject = new SubjectObject!int; 160 auto disposable = subject.subscribe(observerObject!(int)(data)); 161 assert(disposable !is null); 162 subject.put(0); 163 subject.put(1); 164 165 import std.algorithm : equal; 166 167 assert(equal(data.data, [0, 1])); 168 169 disposable.dispose(); 170 subject.put(2); 171 assert(equal(data.data, [0, 1])); 172 } 173 174 unittest 175 { 176 static assert(isObserver!(SubjectObject!int, int)); 177 static assert(isObservable!(SubjectObject!int, int)); 178 static assert(!isObservable!(SubjectObject!int, string)); 179 static assert(!isObservable!(SubjectObject!int, string)); 180 } 181 182 unittest 183 { 184 auto subject = new SubjectObject!int; 185 auto observer = new CounterObserver!int; 186 auto disposable = subject.subscribe(observer); 187 scope (exit) 188 disposable.dispose(); 189 190 subject.put(0); 191 subject.put(1); 192 193 assert(observer.putCount == 2); 194 subject.completed(); 195 subject.put(2); 196 assert(observer.putCount == 2); 197 assert(observer.completedCount == 1); 198 } 199 200 unittest 201 { 202 auto subject = new SubjectObject!int; 203 auto observer = new CounterObserver!int; 204 auto disposable = subject.subscribe(observer); 205 scope (exit) 206 disposable.dispose(); 207 208 subject.put(0); 209 subject.put(1); 210 211 assert(observer.putCount == 2); 212 auto ex = new Exception("Exception"); 213 subject.failure(ex); 214 subject.put(2); 215 assert(observer.putCount == 2); 216 assert(observer.failureCount == 1); 217 assert(observer.lastException is ex); 218 } 219 220 unittest 221 { 222 import std.array : appender; 223 224 auto buf1 = appender!(int[]); 225 auto buf2 = appender!(int[]); 226 auto subject = new SubjectObject!int; 227 subject.subscribe(observerObject!(int)(buf1)); 228 subject.doSubscribe((int n) => buf2.put(n)); 229 230 assert(buf1.data.length == 0); 231 assert(buf2.data.length == 0); 232 subject.put(0); 233 assert(buf1.data.length == 1); 234 assert(buf2.data.length == 1); 235 assert(buf1.data[0] == buf2.data[0]); 236 } 237 238 unittest 239 { 240 auto sub = new SubjectObject!int; 241 sub.completed(); 242 243 auto observer = new CounterObserver!int; 244 assert(observer.putCount == 0); 245 assert(observer.completedCount == 0); 246 assert(observer.failureCount == 0); 247 sub.subscribe(observer); 248 assert(observer.putCount == 0); 249 assert(observer.completedCount == 1); 250 assert(observer.failureCount == 0); 251 } 252 253 unittest 254 { 255 auto sub = new SubjectObject!int; 256 auto ex = new Exception("Exception"); 257 sub.failure(ex); 258 259 auto observer = new CounterObserver!int; 260 assert(observer.putCount == 0); 261 assert(observer.completedCount == 0); 262 assert(observer.failureCount == 0); 263 sub.subscribe(observer); 264 assert(observer.putCount == 0); 265 assert(observer.completedCount == 0); 266 assert(observer.failureCount == 1); 267 assert(observer.lastException is ex); 268 } 269 270 unittest 271 { 272 // MyFilterSubject puts a value only on MyCustomObserver. 273 274 static class MyCustomObserver : Observer!int 275 { 276 int[] buf; 277 278 void put(int obj) 279 { 280 buf ~= obj; 281 } 282 283 void completed() 284 { 285 } 286 287 void failure(Exception ex) 288 { 289 } 290 } 291 292 static class MyFilterSubject : SubjectObject!int 293 { 294 override void put(int obj) 295 { 296 if (auto current = cast(CompositeObserver!int) currentObserver) 297 { 298 /// write a own filter, map, order and more 299 foreach (observer; current.observers) 300 { 301 if (auto myObserver = cast(MyCustomObserver) observer) 302 { 303 myObserver.put(obj); 304 } 305 } 306 } 307 } 308 } 309 310 import std.array : appender; 311 312 auto myObserver = new MyCustomObserver; 313 auto buffer = appender!(int[]); 314 315 auto sub = new MyFilterSubject; 316 .put(sub, -1); 317 318 sub.subscribe(myObserver); 319 sub.subscribe(buffer); 320 321 .put(sub, 0); 322 .put(sub, 1); 323 .put(sub, 2); 324 325 assert(myObserver.buf.length == 3); 326 assert(buffer.data.length == 0); 327 } 328 329 private class Subscription(TSubject, TObserver) : Disposable 330 { 331 public: 332 this(TSubject subject, TObserver observer) 333 { 334 _subject = subject; 335 _observer = observer; 336 } 337 338 public: 339 void dispose() 340 { 341 if (_subject !is null) 342 { 343 _subject.unsubscribe(_observer); 344 _subject = null; 345 } 346 } 347 348 private: 349 TSubject _subject; 350 TObserver _observer; 351 } 352 353 private Subscription!(TSubject, TObserver) subscription(TSubject, TObserver)( 354 TSubject subject, TObserver observer) 355 { 356 return new typeof(return)(subject, observer); 357 } 358 359 /// 360 class AsyncSubject(E) : Subject!E 361 { 362 public: 363 /// 364 Disposable subscribe(Observer!E observer) 365 { 366 Exception ex = null; 367 E value; 368 bool hasValue = false; 369 370 synchronized (this) 371 { 372 if (!_isStopped) 373 { 374 _observers ~= observer; 375 return subscription(this, observer); 376 } 377 378 ex = _exception; 379 hasValue = _hasValue; 380 value = _value; 381 } 382 383 if (ex !is null) 384 { 385 observer.failure(ex); 386 } 387 else if (hasValue) 388 { 389 .put(observer, value); 390 observer.completed(); 391 } 392 else 393 { 394 observer.completed(); 395 } 396 397 return NopDisposable.instance; 398 } 399 400 /// 401 auto subscribe(T)(T observer) 402 { 403 return subscribe(observerObject!E(observer)); 404 } 405 406 /// 407 void unsubscribe(Observer!E observer) 408 { 409 if (observer is null) 410 return; 411 412 synchronized (this) 413 { 414 import std.algorithm : remove, countUntil; 415 416 auto index = countUntil(_observers, observer); 417 if (index != -1) 418 { 419 _observers = remove(_observers, index); 420 } 421 } 422 } 423 424 public: 425 /// 426 void put(E value) 427 { 428 synchronized (this) 429 { 430 if (!_isStopped) 431 { 432 _value = value; 433 _hasValue = true; 434 } 435 } 436 } 437 438 /// 439 void completed() 440 { 441 Observer!E[] os = null; 442 443 E value; 444 bool hasValue = false; 445 446 synchronized (this) 447 { 448 if (!_isStopped) 449 { 450 os = _observers; 451 _observers.length = 0; 452 _isStopped = true; 453 value = _value; 454 hasValue = _hasValue; 455 } 456 } 457 458 if (os) 459 { 460 if (hasValue) 461 { 462 foreach (observer; os) 463 { 464 .put(observer, value); 465 observer.completed(); 466 } 467 } 468 else 469 { 470 foreach (observer; os) 471 { 472 observer.completed(); 473 } 474 } 475 } 476 } 477 478 /// 479 void failure(Exception e) 480 { 481 assert(e !is null); 482 483 Observer!E[] os = null; 484 synchronized (this) 485 { 486 if (!_isStopped) 487 { 488 os = _observers; 489 _observers.length = 0; 490 _isStopped = true; 491 _exception = e; 492 } 493 } 494 495 if (os) 496 { 497 foreach (observer; os) 498 { 499 observer.failure(e); 500 } 501 } 502 } 503 504 private: 505 Observer!E[] _observers; 506 bool _isStopped; 507 E _value; 508 bool _hasValue; 509 Exception _exception; 510 } 511 512 unittest 513 { 514 auto sub = new AsyncSubject!int; 515 516 .put(sub, 1); 517 sub.completed(); 518 519 auto observer = new CounterObserver!int; 520 521 assert(observer.hasNotBeenCalled); 522 523 sub.subscribe(observer); 524 525 assert(observer.putCount == 1); 526 assert(observer.completedCount == 1); 527 assert(observer.failureCount == 0); 528 assert(observer.lastValue == 1); 529 } 530 531 unittest 532 { 533 auto sub = new AsyncSubject!int; 534 auto observer = new CounterObserver!int; 535 536 auto d = sub.subscribe(observer); 537 scope (exit) 538 d.dispose(); 539 540 assert(observer.hasNotBeenCalled); 541 542 sub.put(100); 543 544 assert(observer.hasNotBeenCalled); 545 546 assert(sub._hasValue); 547 assert(sub._value == 100); 548 549 sub.completed(); 550 551 assert(observer.putCount == 1); 552 assert(observer.completedCount == 1); 553 assert(observer.failureCount == 0); 554 assert(observer.lastValue == 100); 555 } 556 557 unittest 558 { 559 auto sub = new AsyncSubject!int; 560 auto observer = new CounterObserver!int; 561 562 sub.put(100); 563 564 assert(sub._hasValue); 565 assert(sub._value == 100); 566 567 auto d = sub.subscribe(observer); 568 scope (exit) 569 d.dispose(); 570 571 assert(observer.hasNotBeenCalled); 572 573 sub.completed(); 574 575 assert(observer.putCount == 1); 576 assert(observer.completedCount == 1); 577 assert(observer.failureCount == 0); 578 assert(observer.lastValue == 100); 579 } 580 581 unittest 582 { 583 auto sub = new AsyncSubject!int; 584 auto observer = new CounterObserver!int; 585 586 auto d = sub.subscribe(observer); 587 588 d.dispose(); 589 assert(observer.hasNotBeenCalled); 590 591 sub.put(100); 592 assert(observer.hasNotBeenCalled); 593 594 sub.completed(); 595 assert(observer.hasNotBeenCalled); 596 } 597 598 unittest 599 { 600 auto sub = new AsyncSubject!int; 601 auto observer = new CounterObserver!int; 602 603 auto d = sub.subscribe(observer); 604 assert(observer.hasNotBeenCalled); 605 606 sub.put(100); 607 assert(observer.hasNotBeenCalled); 608 609 d.dispose(); 610 assert(observer.hasNotBeenCalled); 611 612 sub.completed(); 613 assert(observer.hasNotBeenCalled); 614 } 615 616 unittest 617 { 618 619 auto sub = new AsyncSubject!int; 620 auto observer = new CounterObserver!int; 621 622 sub.put(100); 623 assert(observer.hasNotBeenCalled); 624 625 auto d = sub.subscribe(observer); 626 assert(observer.hasNotBeenCalled); 627 628 d.dispose(); 629 assert(observer.hasNotBeenCalled); 630 631 sub.completed(); 632 assert(observer.hasNotBeenCalled); 633 } 634 635 unittest 636 { 637 auto sub = new AsyncSubject!int; 638 auto observer = new CounterObserver!int; 639 640 auto d = sub.subscribe(observer); 641 scope (exit) 642 d.dispose(); 643 644 assert(observer.hasNotBeenCalled); 645 646 sub.completed(); 647 648 assert(observer.putCount == 0); 649 assert(observer.completedCount == 1); 650 assert(observer.failureCount == 0); 651 } 652 653 unittest 654 { 655 auto sub = new AsyncSubject!int; 656 auto observer = new CounterObserver!int; 657 658 auto d = sub.subscribe(observer); 659 scope (exit) 660 d.dispose(); 661 662 assert(observer.hasNotBeenCalled); 663 664 auto ex = new Exception("TEST"); 665 sub.failure(ex); 666 667 assert(observer.putCount == 0); 668 assert(observer.completedCount == 0); 669 assert(observer.failureCount == 1); 670 assert(observer.lastException is ex); 671 } 672 673 unittest 674 { 675 auto sub = new AsyncSubject!int; 676 auto ex = new Exception("TEST"); 677 sub.failure(ex); 678 679 auto observer = new CounterObserver!int; 680 681 auto d = sub.subscribe(observer); 682 scope (exit) 683 d.dispose(); 684 685 assert(observer.putCount == 0); 686 assert(observer.completedCount == 0); 687 assert(observer.failureCount == 1); 688 assert(observer.lastException is ex); 689 } 690 691 unittest 692 { 693 auto sub = new AsyncSubject!int; 694 auto observer = new CounterObserver!int; 695 696 sub.completed(); 697 assert(observer.hasNotBeenCalled); 698 699 sub.subscribe(observer); 700 assert(observer.putCount == 0); 701 assert(observer.completedCount == 1); 702 assert(observer.failureCount == 0); 703 } 704 705 version (unittest) 706 { 707 class CounterObserver(T) : Observer!T 708 { 709 public: 710 size_t putCount; 711 size_t completedCount; 712 size_t failureCount; 713 T lastValue; 714 Exception lastException; 715 716 public: 717 bool hasNotBeenCalled() const pure nothrow @nogc @safe @property 718 { 719 return putCount == 0 && completedCount == 0 && failureCount == 0; 720 } 721 722 public: 723 void put(T obj) 724 { 725 putCount++; 726 lastValue = obj; 727 } 728 729 void completed() 730 { 731 completedCount++; 732 } 733 734 void failure(Exception e) 735 { 736 failureCount++; 737 lastException = e; 738 } 739 } 740 } 741 742 /// 743 class BehaviorSubject(E) : Subject!E 744 { 745 public: 746 /// 747 this() 748 { 749 this(E.init); 750 } 751 752 /// 753 this(E value) 754 { 755 _subject = new SubjectObject!E; 756 _value = value; 757 } 758 759 public: 760 /// 761 inout(E) value() inout @property 762 { 763 return _value; 764 } 765 766 /// 767 void value(E value) @property 768 { 769 if (_value != value) 770 { 771 _value = value; 772 .put(_subject, value); 773 } 774 } 775 776 public: 777 /// 778 auto subscribe(TObserver)(auto ref TObserver observer) 779 { 780 .put(observer, value); 781 return _subject.doSubscribe(observer); 782 } 783 784 /// 785 Disposable subscribe(Observer!E observer) 786 { 787 .put(observer, value); 788 return disposableObject(_subject.doSubscribe(observer)); 789 } 790 791 /// 792 void put(E obj) 793 { 794 value = obj; 795 } 796 797 /// 798 void completed() 799 { 800 _subject.completed(); 801 } 802 803 /// 804 void failure(Exception e) 805 { 806 _subject.failure(e); 807 } 808 809 private: 810 SubjectObject!E _subject; 811 E _value; 812 } 813 814 unittest 815 { 816 static assert(isObservable!(BehaviorSubject!int, int)); 817 static assert(is(BehaviorSubject!int.ElementType == int)); 818 } 819 820 unittest 821 { 822 int num = 0; 823 auto subject = new BehaviorSubject!int(100); 824 825 auto d = subject.doSubscribe((int n) { num = n; }); 826 assert(num == 100); 827 828 .put(subject, 1); 829 assert(num == 1); 830 831 d.dispose(); 832 .put(subject, 10); 833 assert(num == 1); 834 } 835 836 /// 837 auto asBehaviorSubject(TObservable)(auto ref TObservable observable) 838 { 839 alias E = TObservable.ElementType; 840 auto subject = new BehaviorSubject!E; 841 observable.doSubscribe(subject); 842 return subject; 843 } 844 845 /// 846 unittest 847 { 848 import rx; 849 850 auto num1 = new BehaviorSubject!int; 851 auto num2 = new BehaviorSubject!int; 852 853 BehaviorSubject!int sum = combineLatest!((l, r) => l + r)(num1, num2).asBehaviorSubject(); 854 855 assert(sum.value == 0); 856 num1.value = 10; 857 assert(sum.value == 10); 858 num2.value = 20; 859 assert(sum.value == 30); 860 } 861 862 /// 863 class ReplaySubject(E) : Subject!E 864 { 865 private: 866 RingBuffer!E _buffer; 867 SubjectObject!E _subject; 868 bool _completed; 869 870 public: 871 /// 872 this(size_t bufferSize) 873 { 874 _buffer = RingBuffer!E(bufferSize); 875 _subject = new SubjectObject!E; 876 } 877 878 public: 879 /// 880 Disposable subscribe(TObserver)(auto ref TObserver observer) 881 { 882 .put(observer, _buffer[]); 883 if (_completed) 884 return NopDisposable.instance; 885 else 886 return _subject.doSubscribe(observer).disposableObject(); 887 } 888 889 /// 890 Disposable subscribe(Observer!E observer) 891 { 892 .put(observer, _buffer[]); 893 if (_completed) 894 return NopDisposable.instance; 895 else 896 return disposableObject(_subject.doSubscribe(observer)); 897 } 898 899 /// 900 void put(E obj) 901 { 902 if (_completed) 903 return; 904 .put(_buffer, obj); 905 .put(_subject, obj); 906 } 907 908 /// 909 void completed() 910 { 911 _completed = true; 912 _subject.completed(); 913 } 914 915 /// 916 void failure(Exception e) 917 { 918 _completed = true; 919 _subject.failure(e); 920 } 921 } 922 923 /// 924 unittest 925 { 926 auto sub = new ReplaySubject!int(1); 927 .put(sub, 1); 928 929 int[] buf; 930 auto d = sub.doSubscribe!(v => buf ~= v); 931 scope (exit) 932 d.dispose(); 933 934 assert(buf.length == 1); 935 assert(buf[0] == 1); 936 } 937 938 /// 939 unittest 940 { 941 auto sub = new ReplaySubject!int(1); 942 .put(sub, 1); 943 .put(sub, 2); 944 945 int[] buf; 946 auto d = sub.doSubscribe!(v => buf ~= v); 947 scope (exit) 948 d.dispose(); 949 950 assert(buf == [2]); 951 } 952 953 /// 954 unittest 955 { 956 auto sub = new ReplaySubject!int(2); 957 .put(sub, 1); 958 .put(sub, 2); 959 .put(sub, 3); 960 961 int[] buf; 962 auto d = sub.doSubscribe!(v => buf ~= v); 963 scope (exit) 964 d.dispose(); 965 966 assert(buf == [2, 3]); 967 } 968 969 unittest 970 { 971 auto sub = new ReplaySubject!int(2); 972 .put(sub, 1); 973 974 int[] buf; 975 auto d = sub.doSubscribe!(v => buf ~= v); 976 scope (exit) 977 d.dispose(); 978 979 .put(sub, 2); 980 981 assert(buf.length == 2); 982 assert(buf[0] == 1); 983 assert(buf[1] == 2); 984 } 985 986 unittest 987 { 988 auto sub = new ReplaySubject!int(2); 989 .put(sub, 1); 990 sub.completed(); 991 .put(sub, 2); 992 993 int[] buf; 994 sub.doSubscribe!(v => buf ~= v); 995 996 assert(buf == [1]); 997 } 998 999 unittest 1000 { 1001 auto sub = new ReplaySubject!int(2); 1002 .put(sub, 1); 1003 .put(sub, 2); 1004 .put(sub, 3); 1005 sub.completed(); 1006 .put(sub, 4); 1007 1008 int[] buf; 1009 sub.doSubscribe!(v => buf ~= v); 1010 1011 assert(buf == [2, 3]); 1012 } 1013 1014 unittest 1015 { 1016 auto sub = new ReplaySubject!int(2); 1017 .put(sub, 1); 1018 .put(sub, 2); 1019 .put(sub, 3); 1020 sub.failure(null); 1021 .put(sub, 4); 1022 1023 int[] buf; 1024 sub.doSubscribe!(v => buf ~= v); 1025 1026 assert(buf == [2, 3]); 1027 } 1028 1029 private struct RingBuffer(T) 1030 { 1031 T[] buffer; 1032 size_t pos; 1033 size_t count; 1034 1035 this(size_t n) 1036 { 1037 buffer.length = n; 1038 } 1039 1040 void put(T obj) 1041 { 1042 import std.algorithm : min; 1043 1044 buffer[pos] = obj; 1045 pos = (pos + 1) % buffer.length; 1046 count = min(count + 1, buffer.length); 1047 } 1048 1049 RingBufferRange!T opSlice() 1050 { 1051 return RingBufferRange!T(buffer, buffer.length - (count - pos), 0, count); 1052 } 1053 } 1054 1055 unittest 1056 { 1057 import std.algorithm : equal; 1058 import std.range : walkLength; 1059 1060 auto buf = RingBuffer!int(4); 1061 1062 assert(walkLength(buf[]) == 0); 1063 1064 buf.put(0); 1065 assert(buf.buffer.length == 4); 1066 assert(buf.pos == 1); 1067 assert(buf.count == 1); 1068 assert(buf[][0] == 0); 1069 assert(equal(buf[], [0])); 1070 1071 buf.put(1); 1072 assert(buf.buffer.length == 4); 1073 assert(equal(buf.buffer, [0, 1, 0, 0])); 1074 assert(buf.pos == 2); 1075 assert(buf.count == 2); 1076 assert(buf[][0] == 0); 1077 assert(buf[][1] == 1); 1078 assert(equal(buf[], [0, 1])); 1079 1080 buf.put(2); 1081 assert(equal(buf[], [0, 1, 2])); 1082 1083 buf.put(3); 1084 assert(equal(buf[], [0, 1, 2, 3])); 1085 1086 buf.put(4); 1087 assert(equal(buf[], [1, 2, 3, 4])); 1088 } 1089 1090 private struct RingBufferRange(T) 1091 { 1092 T[] buffer; 1093 size_t offset; 1094 size_t pos; 1095 size_t count; 1096 1097 bool empty() const @property 1098 { 1099 return count == 0 || pos == count; 1100 } 1101 1102 inout(T) front() inout @property 1103 { 1104 return buffer[(offset + pos) % buffer.length]; 1105 } 1106 1107 void popFront() 1108 { 1109 pos++; 1110 } 1111 1112 T opIndex(size_t n) 1113 { 1114 return buffer[(offset + pos + n) % buffer.length]; 1115 } 1116 } 1117 1118 unittest 1119 { 1120 import std.algorithm : equal; 1121 1122 // no offset 1123 auto r0 = RingBufferRange!int([0, 1, 2], 0, 0, 2); 1124 assert(equal(r0, [0, 1])); 1125 1126 auto r1 = RingBufferRange!int([0, 1, 2], 0, 0, 3); 1127 assert(equal(r1, [0, 1, 2])); 1128 1129 auto r2 = RingBufferRange!int([0, 1, 2, 3], 0, 0, 4); 1130 assert(equal(r2, [0, 1, 2, 3])); 1131 1132 auto r3 = RingBufferRange!int([0, 1, 2, 3, 4], 0, 0, 5); 1133 assert(equal(r3, [0, 1, 2, 3, 4])); 1134 1135 // has offset 1136 auto r4 = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4); 1137 assert(!r4.empty); 1138 assert(r4.front == 1); 1139 r4.popFront(); 1140 assert(!r4.empty); 1141 assert(r4.front == 2); 1142 r4.popFront(); 1143 assert(!r4.empty); 1144 assert(r4.front == 3); 1145 r4.popFront(); 1146 assert(!r4.empty); 1147 assert(r4.front == 0); 1148 r4.popFront(); 1149 assert(r4.empty); 1150 1151 auto r5 = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4); 1152 assert(equal(r5, [1, 2, 3, 0])); 1153 1154 auto r6 = RingBufferRange!int([0, 1, 2, 3], 2, 0, 4); 1155 assert(equal(r6, [2, 3, 0, 1])); 1156 } 1157 1158 unittest 1159 { 1160 import std.algorithm : equal; 1161 1162 // empty 1163 auto rempty = RingBufferRange!int([0, 0, 0, 0], 0, 0, 0); 1164 assert(rempty.empty); 1165 1166 auto r1 = RingBufferRange!int([1, 0, 0, 0], 0, 0, 1); 1167 assert(equal(r1, [1])); 1168 1169 auto r2 = RingBufferRange!int([1, 2, 0, 0], 0, 0, 2); 1170 assert(equal(r2, [1, 2])); 1171 } 1172 1173 unittest 1174 { 1175 import std.algorithm : equal; 1176 1177 // empty 1178 auto r = RingBufferRange!int([0, 1, 2, 3], 0, 0, 4); 1179 assert(r[0] == 0); 1180 assert(r[1] == 1); 1181 assert(r[2] == 2); 1182 assert(r[3] == 3); 1183 } 1184 1185 unittest 1186 { 1187 import std.algorithm : equal; 1188 1189 // empty 1190 auto r = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4); 1191 assert(r[0] == 1); 1192 assert(r[1] == 2); 1193 assert(r[2] == 3); 1194 assert(r[3] == 0); 1195 } 1196 1197 unittest 1198 { 1199 import std.algorithm : equal; 1200 1201 // empty 1202 auto r = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4); 1203 r.popFront(); 1204 assert(r[0] == 2); 1205 assert(r[1] == 3); 1206 assert(r[2] == 0); 1207 } 1208 1209 /// 1210 auto asReplaySubject(TObservable)(auto ref TObservable observable, size_t bufferSize) 1211 { 1212 alias E = TObservable.ElementType; 1213 auto subject = new ReplaySubject!E(bufferSize); 1214 observable.doSubscribe(subject); 1215 return subject; 1216 } 1217 1218 /// 1219 unittest 1220 { 1221 import rx; 1222 1223 auto sub = defer!(int, (observer) { 1224 observer.put(10); 1225 observer.put(20); 1226 observer.put(30); 1227 observer.completed(); 1228 return NopDisposable.instance; 1229 }); 1230 1231 ReplaySubject!int nums = sub.asReplaySubject(4); 1232 1233 int[] data; 1234 nums.doSubscribe!(x => data ~= x); 1235 1236 assert(data == [10, 20, 30]); 1237 } 1238 1239 /// 1240 unittest 1241 { 1242 import rx; 1243 1244 auto sub = defer!(int, (observer) { 1245 observer.put(10); 1246 observer.put(20); 1247 observer.put(30); 1248 observer.failure(null); 1249 return NopDisposable.instance; 1250 }); 1251 1252 ReplaySubject!int nums = sub.asReplaySubject(2); 1253 1254 int[] data; 1255 nums.doSubscribe!(x => data ~= x); 1256 1257 assert(data == [20, 30]); 1258 } 1259 1260 version (unittest) 1261 { 1262 class TestingSubject(E) : SubjectObject!E 1263 { 1264 size_t observerCount() 1265 { 1266 if (auto current = cast(CompositeObserver!E) currentObserver) 1267 { 1268 return current.observers.length; 1269 } 1270 if (currentObserver is NopObserver!E.instance) 1271 { 1272 return 0; 1273 } 1274 if (currentObserver is DoneObserver!E.instance) 1275 { 1276 return 0; 1277 } 1278 return 1; 1279 } 1280 } 1281 1282 unittest 1283 { 1284 auto s = new TestingSubject!int; 1285 assert(s.observerCount == 0); 1286 1287 int[] buf; 1288 auto observer = observerObject!int((int n) { buf ~= n; }); 1289 1290 auto d0 = s.subscribe(observer); 1291 assert(s.observerCount == 1); 1292 auto d1 = s.subscribe(observer); 1293 assert(s.observerCount == 2); 1294 1295 d0.dispose(); 1296 assert(s.observerCount == 1); 1297 d1.dispose(); 1298 assert(s.observerCount == 0); 1299 } 1300 }