1 /+++++++++++++++++++++++++++++
2  + This module defines the concept of Disposable.
3  +/
4 module rx.disposable;
5 
6 import core.atomic;
7 import core.sync.mutex;
8 import rx.util;
9 
/++
 + Tests if something is a Disposable.
 +
 + Yields `true` when `dispose()` can be called without arguments on a value
 + of type `T`. The probe below is only type-checked, never executed.
 +/
template isDisposable(T)
{
    enum bool isDisposable = is(typeof({
                T candidate = void; // void-initialized: no constructor is required
                candidate.dispose();
            }()));
}
///
unittest
{
    // A struct, a class and an interface that each expose dispose().
    struct DisposableStruct
    {
        void dispose()
        {
        }
    }

    class DisposableClass
    {
        void dispose()
        {
        }
    }

    interface DisposableInterface
    {
        void dispose();
    }

    static assert(isDisposable!DisposableStruct);
    static assert(isDisposable!DisposableClass);
    static assert(isDisposable!DisposableInterface);
}
44 
/++
 + Tests if something is a Cancelable.
 +
 + Yields `true` when `T` satisfies `isDisposable` and additionally exposes an
 + `isDisposed` member whose value can initialize a `bool`.
 +/
template isCancelable(T)
{
    static if (isDisposable!T)
    {
        // Only type-checked, never run.
        enum bool isCancelable = is(typeof((inout int n = 0) {
                    T candidate = void;
                    bool state = candidate.isDisposed;
                }));
    }
    else
    {
        enum bool isCancelable = false;
    }
}
///
unittest
{
    // A struct, a class and an interface that each expose isDisposed and dispose().
    struct CancelableStruct
    {
        void dispose()
        {
        }

        bool isDisposed() @property
        {
            return true;
        }
    }

    class CancelableClass
    {
        void dispose()
        {
        }

        bool isDisposed() @property
        {
            return true;
        }
    }

    interface CancelableInterface
    {
        void dispose();
        bool isDisposed() @property;
    }

    static assert(isCancelable!CancelableStruct);
    static assert(isCancelable!CancelableClass);
    static assert(isCancelable!CancelableInterface);
}
90 
/++
 + Wrapper for disposable objects.
 +
 + The common interface through which the types in this module are disposed.
 +/
interface Disposable
{
    /// Performs the disposal.
    void dispose();
}
/++
 + Wrapper for cancelable objects.
 +
 + A Disposable that can additionally report whether it has been disposed.
 +/
interface Cancelable : Disposable
{
    /// Returns whether the object has been disposed.
    bool isDisposed() @property;
}
103 
/++
 + A simple implementation of the Cancelable interface, backed by one atomic flag.
 + It propagates the notification that operations should be canceled.
 +/
class CancellationToken : Cancelable
{
public:
    /// Returns `true` once `dispose` (or its alias `cancel`) has been called.
    bool isDisposed() @property
    {
        const current = atomicLoad(_disposed);
        return current;
    }

    /// ditto
    alias isCanceled = isDisposed;

    /// Marks the token as canceled by atomically storing `true` into the flag.
    void dispose()
    {
        atomicStore(_disposed, true);
    }

    /// ditto
    alias cancel = dispose;

private:
    // Only touched through atomicLoad / atomicStore.
    shared(bool) _disposed;
}
128 
unittest
{
    // dispose() flips both isDisposed and its alias isCanceled.
    auto token = new CancellationToken;
    assert(!token.isDisposed);
    assert(!token.isCanceled);
    token.dispose();
    assert(token.isDisposed);
    assert(token.isCanceled);
}

unittest
{
    // cancel() is an alias of dispose() and has the same effect.
    auto token = new CancellationToken;
    assert(!token.isDisposed);
    assert(!token.isCanceled);
    token.cancel();
    assert(token.isDisposed);
    assert(token.isCanceled);
}
148 
/++
 + Class that implements the Disposable interface by forwarding to a wrapped
 + value of type `T`, so that the value's `dispose` is reachable through a
 + virtual call.
 +/
class DisposableObject(T) : Disposable
{
    /// Stores `disposable`; every `dispose` call is forwarded to it.
    public this(T disposable)
    {
        this._disposable = disposable;
    }

    /// Calls `dispose()` on the wrapped value.
    public void dispose()
    {
        _disposable.dispose();
    }

    // Also read by CancelableObject, which lives in the same module.
    private T _disposable;
}
/++
 + Class that implements the Cancelable interface on top of DisposableObject,
 + exposing the wrapped value's `isDisposed` property through a virtual call.
 +/
class CancelableObject(T) : DisposableObject!T, Cancelable
{
    /// Passes `disposable` to the DisposableObject constructor, which stores it.
    public this(T disposable)
    {
        super(disposable);
    }

    /// Returns the wrapped value's `isDisposed`.
    public bool isDisposed() @property
    {
        return _disposable.isDisposed;
    }
}
186 
/++
 + Wraps the dispose method in virtual functions.
 +
 + Returns `disposable` unchanged when its type already converts to Disposable
 + (or Cancelable). Otherwise returns a new CancelableObject when `T` satisfies
 + `isCancelable`, and a new DisposableObject in all remaining cases.
 + Compilation fails if `T` does not satisfy `isDisposable`.
 +/
auto disposableObject(T)(T disposable)
{
    static assert(isDisposable!T);

    static if (is(T : Disposable) || is(T : Cancelable))
    {
        // Already usable through the interface; no wrapper needed.
        return disposable;
    }
    else static if (!isCancelable!T)
    {
        return new DisposableObject!T(disposable);
    }
    else
    {
        return new CancelableObject!T(disposable);
    }
}
205 
///
unittest
{
    // A plain struct gets wrapped, and dispose() is forwarded to it.
    int disposeCalls = 0;
    struct Probe
    {
        void dispose()
        {
            disposeCalls++;
        }
    }

    Probe probe;
    Disposable wrapped = disposableObject(probe);
    assert(disposeCalls == 0);
    wrapped.dispose();
    assert(disposeCalls == 1);
}

unittest
{
    // A class that already implements Disposable is returned as-is.
    int disposeCalls = 0;
    class Probe : Disposable
    {
        void dispose()
        {
            disposeCalls++;
        }
    }

    auto probe = new Probe;
    Disposable wrapped = disposableObject(probe);
    assert(wrapped is probe);
    assert(disposeCalls == 0);
    wrapped.dispose();
    assert(disposeCalls == 1);
}

unittest
{
    // A struct with isDisposed is wrapped into something usable as a Cancelable.
    int disposeCalls = 0;
    struct Probe
    {
        bool _disposed;

        void dispose()
        {
            disposeCalls++;
            _disposed = true;
        }

        bool isDisposed() @property
        {
            return _disposed;
        }
    }

    Probe probe;
    Cancelable wrapped = disposableObject(probe);

    assert(!wrapped.isDisposed);
    assert(disposeCalls == 0);
    wrapped.dispose();
    assert(wrapped.isDisposed);
    assert(disposeCalls == 1);
}
272 
/++
 + Defines an `instance` property that returns a Disposable whose `dispose`
 + does nothing.
 +/
final class NopDisposable : Disposable
{
    // Private constructor: objects are obtained through `instance`.
    private this()
    {
    }

    /// Does nothing.
    public void dispose()
    {
    }

    /// Returns the NopDisposable object, created on first use via `initOnce`.
    public static Disposable instance() @property
    {
        import std.concurrency : initOnce;

        static __gshared NopDisposable singleton;
        return initOnce!singleton(new NopDisposable);
    }
}
///
unittest
{
    // Every access yields the same non-null object.
    Disposable first = NopDisposable.instance;
    Disposable second = NopDisposable.instance;
    assert(first !is null);
    assert(first is second);
}
304 
// A Cancelable that always reports itself as disposed and whose dispose() is a
// no-op. SingleAssignmentDisposable stores its instance to mark the disposed state.
package final class DisposedMarker : Cancelable
{
    // Private constructor: the object is obtained through `instance`.
    private this()
    {
    }

    // Always true.
    public bool isDisposed() @property
    {
        return true;
    }

    // Does nothing.
    public void dispose()
    {
    }

    // Returns the marker object, created on first use via initOnce.
    public static Cancelable instance()
    {
        import std.concurrency : initOnce;

        static __gshared DisposedMarker marker;
        return initOnce!marker(new DisposedMarker);
    }
}
332 
/++
 + A Cancelable that holds a single underlying Disposable which may be
 + assigned once.
 +
 + Disposing this object disposes the held disposable (if any) and replaces it
 + with the DisposedMarker instance, which is how `isDisposed` detects the
 + disposed state.
 +/
final class SingleAssignmentDisposable : Cancelable
{
public:
    /++
     + Assigns the underlying disposable.
     +
     + If this object has already been disposed, `disposable` is not stored but
     + disposed immediately (when it is not `null`). Assigning while another
     + disposable is already held is a programming error and fails with
     + `assert(false)`.
     +
     + Params:
     +   disposable = the disposable to hold.
     +/
    void setDisposable(Disposable disposable)
    {
        import core.atomic;

        // Fast path: the slot was still empty and now holds `disposable`.
        if (cas(&_disposable, shared(Disposable).init, cast(shared) disposable))
            return;

        // dispose() ran before the assignment: release the late arrival
        // instead of failing, so the resource does not leak.
        if (isDisposed)
        {
            if (disposable !is null)
                disposable.dispose();
            return;
        }

        // A different disposable is already held.
        assert(false);
    }

public:
    /// Returns `true` once `dispose` has been called.
    bool isDisposed() @property
    {
        return _disposable is cast(shared) DisposedMarker.instance;
    }

public:
    /// Swaps in the disposed marker and disposes whatever was held before, if anything.
    void dispose()
    {
        import rx.util;

        auto temp = assumeThreadLocal(exchange(_disposable, cast(shared) DisposedMarker.instance));
        if (temp !is null)
            temp.dispose();
    }

private:
    // null: nothing assigned yet; DisposedMarker.instance: disposed.
    shared(Disposable) _disposable;
}
///
unittest
{
    // Disposing the holder disposes the assigned disposable exactly once.
    int disposeCalls = 0;
    class CountingDisposable : Disposable
    {
        void dispose()
        {
            disposeCalls++;
        }
    }

    auto holder = new SingleAssignmentDisposable;
    holder.setDisposable(new CountingDisposable);
    assert(!holder.isDisposed);
    assert(disposeCalls == 0);
    holder.dispose();
    assert(holder.isDisposed);
    assert(disposeCalls == 1);
}

unittest
{
    static assert(isDisposable!SingleAssignmentDisposable);
}

unittest
{
    // A second assignment while one is already held raises an AssertError.
    import core.exception;

    class EmptyDisposable : Disposable
    {
        void dispose()
        {
        }
    }

    auto holder = new SingleAssignmentDisposable;
    holder.setDisposable(new EmptyDisposable);

    bool rejected = false;
    try
    {
        holder.setDisposable(new EmptyDisposable);
    }
    catch (AssertError)
    {
        rejected = true;
    }

    if (!rejected)
        assert(false);
}
416 
/++
 + A Cancelable that holds one replaceable underlying Disposable.
 +
 + Assigning a new disposable disposes the previously held one. Once this
 + object has been disposed, any disposable assigned afterwards is disposed
 + immediately. All state is accessed while holding an internal mutex.
 +/
class SerialDisposable : Cancelable
{
public:
    /// Creates an empty, not-yet-disposed instance.
    this()
    {
        _gate = new Mutex;
    }

public:
    /// Returns `true` once `dispose` has been called.
    bool isDisposed() @property
    {
        // Read under the same lock every writer takes.
        synchronized (_gate)
        {
            return _disposed;
        }
    }

    /++
     + Replaces the held disposable with `value`.
     +
     + The previously held disposable (if any) is disposed. If this object is
     + already disposed, `value` is not stored and is disposed instead (when
     + it is not `null`).
     +/
    void disposable(Disposable value) @property
    {
        auto alreadyDisposed = false;
        Disposable previous = null;
        synchronized (_gate)
        {
            alreadyDisposed = _disposed;
            if (!alreadyDisposed)
            {
                previous = _disposable;
                _disposable = value;
            }
        }
        // dispose() calls happen outside the lock.
        if (previous !is null)
            previous.dispose();
        if (alreadyDisposed && value !is null)
            value.dispose();
    }

    /// Returns the currently held disposable; `null` if none is held or after `dispose`.
    Disposable disposable() @property
    {
        // Read under the same lock every writer takes.
        synchronized (_gate)
        {
            return _disposable;
        }
    }

public:
    /// Marks this object as disposed and disposes the held disposable, if any.
    /// Calls after the first have no effect.
    void dispose()
    {
        Disposable previous = null;
        synchronized (_gate)
        {
            if (!_disposed)
            {
                _disposed = true;
                previous = _disposable;
                _disposable = null;
            }
        }
        if (previous !is null)
            previous.dispose();
    }

private:
    Mutex _gate; // guards _disposed and _disposable
    bool _disposed;
    Disposable _disposable;
}
482 
unittest
{
    // Replacing disposes the previous value; after dispose(), new values are
    // disposed on assignment.
    int disposeCalls = 0;
    struct Counter
    {
        void dispose()
        {
            disposeCalls++;
        }
    }

    auto serial = new SerialDisposable;
    serial.disposable = disposableObject(Counter());
    assert(disposeCalls == 0);
    serial.disposable = disposableObject(Counter());
    assert(disposeCalls == 1);
    serial.dispose();
    assert(disposeCalls == 2);
    serial.disposable = disposableObject(Counter());
    assert(disposeCalls == 3);
}

unittest
{
    // Disposing an empty instance does nothing; a later assignment is disposed at once.
    int disposeCalls = 0;
    struct Counter
    {
        void dispose()
        {
            disposeCalls++;
        }
    }

    auto serial = new SerialDisposable;
    serial.dispose();
    assert(disposeCalls == 0);
    serial.disposable = disposableObject(Counter());
    assert(disposeCalls == 1);
}
522 
/++
 + A Disposable that owns an EventSignal and calls `setSignal()` on it when disposed.
 +/
class SignalDisposable : Disposable
{
    /// Creates the instance together with a new EventSignal.
    public this()
    {
        this._signal = new EventSignal;
    }

    /// Returns the EventSignal owned by this object.
    public EventSignal signal() @property
    {
        return _signal;
    }

    /// Calls `setSignal()` on the owned EventSignal.
    public void dispose()
    {
        _signal.setSignal();
    }

    private EventSignal _signal;
}
549 
unittest
{
    // The signal is unset until dispose() is called.
    auto target = new SignalDisposable;
    auto event = target.signal;
    assert(!event.signal);
    target.dispose();
    assert(event.signal);
}
558 
/++
 + A Disposable that groups several disposables and disposes them together.
 +
 + The list of items and the disposed flag are updated inside a `synchronized`
 + block; the items' own `dispose` calls are made outside of it.
 +/
class CompositeDisposable : Disposable
{
public:
    /// Creates a composite holding a copy of the given disposables.
    this(Disposable[] disposables...)
    {
        _disposables = disposables.dup;
        _gate = new Object;
    }

public:
    /// Marks the composite as disposed and disposes every held item.
    /// Calls after the first have no effect.
    void dispose()
    {
        Disposable[] pending;
        synchronized (_gate)
        {
            if (_disposed)
                return;

            _disposed = true;
            pending = _disposables;
            _disposables = [];
        }

        foreach (child; pending)
        {
            child.dispose();
        }
    }

    /// Removes and disposes every held item without marking the composite as disposed.
    void clear()
    {
        Disposable[] pending;
        synchronized (_gate)
        {
            pending = _disposables;
            _disposables = [];
        }

        foreach (child; pending)
        {
            child.dispose();
        }
    }

    /// Adds `item` (must not be `null`) to the composite. If the composite is
    /// already disposed, `item` is disposed instead of being stored.
    void insert(Disposable item)
    {
        assert(item !is null);

        bool alreadyDisposed;
        synchronized (_gate)
        {
            alreadyDisposed = _disposed;
            if (!alreadyDisposed)
            {
                _disposables ~= item;
            }
        }

        if (alreadyDisposed)
        {
            item.dispose();
        }
    }

private:
    Disposable[] _disposables;
    bool _disposed;
    Object _gate; // monitor used by the synchronized blocks above
}
///
unittest
{
    // Items passed to the constructor are disposed together.
    auto first = new SingleAssignmentDisposable;
    auto second = new SerialDisposable;
    auto group = new CompositeDisposable(first, second);
    group.dispose();
}

unittest
{
    // Items can also be added after construction.
    auto group = new CompositeDisposable;
    auto child = new SerialDisposable;
    group.insert(child);
    group.dispose();
}
650 
/++
 + A Disposable that runs a delegate when disposed.
 +
 + The delegate is invoked at most once: the first `dispose` call runs it, and
 + later calls do nothing.
 +/
class AnonymousDisposable : Disposable
{
public:
    /++
     + Params:
     +   dispose = the delegate to run on disposal; must not be `null`.
     +/
    this(void delegate() dispose)
    {
        assert(dispose !is null);
        _dispose = dispose;
    }

public:
    /// Runs the stored delegate on the first call; later calls do nothing.
    void dispose()
    {
        auto action = _dispose;
        if (action is null)
            return;

        // Clear the field BEFORE invoking: a delegate that calls dispose()
        // on this same object again, or that throws, must not get a second run.
        _dispose = null;
        action();
    }

private:
    void delegate() _dispose; // null once dispose() has been called
}
///
unittest
{
    // The delegate runs on the first dispose() only.
    int calls = 0;
    auto target = new AnonymousDisposable({ calls++; });
    assert(calls == 0);
    target.dispose();
    assert(calls == 1);
    target.dispose();
    assert(calls == 1);
}
687 
/++
 + A Disposable that disposes its underlying disposable only after both the
 + primary `dispose` has been called and every disposable handed out by
 + `getDisposable` has been disposed.
 +/
class RefCountDisposable : Disposable
{
public:
    /++
     + Params:
     +   disposable = the underlying disposable; must not be `null`.
     +   throwWhenDisposed = when `true`, `getDisposable` throws once the
     +                       underlying disposable has been released, instead
     +                       of returning a no-op disposable.
     +/
    this(Disposable disposable, bool throwWhenDisposed = false)
    {
        assert(disposable !is null);

        _gate = new Object();
        _disposable = disposable;
        _throwWhenDisposed = throwWhenDisposed;
        _isPrimaryDisposed = false;
        _count = 0;
    }

public:
    /++
     + Returns a new dependent disposable and increments the reference count.
     +
     + After the underlying disposable has been released, this returns
     + `NopDisposable.instance`, or throws an `Exception` if the object was
     + constructed with `throwWhenDisposed`.
     +/
    Disposable getDisposable()
    {
        synchronized (_gate)
        {
            if (_disposable !is null)
            {
                _count++;
                // AnonymousDisposable ensures release() runs at most once per reference.
                return new AnonymousDisposable(&this.release);
            }

            if (_throwWhenDisposed)
            {
                throw new Exception("RefCountDisposable is already disposed.");
            }
            return NopDisposable.instance;
        }
    }

    /// Performs the primary disposal. The underlying disposable is disposed
    /// right away only when no dependent disposables are outstanding.
    void dispose()
    {
        Disposable underlying = null;
        synchronized (_gate)
        {
            // Nothing left to do: already released, or the primary was already disposed.
            if (_disposable is null || _isPrimaryDisposed)
                return;

            _isPrimaryDisposed = true;
            if (_count == 0)
            {
                underlying = _disposable;
                _disposable = null;
            }
        }
        // The underlying dispose() is called outside the lock.
        if (underlying !is null)
        {
            underlying.dispose();
        }
    }

private:
    // Called when one dependent disposable is disposed. Drops one reference
    // and releases the underlying disposable when it was the last reference
    // and the primary has already been disposed.
    void release()
    {
        Disposable underlying = null;
        synchronized (_gate)
        {
            if (_disposable is null)
                return;

            assert(_count > 0);
            _count--;

            if (_isPrimaryDisposed && _count == 0)
            {
                underlying = _disposable;
                _disposable = null;
            }
        }
        if (underlying !is null)
        {
            underlying.dispose();
        }
    }

private:
    size_t _count; // number of outstanding dependent disposables
    Disposable _disposable; // null once released
    bool _isPrimaryDisposed;
    Object _gate; // monitor used by the synchronized blocks above
    bool _throwWhenDisposed;
}
786 
///
unittest
{
    // Primary disposed first: the release waits for the outstanding reference.
    bool released = false;
    auto refCounted = new RefCountDisposable(new AnonymousDisposable({
            released = true;
        }));

    auto reference = refCounted.getDisposable();

    assert(!released);
    refCounted.dispose();
    assert(!released);

    reference.dispose();
    assert(released);
}

unittest
{
    // No references handed out: the primary dispose releases immediately.
    bool released = false;
    auto refCounted = new RefCountDisposable(new AnonymousDisposable({
            released = true;
        }));

    assert(!released);
    refCounted.dispose();
    assert(released);
}

unittest
{
    // Reference disposed first: the release waits for the primary dispose.
    bool released = false;
    auto refCounted = new RefCountDisposable(new AnonymousDisposable({
            released = true;
        }));

    auto reference = refCounted.getDisposable();
    assert(!released);
    reference.dispose();
    assert(!released);
    refCounted.dispose();
    assert(released);
}

unittest
{
    // Two references disposed before the primary.
    bool released = false;
    auto refCounted = new RefCountDisposable(new AnonymousDisposable({
            released = true;
        }));

    auto firstRef = refCounted.getDisposable();
    auto secondRef = refCounted.getDisposable();
    assert(!released);
    firstRef.dispose();
    assert(!released);
    secondRef.dispose();
    assert(!released);
    refCounted.dispose();
    assert(released);
}

unittest
{
    // Disposing the same reference twice counts only once.
    bool released = false;
    auto refCounted = new RefCountDisposable(new AnonymousDisposable({
            released = true;
        }));

    auto firstRef = refCounted.getDisposable();
    auto secondRef = refCounted.getDisposable();

    refCounted.dispose();
    assert(!released);

    firstRef.dispose();
    assert(!released);
    firstRef.dispose();
    assert(!released);

    secondRef.dispose();
    assert(released);
}
871 
/++
 + Combines `disposable` with a callback that runs when the result is disposed.
 +
 + Returns a CompositeDisposable holding `disposable` and an
 + AnonymousDisposable that calls `f`. Any type satisfying `isDisposable` is
 + accepted: values that do not already convert to Disposable are wrapped
 + with `disposableObject` first.
 +
 + Params:
 +   f = callable invoked with no arguments when the returned object is disposed.
 +/
template withDisposed(alias f)
{
    auto withDisposed(TDisposable)(auto ref TDisposable disposable)
            if (isDisposable!TDisposable)
    {
        // disposableObject returns its argument unchanged when it is already a
        // Disposable, and otherwise adapts it so CompositeDisposable can hold it.
        return new CompositeDisposable(disposableObject(disposable), new AnonymousDisposable({
                f();
            }));
    }
}
883 
/++
 + Combines `disposable` with a delegate that runs when the result is disposed.
 +
 + Returns a CompositeDisposable holding `disposable` and an
 + AnonymousDisposable wrapping `disposed`. Any type satisfying `isDisposable`
 + is accepted: values that do not already convert to Disposable are wrapped
 + with `disposableObject` first.
 +
 + Params:
 +   disposable = the disposable to extend.
 +   disposed = delegate invoked when the returned object is disposed; must
 +              not be `null` (AnonymousDisposable asserts this).
 +/
auto withDisposed(TDisposable)(auto ref TDisposable disposable, void delegate() disposed)
        if (isDisposable!TDisposable)
{
    // disposableObject returns its argument unchanged when it is already a
    // Disposable, and otherwise adapts it so CompositeDisposable can hold it.
    return new CompositeDisposable(disposableObject(disposable), new AnonymousDisposable(disposed));
}
890 
///
unittest
{
    // Template form: the callback runs once when the result is disposed.
    import rx;

    auto subject = new SubjectObject!int;
    size_t received = 0;
    size_t disposeCalls = 0;

    auto subscription = subject.doSubscribe!(_ => received++)
        .withDisposed!(() => disposeCalls++);

    subject.put(1);
    subscription.dispose();

    assert(received == 1);
    assert(disposeCalls == 1);
}

///
unittest
{
    // Completion of the subject alone does not run the callback.
    import rx;

    auto subject = new SubjectObject!int;
    size_t received = 0;

    bool wasDisposed = false;
    alias traceDispose = withDisposed!(() => wasDisposed = true);

    auto subscription = traceDispose(subject.doSubscribe!(_ => received++));

    subject.put(1);
    subject.completed();

    assert(received == 1);
    assert(!wasDisposed);
}

///
unittest
{
    // An aliased instantiation behaves the same as the inline form.
    import rx;

    auto subject = new SubjectObject!int;
    size_t received = 0;

    bool wasDisposed = false;
    alias traceDispose = withDisposed!(() => wasDisposed = true);

    auto subscription = traceDispose(subject.doSubscribe!(_ => received++));

    subject.put(1);
    subscription.dispose();

    assert(received == 1);
    assert(wasDisposed);
}
949 
///
unittest
{
    // Delegate form: the delegate runs when the result is disposed.
    import rx;

    auto subject = new SubjectObject!int;
    size_t received = 0;

    bool wasDisposed = false;
    auto subscription = subject.doSubscribe!(_ => received++).withDisposed(() {
        wasDisposed = true;
    });

    subject.put(1);
    subscription.dispose();

    assert(received == 1);
    assert(wasDisposed);
}

///
unittest
{
    // Template form: disposing twice runs the callback only once.
    import rx;

    auto subject = new SubjectObject!int;
    size_t disposeCalls = 0;

    auto subscription = subject.doSubscribe!((int) {  })
        .withDisposed!(() { disposeCalls++; });

    subscription.dispose();
    subscription.dispose();

    assert(disposeCalls == 1);
}

///
unittest
{
    // Delegate form again: same scenario as the first example in this group.
    import rx;

    auto subject = new SubjectObject!int;
    size_t received = 0;

    bool wasDisposed = false;
    auto subscription = subject.doSubscribe!(_ => received++).withDisposed(() {
        wasDisposed = true;
    });

    subject.put(1);
    subscription.dispose();

    assert(received == 1);
    assert(wasDisposed);
}

///
unittest
{
    // Delegate form: disposing twice runs the delegate only once.
    import rx;

    auto subject = new SubjectObject!int;
    size_t disposeCalls = 0;

    auto subscription = subject.doSubscribe!((int) {  }).withDisposed(() {
        disposeCalls++;
    });

    subscription.dispose();
    subscription.dispose();

    assert(disposeCalls == 1);
}