1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'tee'
3  +/
4 module rx.algorithm.tee;
5 
6 import rx.disposable;
7 import rx.observable;
8 import rx.observer;
9 import rx.util;
10 
11 import std.functional : unaryFun;
12 import std.range : put;
13 
14 //####################
15 // Tee
16 //####################
/++
 + Observer wrapper used by `tee`.
 +
 + Every value handed to `putImpl` is first passed to `f` (via `unaryFun`)
 + and is then forwarded, unchanged, to the wrapped observer. Whatever `f`
 + returns is discarded.
 +
 + Params:
 +   f = callable (or anything else `unaryFun` accepts) invoked once per element
 +   TObserver = type of the wrapped downstream observer
 +   E = element type received by this observer
 +/
struct TeeObserver(alias f, TObserver, E)
{
    // NOTE(review): `_observer`, `_disposable` and the public observer entry
    // points are not declared in this struct; presumably they come from this
    // mixin, which in turn is expected to call `putImpl` - confirm in rx.observer.
    mixin SimpleObserverImpl!(TObserver, E);

public:
    /// Wraps `observer` without an associated disposable.
    this(TObserver observer)
    {
        this._observer = observer;
    }

    // The two-argument constructor is only compiled in when the downstream
    // observer type has a `completed` or a `failure` handler.
    static if (hasCompleted!TObserver || hasFailure!TObserver)
    {
        /// Wraps `observer` and stores `disposable` next to it.
        this(TObserver observer, Disposable disposable)
        {
            this._disposable = disposable;
            this._observer = observer;
        }
    }

private:
    // Runs the side-effect callback first, then forwards the very same value.
    void putImpl(E obj)
    {
        alias sideEffect = unaryFun!f;

        sideEffect(obj);
        // Leading dot: call the module-scope `put` imported from std.range,
        // not a member named `put`.
        .put(_observer, obj);
    }
}
43 
/++
 + Observable returned by `tee`.
 +
 + It keeps the source observable and, on `subscribe`, wraps the given
 + observer in a `TeeObserver` before subscribing that wrapper to the source.
 +
 + Params:
 +   f = callable handed on to `TeeObserver`
 +   TObservable = type of the source observable
 +   E = element type exposed as `ElementType`
 +/
struct TeeObservable(alias f, TObservable, E)
{
    /// Element type of this observable.
    alias ElementType = E;

public:
    /// Stores `observable` as the source to subscribe to later.
    this(TObservable observable)
    {
        this._observable = observable;
    }

public:
    /++
     + Subscribes `observer` to the source through a `TeeObserver`.
     +
     + Returns: the result of `doSubscribe` when `T` has neither a `completed`
     + nor a `failure` handler; otherwise a `SingleAssignmentDisposable` that
     + wraps that result.
     +/
    auto subscribe(T)(auto ref T observer)
    {
        alias Wrapper = TeeObserver!(f, T, E);
        enum bool needsDisposable = hasCompleted!T || hasFailure!T;

        static if (!needsDisposable)
        {
            return doSubscribe(_observable, Wrapper(observer));
        }
        else
        {
            // The disposable is created before subscribing so that the wrapper
            // can be constructed with it; the actual subscription is assigned
            // to it afterwards.
            auto subscription = new SingleAssignmentDisposable;
            subscription.setDisposable(
                    disposableObject(
                        doSubscribe(_observable, Wrapper(observer, subscription))));
            return subscription;
        }
    }

private:
    TObservable _observable; // source observable
}
74 
/++
 + Wraps `observable` in a `TeeObservable` that calls `f` with every element
 + before forwarding that element, unchanged, to the subscriber.
 +
 + Params:
 +   f = callable invoked with each element; its result is not used
 +   observable = source observable; its type must provide `ElementType`
 +
 + Returns: a `TeeObservable` with the same element type as `observable`.
 +/
template tee(alias f)
{
    TeeObservable!(f, TObservable, TObservable.ElementType) tee(TObservable)(
            auto ref TObservable observable)
    {
        alias Result = typeof(return);
        return Result(observable);
    }
}
///
unittest
{
    import rx.algorithm : map;
    import rx.subject : SubjectObject;
    import std.algorithm : equal;
    import std.array : appender;

    auto source = new SubjectObject!int;

    auto tapped = appender!(int[]); // filled by the tee callback
    auto doubled = appender!(int[]); // filled by the final subscriber

    auto subscription = source.tee!(i => tapped.put(i))().map!(i => i * 2)().subscribe(doubled);

    source.put(1);
    source.put(2);
    subscription.dispose();
    source.put(3); // emitted after dispose: must reach neither buffer

    // tee sees the original values ...
    assert(equal(tapped.data, [1, 2]));
    // ... while the subscriber sees them after map.
    assert(equal(doubled.data, [2, 4]));
}
110 
// An exception thrown by the tee callback must be reported through the
// observer's `failure` and the offending value must not be forwarded.
unittest
{
    import rx.subject : SubjectObject;

    auto source = new SubjectObject!int;

    int putCalls = 0;
    int failureCalls = 0;

    // Observer that only counts how often each handler is invoked.
    struct CountingObserver
    {
        void put(int)
        {
            ++putCalls;
        }

        void failure(Exception)
        {
            ++failureCalls;
        }
    }

    // Callback under test: throws for 0, otherwise returns twice the input.
    int doubleOrThrow(int n)
    {
        if (n != 0)
            return n * 2;
        throw new Exception("");
    }

    auto subscription = source.tee!doubleOrThrow().doSubscribe(CountingObserver());
    scope (exit)
        subscription.dispose();

    assert(putCalls == 0);

    // A value the callback accepts is forwarded.
    source.put(1);
    assert(putCalls == 1);
    assert(failureCalls == 0);

    // A value the callback rejects is turned into a failure notification.
    source.put(0);
    assert(putCalls == 1);
    assert(failureCalls == 1);
}