/+++++++++++++++++++++++++++++
 + This module defines algorithm 'all'
 +/
module rx.algorithm.all;

import rx.disposable;
import rx.observable;
import rx.observer;
import rx.util;

import std.functional : unaryFun;
import std.range : isOutputRange, put;

/++
 + Observer that checks every received element against `pred` and reports a
 + single `bool` to the wrapped observer.
 +
 + As soon as one element is rejected by `pred`, `false` is emitted. If the
 + source completes first, the emitted value tells whether at least one element
 + was received, so a source that completes without elements yields `false`.
 + After the result, a failure, or completion has been forwarded, the
 + `Disposable` given to the constructor is disposed.
 +
 + Params:
 +   TObserver = wrapped sink; must be an output range of `bool`
 +   E = element type accepted by `put`
 +   pred = unary predicate in any form `unaryFun` accepts; defaults to `"true"`
 +/
struct AllObserver(TObserver, E, alias pred = "true")
{
    static assert(isOutputRange!(TObserver, bool), "TObserver must be OutputRange of bool.");

public:
    this() @disable;

    /// Wraps `observer`; `cancel` is disposed when this observer reaches a terminal state.
    this(TObserver observer, Disposable cancel)
    {
        _hasValue = new Ticket;
        _ticket = new Ticket;
        _cancel = cast(shared) cancel;
        _observer = observer;
    }

public:
    /// Tests `obj` with the predicate and emits `false` on the first rejected element.
    void put(E obj)
    {
        // Remember that an element arrived; completed() reports this flag.
        _hasValue.stamp();

        alias test = unaryFun!pred;

        static if (hasFailure!TObserver)
        {
            bool accepted = false;
            try
            {
                accepted = test(obj);
            }
            catch (Exception ex)
            {
                // A throwing predicate is forwarded as a failure, but only when
                // no terminal notification was delivered before.
                // NOTE(review): relies on Ticket.stamp() succeeding once only - confirm in rx.util.
                if (_ticket.stamp())
                {
                    _observer.failure(ex);
                    dispose();
                }
                return;
            }

            if (accepted)
                return;
        }
        else
        {
            // The wrapped observer has no failure channel, so an exception
            // thrown by the predicate propagates to the caller of put.
            if (test(obj))
                return;
        }

        // A rejected element decides the result, unless a terminal
        // notification has already been sent.
        if (!_ticket.stamp())
            return;

        .put(_observer, false);
        completeAndDispose();
    }

    /// Forwards `e` to the wrapped observer (when it accepts failures) and disposes the source.
    void failure(Exception e)
    {
        if (_ticket.stamp())
        {
            static if (hasFailure!TObserver)
            {
                _observer.failure(e);
            }

            dispose();
        }
    }

    /// Emits whether any element was received, then completes and disposes the source.
    void completed()
    {
        if (_ticket.stamp())
        {
            .put(_observer, _hasValue.isStamped);
            completeAndDispose();
        }
    }

    /// Disposes the stored `Disposable`, if one is still held.
    void dispose()
    {
        // The stored handle is swapped for null before it is used.
        // NOTE(review): presumably exchange returns the previous value, making
        // repeated calls no-ops - confirm in rx.util.
        auto taken = exchange(_cancel, null);
        auto upstream = assumeThreadLocal(taken);
        if (upstream is null)
            return;

        upstream.dispose();
    }

private:
    // Tells the wrapped observer it is complete (when it supports that) and
    // then releases the source.
    void completeAndDispose()
    {
        static if (hasCompleted!TObserver)
        {
            _observer.completed();
        }

        dispose();
    }

    TObserver _observer;
    shared(Disposable) _cancel;
    Ticket _ticket; // gates the terminal notification
    Ticket _hasValue; // stamped by every call to put
}
version (unittest)
{
    // Test double used by the unit tests in this module: counts each kind of
    // notification and keeps the most recent value and exception.
    private class RecordingObserver : Observer!bool
    {
        void put(bool obj)
        {
            lastValue = obj;
            putCount++;
        }

        void failure(Exception e)
        {
            lastException = e;
            failureCount++;
        }

        void completed()
        {
            completedCount++;
        }

        size_t putCount = 0;
        size_t failureCount = 0;
        size_t completedCount = 0;
        bool lastValue;
        Exception lastException;
    }

    // Test double that counts how many times dispose() was called.
    private class CountingDisposable : Disposable
    {
        void dispose()
        {
            disposeCount++;
        }

        size_t disposeCount = 0;
    }
}

// AllObserver refuses a wrapped observer that is not an output range of bool.
unittest
{
    static assert(!__traits(compiles, {
            AllObserver!(Observer!string, int) observer;
        }));
}

// AllObserver itself satisfies the observer traits.
unittest
{
    alias TObserver = AllObserver!(Observer!bool, string);

    static assert(isOutputRange!(TObserver, string));
    static assert(hasFailure!(TObserver));
    static assert(hasCompleted!(TObserver));
}

// Default predicate with string elements.
unittest
{
    alias TObserver = AllObserver!(Observer!bool, string);

    // One element, then completion: true is reported.
    {
        auto sink = new RecordingObserver;
        auto upstream = new CountingDisposable;
        auto observer = TObserver(sink, upstream);

        .put(observer, "TEST");
        observer.completed();
        assert(sink.putCount == 1);
        assert(sink.lastValue == true);
        assert(sink.completedCount == 1);
        assert(upstream.disposeCount == 1);
    }

    // Completion without any element: false is reported.
    {
        auto sink = new RecordingObserver;
        auto upstream = new CountingDisposable;
        auto observer = TObserver(sink, upstream);

        observer.completed();
        assert(sink.putCount == 1);
        assert(sink.lastValue == false);
        assert(sink.completedCount == 1);
        assert(upstream.disposeCount == 1);
    }

    // A failure is forwarded and no value is reported.
    {
        auto sink = new RecordingObserver;
        auto upstream = new CountingDisposable;
        auto observer = TObserver(sink, upstream);

        auto error = new Exception("MyException");
        observer.failure(error);
        assert(sink.putCount == 0);
        assert(sink.failureCount == 1);
        assert(sink.lastException is error);
        assert(upstream.disposeCount == 1);
    }
}

// String predicate with int elements.
unittest
{
    alias TObserver = AllObserver!(Observer!bool, int, "a % 2 == 0");

    // An accepted element, then completion: true is reported.
    {
        auto sink = new RecordingObserver;
        auto upstream = new CountingDisposable;
        auto observer = TObserver(sink, upstream);

        .put(observer, 0);
        observer.completed();
        assert(sink.putCount == 1);
        assert(sink.lastValue == true);
        assert(sink.completedCount == 1);
        assert(upstream.disposeCount == 1);
    }

    // A rejected element reports false once; the later completed() adds nothing.
    {
        auto sink = new RecordingObserver;
        auto upstream = new CountingDisposable;
        auto observer = TObserver(sink, upstream);

        .put(observer, 1);
        observer.completed();
        assert(sink.putCount == 1);
        assert(sink.lastValue == false);
        assert(sink.completedCount == 1);
        assert(upstream.disposeCount == 1);
    }
}

// A predicate that throws is reported as a failure.
unittest
{
    bool testThrow(int)
    {
        throw new Exception("MyException");
    }

    alias TObserver = AllObserver!(Observer!bool, int, testThrow);

    {
        auto sink = new RecordingObserver;
        auto upstream = new CountingDisposable;
        auto observer = TObserver(sink, upstream);

        .put(observer, 0);
        observer.completed();
        assert(sink.putCount == 0);
        assert(sink.failureCount == 1);
        assert(sink.completedCount == 0);
        assert(sink.lastException.msg == "MyException");
        assert(upstream.disposeCount == 1);
    }
}

// A plain output range (Appender) can be used as the wrapped observer.
unittest
{
    import std.array : Appender, appender;

    alias TObserver = AllObserver!(Appender!(bool[]), int);

    auto collected = appender!(bool[]);
    auto observer = TObserver(collected, NopDisposable.instance);

    .put(observer, 0);
    observer.completed();

    assert(collected.data.length == 1);
    assert(collected.data[0] == true);
}

/++
 + Observable of `bool` that subscribes an `AllObserver` built from `pred` to
 + the wrapped observable.
 +
 + Params:
 +   TObservable = type of the wrapped observable; must expose `ElementType`
 +   pred = predicate handed to `AllObserver`; defaults to `"true"`
 +/
struct AllObservable(TObservable, alias pred = "true")
{
    alias ElementType = bool;

public:
    /// Stores the observable whose elements will be tested.
    this(TObservable observable)
    {
        _observable = observable;
    }

public:
    /++
     + Subscribes `observer` through an `AllObserver`.
     +
     + Returns: the `SingleAssignmentDisposable` that holds the subscription to
     + the wrapped observable; the same object is given to the `AllObserver`.
     +/
    Disposable subscribe(TObserver)(auto ref TObserver observer)
    {
        alias ObserverType = AllObserver!(TObserver, TObservable.ElementType, pred);

        // The handle is created first because the AllObserver needs it before
        // the subscription to the wrapped observable exists.
        auto handle = new SingleAssignmentDisposable;
        handle.setDisposable(
                disposableObject(_observable.doSubscribe(ObserverType(observer, handle))));
        return handle;
    }

private:
    TObservable _observable;
}

// AllObservable is an observable of bool and delivers into an Appender.
unittest
{
    alias TObservable = AllObservable!(Observable!int);

    static assert(isObservable!(TObservable, bool));

    import rx.subject : SubjectObject;
    import std.array : appender;

    auto source = new SubjectObject!int;
    auto collected = appender!(bool[]);

    auto observable = TObservable(source);
    auto subscription = observable.subscribe(collected);

    source.put(0);
    source.completed();
    assert(collected.data.length == 1);
    assert(collected.data[0] == true);
}

/++
 + Wraps `observable` in an `AllObservable` that uses `pred` as its predicate.
 +
 + Params:
 +   pred = unary predicate; defaults to `"true"`
 +   observable = source whose elements are tested
 +/
template all(alias pred = "true")
{
    AllObservable!(TObservable, pred) all(TObservable)(auto ref TObservable observable)
    {
        return AllObservable!(TObservable, pred)(observable);
    }
}
///
unittest
{
    import rx.subject : SubjectObject;

    auto source = new SubjectObject!int;

    bool result = false;
    source.all!"a % 2 == 0"().doSubscribe((bool value) { result = value; });

    source.put(0);
    source.completed();
    assert(result);
}

// Explicitly empty template arguments select the default predicate.
unittest
{
    import rx.subject : SubjectObject;

    auto source = new SubjectObject!int;

    bool result = false;
    source.all!().doSubscribe((bool value) { result = value; });

    source.put(0);
    source.completed();
    assert(result);
}

/++
 + Wraps `observable` in an `AllObservable` with the default predicate.
 +/
AllObservable!TObservable all(TObservable)(auto ref TObservable observable)
{
    return AllObservable!TObservable(observable);
}
///
unittest
{
    import rx.subject : SubjectObject;

    auto source = new SubjectObject!int;

    bool result = false;
    source.all().doSubscribe((bool value) { result = value; });

    source.put(0);
    source.completed();
    assert(result);
}

// A rejected element is reported at once, before the source completes.
unittest
{
    import rx.subject : SubjectObject;
    import std.array : appender;

    auto source = new SubjectObject!int;
    auto collected = appender!(bool[]);

    auto subscription = source.all!(a => a % 2 == 0).doSubscribe(collected);

    assert(collected.data.length == 0);
    source.put(1);
    assert(collected.data.length == 1);
    assert(collected.data[0] == false);
}

// Same as above, with a lambda as the subscriber.
unittest
{
    import rx.subject : SubjectObject;

    auto source = new SubjectObject!int;

    bool[] received;
    auto subscription = source.all!(a => a % 2 == 0).doSubscribe!(b => received ~= b);

    assert(received.length == 0);
    source.put(1);
    assert(received.length == 1);
    assert(received[0] == false);
}