/+++++++++++++++++++++++++++++
 + This module defines the concept of Observer.
 +
 + An observer is an output range (it accepts values through put) that can
 + additionally be notified of normal termination (completed) and of
 + abnormal termination (failure).
 +/
module rx.observer;

import std.range;
import std.typetuple;

///Tests if something has completed method.
template hasCompleted(T)
{
    //dfmt off
    enum bool hasCompleted = is(typeof({
            T observer = void;
            observer.completed();
        }()));
    //dfmt on
}
///
unittest
{
    struct A
    {
        void completed();
    }

    struct B
    {
        void _completed();
    }

    static assert(hasCompleted!A);
    static assert(!hasCompleted!B);
}

///Tests if something has failure method that can be called with an Exception.
template hasFailure(T)
{
    //dfmt off
    enum bool hasFailure = is(typeof({
            T observer = void;
            Exception e = void;
            observer.failure(e);
        }()));
    //dfmt on
}
///
unittest
{
    struct A
    {
        void failure(Exception e);
    }

    struct B
    {
        void _failure(Exception e);
    }

    struct C
    {
        void failure();
    }

    static assert(hasFailure!A);
    static assert(!hasFailure!B);
    static assert(!hasFailure!C);
}

///Tests if something is Observer: an output range of E that has both completed and failure.
template isObserver(T, E)
{
    enum bool isObserver = isOutputRange!(T, E) && hasCompleted!T && hasFailure!T;
}
///
unittest
{
    struct TestObserver
    {
        void put(int n)
        {
        }

        void completed()
        {
        }

        void failure(Exception e)
        {
        }
    }

    static assert(isObserver!(TestObserver, int));
}

///Wraps completed and failure method in virtual function.
interface Observer(E) : OutputRange!E
{
    ///
    void completed();
    ///
    void failure(Exception e);
}
///
unittest
{
    alias TObserver = Observer!byte;
    static assert(isObserver!(TObserver, byte));
}

//Generates the source of one put override per element type in E.
//The generated methods forward to the _range member of the class that mixes
//them in, mirroring the way OutputRangeObject generates its own put methods.
private string observerPutOverrides(E...)()
{
    import std.conv : to;

    string code;
    foreach (i, Unused; E)
    {
        code ~= "override void put(E[" ~ to!string(i) ~ "] e) { .put(_range, e); }";
    }
    return code;
}

/++
 + Class that implements Observer interface and wraps the completed and failure
 + method in virtual functions. This class extends the OutputRangeObject.
 +
 + When R has no completed (or no failure) method, the corresponding call is a no-op.
 +
 + The put methods inherited from OutputRangeObject are overridden so that put,
 + completed and failure all act on the same stored instance of R. Without the
 + overrides a struct R would be copied twice (once into the base class, once
 + into this class) and state changed by put would be invisible to completed
 + and failure.
 +/
class ObserverObject(R, E...) : OutputRangeObject!(R, E), staticMap!(Observer, E)
{
public:
    ///Wraps range; for a struct R the value is copied.
    this(R range)
    {
        //The base class has no other constructor, so it still receives the range,
        //but every call is routed to the copy held by this class.
        super(range);
        _range = range;
    }

public:
    //One put override per element type, forwarding to this class's _range.
    mixin(observerPutOverrides!E());

    ///Forwards to the wrapped object's completed, if it has one.
    void completed()
    {
        static if (hasCompleted!R)
        {
            _range.completed();
        }
    }
    ///Forwards to the wrapped object's failure, if it has one.
    void failure(Exception e)
    {
        static if (hasFailure!R)
        {
            _range.failure(e);
        }
    }

private:
    R _range;
}

///Wraps an observer (any output range of E) in an ObserverObject so that it can be used through the Observer!E interface.
template observerObject(E)
{
    ObserverObject!(R, E) observerObject(R)(R range)
    {
        return new ObserverObject!(R, E)(range);
    }
}
///
unittest
{
    struct TestObserver
    {
        void put(int n)
        {
        }

        void put(Object obj)
        {
        }
    }

    Observer!int observer = observerObject!int(TestObserver());
    observer.put(0);
    observer.completed();
    observer.failure(null);
    static assert(isObserver!(typeof(observer), int));
}

unittest
{
    int putCount = 0;
    int completedCount = 0;
    int failureCount = 0;

    class TestObserver : Observer!int
    {
        void put(int n)
        {
            putCount++;
        }

        void completed()
        {
            completedCount++;
        }

        void failure(Exception e)
        {
            failureCount++;
        }
    }

    static assert(isObserver!(TestObserver, int));

    auto test = new TestObserver;
    Observer!int observer = observerObject!int(test);
    assert(putCount == 0);
    observer.put(0);
    assert(putCount == 1);
    assert(completedCount == 0);
    observer.completed();
    assert(completedCount == 1);
    assert(failureCount == 0);
    observer.failure(null);
    assert(failureCount == 1);
}

unittest
{
    int putCount = 0;
    int completedCount = 0;
    int failureCount = 0;

    struct TestObserver
    {
        void put(int n)
        {
            putCount++;
        }

        void completed()
        {
            completedCount++;
        }

        void failure(Exception e)
        {
            failureCount++;
        }
    }

    static assert(isObserver!(TestObserver, int));

    TestObserver test;
    Observer!int observer = observerObject!int(test);
    assert(putCount == 0);
    observer.put(0);
    assert(putCount == 1);
    assert(completedCount == 0);
    observer.completed();
    assert(completedCount == 1);
    assert(failureCount == 0);
    observer.failure(null);
    assert(failureCount == 1);
}

unittest
{
    struct TestObserver1
    {
        void put(int n)
        {
        }
    }

    struct TestObserver2
    {
        void put(int n)
        {
        }

        void completed()
        {
        }
    }

    struct TestObserver3
    {
        void put(int n)
        {
        }

        void failure(Exception e)
        {
        }
    }

    struct TestObserver4
    {
        void put(int n)
        {
        }

        void completed()
        {
        }

        void failure(Exception e)
        {
        }
    }

    Observer!int o1 = observerObject!int(TestObserver1());
    Observer!int o2 = observerObject!int(TestObserver2());
    Observer!int o3 = observerObject!int(TestObserver3());
    Observer!int o4 = observerObject!int(TestObserver4());

    //dfmt off
    o1.put(0); o1.completed(); o1.failure(null);
    o2.put(0); o2.completed(); o2.failure(null);
    o3.put(0); o3.completed(); o3.failure(null);
    o4.put(0); o4.completed(); o4.failure(null);
    //dfmt on
}

//State kept inside a struct observer must be shared by put and completed.
unittest
{
    static struct Stateful
    {
        int seen;
        int* sink;

        void put(int n)
        {
            seen++;
        }

        void completed()
        {
            *sink = seen;
        }
    }

    int result = -1;
    Observer!int observer = observerObject!int(Stateful(0, &result));
    observer.put(0);
    observer.put(0);
    observer.completed();
    assert(result == 2);
}

///Observer that ignores every notification. Obtained through instance.
final class NopObserver(E) : Observer!E
{
private:
    this()
    {
    }

public:
    void put(E)
    {
    }

    void completed()
    {
    }

    void failure(Exception)
    {
    }

public:
    ///Returns the shared instance, created on first use via initOnce.
    static Observer!E instance()
    {
        import std.concurrency : initOnce;

        static __gshared NopObserver!E inst;
        return initOnce!inst(new NopObserver!E);
    }
}
///
unittest
{
    Observer!int o1 = NopObserver!int.instance;
    Observer!int o2 = NopObserver!int.instance;
    assert(o1 !is null);
    assert(o1 is o2);
}

///Observer that ignores every notification and can carry an Exception, readable through the exception property.
final class DoneObserver(E) : Observer!E
{
private:
    this()
    {
    }

public:
    ///Creates an instance that carries e.
    this(Exception e)
    {
        _exception = e;
    }

public:
    ///The carried exception; null unless one was supplied.
    Exception exception() @property
    {
        return _exception;
    }
    ///ditto
    void exception(Exception e) @property
    {
        _exception = e;
    }

public:
    void put(E)
    {
    }

    void completed()
    {
    }

    //The argument is discarded; it is not stored in the exception property.
    void failure(Exception)
    {
    }

private:
    Exception _exception;

public:
    ///Returns the shared instance (no exception attached), created on first use via initOnce.
    static Observer!E instance()
    {
        import std.concurrency : initOnce;

        static __gshared DoneObserver!E inst;
        return initOnce!inst(new DoneObserver!E);
    }
}
///
unittest
{
    Observer!int o1 = DoneObserver!int.instance;
    Observer!int o2 = DoneObserver!int.instance;
    assert(o1 !is null);
    assert(o1 is o2);
}

unittest
{
    auto e = new Exception("test");
    auto observer = new DoneObserver!int(e);
    assert(observer.exception is e);
}

///Observer that forwards every notification to a list of observers.
///add, remove and removeStrict return a new observer instead of modifying the receiver.
public class CompositeObserver(E) : Observer!E
{
private:
    this()
    {
    }

public:
    ///Creates a composite over observers; the array is stored as given, not copied.
    this(Observer!E[] observers)
    {
        _observers = observers;
    }

public:
    ///The observers that notifications are forwarded to.
    Observer!E[] observers() @property
    {
        return _observers;
    }

public:
    ///Passes obj to every observer, in order.
    void put(E obj)
    {
        foreach (observer; _observers)
            .put(observer, obj);
    }
    ///Calls completed on every observer, in order.
    void completed()
    {
        foreach (observer; _observers)
            observer.completed();
    }
    ///Calls failure(e) on every observer, in order.
    void failure(Exception e)
    {
        foreach (observer; _observers)
            observer.failure(e);
    }
    ///Returns a new composite made of the current observers followed by observer.
    CompositeObserver!E add(Observer!E observer)
    {
        return new CompositeObserver!E(_observers ~ observer);
    }
    /++
     + Returns an observer equivalent to this one without the first occurrence of observer.
     +
     + Returns this when observer is not contained, the shared empty composite when
     + it was the only element, the remaining observer itself (unwrapped) when two
     + were held, and a new composite otherwise.
     +/
    Observer!E remove(Observer!E observer)
    {
        import std.algorithm : countUntil;

        auto i = _observers.countUntil(observer);
        if (i < 0)
            return this;

        if (_observers.length == 1)
            return CompositeObserver!E.empty;
        if (_observers.length == 2)
            return _observers[1 - i];

        return new CompositeObserver!E(_observers[0 .. i] ~ _observers[i + 1 .. $]);
    }
    /++
     + Same as remove, but the result is always a CompositeObserver.
     +
     + Returns this when observer is not contained, the shared empty composite when
     + it was the only element, and a new composite of the remaining observers otherwise.
     +/
    CompositeObserver!E removeStrict(Observer!E observer)
    {
        import std.algorithm : countUntil;

        auto i = _observers.countUntil(observer);
        if (i < 0)
            return this;

        if (_observers.length == 1)
            return CompositeObserver!E.empty;

        //The slice expression also covers the two-element case, so no special branch is needed.
        return new CompositeObserver!E(_observers[0 .. i] ~ _observers[i + 1 .. $]);
    }

public:
    ///Returns the shared composite that holds no observers, created on first use via initOnce.
    static CompositeObserver!E empty()
    {
        import std.concurrency : initOnce;

        static __gshared CompositeObserver!E inst;
        return initOnce!inst(new CompositeObserver!E);
    }

private:
    Observer!E[] _observers;
}
///
unittest
{
    int count = 0;
    struct TestObserver
    {
        void put(int n)
        {
            count++;
        }
    }

    auto c1 = new CompositeObserver!int;
    c1.put(0);
    auto o1 = observerObject!int(TestObserver());
    auto c2 = c1.add(o1);
    c1.put(0);
    assert(count == 0);
    c2.put(0);
    assert(count == 1);
    auto c3 = c2.add(observerObject!int(TestObserver()));
    c3.put(0);
    assert(count == 3);
    auto c4 = c3.remove(o1);
    c4.put(0);
    assert(count == 4);
}

unittest
{
    int count = 0;
    struct TestObserver
    {
        void put(int n)
        {
            count++;
        }
    }

    auto c1 = new CompositeObserver!(int[]);
    auto c2 = c1.add(observerObject!(int[])(TestObserver()));

    assert(count == 0);
    c2.put([1, 2]);
    assert(count == 2);
}

///The helper for the own observer.
///
///Params:
///    doPut = invoked with each value given to put; skipped when null.
///    doCompleted = invoked by completed; skipped when null.
///    doFailure = invoked by failure with the exception; skipped when null.
///Returns: a struct exposing exactly the methods for which a delegate parameter exists in the chosen overload.
auto makeObserver(E)(void delegate(E) doPut, void delegate() doCompleted,
        void delegate(Exception) doFailure)
{
    static struct AnonymouseObserver
    {
    public:
        this(void delegate(E) onPut, void delegate() onCompleted,
                void delegate(Exception) onFailure)
        {
            _doPut = onPut;
            _doCompleted = onCompleted;
            _doFailure = onFailure;
        }

    public:
        //Each notification is dropped silently when its delegate is null.
        void put(E obj)
        {
            if (_doPut is null)
                return;
            _doPut(obj);
        }

        void completed()
        {
            if (_doCompleted is null)
                return;
            _doCompleted();
        }

        void failure(Exception e)
        {
            if (_doFailure is null)
                return;
            _doFailure(e);
        }

    private:
        void delegate(E) _doPut;
        void delegate() _doCompleted;
        void delegate(Exception) _doFailure;
    }

    return AnonymouseObserver(doPut, doCompleted, doFailure);
}
///ditto
auto makeObserver(E)(void delegate(E) doPut, void delegate() doCompleted)
{
    //This variant has no failure method at all.
    static struct AnonymouseObserver
    {
    public:
        this(void delegate(E) onPut, void delegate() onCompleted)
        {
            _doPut = onPut;
            _doCompleted = onCompleted;
        }

    public:
        void put(E obj)
        {
            if (_doPut is null)
                return;
            _doPut(obj);
        }

        void completed()
        {
            if (_doCompleted is null)
                return;
            _doCompleted();
        }

    private:
        void delegate(E) _doPut;
        void delegate() _doCompleted;
    }

    return AnonymouseObserver(doPut, doCompleted);
}
///ditto
auto makeObserver(E)(void delegate(E) doPut, void delegate(Exception) doFailure)
{
    //This variant has no completed method at all.
    static struct AnonymouseObserver
    {
    public:
        this(void delegate(E) onPut, void delegate(Exception) onFailure)
        {
            _doPut = onPut;
            _doFailure = onFailure;
        }

    public:
        void put(E obj)
        {
            if (_doPut is null)
                return;
            _doPut(obj);
        }

        void failure(Exception e)
        {
            if (_doFailure is null)
                return;
            _doFailure(e);
        }

    private:
        void delegate(E) _doPut;
        void delegate(Exception) _doFailure;
    }

    return AnonymouseObserver(doPut, doFailure);
}
///
unittest
{
    int countPut = 0;
    int countCompleted = 0;
    int countFailure = 0;

    auto observer = makeObserver((int) { countPut++; }, () { countCompleted++; }, (Exception) {
        countFailure++;
    });

    .put(observer, 0);
    assert(countPut == 1);

    observer.completed();
    assert(countCompleted == 1);

    observer.failure(null);
    assert(countFailure == 1);
}

unittest
{
    int countPut = 0;
    int countCompleted = 0;

    auto observer = makeObserver((int) { countPut++; }, () { countCompleted++; });

    .put(observer, 0);
    assert(countPut == 1);

    observer.completed();
    assert(countCompleted == 1);

    static assert(!hasFailure!(typeof(observer)));
}

unittest
{
    int countPut = 0;
    int countFailure = 0;

    auto observer = makeObserver((int) { countPut++; }, (Exception) {
        countFailure++;
    });

    .put(observer, 0);
    assert(countPut == 1);

    static assert(!hasCompleted!(typeof(observer)));

    observer.failure(null);
    assert(countFailure == 1);
}

/++
 + Mixin that supplies put, and conditionally completed and failure, for a type
 + that wraps an observer of type TObserver.
 +
 + The mixing-in scope must provide putImpl(E), which put calls. The mixin
 + declares the _observer field and, when TObserver has completed or failure,
 + a _disposable field that is disposed after a terminal notification.
 + NOTE(review): Disposable is not declared in this module; presumably it is
 + visible wherever the mixin is instantiated - confirm.
 +/
package mixin template SimpleObserverImpl(TObserver, E)
{
public:
    void put(E obj)
    {
        static if (!hasFailure!TObserver)
        {
            //No failure channel: exceptions from putImpl propagate to the caller.
            putImpl(obj);
        }
        else
        {
            try
            {
                putImpl(obj);
            }
            catch (Exception e)
            {
                //Report the error downstream, then release the disposable.
                _observer.failure(e);
                _disposable.dispose();
            }
        }
    }

    static if (hasCompleted!TObserver)
    {
        void completed()
        {
            _observer.completed();
            _disposable.dispose();
        }
    }

    static if (hasFailure!TObserver)
    {
        void failure(Exception e)
        {
            _observer.failure(e);
            _disposable.dispose();
        }
    }

private:
    TObserver _observer;
    static if (hasCompleted!TObserver || hasFailure!TObserver)
    {
        Disposable _disposable;
    }
}