1 /+++++++++++++++++++++++++++++ 2 + This module defines the concept of Disposable. 3 +/ 4 module rx.disposable; 5 6 import core.atomic; 7 import core.sync.mutex; 8 import rx.util; 9 10 ///Tests if something is a Disposable. 11 template isDisposable(T) 12 { 13 enum bool isDisposable = is(typeof({ 14 T disposable = void; 15 disposable.dispose(); 16 }())); 17 } 18 /// 19 unittest 20 { 21 struct A 22 { 23 void dispose() 24 { 25 } 26 } 27 28 class B 29 { 30 void dispose() 31 { 32 } 33 } 34 35 interface C 36 { 37 void dispose(); 38 } 39 40 static assert(isDisposable!A); 41 static assert(isDisposable!B); 42 static assert(isDisposable!C); 43 } 44 45 ///Tests if something is a Cancelable 46 template isCancelable(T) 47 { 48 enum isCancelable = isDisposable!T && is(typeof((inout int n = 0) { 49 T disposable = void; 50 bool b = disposable.isDisposed; 51 })); 52 } 53 /// 54 unittest 55 { 56 struct A 57 { 58 bool isDisposed() @property 59 { 60 return true; 61 } 62 63 void dispose() 64 { 65 } 66 } 67 68 class B 69 { 70 bool isDisposed() @property 71 { 72 return true; 73 } 74 75 void dispose() 76 { 77 } 78 } 79 80 interface C 81 { 82 bool isDisposed() @property; 83 void dispose(); 84 } 85 86 static assert(isCancelable!A); 87 static assert(isCancelable!B); 88 static assert(isCancelable!C); 89 } 90 91 ///Wrapper for disposable objects. 92 interface Disposable 93 { 94 /// 95 void dispose(); 96 } 97 ///Wrapper for cancelable objects. 98 interface Cancelable : Disposable 99 { 100 /// 101 bool isDisposed() @property; 102 } 103 104 ///Simply implements for Cancelable interface. Its propagates notification that operations should be canceled. 105 class CancellationToken : Cancelable 106 { 107 public: 108 /// 109 bool isDisposed() @property 110 { 111 return atomicLoad(_disposed); 112 } 113 /// 114 alias isDisposed isCanceled; 115 116 public: 117 /// 118 void dispose() 119 { 120 atomicStore(_disposed, true); 121 } 122 /// 123 alias dispose cancel; 124 125 private: 126 shared(bool) _disposed; 127 } 128 129 unittest 130 { 131 auto c = new CancellationToken; 132 assert(!c.isDisposed); 133 assert(!c.isCanceled); 134 c.dispose(); 135 assert(c.isDisposed); 136 assert(c.isCanceled); 137 } 138 139 unittest 140 { 141 auto c = new CancellationToken; 142 assert(!c.isDisposed); 143 assert(!c.isCanceled); 144 c.cancel(); 145 assert(c.isDisposed); 146 assert(c.isCanceled); 147 } 148 149 ///Class that implements the Disposable interface and wraps the dispose methods in virtual functions. 150 class DisposableObject(T) : Disposable 151 { 152 public: 153 /// 154 this(T disposable) 155 { 156 _disposable = disposable; 157 } 158 159 public: 160 /// 161 void dispose() 162 { 163 _disposable.dispose(); 164 } 165 166 private: 167 T _disposable; 168 } 169 ///Class that implements the Cancelable interface and wraps the isDisposed property in virtual functions. 170 class CancelableObject(T) : DisposableObject!T, Cancelable 171 { 172 public: 173 /// 174 this(T disposable) 175 { 176 super(disposable); 177 } 178 179 public: 180 /// 181 bool isDisposed() @property 182 { 183 return _disposable.isDisposed; 184 } 185 } 186 187 ///Wraps dispose method in virtual functions. 188 auto disposableObject(T)(T disposable) 189 { 190 static assert(isDisposable!T); 191 192 static if (is(T : Cancelable) || is(T : Disposable)) 193 { 194 return disposable; 195 } 196 else static if (isCancelable!T) 197 { 198 return new CancelableObject!T(disposable); 199 } 200 else 201 { 202 return new DisposableObject!T(disposable); 203 } 204 } 205 206 /// 207 unittest 208 { 209 int count = 0; 210 struct TestDisposable 211 { 212 void dispose() 213 { 214 count++; 215 } 216 } 217 218 TestDisposable test; 219 Disposable disposable = disposableObject(test); 220 assert(count == 0); 221 disposable.dispose(); 222 assert(count == 1); 223 } 224 225 unittest 226 { 227 int count = 0; 228 class TestDisposable : Disposable 229 { 230 void dispose() 231 { 232 count++; 233 } 234 } 235 236 auto test = new TestDisposable; 237 Disposable disposable = disposableObject(test); 238 assert(disposable is test); 239 assert(count == 0); 240 disposable.dispose(); 241 assert(count == 1); 242 } 243 244 unittest 245 { 246 int count = 0; 247 struct TestCancelable 248 { 249 bool isDisposed() @property 250 { 251 return _disposed; 252 } 253 254 void dispose() 255 { 256 count++; 257 _disposed = true; 258 } 259 260 bool _disposed; 261 } 262 263 TestCancelable test; 264 Cancelable cancelable = disposableObject(test); 265 266 assert(!cancelable.isDisposed); 267 assert(count == 0); 268 cancelable.dispose(); 269 assert(cancelable.isDisposed); 270 assert(count == 1); 271 } 272 273 ///Defines a instance property that return NOP Disposable. 274 final class NopDisposable : Disposable 275 { 276 private: 277 this() 278 { 279 } 280 281 public: 282 void dispose() 283 { 284 } 285 286 public: 287 /// 288 static Disposable instance() @property 289 { 290 import std.concurrency : initOnce; 291 292 static __gshared NopDisposable inst; 293 return initOnce!inst(new NopDisposable); 294 } 295 } 296 /// 297 unittest 298 { 299 Disposable d1 = NopDisposable.instance; 300 Disposable d2 = NopDisposable.instance; 301 assert(d1 !is null); 302 assert(d1 is d2); 303 } 304 305 package final class DisposedMarker : Cancelable 306 { 307 private: 308 this() 309 { 310 } 311 312 public: 313 bool isDisposed() @property 314 { 315 return true; 316 } 317 318 public: 319 void dispose() 320 { 321 } 322 323 public: 324 static Cancelable instance() 325 { 326 import std.concurrency : initOnce; 327 328 static __gshared DisposedMarker inst; 329 return initOnce!inst(new DisposedMarker); 330 } 331 } 332 333 /// 334 final class SingleAssignmentDisposable : Cancelable 335 { 336 public: 337 /// 338 void setDisposable(Disposable disposable) 339 { 340 import core.atomic; 341 342 if (!cas(&_disposable, shared(Disposable).init, cast(shared) disposable)) 343 assert(false); 344 } 345 346 public: 347 /// 348 bool isDisposed() @property 349 { 350 return _disposable is cast(shared) DisposedMarker.instance; 351 } 352 353 public: 354 /// 355 void dispose() 356 { 357 import rx.util; 358 359 auto temp = assumeThreadLocal(exchange(_disposable, cast(shared) DisposedMarker.instance)); 360 if (temp !is null) 361 temp.dispose(); 362 } 363 364 private: 365 shared(Disposable) _disposable; 366 } 367 /// 368 unittest 369 { 370 int count = 0; 371 class TestDisposable : Disposable 372 { 373 void dispose() 374 { 375 count++; 376 } 377 } 378 379 auto temp = new SingleAssignmentDisposable; 380 temp.setDisposable(new TestDisposable); 381 assert(!temp.isDisposed); 382 assert(count == 0); 383 temp.dispose(); 384 assert(temp.isDisposed); 385 assert(count == 1); 386 } 387 388 unittest 389 { 390 static assert(isDisposable!SingleAssignmentDisposable); 391 } 392 393 unittest 394 { 395 import core.exception; 396 397 class TestDisposable : Disposable 398 { 399 void dispose() 400 { 401 } 402 } 403 404 auto temp = new SingleAssignmentDisposable; 405 temp.setDisposable(new TestDisposable); 406 try 407 { 408 temp.setDisposable(new TestDisposable); 409 } 410 catch (AssertError) 411 { 412 return; 413 } 414 assert(false); 415 } 416 417 /// 418 class SerialDisposable : Cancelable 419 { 420 public: 421 this() 422 { 423 _gate = new Mutex; 424 } 425 426 public: 427 /// 428 bool isDisposed() @property 429 { 430 return _disposed; 431 } 432 433 /// 434 void disposable(Disposable value) @property 435 { 436 auto shouldDispose = false; 437 Disposable old = null; 438 synchronized (_gate) 439 { 440 shouldDispose = _disposed; 441 if (!shouldDispose) 442 { 443 old = _disposable; 444 _disposable = value; 445 } 446 } 447 if (old !is null) 448 old.dispose(); 449 if (shouldDispose && value !is null) 450 value.dispose(); 451 } 452 453 /// 454 Disposable disposable() @property 455 { 456 return _disposable; 457 } 458 459 public: 460 /// 461 void dispose() 462 { 463 Disposable old = null; 464 synchronized (_gate) 465 { 466 if (!_disposed) 467 { 468 _disposed = true; 469 old = _disposable; 470 _disposable = null; 471 } 472 } 473 if (old !is null) 474 old.dispose(); 475 } 476 477 private: 478 Mutex _gate; 479 bool _disposed; 480 Disposable _disposable; 481 } 482 483 unittest 484 { 485 int count = 0; 486 struct A 487 { 488 void dispose() 489 { 490 count++; 491 } 492 } 493 494 auto d = new SerialDisposable; 495 d.disposable = disposableObject(A()); 496 assert(count == 0); 497 d.disposable = disposableObject(A()); 498 assert(count == 1); 499 d.dispose(); 500 assert(count == 2); 501 d.disposable = disposableObject(A()); 502 assert(count == 3); 503 } 504 505 unittest 506 { 507 int count = 0; 508 struct A 509 { 510 void dispose() 511 { 512 count++; 513 } 514 } 515 516 auto d = new SerialDisposable; 517 d.dispose(); 518 assert(count == 0); 519 d.disposable = disposableObject(A()); 520 assert(count == 1); 521 } 522 523 /// 524 class SignalDisposable : Disposable 525 { 526 public: 527 this() 528 { 529 _signal = new EventSignal; 530 } 531 532 public: 533 /// 534 EventSignal signal() @property 535 { 536 return _signal; 537 } 538 539 public: 540 /// 541 void dispose() 542 { 543 _signal.setSignal(); 544 } 545 546 private: 547 EventSignal _signal; 548 } 549 550 unittest 551 { 552 auto d = new SignalDisposable; 553 auto signal = d.signal; 554 assert(!signal.signal); 555 d.dispose(); 556 assert(signal.signal); 557 } 558 559 /// 560 class CompositeDisposable : Disposable 561 { 562 public: 563 /// 564 this(Disposable[] disposables...) 565 { 566 _gate = new Object; 567 _disposables = disposables.dup; 568 } 569 570 public: 571 /// 572 void dispose() 573 { 574 Disposable[] currentDisposables; 575 synchronized (_gate) 576 { 577 if (!_disposed) 578 { 579 _disposed = true; 580 currentDisposables = _disposables; 581 _disposables = []; 582 } 583 } 584 585 if (currentDisposables) 586 { 587 foreach (d; currentDisposables) 588 { 589 d.dispose(); 590 } 591 } 592 } 593 594 void clear() 595 { 596 Disposable[] currentDisposables; 597 synchronized (_gate) 598 { 599 currentDisposables = _disposables; 600 _disposables = []; 601 } 602 603 foreach (d; currentDisposables) 604 { 605 d.dispose(); 606 } 607 } 608 609 void insert(Disposable item) 610 { 611 assert(item !is null); 612 613 bool shouldDispose = void; 614 synchronized (_gate) 615 { 616 shouldDispose = _disposed; 617 if (!_disposed) 618 { 619 _disposables ~= item; 620 } 621 } 622 623 if (shouldDispose) 624 { 625 item.dispose(); 626 } 627 } 628 629 private: 630 Disposable[] _disposables; 631 bool _disposed; 632 Object _gate; 633 } 634 /// 635 unittest 636 { 637 auto d1 = new SingleAssignmentDisposable; 638 auto d2 = new SerialDisposable; 639 auto d = new CompositeDisposable(d1, d2); 640 d.dispose(); 641 } 642 643 unittest 644 { 645 auto composite = new CompositeDisposable; 646 auto disposed = false; 647 auto inner = new AnonymousDisposable({ disposed = true; }); 648 composite.insert(inner); 649 composite.dispose(); 650 assert(disposed); 651 } 652 653 unittest 654 { 655 auto composite = new CompositeDisposable; 656 size_t _count = 0; 657 auto inner = new AnonymousDisposable({ _count++; }); 658 composite.insert(inner); 659 composite.clear(); // clear items and dispose all 660 assert(_count == 1); 661 662 composite.clear(); 663 assert(_count == 1); 664 } 665 666 unittest 667 { 668 auto composite = new CompositeDisposable; 669 composite.dispose(); 670 671 auto disposed = false; 672 auto inner2 = new AnonymousDisposable({ disposed = true; }); 673 composite.insert(inner2); 674 assert(disposed); 675 } 676 677 /// 678 class AnonymousDisposable : Disposable 679 { 680 public: 681 /// 682 this(void delegate() dispose) 683 { 684 assert(dispose !is null); 685 _dispose = dispose; 686 } 687 688 public: 689 /// 690 void dispose() 691 { 692 if (_dispose !is null) 693 { 694 _dispose(); 695 _dispose = null; 696 } 697 } 698 699 private: 700 void delegate() _dispose; 701 } 702 /// 703 unittest 704 { 705 int count = 0; 706 auto d = new AnonymousDisposable({ count++; }); 707 assert(count == 0); 708 d.dispose(); 709 assert(count == 1); 710 d.dispose(); 711 assert(count == 1); 712 } 713 714 /// 715 class RefCountDisposable : Disposable 716 { 717 public: 718 /// 719 this(Disposable disposable, bool throwWhenDisposed = false) 720 { 721 assert(disposable !is null); 722 723 _throwWhenDisposed = throwWhenDisposed; 724 _gate = new Object(); 725 _disposable = disposable; 726 _isPrimaryDisposed = false; 727 _count = 0; 728 } 729 730 public: 731 /// 732 Disposable getDisposable() 733 { 734 synchronized (_gate) 735 { 736 if (_disposable is null) 737 { 738 if (_throwWhenDisposed) 739 { 740 throw new Exception("RefCountDisposable is already disposed."); 741 } 742 return NopDisposable.instance; 743 } 744 else 745 { 746 _count++; 747 return new AnonymousDisposable(&this.release); 748 } 749 } 750 } 751 752 /// 753 void dispose() 754 { 755 Disposable disposable = null; 756 synchronized (_gate) 757 { 758 if (_disposable is null) 759 return; 760 761 if (!_isPrimaryDisposed) 762 { 763 _isPrimaryDisposed = true; 764 765 if (_count == 0) 766 { 767 disposable = _disposable; 768 _disposable = null; 769 } 770 } 771 } 772 if (disposable !is null) 773 { 774 disposable.dispose(); 775 } 776 } 777 778 private: 779 void release() 780 { 781 Disposable disposable = null; 782 synchronized (_gate) 783 { 784 if (_disposable is null) 785 return; 786 787 assert(_count > 0); 788 _count--; 789 790 if (_isPrimaryDisposed) 791 { 792 if (_count == 0) 793 { 794 disposable = _disposable; 795 _disposable = null; 796 } 797 } 798 } 799 if (disposable !is null) 800 { 801 disposable.dispose(); 802 } 803 } 804 805 private: 806 size_t _count; 807 Disposable _disposable; 808 bool _isPrimaryDisposed; 809 Object _gate; 810 bool _throwWhenDisposed; 811 } 812 813 /// 814 unittest 815 { 816 bool disposed = false; 817 auto disposable = new RefCountDisposable(new AnonymousDisposable({ 818 disposed = true; 819 })); 820 821 auto subscription = disposable.getDisposable(); 822 823 assert(!disposed); 824 disposable.dispose(); 825 assert(!disposed); 826 827 subscription.dispose(); 828 assert(disposed); 829 } 830 831 unittest 832 { 833 bool disposed = false; 834 auto disposable = new RefCountDisposable(new AnonymousDisposable({ 835 disposed = true; 836 })); 837 838 assert(!disposed); 839 disposable.dispose(); 840 assert(disposed); 841 } 842 843 unittest 844 { 845 bool disposed = false; 846 auto disposable = new RefCountDisposable(new AnonymousDisposable({ 847 disposed = true; 848 })); 849 850 auto subscription = disposable.getDisposable(); 851 assert(!disposed); 852 subscription.dispose(); 853 assert(!disposed); 854 disposable.dispose(); 855 assert(disposed); 856 } 857 858 unittest 859 { 860 bool disposed = false; 861 auto disposable = new RefCountDisposable(new AnonymousDisposable({ 862 disposed = true; 863 })); 864 865 auto subscription1 = disposable.getDisposable(); 866 auto subscription2 = disposable.getDisposable(); 867 assert(!disposed); 868 subscription1.dispose(); 869 assert(!disposed); 870 subscription2.dispose(); 871 assert(!disposed); 872 disposable.dispose(); 873 assert(disposed); 874 } 875 876 unittest 877 { 878 bool disposed = false; 879 auto disposable = new RefCountDisposable(new AnonymousDisposable({ 880 disposed = true; 881 })); 882 883 auto subscription1 = disposable.getDisposable(); 884 auto subscription2 = disposable.getDisposable(); 885 886 disposable.dispose(); 887 assert(!disposed); 888 889 subscription1.dispose(); 890 assert(!disposed); 891 subscription1.dispose(); 892 assert(!disposed); 893 894 subscription2.dispose(); 895 assert(disposed); 896 } 897 898 /// 899 template withDisposed(alias f) 900 { 901 auto withDisposed(TDisposable)(auto ref TDisposable disposable) 902 if (isDisposable!TDisposable) 903 { 904 return new CompositeDisposable(disposable, new AnonymousDisposable({ 905 f(); 906 })); 907 } 908 } 909 910 ///ditto 911 auto withDisposed(TDisposable)(auto ref TDisposable disposable, void delegate() disposed) 912 if (isDisposable!TDisposable) 913 { 914 return new CompositeDisposable(disposable, new AnonymousDisposable(disposed)); 915 } 916 917 /// 918 unittest 919 { 920 import rx; 921 922 auto sub = new SubjectObject!int; 923 size_t putCount = 0; 924 size_t disposedCount = 0; 925 926 auto disposable = sub.doSubscribe!(_ => putCount++) 927 .withDisposed!(() => disposedCount++); 928 929 sub.put(1); 930 disposable.dispose(); 931 932 assert(putCount == 1); 933 assert(disposedCount == 1); 934 } 935 936 /// 937 unittest 938 { 939 import rx; 940 941 auto sub = new SubjectObject!int; 942 size_t putCount = 0; 943 944 bool disposed = false; 945 alias traceDispose = withDisposed!(() => disposed = true); 946 947 auto disposable = traceDispose(sub.doSubscribe!(_ => putCount++)); 948 949 sub.put(1); 950 sub.completed(); 951 952 assert(putCount == 1); 953 assert(!disposed); 954 } 955 956 /// 957 unittest 958 { 959 import rx; 960 961 auto sub = new SubjectObject!int; 962 size_t putCount = 0; 963 964 bool disposed = false; 965 alias traceDispose = withDisposed!(() => disposed = true); 966 967 auto disposable = traceDispose(sub.doSubscribe!(_ => putCount++)); 968 969 sub.put(1); 970 disposable.dispose(); 971 972 assert(putCount == 1); 973 assert(disposed); 974 } 975 976 /// 977 unittest 978 { 979 import rx; 980 981 auto sub = new SubjectObject!int; 982 size_t putCount = 0; 983 984 bool disposed = false; 985 auto disposable = sub.doSubscribe!(_ => putCount++).withDisposed(() { 986 disposed = true; 987 }); 988 989 sub.put(1); 990 disposable.dispose(); 991 992 assert(putCount == 1); 993 assert(disposed); 994 } 995 996 /// 997 unittest 998 { 999 import rx; 1000 1001 auto sub = new SubjectObject!int; 1002 size_t disposedCount = 0; 1003 1004 auto disposable = sub.doSubscribe!((int) { }) 1005 .withDisposed!(() { disposedCount++; }); 1006 1007 disposable.dispose(); 1008 disposable.dispose(); 1009 1010 assert(disposedCount == 1); 1011 } 1012 1013 /// 1014 unittest 1015 { 1016 import rx; 1017 1018 auto sub = new SubjectObject!int; 1019 size_t putCount = 0; 1020 1021 bool disposed = false; 1022 auto disposable = sub.doSubscribe!(_ => putCount++).withDisposed(() { 1023 disposed = true; 1024 }); 1025 1026 sub.put(1); 1027 disposable.dispose(); 1028 1029 assert(putCount == 1); 1030 assert(disposed); 1031 } 1032 1033 /// 1034 unittest 1035 { 1036 import rx; 1037 1038 auto sub = new SubjectObject!int; 1039 size_t disposedCount = 0; 1040 1041 auto disposable = sub.doSubscribe!((int) { }).withDisposed(() { 1042 disposedCount++; 1043 }); 1044 1045 disposable.dispose(); 1046 disposable.dispose(); 1047 1048 assert(disposedCount == 1); 1049 }