1 /+++++++++++++++++++++++++++++
 + This module defines the Subject interface and several implementations of it.
3  +/
4 module rx.subject;
5 
6 import rx.disposable;
7 import rx.observer;
8 import rx.observable;
9 
10 import core.atomic : atomicLoad, cas;
11 import std.range : put;
12 
/++
 + Represents an object that is both an observable sequence and an observer.
 +
 + The interface declares no members of its own; it only combines
 + `Observer!E` and `Observable!E` for the element type `E`.
 +/
interface Subject(E) : Observer!E, Observable!E
{
}
17 
/++
 + A Subject that broadcasts each notification to all subscribed observers.
 +
 + All subscribers are represented by one `shared` observer reference that is
 + swapped with compare-and-swap loops instead of being guarded by a lock.
 + The reference holds:
 + $(UL
 +   $(LI `NopObserver!E.instance` right after construction or after the only
 +        directly stored observer has been unsubscribed;)
 +   $(LI the subscriber itself when it subscribes while the nop observer is installed;)
 +   $(LI a `CompositeObserver!E` once further observers are added;)
 +   $(LI a `DoneObserver!E` after `completed` or `failure` has been called.)
 + )
 +/
class SubjectObject(E) : Subject!E
{
    alias ElementType = E;
public:
    /// Creates a subject without subscribers (the nop observer is installed).
    this()
    {
        _observer = cast(shared) NopObserver!E.instance;
    }

public:
    /// Forwards `obj` to the observer that is installed at the time of the call.
    void put(E obj)
    {
        auto temp = atomicLoad(_observer);
        .put(temp, obj);
    }

    /++
     + Installs `DoneObserver!E.instance` and then calls `completed` on the
     + observer that was installed before the swap.
     +
     + If a `DoneObserver!E` is already installed nothing is swapped and
     + `completed` is invoked on that done observer instead.
     +/
    void completed()
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = cast(shared) DoneObserver!E.instance;
        Observer!E temp = void;
        do
        {
            // Load the shared field itself atomically; the CAS comparand is
            // derived from exactly the value that was inspected.
            temp = atomicLoad(_observer);
            oldObserver = cast(shared) temp;
            if (cast(DoneObserver!E) temp)
                break;
        }
        while (!cas(&_observer, oldObserver, newObserver));
        temp.completed();
    }

    /++
     + Installs a new `DoneObserver!E` carrying `error` and then calls
     + `failure(error)` on the observer that was installed before the swap.
     +
     + If a `DoneObserver!E` is already installed nothing is swapped and
     + `failure(error)` is invoked on that done observer instead.
     +
     + Params:
     +   error = the exception to report to the subscribers.
     +/
    void failure(Exception error)
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = cast(shared) new DoneObserver!E(error);
        Observer!E temp = void;
        do
        {
            // Atomic read of the shared field; see completed().
            temp = atomicLoad(_observer);
            oldObserver = cast(shared) temp;
            if (cast(DoneObserver!E) temp)
                break;
        }
        while (!cas(&_observer, oldObserver, newObserver));
        temp.failure(error);
    }

    /// Wraps `observer` with `observerObject!E` and subscribes the wrapper.
    Disposable subscribe(T)(T observer)
    {
        return subscribe(observerObject!E(observer));
    }

    /++
     + Subscribes `observer` to this subject.
     +
     + If the subject has already completed, `observer.completed()` is called
     + immediately; if it has already failed, `observer.failure` is called with
     + the stored exception. In both cases `NopDisposable.instance` is returned.
     +
     + Params:
     +   observer = the observer to add.
     + Returns: a disposable that unsubscribes `observer` from this subject.
     +/
    Disposable subscribe(Observer!E observer)
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = void;
        do
        {
            // Atomic read of the shared field; the same value is used for
            // the checks below and as the CAS comparand.
            auto temp = atomicLoad(_observer);
            oldObserver = cast(shared) temp;

            // The shared done instance marks a normal completion ...
            if (temp is DoneObserver!E.instance)
            {
                observer.completed();
                return NopDisposable.instance;
            }

            // ... any other done observer carries the failure exception.
            if (auto fail = cast(DoneObserver!E) temp)
            {
                observer.failure(fail.exception);
                return NopDisposable.instance;
            }

            if (auto composite = cast(CompositeObserver!E) temp)
            {
                newObserver = cast(shared) composite.add(observer);
            }
            else if (cast(NopObserver!E) temp)
            {
                // No subscriber yet: store the observer directly.
                newObserver = cast(shared) observer;
            }
            else
            {
                // One directly stored observer: combine it with the new one.
                newObserver = cast(shared)(new CompositeObserver!E([temp, observer]));
            }
        }
        while (!cas(&_observer, oldObserver, newObserver));

        return subscription(this, observer);
    }

    /++
     + Removes `observer` from this subject.
     +
     + When a `CompositeObserver!E` is installed it is replaced by the result
     + of `composite.remove(observer)`. Otherwise the call only has an effect
     + if the installed observer is `observer` itself, in which case the nop
     + observer is installed again.
     +/
    void unsubscribe(Observer!E observer)
    {
        shared(Observer!E) oldObserver = void;
        shared(Observer!E) newObserver = void;
        do
        {
            // Atomic read of the shared field; see subscribe().
            auto temp = atomicLoad(_observer);
            oldObserver = cast(shared) temp;

            if (auto composite = cast(CompositeObserver!E) temp)
            {
                newObserver = cast(shared) composite.remove(observer);
            }
            else
            {
                // Some other observer (or nop/done) is installed: nothing to remove.
                if (temp !is observer)
                    return;

                newObserver = cast(shared) NopObserver!E.instance;
            }
        }
        while (!cas(&_observer, oldObserver, newObserver));
    }

private:
    // The currently installed observer; only replaced through cas().
    shared(Observer!E) _observer;
}
141 
///
unittest
{
    import std.algorithm : equal;
    import std.array : appender;

    // Everything the subscriber receives is collected in this buffer.
    auto received = appender!(int[])();
    auto source = new SubjectObject!int;

    auto handle = source.subscribe(observerObject!(int)(received));
    assert(handle !is null);

    foreach (value; 0 .. 2)
        source.put(value);
    assert(equal(received.data, [0, 1]));

    // After disposing the subscription no further values arrive.
    handle.dispose();
    source.put(2);
    assert(equal(received.data, [0, 1]));
}
162 
// Compile-time checks of the element types SubjectObject!int can be used with.
unittest
{
    // It is an observer and an observable of int ...
    static assert(isObserver!(SubjectObject!int, int));
    static assert(isObservable!(SubjectObject!int, int));
    // ... but not an observable of string.
    static assert(!isObservable!(SubjectObject!int, string));
}
170 
unittest
{
    // Completion is delivered once; values put afterwards are not delivered.
    auto source = new SubjectObject!int;
    auto counter = new CounterObserver!int;
    auto handle = source.subscribe(counter);
    scope (exit)
        handle.dispose();

    foreach (value; 0 .. 2)
        source.put(value);
    assert(counter.putCount == 2);

    source.completed();
    source.put(2);
    assert(counter.completedCount == 1);
    assert(counter.putCount == 2);
}
188 
unittest
{
    // A failure is delivered once with the original exception; values put
    // afterwards are not delivered.
    auto source = new SubjectObject!int;
    auto counter = new CounterObserver!int;
    auto handle = source.subscribe(counter);
    scope (exit)
        handle.dispose();

    foreach (value; 0 .. 2)
        source.put(value);
    assert(counter.putCount == 2);

    auto error = new Exception("Exception");
    source.failure(error);
    source.put(2);
    assert(counter.putCount == 2);
    assert(counter.lastException is error);
    assert(counter.failureCount == 1);
}
208 
unittest
{
    import std.array : appender;

    // Two subscribers attached in different ways both receive each value.
    auto first = appender!(int[]);
    auto second = appender!(int[]);
    auto source = new SubjectObject!int;
    source.subscribe(observerObject!(int)(first));
    source.doSubscribe((int n) => second.put(n));

    // Nothing has been published yet.
    assert(first.data.length == 0);
    assert(second.data.length == 0);

    source.put(0);

    assert(first.data.length == 1);
    assert(second.data.length == 1);
    assert(first.data[0] == second.data[0]);
}
226 
unittest
{
    // Subscribing to an already completed subject reports completion at once.
    auto source = new SubjectObject!int;
    source.completed();

    auto counter = new CounterObserver!int;
    assert(counter.hasNotBeenCalled);

    source.subscribe(counter);

    assert(counter.completedCount == 1);
    assert(counter.putCount == 0);
    assert(counter.failureCount == 0);
}
241 
unittest
{
    // Subscribing to an already failed subject reports the stored exception at once.
    auto source = new SubjectObject!int;
    auto error = new Exception("Exception");
    source.failure(error);

    auto counter = new CounterObserver!int;
    assert(counter.hasNotBeenCalled);

    source.subscribe(counter);

    assert(counter.failureCount == 1);
    assert(counter.lastException is error);
    assert(counter.putCount == 0);
    assert(counter.completedCount == 0);
}
258 
/// Disposable that removes an observer from a subject when it is disposed.
private class Subscription(TSubject, TObserver) : Disposable
{
public:
    /// Remembers the subject and the observer that is subscribed to it.
    this(TSubject subject, TObserver observer)
    {
        _observer = observer;
        _subject = subject;
    }

public:
    /// Unsubscribes the stored observer from the stored subject. The subject
    /// reference is cleared afterwards, so further calls do nothing.
    void dispose()
    {
        if (_subject is null)
            return;

        _subject.unsubscribe(_observer);
        _subject = null;
    }

private:
    TSubject _subject;
    TObserver _observer;
}
282 
// Factory for Subscription that lets the compiler infer both template arguments.
private Subscription!(TSubject, TObserver) subscription(TSubject, TObserver)(
        TSubject subject, TObserver observer)
{
    return new Subscription!(TSubject, TObserver)(subject, observer);
}
288 
/++
 + A Subject that keeps only the most recently put value and hands it to its
 + observers when `completed` is called.
 +
 + Values passed to `put` are stored, not forwarded. On `completed` every
 + subscribed observer receives the stored value (if any) followed by the
 + completion; on `failure` every subscribed observer receives the exception.
 + Observers that subscribe after the subject has stopped get the same
 + outcome immediately. State is guarded by `synchronized (this)`; observer
 + callbacks are invoked after the synchronized block has been left.
 +/
class AsyncSubject(E) : Subject!E
{
public:
    /++
     + Subscribes `observer`.
     +
     + While the subject is running the observer is stored and a disposable
     + that unsubscribes it is returned. Once the subject has stopped the
     + stored outcome is delivered right away and `NopDisposable.instance`
     + is returned.
     +/
    Disposable subscribe(Observer!E observer)
    {
        Exception storedError;
        E storedValue;
        bool valueAvailable;

        synchronized (this)
        {
            if (!_isStopped)
            {
                _observers ~= observer;
                return subscription(this, observer);
            }

            // Already stopped: copy the outcome so it can be replayed below.
            storedError = _exception;
            valueAvailable = _hasValue;
            storedValue = _value;
        }

        if (storedError !is null)
        {
            observer.failure(storedError);
            return NopDisposable.instance;
        }

        if (valueAvailable)
            .put(observer, storedValue);
        observer.completed();

        return NopDisposable.instance;
    }

    /// Wraps `observer` with `observerObject!E` and subscribes the wrapper.
    auto subscribe(T)(T observer)
    {
        return subscribe(observerObject!E(observer));
    }

    /// Removes the first stored occurrence of `observer`; `null` and unknown
    /// observers are ignored.
    void unsubscribe(Observer!E observer)
    {
        if (observer is null)
            return;

        synchronized (this)
        {
            import std.algorithm : remove, countUntil;

            auto position = countUntil(_observers, observer);
            if (position == -1)
                return;

            _observers = remove(_observers, position);
        }
    }

public:
    /// Stores `value` as the latest value unless the subject has stopped.
    void put(E value)
    {
        synchronized (this)
        {
            if (_isStopped)
                return;

            _hasValue = true;
            _value = value;
        }
    }

    /// Stops the subject and sends the stored value (if there is one) followed
    /// by the completion to every subscribed observer. Does nothing when the
    /// subject has already stopped.
    void completed()
    {
        Observer!E[] targets;
        E finalValue;
        bool valueAvailable;

        synchronized (this)
        {
            if (!_isStopped)
            {
                targets = _observers;
                _observers.length = 0;
                _isStopped = true;
                finalValue = _value;
                valueAvailable = _hasValue;
            }
        }

        // `targets` stays empty when the subject had already stopped.
        foreach (target; targets)
        {
            if (valueAvailable)
                .put(target, finalValue);
            target.completed();
        }
    }

    /// Stops the subject, stores `e` and reports it to every subscribed
    /// observer. Does nothing when the subject has already stopped.
    void failure(Exception e)
    {
        assert(e !is null);

        Observer!E[] targets;
        synchronized (this)
        {
            if (!_isStopped)
            {
                targets = _observers;
                _observers.length = 0;
                _isStopped = true;
                _exception = e;
            }
        }

        // `targets` stays empty when the subject had already stopped.
        foreach (target; targets)
            target.failure(e);
    }

private:
    Observer!E[] _observers; // observers subscribed while the subject is running
    bool _isStopped;         // set by completed() and failure()
    E _value;                // latest value passed to put()
    bool _hasValue;          // whether _value has been set
    Exception _exception;    // exception passed to failure(), if any
}
441 
unittest
{
    // A value stored before completion is replayed to a late subscriber.
    auto subject = new AsyncSubject!int;
    .put(subject, 1);
    subject.completed();

    auto counter = new CounterObserver!int;
    assert(counter.hasNotBeenCalled);

    subject.subscribe(counter);

    assert(counter.lastValue == 1);
    assert(counter.putCount == 1);
    assert(counter.completedCount == 1);
    assert(counter.failureCount == 0);
}
460 
unittest
{
    // Subscribe first, then put: the value is held back until completion.
    auto subject = new AsyncSubject!int;
    auto counter = new CounterObserver!int;

    auto handle = subject.subscribe(counter);
    scope (exit)
        handle.dispose();
    assert(counter.hasNotBeenCalled);

    subject.put(100);
    assert(counter.hasNotBeenCalled);
    assert(subject._hasValue);
    assert(subject._value == 100);

    subject.completed();

    assert(counter.lastValue == 100);
    assert(counter.putCount == 1);
    assert(counter.completedCount == 1);
    assert(counter.failureCount == 0);
}
486 
unittest
{
    // Put first, then subscribe: the stored value is still delivered on completion.
    auto subject = new AsyncSubject!int;
    auto counter = new CounterObserver!int;

    subject.put(100);
    assert(subject._hasValue);
    assert(subject._value == 100);

    auto handle = subject.subscribe(counter);
    scope (exit)
        handle.dispose();
    assert(counter.hasNotBeenCalled);

    subject.completed();

    assert(counter.lastValue == 100);
    assert(counter.putCount == 1);
    assert(counter.completedCount == 1);
    assert(counter.failureCount == 0);
}
510 
unittest
{
    // Disposed before anything is put: the observer is never called.
    auto subject = new AsyncSubject!int;
    auto counter = new CounterObserver!int;

    auto handle = subject.subscribe(counter);
    handle.dispose();
    assert(counter.hasNotBeenCalled);

    subject.put(100);
    assert(counter.hasNotBeenCalled);

    subject.completed();
    assert(counter.hasNotBeenCalled);
}
527 
unittest
{
    // Disposed between put and completion: the observer is never called.
    auto subject = new AsyncSubject!int;
    auto counter = new CounterObserver!int;

    auto handle = subject.subscribe(counter);
    assert(counter.hasNotBeenCalled);

    subject.put(100);
    assert(counter.hasNotBeenCalled);

    handle.dispose();
    assert(counter.hasNotBeenCalled);

    subject.completed();
    assert(counter.hasNotBeenCalled);
}
545 
unittest
{
    // Subscribed after put and disposed before completion: the observer is never called.
    auto subject = new AsyncSubject!int;
    auto counter = new CounterObserver!int;

    subject.put(100);
    assert(counter.hasNotBeenCalled);

    auto handle = subject.subscribe(counter);
    assert(counter.hasNotBeenCalled);

    handle.dispose();
    assert(counter.hasNotBeenCalled);

    subject.completed();
    assert(counter.hasNotBeenCalled);
}
564 
unittest
{
    // Completion without any value: only the completion is delivered.
    auto subject = new AsyncSubject!int;
    auto counter = new CounterObserver!int;

    auto handle = subject.subscribe(counter);
    scope (exit)
        handle.dispose();
    assert(counter.hasNotBeenCalled);

    subject.completed();

    assert(counter.completedCount == 1);
    assert(counter.putCount == 0);
    assert(counter.failureCount == 0);
}
582 
unittest
{
    // A failure is forwarded to an already subscribed observer.
    auto subject = new AsyncSubject!int;
    auto counter = new CounterObserver!int;

    auto handle = subject.subscribe(counter);
    scope (exit)
        handle.dispose();
    assert(counter.hasNotBeenCalled);

    auto error = new Exception("TEST");
    subject.failure(error);

    assert(counter.failureCount == 1);
    assert(counter.lastException is error);
    assert(counter.putCount == 0);
    assert(counter.completedCount == 0);
}
602 
unittest
{
    // A failure that happened earlier is replayed to a late subscriber.
    auto subject = new AsyncSubject!int;
    auto error = new Exception("TEST");
    subject.failure(error);

    auto counter = new CounterObserver!int;
    auto handle = subject.subscribe(counter);
    scope (exit)
        handle.dispose();

    assert(counter.failureCount == 1);
    assert(counter.lastException is error);
    assert(counter.putCount == 0);
    assert(counter.completedCount == 0);
}
620 
unittest
{
    // Completion without a value is replayed to a late subscriber.
    auto subject = new AsyncSubject!int;
    auto counter = new CounterObserver!int;

    subject.completed();
    assert(counter.hasNotBeenCalled);

    subject.subscribe(counter);

    assert(counter.completedCount == 1);
    assert(counter.putCount == 0);
    assert(counter.failureCount == 0);
}
634 
version (unittest)
{
    /// Observer used by the unit tests: it counts each kind of notification
    /// and remembers the most recent value and exception.
    class CounterObserver(T) : Observer!T
    {
    public:
        size_t putCount;
        size_t completedCount;
        size_t failureCount;
        T lastValue;
        Exception lastException;

    public:
        /// True as long as no notification of any kind has been counted.
        bool hasNotBeenCalled() const pure nothrow @nogc @safe @property
        {
            return !(putCount != 0 || completedCount != 0 || failureCount != 0);
        }

    public:
        /// Records `obj` as the latest value and counts the call.
        void put(T obj)
        {
            lastValue = obj;
            ++putCount;
        }

        /// Counts the completion.
        void completed()
        {
            ++completedCount;
        }

        /// Records `e` as the latest exception and counts the call.
        void failure(Exception e)
        {
            lastException = e;
            ++failureCount;
        }
    }
}