1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'scan'
3  +/
4 module rx.algorithm.scan;
5 
6 import rx.disposable;
7 import rx.observer;
8 import rx.observable;
9 import rx.util;
10 
11 import std.functional;
12 import std.range;
13 import std.typecons;
14 
15 //####################
16 // Scan
17 //####################
/++
 + Observer adapter used by the 'scan' algorithm.
 +
 + Every element received through `putImpl` is combined with the running
 + accumulator via `binaryFun!f`, and the updated accumulator is then pushed
 + to the wrapped observer.
 +
 + Params:
 +  f = binary function (anything accepted by `binaryFun`), invoked as
 +      `f(accumulator, element)`
 +  TObserver = type of the wrapped (downstream) observer
 +  E = type of the elements received by this observer
 +  TAccumulate = type of the seed and of the running accumulator
 +/
struct ScanObserver(alias f, TObserver, E, TAccumulate)
{
    mixin SimpleObserverImpl!(TObserver, E);

public:
    /// Wraps `observer` and starts accumulating from `seed`.
    this(TObserver observer, TAccumulate seed)
    {
        _current = refCounted(seed);
        _observer = observer;
    }

    static if (hasCompleted!TObserver || hasFailure!TObserver)
    {
        /// Wraps `observer`, starts from `seed` and additionally stores
        /// `disposable` (presumably consumed by the mixed-in
        /// `SimpleObserverImpl`; its use is not visible in this module).
        this(TObserver observer, TAccumulate seed, Disposable disposable)
        {
            this(observer, seed);
            _disposable = disposable;
        }
    }

public:
    /// Folds `obj` into the accumulator and forwards the new value downstream.
    void putImpl(E obj)
    {
        alias accumulate = binaryFun!f;
        _current = accumulate(_current, obj);
        .put(_observer, _current);
    }

private:
    // Running accumulator, kept in a RefCounted payload.
    RefCounted!TAccumulate _current;
}
50 
// ScanObserver emits the running sum of the values 1 .. 5 into an appender.
unittest
{
    import std.array : appender;

    auto sink = appender!(int[]);
    alias Summing = ScanObserver!((a, b) => a + b, typeof(sink), int, int);
    auto summing = Summing(sink, 0);

    foreach (value; 1 .. 6)
        .put(summing, value);

    // Partial sums: 1, 1+2, 1+2+3, ...
    assert(sink.data == [1, 3, 6, 10, 15]);
}
70 
/++
 + Observable produced by the 'scan' algorithm.
 +
 + It stores a source observable and a seed; each subscription wraps the
 + given observer in a `ScanObserver` initialised with that seed and
 + subscribes the wrapper to the source.
 +
 + Params:
 +  f = accumulation function forwarded to `ScanObserver`
 +  TObservable = type of the source observable (must expose `ElementType`)
 +  TAccumulate = type of the seed; also the element type of this observable
 +/
struct ScanObservable(alias f, TObservable, TAccumulate)
{
    /// Elements emitted by this observable are accumulated values.
    alias ElementType = TAccumulate;

public:
    /// Stores the source `observable` and the initial accumulator `seed`.
    this(TObservable observable, TAccumulate seed)
    {
        _seed = seed;
        _observable = observable;
    }

public:
    /// Subscribes `observer` to the accumulated sequence and returns the
    /// resulting subscription handle.
    auto subscribe(TObserver)(TObserver observer)
    {
        alias SourceElement = TObservable.ElementType;
        alias Wrapper = ScanObserver!(f, TObserver, SourceElement, TAccumulate);

        static if (!hasCompleted!TObserver && !hasFailure!TObserver)
        {
            // The observer only supports put, so the wrapper gets no disposable.
            return doSubscribe(_observable, Wrapper(observer, _seed));
        }
        else
        {
            // The wrapper receives the same disposable that is returned to
            // the caller; the inner subscription is attached to it afterwards.
            auto handle = new SingleAssignmentDisposable;
            handle.setDisposable(disposableObject(doSubscribe(_observable,
                    Wrapper(observer, _seed, handle))));
            return handle;
        }
    }

private:
    TObservable _observable;
    TAccumulate _seed;
}
103 
// ScanObservable over Observable!int with an int seed is an observable of int.
unittest
{
    alias Summing = ScanObservable!((a, b) => a + b, Observable!int, int);
    static assert(isObservable!(Summing, int));
}
109 
// Subscribing a delegate to a ScanObservable yields something disposable.
unittest
{
    import rx.subject : SubjectObject;
    import std.stdio : writeln;

    alias Summing = ScanObservable!((a, b) => a + b, Observable!int, int);

    auto source = new SubjectObject!int;
    auto summing = Summing(source, 0);

    auto subscription = summing.subscribe((int value) => writeln(value));
    static assert(isDisposable!(typeof(subscription)));
}
124 
/++
 + Creates an observable that accumulates the elements of `observable`
 + with `f`, starting from `seed`.
 +
 + Params:
 +  f = accumulation function, forwarded to `ScanObservable`
 +  observable = source observable
 +  seed = initial accumulator value
 +
 + Returns: a `ScanObservable!(f, TObservable, TAccumulate)` built from
 + `observable` and `seed`.
 +/
template scan(alias f)
{
    auto scan(TObservable, TAccumulate)(auto ref TObservable observable, TAccumulate seed)
    {
        alias Result = ScanObservable!(f, TObservable, TAccumulate);
        return Result(observable, seed);
    }
}
///
unittest
{
    import rx.subject : SubjectObject;
    import std.array : appender;

    auto source = new SubjectObject!int;

    // Running sum of everything pushed into `source`, starting at 0.
    auto total = source.scan!((a, b) => a + b)(0);
    static assert(isObservable!(typeof(total), int));

    auto sink = appender!(int[]);
    auto subscription = total.subscribe(sink);
    scope (exit)
        subscription.dispose();

    foreach (_; 0 .. 5)
        source.put(1);

    assert(sink.data == [1, 2, 3, 4, 5]);
}
161 
// scan applied to the merge of two subjects accumulates values from both.
unittest
{
    import rx.algorithm : merge;
    import rx.subject : SubjectObject;
    import std.array : appender;

    auto left = new SubjectObject!int;
    auto right = new SubjectObject!int;

    auto total = merge(left, right).scan!"a + b"(0);
    static assert(isObservable!(typeof(total), int));

    auto sink = appender!(int[]);
    auto subscription = total.subscribe(sink);
    scope (exit)
        subscription.dispose();

    // Trailing comments show the expected running total after each put.
    left.put(1); // 1
    right.put(1); // 2
    left.put(-1); // 1
    right.put(-1); // 0
    right.put(1); // 1
    right.put(-1); // 0

    assert(sink.data == [1, 2, 1, 0, 1, 0]);
}