1 module rx.algorithm.debounce;
2 
3 import core.time;
4 import core.thread;
5 import std.range : put;
6 import rx.disposable;
7 import rx.observer;
8 import rx.observable;
9 import rx.scheduler;
10 
11 //#########################
12 // Debounce
13 //#########################
14 ///
15 DebounceObservable!(T, TScheduler, T.ElementType) debounce(T, TScheduler : AsyncScheduler)(
16         T observable, Duration val, TScheduler scheduler)
17 {
18     return typeof(return)(observable, scheduler, val);
19 }
20 ///
21 DebounceObservable!(T, TaskPoolScheduler, T.ElementType) debounce(T)(T observable, Duration val)
22 {
23     return typeof(return)(observable, new TaskPoolScheduler, val);
24 }
25 ///
26 unittest
27 {
28     import rx.subject : SubjectObject;
29     import core.thread : Thread;
30     import core.time : dur;
31 
32     auto obs = new SubjectObject!int;
33 
34     import std.array : appender;
35 
36     auto buf = appender!(int[]);
37     auto d = obs.debounce(dur!"msecs"(100), new TaskPoolScheduler).doSubscribe(buf);
38     scope (exit)
39         d.dispose();
40 
41     .put(obs, 1);
42     Thread.sleep(dur!"msecs"(200));
43     .put(obs, 2);
44     .put(obs, 3);
45     Thread.sleep(dur!"msecs"(200));
46 
47     assert(buf.data.length == 2);
48     assert(buf.data[0] == 1);
49     assert(buf.data[1] == 3);
50 }
51 
52 struct DebounceObserver(TObserver, TScheduler, E)
53 {
54 public:
55     this(TObserver observer, TScheduler scheduler, Duration val, SerialDisposable disposable)
56     {
57         _observer = observer;
58         _scheduler = scheduler;
59         _dueTime = val;
60         _disposable = disposable;
61     }
62 
63 public:
64     void put(E obj)
65     {
66         static if (hasFailure!TObserver)
67         {
68             try
69             {
70                 _disposable.disposable = _scheduler.schedule({
71                     try
72                     {
73                         .put(_observer, obj);
74                     }
75                     catch (Exception e)
76                     {
77                         _observer.failure(e);
78                         _disposable.dispose();
79                     }
80                 }, _dueTime);
81             }
82             catch (Exception e)
83             {
84                 _observer.failure(e);
85                 _disposable.dispose();
86             }
87         }
88         else
89         {
90             _disposable.disposable = _scheduler.schedule({ .put(_observer, obj); }, _dueTime);
91         }
92     }
93 
94     void completed()
95     {
96         static if (hasCompleted!TObserver)
97         {
98             _observer.completed();
99         }
100         _disposable.dispose();
101     }
102 
103     void failure(Exception e)
104     {
105         static if (hasFailure!TObserver)
106         {
107             _observer.failure(e);
108         }
109         _disposable.dispose();
110     }
111 
112 private:
113     TObserver _observer;
114     TScheduler _scheduler;
115     Duration _dueTime;
116     SerialDisposable _disposable;
117 }
118 
119 struct DebounceObservable(TObservable, TScheduler, E)
120 {
121     alias ElementType = E;
122 public:
123     this(TObservable observable, TScheduler scheduler, Duration val)
124     {
125         _observable = observable;
126         _scheduler = scheduler;
127         _dueTime = val;
128     }
129 
130 public:
131     auto subscribe(T)(T observer)
132     {
133         alias ObserverType = DebounceObserver!(T, TScheduler, ElementType);
134         auto inner = new SerialDisposable;
135         auto outer = _observable.doSubscribe(ObserverType(observer, _scheduler, _dueTime, inner));
136         return new CompositeDisposable(disposableObject(outer), inner);
137     }
138 
139 private:
140     TObservable _observable;
141     TScheduler _scheduler;
142     Duration _dueTime;
143 }
144 
145 unittest
146 {
147     import std.array;
148     import rx.subject;
149 
150     auto s = new TaskPoolScheduler;
151     auto sub = new SubjectObject!int;
152     auto buf = appender!(int[]);
153 
154     auto d = sub.debounce(dur!"msecs"(50), s).doSubscribe(buf);
155 
156     foreach (i; 0 .. 10)
157     {
158         sub.put(i);
159     }
160     Thread.sleep(dur!"msecs"(100));
161 
162     import std.algorithm : equal;
163 
164     assert(equal(buf.data, [9]));
165 }
166 
167 unittest
168 {
169     import std.array;
170     import rx.subject;
171 
172     auto sub = new SubjectObject!int;
173     auto buf = appender!(int[]);
174 
175     auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);
176 
177     foreach (i; 0 .. 10)
178     {
179         sub.put(i);
180     }
181     Thread.sleep(dur!"msecs"(100));
182 
183     import std.algorithm : equal;
184 
185     assert(equal(buf.data, [9]));
186 }
187 
188 unittest
189 {
190     import std.array;
191     import rx.subject;
192 
193     auto sub = new SubjectObject!int;
194     auto buf = appender!(int[]);
195 
196     auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);
197 
198     foreach (i; 0 .. 10)
199     {
200         sub.put(i);
201     }
202     d.dispose();
203     Thread.sleep(dur!"msecs"(100));
204 
205     import std.algorithm : equal;
206 
207     assert(buf.data.length == 0);
208 }
209 
210 unittest
211 {
212     import std.array;
213     import rx.subject;
214 
215     auto sub = new SubjectObject!int;
216     auto buf = appender!(int[]);
217 
218     auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);
219 
220     foreach (i; 0 .. 10)
221     {
222         sub.put(i);
223     }
224     sub.completed();
225     Thread.sleep(dur!"msecs"(100));
226 
227     import std.algorithm : equal;
228 
229     assert(buf.data.length == 0);
230 }
231 
232 unittest
233 {
234     import std.array;
235     import rx.subject;
236 
237     auto sub = new SubjectObject!int;
238     auto buf = appender!(int[]);
239 
240     auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);
241 
242     foreach (i; 0 .. 10)
243     {
244         sub.put(i);
245     }
246     sub.failure(null);
247     Thread.sleep(dur!"msecs"(100));
248 
249     import std.algorithm : equal;
250 
251     assert(buf.data.length == 0);
252 }