1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'uniq'
3  +
4  + This is also called 'distinctUntilChanged'.
5  +/
6 module rx.algorithm.uniq;
7 
8 import rx.disposable;
9 import rx.observable;
10 import rx.observer;
11 import rx.util;
12 
13 import std.functional : binaryFun;
14 import std.range : put;
15 
16 struct UniqObserver(TObserver, E, alias pred = "a == b")
17 {
18     mixin SimpleObserverImpl!(TObserver, E);
19 
20 public:
21     this(ref TObserver observer)
22     {
23         _observer = observer;
24         _hasValue = false;
25     }
26 
27     static if (hasCompleted!TObserver || hasFailure!TObserver)
28     {
29         this(ref TObserver observer, Disposable disposable)
30         {
31             _observer = observer;
32             _disposable = disposable;
33         }
34     }
35 
36 private:
37     void putImpl(E obj)
38     {
39         alias fun = binaryFun!pred;
40 
41         if (_hasValue)
42         {
43             if (!fun(_current, obj))
44             {
45                 _current = obj;
46                 .put(_observer, obj);
47             }
48         }
49         else
50         {
51             _current = obj;
52             _hasValue = true;
53             .put(_observer, obj);
54         }
55     }
56 
57 private:
58     bool _hasValue;
59     E _current;
60 }
61 
62 @safe unittest
63 {
64     import std.array : appender;
65 
66     auto buf = appender!(int[]);
67 
68     auto observer = UniqObserver!(typeof(buf), int)(buf);
69 
70     .put(observer, [1, 1, 2, 3]);
71 
72     import std.algorithm : equal;
73 
74     assert(equal(buf.data, [1, 2, 3]));
75 }
76 
77 @safe unittest
78 {
79     struct Person
80     {
81         string name;
82         int age;
83     }
84 
85     import std.array : appender;
86 
87     auto buf = appender!(Person[]);
88 
89     auto observer = UniqObserver!(typeof(buf), Person, "a.name == b.name")(buf);
90 
91     .put(observer, Person("Smith", 20));
92     .put(observer, Person("Smith", 30));
93     .put(observer, Person("Johnson", 40));
94     .put(observer, Person("Johnson", 50));
95 
96     import std.algorithm : equal;
97 
98     auto data = buf.data;
99     assert(data.length == 2);
100     assert(data[0].name == "Smith");
101     assert(data[0].age == 20);
102     assert(data[1].name == "Johnson");
103     assert(data[1].age == 40);
104 }
105 
106 struct UniqObservable(TObservable, alias pred = "a == b")
107 {
108     alias ElementType = TObservable.ElementType;
109 
110 public:
111     this(ref TObservable observable)
112     {
113         _observable = observable;
114     }
115 
116 public:
117     auto subscribe(TObserver)(auto ref TObserver observer)
118     {
119         alias ObserverType = UniqObserver!(TObserver, ElementType, pred);
120 
121         static if (hasCompleted!TObserver || hasFailure!TObserver)
122         {
123             auto disposable = new SingleAssignmentDisposable;
124             disposable.setDisposable(disposableObject(doSubscribe(_observable,
125                     ObserverType(observer, disposable))));
126             return disposable;
127         }
128         else
129         {
130             return doSubscribe(_observable, ObserverType(observer));
131         }
132     }
133 
134 private:
135     TObservable _observable;
136 }
137 
138 @system unittest
139 {
140     import rx.subject : SubjectObject;
141 
142     auto sub = new SubjectObject!int;
143 
144     auto observable = UniqObservable!(typeof(sub))(sub);
145 
146     import std.array : appender;
147 
148     auto buf = appender!(int[]);
149 
150     auto disposable = observable.subscribe(buf);
151     scope (exit)
152         disposable.dispose();
153 
154     .put(sub, 10);
155     .put(sub, 10);
156     .put(sub, 20);
157     .put(sub, 30);
158 
159     auto data = buf.data;
160     assert(data.length == 3);
161     assert(data[0] == 10);
162     assert(data[1] == 20);
163     assert(data[2] == 30);
164 }
165 
166 unittest
167 {
168     struct Point
169     {
170         int x;
171         int y;
172     }
173 
174     import rx.subject : SubjectObject;
175 
176     auto sub = new SubjectObject!Point;
177 
178     auto observable = UniqObservable!(typeof(sub), "a.x == b.x")(sub);
179 
180     import std.array : appender;
181 
182     auto buf = appender!(Point[]);
183 
184     auto disposable = observable.subscribe(buf);
185     scope (exit)
186         disposable.dispose();
187 
188     .put(sub, Point(0, 0));
189     .put(sub, Point(0, 10));
190     .put(sub, Point(10, 10));
191     .put(sub, Point(10, 20));
192 
193     auto data = buf.data;
194     assert(data.length == 2);
195     assert(data[0] == Point(0, 0));
196     assert(data[1] == Point(10, 10));
197 }
198 
199 ///
200 template uniq(alias pred = "a == b")
201 {
202     UniqObservable!(TObservable, pred) uniq(TObservable)(auto ref TObservable observable)
203     {
204         return typeof(return)(observable);
205     }
206 }
207 
208 ///ditto
209 alias distinctUntilChanged = uniq;
210 
211 ///ditto
212 unittest
213 {
214     import rx.subject : SubjectObject;
215     import std.array : appender;
216 
217     auto sub = new SubjectObject!int;
218     auto buf = appender!(int[]);
219 
220     auto disposable = sub.uniq.subscribe(buf);
221     scope (exit)
222         disposable.dispose();
223 
224     .put(sub, [11, 11, 22, 22, 33]);
225 
226     auto data = buf.data;
227     assert(data.length == 3);
228     assert(data[0] == 11);
229     assert(data[1] == 22);
230     assert(data[2] == 33);
231 }
232 
233 ///ditto
234 @system unittest
235 {
236     import std.datetime : Date;
237     import rx.subject : SubjectObject;
238     import std.array : appender;
239     
240     auto sub = new SubjectObject!Date;
241     auto buf = appender!(Date[]);
242 
243     auto disposable = sub.uniq!"a.year == b.year".subscribe(buf);
244     scope (exit)
245         disposable.dispose();
246 
247     .put(sub, Date(2000, 1, 1));
248     .put(sub, Date(2000, 1, 2));
249     .put(sub, Date(2017, 3, 24));
250     .put(sub, Date(2017, 4, 24));
251     .put(sub, Date(2017, 4, 24));
252 
253     auto data = buf.data;
254     assert(data.length == 2);
255     assert(data[0] == Date(2000, 1, 1));
256     assert(data[1] == Date(2017, 3, 24));
257 }
258 
259 ///ditto
260 @system unittest
261 {
262     import std.datetime : Date;
263     import rx.subject : SubjectObject;
264     import std.array : appender;
265     
266     auto sub = new SubjectObject!Date;
267     auto buf = appender!(Date[]);
268 
269     auto disposable = sub.distinctUntilChanged!"a.year == b.year".subscribe(buf);
270     scope (exit)
271         disposable.dispose();
272 
273     .put(sub, Date(2000, 1, 1));
274     .put(sub, Date(2000, 1, 2));
275     .put(sub, Date(2017, 3, 24));
276     .put(sub, Date(2017, 4, 24));
277     .put(sub, Date(2017, 4, 24));
278 
279     auto data = buf.data;
280     assert(data.length == 2);
281     assert(data[0] == Date(2000, 1, 1));
282     assert(data[1] == Date(2017, 3, 24));
283 }