module rx.algorithm.debounce;

import core.time;
import core.thread;
import std.range : put;
import rx.disposable;
import rx.observer;
import rx.observable;
import rx.scheduler;

//#########################
// Debounce
//#########################
/**
Wraps `observable` in a `DebounceObservable` that delays every value by `val`
on the given `scheduler`.

As the unittests in this module demonstrate, when several values arrive in a
burst only the last one is delivered to the subscriber.

Params:
    observable = the source observable; its `ElementType` becomes the element type of the result
    val = the delay applied to every value before it is forwarded
    scheduler = the asynchronous scheduler used to run the delayed delivery

Returns:
    A `DebounceObservable` built from `observable`, `scheduler` and `val`.
*/
DebounceObservable!(T, TScheduler, T.ElementType) debounce(T, TScheduler : AsyncScheduler)(
        T observable, Duration val, TScheduler scheduler)
{
    return typeof(return)(observable, scheduler, val);
}

/**
Convenience overload of `debounce` that creates a new `TaskPoolScheduler`
for the delayed delivery.

Params:
    observable = the source observable
    val = the delay applied to every value before it is forwarded

Returns:
    A `DebounceObservable` built from `observable`, a fresh `TaskPoolScheduler` and `val`.
*/
DebounceObservable!(T, TaskPoolScheduler, T.ElementType) debounce(T)(T observable, Duration val)
{
    return typeof(return)(observable, new TaskPoolScheduler, val);
}
///
unittest
{
    import rx.subject : SubjectObject;
    import core.thread : Thread;
    import core.time : dur;

    auto obs = new SubjectObject!int;

    import std.array : appender;

    auto buf = appender!(int[]);
    auto d = obs.debounce(dur!"msecs"(100), new TaskPoolScheduler).doSubscribe(buf);
    scope (exit)
        d.dispose();

    // A lone value is delivered once the quiet period has elapsed.
    .put(obs, 1);
    Thread.sleep(dur!"msecs"(200));
    // Two values in quick succession: only the later one is delivered.
    .put(obs, 2);
    .put(obs, 3);
    Thread.sleep(dur!"msecs"(200));

    assert(buf.data.length == 2);
    assert(buf.data[0] == 1);
    assert(buf.data[1] == 3);
}

/**
Observer that sits between the source and the downstream observer and
forwards each received value only after `_dueTime` has elapsed.

Every call to `put` schedules a new delayed delivery and stores its handle in
the shared `SerialDisposable`, replacing the handle of the previous delivery.
NOTE(review): cancellation of the replaced delivery relies on the semantics of
`SerialDisposable` in `rx.disposable`, which is not visible in this module.
*/
struct DebounceObserver(TObserver, TScheduler, E)
{
public:
    /**
    Params:
        observer = the downstream observer that finally receives the values
        scheduler = the scheduler used for the delayed delivery
        val = the delay before a value is forwarded
        disposable = holder for the handle of the currently pending delivery
    */
    this(TObserver observer, TScheduler scheduler, Duration val, SerialDisposable disposable)
    {
        _observer = observer;
        _scheduler = scheduler;
        _dueTime = val;
        _disposable = disposable;
    }

public:
    /**
    Schedules `obj` to be forwarded to the downstream observer after the
    configured delay. When the downstream observer supports `failure`, any
    exception thrown while scheduling or delivering is routed to it.
    */
    void put(E obj)
    {
        static if (hasFailure!TObserver)
        {
            try
            {
                _disposable.disposable = _scheduler.schedule({
                    try
                    {
                        .put(_observer, obj);
                    }
                    catch (Exception e)
                    {
                        _observer.failure(e);
                        _disposable.dispose();
                    }
                }, _dueTime);
            }
            catch (Exception e)
            {
                // Scheduling failed, so the previous delivery may still be
                // pending. Cancel it before reporting the failure so that no
                // value follows the terminal notification.
                _disposable.dispose();
                _observer.failure(e);
            }
        }
        else
        {
            _disposable.disposable = _scheduler.schedule({ .put(_observer, obj); }, _dueTime);
        }
    }

    /**
    Cancels the pending delayed delivery and then forwards completion to the
    downstream observer (if it supports `completed`).
    */
    void completed()
    {
        // Dispose first: a pending delivery must not fire after the
        // downstream observer has been told the sequence is complete.
        _disposable.dispose();
        static if (hasCompleted!TObserver)
        {
            _observer.completed();
        }
    }

    /**
    Cancels the pending delayed delivery and then forwards the failure to the
    downstream observer (if it supports `failure`).
    */
    void failure(Exception e)
    {
        // Dispose first: a pending delivery must not fire after the
        // downstream observer has been told the sequence failed.
        _disposable.dispose();
        static if (hasFailure!TObserver)
        {
            _observer.failure(e);
        }
    }

private:
    TObserver _observer;
    TScheduler _scheduler;
    Duration _dueTime;
    SerialDisposable _disposable;
}

/**
Observable returned by `debounce`. Subscribing attaches a `DebounceObserver`
to the wrapped source observable.
*/
struct DebounceObservable(TObservable, TScheduler, E)
{
    /// Element type of the values produced by this observable.
    alias ElementType = E;
public:
    /**
    Params:
        observable = the source observable to wrap
        scheduler = the scheduler used for the delayed delivery
        val = the delay before a value is forwarded
    */
    this(TObservable observable, TScheduler scheduler, Duration val)
    {
        _observable = observable;
        _scheduler = scheduler;
        _dueTime = val;
    }

public:
    /**
    Subscribes `observer` to the debounced sequence.

    Returns:
        A `CompositeDisposable` combining the subscription to the source and
        the `SerialDisposable` that holds the pending delayed delivery.
    */
    auto subscribe(T)(T observer)
    {
        alias ObserverType = DebounceObserver!(T, TScheduler, ElementType);
        // `inner` tracks the currently scheduled delivery; `outer` is the
        // subscription to the source observable.
        auto inner = new SerialDisposable;
        auto outer = _observable.doSubscribe(ObserverType(observer, _scheduler, _dueTime, inner));
        return new CompositeDisposable(disposableObject(outer), inner);
    }

private:
    TObservable _observable;
    TScheduler _scheduler;
    Duration _dueTime;
}

// A burst of values with an explicit scheduler yields only the last value.
unittest
{
    import std.array;
    import rx.subject;

    auto s = new TaskPoolScheduler;
    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto d = sub.debounce(dur!"msecs"(50), s).doSubscribe(buf);

    foreach (i; 0 .. 10)
    {
        sub.put(i);
    }
    Thread.sleep(dur!"msecs"(100));

    import std.algorithm : equal;
    import std.format : format;

    assert(equal(buf.data, [9]), "buf.data is %s".format(buf.data));
}

// A burst of values with the default scheduler yields only the last value.
unittest
{
    import std.array;
    import rx.subject;

    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);

    foreach (i; 0 .. 10)
    {
        sub.put(i);
    }
    Thread.sleep(dur!"msecs"(100));

    import std.algorithm : equal;

    assert(equal(buf.data, [9]));
}

// Disposing the subscription before the delay elapses suppresses delivery.
unittest
{
    import std.array;
    import rx.subject;

    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);

    foreach (i; 0 .. 10)
    {
        sub.put(i);
    }
    d.dispose();
    Thread.sleep(dur!"msecs"(100));

    assert(buf.data.length == 0);
}

// Completing the source before the delay elapses suppresses delivery.
unittest
{
    import std.array;
    import rx.subject;

    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);

    foreach (i; 0 .. 10)
    {
        sub.put(i);
    }
    sub.completed();
    Thread.sleep(dur!"msecs"(100));

    assert(buf.data.length == 0);
}

// Failing the source before the delay elapses suppresses delivery.
unittest
{
    import std.array;
    import rx.subject;

    auto sub = new SubjectObject!int;
    auto buf = appender!(int[]);

    auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf);

    foreach (i; 0 .. 10)
    {
        sub.put(i);
    }
    sub.failure(null);
    Thread.sleep(dur!"msecs"(100));

    assert(buf.data.length == 0);
}