1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'scan'
3  +/
4 module rx.algorithm.scan;
5 
6 import rx.disposable;
7 import rx.observer;
8 import rx.observable;
9 import rx.util;
10 
11 import std.functional;
12 import std.range;
13 
14 //####################
15 // Scan
16 //####################
/++
 + Observer wrapper used by the 'scan' algorithm.
 +
 + It keeps a running accumulator of type TAccumulate. For every incoming
 + element the accumulator is replaced by `binaryFun!f(accumulator, element)`
 + and the new accumulator value is put into the wrapped observer.
 +
 + Params:
 +   f = binary function (or string lambda accepted by binaryFun) combining
 +       the current accumulator with the incoming element
 +   TObserver = type of the wrapped downstream observer
 +   E = element type received by this observer
 +   TAccumulate = type of the accumulator forwarded downstream
 +/
struct ScanObserver(alias f, TObserver, E, TAccumulate)
{
    // Supplies the observer plumbing around putImpl; the fields _observer and
    // _disposable assigned below are expected to be declared by this mixin.
    mixin SimpleObserverImpl!(TObserver, E);

public:
    /// Wraps `observer`, starting the accumulation from `seed`.
    this(TObserver observer, TAccumulate seed)
    {
        _current = seed;
        _observer = observer;
    }

    // The disposable-taking constructor only exists when the wrapped observer
    // can be completed or failed.
    static if (hasCompleted!TObserver || hasFailure!TObserver)
    {
        /// Same as above, additionally storing `disposable` in the observer.
        this(TObserver observer, TAccumulate seed, Disposable disposable)
        {
            _current = seed;
            _disposable = disposable;
            _observer = observer;
        }
    }

public:
    /// Folds `obj` into the accumulator and forwards the updated value.
    void putImpl(E obj)
    {
        _current = binaryFun!f(_current, obj);
        .put(_observer, _current);
    }

private:
    // Latest accumulated value; starts out as the seed.
    TAccumulate _current;
}
49 
// ScanObserver with addition and seed 0 forwards the running sums of 1..5.
unittest
{
    import std.array : appender;

    auto sink = appender!(int[]);
    alias Summing = ScanObserver!((a, b) => a + b, typeof(sink), int, int);
    auto observer = Summing(sink, 0);

    foreach (value; 1 .. 6)
        .put(observer, value);

    auto collected = sink.data;
    assert(collected.length == 5);
    assert(collected == [1, 3, 6, 10, 15]);
}
69 
/++
 + Observable produced by the 'scan' algorithm.
 +
 + It stores a source observable and a seed. Each subscription wraps the
 + given observer in a ScanObserver initialised with the seed and subscribes
 + that wrapper to the source.
 +
 + Params:
 +   f = accumulation function handed on to ScanObserver
 +   TObservable = type of the source observable
 +   TAccumulate = accumulator type, which is also this observable's ElementType
 +/
struct ScanObservable(alias f, TObservable, TAccumulate)
{
    /// Elements emitted downstream are accumulator values.
    alias ElementType = TAccumulate;

public:
    /// Stores the source `observable` and the initial accumulator `seed`.
    this(TObservable observable, TAccumulate seed)
    {
        _seed = seed;
        _observable = observable;
    }

public:
    /++
     + Subscribes `observer` to the accumulated sequence.
     +
     + Returns: a SingleAssignmentDisposable when the observer can be
     + completed or failed, otherwise whatever doSubscribe returns.
     +/
    auto subscribe(TObserver)(TObserver observer)
    {
        alias SourceElement = TObservable.ElementType;
        alias Wrapper = ScanObserver!(f, TObserver, SourceElement, TAccumulate);
        enum bool terminable = hasCompleted!TObserver || hasFailure!TObserver;

        static if (!terminable)
        {
            return doSubscribe(_observable, Wrapper(observer, _seed));
        }
        else
        {
            // The wrapper receives the handle before the upstream
            // subscription exists; the subscription is attached afterwards.
            auto handle = new SingleAssignmentDisposable;
            handle.setDisposable(disposableObject(doSubscribe(_observable,
                    Wrapper(observer, _seed, handle))));
            return handle;
        }
    }

private:
    TObservable _observable; // upstream source
    TAccumulate _seed; // initial accumulator handed to every subscription
}
102 
// ScanObservable over Observable!int with an int accumulator is an
// observable of int.
unittest
{
    static assert(isObservable!(ScanObservable!((a, b) => a + b, Observable!int, int), int));
}
108 
// Subscribing a plain function literal to a ScanObservable yields a disposable.
unittest
{
    import rx.subject : SubjectObject;
    import std.stdio : writeln;

    alias Scan = ScanObservable!((a, b) => a + b, Observable!int, int);

    auto source = new SubjectObject!int;
    auto scanned = Scan(source, 0);

    auto handle = scanned.subscribe((int i) => writeln(i));
    static assert(isDisposable!(typeof(handle)));
}
123 
/++
 + Builds an observable that accumulates the elements of `observable`.
 +
 + Starting from `seed`, each source element is combined with the current
 + accumulator through `f`, and every updated accumulator value is emitted.
 +
 + Params:
 +   f = binary accumulation function, called as f(accumulator, element)
 +   observable = source observable
 +   seed = initial accumulator value
 +
 + Returns: a ScanObservable wrapping `observable` and `seed`.
 +/
template scan(alias f)
{
    auto scan(TObservable, TAccumulate)(auto ref TObservable observable, TAccumulate seed)
    {
        alias Result = ScanObservable!(f, TObservable, TAccumulate);
        return Result(observable, seed);
    }
}
///
unittest
{
    import rx.subject : SubjectObject;
    import std.algorithm : equal;
    import std.array : appender;

    auto subject = new SubjectObject!int;

    // Running sum of everything the subject emits, starting from 0.
    auto sum = subject.scan!((a, b) => a + b)(0);
    static assert(isObservable!(typeof(sum), int));

    auto buf = appender!(int[]);
    auto disposable = sum.subscribe(buf);
    scope (exit)
        disposable.dispose();

    // Emit 1 five times; the accumulated values are 1, 2, 3, 4, 5.
    foreach (_; 0 .. 5)
        subject.put(1);

    auto result = buf.data;
    assert(result.length == 5);
    assert(equal(result, [1, 2, 3, 4, 5]));
}