1 module rx.algorithm.debounce;
2 
3 import core.time;
4 import core.thread;
5 import std.range : put;
6 import rx.disposable;
7 import rx.observer;
8 import rx.observable;
9 import rx.scheduler;
10 
11 //#########################
12 // Debounce
13 //#########################
/**
 * Builds a debounced view of `observable` that uses an explicit scheduler.
 *
 * Every value coming from the source is forwarded only after `val` has
 * elapsed on `scheduler`; a value that arrives in the meantime replaces the
 * delivery that was pending for the previous one.
 *
 * Params:
 *   observable = source observable; its `ElementType` is the element type of the result
 *   val        = delay applied to each value before it is forwarded
 *   scheduler  = `AsyncScheduler` on which the delayed deliveries are scheduled
 *
 * Returns: a `DebounceObservable` constructed from the three arguments.
 */
DebounceObservable!(T, TScheduler, T.ElementType) debounce(T, TScheduler : AsyncScheduler)(
        T observable, Duration val, TScheduler scheduler)
{
    alias Debounced = typeof(return);
    return Debounced(observable, scheduler, val);
}
/**
 * Builds a debounced view of `observable` using a freshly created
 * `TaskPoolScheduler` for the delayed deliveries.
 *
 * Params:
 *   observable = source observable; its `ElementType` is the element type of the result
 *   val        = delay applied to each value before it is forwarded
 *
 * Returns: a `DebounceObservable` bound to the new `TaskPoolScheduler`.
 */
DebounceObservable!(T, TaskPoolScheduler, T.ElementType) debounce(T)(T observable, Duration val)
{
    alias Debounced = typeof(return);
    return Debounced(observable, new TaskPoolScheduler, val);
}
///
unittest
{
    import core.thread : Thread;
    import core.time : dur;
    import rx.subject : SubjectObject;
    import std.array : appender;

    auto subject = new SubjectObject!int;
    auto received = appender!(int[]);

    auto subscription = subject.debounce(dur!"msecs"(100), new TaskPoolScheduler)
        .doSubscribe(received);
    scope (exit)
        subscription.dispose();

    // A lone value: it is forwarded once the 100 ms window has passed.
    .put(subject, 1);
    Thread.sleep(dur!"msecs"(200));

    // Two values back to back: only the later one outlives the window.
    .put(subject, 2);
    .put(subject, 3);
    Thread.sleep(dur!"msecs"(200));

    assert(received.data == [1, 3]);
}
51 
/**
 * Observer wrapper that delays every value by a fixed duration and lets a
 * newer value supersede the delivery scheduled for the previous one.
 *
 * Each `put` asks the scheduler to forward the value after the due time and
 * stores the handle returned by the scheduler in the shared
 * `SerialDisposable`. `completed` and `failure` dispose that handle before
 * notifying the wrapped observer, so a value that is still waiting for its
 * due time is dropped and cannot be delivered after the terminal
 * notification.
 */
struct DebounceObserver(TObserver, TScheduler, E)
{
public:
    /**
     * Params:
     *   observer   = downstream observer that receives the delayed values
     *   scheduler  = scheduler used to delay each delivery
     *   val        = delay applied to every value
     *   disposable = holder for the currently pending delivery; the caller
     *                keeps a reference so it can cancel it from outside
     */
    this(TObserver observer, TScheduler scheduler, Duration val, SerialDisposable disposable)
    {
        _observer = observer;
        _scheduler = scheduler;
        _dueTime = val;
        _disposable = disposable;
    }

public:
    /**
     * Schedules `obj` to be forwarded to the wrapped observer after the due
     * time and records the scheduled work as the current pending delivery.
     *
     * When the wrapped observer supports `failure`, an exception thrown
     * either while scheduling or while forwarding is routed to `failure`
     * instead of propagating.
     */
    void put(E obj)
    {
        static if (hasFailure!TObserver)
        {
            try
            {
                // NOTE(review): assigning to the SerialDisposable is presumed to
                // cancel the previously stored delivery; confirm in rx.disposable.
                _disposable.disposable = _scheduler.schedule({
                    try
                    {
                        .put(_observer, obj);
                    }
                    catch (Exception e)
                    {
                        // Forwarding failed: cancel whatever is pending, then report.
                        this.failure(e);
                    }
                }, _dueTime);
            }
            catch (Exception e)
            {
                // Scheduling failed: the previous delivery may still be pending,
                // so it is cancelled before the failure is reported.
                this.failure(e);
            }
        }
        else
        {
            _disposable.disposable = _scheduler.schedule({ .put(_observer, obj); }, _dueTime);
        }
    }

    /**
     * Cancels the pending delivery, then forwards completion to the wrapped
     * observer if it supports `completed`.
     */
    void completed()
    {
        // Dispose first: once the pending delivery is cancelled it can no longer
        // fire between the terminal notification and the cleanup, and the
        // cleanup still happens if the downstream handler throws.
        _disposable.dispose();
        static if (hasCompleted!TObserver)
        {
            _observer.completed();
        }
    }

    /**
     * Cancels the pending delivery, then forwards `e` to the wrapped observer
     * if it supports `failure`.
     */
    void failure(Exception e)
    {
        // Same ordering as in completed(): cancel before notifying.
        _disposable.dispose();
        static if (hasFailure!TObserver)
        {
            _observer.failure(e);
        }
    }

private:
    TObserver _observer; // downstream observer
    TScheduler _scheduler; // scheduler that delays each delivery
    Duration _dueTime; // delay applied to every value
    SerialDisposable _disposable; // holds the currently pending delivery
}
118 
/**
 * Observable returned by `debounce`.
 *
 * It stores the source observable, the scheduler and the delay, and wraps
 * every subscribing observer in a `DebounceObserver` configured with them.
 */
struct DebounceObservable(TObservable, TScheduler, E)
{
    /// Element type produced by this observable.
    alias ElementType = E;

public:
    /**
     * Params:
     *   observable = source observable to subscribe to
     *   scheduler  = scheduler handed to every `DebounceObserver`
     *   val        = delay handed to every `DebounceObserver`
     */
    this(TObservable observable, TScheduler scheduler, Duration val)
    {
        _dueTime = val;
        _scheduler = scheduler;
        _observable = observable;
    }

    /**
     * Subscribes `observer` to the source through a `DebounceObserver`.
     *
     * Returns: a `CompositeDisposable` made of the source subscription and
     * the `SerialDisposable` that holds the pending delayed delivery.
     */
    auto subscribe(T)(T observer)
    {
        alias Wrapper = DebounceObserver!(T, TScheduler, ElementType);

        // Shared with the wrapper so the pending delivery can be cancelled
        // through the disposable returned to the caller.
        auto pending = new SerialDisposable;
        auto upstream = _observable.doSubscribe(
                Wrapper(observer, _scheduler, _dueTime, pending));

        return new CompositeDisposable(disposableObject(upstream), pending);
    }

private:
    TObservable _observable;
    TScheduler _scheduler;
    Duration _dueTime;
}
144 
// A burst of ten values pushed without pause through an explicitly supplied
// scheduler: only the last value is expected once the 50 ms window has passed.
unittest
{
    import std.array;
    import rx.subject;

    auto s = new TaskPoolScheduler;
    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto d = sub.debounce(dur!"msecs"(50), s).doSubscribe(buf);
    // Release the subscription when the test ends, also when an assert fails.
    scope (exit)
        d.dispose();

    foreach (i; 0 .. 10)
    {
        sub.put(i);
    }
    // Wait longer than the debounce window so the last value gets delivered.
    Thread.sleep(dur!"msecs"(100));

    import std.algorithm : equal;
    import std.format : format;

    assert(equal(buf.data, [9]), "buf.data is %s".format(buf.data));
}
167 
// Same burst scenario as with an explicit scheduler, but through the overload
// that creates its own TaskPoolScheduler.
unittest
{
    import std.array;
    import rx.subject;

    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);
    // Release the subscription when the test ends, also when an assert fails.
    scope (exit)
        d.dispose();

    foreach (i; 0 .. 10)
    {
        sub.put(i);
    }
    // Wait longer than the debounce window so the last value gets delivered.
    Thread.sleep(dur!"msecs"(100));

    import std.algorithm : equal;

    assert(equal(buf.data, [9]));
}
188 
// Disposing the subscription before the debounce window elapses must
// suppress the pending value: nothing reaches the buffer.
unittest
{
    import rx.subject;
    import std.array;

    auto subject = new SubjectObject!int;
    auto received = appender!(int[]);

    auto subscription = subject.debounce(50.msecs).doSubscribe(received);

    foreach (value; 0 .. 10)
        subject.put(value);

    // Cancel while the last value is still waiting for its due time.
    subscription.dispose();
    Thread.sleep(100.msecs);

    assert(received.data.length == 0);
}
210 
// Completing the source before the debounce window elapses drops the
// pending value: nothing reaches the buffer.
unittest
{
    import rx.subject;
    import std.array;

    auto subject = new SubjectObject!int;
    auto received = appender!(int[]);

    auto subscription = subject.debounce(50.msecs).doSubscribe(received);

    foreach (value; 0 .. 10)
        subject.put(value);

    // Terminate while the last value is still waiting for its due time.
    subject.completed();
    Thread.sleep(100.msecs);

    assert(received.data.length == 0);
}
232 
// Failing the source before the debounce window elapses drops the pending
// value: nothing reaches the buffer.
unittest
{
    import rx.subject;
    import std.array;

    auto subject = new SubjectObject!int;
    auto received = appender!(int[]);

    auto subscription = subject.debounce(50.msecs).doSubscribe(received);

    foreach (value; 0 .. 10)
        subject.put(value);

    // Signal failure while the last value is still waiting for its due time.
    subject.failure(null);
    Thread.sleep(100.msecs);

    assert(received.data.length == 0);
}