1 /+++++++++++++++++++++++++++++
2  + This module defines the Subject and some implements.
3  +/
4 module rx.subject;
5 
6 import rx.disposable;
7 import rx.observer;
8 import rx.observable;
9 import rx.util : assumeThreadLocal;
10 
11 import core.atomic : atomicLoad, cas;
12 import std.range : put;
13 
14 ///Represents an object that is both an observable sequence as well as an observer.
15 interface Subject(E) : Observer!E, Observable!E
16 {
17 }
18 
19 ///Represents an object that is both an observable sequence as well as an observer. Each notification is broadcasted to all subscribed observers.
20 class SubjectObject(E) : Subject!E
21 {
22     alias ElementType = E;
23 
24 public:
25     ///
26     this()
27     {
28         _observer = cast(shared) NopObserver!E.instance;
29     }
30 
31 public:
32     ///
33     void put(E obj)
34     {
35         auto temp = assumeThreadLocal(atomicLoad(_observer));
36         .put(temp, obj);
37     }
38     ///
39     void completed()
40     {
41         shared(Observer!E) oldObserver = void;
42         shared(Observer!E) newObserver = cast(shared) DoneObserver!E.instance;
43         Observer!E temp = void;
44         do
45         {
46             oldObserver = _observer;
47             temp = assumeThreadLocal(atomicLoad(oldObserver));
48             if (cast(DoneObserver!E) temp)
49                 break;
50         }
51         while (!cas(&_observer, oldObserver, newObserver));
52         temp.completed();
53     }
54     ///
55     void failure(Exception error)
56     {
57         shared(Observer!E) oldObserver = void;
58         shared(Observer!E) newObserver = cast(shared) new DoneObserver!E(error);
59         Observer!E temp = void;
60         do
61         {
62             oldObserver = _observer;
63             temp = assumeThreadLocal(atomicLoad(oldObserver));
64             if (cast(DoneObserver!E) temp)
65                 break;
66         }
67         while (!cas(&_observer, oldObserver, newObserver));
68         temp.failure(error);
69     }
70 
71     ///
72     Disposable subscribe(T)(T observer)
73     {
74         return subscribe(observerObject!E(observer));
75     }
76     ///
77     Disposable subscribe(Observer!E observer)
78     {
79         shared(Observer!E) oldObserver = void;
80         shared(Observer!E) newObserver = void;
81         do
82         {
83             oldObserver = _observer;
84             auto temp = assumeThreadLocal(atomicLoad(oldObserver));
85 
86             if (temp is DoneObserver!E.instance)
87             {
88                 observer.completed();
89                 return NopDisposable.instance;
90             }
91 
92             if (auto fail = cast(DoneObserver!E) temp)
93             {
94                 observer.failure(fail.exception);
95                 return NopDisposable.instance;
96             }
97 
98             if (auto composite = cast(CompositeObserver!E) temp)
99             {
100                 newObserver = cast(shared) composite.add(observer);
101             }
102             else if (auto nop = cast(NopObserver!E) temp)
103             {
104                 newObserver = cast(shared) observer;
105             }
106             else
107             {
108                 newObserver = cast(shared)(new CompositeObserver!E([temp, observer]));
109             }
110         }
111         while (!cas(&_observer, oldObserver, newObserver));
112 
113         return subscription(this, observer);
114     }
115 
116     ///
117     void unsubscribe(Observer!E observer)
118     {
119         shared(Observer!E) oldObserver = void;
120         shared(Observer!E) newObserver = void;
121         do
122         {
123             oldObserver = _observer;
124 
125             import rx.util : assumeThreadLocal;
126 
127             auto temp = assumeThreadLocal(atomicLoad(oldObserver));
128             if (auto composite = cast(CompositeObserver!E) temp)
129             {
130                 newObserver = cast(shared) composite.remove(observer);
131             }
132             else
133             {
134                 if (temp !is observer)
135                     return;
136 
137                 newObserver = cast(shared) NopObserver!E.instance;
138             }
139         }
140         while (!cas(&_observer, oldObserver, newObserver));
141     }
142 
143 private:
144     shared(Observer!E) _observer;
145 }
146 
147 ///
148 unittest
149 {
150     import std.array : appender;
151 
152     auto data = appender!(int[])();
153     auto subject = new SubjectObject!int;
154     auto disposable = subject.subscribe(observerObject!(int)(data));
155     assert(disposable !is null);
156     subject.put(0);
157     subject.put(1);
158 
159     import std.algorithm : equal;
160 
161     assert(equal(data.data, [0, 1]));
162 
163     disposable.dispose();
164     subject.put(2);
165     assert(equal(data.data, [0, 1]));
166 }
167 
168 unittest
169 {
170     static assert(isObserver!(SubjectObject!int, int));
171     static assert(isObservable!(SubjectObject!int, int));
172     static assert(!isObservable!(SubjectObject!int, string));
173     static assert(!isObservable!(SubjectObject!int, string));
174 }
175 
176 unittest
177 {
178     auto subject = new SubjectObject!int;
179     auto observer = new CounterObserver!int;
180     auto disposable = subject.subscribe(observer);
181     scope (exit)
182         disposable.dispose();
183 
184     subject.put(0);
185     subject.put(1);
186 
187     assert(observer.putCount == 2);
188     subject.completed();
189     subject.put(2);
190     assert(observer.putCount == 2);
191     assert(observer.completedCount == 1);
192 }
193 
194 unittest
195 {
196     auto subject = new SubjectObject!int;
197     auto observer = new CounterObserver!int;
198     auto disposable = subject.subscribe(observer);
199     scope (exit)
200         disposable.dispose();
201 
202     subject.put(0);
203     subject.put(1);
204 
205     assert(observer.putCount == 2);
206     auto ex = new Exception("Exception");
207     subject.failure(ex);
208     subject.put(2);
209     assert(observer.putCount == 2);
210     assert(observer.failureCount == 1);
211     assert(observer.lastException is ex);
212 }
213 
214 unittest
215 {
216     import std.array : appender;
217 
218     auto buf1 = appender!(int[]);
219     auto buf2 = appender!(int[]);
220     auto subject = new SubjectObject!int;
221     subject.subscribe(observerObject!(int)(buf1));
222     subject.doSubscribe((int n) => buf2.put(n));
223 
224     assert(buf1.data.length == 0);
225     assert(buf2.data.length == 0);
226     subject.put(0);
227     assert(buf1.data.length == 1);
228     assert(buf2.data.length == 1);
229     assert(buf1.data[0] == buf2.data[0]);
230 }
231 
232 unittest
233 {
234     auto sub = new SubjectObject!int;
235     sub.completed();
236 
237     auto observer = new CounterObserver!int;
238     assert(observer.putCount == 0);
239     assert(observer.completedCount == 0);
240     assert(observer.failureCount == 0);
241     sub.subscribe(observer);
242     assert(observer.putCount == 0);
243     assert(observer.completedCount == 1);
244     assert(observer.failureCount == 0);
245 }
246 
247 unittest
248 {
249     auto sub = new SubjectObject!int;
250     auto ex = new Exception("Exception");
251     sub.failure(ex);
252 
253     auto observer = new CounterObserver!int;
254     assert(observer.putCount == 0);
255     assert(observer.completedCount == 0);
256     assert(observer.failureCount == 0);
257     sub.subscribe(observer);
258     assert(observer.putCount == 0);
259     assert(observer.completedCount == 0);
260     assert(observer.failureCount == 1);
261     assert(observer.lastException is ex);
262 }
263 
264 private class Subscription(TSubject, TObserver) : Disposable
265 {
266 public:
267     this(TSubject subject, TObserver observer)
268     {
269         _subject = subject;
270         _observer = observer;
271     }
272 
273 public:
274     void dispose()
275     {
276         if (_subject !is null)
277         {
278             _subject.unsubscribe(_observer);
279             _subject = null;
280         }
281     }
282 
283 private:
284     TSubject _subject;
285     TObserver _observer;
286 }
287 
288 private Subscription!(TSubject, TObserver) subscription(TSubject, TObserver)(
289         TSubject subject, TObserver observer)
290 {
291     return new typeof(return)(subject, observer);
292 }
293 
294 ///
295 class AsyncSubject(E) : Subject!E
296 {
297 public:
298     ///
299     Disposable subscribe(Observer!E observer)
300     {
301         Exception ex = null;
302         E value;
303         bool hasValue = false;
304 
305         synchronized (this)
306         {
307             if (!_isStopped)
308             {
309                 _observers ~= observer;
310                 return subscription(this, observer);
311             }
312 
313             ex = _exception;
314             hasValue = _hasValue;
315             value = _value;
316         }
317 
318         if (ex !is null)
319         {
320             observer.failure(ex);
321         }
322         else if (hasValue)
323         {
324             .put(observer, value);
325             observer.completed();
326         }
327         else
328         {
329             observer.completed();
330         }
331 
332         return NopDisposable.instance;
333     }
334 
335     ///
336     auto subscribe(T)(T observer)
337     {
338         return subscribe(observerObject!E(observer));
339     }
340 
341     ///
342     void unsubscribe(Observer!E observer)
343     {
344         if (observer is null)
345             return;
346 
347         synchronized (this)
348         {
349             import std.algorithm : remove, countUntil;
350 
351             auto index = countUntil(_observers, observer);
352             if (index != -1)
353             {
354                 _observers = remove(_observers, index);
355             }
356         }
357     }
358 
359 public:
360     ///
361     void put(E value)
362     {
363         synchronized (this)
364         {
365             if (!_isStopped)
366             {
367                 _value = value;
368                 _hasValue = true;
369             }
370         }
371     }
372 
373     ///
374     void completed()
375     {
376         Observer!E[] os = null;
377 
378         E value;
379         bool hasValue = false;
380 
381         synchronized (this)
382         {
383             if (!_isStopped)
384             {
385                 os = _observers;
386                 _observers.length = 0;
387                 _isStopped = true;
388                 value = _value;
389                 hasValue = _hasValue;
390             }
391         }
392 
393         if (os)
394         {
395             if (hasValue)
396             {
397                 foreach (observer; os)
398                 {
399                     .put(observer, value);
400                     observer.completed();
401                 }
402             }
403             else
404             {
405                 foreach (observer; os)
406                 {
407                     observer.completed();
408                 }
409             }
410         }
411     }
412 
413     ///
414     void failure(Exception e)
415     {
416         assert(e !is null);
417 
418         Observer!E[] os = null;
419         synchronized (this)
420         {
421             if (!_isStopped)
422             {
423                 os = _observers;
424                 _observers.length = 0;
425                 _isStopped = true;
426                 _exception = e;
427             }
428         }
429 
430         if (os)
431         {
432             foreach (observer; os)
433             {
434                 observer.failure(e);
435             }
436         }
437     }
438 
439 private:
440     Observer!E[] _observers;
441     bool _isStopped;
442     E _value;
443     bool _hasValue;
444     Exception _exception;
445 }
446 
447 unittest
448 {
449     auto sub = new AsyncSubject!int;
450 
451     .put(sub, 1);
452     sub.completed();
453 
454     auto observer = new CounterObserver!int;
455 
456     assert(observer.hasNotBeenCalled);
457 
458     sub.subscribe(observer);
459 
460     assert(observer.putCount == 1);
461     assert(observer.completedCount == 1);
462     assert(observer.failureCount == 0);
463     assert(observer.lastValue == 1);
464 }
465 
466 unittest
467 {
468     auto sub = new AsyncSubject!int;
469     auto observer = new CounterObserver!int;
470 
471     auto d = sub.subscribe(observer);
472     scope (exit)
473         d.dispose();
474 
475     assert(observer.hasNotBeenCalled);
476 
477     sub.put(100);
478 
479     assert(observer.hasNotBeenCalled);
480 
481     assert(sub._hasValue);
482     assert(sub._value == 100);
483 
484     sub.completed();
485 
486     assert(observer.putCount == 1);
487     assert(observer.completedCount == 1);
488     assert(observer.failureCount == 0);
489     assert(observer.lastValue == 100);
490 }
491 
492 unittest
493 {
494     auto sub = new AsyncSubject!int;
495     auto observer = new CounterObserver!int;
496 
497     sub.put(100);
498 
499     assert(sub._hasValue);
500     assert(sub._value == 100);
501 
502     auto d = sub.subscribe(observer);
503     scope (exit)
504         d.dispose();
505 
506     assert(observer.hasNotBeenCalled);
507 
508     sub.completed();
509 
510     assert(observer.putCount == 1);
511     assert(observer.completedCount == 1);
512     assert(observer.failureCount == 0);
513     assert(observer.lastValue == 100);
514 }
515 
516 unittest
517 {
518     auto sub = new AsyncSubject!int;
519     auto observer = new CounterObserver!int;
520 
521     auto d = sub.subscribe(observer);
522 
523     d.dispose();
524     assert(observer.hasNotBeenCalled);
525 
526     sub.put(100);
527     assert(observer.hasNotBeenCalled);
528 
529     sub.completed();
530     assert(observer.hasNotBeenCalled);
531 }
532 
533 unittest
534 {
535     auto sub = new AsyncSubject!int;
536     auto observer = new CounterObserver!int;
537 
538     auto d = sub.subscribe(observer);
539     assert(observer.hasNotBeenCalled);
540 
541     sub.put(100);
542     assert(observer.hasNotBeenCalled);
543 
544     d.dispose();
545     assert(observer.hasNotBeenCalled);
546 
547     sub.completed();
548     assert(observer.hasNotBeenCalled);
549 }
550 
551 unittest
552 {
553 
554     auto sub = new AsyncSubject!int;
555     auto observer = new CounterObserver!int;
556 
557     sub.put(100);
558     assert(observer.hasNotBeenCalled);
559 
560     auto d = sub.subscribe(observer);
561     assert(observer.hasNotBeenCalled);
562 
563     d.dispose();
564     assert(observer.hasNotBeenCalled);
565 
566     sub.completed();
567     assert(observer.hasNotBeenCalled);
568 }
569 
570 unittest
571 {
572     auto sub = new AsyncSubject!int;
573     auto observer = new CounterObserver!int;
574 
575     auto d = sub.subscribe(observer);
576     scope (exit)
577         d.dispose();
578 
579     assert(observer.hasNotBeenCalled);
580 
581     sub.completed();
582 
583     assert(observer.putCount == 0);
584     assert(observer.completedCount == 1);
585     assert(observer.failureCount == 0);
586 }
587 
588 unittest
589 {
590     auto sub = new AsyncSubject!int;
591     auto observer = new CounterObserver!int;
592 
593     auto d = sub.subscribe(observer);
594     scope (exit)
595         d.dispose();
596 
597     assert(observer.hasNotBeenCalled);
598 
599     auto ex = new Exception("TEST");
600     sub.failure(ex);
601 
602     assert(observer.putCount == 0);
603     assert(observer.completedCount == 0);
604     assert(observer.failureCount == 1);
605     assert(observer.lastException is ex);
606 }
607 
608 unittest
609 {
610     auto sub = new AsyncSubject!int;
611     auto ex = new Exception("TEST");
612     sub.failure(ex);
613 
614     auto observer = new CounterObserver!int;
615 
616     auto d = sub.subscribe(observer);
617     scope (exit)
618         d.dispose();
619 
620     assert(observer.putCount == 0);
621     assert(observer.completedCount == 0);
622     assert(observer.failureCount == 1);
623     assert(observer.lastException is ex);
624 }
625 
626 unittest
627 {
628     auto sub = new AsyncSubject!int;
629     auto observer = new CounterObserver!int;
630 
631     sub.completed();
632     assert(observer.hasNotBeenCalled);
633 
634     sub.subscribe(observer);
635     assert(observer.putCount == 0);
636     assert(observer.completedCount == 1);
637     assert(observer.failureCount == 0);
638 }
639 
640 version (unittest)
641 {
642     class CounterObserver(T) : Observer!T
643     {
644     public:
645         size_t putCount;
646         size_t completedCount;
647         size_t failureCount;
648         T lastValue;
649         Exception lastException;
650 
651     public:
652         bool hasNotBeenCalled() const pure nothrow @nogc @safe @property
653         {
654             return putCount == 0 && completedCount == 0 && failureCount == 0;
655         }
656 
657     public:
658         void put(T obj)
659         {
660             putCount++;
661             lastValue = obj;
662         }
663 
664         void completed()
665         {
666             completedCount++;
667         }
668 
669         void failure(Exception e)
670         {
671             failureCount++;
672             lastException = e;
673         }
674     }
675 }
676 
677 ///
678 class BehaviorSubject(E) : Subject!E
679 {
680 public:
681     ///
682     this()
683     {
684         this(E.init);
685     }
686 
687     ///
688     this(E value)
689     {
690         _subject = new SubjectObject!E;
691         _value = value;
692     }
693 
694 public:
695     ///
696     inout(E) value() inout @property
697     {
698         return _value;
699     }
700 
701     ///
702     void value(E value) @property
703     {
704         if (_value != value)
705         {
706             _value = value;
707             .put(_subject, value);
708         }
709     }
710 
711 public:
712     ///
713     auto subscribe(TObserver)(auto ref TObserver observer)
714     {
715         .put(observer, value);
716         return _subject.doSubscribe(observer);
717     }
718 
719     ///
720     Disposable subscribe(Observer!E observer)
721     {
722         .put(observer, value);
723         return disposableObject(_subject.doSubscribe(observer));
724     }
725 
726     ///
727     void put(E obj)
728     {
729         value = obj;
730     }
731 
732     ///
733     void completed()
734     {
735         _subject.completed();
736     }
737 
738     ///
739     void failure(Exception e)
740     {
741         _subject.failure(e);
742     }
743 
744 private:
745     SubjectObject!E _subject;
746     E _value;
747 }
748 
749 unittest
750 {
751     static assert(isObservable!(BehaviorSubject!int, int));
752     static assert(is(BehaviorSubject!int.ElementType == int));
753 }
754 
755 unittest
756 {
757     int num = 0;
758     auto subject = new BehaviorSubject!int(100);
759 
760     auto d = subject.doSubscribe((int n) { num = n; });
761     assert(num == 100);
762 
763     .put(subject, 1);
764     assert(num == 1);
765 
766     d.dispose();
767     .put(subject, 10);
768     assert(num == 1);
769 }