1 module rx.algorithm.uniq;
2 
3 import rx.disposable;
4 import rx.observable;
5 import rx.observer;
6 import rx.util;
7 
8 import std.functional : binaryFun;
9 import std.range : put;
10 
11 struct UniqObserver(TObserver, E, alias pred = "a == b")
12 {
13     mixin SimpleObserverImpl!(TObserver, E);
14 
15 public:
16     this(ref TObserver observer)
17     {
18         _observer = observer;
19         _hasValue = false;
20     }
21 
22     static if (hasCompleted!TObserver || hasFailure!TObserver)
23     {
24         this(ref TObserver observer, Disposable disposable)
25         {
26             _observer = observer;
27             _disposable = disposable;
28         }
29     }
30 
31 private:
32     void putImpl(E obj)
33     {
34         alias fun = binaryFun!pred;
35 
36         if (_hasValue)
37         {
38             if (!fun(_current, obj))
39             {
40                 _current = obj;
41                 .put(_observer, obj);
42             }
43         }
44         else
45         {
46             _current = obj;
47             _hasValue = true;
48             .put(_observer, obj);
49         }
50     }
51 
52 private:
53     bool _hasValue;
54     E _current;
55 }
56 
57 @safe unittest
58 {
59     import std.array : appender;
60 
61     auto buf = appender!(int[]);
62 
63     auto observer = UniqObserver!(typeof(buf), int)(buf);
64 
65     .put(observer, [1, 1, 2, 3]);
66 
67     import std.algorithm : equal;
68 
69     assert(equal(buf.data, [1, 2, 3]));
70 }
71 
72 @safe unittest
73 {
74     struct Person
75     {
76         string name;
77         int age;
78     }
79 
80     import std.array : appender;
81 
82     auto buf = appender!(Person[]);
83 
84     auto observer = UniqObserver!(typeof(buf), Person, "a.name == b.name")(buf);
85 
86     .put(observer, Person("Smith", 20));
87     .put(observer, Person("Smith", 30));
88     .put(observer, Person("Johnson", 40));
89     .put(observer, Person("Johnson", 50));
90 
91     import std.algorithm : equal;
92 
93     auto data = buf.data;
94     assert(data.length == 2);
95     assert(data[0].name == "Smith");
96     assert(data[0].age == 20);
97     assert(data[1].name == "Johnson");
98     assert(data[1].age == 40);
99 }
100 
101 struct UniqObservable(TObservable, alias pred = "a == b")
102 {
103     alias ElementType = TObservable.ElementType;
104 
105 public:
106     this(ref TObservable observable)
107     {
108         _observable = observable;
109     }
110 
111 public:
112     auto subscribe(TObserver)(auto ref TObserver observer)
113     {
114         alias ObserverType = UniqObserver!(TObserver, ElementType, pred);
115 
116         static if (hasCompleted!TObserver || hasFailure!TObserver)
117         {
118             auto disposable = new SingleAssignmentDisposable;
119             disposable.setDisposable(disposableObject(doSubscribe(_observable,
120                     ObserverType(observer, disposable))));
121             return disposable;
122         }
123         else
124         {
125             return doSubscribe(_observable, ObserverType(observer));
126         }
127     }
128 
129 private:
130     TObservable _observable;
131 }
132 
133 @system unittest
134 {
135     import rx.subject : SubjectObject;
136 
137     auto sub = new SubjectObject!int;
138 
139     auto observable = UniqObservable!(typeof(sub))(sub);
140 
141     import std.array : appender;
142 
143     auto buf = appender!(int[]);
144 
145     auto disposable = observable.subscribe(buf);
146     scope (exit)
147         disposable.dispose();
148 
149     .put(sub, 10);
150     .put(sub, 10);
151     .put(sub, 20);
152     .put(sub, 30);
153 
154     auto data = buf.data;
155     assert(data.length == 3);
156     assert(data[0] == 10);
157     assert(data[1] == 20);
158     assert(data[2] == 30);
159 }
160 
161 unittest
162 {
163     struct Point
164     {
165         int x;
166         int y;
167     }
168 
169     import rx.subject : SubjectObject;
170 
171     auto sub = new SubjectObject!Point;
172 
173     auto observable = UniqObservable!(typeof(sub), "a.x == b.x")(sub);
174 
175     import std.array : appender;
176 
177     auto buf = appender!(Point[]);
178 
179     auto disposable = observable.subscribe(buf);
180     scope (exit)
181         disposable.dispose();
182 
183     .put(sub, Point(0, 0));
184     .put(sub, Point(0, 10));
185     .put(sub, Point(10, 10));
186     .put(sub, Point(10, 20));
187 
188     auto data = buf.data;
189     assert(data.length == 2);
190     assert(data[0] == Point(0, 0));
191     assert(data[1] == Point(10, 10));
192 }
193 
194 ///
195 template uniq(alias pred = "a == b")
196 {
197     UniqObservable!(TObservable, pred) uniq(TObservable)(auto ref TObservable observable)
198     {
199         return typeof(return)(observable);
200     }
201 }
202 
203 ///
204 unittest
205 {
206     import rx.subject : SubjectObject;
207     import std.array : appender;
208 
209     auto sub = new SubjectObject!int;
210     auto buf = appender!(int[]);
211 
212     auto disposable = sub.uniq.subscribe(buf);
213     scope (exit)
214         disposable.dispose();
215 
216     .put(sub, [11, 11, 22, 22, 33]);
217 
218     auto data = buf.data;
219     assert(data.length == 3);
220     assert(data[0] == 11);
221     assert(data[1] == 22);
222     assert(data[2] == 33);
223 }
224 
225 ///
226 @system unittest
227 {
228     import std.datetime : Date;
229     import rx.subject : SubjectObject;
230     import std.array : appender;
231     
232     auto sub = new SubjectObject!Date;
233     auto buf = appender!(Date[]);
234 
235     auto disposable = sub.uniq!"a.year == b.year".subscribe(buf);
236     scope (exit)
237         disposable.dispose();
238 
239     .put(sub, Date(2000, 1, 1));
240     .put(sub, Date(2000, 1, 2));
241     .put(sub, Date(2017, 3, 24));
242     .put(sub, Date(2017, 4, 24));
243     .put(sub, Date(2017, 4, 24));
244 
245     auto data = buf.data;
246     assert(data.length == 2);
247     assert(data[0] == Date(2000, 1, 1));
248     assert(data[1] == Date(2017, 3, 24));
249 }