1 /+++++++++++++++++++++++++++++
 + This module defines the Subject interface and several implementations of it.
3  +/
4 module rx.subject;
5 
6 import rx.disposable;
7 import rx.observer;
8 import rx.observable;
9 import rx.util : assumeThreadLocal;
10 
11 import core.atomic : atomicLoad, cas;
12 import std.range : put;
13 
/// Represents an object that is both an observable sequence as well as an observer.
///
/// The interface declares no members of its own; it only combines
/// `Observer!E` and `Observable!E`.
interface Subject(E) : Observer!E, Observable!E
{
}
18 
/// Represents an object that is both an observable sequence as well as an observer.
/// Each notification is broadcast to all subscribed observers.
///
/// The whole subscriber state is held in the single `_observer` reference and is
/// replaced as a unit through compare-and-swap retry loops. The states handled by
/// the code below are: `NopObserver!E.instance` (no subscriber), a single observer,
/// a `CompositeObserver!E` (several subscribers) and a `DoneObserver!E`
/// (completed or failed).
class SubjectObject(E) : Subject!E
{
    alias ElementType = E;

public:
    /// Creates a subject without subscribers; the initial state is `NopObserver!E.instance`.
    this()
    {
        _observer = cast(shared) NopObserver!E.instance;
    }

public:
    /// Forwards `obj` to the observer that is installed in `_observer` at the time of the call.
    void put(E obj)
    {
        auto temp = assumeThreadLocal(atomicLoad(_observer));
        // `.put` is the module-level `std.range.put`, not this method.
        .put(temp, obj);
    }
    /// Switches the subject to `DoneObserver!E.instance` and calls `completed()` on the
    /// observer that was installed before the switch.
    ///
    /// If the state is already a `DoneObserver!E` nothing is swapped and `completed()`
    /// is invoked on that `DoneObserver!E` instead.
    void completed()
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = cast(shared) DoneObserver!E.instance;
        Observer!E temp = void;
        do
        {
            oldObserver = _observer;
            temp = assumeThreadLocal(atomicLoad(oldObserver));
            // Already terminated: leave the stored state untouched.
            if (cast(DoneObserver!E) temp)
                break;
        }
        // Retry when `_observer` was replaced between the read above and the swap.
        while (!cas(&_observer, oldObserver, newObserver));
        temp.completed();
    }
    /// Switches the subject to a new `DoneObserver!E` carrying `error` and calls
    /// `failure(error)` on the observer that was installed before the switch.
    ///
    /// If the state is already a `DoneObserver!E` nothing is swapped and `failure(error)`
    /// is invoked on that `DoneObserver!E` instead.
    ///
    /// Params:
    ///   error = the exception handed to the previously installed observer and stored
    ///           in the new `DoneObserver!E`.
    void failure(Exception error)
    {
        shared(Observer!E) oldObserver = void;
        // Allocated up front, even when the loop below exits without swapping.
        shared(Observer!E) newObserver = cast(shared) new DoneObserver!E(error);
        Observer!E temp = void;
        do
        {
            oldObserver = _observer;
            temp = assumeThreadLocal(atomicLoad(oldObserver));
            // Already terminated: leave the stored state untouched.
            if (cast(DoneObserver!E) temp)
                break;
        }
        // Retry when `_observer` was replaced between the read above and the swap.
        while (!cas(&_observer, oldObserver, newObserver));
        temp.failure(error);
    }

    /// Wraps `observer` with `observerObject!E` and subscribes the result.
    Disposable subscribe(T)(T observer)
    {
        return subscribe(observerObject!E(observer));
    }
    /// Adds `observer` to the subscriber state.
    ///
    /// When the subject is already done, `observer` is notified immediately
    /// (`completed()` for `DoneObserver!E.instance`, otherwise `failure` with the stored
    /// exception) and `NopDisposable.instance` is returned.
    ///
    /// Returns: the object produced by `subscription(this, observer)`, or
    /// `NopDisposable.instance` when the subject is already done.
    Disposable subscribe(Observer!E observer)
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = void;
        do
        {
            oldObserver = _observer;
            auto temp = assumeThreadLocal(atomicLoad(oldObserver));

            // The shared instance marks a normal completion.
            if (temp is DoneObserver!E.instance)
            {
                observer.completed();
                return NopDisposable.instance;
            }

            // Any other DoneObserver was created by failure() and carries the exception.
            if (auto fail = cast(DoneObserver!E) temp)
            {
                observer.failure(fail.exception);
                return NopDisposable.instance;
            }

            if (auto composite = cast(CompositeObserver!E) temp)
            {
                // Several subscribers already: install whatever add() returns.
                newObserver = cast(shared) composite.add(observer);
            }
            else if (auto nop = cast(NopObserver!E) temp)
            {
                // No subscriber yet: the new observer becomes the state itself.
                newObserver = cast(shared) observer;
            }
            else
            {
                // Exactly one subscriber: combine it with the new one.
                newObserver = cast(shared)(new CompositeObserver!E([temp, observer]));
            }
        }
        // Retry with a freshly computed state when `_observer` changed in the meantime.
        while (!cas(&_observer, oldObserver, newObserver));

        return subscription(this, observer);
    }

    /// Removes `observer` from the subscriber state.
    ///
    /// When the state is not a `CompositeObserver!E` and is not `observer` itself,
    /// the call returns without changing anything.
    void unsubscribe(Observer!E observer)
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = void;
        do
        {
            oldObserver = _observer;

            // NOTE(review): duplicates the module-level import of assumeThreadLocal.
            import rx.util : assumeThreadLocal;

            auto temp = assumeThreadLocal(atomicLoad(oldObserver));
            if (auto composite = cast(CompositeObserver!E) temp)
            {
                // Install whatever remove() returns as the new state.
                newObserver = cast(shared) composite.remove(observer);
            }
            else
            {
                // The state is a single object that is not the given observer.
                if (temp !is observer)
                    return;

                // The only subscriber is leaving: fall back to the no-op observer.
                newObserver = cast(shared) NopObserver!E.instance;
            }
        }
        // Retry when `_observer` was replaced between the read above and the swap.
        while (!cas(&_observer, oldObserver, newObserver));
    }

private:
    // Current subscriber state; only replaced as a whole via cas().
    shared(Observer!E) _observer;
}
146 
///
unittest
{
    // Values are delivered while subscribed and no longer after dispose().
    import std.array : appender;

    auto data = appender!(int[])();
    auto subject = new SubjectObject!int;
    auto disposable = subject.subscribe(observerObject!(int)(data));
    assert(disposable !is null);
    subject.put(0);
    subject.put(1);

    import std.algorithm : equal;

    assert(equal(data.data, [0, 1]));

    disposable.dispose();
    subject.put(2);
    assert(equal(data.data, [0, 1]));
}

// Compile-time checks of the observer/observable traits for SubjectObject!int.
unittest
{
    static assert(isObserver!(SubjectObject!int, int));
    static assert(isObservable!(SubjectObject!int, int));
    static assert(!isObservable!(SubjectObject!int, string));
    // NOTE(review): identical to the previous assertion - possibly a different
    // trait was meant to be checked here; confirm.
    static assert(!isObservable!(SubjectObject!int, string));
}

// After completed() further values are not delivered and completed() was seen once.
unittest
{
    auto subject = new SubjectObject!int;
    auto observer = new CounterObserver!int;
    auto disposable = subject.subscribe(observer);
    scope (exit)
        disposable.dispose();

    subject.put(0);
    subject.put(1);

    assert(observer.putCount == 2);
    subject.completed();
    subject.put(2);
    assert(observer.putCount == 2);
    assert(observer.completedCount == 1);
}

// After failure() further values are not delivered and the exception was forwarded once.
unittest
{
    auto subject = new SubjectObject!int;
    auto observer = new CounterObserver!int;
    auto disposable = subject.subscribe(observer);
    scope (exit)
        disposable.dispose();

    subject.put(0);
    subject.put(1);

    assert(observer.putCount == 2);
    auto ex = new Exception("Exception");
    subject.failure(ex);
    subject.put(2);
    assert(observer.putCount == 2);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}

// Two subscribers both receive the same value.
unittest
{
    import std.array : appender;

    auto buf1 = appender!(int[]);
    auto buf2 = appender!(int[]);
    auto subject = new SubjectObject!int;
    subject.subscribe(observerObject!(int)(buf1));
    subject.doSubscribe((int n) => buf2.put(n));

    assert(buf1.data.length == 0);
    assert(buf2.data.length == 0);
    subject.put(0);
    assert(buf1.data.length == 1);
    assert(buf2.data.length == 1);
    assert(buf1.data[0] == buf2.data[0]);
}

// Subscribing to an already completed subject notifies completed() immediately.
unittest
{
    auto sub = new SubjectObject!int;
    sub.completed();

    auto observer = new CounterObserver!int;
    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 0);
    sub.subscribe(observer);
    assert(observer.putCount == 0);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
}

// Subscribing to an already failed subject forwards the stored exception immediately.
unittest
{
    auto sub = new SubjectObject!int;
    auto ex = new Exception("Exception");
    sub.failure(ex);

    auto observer = new CounterObserver!int;
    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 0);
    sub.subscribe(observer);
    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}
263 
// Disposable handle that detaches `_observer` from `_subject` on the first dispose().
private class Subscription(TSubject, TObserver) : Disposable
{
public:
    // Remembers which observer has to be removed from which subject.
    this(TSubject subject, TObserver observer)
    {
        _observer = observer;
        _subject = subject;
    }

public:
    // Calls `unsubscribe` on the subject once; later calls do nothing because the
    // subject reference has been cleared.
    void dispose()
    {
        if (_subject is null)
            return;

        _subject.unsubscribe(_observer);
        _subject = null;
    }

private:
    TSubject _subject;
    TObserver _observer;
}
287 
// Factory that lets the compiler infer the template arguments of Subscription.
private Subscription!(TSubject, TObserver) subscription(TSubject, TObserver)(
        TSubject subject, TObserver observer)
{
    auto handle = new Subscription!(TSubject, TObserver)(subject, observer);
    return handle;
}
293 
/// Subject that keeps only the most recent value and hands it to its observers
/// when `completed()` is called; `failure()` forwards the exception instead.
class AsyncSubject(E) : Subject!E
{
public:
    /// Registers `observer` while the subject is still running. Once the subject has
    /// stopped, the stored outcome (exception, or last value followed by completion,
    /// or completion only) is delivered right away.
    ///
    /// Returns: a subscription handle while running, `NopDisposable.instance` after stop.
    Disposable subscribe(Observer!E observer)
    {
        Exception storedError;
        E storedValue;
        bool valueAvailable;

        synchronized (this)
        {
            if (_isStopped)
            {
                // Take a snapshot so that the observer is called outside the lock.
                storedError = _exception;
                valueAvailable = _hasValue;
                storedValue = _value;
            }
            else
            {
                _observers ~= observer;
                return subscription(this, observer);
            }
        }

        if (storedError !is null)
        {
            observer.failure(storedError);
            return NopDisposable.instance;
        }

        if (valueAvailable)
            .put(observer, storedValue);
        observer.completed();

        return NopDisposable.instance;
    }

    /// Wraps `observer` with `observerObject!E` and subscribes the result.
    auto subscribe(T)(T observer)
    {
        auto wrapped = observerObject!E(observer);
        return subscribe(wrapped);
    }

    /// Removes the first registration of `observer`; a null or unknown observer is ignored.
    void unsubscribe(Observer!E observer)
    {
        if (observer is null)
            return;

        synchronized (this)
        {
            import std.algorithm : remove, countUntil;

            auto found = countUntil(_observers, observer);
            if (found >= 0)
                _observers = remove(_observers, found);
        }
    }

public:
    /// Stores `value` as the latest value; ignored once the subject has stopped.
    void put(E value)
    {
        synchronized (this)
        {
            if (_isStopped)
                return;

            _value = value;
            _hasValue = true;
        }
    }

    /// Stops the subject and delivers the last stored value (if there is one)
    /// followed by `completed()` to every registered observer.
    void completed()
    {
        Observer!E[] targets;
        E finalValue;
        bool emitValue;

        synchronized (this)
        {
            if (!_isStopped)
            {
                targets = _observers;
                _observers.length = 0;
                _isStopped = true;
                finalValue = _value;
                emitValue = _hasValue;
            }
        }

        // `targets` stays empty when the subject had already stopped.
        foreach (target; targets)
        {
            if (emitValue)
                .put(target, finalValue);
            target.completed();
        }
    }

    /// Stops the subject, stores `e` and forwards it to every registered observer.
    void failure(Exception e)
    {
        assert(e !is null);

        Observer!E[] targets;
        synchronized (this)
        {
            if (!_isStopped)
            {
                targets = _observers;
                _observers.length = 0;
                _isStopped = true;
                _exception = e;
            }
        }

        // `targets` stays empty when the subject had already stopped.
        foreach (target; targets)
            target.failure(e);
    }

private:
    Observer!E[] _observers;
    bool _isStopped;
    E _value;
    bool _hasValue;
    Exception _exception;
}
446 
// Subscribing after put + completed delivers the stored value and completion at once.
unittest
{
    auto sub = new AsyncSubject!int;

    .put(sub, 1);
    sub.completed();

    auto observer = new CounterObserver!int;

    assert(observer.hasNotBeenCalled);

    sub.subscribe(observer);

    assert(observer.putCount == 1);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
    assert(observer.lastValue == 1);
}

// A value put while subscribed is held back until completed().
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    sub.put(100);

    assert(observer.hasNotBeenCalled);

    assert(sub._hasValue);
    assert(sub._value == 100);

    sub.completed();

    assert(observer.putCount == 1);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
    assert(observer.lastValue == 100);
}

// A value put before subscribing is still delivered on completed().
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    sub.put(100);

    assert(sub._hasValue);
    assert(sub._value == 100);

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    sub.completed();

    assert(observer.putCount == 1);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
    assert(observer.lastValue == 100);
}

// Disposing before put: the observer is never called.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);

    d.dispose();
    assert(observer.hasNotBeenCalled);

    sub.put(100);
    assert(observer.hasNotBeenCalled);

    sub.completed();
    assert(observer.hasNotBeenCalled);
}

// Disposing between put and completed: the observer is never called.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    assert(observer.hasNotBeenCalled);

    sub.put(100);
    assert(observer.hasNotBeenCalled);

    d.dispose();
    assert(observer.hasNotBeenCalled);

    sub.completed();
    assert(observer.hasNotBeenCalled);
}

// Subscribing after put and disposing before completed: the observer is never called.
unittest
{

    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    sub.put(100);
    assert(observer.hasNotBeenCalled);

    auto d = sub.subscribe(observer);
    assert(observer.hasNotBeenCalled);

    d.dispose();
    assert(observer.hasNotBeenCalled);

    sub.completed();
    assert(observer.hasNotBeenCalled);
}

// completed() without any value only signals completion.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    sub.completed();

    assert(observer.putCount == 0);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
}

// failure() forwards the exception to a subscribed observer.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    auto ex = new Exception("TEST");
    sub.failure(ex);

    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}

// Subscribing after failure() forwards the stored exception immediately.
unittest
{
    auto sub = new AsyncSubject!int;
    auto ex = new Exception("TEST");
    sub.failure(ex);

    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}

// Subscribing after a value-less completed() only signals completion.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    sub.completed();
    assert(observer.hasNotBeenCalled);

    sub.subscribe(observer);
    assert(observer.putCount == 0);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
}
639 
version (unittest)
{
    // Test helper: counts every notification and remembers the last value / exception.
    class CounterObserver(T) : Observer!T
    {
    public:
        size_t putCount;
        size_t completedCount;
        size_t failureCount;
        T lastValue;
        Exception lastException;

    public:
        // True as long as none of put / completed / failure has been invoked.
        bool hasNotBeenCalled() const pure nothrow @nogc @safe @property
        {
            return !(putCount != 0 || completedCount != 0 || failureCount != 0);
        }

    public:
        // Counts the call and records the value.
        void put(T obj)
        {
            ++putCount;
            lastValue = obj;
        }

        // Counts the call.
        void completed()
        {
            ++completedCount;
        }

        // Counts the call and records the exception.
        void failure(Exception e)
        {
            ++failureCount;
            lastException = e;
        }
    }
}
676 
/// Subject that remembers its current value, hands that value to every new
/// subscriber and publishes changes of it through an inner `SubjectObject!E`.
class BehaviorSubject(E) : Subject!E
{
public:
    /// Creates the subject with `E.init` as the current value.
    this()
    {
        this(E.init);
    }

    /// Creates the subject with `value` as the current value.
    this(E value)
    {
        _value = value;
        _subject = new SubjectObject!E;
    }

public:
    /// The current value.
    inout(E) value() inout @property
    {
        return _value;
    }

    /// Replaces the current value and publishes it; assigning a value that compares
    /// equal to the current one does nothing.
    void value(E value) @property
    {
        if (_value == value)
            return;

        _value = value;
        .put(_subject, value);
    }

public:
    /// Sends the current value to `observer`, then subscribes it to later changes.
    auto subscribe(TObserver)(auto ref TObserver observer)
    {
        .put(observer, this.value);
        auto handle = _subject.doSubscribe(observer);
        return handle;
    }

    /// Sends the current value to `observer`, then subscribes it to later changes.
    Disposable subscribe(Observer!E observer)
    {
        .put(observer, this.value);
        auto handle = _subject.doSubscribe(observer);
        return disposableObject(handle);
    }

    /// Same as assigning `obj` to the `value` property.
    void put(E obj)
    {
        this.value = obj;
    }

    /// Forwards completion to the inner subject.
    void completed()
    {
        _subject.completed();
    }

    /// Forwards `e` to the inner subject.
    void failure(Exception e)
    {
        _subject.failure(e);
    }

private:
    SubjectObject!E _subject;
    E _value;
}
748 
// Compile-time checks: BehaviorSubject!int is observable and exposes ElementType.
unittest
{
    static assert(isObservable!(BehaviorSubject!int, int));
    static assert(is(BehaviorSubject!int.ElementType == int));
}

// The initial value is delivered on subscribe; nothing is delivered after dispose().
unittest
{
    int num = 0;
    auto subject = new BehaviorSubject!int(100);

    auto d = subject.doSubscribe((int n) { num = n; });
    assert(num == 100);

    .put(subject, 1);
    assert(num == 1);

    d.dispose();
    .put(subject, 10);
    assert(num == 1);
}
770 
/// Creates a `BehaviorSubject` for the element type of `observable`, subscribes it
/// to `observable` and returns it.
auto asBehaviorSubject(TObservable)(auto ref TObservable observable)
{
    auto result = new BehaviorSubject!(TObservable.ElementType)();
    observable.doSubscribe(result);
    return result;
}
779 
///
unittest
{
    // `sum` follows the combination of the latest values of num1 and num2.
    import rx;

    auto num1 = new BehaviorSubject!int;
    auto num2 = new BehaviorSubject!int;

    BehaviorSubject!int sum = combineLatest!((l, r) => l + r)(num1, num2).asBehaviorSubject();

    assert(sum.value == 0);
    num1.value = 10;
    assert(sum.value == 10);
    num2.value = 20;
    assert(sum.value == 30);
}
796 
/// Subject that remembers the last `bufferSize` values and replays them to every
/// new subscriber before forwarding live notifications.
///
/// A subscriber that arrives after `completed()` or `failure()` receives the
/// buffered values followed by the terminal notification.
class ReplaySubject(E) : Subject!E
{
private:
    RingBuffer!E _buffer;
    SubjectObject!E _subject;
    // Set by completed()/failure(); makes put() ignore further values.
    bool _completed;

public:
    /// Params:
    ///   bufferSize = maximum number of values kept for replay.
    this(size_t bufferSize)
    {
        _buffer = RingBuffer!E(bufferSize);
        _subject = new SubjectObject!E;
    }

public:
    /// Replays the buffered values to `observer` and subscribes it to the inner subject.
    ///
    /// The inner subject is used even after termination: `SubjectObject.subscribe`
    /// notifies `completed()` / `failure()` immediately when it is already done and
    /// returns `NopDisposable.instance`, so late subscribers also see the end of
    /// the sequence.
    Disposable subscribe(TObserver)(auto ref TObserver observer)
    {
        .put(observer, _buffer[]);
        return _subject.doSubscribe(observer).disposableObject();
    }

    /// Replays the buffered values to `observer` and subscribes it to the inner subject.
    ///
    /// The inner subject is used even after termination: `SubjectObject.subscribe`
    /// notifies `completed()` / `failure()` immediately when it is already done and
    /// returns `NopDisposable.instance`, so late subscribers also see the end of
    /// the sequence.
    Disposable subscribe(Observer!E observer)
    {
        .put(observer, _buffer[]);
        return disposableObject(_subject.doSubscribe(observer));
    }

    /// Records `obj` for replay and forwards it to the current subscribers.
    /// Ignored once the subject has completed or failed.
    void put(E obj)
    {
        if (_completed) return;
        .put(_buffer, obj);
        .put(_subject, obj);
    }

    /// Marks the subject as finished and forwards completion to the inner subject.
    void completed()
    {
        _completed = true;
        _subject.completed();
    }

    /// Marks the subject as finished and forwards `e` to the inner subject.
    void failure(Exception e)
    {
        _completed = true;
        _subject.failure(e);
    }
}
856 
///
unittest
{
    // A value put before subscribing is replayed.
    auto sub = new ReplaySubject!int(1);
    .put(sub, 1);

    int[] buf;
    auto d = sub.doSubscribe!(v => buf ~= v);
    scope (exit)
        d.dispose();

    assert(buf.length == 1);
    assert(buf[0] == 1);
}

///
unittest
{
    // With buffer size 1 only the newest value is replayed.
    auto sub = new ReplaySubject!int(1);
    .put(sub, 1);
    .put(sub, 2);

    int[] buf;
    auto d = sub.doSubscribe!(v => buf ~= v);
    scope (exit)
        d.dispose();

    assert(buf == [2]);
}

///
unittest
{
    // With buffer size 2 the two newest values are replayed in order.
    auto sub = new ReplaySubject!int(2);
    .put(sub, 1);
    .put(sub, 2);
    .put(sub, 3);

    int[] buf;
    auto d = sub.doSubscribe!(v => buf ~= v);
    scope (exit)
        d.dispose();

    assert(buf == [2, 3]);
}

// Replayed value first, then the value put while subscribed.
unittest
{
    auto sub = new ReplaySubject!int(2);
    .put(sub, 1);

    int[] buf;
    auto d = sub.doSubscribe!(v => buf ~= v);
    scope (exit)
        d.dispose();

    .put(sub, 2);

    assert(buf.length == 2);
    assert(buf[0] == 1);
    assert(buf[1] == 2);
}

// Values put after completed() are not recorded.
unittest
{
    auto sub = new ReplaySubject!int(2);
    .put(sub, 1);
    sub.completed();
    .put(sub, 2);

    int[] buf;
    sub.doSubscribe!(v => buf ~= v);

    assert(buf == [1]);
}

// Values put after completed() do not push older values out of the buffer.
unittest
{
    auto sub = new ReplaySubject!int(2);
    .put(sub, 1);
    .put(sub, 2);
    .put(sub, 3);
    sub.completed();
    .put(sub, 4);

    int[] buf;
    sub.doSubscribe!(v => buf ~= v);

    assert(buf == [2, 3]);
}

// Values put after failure() do not push older values out of the buffer.
unittest
{
    auto sub = new ReplaySubject!int(2);
    .put(sub, 1);
    .put(sub, 2);
    .put(sub, 3);
    sub.failure(null);
    .put(sub, 4);

    int[] buf;
    sub.doSubscribe!(v => buf ~= v);

    assert(buf == [2, 3]);
}
962 
// Fixed-capacity circular buffer that keeps the most recent `buffer.length` values.
private struct RingBuffer(T)
{
    T[] buffer; // backing storage; its length is the capacity
    size_t pos; // index of the slot the next put() writes to
    size_t count; // number of valid elements, never more than buffer.length

    // Allocates storage for `n` elements. A capacity of 0 is allowed and yields a
    // buffer that stores nothing.
    this(size_t n)
    {
        buffer.length = n;
    }

    // Appends `obj`, overwriting the oldest element once the buffer is full.
    void put(T obj)
    {
        import std.algorithm : min;

        // A zero-capacity buffer has no slot to write to, and `% buffer.length`
        // below would divide by zero, so the value is dropped.
        if (buffer.length == 0)
            return;

        buffer[pos] = obj;
        pos = (pos + 1) % buffer.length;
        count = min(count + 1, buffer.length);
    }

    // Returns a range over the stored elements, oldest first.
    RingBufferRange!T opSlice()
    {
        // Not yet full: pos == count, so the offset is buffer.length, which is index 0
        // after the modulo in the range. Full: count == buffer.length, so the offset
        // is pos, the slot holding the oldest element.
        return RingBufferRange!T(buffer, buffer.length - (count - pos), 0, count);
    }
}

unittest
{
    import std.algorithm : equal;
    import std.range : walkLength;

    auto buf = RingBuffer!int(4);

    assert(walkLength(buf[]) == 0);

    buf.put(0);
    assert(buf.buffer.length == 4);
    assert(buf.pos == 1);
    assert(buf.count == 1);
    assert(buf[][0] == 0);
    assert(equal(buf[], [0]));

    buf.put(1);
    assert(buf.buffer.length == 4);
    assert(equal(buf.buffer, [0, 1, 0, 0]));
    assert(buf.pos == 2);
    assert(buf.count == 2);
    assert(buf[][0] == 0);
    assert(buf[][1] == 1);
    assert(equal(buf[], [0, 1]));

    buf.put(2);
    assert(equal(buf[], [0, 1, 2]));

    buf.put(3);
    assert(equal(buf[], [0, 1, 2, 3]));

    buf.put(4);
    assert(equal(buf[], [1, 2, 3, 4]));
}

// A zero-capacity buffer drops every value instead of faulting.
unittest
{
    auto buf = RingBuffer!int(0);

    buf.put(1);
    buf.put(2);
    assert(buf.pos == 0);
    assert(buf.count == 0);
    assert(buf[].empty);
}

// Input range over `count` elements of a circular buffer, starting at index
// `offset` (taken modulo `buffer.length`).
private struct RingBufferRange(T)
{
    T[] buffer; // storage that is read with wrap-around
    size_t offset; // index of the first element
    size_t pos; // number of elements already consumed
    size_t count; // total number of elements in the range

    // True when there is nothing (left) to read.
    bool empty() const @property
    {
        return count == 0 || pos == count;
    }

    // The current element.
    inout(T) front() inout @property
    {
        return buffer[(offset + pos) % buffer.length];
    }

    // Advances to the next element.
    void popFront()
    {
        pos++;
    }

    // The n-th element counted from the current front; `n` is not checked
    // against the number of remaining elements.
    T opIndex(size_t n)
    {
        return buffer[(offset + pos + n) % buffer.length];
    }
}
1051 
// Iteration with and without a start offset, including wrap-around.
unittest
{
    import std.algorithm : equal;

    // no offset
    auto r0 = RingBufferRange!int([0, 1, 2], 0, 0, 2);
    assert(equal(r0, [0, 1]));

    auto r1 = RingBufferRange!int([0, 1, 2], 0, 0, 3);
    assert(equal(r1, [0, 1, 2]));

    auto r2 = RingBufferRange!int([0, 1, 2, 3], 0, 0, 4);
    assert(equal(r2, [0, 1, 2, 3]));

    auto r3 = RingBufferRange!int([0, 1, 2, 3, 4], 0, 0, 5);
    assert(equal(r3, [0, 1, 2, 3, 4]));

    // has offset
    auto r4 = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4);
    assert(!r4.empty);
    assert(r4.front == 1);
    r4.popFront();
    assert(!r4.empty);
    assert(r4.front == 2);
    r4.popFront();
    assert(!r4.empty);
    assert(r4.front == 3);
    r4.popFront();
    assert(!r4.empty);
    assert(r4.front == 0);
    r4.popFront();
    assert(r4.empty);

    auto r5 = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4);
    assert(equal(r5, [1, 2, 3, 0]));

    auto r6 = RingBufferRange!int([0, 1, 2, 3], 2, 0, 4);
    assert(equal(r6, [2, 3, 0, 1]));
}

// Empty and partially filled ranges.
unittest
{
    import std.algorithm : equal;

    // empty
    auto rempty = RingBufferRange!int([0, 0, 0, 0], 0, 0, 0);
    assert(rempty.empty);

    // partially filled: only the first `count` elements are visited
    auto r1 = RingBufferRange!int([1, 0, 0, 0], 0, 0, 1);
    assert(equal(r1, [1]));

    auto r2 = RingBufferRange!int([1, 2, 0, 0], 0, 0, 2);
    assert(equal(r2, [1, 2]));
}

unittest
{
    import std.algorithm : equal;

    // indexing without offset
    auto r = RingBufferRange!int([0, 1, 2, 3], 0, 0, 4);
    assert(r[0] == 0);
    assert(r[1] == 1);
    assert(r[2] == 2);
    assert(r[3] == 3);
}

unittest
{
    import std.algorithm : equal;

    // indexing with offset 1 wraps around to the start of the storage
    auto r = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4);
    assert(r[0] == 1);
    assert(r[1] == 2);
    assert(r[2] == 3);
    assert(r[3] == 0);
}

unittest
{
    import std.algorithm : equal;

    // indexing is relative to the current front after popFront()
    auto r = RingBufferRange!int([0, 1, 2, 3], 1, 0, 4);
    r.popFront();
    assert(r[0] == 2);
    assert(r[1] == 3);
    assert(r[2] == 0);
}
1142 
/// Creates a `ReplaySubject` with the given `bufferSize` for the element type of
/// `observable`, subscribes it to `observable` and returns it.
auto asReplaySubject(TObservable)(auto ref TObservable observable, size_t bufferSize)
{
    auto result = new ReplaySubject!(TObservable.ElementType)(bufferSize);
    observable.doSubscribe(result);
    return result;
}
1151 
///
unittest
{
    // All three values fit into the buffer of size 4 and are replayed.
    import rx;

    auto sub = defer!(int, (observer) {
        observer.put(10);
        observer.put(20);
        observer.put(30);
        observer.completed();
        return NopDisposable.instance;
    });

    ReplaySubject!int nums = sub.asReplaySubject(4);

    int[] data;
    nums.doSubscribe!(x => data ~= x);

    assert(data == [10, 20, 30]);
}

///
unittest
{
    // With buffer size 2 only the two newest values are replayed after the failure.
    import rx;

    auto sub = defer!(int, (observer) {
        observer.put(10);
        observer.put(20);
        observer.put(30);
        observer.failure(null);
        return NopDisposable.instance;
    });

    ReplaySubject!int nums = sub.asReplaySubject(2);

    int[] data;
    nums.doSubscribe!(x => data ~= x);

    assert(data == [20, 30]);
}