1 module rx.algorithm.debounce; 2 3 import core.time; 4 import core.thread; 5 import std.range : put; 6 import rx.disposable; 7 import rx.observer; 8 import rx.observable; 9 import rx.scheduler; 10 11 //######################### 12 // Debounce 13 //######################### 14 /// 15 DebounceObservable!(T, TScheduler, T.ElementType) debounce(T, TScheduler : AsyncScheduler)( 16 T observable, Duration val, TScheduler scheduler) 17 { 18 return typeof(return)(observable, scheduler, val); 19 } 20 /// 21 DebounceObservable!(T, TaskPoolScheduler, T.ElementType) debounce(T)(T observable, Duration val) 22 { 23 return typeof(return)(observable, new TaskPoolScheduler, val); 24 } 25 /// 26 unittest 27 { 28 import rx.subject : SubjectObject; 29 import core.thread : Thread; 30 import core.time : dur; 31 32 auto obs = new SubjectObject!int; 33 34 import std.array : appender; 35 36 auto buf = appender!(int[]); 37 auto d = obs.debounce(dur!"msecs"(100), new TaskPoolScheduler).doSubscribe(buf); 38 scope (exit) 39 d.dispose(); 40 41 .put(obs, 1); 42 Thread.sleep(dur!"msecs"(200)); 43 .put(obs, 2); 44 .put(obs, 3); 45 Thread.sleep(dur!"msecs"(200)); 46 47 assert(buf.data.length == 2); 48 assert(buf.data[0] == 1); 49 assert(buf.data[1] == 3); 50 } 51 52 struct DebounceObserver(TObserver, TScheduler, E) 53 { 54 public: 55 this(TObserver observer, TScheduler scheduler, Duration val, SerialDisposable disposable) 56 { 57 _observer = observer; 58 _scheduler = scheduler; 59 _dueTime = val; 60 _disposable = disposable; 61 } 62 63 public: 64 void put(E obj) 65 { 66 static if (hasFailure!TObserver) 67 { 68 try 69 { 70 _disposable.disposable = _scheduler.schedule({ 71 try 72 { 73 .put(_observer, obj); 74 } 75 catch (Exception e) 76 { 77 _observer.failure(e); 78 _disposable.dispose(); 79 } 80 }, _dueTime); 81 } 82 catch (Exception e) 83 { 84 _observer.failure(e); 85 _disposable.dispose(); 86 } 87 } 88 else 89 { 90 _disposable.disposable = _scheduler.schedule({ .put(_observer, obj); }, _dueTime); 91 } 92 } 93 94 void completed() 95 { 96 static if (hasCompleted!TObserver) 97 { 98 _observer.completed(); 99 } 100 _disposable.dispose(); 101 } 102 103 void failure(Exception e) 104 { 105 static if (hasFailure!TObserver) 106 { 107 _observer.failure(e); 108 } 109 _disposable.dispose(); 110 } 111 112 private: 113 TObserver _observer; 114 TScheduler _scheduler; 115 Duration _dueTime; 116 SerialDisposable _disposable; 117 } 118 119 struct DebounceObservable(TObservable, TScheduler, E) 120 { 121 alias ElementType = E; 122 public: 123 this(TObservable observable, TScheduler scheduler, Duration val) 124 { 125 _observable = observable; 126 _scheduler = scheduler; 127 _dueTime = val; 128 } 129 130 public: 131 auto subscribe(T)(T observer) 132 { 133 alias ObserverType = DebounceObserver!(T, TScheduler, ElementType); 134 auto inner = new SerialDisposable; 135 auto outer = _observable.doSubscribe(ObserverType(observer, _scheduler, _dueTime, inner)); 136 return new CompositeDisposable(disposableObject(outer), inner); 137 } 138 139 private: 140 TObservable _observable; 141 TScheduler _scheduler; 142 Duration _dueTime; 143 } 144 145 unittest 146 { 147 import std.array; 148 import rx.subject; 149 150 auto s = new TaskPoolScheduler; 151 auto sub = new SubjectObject!int; 152 auto buf = appender!(int[]); 153 154 auto d = sub.debounce(dur!"msecs"(50), s).doSubscribe(buf); 155 156 foreach (i; 0 .. 10) 157 { 158 sub.put(i); 159 } 160 Thread.sleep(dur!"msecs"(100)); 161 162 import std.algorithm : equal; 163 164 assert(equal(buf.data, [9])); 165 } 166 167 unittest 168 { 169 import std.array; 170 import rx.subject; 171 172 auto sub = new SubjectObject!int; 173 auto buf = appender!(int[]); 174 175 auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf); 176 177 foreach (i; 0 .. 10) 178 { 179 sub.put(i); 180 } 181 Thread.sleep(dur!"msecs"(100)); 182 183 import std.algorithm : equal; 184 185 assert(equal(buf.data, [9])); 186 } 187 188 unittest 189 { 190 import std.array; 191 import rx.subject; 192 193 auto sub = new SubjectObject!int; 194 auto buf = appender!(int[]); 195 196 auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf); 197 198 foreach (i; 0 .. 10) 199 { 200 sub.put(i); 201 } 202 d.dispose(); 203 Thread.sleep(dur!"msecs"(100)); 204 205 import std.algorithm : equal; 206 207 assert(buf.data.length == 0); 208 } 209 210 unittest 211 { 212 import std.array; 213 import rx.subject; 214 215 auto sub = new SubjectObject!int; 216 auto buf = appender!(int[]); 217 218 auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf); 219 220 foreach (i; 0 .. 10) 221 { 222 sub.put(i); 223 } 224 sub.completed(); 225 Thread.sleep(dur!"msecs"(100)); 226 227 import std.algorithm : equal; 228 229 assert(buf.data.length == 0); 230 } 231 232 unittest 233 { 234 import std.array; 235 import rx.subject; 236 237 auto sub = new SubjectObject!int; 238 auto buf = appender!(int[]); 239 240 auto d = sub.debounce(dur!"msecs"(50)).doSubscribe(buf); 241 242 foreach (i; 0 .. 10) 243 { 244 sub.put(i); 245 } 246 sub.failure(null); 247 Thread.sleep(dur!"msecs"(100)); 248 249 import std.algorithm : equal; 250 251 assert(buf.data.length == 0); 252 }