/+++++++++++++++++++++++++++++
 + This module defines the Subject interface and several implementations of it.
 +/
4 module rx.subject;
5 
6 import rx.disposable;
7 import rx.observer;
8 import rx.observable;
9 import rx.util : assumeThreadLocal;
10 
11 import core.atomic : atomicLoad, cas;
12 import std.range : put;
13 
/// Represents an object that is both an observable sequence and an observer
/// of elements of type `E`. The interface declares no members of its own; it
/// only combines `Observer!E` and `Observable!E`.
interface Subject(E) : Observer!E, Observable!E
{
}
18 
/++
 + Represents an object that is both an observable sequence as well as an observer.
 + Each notification is broadcasted to all subscribed observers.
 +
 + The subscriber state is kept in one shared reference (`_observer`) that is only
 + ever replaced through compare-and-swap. The code below distinguishes these
 + states: a `NopObserver!E` (nobody subscribed), a single plain observer, a
 + `CompositeObserver!E` (several observers) and a `DoneObserver!E` (terminated).
 +/
class SubjectObject(E) : Subject!E
{
    alias ElementType = E;

public:
    /// Creates a subject with no subscribers (`NopObserver!E.instance` installed).
    this()
    {
        _observer = cast(shared) NopObserver!E.instance;
    }

public:
    /// Forwards `obj` to the observer installed at the moment of the call.
    void put(E obj)
    {
        auto temp = assumeThreadLocal(atomicLoad(_observer));
        .put(temp, obj);
    }

    /++
     + Terminates the subject normally.
     +
     + Installs `DoneObserver!E.instance` and then calls `completed` on the observer
     + that was installed before. If a `DoneObserver!E` is already installed, the
     + state is left unchanged and `completed` is called on that `DoneObserver!E`.
     +/
    void completed()
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = cast(shared) DoneObserver!E.instance;
        Observer!E temp = void;
        do
        {
            oldObserver = _observer;
            temp = assumeThreadLocal(atomicLoad(oldObserver));
            // Already terminated: do not replace the installed DoneObserver.
            if (cast(DoneObserver!E) temp)
                break;
        }
        while (!cas(&_observer, oldObserver, newObserver)); // retry if the state changed meanwhile
        temp.completed();
    }

    /++
     + Terminates the subject with `error`.
     +
     + Installs a new `DoneObserver!E` carrying `error` and then calls
     + `failure(error)` on the observer that was installed before. If a
     + `DoneObserver!E` is already installed, the state is left unchanged and
     + `failure(error)` is called on that `DoneObserver!E`.
     +/
    void failure(Exception error)
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = cast(shared) new DoneObserver!E(error);
        Observer!E temp = void;
        do
        {
            oldObserver = _observer;
            temp = assumeThreadLocal(atomicLoad(oldObserver));
            // Already terminated: do not replace the installed DoneObserver.
            if (cast(DoneObserver!E) temp)
                break;
        }
        while (!cas(&_observer, oldObserver, newObserver)); // retry if the state changed meanwhile
        temp.failure(error);
    }

    /// Wraps `observer` with `observerObject!E` and subscribes the result.
    Disposable subscribe(T)(T observer)
    {
        return subscribe(observerObject!E(observer));
    }

    /++
     + Subscribes `observer` to this subject.
     +
     + If the subject has already terminated, `observer` is notified immediately
     + (`completed()` or `failure(exception)`) and `NopDisposable.instance` is
     + returned. Otherwise the observer is added to the installed state and a
     + subscription is returned whose `dispose` calls `unsubscribe(observer)`.
     +/
    Disposable subscribe(Observer!E observer)
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = void;
        do
        {
            oldObserver = _observer;
            auto temp = assumeThreadLocal(atomicLoad(oldObserver));

            // Terminated normally.
            if (temp is DoneObserver!E.instance)
            {
                observer.completed();
                return NopDisposable.instance;
            }

            // Terminated with an error: any other DoneObserver carries the exception.
            if (auto fail = cast(DoneObserver!E) temp)
            {
                observer.failure(fail.exception);
                return NopDisposable.instance;
            }

            if (auto composite = cast(CompositeObserver!E) temp)
            {
                // Several observers already: install the result of add().
                newObserver = cast(shared) composite.add(observer);
            }
            else if (cast(NopObserver!E) temp)
            {
                // Nobody subscribed yet: the new observer becomes the sole target.
                newObserver = cast(shared) observer;
            }
            else
            {
                // Exactly one observer so far: promote to a composite of both.
                newObserver = cast(shared)(new CompositeObserver!E([temp, observer]));
            }
        }
        while (!cas(&_observer, oldObserver, newObserver)); // retry if the state changed meanwhile

        return subscription(this, observer);
    }

    /++
     + Removes `observer` from this subject.
     +
     + With a composite installed, the result of `composite.remove(observer)` is
     + installed. Otherwise the state is reset to `NopObserver!E.instance` only if
     + the installed observer is exactly `observer`; in every other case
     + (including a terminated subject) nothing happens.
     +/
    void unsubscribe(Observer!E observer)
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = void;
        do
        {
            oldObserver = _observer;

            auto temp = assumeThreadLocal(atomicLoad(oldObserver));
            if (auto composite = cast(CompositeObserver!E) temp)
            {
                newObserver = cast(shared) composite.remove(observer);
            }
            else
            {
                if (temp !is observer)
                    return;

                newObserver = cast(shared) NopObserver!E.instance;
            }
        }
        while (!cas(&_observer, oldObserver, newObserver)); // retry if the state changed meanwhile
    }

protected:
    /// The currently installed observer, for use by derived classes.
    Observer!E currentObserver() @property
    {
        return assumeThreadLocal(atomicLoad(_observer));
    }

private:
    shared(Observer!E) _observer;
}
152 
///
unittest
{
    import std.array : appender;

    // Subscribe an appender-backed observer and push two values through the subject.
    auto data = appender!(int[])();
    auto subject = new SubjectObject!int;
    auto disposable = subject.subscribe(observerObject!(int)(data));
    assert(disposable !is null);
    subject.put(0);
    subject.put(1);

    import std.algorithm : equal;

    assert(equal(data.data, [0, 1]));

    // After disposing the subscription, further values must not reach `data`.
    disposable.dispose();
    subject.put(2);
    assert(equal(data.data, [0, 1]));
}
173 
// Compile-time checks: SubjectObject!int is an observer and an observable of
// int, and is not an observable of string.
unittest
{
    static assert(isObserver!(SubjectObject!int, int));
    static assert(isObservable!(SubjectObject!int, int));
    static assert(!isObservable!(SubjectObject!int, string));
}
181 
// completed() notifies the subscriber once; values put afterwards are not delivered.
unittest
{
    auto subject = new SubjectObject!int;
    auto observer = new CounterObserver!int;
    auto disposable = subject.subscribe(observer);
    scope (exit)
        disposable.dispose();

    subject.put(0);
    subject.put(1);

    assert(observer.putCount == 2);
    subject.completed();
    subject.put(2);
    assert(observer.putCount == 2);
    assert(observer.completedCount == 1);
}

// failure() notifies the subscriber with the same exception; later values are not delivered.
unittest
{
    auto subject = new SubjectObject!int;
    auto observer = new CounterObserver!int;
    auto disposable = subject.subscribe(observer);
    scope (exit)
        disposable.dispose();

    subject.put(0);
    subject.put(1);

    assert(observer.putCount == 2);
    auto ex = new Exception("Exception");
    subject.failure(ex);
    subject.put(2);
    assert(observer.putCount == 2);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}

// Two subscribers both receive every value.
unittest
{
    import std.array : appender;

    auto buf1 = appender!(int[]);
    auto buf2 = appender!(int[]);
    auto subject = new SubjectObject!int;
    subject.subscribe(observerObject!(int)(buf1));
    subject.doSubscribe((int n) => buf2.put(n));

    assert(buf1.data.length == 0);
    assert(buf2.data.length == 0);
    subject.put(0);
    assert(buf1.data.length == 1);
    assert(buf2.data.length == 1);
    assert(buf1.data[0] == buf2.data[0]);
}

// Subscribing to an already completed subject triggers completed() immediately.
unittest
{
    auto sub = new SubjectObject!int;
    sub.completed();

    auto observer = new CounterObserver!int;
    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 0);
    sub.subscribe(observer);
    assert(observer.putCount == 0);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
}

// Subscribing to an already failed subject triggers failure() with the stored exception.
unittest
{
    auto sub = new SubjectObject!int;
    auto ex = new Exception("Exception");
    sub.failure(ex);

    auto observer = new CounterObserver!int;
    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 0);
    sub.subscribe(observer);
    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}
269 
// Demonstrates customizing delivery in a subclass through `currentObserver`.
unittest
{
    // MyFilterSubject puts a value only on MyCustomObserver.

    // Observer that records every value it receives.
    static class MyCustomObserver : Observer!int
    {
        int[] buf;

        void put(int obj)
        {
            buf ~= obj;
        }

        void completed()
        {
        }

        void failure(Exception ex)
        {
        }
    }

    static class MyFilterSubject : SubjectObject!int
    {
        override void put(int obj)
        {
            // Only acts when a composite is installed; any other state drops the value.
            if (auto current = cast(CompositeObserver!int) currentObserver)
            {
                // Write your own filter, map, ordering and more here.
                foreach (observer; current.observers)
                {
                    if (auto myObserver = cast(MyCustomObserver) observer)
                    {
                        myObserver.put(obj);
                    }
                }
            }
        }
    }

    import std.array : appender;

    auto myObserver = new MyCustomObserver;
    auto buffer = appender!(int[]);

    auto sub = new MyFilterSubject;
    // No subscribers yet, so this value is dropped.
    .put(sub, -1);

    sub.subscribe(myObserver);
    sub.subscribe(buffer);

    .put(sub, 0);
    .put(sub, 1);
    .put(sub, 2);

    // Only the MyCustomObserver received the three values; the appender got none.
    assert(myObserver.buf.length == 3);
    assert(buffer.data.length == 0);
}
328 
/// Disposable handle pairing a subject with one of its observers; disposing it
/// calls `unsubscribe` on the subject once and then forgets the subject.
private class Subscription(TSubject, TObserver) : Disposable
{
private:
    TSubject _subject;
    TObserver _observer;

public:
    /// Remembers the subject and the observer to detach from it later.
    this(TSubject subject, TObserver observer)
    {
        this._subject = subject;
        this._observer = observer;
    }

    /// Detaches the observer from the subject; later calls do nothing.
    void dispose()
    {
        // The subject reference is cleared on the first successful dispose.
        if (_subject is null)
            return;

        _subject.unsubscribe(_observer);
        _subject = null;
    }
}
352 
/// Convenience factory that infers the template arguments of `Subscription`.
private Subscription!(TSubject, TObserver) subscription(TSubject, TObserver)(
        TSubject subject, TObserver observer)
{
    alias Handle = Subscription!(TSubject, TObserver);
    return new Handle(subject, observer);
}
358 
/++
 + A subject that stores only the most recently put value and delivers it,
 + followed by `completed()`, when the subject completes. Observers subscribing
 + after termination receive the stored outcome immediately. All state changes
 + happen inside `synchronized (this)`; observers are notified outside the lock.
 +/
class AsyncSubject(E) : Subject!E
{
public:
    /++
     + Subscribes `observer`.
     +
     + While the subject is running, the observer is registered and a
     + subscription is returned. Once stopped, the observer is notified right away
     + (failure, or the last value plus completed, or just completed) and
     + `NopDisposable.instance` is returned.
     +/
    Disposable subscribe(Observer!E observer)
    {
        Exception error = null;
        E lastValue;
        bool gotValue = false;

        synchronized (this)
        {
            if (!_isStopped)
            {
                _observers ~= observer;
                return subscription(this, observer);
            }

            // Snapshot the final outcome so the observer is called outside the lock.
            error = _exception;
            gotValue = _hasValue;
            lastValue = _value;
        }

        if (error !is null)
        {
            observer.failure(error);
            return NopDisposable.instance;
        }

        if (gotValue)
            .put(observer, lastValue);
        observer.completed();

        return NopDisposable.instance;
    }

    /// Wraps `observer` with `observerObject!E` and subscribes the result.
    auto subscribe(T)(T observer)
    {
        return subscribe(observerObject!E(observer));
    }

    /// Removes the first registered occurrence of `observer`; `null` is ignored.
    void unsubscribe(Observer!E observer)
    {
        if (observer !is null)
        {
            synchronized (this)
            {
                import std.algorithm : remove, countUntil;

                auto found = countUntil(_observers, observer);
                if (found != -1)
                    _observers = remove(_observers, found);
            }
        }
    }

public:
    /// Stores `value` as the latest value; ignored once the subject has stopped.
    void put(E value)
    {
        synchronized (this)
        {
            if (_isStopped)
                return;

            _value = value;
            _hasValue = true;
        }
    }

    /++
     + Stops the subject and notifies every registered observer: the stored value
     + (if any) followed by `completed()`. Has no effect if already stopped.
     +/
    void completed()
    {
        Observer!E[] targets = null;
        E lastValue;
        bool gotValue = false;

        synchronized (this)
        {
            if (!_isStopped)
            {
                targets = _observers;
                _observers.length = 0;
                _isStopped = true;
                lastValue = _value;
                gotValue = _hasValue;
            }
        }

        // `targets` stays empty when the subject was already stopped.
        foreach (observer; targets)
        {
            if (gotValue)
                .put(observer, lastValue);
            observer.completed();
        }
    }

    /++
     + Stops the subject with `e` and notifies every registered observer through
     + `failure(e)`. Has no effect if already stopped. `e` must not be null.
     +/
    void failure(Exception e)
    {
        assert(e !is null);

        Observer!E[] targets = null;
        synchronized (this)
        {
            if (!_isStopped)
            {
                targets = _observers;
                _observers.length = 0;
                _isStopped = true;
                _exception = e;
            }
        }

        // `targets` stays empty when the subject was already stopped.
        foreach (observer; targets)
            observer.failure(e);
    }

private:
    Observer!E[] _observers; // observers registered while running
    bool _isStopped; // set by completed() or failure()
    E _value; // latest value, meaningful only when _hasValue
    bool _hasValue;
    Exception _exception; // set by failure()
}
511 
// Subscribing after put + completed: the observer gets the value and completed at once.
unittest
{
    auto sub = new AsyncSubject!int;

    .put(sub, 1);
    sub.completed();

    auto observer = new CounterObserver!int;

    assert(observer.hasNotBeenCalled);

    sub.subscribe(observer);

    assert(observer.putCount == 1);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
    assert(observer.lastValue == 1);
}

// Subscribe first, then put: nothing is delivered until completed() is called.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    sub.put(100);

    assert(observer.hasNotBeenCalled);

    // The value is only stored inside the subject for now.
    assert(sub._hasValue);
    assert(sub._value == 100);

    sub.completed();

    assert(observer.putCount == 1);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
    assert(observer.lastValue == 100);
}

// Put first, then subscribe: the stored value is still delivered on completed().
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    sub.put(100);

    assert(sub._hasValue);
    assert(sub._value == 100);

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    sub.completed();

    assert(observer.putCount == 1);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
    assert(observer.lastValue == 100);
}
580 
// Disposed before put and completed: the observer is never called.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);

    d.dispose();
    assert(observer.hasNotBeenCalled);

    sub.put(100);
    assert(observer.hasNotBeenCalled);

    sub.completed();
    assert(observer.hasNotBeenCalled);
}

// Disposed after put but before completed: the observer is never called.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    assert(observer.hasNotBeenCalled);

    sub.put(100);
    assert(observer.hasNotBeenCalled);

    d.dispose();
    assert(observer.hasNotBeenCalled);

    sub.completed();
    assert(observer.hasNotBeenCalled);
}

// Put, subscribe, dispose, then completed: the observer is never called.
unittest
{

    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    sub.put(100);
    assert(observer.hasNotBeenCalled);

    auto d = sub.subscribe(observer);
    assert(observer.hasNotBeenCalled);

    d.dispose();
    assert(observer.hasNotBeenCalled);

    sub.completed();
    assert(observer.hasNotBeenCalled);
}
634 
// completed() without any value: only completed is delivered.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    sub.completed();

    assert(observer.putCount == 0);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
}

// failure() is delivered to an already subscribed observer with the same exception.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    auto ex = new Exception("TEST");
    sub.failure(ex);

    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}

// Subscribing after failure(): the stored exception is delivered immediately.
unittest
{
    auto sub = new AsyncSubject!int;
    auto ex = new Exception("TEST");
    sub.failure(ex);

    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}

// Subscribing after completed() without a value: only completed is delivered.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    sub.completed();
    assert(observer.hasNotBeenCalled);

    sub.subscribe(observer);
    assert(observer.putCount == 0);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
}
704 
version (unittest)
{
    /// Test helper observer: counts each kind of notification and keeps the
    /// most recent value and exception it was given.
    class CounterObserver(T) : Observer!T
    {
    public:
        size_t putCount;
        size_t completedCount;
        size_t failureCount;
        T lastValue;
        Exception lastException;

    public:
        /// True while no put, completed or failure call has been recorded.
        bool hasNotBeenCalled() const pure nothrow @nogc @safe @property
        {
            if (putCount != 0)
                return false;
            if (completedCount != 0)
                return false;
            return failureCount == 0;
        }

    public:
        /// Records one value.
        void put(T obj)
        {
            putCount += 1;
            lastValue = obj;
        }

        /// Records one completion.
        void completed()
        {
            completedCount += 1;
        }

        /// Records one failure together with its exception.
        void failure(Exception e)
        {
            failureCount += 1;
            lastException = e;
        }
    }
}
741 
/++
 + A subject that holds a current value. New subscribers are given the current
 + value first and are then subscribed to an inner `SubjectObject!E`, which
 + broadcasts every change of the value.
 +/
class BehaviorSubject(E) : Subject!E
{
public:
    /// Creates a subject whose current value is `E.init`.
    this()
    {
        this(E.init);
    }

    /// Creates a subject whose current value is `value`.
    this(E value)
    {
        _value = value;
        _subject = new SubjectObject!E;
    }

public:
    /// The current value.
    inout(E) value() inout @property
    {
        return _value;
    }

    /// Stores `value` and broadcasts it; does nothing when it equals the current value.
    void value(E value) @property
    {
        if (_value == value)
            return;

        _value = value;
        .put(_subject, value);
    }

public:
    /// Puts the current value into `observer`, then subscribes it to the inner subject.
    auto subscribe(TObserver)(auto ref TObserver observer)
    {
        .put(observer, value);
        return _subject.doSubscribe(observer);
    }

    /// Puts the current value into `observer`, then subscribes it to the inner subject.
    Disposable subscribe(Observer!E observer)
    {
        .put(observer, value);
        return disposableObject(_subject.doSubscribe(observer));
    }

    /// Same as assigning `obj` to `value`.
    void put(E obj)
    {
        value = obj;
    }

    /// Forwards completion to the inner subject.
    void completed()
    {
        _subject.completed();
    }

    /// Forwards the failure to the inner subject.
    void failure(Exception e)
    {
        _subject.failure(e);
    }

private:
    SubjectObject!E _subject; // broadcasts value changes to subscribers
    E _value; // the current value
}
813 
// Compile-time checks on BehaviorSubject!int.
unittest
{
    static assert(isObservable!(BehaviorSubject!int, int));
    static assert(is(BehaviorSubject!int.ElementType == int));
}

// A new subscriber first receives the current value, then each change until disposed.
unittest
{
    int num = 0;
    auto subject = new BehaviorSubject!int(100);

    auto d = subject.doSubscribe((int n) { num = n; });
    assert(num == 100);

    .put(subject, 1);
    assert(num == 1);

    // After dispose, changes no longer reach the callback.
    d.dispose();
    .put(subject, 10);
    assert(num == 1);
}
835 
/// Creates a `BehaviorSubject` of the observable's element type, subscribes it
/// to `observable`, and returns it.
auto asBehaviorSubject(TObservable)(auto ref TObservable observable)
{
    auto result = new BehaviorSubject!(TObservable.ElementType);
    observable.doSubscribe(result);
    return result;
}
844 
///
unittest
{
    import rx;

    auto num1 = new BehaviorSubject!int;
    auto num2 = new BehaviorSubject!int;

    // `sum` tracks num1 + num2 through combineLatest.
    BehaviorSubject!int sum = combineLatest!((l, r) => l + r)(num1, num2).asBehaviorSubject();

    assert(sum.value == 0);
    num1.value = 10;
    assert(sum.value == 10);
    num2.value = 20;
    assert(sum.value == 30);
}
861 
/++
 + A subject that keeps the most recent values in a ring buffer and replays
 + them to every new subscriber before subscribing it to an inner
 + `SubjectObject!E`.
 +
 + After `completed()` or `failure()`, new subscribers receive only the buffered
 + values and `NopDisposable.instance`; this class does not forward a terminal
 + notification to them.
 +/
class ReplaySubject(E) : Subject!E
{
private:
    RingBuffer!E _buffer; // values replayed to new subscribers
    SubjectObject!E _subject; // broadcasts live notifications
    bool _completed; // set by completed() and failure()

public:
    /// Creates a subject whose replay buffer is constructed with `bufferSize`.
    this(size_t bufferSize)
    {
        _buffer = RingBuffer!E(bufferSize);
        _subject = new SubjectObject!E;
    }

public:
    /// Replays the buffered values into `observer`, then subscribes it unless terminated.
    Disposable subscribe(TObserver)(auto ref TObserver observer)
    {
        .put(observer, _buffer[]);
        if (!_completed)
            return _subject.doSubscribe(observer).disposableObject();

        return NopDisposable.instance;
    }

    /// Replays the buffered values into `observer`, then subscribes it unless terminated.
    Disposable subscribe(Observer!E observer)
    {
        .put(observer, _buffer[]);
        if (!_completed)
            return disposableObject(_subject.doSubscribe(observer));

        return NopDisposable.instance;
    }

    /// Buffers `obj` and broadcasts it; ignored after termination.
    void put(E obj)
    {
        if (!_completed)
        {
            .put(_buffer, obj);
            .put(_subject, obj);
        }
    }

    /// Marks the subject as terminated and forwards completion to the inner subject.
    void completed()
    {
        _completed = true;
        _subject.completed();
    }

    /// Marks the subject as terminated and forwards the failure to the inner subject.
    void failure(Exception e)
    {
        _completed = true;
        _subject.failure(e);
    }
}
922 
///
unittest
{
    // Buffer of one: a late subscriber receives the single buffered value.
    auto sub = new ReplaySubject!int(1);
    .put(sub, 1);

    int[] buf;
    auto d = sub.doSubscribe!(v => buf ~= v);
    scope (exit)
        d.dispose();

    assert(buf.length == 1);
    assert(buf[0] == 1);
}

///
unittest
{
    // Buffer of one: only the most recent value is replayed.
    auto sub = new ReplaySubject!int(1);
    .put(sub, 1);
    .put(sub, 2);

    int[] buf;
    auto d = sub.doSubscribe!(v => buf ~= v);
    scope (exit)
        d.dispose();

    assert(buf == [2]);
}

///
unittest
{
    // Buffer of two: the two most recent values are replayed in order.
    auto sub = new ReplaySubject!int(2);
    .put(sub, 1);
    .put(sub, 2);
    .put(sub, 3);

    int[] buf;
    auto d = sub.doSubscribe!(v => buf ~= v);
    scope (exit)
        d.dispose();

    assert(buf == [2, 3]);
}
968 
// A subscriber receives the replayed value and then live values.
unittest
{
    auto sub = new ReplaySubject!int(2);
    .put(sub, 1);

    int[] buf;
    auto d = sub.doSubscribe!(v => buf ~= v);
    scope (exit)
        d.dispose();

    .put(sub, 2);

    assert(buf.length == 2);
    assert(buf[0] == 1);
    assert(buf[1] == 2);
}

// Values put after completed() are neither buffered nor delivered.
unittest
{
    auto sub = new ReplaySubject!int(2);
    .put(sub, 1);
    sub.completed();
    .put(sub, 2);

    int[] buf;
    sub.doSubscribe!(v => buf ~= v);

    assert(buf == [1]);
}

// After completed(), the buffer still replays the last values put before it.
unittest
{
    auto sub = new ReplaySubject!int(2);
    .put(sub, 1);
    .put(sub, 2);
    .put(sub, 3);
    sub.completed();
    .put(sub, 4);

    int[] buf;
    sub.doSubscribe!(v => buf ~= v);

    assert(buf == [2, 3]);
}

// Same as above, but terminated through failure() (here with a null exception).
unittest
{
    auto sub = new ReplaySubject!int(2);
    .put(sub, 1);
    .put(sub, 2);
    .put(sub, 3);
    sub.failure(null);
    .put(sub, 4);

    int[] buf;
    sub.doSubscribe!(v => buf ~= v);

    assert(buf == [2, 3]);
}
1028 
/++
 + Fixed-capacity circular buffer that keeps the most recent `buffer.length`
 + elements. A buffer with capacity zero accepts `put` calls and stores nothing.
 +/
private struct RingBuffer(T)
{
    T[] buffer; // backing storage; its length is the capacity
    size_t pos; // index of the slot the next put writes to
    size_t count; // number of stored elements, at most buffer.length

    /// Allocates storage for `n` elements.
    this(size_t n)
    {
        buffer.length = n;
    }

    /// Stores `obj`, overwriting the oldest element once the buffer is full.
    void put(T obj)
    {
        import std.algorithm : min;

        // With no storage there is nowhere to write, and the modulo below
        // would divide by zero, so a zero-capacity buffer drops the value.
        if (buffer.length == 0)
            return;

        buffer[pos] = obj;
        pos = (pos + 1) % buffer.length;
        count = min(count + 1, buffer.length);
    }

    /// Returns a range over the stored elements, oldest first.
    RingBufferRange!T opSlice()
    {
        // While the buffer has not wrapped, pos == count and the offset equals
        // buffer.length (index 0 after the range's modulo). Once full,
        // count == buffer.length and the offset is pos, the oldest element.
        return RingBufferRange!T(buffer, buffer.length - (count - pos), 0, count);
    }
}

// Filling a buffer of four and then overwriting the oldest element.
unittest
{
    import std.algorithm : equal;
    import std.range : walkLength;

    auto buf = RingBuffer!int(4);

    assert(walkLength(buf[]) == 0);

    buf.put(0);
    assert(buf.buffer.length == 4);
    assert(buf.pos == 1);
    assert(buf.count == 1);
    assert(buf[][0] == 0);
    assert(equal(buf[], [0]));

    buf.put(1);
    assert(buf.buffer.length == 4);
    assert(equal(buf.buffer, [0, 1, 0, 0]));
    assert(buf.pos == 2);
    assert(buf.count == 2);
    assert(buf[][0] == 0);
    assert(buf[][1] == 1);
    assert(equal(buf[], [0, 1]));

    buf.put(2);
    assert(equal(buf[], [0, 1, 2]));

    buf.put(3);
    assert(equal(buf[], [0, 1, 2, 3]));

    buf.put(4);
    assert(equal(buf[], [1, 2, 3, 4]));
}

// A zero-capacity buffer ignores put and always yields an empty range.
unittest
{
    auto buf = RingBuffer!int(0);
    buf.put(1);
    assert(buf.pos == 0);
    assert(buf.count == 0);
    assert(buf[].empty);
}

/++
 + Input range over a circular window of `buffer`. Element `i` of the range is
 + `buffer[(offset + pos + i) % buffer.length]`; the range is exhausted once
 + `pos` reaches `count`.
 +/
private struct RingBufferRange(T)
{
    T[] buffer; // storage being viewed
    size_t offset; // index added to pos before the modulo
    size_t pos; // number of elements already popped
    size_t count; // total number of elements the range covers

    /// True when there is nothing (left) to read.
    bool empty() const @property
    {
        return count == 0 || pos == count;
    }

    /// The current element.
    inout(T) front() inout @property
    {
        return buffer[(offset + pos) % buffer.length];
    }

    /// Advances to the next element.
    void popFront()
    {
        pos++;
    }

    /// The element `n` positions after the current front. `n` is not checked against `count`.
    T opIndex(size_t n)
    {
        return buffer[(offset + pos + n) % buffer.length];
    }
}
1117 
// Iteration with and without a start offset.
unittest
{
    import std.algorithm : equal;

    // no offset
    auto r0 = RingBufferRange!int([0, 1, 2], 0, 0, 2);
    assert(equal(r0, [0, 1]));

    auto r1 = RingBufferRange!int([0, 1, 2], 0, 0, 3);
    assert(equal(r1, [0, 1, 2]));

    auto r2 = RingBufferRange!int([0, 1, 2, 3], 0, 0, 4);
    assert(equal(r2, [0, 1, 2, 3]));

    auto r3 = RingBufferRange!int([0, 1, 2, 3, 4], 0, 0, 5);
    assert(equal(r3, [0, 1, 2, 3, 4]));

    // has offset: iteration starts at index 1 and wraps around to index 0
    auto r4 = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4);
    assert(!r4.empty);
    assert(r4.front == 1);
    r4.popFront();
    assert(!r4.empty);
    assert(r4.front == 2);
    r4.popFront();
    assert(!r4.empty);
    assert(r4.front == 3);
    r4.popFront();
    assert(!r4.empty);
    assert(r4.front == 0);
    r4.popFront();
    assert(r4.empty);

    auto r5 = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4);
    assert(equal(r5, [1, 2, 3, 0]));

    auto r6 = RingBufferRange!int([0, 1, 2, 3], 2, 0, 4);
    assert(equal(r6, [2, 3, 0, 1]));
}

// Empty and partially filled ranges.
unittest
{
    import std.algorithm : equal;

    // empty
    auto rempty = RingBufferRange!int([0, 0, 0, 0], 0, 0, 0);
    assert(rempty.empty);

    // count smaller than the storage: only the first `count` elements are visited
    auto r1 = RingBufferRange!int([1, 0, 0, 0], 0, 0, 1);
    assert(equal(r1, [1]));

    auto r2 = RingBufferRange!int([1, 2, 0, 0], 0, 0, 2);
    assert(equal(r2, [1, 2]));
}

unittest
{
    import std.algorithm : equal;

    // indexing without offset
    auto r = RingBufferRange!int([0, 1, 2, 3], 0, 0, 4);
    assert(r[0] == 0);
    assert(r[1] == 1);
    assert(r[2] == 2);
    assert(r[3] == 3);
}

unittest
{
    import std.algorithm : equal;

    // indexing with offset wraps around the end of the storage
    auto r = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4);
    assert(r[0] == 1);
    assert(r[1] == 2);
    assert(r[2] == 3);
    assert(r[3] == 0);
}

unittest
{
    import std.algorithm : equal;

    // indexing is relative to the current front after popFront
    auto r = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4);
    r.popFront();
    assert(r[0] == 2);
    assert(r[1] == 3);
    assert(r[2] == 0);
}
1208 
/// Creates a `ReplaySubject` of the observable's element type constructed with
/// `bufferSize`, subscribes it to `observable`, and returns it.
auto asReplaySubject(TObservable)(auto ref TObservable observable, size_t bufferSize)
{
    auto result = new ReplaySubject!(TObservable.ElementType)(bufferSize);
    observable.doSubscribe(result);
    return result;
}
1217 
///
unittest
{
    import rx;

    // Source that emits three values and completes during subscription.
    auto sub = defer!(int, (observer) {
        observer.put(10);
        observer.put(20);
        observer.put(30);
        observer.completed();
        return NopDisposable.instance;
    });

    ReplaySubject!int nums = sub.asReplaySubject(4);

    // A late subscriber gets all three values replayed from the buffer.
    int[] data;
    nums.doSubscribe!(x => data ~= x);

    assert(data == [10, 20, 30]);
}

///
unittest
{
    import rx;

    // Source that emits three values and then fails during subscription.
    auto sub = defer!(int, (observer) {
        observer.put(10);
        observer.put(20);
        observer.put(30);
        observer.failure(null);
        return NopDisposable.instance;
    });

    ReplaySubject!int nums = sub.asReplaySubject(2);

    // With a buffer of two, only the last two values are replayed.
    int[] data;
    nums.doSubscribe!(x => data ~= x);

    assert(data == [20, 30]);
}
1259 
version (unittest)
{
    /// Test helper exposing how many observers a `SubjectObject` currently holds.
    class TestingSubject(E) : SubjectObject!E
    {
        /++
         + Number of subscribed observers: the composite's observer count, 0 for
         + the no-subscriber state and for any terminated state, otherwise 1.
         +/
        size_t observerCount()
        {
            // Read the state once so every check below sees the same observer.
            auto current = currentObserver;

            if (auto composite = cast(CompositeObserver!E) current)
            {
                return composite.observers.length;
            }
            if (current is NopObserver!E.instance)
            {
                return 0;
            }
            // Covers DoneObserver!E.instance as well as the DoneObserver that
            // failure() creates to carry its exception.
            if (cast(DoneObserver!E) current)
            {
                return 0;
            }
            return 1;
        }
    }

    // Subscribing the same observer twice yields two subscriptions that are
    // removed one at a time; a failed subject reports zero observers.
    unittest
    {
        auto s = new TestingSubject!int;
        assert(s.observerCount == 0);

        int[] buf;
        auto observer = observerObject!int((int n) { buf ~= n; });

        auto d0 = s.subscribe(observer);
        assert(s.observerCount == 1);
        auto d1 = s.subscribe(observer);
        assert(s.observerCount == 2);

        d0.dispose();
        assert(s.observerCount == 1);
        d1.dispose();
        assert(s.observerCount == 0);

        s.failure(new Exception("TEST"));
        assert(s.observerCount == 0);
    }
}