1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'map'
3  +/
4 module rx.algorithm.map;
5 
6 import rx.disposable;
7 import rx.observer;
8 import rx.observable;
9 import rx.util;
10 
11 import std.functional;
12 import std.range;
13 
14 //####################
15 // Map
16 //####################
17 struct MapObserver(alias f, TObserver, E)
18 {
19     mixin SimpleObserverImpl!(TObserver, E);
20 
21 public:
22     this(TObserver observer)
23     {
24         _observer = observer;
25     }
26 
27     static if (hasCompleted!TObserver || hasFailure!TObserver)
28     {
29         this(TObserver observer, Disposable disposable)
30         {
31             _observer = observer;
32             _disposable = disposable;
33         }
34     }
35 private:
36     void putImpl(E obj)
37     {
38         alias fun = unaryFun!f;
39         .put(_observer, fun(obj));
40     }
41 }
42 
43 unittest
44 {
45     import std.conv : to;
46 
47     alias TObserver = MapObserver!(o => to!string(o), Observer!string, int);
48 
49     static assert(isObserver!(TObserver, int));
50 }
51 
52 struct MapObservable(alias f, TObservable)
53 {
54     alias ElementType = typeof({
55         return unaryFun!(f)(TObservable.ElementType.init);
56     }());
57 
58 public:
59     this(TObservable observable)
60     {
61         _observable = observable;
62     }
63 
64 public:
65     auto subscribe(TObserver)(TObserver observer)
66     {
67         alias ObserverType = MapObserver!(f, TObserver, TObservable.ElementType);
68         static if (hasCompleted!TObserver || hasFailure!TObserver)
69         {
70             auto disposable = new SingleAssignmentDisposable;
71             disposable.setDisposable(disposableObject(doSubscribe(_observable,
72                     ObserverType(observer, disposable))));
73             return disposable;
74         }
75         else
76         {
77             return doSubscribe(_observable, ObserverType(observer));
78         }
79     }
80 
81 private:
82     TObservable _observable;
83 }
84 
85 unittest
86 {
87     import rx.subject;
88     import std.conv : to;
89 
90     alias TObservable = MapObservable!(n => to!string(n), Subject!int);
91     static assert(is(TObservable.ElementType : string));
92     static assert(isSubscribable!(TObservable, Observer!string));
93 
94     int putCount = 0;
95     int completedCount = 0;
96     int failureCount = 0;
97     struct TestObserver
98     {
99         void put(string n)
100         {
101             putCount++;
102         }
103 
104         void completed()
105         {
106             completedCount++;
107         }
108 
109         void failure(Exception)
110         {
111             failureCount++;
112         }
113     }
114 
115     auto sub = new SubjectObject!int;
116     auto observable = TObservable(sub);
117     auto disposable = observable.subscribe(TestObserver());
118     assert(putCount == 0);
119     sub.put(0);
120     assert(putCount == 1);
121     sub.put(1);
122     assert(putCount == 2);
123     disposable.dispose();
124     sub.put(2);
125     assert(putCount == 2);
126 }
127 
128 ///
129 template map(alias f)
130 {
131     MapObservable!(f, TObservable) map(TObservable)(auto ref TObservable observable)
132     {
133         return typeof(return)(observable);
134     }
135 }
136 ///
137 unittest
138 {
139     import rx.subject;
140     import std.array : appender;
141     import std.conv : to;
142 
143     Subject!int sub = new SubjectObject!int;
144     auto mapped = sub.map!(n => to!string(n));
145     static assert(isObservable!(typeof(mapped), string));
146     static assert(isSubscribable!(typeof(mapped), Observer!string));
147 
148     auto buffer = appender!(string[])();
149     auto disposable = mapped.subscribe(buffer);
150     scope (exit)
151         disposable.dispose();
152 
153     sub.put(0);
154     sub.put(1);
155     sub.put(2);
156 
157     import std.algorithm : equal;
158 
159     assert(equal(buffer.data, ["0", "1", "2"][]));
160 }
161 ///
162 unittest
163 {
164     import rx.subject;
165     import std.array : appender;
166     import std.conv : to;
167 
168     Subject!int sub = new SubjectObject!int;
169     auto mapped = sub.map!"a * 2";
170     static assert(isObservable!(typeof(mapped), int));
171     static assert(isSubscribable!(typeof(mapped), Observer!int));
172 
173     auto buffer = appender!(int[])();
174     auto disposable = mapped.subscribe(buffer);
175     scope (exit)
176         disposable.dispose();
177 
178     sub.put(0);
179     sub.put(1);
180     sub.put(2);
181 
182     import std.algorithm : equal;
183 
184     assert(equal(buffer.data, [0, 2, 4][]));
185 }
186 
187 unittest
188 {
189     import rx.observable : asObservable;
190 
191     auto data = [1, 2, 3, 4];
192     int[] result;
193     data.asObservable().map!"a + 2"().doSubscribe!(n => result ~= n);
194 
195     assert(result == [3, 4, 5, 6]);
196 }