/+++++++++++++++++++++++++++++
 + This module defines algorithm 'debounce'
 +/
module rx.algorithm.debounce;

import core.time;
import core.thread;
import std.range : put;
import rx.disposable;
import rx.observer;
import rx.observable;
import rx.scheduler;

//#########################
// Debounce
//#########################
/++
 + Wraps `observable` in a `DebounceObservable` that delays every value by
 + `val` using `scheduler`.
 +
 + As the unit tests in this module show, when several values arrive within
 + the delay only the most recent one reaches the subscriber.
 +
 + Params:
 +   observable = source observable; must expose `ElementType`
 +   val = delay applied to every value before it is forwarded
 +   scheduler = scheduler used to run the delayed delivery
 +
 + Returns: a `DebounceObservable` over `observable`.
 +/
DebounceObservable!(T, TScheduler, T.ElementType) debounce(T, TScheduler : Scheduler)(
        T observable, Duration val, TScheduler scheduler)
{
    return typeof(return)(observable, scheduler, val);
}
/++
 + Same as the three-argument overload, but delivers on a freshly created
 + `TaskPoolScheduler`.
 +
 + Params:
 +   observable = source observable; must expose `ElementType`
 +   val = delay applied to every value before it is forwarded
 +
 + Returns: a `DebounceObservable` over `observable`.
 +/
DebounceObservable!(T, TaskPoolScheduler, T.ElementType) debounce(T)(T observable, Duration val)
{
    return typeof(return)(observable, new TaskPoolScheduler, val);
}
///
unittest
{
    import rx.subject : SubjectObject;
    import core.thread : Thread;
    import core.time : dur;

    auto obs = new SubjectObject!int;

    import std.array : appender;

    auto buf = appender!(int[]);
    auto d = obs.debounce(dur!"msecs"(100), new TaskPoolScheduler).doSubscribe(buf);
    scope (exit)
        d.dispose();

    .put(obs, 1);
    Thread.sleep(dur!"msecs"(200));
    .put(obs, 2);
    .put(obs, 3);
    Thread.sleep(dur!"msecs"(200));

    assert(buf.data.length == 2);
    assert(buf.data[0] == 1);
    assert(buf.data[1] == 3);
}

/++
 + Observer placed between the source and the real subscriber.
 +
 + Every incoming value is handed to the scheduler for delivery after
 + `_dueTime`; the handle of that scheduled delivery is stored in the shared
 + `SerialDisposable`.
 +
 + NOTE(review): dropping the previously scheduled value relies on
 + `SerialDisposable` disposing its old content on reassignment; that type
 + lives in `rx.disposable` and is not visible here - confirm.
 +/
struct DebounceObserver(TObserver, TScheduler, E)
{
public:
    /++
     + Params:
     +   observer = downstream observer that finally receives the values
     +   scheduler = scheduler used for the delayed delivery
     +   val = delay before a value is forwarded
     +   disposable = holder for the currently pending delivery
     +/
    this(TObserver observer, TScheduler scheduler, Duration val, SerialDisposable disposable)
    {
        _observer = observer;
        _scheduler = scheduler;
        _dueTime = val;
        _disposable = disposable;
    }

public:
    /++
     + Schedules `obj` for delivery to the downstream observer after the
     + configured delay.
     +
     + When the downstream observer supports `failure`, exceptions thrown
     + either while scheduling or while delivering are routed to it and the
     + pending delivery is disposed.
     +/
    void put(E obj)
    {
        static if (hasFailure!TObserver)
        {
            try
            {
                _disposable.disposable = _scheduler.schedule({
                    try
                    {
                        .put(_observer, obj);
                    }
                    catch (Exception e)
                    {
                        _observer.failure(e);
                        _disposable.dispose();
                    }
                }, _dueTime);
            }
            catch (Exception e)
            {
                _observer.failure(e);
                _disposable.dispose();
            }
        }
        else
        {
            _disposable.disposable = _scheduler.schedule({ .put(_observer, obj); }, _dueTime);
        }
    }

    /++
     + Drops the pending delivery and forwards completion downstream (when
     + the downstream observer supports it).
     +/
    void completed()
    {
        // Dispose BEFORE notifying: otherwise a delivery that is still
        // scheduled could run after the downstream observer has already
        // been told the sequence is complete, and it would never be
        // disposed at all if the downstream handler throws.
        _disposable.dispose();
        static if (hasCompleted!TObserver)
        {
            _observer.completed();
        }
    }

    /++
     + Drops the pending delivery and forwards the error downstream (when
     + the downstream observer supports it).
     +/
    void failure(Exception e)
    {
        // Same ordering rationale as in completed(): no value may follow
        // the terminal notification.
        _disposable.dispose();
        static if (hasFailure!TObserver)
        {
            _observer.failure(e);
        }
    }

private:
    TObserver _observer;
    TScheduler _scheduler;
    Duration _dueTime;
    SerialDisposable _disposable;
}

/++
 + Observable returned by `debounce`. Stores the source, the scheduler and
 + the delay, and wires a `DebounceObserver` in front of each subscriber.
 +/
struct DebounceObservable(TObservable, TScheduler, E)
{
    alias ElementType = E;
public:
    /++
     + Params:
     +   observable = source observable
     +   scheduler = scheduler used for the delayed delivery
     +   val = delay before a value is forwarded
     +/
    this(TObservable observable, TScheduler scheduler, Duration val)
    {
        _observable = observable;
        _scheduler = scheduler;
        _dueTime = val;
    }

public:
    /++
     + Subscribes `observer` to the source through a `DebounceObserver`.
     +
     + Returns: a `CompositeDisposable` holding both the source
     + subscription and the holder of the pending delayed delivery.
     +/
    auto subscribe(T)(T observer)
    {
        alias ObserverType = DebounceObserver!(T, TScheduler, ElementType);
        auto inner = new SerialDisposable;
        auto outer = _observable.doSubscribe(ObserverType(observer, _scheduler, _dueTime, inner));
        return new CompositeDisposable(disposableObject(outer), inner);
    }

private:
    TObservable _observable;
    TScheduler _scheduler;
    Duration _dueTime;
}

// A burst of values with an explicit scheduler yields only the last one.
unittest
{
    import std.array;
    import rx.subject;

    auto s = new TaskPoolScheduler;
    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto d = sub.debounce(dur!"msecs"(50), s).doSubscribe(buf);
    scope (exit)
        d.dispose();

    foreach (i; 0 .. 10)
    {
        sub.put(i);
    }
    Thread.sleep(dur!"msecs"(100));

    import std.algorithm : equal;
    import std.format : format;

    assert(equal(buf.data, [9]), "buf.data is %s".format(buf.data));
}

// A burst of values with the default scheduler yields only the last one.
unittest
{
    import std.array;
    import rx.subject;

    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);
    scope (exit)
        d.dispose();

    foreach (i; 0 .. 10)
    {
        sub.put(i);
    }
    Thread.sleep(dur!"msecs"(100));

    import std.algorithm : equal;

    assert(equal(buf.data, [9]));
}

// Disposing the subscription before the delay elapses drops the pending value.
unittest
{
    import std.array;
    import rx.subject;

    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);

    foreach (i; 0 .. 10)
    {
        sub.put(i);
    }
    d.dispose();
    Thread.sleep(dur!"msecs"(100));

    assert(buf.data.length == 0);
}

// Completing the source before the delay elapses drops the pending value.
unittest
{
    import std.array;
    import rx.subject;

    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);
    scope (exit)
        d.dispose();

    foreach (i; 0 .. 10)
    {
        sub.put(i);
    }
    sub.completed();
    Thread.sleep(dur!"msecs"(100));

    assert(buf.data.length == 0);
}

// Failing the source before the delay elapses drops the pending value.
unittest
{
    import std.array;
    import rx.subject;

    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);
    scope (exit)
        d.dispose();

    foreach (i; 0 .. 10)
    {
        sub.put(i);
    }
    sub.failure(null);
    Thread.sleep(dur!"msecs"(100));

    assert(buf.data.length == 0);
}