/+++++++++++++++++++++++++++++
 + This module defines the concept of Observer.
 +/
module rx.observer;

import std.range;
import std.typetuple;

/++
 + Tests if something has completed method.
 +
 + Evaluates to true when `observer.completed()` compiles for a variable
 + `observer` of type T, and to false otherwise.
 +/
template hasCompleted(T)
{
    //dfmt off
    // The function literal is never run; only whether it type-checks matters.
    enum bool hasCompleted = is(typeof({
            T observer = void;
            observer.completed();
        }()));
    //dfmt on
}
///
unittest
{
    struct A
    {
        void completed();
    }

    struct B
    {
        void _completed();
    }

    static assert(hasCompleted!A);
    static assert(!hasCompleted!B);
}

/++
 + Tests if something has failure method.
 +
 + Evaluates to true when `observer.failure(e)` compiles for a variable
 + `observer` of type T and an `Exception e`, and to false otherwise.
 +/
template hasFailure(T)
{
    //dfmt off
    // The function literal is never run; only whether it type-checks matters.
    enum bool hasFailure = is(typeof({
            T observer = void;
            Exception e = void;
            observer.failure(e);
        }()));
    //dfmt on
}
///
unittest
{
    struct A
    {
        void failure(Exception e);
    }

    struct B
    {
        void _failure(Exception e);
    }

    struct C
    {
        void failure();
    }

    static assert(hasFailure!A);
    static assert(!hasFailure!B);
    static assert(!hasFailure!C);
}

/++
 + Tests if something is Observer.
 +
 + T is an observer of E when it is an output range of E and additionally
 + satisfies both hasCompleted and hasFailure.
 +/
template isObserver(T, E)
{
    enum bool isObserver = isOutputRange!(T, E) && hasCompleted!T && hasFailure!T;
}
///
unittest
{
    struct TestObserver
    {
        void put(int n)
        {
        }

        void completed()
        {
        }

        void failure(Exception e)
        {
        }
    }

    static assert(isObserver!(TestObserver, int));
}

/++
 + Wraps completed and failure method in virtual function.
 +
 + Extends OutputRange!E, so implementations also provide `put`.
 +/
interface Observer(E) : OutputRange!E
{
    ///Notification method with no arguments.
    void completed();
    ///Notification method that receives an Exception.
    void failure(Exception e);
}
///
unittest
{
    alias TObserver = Observer!byte;
    static assert(isObserver!(TObserver, byte));
}

/++
 + Class that implements Observer interface and wraps the completed and failure
 + method in virtual functions. This class extends the OutputRangeObject.
 +
 + `completed` and `failure` are forwarded to the wrapped value only when R
 + defines them; otherwise they do nothing.
 +
 + NOTE(review): `range` is handed to the OutputRangeObject base and is also
 + stored in `_range`. If R is a value type these are presumably two separate
 + copies, so `put` (base class) and `completed`/`failure` (`_range`) may act on
 + different state - confirm this is intended.
 +/
class ObserverObject(R, E...) : OutputRangeObject!(R, E), staticMap!(Observer, E)
{
public:
    ///Wraps `range`; it is passed to the base class and kept in `_range`.
    this(R range)
    {
        super(range);
        _range = range;
    }

public:
    ///Calls `_range.completed()` if R has a completed method; otherwise a no-op.
    void completed()
    {
        static if (hasCompleted!R)
        {
            _range.completed();
        }
    }
    ///Calls `_range.failure(e)` if R has a failure method; otherwise a no-op.
    void failure(Exception e)
    {
        static if (hasFailure!R)
        {
            _range.failure(e);
        }
    }

private:
    // Target of the completed/failure forwarding above.
    R _range;
}

/++
 + Creates an ObserverObject that wraps `range` as an observer of E.
 +
 + E is given explicitly; R is deduced from the argument.
 +/
template observerObject(E)
{
    ObserverObject!(R, E) observerObject(R)(R range)
    {
        return new ObserverObject!(R, E)(range);
    }
}
///
unittest
{
    struct TestObserver
    {
        void put(int n)
        {
        }

        void put(Object obj)
        {
        }
    }

    Observer!int observer = observerObject!int(TestObserver());
    observer.put(0);
    observer.completed();
    observer.failure(null);
    static assert(isObserver!(typeof(observer), int));
}

// Wrapping a class instance: each notification reaches the wrapped object once.
unittest
{
    int putCount = 0;
    int completedCount = 0;
    int failureCount = 0;

    class TestObserver : Observer!int
    {
        void put(int n)
        {
            putCount++;
        }

        void completed()
        {
            completedCount++;
        }

        void failure(Exception e)
        {
            failureCount++;
        }
    }

    static assert(isObserver!(TestObserver, int));

    auto test = new TestObserver;
    Observer!int observer = observerObject!int(test);
    assert(putCount == 0);
    observer.put(0);
    assert(putCount == 1);
    assert(completedCount == 0);
    observer.completed();
    assert(completedCount == 1);
    assert(failureCount == 0);
    observer.failure(null);
    assert(failureCount == 1);
}

// Same checks with a struct; its counters live in the enclosing unittest frame.
unittest
{
    int putCount = 0;
    int completedCount = 0;
    int failureCount = 0;

    struct TestObserver
    {
        void put(int n)
        {
            putCount++;
        }

        void completed()
        {
            completedCount++;
        }

        void failure(Exception e)
        {
            failureCount++;
        }
    }

    static assert(isObserver!(TestObserver, int));

    TestObserver test;
    Observer!int observer = observerObject!int(test);
    assert(putCount == 0);
    observer.put(0);
    assert(putCount == 1);
    assert(completedCount == 0);
    observer.completed();
    assert(completedCount == 1);
    assert(failureCount == 0);
    observer.failure(null);
    assert(failureCount == 1);
}

// Wrapping compiles and every notification can be called whether or not the
// wrapped struct defines completed and/or failure.
unittest
{
    struct TestObserver1
    {
        void put(int n)
        {
        }
    }

    struct TestObserver2
    {
        void put(int n)
        {
        }

        void completed()
        {
        }
    }

    struct TestObserver3
    {
        void put(int n)
        {
        }

        void failure(Exception e)
        {
        }
    }

    struct TestObserver4
    {
        void put(int n)
        {
        }

        void completed()
        {
        }

        void failure(Exception e)
        {
        }
    }

    Observer!int o1 = observerObject!int(TestObserver1());
    Observer!int o2 = observerObject!int(TestObserver2());
    Observer!int o3 = observerObject!int(TestObserver3());
    Observer!int o4 = observerObject!int(TestObserver4());

    //dfmt off
    o1.put(0); o1.completed(); o1.failure(null);
    o2.put(0); o2.completed(); o2.failure(null);
    o3.put(0); o3.completed(); o3.failure(null);
    o4.put(0); o4.completed(); o4.failure(null);
    //dfmt on
}

/++
 + Observer whose put, completed and failure all have empty bodies.
 +
 + The constructor is private; use `instance` to obtain the object.
 +/
final class NopObserver(E) : Observer!E
{
private:
    this()
    {
    }

public:
    void put(E)
    {
    }

    void completed()
    {
    }

    void failure(Exception)
    {
    }

public:
    /++
     + Returns the NopObserver for E, created on first use through
     + std.concurrency.initOnce; repeated calls return the same object.
     +/
    static Observer!E instance()
    {
        import std.concurrency : initOnce;

        static __gshared NopObserver!E inst;
        return initOnce!inst(new NopObserver!E);
    }
}
///
unittest
{
    Observer!int o1 = NopObserver!int.instance;
    Observer!int o2 = NopObserver!int.instance;
    assert(o1 !is null);
    assert(o1 is o2);
}

/++
 + Observer whose put, completed and failure all have empty bodies and which
 + can carry an Exception, readable and writable through `exception`.
 +/
final class DoneObserver(E) : Observer!E
{
private:
    // Used by `instance`; leaves `_exception` at its default (null).
    this()
    {
    }
public:
    ///Creates an observer that holds `e`.
    this(Exception e)
    {
        _exception = e;
    }

public:
    ///The stored exception.
    Exception exception() @property
    {
        return _exception;
    }
    ///Replaces the stored exception.
    void exception(Exception e) @property
    {
        _exception = e;
    }

public:
    void put(E)
    {
    }

    void completed()
    {
    }

    void failure(Exception)
    {
    }

private:
    Exception _exception;

public:
    /++
     + Returns the default-constructed DoneObserver for E, created on first use
     + through std.concurrency.initOnce; repeated calls return the same object.
     +/
    static Observer!E instance()
    {
        import std.concurrency : initOnce;

        static __gshared DoneObserver!E inst;
        return initOnce!inst(new DoneObserver!E);
    }
}
///
unittest
{
    Observer!int o1 = DoneObserver!int.instance;
    Observer!int o2 = DoneObserver!int.instance;
    assert(o1 !is null);
    assert(o1 is o2);
}

// The exception given to the constructor is returned by the property.
unittest
{
    auto e = new Exception("test");
    auto observer = new DoneObserver!int(e);
    assert(observer.exception is e);
}

/++
 + Observer that forwards every notification to each observer in a list.
 +
 + `add` and `remove` do not modify this object; they return the observer to
 + use afterwards.
 +/
public class CompositeObserver(E) : Observer!E
{
private:
    // Creates a composite with no observers (used by `empty`).
    this()
    {
    }

public:
    ///Creates a composite over `observers`; the slice is stored without copying.
    this(Observer!E[] observers)
    {
        _observers = observers;
    }

public:
    ///Puts `obj` into each held observer, in list order, via std.range's put.
    void put(E obj)
    {
        foreach (observer; _observers)
            .put(observer, obj);
    }
    ///Calls completed on each held observer, in list order.
    void completed()
    {
        foreach (observer; _observers)
            observer.completed();
    }
    ///Calls failure(e) on each held observer, in list order.
    void failure(Exception e)
    {
        foreach (observer; _observers)
            observer.failure(e);
    }
    ///Returns a new composite holding the current observers followed by `observer`.
    CompositeObserver!E add(Observer!E observer)
    {
        return new CompositeObserver!E(_observers ~ observer);
    }
    /++
     + Returns an observer equivalent to this one without the first occurrence
     + of `observer`.
     +
     + Returns: `this` if `observer` is not held; `empty` if it was the only
     + one; the remaining observer itself if two were held; otherwise a new
     + composite over the remaining observers.
     +/
    Observer!E remove(Observer!E observer)
    {
        import std.algorithm : countUntil;

        auto i = _observers.countUntil(observer);
        if (i < 0)
            return this;

        if (_observers.length == 1)
            return CompositeObserver!E.empty;
        // With two entries i is 0 or 1, so 1 - i is the index of the other one.
        if (_observers.length == 2)
            return _observers[1 - i];

        return new CompositeObserver!E(_observers[0 .. i] ~ _observers[i + 1 .. $]);
    }

public:
    /++
     + Returns the composite with no observers, created on first use through
     + std.concurrency.initOnce.
     +/
    static CompositeObserver!E empty()
    {
        import std.concurrency : initOnce;

        static __gshared CompositeObserver!E inst;
        return initOnce!inst(new CompositeObserver!E);
    }

private:
    Observer!E[] _observers;
}
///
unittest
{
    int count = 0;
    struct TestObserver
    {
        void put(int n)
        {
            count++;
        }
    }

    auto c1 = new CompositeObserver!int;
    c1.put(0);
    auto o1 = observerObject!int(TestObserver());
    auto c2 = c1.add(o1);
    c1.put(0);
    assert(count == 0);
    c2.put(0);
    assert(count == 1);
    auto c3 = c2.add(observerObject!int(TestObserver()));
    c3.put(0);
    assert(count == 3);
    auto c4 = c3.remove(o1);
    c4.put(0);
    assert(count == 4);
}
// With E = int[] and a wrapped observer that only accepts int, putting
// [1, 2] results in two calls to the wrapped put.
unittest
{
    int count = 0;
    struct TestObserver
    {
        void put(int n)
        {
            count++;
        }
    }
    auto c1 = new CompositeObserver!(int[]);
    auto c2 = c1.add(observerObject!(int[])(TestObserver()));

    assert(count == 0);
    c2.put([1, 2]);
    assert(count == 2);
}

/++
 + Helper for building an observer out of delegates.
 +
 + Each supplied delegate is invoked for the matching notification; a null
 + delegate is skipped. The overloads that omit a delegate return an observer
 + without the corresponding method.
 +/
auto makeObserver(E)(void delegate(E) doPut, void delegate() doCompleted,
        void delegate(Exception) doFailure)
{
    // Observer exposing put, completed and failure.
    return AnonymousObserver!(E, true, true)(doPut, doCompleted, doFailure);
}
///ditto
auto makeObserver(E)(void delegate(E) doPut, void delegate() doCompleted)
{
    // Observer exposing put and completed only (no failure method).
    return AnonymousObserver!(E, true, false)(doPut, doCompleted);
}
///ditto
auto makeObserver(E)(void delegate(E) doPut, void delegate(Exception) doFailure)
{
    // Observer exposing put and failure only (no completed method).
    return AnonymousObserver!(E, false, true)(doPut, doFailure);
}
///
unittest
{
    int countPut = 0;
    int countCompleted = 0;
    int countFailure = 0;

    auto observer = makeObserver((int) { countPut++; }, () { countCompleted++; }, (Exception) {
        countFailure++;
    });

    .put(observer, 0);
    assert(countPut == 1);

    observer.completed();
    assert(countCompleted == 1);

    observer.failure(null);
    assert(countFailure == 1);
}

// Without a failure delegate the result has no failure method.
unittest
{
    int countPut = 0;
    int countCompleted = 0;

    auto observer = makeObserver((int) { countPut++; }, () { countCompleted++; });

    .put(observer, 0);
    assert(countPut == 1);

    observer.completed();
    assert(countCompleted == 1);

    static assert(!hasFailure!(typeof(observer)));
}

// Without a completed delegate the result has no completed method.
unittest
{
    int countPut = 0;
    int countFailure = 0;

    auto observer = makeObserver((int) { countPut++; }, (Exception) {
        countFailure++;
    });

    .put(observer, 0);
    assert(countPut == 1);

    static assert(!hasCompleted!(typeof(observer)));

    observer.failure(null);
    assert(countFailure == 1);
}

// Shared implementation behind the makeObserver overloads.
//
// `withCompleted` / `withFailure` select at compile time whether the
// completed / failure method and its delegate field exist, so hasCompleted and
// hasFailure report exactly what the chosen overload was given. Every call
// skips a null delegate. Fields are declared in the order the overloads pass
// their arguments: doPut, then doCompleted (if any), then doFailure (if any).
private struct AnonymousObserver(E, bool withCompleted, bool withFailure)
{
public:
    // Invokes the put delegate with `obj`, if one was supplied.
    void put(E obj)
    {
        if (_doPut !is null)
            _doPut(obj);
    }

    static if (withCompleted)
    {
        // Invokes the completed delegate, if one was supplied.
        void completed()
        {
            if (_doCompleted !is null)
                _doCompleted();
        }
    }

    static if (withFailure)
    {
        // Invokes the failure delegate with `e`, if one was supplied.
        void failure(Exception e)
        {
            if (_doFailure !is null)
                _doFailure(e);
        }
    }

private:
    void delegate(E) _doPut;
    static if (withCompleted)
    {
        void delegate() _doCompleted;
    }
    static if (withFailure)
    {
        void delegate(Exception) _doFailure;
    }
}

/+
 + Observer boilerplate that forwards terminal notifications to `_observer`
 + and then disposes `_disposable`.
 +
 + The mixing-in type must provide `putImpl(E)`. `completed` and `failure` are
 + only generated when TObserver has the matching method. `_disposable.dispose()`
 + runs even when the forwarded `completed`/`failure` call throws, so a failing
 + downstream observer does not leave the disposable undisposed.
 +
 + NOTE(review): `Disposable` is not declared in this module; presumably it is
 + visible where the template is mixed in - confirm.
 +/
package mixin template SimpleObserverImpl(TObserver, E)
{
public:
    // Runs putImpl(obj). When TObserver has a failure method, an Exception
    // thrown by putImpl is reported through `_observer.failure` and the
    // disposable is disposed; otherwise the exception propagates to the caller.
    void put(E obj)
    {
        static if (hasFailure!TObserver)
        {
            try
            {
                putImpl(obj);
            }
            catch (Exception e)
            {
                // Dispose after reporting, even if reporting itself throws.
                scope (exit)
                    _disposable.dispose();
                _observer.failure(e);
            }
        }
        else
        {
            putImpl(obj);
        }
    }

    static if (hasCompleted!TObserver)
    {
        // Forwards completion, then disposes (also when the forward throws).
        void completed()
        {
            scope (exit)
                _disposable.dispose();
            _observer.completed();
        }
    }
    static if (hasFailure!TObserver)
    {
        // Forwards the failure, then disposes (also when the forward throws).
        void failure(Exception e)
        {
            scope (exit)
                _disposable.dispose();
            _observer.failure(e);
        }
    }
private:
    TObserver _observer;
    // Only needed when at least one terminal notification exists.
    static if (hasCompleted!TObserver || hasFailure!TObserver)
    {
        Disposable _disposable;
    }
}