/+++++++++++++++++++++++++++++
 + This module defines the Subject interface and several implementations of it.
 +/
4 module rx.subject;
5 
6 import rx.disposable;
7 import rx.observer;
8 import rx.observable;
9 
10 import core.atomic : atomicLoad, cas;
11 import std.range : put;
12 
/++
 + Represents an object that is both an observable sequence and an observer.
 +
 + The interface declares no members of its own; it only combines
 + `Observer!E` and `Observable!E` into a single type.
 +/
interface Subject(E) : Observer!E, Observable!E
{
}
17 
/++
 + Represents an object that is both an observable sequence and an observer.
 + Each notification is broadcast to all subscribed observers.
 +
 + The current subscriber(s) are held in one shared reference, `_observer`.
 + Every state change (subscribe, unsubscribe, completed, failure) builds a
 + replacement observer and installs it with a compare-and-swap retry loop.
 +/
class SubjectObject(E) : Subject!E
{
    alias ElementType = E;

public:
    /// Creates a subject with no subscribers (the slot holds `NopObserver!E.instance`).
    this()
    {
        _observer = cast(shared) NopObserver!E.instance;
    }

public:
    /// Forwards `obj` to whatever observer is currently installed.
    void put(E obj)
    {
        auto temp = atomicLoad(_observer);
        .put(temp, obj);
    }
    /++
     + Installs `DoneObserver!E.instance` and then calls `completed()` on the
     + observer that was installed before the swap.
     +
     + If a `DoneObserver` is already installed, the swap is skipped and
     + `completed()` is invoked on that `DoneObserver` instead.
     +/
    void completed()
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = cast(shared) DoneObserver!E.instance;
        Observer!E temp = void;
        do
        {
            // Snapshot the current observer; the cas below only succeeds if
            // nobody replaced it in the meantime.
            oldObserver = _observer;
            temp = atomicLoad(oldObserver);
            if (cast(DoneObserver!E) temp)
                break;
        }
        while (!cas(&_observer, oldObserver, newObserver));
        temp.completed();
    }
    /++
     + Installs a new `DoneObserver!E` carrying `error` and then calls
     + `failure(error)` on the observer that was installed before the swap.
     +
     + If a `DoneObserver` is already installed, the swap is skipped and
     + `failure(error)` is invoked on that `DoneObserver` instead.
     +/
    void failure(Exception error)
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = cast(shared) new DoneObserver!E(error);
        Observer!E temp = void;
        do
        {
            oldObserver = _observer;
            temp = atomicLoad(oldObserver);
            if (cast(DoneObserver!E) temp)
                break;
        }
        while (!cas(&_observer, oldObserver, newObserver));
        temp.failure(error);
    }

    /// Wraps `observer` with `observerObject!E` and subscribes the result.
    Disposable subscribe(T)(T observer)
    {
        return subscribe(observerObject!E(observer));
    }
    /++
     + Subscribes `observer` and returns a `Disposable` that unsubscribes it.
     +
     + When a `DoneObserver` is already installed, nothing is registered:
     + the observer is notified immediately (`completed()` for the shared
     + `DoneObserver!E.instance`, otherwise `failure` with the stored
     + exception) and `NopDisposable.instance` is returned.
     +/
    Disposable subscribe(Observer!E observer)
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = void;
        do
        {
            oldObserver = _observer;
            auto temp = atomicLoad(oldObserver);

            // The singleton DoneObserver is what completed() installs.
            if (temp is DoneObserver!E.instance)
            {
                observer.completed();
                return NopDisposable.instance;
            }

            // Any other DoneObserver was created by failure() and carries the exception.
            if (auto fail = cast(DoneObserver!E) temp)
            {
                observer.failure(fail.exception);
                return NopDisposable.instance;
            }

            if (auto composite = cast(CompositeObserver!E) temp)
            {
                // Several subscribers already: use the composite returned by add().
                newObserver = cast(shared) composite.add(observer);
            }
            else if (auto nop = cast(NopObserver!E) temp)
            {
                // No subscribers yet: store the observer directly.
                newObserver = cast(shared) observer;
            }
            else
            {
                // Exactly one subscriber stored directly: wrap both in a composite.
                newObserver = cast(shared)(new CompositeObserver!E([temp, observer]));
            }
        }
        while (!cas(&_observer, oldObserver, newObserver));

        return subscription(this, observer);
    }

    /++
     + Removes `observer` from the subscribers.
     +
     + When the installed observer is not a `CompositeObserver` and is not
     + `observer` itself, the call returns without changing anything.
     +/
    void unsubscribe(Observer!E observer)
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = void;
        do
        {
            oldObserver = _observer;

            auto temp = atomicLoad(oldObserver);
            if (auto composite = cast(CompositeObserver!E) temp)
            {
                newObserver = cast(shared) composite.remove(observer);
            }
            else
            {
                if (temp !is observer)
                    return;

                // The only subscriber is leaving: fall back to the no-op observer.
                newObserver = cast(shared) NopObserver!E.instance;
            }
        }
        while (!cas(&_observer, oldObserver, newObserver));
    }

private:
    // Currently installed observer; replaced as a whole via cas.
    shared(Observer!E) _observer;
}
143 
/// Values put into the subject reach a subscribed observer until the subscription is disposed.
unittest
{
    import std.array : appender;

    auto data = appender!(int[])();
    auto subject = new SubjectObject!int;
    auto disposable = subject.subscribe(observerObject!(int)(data));
    assert(disposable !is null);
    subject.put(0);
    subject.put(1);

    import std.algorithm : equal;

    assert(equal(data.data, [0, 1]));

    // After dispose() the observer must not receive further values.
    disposable.dispose();
    subject.put(2);
    assert(equal(data.data, [0, 1]));
}
164 
// Compile-time checks: SubjectObject!int is an observer and an observable of int,
// and is not an observable of string.
unittest
{
    static assert(isObserver!(SubjectObject!int, int));
    static assert(isObservable!(SubjectObject!int, int));
    // NOTE(review): this assertion used to appear twice verbatim; the second copy
    // may have been meant to check `isObserver` with `string` - confirm before adding it.
    static assert(!isObservable!(SubjectObject!int, string));
}
172 
// After completed(), further puts are not delivered and the observer sees one completion.
unittest
{
    auto subject = new SubjectObject!int;
    auto observer = new CounterObserver!int;
    auto disposable = subject.subscribe(observer);
    scope (exit)
        disposable.dispose();

    subject.put(0);
    subject.put(1);

    assert(observer.putCount == 2);
    subject.completed();
    // This put happens after completion and must be ignored.
    subject.put(2);
    assert(observer.putCount == 2);
    assert(observer.completedCount == 1);
}
190 
// After failure(), further puts are not delivered and the observer receives the exception once.
unittest
{
    auto subject = new SubjectObject!int;
    auto observer = new CounterObserver!int;
    auto disposable = subject.subscribe(observer);
    scope (exit)
        disposable.dispose();

    subject.put(0);
    subject.put(1);

    assert(observer.putCount == 2);
    auto ex = new Exception("Exception");
    subject.failure(ex);
    // This put happens after the failure and must be ignored.
    subject.put(2);
    assert(observer.putCount == 2);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}
210 
// Two subscribers (one via subscribe, one via doSubscribe with a delegate) both receive a value.
unittest
{
    import std.array : appender;

    auto buf1 = appender!(int[]);
    auto buf2 = appender!(int[]);
    auto subject = new SubjectObject!int;
    subject.subscribe(observerObject!(int)(buf1));
    subject.doSubscribe((int n) => buf2.put(n));

    assert(buf1.data.length == 0);
    assert(buf2.data.length == 0);
    subject.put(0);
    assert(buf1.data.length == 1);
    assert(buf2.data.length == 1);
    assert(buf1.data[0] == buf2.data[0]);
}
228 
// Subscribing to an already completed subject completes the observer immediately.
unittest
{
    auto sub = new SubjectObject!int;
    sub.completed();

    auto observer = new CounterObserver!int;
    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 0);
    sub.subscribe(observer);
    assert(observer.putCount == 0);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
}
243 
// Subscribing to an already failed subject delivers the stored exception immediately.
unittest
{
    auto sub = new SubjectObject!int;
    auto ex = new Exception("Exception");
    sub.failure(ex);

    auto observer = new CounterObserver!int;
    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 0);
    sub.subscribe(observer);
    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}
260 
/++
 + Disposable handle that ties an observer to the subject it was subscribed to.
 + Disposing it calls `unsubscribe` on the subject once; later calls do nothing.
 +/
private class Subscription(TSubject, TObserver) : Disposable
{
public:
    /// Remembers the subject and the observer to detach from it on dispose.
    this(TSubject subject, TObserver observer)
    {
        _subject = subject;
        _observer = observer;
    }

public:
    /// Unsubscribes the observer from the subject, unless that already happened.
    void dispose()
    {
        // A null subject marks a subscription that has already been disposed.
        if (_subject is null)
            return;

        _subject.unsubscribe(_observer);
        _subject = null;
    }

private:
    TSubject _subject;
    TObserver _observer;
}
284 
/// Convenience factory that infers the template arguments of `Subscription`.
private Subscription!(TSubject, TObserver) subscription(TSubject, TObserver)(
        TSubject subject, TObserver observer)
{
    return new Subscription!(TSubject, TObserver)(subject, observer);
}
290 
/++
 + A subject that keeps only the most recent value it was given and delivers
 + it, followed by completion, when `completed` is called.
 +
 + Values passed to `put` are not forwarded immediately. Observers that
 + subscribe after the subject has stopped are notified right away with the
 + stored outcome (failure, value + completion, or completion only).
 +
 + All state is guarded by `synchronized (this)`; observers are always
 + notified outside of that section.
 +/
class AsyncSubject(E) : Subject!E
{
public:
    /++
     + Subscribes `observer`.
     +
     + While the subject is still running the observer is registered and a
     + disposable that unsubscribes it is returned. Once the subject has
     + stopped, the stored outcome is replayed to the observer immediately and
     + `NopDisposable.instance` is returned.
     +/
    Disposable subscribe(Observer!E observer)
    {
        Exception ex = null;
        E value;
        bool hasValue = false;

        synchronized (this)
        {
            if (!_isStopped)
            {
                _observers ~= observer;
                return subscription(this, observer);
            }

            // Stopped: copy the outcome so the observer can be notified
            // after leaving the synchronized section.
            ex = _exception;
            hasValue = _hasValue;
            value = _value;
        }

        if (ex !is null)
        {
            observer.failure(ex);
        }
        else if (hasValue)
        {
            .put(observer, value);
            observer.completed();
        }
        else
        {
            observer.completed();
        }

        return NopDisposable.instance;
    }

    /// Wraps `observer` with `observerObject!E` and subscribes the result.
    auto subscribe(T)(T observer)
    {
        return subscribe(observerObject!E(observer));
    }

    /// Removes the first registration of `observer`, if any. A null `observer` is ignored.
    void unsubscribe(Observer!E observer)
    {
        if (observer is null)
            return;

        synchronized (this)
        {
            import std.algorithm : remove, countUntil;

            auto index = countUntil(_observers, observer);
            if (index != -1)
            {
                _observers = remove(_observers, index);
            }
        }
    }

public:
    /// Stores `value` as the latest value; ignored once the subject has stopped.
    void put(E value)
    {
        synchronized (this)
        {
            if (!_isStopped)
            {
                _value = value;
                _hasValue = true;
            }
        }
    }

    /++
     + Stops the subject and notifies every registered observer: the latest
     + value (if one was ever put) followed by `completed()`.
     + Has no effect when the subject has already stopped.
     +/
    void completed()
    {
        Observer!E[] os = null;

        E value;
        bool hasValue = false;

        synchronized (this)
        {
            if (!_isStopped)
            {
                os = _observers;
                // Drop the reference entirely instead of only zeroing the
                // length, so the stopped subject no longer keeps the observer
                // array (and the observers stored in it) reachable.
                _observers = null;
                _isStopped = true;
                value = _value;
                hasValue = _hasValue;
            }
        }

        // `os` is empty when the subject was already stopped or nobody subscribed.
        foreach (observer; os)
        {
            if (hasValue)
                .put(observer, value);
            observer.completed();
        }
    }

    /++
     + Stops the subject with error `e` and passes it to every registered observer.
     + Has no effect when the subject has already stopped.
     +
     + Params:
     +   e = the error to report; must not be null.
     +/
    void failure(Exception e)
    {
        assert(e !is null);

        Observer!E[] os = null;
        synchronized (this)
        {
            if (!_isStopped)
            {
                os = _observers;
                // See completed(): release the array rather than just truncating it.
                _observers = null;
                _isStopped = true;
                _exception = e;
            }
        }

        foreach (observer; os)
        {
            observer.failure(e);
        }
    }

private:
    Observer!E[] _observers; // observers registered while the subject is running
    bool _isStopped; // set by completed() / failure()
    E _value; // latest value given to put()
    bool _hasValue; // whether put() was called before the subject stopped
    Exception _exception; // error given to failure(), null otherwise
}
443 
// An observer subscribing after put + completed receives the value and the completion.
unittest
{
    auto sub = new AsyncSubject!int;

    .put(sub, 1);
    sub.completed();

    auto observer = new CounterObserver!int;

    assert(observer.hasNotBeenCalled);

    sub.subscribe(observer);

    assert(observer.putCount == 1);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
    assert(observer.lastValue == 1);
}
462 
// put() alone does not notify a subscriber; the value is delivered when completed() is called.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    sub.put(100);

    assert(observer.hasNotBeenCalled);

    // The value is only stored inside the subject at this point.
    assert(sub._hasValue);
    assert(sub._value == 100);

    sub.completed();

    assert(observer.putCount == 1);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
    assert(observer.lastValue == 100);
}
488 
// A value put before subscribing is still delivered to the observer on completed().
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    sub.put(100);

    assert(sub._hasValue);
    assert(sub._value == 100);

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    sub.completed();

    assert(observer.putCount == 1);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
    assert(observer.lastValue == 100);
}
512 
// An observer disposed before put/completed is never notified.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);

    d.dispose();
    assert(observer.hasNotBeenCalled);

    sub.put(100);
    assert(observer.hasNotBeenCalled);

    sub.completed();
    assert(observer.hasNotBeenCalled);
}
529 
// An observer disposed after put but before completed is never notified.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    assert(observer.hasNotBeenCalled);

    sub.put(100);
    assert(observer.hasNotBeenCalled);

    d.dispose();
    assert(observer.hasNotBeenCalled);

    sub.completed();
    assert(observer.hasNotBeenCalled);
}
547 
// put, then subscribe, then dispose: the later completed() must not reach the observer.
unittest
{

    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    sub.put(100);
    assert(observer.hasNotBeenCalled);

    auto d = sub.subscribe(observer);
    assert(observer.hasNotBeenCalled);

    d.dispose();
    assert(observer.hasNotBeenCalled);

    sub.completed();
    assert(observer.hasNotBeenCalled);
}
566 
// completed() without any prior put delivers only the completion.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    sub.completed();

    assert(observer.putCount == 0);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
}
584 
// failure() passes the exception to an already subscribed observer.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.hasNotBeenCalled);

    auto ex = new Exception("TEST");
    sub.failure(ex);

    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}
604 
// Subscribing after failure() delivers the stored exception immediately.
unittest
{
    auto sub = new AsyncSubject!int;
    auto ex = new Exception("TEST");
    sub.failure(ex);

    auto observer = new CounterObserver!int;

    auto d = sub.subscribe(observer);
    scope (exit)
        d.dispose();

    assert(observer.putCount == 0);
    assert(observer.completedCount == 0);
    assert(observer.failureCount == 1);
    assert(observer.lastException is ex);
}
622 
// Subscribing after completed() with no value delivers only the completion.
unittest
{
    auto sub = new AsyncSubject!int;
    auto observer = new CounterObserver!int;

    sub.completed();
    assert(observer.hasNotBeenCalled);

    sub.subscribe(observer);
    assert(observer.putCount == 0);
    assert(observer.completedCount == 1);
    assert(observer.failureCount == 0);
}
636 
version (unittest)
{
    /++
     + Test helper: an observer that counts each kind of notification and
     + remembers the most recent value and exception it was given.
     +/
    class CounterObserver(T) : Observer!T
    {
    public:
        size_t putCount; /// number of put() calls
        size_t completedCount; /// number of completed() calls
        size_t failureCount; /// number of failure() calls
        T lastValue; /// argument of the most recent put()
        Exception lastException; /// argument of the most recent failure()

    public:
        /// True while none of put/completed/failure has been called.
        bool hasNotBeenCalled() const pure nothrow @nogc @safe @property
        {
            return !(putCount != 0 || completedCount != 0 || failureCount != 0);
        }

    public:
        /// Counts the call and records the value.
        void put(T obj)
        {
            putCount += 1;
            lastValue = obj;
        }

        /// Counts the call.
        void completed()
        {
            completedCount += 1;
        }

        /// Counts the call and records the exception.
        void failure(Exception e)
        {
            failureCount += 1;
            lastException = e;
        }
    }
}
673 
/++
 + A subject that holds a current value.
 +
 + A new subscriber is given the current value right away. Assigning a
 + value that differs from the current one stores it and forwards it to
 + the inner `SubjectObject`; assigning an equal value does nothing.
 +/
class BehaviorSubject(E) : Subject!E
{
public:
    /// Creates a subject whose current value is `E.init`.
    this()
    {
        this(E.init);
    }

    /// Creates a subject whose current value is `value`.
    this(E value)
    {
        _value = value;
        _subject = new SubjectObject!E;
    }

public:
    /// Returns the current value.
    inout(E) value() inout @property
    {
        return _value;
    }

    /// Stores `value` and publishes it, but only when it differs from the current value.
    void value(E value) @property
    {
        // Nothing to do when the value did not change.
        if (!(_value != value))
            return;

        _value = value;
        .put(_subject, value);
    }

public:
    /// Puts the current value to `observer`, then subscribes it to the inner subject via `doSubscribe`.
    auto subscribe(TObserver)(auto ref TObserver observer)
    {
        .put(observer, this.value);
        return _subject.doSubscribe(observer);
    }

    /// Same as the generic overload, with the result wrapped by `disposableObject`.
    Disposable subscribe(Observer!E observer)
    {
        .put(observer, this.value);
        auto inner = _subject.doSubscribe(observer);
        return disposableObject(inner);
    }

    /// Observer entry point: equivalent to assigning `obj` to `value`.
    void put(E obj)
    {
        this.value = obj;
    }

    /// Forwards completion to the inner subject.
    void completed()
    {
        _subject.completed();
    }

    /// Forwards the failure to the inner subject.
    void failure(Exception e)
    {
        _subject.failure(e);
    }

private:
    SubjectObject!E _subject; // delivers notifications to subscribers
    E _value; // current value
}
745 
// Compile-time checks: BehaviorSubject!int is an observable of int and exposes ElementType == int.
unittest
{
    static assert(isObservable!(BehaviorSubject!int, int));
    static assert(is(BehaviorSubject!int.ElementType == int));
}
751 
// A subscriber gets the initial value at once, then later values, until it is disposed.
unittest
{
    int num = 0;
    auto subject = new BehaviorSubject!int(100);

    // Subscribing delivers the current value (100) immediately.
    auto d = subject.doSubscribe((int n) { num = n; });
    assert(num == 100);

    .put(subject, 1);
    assert(num == 1);

    // After dispose() the delegate must no longer be called.
    d.dispose();
    .put(subject, 10);
    assert(num == 1);
}