1 /+++++++++++++++++++++++++++++
2  + This module defines the concept of Disposable.
3  +/
4 module rx.disposable;
5 
6 import core.atomic;
7 import core.sync.mutex;
8 import rx.util;
9 
/++
 + Compile-time predicate: `true` when a variable of type `T` can have a
 + parameterless `dispose()` called on it, `false` otherwise.
 +/
enum bool isDisposable(T) = is(typeof({
            T target = void; // uninitialized on purpose: only compilability is probed
            target.dispose();
        }()));
/// Structs, classes and interfaces all satisfy `isDisposable` when they declare `dispose()`.
unittest
{
    // A struct with a `dispose` method.
    struct A
    {
        void dispose()
        {
        }
    }

    // A class with a `dispose` method.
    class B
    {
        void dispose()
        {
        }
    }

    // An interface that declares `dispose`.
    interface C
    {
        void dispose();
    }

    static assert(isDisposable!A);
    static assert(isDisposable!B);
    static assert(isDisposable!C);
}
44 
/++
 + Compile-time predicate: `true` when `T` satisfies `isDisposable` and also
 + exposes an `isDisposed` member whose value can initialize a `bool`.
 +/
enum isCancelable(T) = isDisposable!T && is(typeof((inout int n = 0) {
            T target = void; // uninitialized on purpose: only compilability is probed
            bool flag = target.isDisposed;
        }));
/// Structs, classes and interfaces satisfy `isCancelable` when they declare both `isDisposed` and `dispose()`.
unittest
{
    // A struct with both members.
    struct A
    {
        bool isDisposed() @property
        {
            return true;
        }

        void dispose()
        {
        }
    }

    // A class with both members.
    class B
    {
        bool isDisposed() @property
        {
            return true;
        }

        void dispose()
        {
        }
    }

    // An interface declaring both members.
    interface C
    {
        bool isDisposed() @property;
        void dispose();
    }

    static assert(isCancelable!A);
    static assert(isCancelable!B);
    static assert(isCancelable!C);
}
90 
/// Runtime interface for disposable objects: anything whose `dispose()` is reachable through virtual dispatch.
interface Disposable
{
    /// Disposes the object; the concrete effect is defined by the implementing class.
    void dispose();
}
/// Runtime interface for cancelable objects: a `Disposable` that can also report whether it has been disposed.
interface Cancelable : Disposable
{
    /// Returns: whether the object is considered disposed; the exact meaning is defined by the implementing class.
    bool isDisposed() @property;
}
103 
/++
 + Simple implementation of the `Cancelable` interface backed by one atomic flag.
 + It propagates the notification that an operation should be canceled:
 + `dispose` (alias `cancel`) sets the flag, `isDisposed` (alias `isCanceled`) reads it.
 +/
class CancellationToken : Cancelable
{
public:
    /// Returns: `true` once `dispose`/`cancel` has been called.
    bool isDisposed() @property
    {
        return atomicLoad(_disposed);
    }

    /// ditto
    alias isCanceled = isDisposed;

    /// Atomically stores `true` into the disposed flag.
    void dispose()
    {
        atomicStore(_disposed, true);
    }

    /// ditto
    alias cancel = dispose;

private:
    // Starts as `false` (default initialization); only ever set to `true`.
    shared(bool) _disposed;
}
128 
// A fresh token is not disposed; `dispose()` flips both `isDisposed` and its alias `isCanceled`.
unittest
{
    auto c = new CancellationToken;
    assert(!c.isDisposed);
    assert(!c.isCanceled);
    c.dispose();
    assert(c.isDisposed);
    assert(c.isCanceled);
}

// Same scenario through the `cancel` alias of `dispose`.
unittest
{
    auto c = new CancellationToken;
    assert(!c.isDisposed);
    assert(!c.isCanceled);
    c.cancel();
    assert(c.isDisposed);
    assert(c.isCanceled);
}
148 
/++
 + Adapter class that exposes any value with a `dispose()` method through the
 + `Disposable` interface by forwarding the call to the wrapped value.
 +/
class DisposableObject(T) : Disposable
{
    /// Stores `disposable` as the wrapped value.
    public this(T disposable)
    {
        this._disposable = disposable;
    }

    /// Forwards to the wrapped value's `dispose()`.
    public void dispose()
    {
        _disposable.dispose();
    }

    // The wrapped value; also read by `CancelableObject` in this module.
    private T _disposable;
}
/++
 + Adapter class that extends `DisposableObject!T` with the `Cancelable`
 + interface by forwarding `isDisposed` to the wrapped value.
 +/
class CancelableObject(T) : DisposableObject!T, Cancelable
{
    /// Hands `disposable` to the `DisposableObject!T` constructor.
    public this(T disposable)
    {
        super(disposable);
    }

    /// Returns: the wrapped value's `isDisposed`.
    public bool isDisposed() @property
    {
        return _disposable.isDisposed;
    }
}
186 
/++
 + Makes `disposable` usable through the virtual `Disposable`/`Cancelable` interfaces.
 +
 + Params:
 +   disposable = any value whose type satisfies `isDisposable`
 +
 + Returns:
 +   `disposable` itself when its type already converts to `Disposable` or
 +   `Cancelable`; otherwise a new `CancelableObject!T` when `T` satisfies
 +   `isCancelable`, or a new `DisposableObject!T` when it does not.
 +/
auto disposableObject(T)(T disposable)
{
    static assert(isDisposable!T);

    // No wrapper is needed when the value is already usable as one of the interfaces.
    enum alreadyInterface = is(T : Disposable) || is(T : Cancelable);

    static if (alreadyInterface)
        return disposable;
    else static if (!isCancelable!T)
        return new DisposableObject!T(disposable);
    else
        return new CancelableObject!T(disposable);
}
205 
/// A struct with `dispose()` is wrapped so it can be used through the `Disposable` interface.
unittest
{
    int count = 0;
    struct TestDisposable
    {
        void dispose()
        {
            count++;
        }
    }

    TestDisposable test;
    Disposable disposable = disposableObject(test);
    assert(count == 0);
    // The wrapper forwards to the struct's dispose, which bumps the counter.
    disposable.dispose();
    assert(count == 1);
}

// A class that already implements `Disposable` is returned as-is (same reference).
unittest
{
    int count = 0;
    class TestDisposable : Disposable
    {
        void dispose()
        {
            count++;
        }
    }

    auto test = new TestDisposable;
    Disposable disposable = disposableObject(test);
    assert(disposable is test);
    assert(count == 0);
    disposable.dispose();
    assert(count == 1);
}

// A struct with both `isDisposed` and `dispose()` is wrapped into something usable as `Cancelable`.
unittest
{
    int count = 0;
    struct TestCancelable
    {
        bool isDisposed() @property
        {
            return _disposed;
        }

        void dispose()
        {
            count++;
            _disposed = true;
        }

        bool _disposed;
    }

    TestCancelable test;
    Cancelable cancelable = disposableObject(test);

    assert(!cancelable.isDisposed);
    assert(count == 0);
    cancelable.dispose();
    // Both the forwarded state and the side effect are visible through the wrapper.
    assert(cancelable.isDisposed);
    assert(count == 1);
}
272 
/// Defines an `instance` property that returns a shared `Disposable` whose `dispose()` does nothing.
final class NopDisposable : Disposable
{
    // The constructor is private; objects are only created by `instance`.
    private this()
    {
    }

    // Intentionally empty.
    public void dispose()
    {
    }

    /// Returns: the single `NopDisposable` object, created through `initOnce` on first use.
    public static Disposable instance() @property
    {
        import std.concurrency : initOnce;

        static __gshared NopDisposable singleton;
        return initOnce!singleton(new NopDisposable);
    }
}
/// `instance` is never null and returns the same object on every access.
unittest
{
    Disposable d1 = NopDisposable.instance;
    Disposable d2 = NopDisposable.instance;
    assert(d1 !is null);
    assert(d1 is d2);
}
304 
// Package-visible `Cancelable` that always reports itself as disposed and whose
// `dispose()` does nothing. `SingleAssignmentDisposable` stores its single
// instance in its slot to mark the disposed state.
package final class DisposedMarker : Cancelable
{
    // The constructor is private; objects are only created by `instance`.
    private this()
    {
    }

    // Always `true`.
    public bool isDisposed() @property
    {
        return true;
    }

    // Intentionally empty.
    public void dispose()
    {
    }

    // Returns the single `DisposedMarker` object, created through `initOnce` on first use.
    public static Cancelable instance()
    {
        import std.concurrency : initOnce;

        static __gshared DisposedMarker singleton;
        return initOnce!singleton(new DisposedMarker);
    }
}
332 
/++
 + A `Cancelable` whose underlying disposable can be assigned once.
 +
 + The slot is updated with atomic compare-and-swap / exchange operations.
 + Disposing swaps the `DisposedMarker` instance into the slot and disposes
 + whatever was stored there before.
 +/
final class SingleAssignmentDisposable : Cancelable
{
public:
    /++
     + Assigns the underlying disposable.
     +
     + If this object has already been disposed, `disposable` is disposed
     + immediately instead of being stored, so a resource that arrives after
     + disposal is still released. Assigning a second time while the object is
     + still live is a programming error and triggers an assertion failure.
     +
     + Params:
     +   disposable = the disposable to store
     +/
    void setDisposable(Disposable disposable)
    {
        import core.atomic;

        // Fast path: the slot was empty and now holds `disposable`.
        if (cas(&_disposable, shared(Disposable).init, cast(shared) disposable))
            return;

        // The slot was occupied. If it holds the disposed marker, disposal
        // happened before the assignment: release the late arrival.
        if (isDisposed)
        {
            if (disposable !is null)
                disposable.dispose();
            return;
        }

        // Otherwise a different disposable is already stored: double assignment.
        assert(false, "SingleAssignmentDisposable: the disposable has already been assigned.");
    }

public:
    /// Returns: `true` once `dispose()` has stored the `DisposedMarker` instance in the slot.
    bool isDisposed() @property
    {
        return _disposable is cast(shared) DisposedMarker.instance;
    }

public:
    /// Swaps the disposed marker into the slot and disposes the previous occupant, if any.
    void dispose()
    {
        import rx.util;

        // After the first call the previous occupant is the marker itself,
        // whose `dispose()` does nothing, so repeated calls are harmless.
        auto temp = assumeThreadLocal(exchange(_disposable, cast(shared) DisposedMarker.instance));
        if (temp !is null)
            temp.dispose();
    }

private:
    // null = unassigned, the DisposedMarker instance = disposed, anything else = assigned.
    shared(Disposable) _disposable;
}
/// Disposing the container disposes the assigned disposable and flips `isDisposed`.
unittest
{
    int count = 0;
    class TestDisposable : Disposable
    {
        void dispose()
        {
            count++;
        }
    }

    auto temp = new SingleAssignmentDisposable;
    temp.setDisposable(new TestDisposable);
    assert(!temp.isDisposed);
    assert(count == 0);
    temp.dispose();
    assert(temp.isDisposed);
    assert(count == 1);
}

// The class satisfies the compile-time `isDisposable` check.
unittest
{
    static assert(isDisposable!SingleAssignmentDisposable);
}

// Assigning twice while the container is still live must fail with an AssertError.
unittest
{
    import core.exception;

    class TestDisposable : Disposable
    {
        void dispose()
        {
        }
    }

    auto temp = new SingleAssignmentDisposable;
    temp.setDisposable(new TestDisposable);
    try
    {
        temp.setDisposable(new TestDisposable);
    }
    catch (AssertError)
    {
        // Expected path: the second assignment was rejected.
        return;
    }
    // Reaching this line means the second assignment was silently accepted.
    assert(false);
}
416 
/++
 + A `Cancelable` holding one replaceable underlying disposable.
 +
 + Assigning a new disposable disposes the previous one. Once the container is
 + disposed, every disposable assigned afterwards is disposed immediately.
 + All state is read and written while holding `_gate`; the `dispose()` calls
 + on underlying disposables are made after the lock has been released.
 +/
class SerialDisposable : Cancelable
{
public:
    /// Creates an empty, not-yet-disposed container.
    this()
    {
        _gate = new Mutex;
    }

public:
    /// Returns: `true` once `dispose()` has been called.
    bool isDisposed() @property
    {
        // Read under the same lock that guards every write to `_disposed`.
        synchronized (_gate)
        {
            return _disposed;
        }
    }

    /++
     + Replaces the underlying disposable.
     +
     + When the container is still live, `value` is stored and the previously
     + stored disposable (if any) is disposed. When the container has already
     + been disposed, `value` is not stored and is disposed instead (if non-null).
     +/
    void disposable(Disposable value) @property
    {
        auto shouldDispose = false;
        Disposable old = null;
        synchronized (_gate)
        {
            shouldDispose = _disposed;
            if (!shouldDispose)
            {
                old = _disposable;
                _disposable = value;
            }
        }
        // Dispose outside the lock.
        if (old !is null)
            old.dispose();
        if (shouldDispose && value !is null)
            value.dispose();
    }

    /// Returns: the currently stored disposable; `null` when nothing is stored or after `dispose()`.
    Disposable disposable() @property
    {
        // Read under the same lock that guards every write to `_disposable`.
        synchronized (_gate)
        {
            return _disposable;
        }
    }

public:
    /// Marks the container as disposed and disposes the stored disposable, if any. Later calls do nothing.
    void dispose()
    {
        Disposable old = null;
        synchronized (_gate)
        {
            if (!_disposed)
            {
                _disposed = true;
                old = _disposable;
                _disposable = null;
            }
        }
        // Dispose outside the lock.
        if (old !is null)
            old.dispose();
    }

private:
    Mutex _gate;
    bool _disposed;
    Disposable _disposable;
}
482 
// Replacement disposes the previous value; dispose() disposes the current one;
// assignment after dispose() disposes the new value right away.
unittest
{
    int count = 0;
    struct A
    {
        void dispose()
        {
            count++;
        }
    }

    auto d = new SerialDisposable;
    d.disposable = disposableObject(A());
    assert(count == 0);
    d.disposable = disposableObject(A()); // first value is disposed here
    assert(count == 1);
    d.dispose(); // second value is disposed here
    assert(count == 2);
    d.disposable = disposableObject(A()); // container already disposed: disposed immediately
    assert(count == 3);
}

// Disposing an empty container disposes nothing; a later assignment is disposed immediately.
unittest
{
    int count = 0;
    struct A
    {
        void dispose()
        {
            count++;
        }
    }

    auto d = new SerialDisposable;
    d.dispose();
    assert(count == 0);
    d.disposable = disposableObject(A());
    assert(count == 1);
}
522 
/++
 + A `Disposable` that owns an `EventSignal` and calls `setSignal()` on it when disposed.
 + NOTE(review): `EventSignal` is not declared in this file (presumably `rx.util`); confirm its semantics there.
 +/
class SignalDisposable : Disposable
{
    /// Creates the disposable together with a fresh `EventSignal`.
    public this()
    {
        this._signal = new EventSignal;
    }

    /// Returns: the `EventSignal` created by the constructor.
    public EventSignal signal() @property
    {
        return _signal;
    }

    /// Calls `setSignal()` on the owned `EventSignal`.
    public void dispose()
    {
        _signal.setSignal();
    }

    private EventSignal _signal;
}
549 
// The signal's `signal` property is false before dispose() and true afterwards.
unittest
{
    auto d = new SignalDisposable;
    auto signal = d.signal;
    assert(!signal.signal);
    d.dispose();
    assert(signal.signal);
}
558 
/++
 + A `Disposable` that groups several disposables and disposes them together.
 +
 + The member list and the disposed flag are only touched while holding `_gate`;
 + the members' own `dispose()` calls are made after the lock has been released.
 +/
class CompositeDisposable : Disposable
{
public:
    /// Creates a composite holding its own copy of the given `disposables`.
    this(Disposable[] disposables...)
    {
        _gate = new Object;
        _disposables = disposables.dup;
    }

    /// Marks the composite as disposed and disposes every current member. Later calls do nothing.
    void dispose()
    {
        Disposable[] detached;
        synchronized (_gate)
        {
            if (_disposed)
                return;

            _disposed = true;
            detached = _disposables;
            _disposables = [];
        }

        foreach (member; detached)
            member.dispose();
    }

    /// Removes and disposes every current member without marking the composite itself as disposed.
    void clear()
    {
        Disposable[] detached;
        synchronized (_gate)
        {
            detached = _disposables;
            _disposables = [];
        }

        foreach (member; detached)
            member.dispose();
    }

    /++
     + Adds `item` (must not be null) to the composite. If the composite has
     + already been disposed, `item` is not stored and is disposed instead.
     +/
    void insert(Disposable item)
    {
        assert(item !is null);

        bool alreadyDisposed = void; // always assigned inside the lock below
        synchronized (_gate)
        {
            alreadyDisposed = _disposed;
            if (!alreadyDisposed)
                _disposables ~= item;
        }

        if (alreadyDisposed)
            item.dispose();
    }

private:
    Disposable[] _disposables;
    bool _disposed;
    Object _gate;
}
/// Several disposables can be grouped at construction and disposed with a single call.
unittest
{
    auto d1 = new SingleAssignmentDisposable;
    auto d2 = new SerialDisposable;
    auto d = new CompositeDisposable(d1, d2);
    d.dispose();
}

// A member added with insert() is disposed when the composite is disposed.
unittest
{
    auto composite = new CompositeDisposable;
    auto disposed = false;
    auto inner = new AnonymousDisposable({ disposed = true; });
    composite.insert(inner);
    composite.dispose();
    assert(disposed);
}

// clear() disposes the current members once; a second clear() has nothing left to dispose.
unittest
{
    auto composite = new CompositeDisposable;
    size_t _count = 0;
    auto inner = new AnonymousDisposable({ _count++; });
    composite.insert(inner);
    composite.clear(); // clear items and dispose all
    assert(_count == 1);

    composite.clear();
    assert(_count == 1);
}

// Inserting into an already disposed composite disposes the inserted item immediately.
unittest
{
    auto composite = new CompositeDisposable;
    composite.dispose();

    auto disposed = false;
    auto inner2 = new AnonymousDisposable({ disposed = true; });
    composite.insert(inner2);
    assert(disposed);
}
676 
/++
 + A `Disposable` that runs a caller-supplied delegate the first time it is disposed.
 +
 + The stored delegate is cleared before it is invoked, so it runs at most once
 + even if it calls `dispose()` on this same object or throws.
 + The field is a plain (non-atomic) member; no cross-thread guarantee is made here.
 +/
class AnonymousDisposable : Disposable
{
public:
    /++
     + Params:
     +   dispose = the action to run on the first `dispose()` call; must not be null
     +/
    this(void delegate() dispose)
    {
        assert(dispose !is null);
        _dispose = dispose;
    }

public:
    /// Runs the stored delegate on the first call; later calls do nothing.
    void dispose()
    {
        auto action = _dispose;
        if (action is null)
            return;

        // Clear first: a re-entrant dispose() issued from inside `action`
        // (or a retry after `action` throws) must not run it a second time.
        _dispose = null;
        action();
    }

private:
    // null once the action has been consumed.
    void delegate() _dispose;
}
/// The delegate runs on the first `dispose()` only; a second call leaves the counter unchanged.
unittest
{
    int count = 0;
    auto d = new AnonymousDisposable({ count++; });
    assert(count == 0);
    d.dispose();
    assert(count == 1);
    d.dispose();
    assert(count == 1);
}
713 
/++
 + Reference-counted wrapper around one underlying `Disposable`.
 +
 + The underlying disposable is disposed once the primary `dispose()` has been
 + called and every handle obtained from `getDisposable()` has been released.
 + All bookkeeping happens while holding `_gate`; the underlying `dispose()` is
 + called after the lock has been released.
 +/
class RefCountDisposable : Disposable
{
public:
    /++
     + Params:
     +   disposable = the underlying disposable; must not be null
     +   throwWhenDisposed = when `true`, `getDisposable()` throws once the
     +                       underlying disposable has been released
     +/
    this(Disposable disposable, bool throwWhenDisposed = false)
    {
        assert(disposable !is null);

        _gate = new Object();
        _disposable = disposable;
        _throwWhenDisposed = throwWhenDisposed;
        _isPrimaryDisposed = false;
        _count = 0;
    }

public:
    /++
     + Acquires a new reference.
     +
     + Returns: a handle whose first `dispose()` releases the reference. After
     +          the underlying disposable has been released, `NopDisposable.instance`
     +          is returned instead.
     + Throws: `Exception` when the underlying disposable has been released and
     +         `throwWhenDisposed` was set.
     +/
    Disposable getDisposable()
    {
        synchronized (_gate)
        {
            if (_disposable !is null)
            {
                _count++;
                return new AnonymousDisposable(&this.release);
            }

            if (_throwWhenDisposed)
            {
                throw new Exception("RefCountDisposable is already disposed.");
            }
            return NopDisposable.instance;
        }
    }

    /// Primary dispose: releases the underlying disposable now if no references are outstanding, otherwise defers to the last `release`.
    void dispose()
    {
        Disposable target;
        synchronized (_gate)
        {
            // Nothing to do when already released or when the primary was disposed before.
            if (_disposable is null || _isPrimaryDisposed)
                return;

            _isPrimaryDisposed = true;

            // Outstanding references keep the underlying disposable alive.
            if (_count != 0)
                return;

            target = _disposable;
            _disposable = null;
        }
        // Only reached with a detached, non-null underlying disposable.
        target.dispose();
    }

private:
    // Called by the handles returned from getDisposable(): drops one reference and
    // releases the underlying disposable when it was the last one after the primary dispose.
    void release()
    {
        Disposable target;
        synchronized (_gate)
        {
            if (_disposable is null)
                return;

            assert(_count > 0);
            _count--;

            if (!_isPrimaryDisposed || _count != 0)
                return;

            target = _disposable;
            _disposable = null;
        }
        // Only reached with a detached, non-null underlying disposable.
        target.dispose();
    }

private:
    size_t _count;
    Disposable _disposable;
    bool _isPrimaryDisposed;
    Object _gate;
    bool _throwWhenDisposed;
}
812 
/// With one outstanding reference, the underlying disposable is released only after both the primary and the reference are disposed.
unittest
{
    bool disposed = false;
    auto disposable = new RefCountDisposable(new AnonymousDisposable({
            disposed = true;
        }));

    auto subscription = disposable.getDisposable();

    assert(!disposed);
    disposable.dispose();
    assert(!disposed);

    subscription.dispose();
    assert(disposed);
}

// Without any references, the primary dispose releases the underlying disposable immediately.
unittest
{
    bool disposed = false;
    auto disposable = new RefCountDisposable(new AnonymousDisposable({
            disposed = true;
        }));

    assert(!disposed);
    disposable.dispose();
    assert(disposed);
}

// Releasing the only reference first does nothing; the later primary dispose releases.
unittest
{
    bool disposed = false;
    auto disposable = new RefCountDisposable(new AnonymousDisposable({
            disposed = true;
        }));

    auto subscription = disposable.getDisposable();
    assert(!disposed);
    subscription.dispose();
    assert(!disposed);
    disposable.dispose();
    assert(disposed);
}

// Two references released before the primary dispose: release happens on the primary dispose.
unittest
{
    bool disposed = false;
    auto disposable = new RefCountDisposable(new AnonymousDisposable({
            disposed = true;
        }));

    auto subscription1 = disposable.getDisposable();
    auto subscription2 = disposable.getDisposable();
    assert(!disposed);
    subscription1.dispose();
    assert(!disposed);
    subscription2.dispose();
    assert(!disposed);
    disposable.dispose();
    assert(disposed);
}

// Disposing the same reference twice counts only once; the second reference is still required.
unittest
{
    bool disposed = false;
    auto disposable = new RefCountDisposable(new AnonymousDisposable({
            disposed = true;
        }));

    auto subscription1 = disposable.getDisposable();
    auto subscription2 = disposable.getDisposable();

    disposable.dispose();
    assert(!disposed);

    subscription1.dispose();
    assert(!disposed);
    subscription1.dispose();
    assert(!disposed);

    subscription2.dispose();
    assert(disposed);
}
897 
/++
 + Attaches the callable `f` to a disposable: the result is a
 + `CompositeDisposable` that, when disposed, disposes `disposable` and then
 + runs `f` (through an `AnonymousDisposable`, so `f` runs at most once).
 +
 + Params:
 +   f = callable invoked with no arguments when the result is disposed
 +   disposable = any value whose type satisfies `isDisposable`; values that do
 +                not already convert to `Disposable` are wrapped with
 +                `disposableObject` first
 +
 + Returns: a new `CompositeDisposable` combining `disposable` and the callback.
 +/
template withDisposed(alias f)
{
    auto withDisposed(TDisposable)(auto ref TDisposable disposable)
            if (isDisposable!TDisposable)
    {
        // `disposableObject` lets struct disposables accepted by the constraint
        // be stored in the composite; interface/class values pass through unchanged.
        return new CompositeDisposable(disposableObject(disposable), new AnonymousDisposable({
                f();
            }));
    }
}
909 
/++
 + Delegate form: returns a `CompositeDisposable` that, when disposed, disposes
 + `disposable` and then runs `disposed` (through an `AnonymousDisposable`, so
 + it runs at most once).
 +
 + Params:
 +   disposable = any value whose type satisfies `isDisposable`; values that do
 +                not already convert to `Disposable` are wrapped with
 +                `disposableObject` first
 +   disposed = delegate to run on disposal; must not be null
 +/
auto withDisposed(TDisposable)(auto ref TDisposable disposable, void delegate() disposed)
        if (isDisposable!TDisposable)
{
    // `disposableObject` lets struct disposables accepted by the constraint
    // be stored in the composite; interface/class values pass through unchanged.
    return new CompositeDisposable(disposableObject(disposable), new AnonymousDisposable(disposed));
}
916 
/// Template form: the callable passed as a template argument runs when the returned disposable is disposed.
unittest
{
    import rx;

    // NOTE(review): `SubjectObject` and `doSubscribe` come from the `rx` package, not from this file.
    auto sub = new SubjectObject!int;
    size_t putCount = 0;
    size_t disposedCount = 0;

    auto disposable = sub.doSubscribe!(_ => putCount++)
        .withDisposed!(() => disposedCount++);

    sub.put(1);
    disposable.dispose();

    assert(putCount == 1);
    assert(disposedCount == 1);
}

/// The instantiated template can be aliased and reused; completing the subject alone does not run the callback.
unittest
{
    import rx;

    auto sub = new SubjectObject!int;
    size_t putCount = 0;

    bool disposed = false;
    alias traceDispose = withDisposed!(() => disposed = true);

    auto disposable = traceDispose(sub.doSubscribe!(_ => putCount++));

    sub.put(1);
    sub.completed();

    assert(putCount == 1);
    // dispose() was never called on `disposable`, so the callback has not run.
    assert(!disposed);
}

/// Same alias, but this time the returned disposable is disposed, so the callback runs.
unittest
{
    import rx;

    auto sub = new SubjectObject!int;
    size_t putCount = 0;

    bool disposed = false;
    alias traceDispose = withDisposed!(() => disposed = true);

    auto disposable = traceDispose(sub.doSubscribe!(_ => putCount++));

    sub.put(1);
    disposable.dispose();

    assert(putCount == 1);
    assert(disposed);
}
975 
/// Delegate form: the delegate passed as a runtime argument runs when the returned disposable is disposed.
unittest
{
    import rx;

    auto sub = new SubjectObject!int;
    size_t putCount = 0;

    bool disposed = false;
    auto disposable = sub.doSubscribe!(_ => putCount++).withDisposed(() {
        disposed = true;
    });

    sub.put(1);
    disposable.dispose();

    assert(putCount == 1);
    assert(disposed);
}

/// Template form: disposing twice runs the callback only once.
unittest
{
    import rx;

    auto sub = new SubjectObject!int;
    size_t disposedCount = 0;

    auto disposable = sub.doSubscribe!((int) {  })
        .withDisposed!(() { disposedCount++; });

    disposable.dispose();
    disposable.dispose();

    assert(disposedCount == 1);
}

/// Delegate form once more. NOTE(review): this example repeats the first delegate-form example verbatim.
unittest
{
    import rx;

    auto sub = new SubjectObject!int;
    size_t putCount = 0;

    bool disposed = false;
    auto disposable = sub.doSubscribe!(_ => putCount++).withDisposed(() {
        disposed = true;
    });

    sub.put(1);
    disposable.dispose();

    assert(putCount == 1);
    assert(disposed);
}

/// Delegate form: disposing twice runs the delegate only once.
unittest
{
    import rx;

    auto sub = new SubjectObject!int;
    size_t disposedCount = 0;

    auto disposable = sub.doSubscribe!((int) {  }).withDisposed(() {
        disposedCount++;
    });

    disposable.dispose();
    disposable.dispose();

    assert(disposedCount == 1);
}