1 /+++++++++++++++++++++++++++++
2  + This module defines algorithm 'debounce'
3  +/
4 module rx.algorithm.debounce;
5 
6 import core.time;
7 import core.thread;
8 import std.range : put;
9 import rx.disposable;
10 import rx.observer;
11 import rx.observable;
12 import rx.scheduler;
13 
14 //#########################
15 // Debounce
16 //#########################
17 ///
18 DebounceObservable!(T, TScheduler, T.ElementType) debounce(T, TScheduler : Scheduler)(
19         T observable, Duration val, TScheduler scheduler)
20 {
21     return typeof(return)(observable, scheduler, val);
22 }
23 ///
24 DebounceObservable!(T, TaskPoolScheduler, T.ElementType) debounce(T)(T observable, Duration val)
25 {
26     return typeof(return)(observable, new TaskPoolScheduler, val);
27 }
28 ///
29 unittest
30 {
31     import rx.subject : SubjectObject;
32     import core.thread : Thread;
33     import core.time : dur;
34 
35     auto obs = new SubjectObject!int;
36 
37     import std.array : appender;
38 
39     auto buf = appender!(int[]);
40     auto d = obs.debounce(dur!"msecs"(100), new TaskPoolScheduler).doSubscribe(buf);
41     scope (exit)
42         d.dispose();
43 
44     .put(obs, 1);
45     Thread.sleep(dur!"msecs"(200));
46     .put(obs, 2);
47     .put(obs, 3);
48     Thread.sleep(dur!"msecs"(200));
49 
50     assert(buf.data.length == 2);
51     assert(buf.data[0] == 1);
52     assert(buf.data[1] == 3);
53 }
54 
55 struct DebounceObserver(TObserver, TScheduler, E)
56 {
57 public:
58     this(TObserver observer, TScheduler scheduler, Duration val, SerialDisposable disposable)
59     {
60         _observer = observer;
61         _scheduler = scheduler;
62         _dueTime = val;
63         _disposable = disposable;
64     }
65 
66 public:
67     void put(E obj)
68     {
69         static if (hasFailure!TObserver)
70         {
71             try
72             {
73                 _disposable.disposable = _scheduler.schedule({
74                     try
75                     {
76                         .put(_observer, obj);
77                     }
78                     catch (Exception e)
79                     {
80                         _observer.failure(e);
81                         _disposable.dispose();
82                     }
83                 }, _dueTime);
84             }
85             catch (Exception e)
86             {
87                 _observer.failure(e);
88                 _disposable.dispose();
89             }
90         }
91         else
92         {
93             _disposable.disposable = _scheduler.schedule({ .put(_observer, obj); }, _dueTime);
94         }
95     }
96 
97     void completed()
98     {
99         static if (hasCompleted!TObserver)
100         {
101             _observer.completed();
102         }
103         _disposable.dispose();
104     }
105 
106     void failure(Exception e)
107     {
108         static if (hasFailure!TObserver)
109         {
110             _observer.failure(e);
111         }
112         _disposable.dispose();
113     }
114 
115 private:
116     TObserver _observer;
117     TScheduler _scheduler;
118     Duration _dueTime;
119     SerialDisposable _disposable;
120 }
121 
122 struct DebounceObservable(TObservable, TScheduler, E)
123 {
124     alias ElementType = E;
125 public:
126     this(TObservable observable, TScheduler scheduler, Duration val)
127     {
128         _observable = observable;
129         _scheduler = scheduler;
130         _dueTime = val;
131     }
132 
133 public:
134     auto subscribe(T)(T observer)
135     {
136         alias ObserverType = DebounceObserver!(T, TScheduler, ElementType);
137         auto inner = new SerialDisposable;
138         auto outer = _observable.doSubscribe(ObserverType(observer, _scheduler, _dueTime, inner));
139         return new CompositeDisposable(disposableObject(outer), inner);
140     }
141 
142 private:
143     TObservable _observable;
144     TScheduler _scheduler;
145     Duration _dueTime;
146 }
147 
148 unittest
149 {
150     import std.array;
151     import rx.subject;
152 
153     auto s = new TaskPoolScheduler;
154     auto sub = new SubjectObject!int;
155     auto buf = appender!(int[]);
156 
157     auto d = sub.debounce(dur!"msecs"(50), s).doSubscribe(buf);
158 
159     foreach (i; 0 .. 10)
160     {
161         sub.put(i);
162     }
163     Thread.sleep(dur!"msecs"(100));
164 
165     import std.algorithm : equal;
166     import std.format : format;
167 
168     assert(equal(buf.data, [9]), "buf.data is %s".format(buf.data));
169 }
170 
171 unittest
172 {
173     import std.array;
174     import rx.subject;
175 
176     auto sub = new SubjectObject!int;
177     auto buf = appender!(int[]);
178 
179     auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);
180 
181     foreach (i; 0 .. 10)
182     {
183         sub.put(i);
184     }
185     Thread.sleep(dur!"msecs"(100));
186 
187     import std.algorithm : equal;
188 
189     assert(equal(buf.data, [9]));
190 }
191 
192 unittest
193 {
194     import std.array;
195     import rx.subject;
196 
197     auto sub = new SubjectObject!int;
198     auto buf = appender!(int[]);
199 
200     auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);
201 
202     foreach (i; 0 .. 10)
203     {
204         sub.put(i);
205     }
206     d.dispose();
207     Thread.sleep(dur!"msecs"(100));
208 
209     import std.algorithm : equal;
210 
211     assert(buf.data.length == 0);
212 }
213 
214 unittest
215 {
216     import std.array;
217     import rx.subject;
218 
219     auto sub = new SubjectObject!int;
220     auto buf = appender!(int[]);
221 
222     auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);
223 
224     foreach (i; 0 .. 10)
225     {
226         sub.put(i);
227     }
228     sub.completed();
229     Thread.sleep(dur!"msecs"(100));
230 
231     import std.algorithm : equal;
232 
233     assert(buf.data.length == 0);
234 }
235 
236 unittest
237 {
238     import std.array;
239     import rx.subject;
240 
241     auto sub = new SubjectObject!int;
242     auto buf = appender!(int[]);
243 
244     auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);
245 
246     foreach (i; 0 .. 10)
247     {
248         sub.put(i);
249     }
250     sub.failure(null);
251     Thread.sleep(dur!"msecs"(100));
252 
253     import std.algorithm : equal;
254 
255     assert(buf.data.length == 0);
256 }