/+++++++++++++++++++++++++++++
 + This module defines the algorithm 'any'.
 +
 + 'any' turns an observable of elements into an observable of a single
 + `bool` that tells whether at least one element satisfied a predicate.
 +/
module rx.algorithm.any;

import rx.disposable;
import rx.observable;
import rx.observer;
import rx.util;

import std.functional : unaryFun;
import std.range : put;

/++
 + Observer that watches a stream of `E` and reports a single `bool`
 + to the wrapped observer.
 +
 + $(UL
 +   $(LI the first element for which `pred` holds emits `true`;)
 +   $(LI `failure` or `completed` arriving before any match emits `false`.)
 + )
 +
 + In every case the wrapped observer is completed afterwards (when it has a
 + `completed` method) and the `cancel` disposable is disposed. Each of these
 + paths first calls `_ticket.stamp()` and returns without doing anything when
 + it yields `false`, so only one result is ever emitted.
 +
 + Params:
 +   TObserver = type of the wrapped observer receiving the `bool` result
 +   E         = element type of the observed stream
 +   pred      = unary predicate (string or callable, resolved via `unaryFun`);
 +               the default `"true"` matches every element
 +/
struct AnyObserver(TObserver, E, alias pred = "true")
{
    this() @disable;

    /++
     + Params:
     +   observer = observer that receives the result
     +   cancel   = disposable that is disposed once the result was emitted
     +/
    this(TObserver observer, Disposable cancel)
    {
        _observer = observer;
        _cancel = cancel;
        _ticket = new Ticket;
    }

    /++
     + Tests `obj` against the predicate. On the first match `true` is
     + emitted, the wrapped observer is completed and the upstream
     + subscription is disposed. Non-matching elements are ignored.
     +/
    void put(E obj)
    {
        alias fun = unaryFun!pred;
        if (fun(obj))
        {
            if (!_ticket.stamp())
                return;

            _observer.put(true);
            // Terminate the downstream sequence exactly like the
            // failure/completed paths do; otherwise an observer that
            // implements `completed` would never be notified after `true`.
            static if (hasCompleted!TObserver)
            {
                _observer.completed();
            }
            _cancel.dispose();
        }
    }

    /++
     + Called when the source fails before any element matched: emits
     + `false`, completes the wrapped observer and disposes the subscription.
     +
     + NOTE(review): the exception is not forwarded to the wrapped observer;
     + the unittests in this module rely on `false` being emitted here.
     + Confirm this is the intended contract.
     +/
    void failure(Exception)
    {
        if (!_ticket.stamp())
            return;

        _observer.put(false);

        static if (hasCompleted!TObserver)
        {
            _observer.completed();
        }
        _cancel.dispose();
    }

    /++
     + Called when the source completes before any element matched: emits
     + `false`, completes the wrapped observer and disposes the subscription.
     +/
    void completed()
    {
        if (!_ticket.stamp())
            return;

        _observer.put(false);
        static if (hasCompleted!TObserver)
        {
            _observer.completed();
        }
        _cancel.dispose();
    }

private:
    TObserver _observer;
    Disposable _cancel;
    Ticket _ticket;
}

unittest
{
    import std.array : Appender;

    alias Buffer = Appender!(bool[]);
    alias TObserver = AnyObserver!(Buffer, int);
    assert(isObserver!(TObserver, int));
}

unittest
{
    import std.array : Appender, appender;

    alias TObserver = AnyObserver!(Appender!(bool[]), int);
    auto buf = appender!(bool[]);
    auto a = TObserver(buf, NopDisposable.instance);

    assert(buf.data.length == 0);
    a.put(0);
    assert(buf.data.length == 1);
    assert(buf.data[0] == true);
    a.put(0);
    assert(buf.data.length == 1);

    auto b = a;
    b.put(1);
    assert(buf.data.length == 1);
}

unittest
{
    import std.array : Appender, appender;

    alias TObserver = AnyObserver!(Appender!(bool[]), int);

    auto buf = appender!(bool[]);
    {
        auto a = TObserver(buf, NopDisposable.instance);
        assert(buf.data.length == 0);
        a.failure(null);
        assert(buf.data.length == 1);
        assert(buf.data[0] == false);
    }
    buf.clear();
    {
        auto a = TObserver(buf, NopDisposable.instance);
        assert(buf.data.length == 0);
        a.completed();
        assert(buf.data.length == 1);
        assert(buf.data[0] == false);
    }
}

unittest
{
    import std.array : Appender, appender;

    alias TObserver = AnyObserver!(Appender!(bool[]), int, "a % 2 == 0");

    auto buf = appender!(bool[]);
    auto a = TObserver(buf, NopDisposable.instance);

    assert(buf.data.length == 0);
    a.put(1);
    assert(buf.data.length == 0);
    a.put(2);
    assert(buf.data.length == 1);
    assert(buf.data[0] == true);
}

unittest
{
    // A matching element must also complete the wrapped observer.
    static struct Recorder
    {
        bool[]* values;
        size_t* completions;

        void put(bool value)
        {
            *values ~= value;
        }

        void completed()
        {
            ++(*completions);
        }
    }

    bool[] values;
    size_t completions;
    auto a = AnyObserver!(Recorder, int)(Recorder(&values, &completions),
            NopDisposable.instance);

    a.put(0);
    assert(values == [true]);
    assert(completions == 1);

    // Further notifications are ignored once a result has been emitted.
    a.put(0);
    a.completed();
    assert(values == [true]);
    assert(completions == 1);
}

/++
 + Observable of `bool` built on top of another observable.
 +
 + Subscribing wraps the given observer in an `AnyObserver` (using the
 + source's `ElementType` and `pred`) and subscribes that to the source.
 +
 + Params:
 +   TObservable = type of the source observable
 +   pred        = unary predicate passed on to `AnyObserver`
 +/
struct AnyObservable(TObservable, alias pred = "true")
{
    alias ElementType = bool;

    this(TObservable observable)
    {
        _observable = observable;
    }

    /++
     + Subscribes `observer` to the result of the 'any' test.
     +
     + Returns: a disposable that cancels the subscription to the source.
     +/
    Disposable subscribe(TObserver)(auto ref TObserver observer)
    {
        alias ObserverType = AnyObserver!(TObserver, TObservable.ElementType, pred);

        // The disposable is created first so the inner observer can dispose
        // the upstream subscription by itself once it has emitted a result.
        auto subscription = new SingleAssignmentDisposable;
        subscription.setDisposable(disposableObject(_observable.doSubscribe(ObserverType(observer,
                subscription))));
        return subscription;
    }

private:
    TObservable _observable;
}

unittest
{
    alias TObservable = AnyObservable!(Observable!int);

    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;
    auto o1 = TObservable(sub);

    import std.array : appender;

    auto buf = appender!(bool[]);
    auto d = o1.subscribe(buf);

    assert(buf.data.length == 0);
    sub.put(1);
    assert(buf.data.length == 1);
    assert(buf.data[0] == true);

    d.dispose();
}

unittest
{
    alias TObservable = AnyObservable!(Observable!int, "a % 2 == 0");

    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;
    auto o1 = TObservable(sub);

    import std.array : appender;

    auto buf = appender!(bool[]);
    auto d = o1.subscribe(buf);

    assert(buf.data.length == 0);
    sub.put(1);
    assert(buf.data.length == 0);
    sub.put(2);
    assert(buf.data.length == 1);
    assert(buf.data[0] == true);

    d.dispose();
}

unittest
{
    alias TObservable = AnyObservable!(Observable!int, a => a % 3 == 0);

    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;
    auto o1 = TObservable(sub);

    import std.array : appender;

    auto buf = appender!(bool[]);
    auto d = o1.subscribe(buf);

    assert(buf.data.length == 0);
    sub.put(1);
    assert(buf.data.length == 0);
    sub.put(3);
    assert(buf.data.length == 1);
    assert(buf.data[0] == true);

    d.dispose();
}

unittest
{
    alias TObservable = AnyObservable!(Observable!int);

    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;
    auto o1 = TObservable(sub);

    import std.array : appender;

    auto buf = appender!(bool[]);
    auto d = o1.subscribe(buf);

    assert(buf.data.length == 0);
    sub.failure(null);
    assert(buf.data.length == 1);
    assert(buf.data[0] == false);

    sub.put(0);
    assert(buf.data.length == 1);

    d.dispose();
}

unittest
{
    alias TObservable = AnyObservable!(Observable!int);

    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;
    auto o1 = TObservable(sub);

    import std.array : appender;

    auto buf = appender!(bool[]);
    auto d = o1.subscribe(buf);

    assert(buf.data.length == 0);
    sub.completed();
    assert(buf.data.length == 1);
    assert(buf.data[0] == false);

    sub.put(0);
    assert(buf.data.length == 1);

    d.dispose();
}

unittest
{
    alias TObservable = AnyObservable!(Observable!int);

    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;
    auto observable = TObservable(sub);

    import std.array : appender;

    auto buf = appender!(bool[]);
    auto d = observable.subscribe(buf);

    d.dispose();

    assert(buf.data.length == 0);
    sub.put(0);
    assert(buf.data.length == 0);
}

/++
 + Creates an observable that emits `true` as soon as an element of
 + `observable` satisfies `pred`, or `false` when the source terminates
 + without a match.
 +
 + Params:
 +   pred = unary predicate (string or callable); defaults to `"true"`,
 +          i.e. any element at all counts as a match
 +/
template any(alias pred = "true")
{
    AnyObservable!(TObservable, pred) any(TObservable)(auto ref TObservable observable)
    {
        return typeof(return)(observable);
    }
}
///
unittest
{
    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;

    bool result = false;
    sub.any!("a % 2 == 0").doSubscribe((bool) { result = true; });

    assert(result == false);
    sub.put(1);
    assert(result == false);
    sub.put(0);
    assert(result == true);
}

unittest
{
    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;

    bool result = false;
    sub.any!().doSubscribe((bool) { result = true; });

    assert(result == false);
    sub.put(1);
    assert(result == true);
}

/++
 + Predicate-less overload: creates an observable that emits `true` as soon
 + as `observable` produces any element, or `false` when it terminates
 + without producing one.
 +/
AnyObservable!TObservable any(TObservable)(auto ref TObservable observable)
{
    return typeof(return)(observable);
}
///
unittest
{
    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;

    bool result = false;
    sub.any().doSubscribe((bool) { result = true; });

    assert(result == false);
    sub.put(1);
    assert(result == true);
}

unittest
{
    import rx.algorithm : filter;
    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;

    bool result = true;
    sub.filter!"a % 2 == 0"().any().doSubscribe((bool t) { result = t; });

    assert(result == true);
    sub.completed();
    assert(result == false);
}

unittest
{
    import rx.algorithm : filter;
    import rx.subject : SubjectObject;

    auto sub = new SubjectObject!int;

    bool result = true;
    sub.filter!"a % 2 == 0"().any().doSubscribe!(t => result = t);

    assert(result == true);
    sub.completed();
    assert(result == false);
}