1 /+++++++++++++++++++++++++++++ 2 + This module defines the concept of Observable. 3 +/ 4 module rx.observable; 5 6 import std.functional : unaryFun; 7 import std.range : put, isInputRange, isOutputRange, ElementType; 8 9 import rx.disposable; 10 import rx.observer; 11 import rx.util; 12 13 ///Tests if something is a Observable. 14 template isObservable(T, E) 15 { 16 enum bool isObservable = is(T.ElementType : E) && is(typeof({ 17 T observable = void; 18 Observer!E observer = void; 19 auto d = observable.subscribe(observer); 20 static assert(isDisposable!(typeof(d))); 21 }())); 22 } 23 ///ditto 24 template isObservable(TObservable) 25 { 26 enum bool isObservable = __traits(compiles, { 27 static assert(isObservable!(TObservable, TObservable.ElementType)); 28 }); 29 30 } 31 32 /// 33 unittest 34 { 35 struct TestObservable 36 { 37 alias ElementType = int; 38 39 Disposable subscribe(T)(T observer) 40 { 41 static assert(isObserver!(T, int)); 42 return null; 43 } 44 } 45 46 static assert(isObservable!(TestObservable)); 47 static assert(isObservable!(TestObservable, int)); 48 static assert(!isObservable!(TestObservable, Object)); 49 } 50 51 /// 52 unittest 53 { 54 static assert(isObservable!(Observable!int)); 55 static assert(!isObservable!(Observer!int)); 56 static assert(!isObservable!(string)); 57 static assert(!isObservable!(Object)); 58 } 59 60 ///Test if the observer can subscribe to the observable. 61 template isSubscribable(TObservable, TObserver) 62 { 63 enum bool isSubscribable = is(typeof({ 64 static assert(isOutputRange!(TObserver, TObservable.ElementType)); 65 66 TObservable observable = void; 67 TObserver observer = void; 68 auto d = observable.subscribe(observer); 69 static assert(isDisposable!(typeof(d))); 70 }())); 71 } 72 /// 73 unittest 74 { 75 struct TestDisposable 76 { 77 void dispose() 78 { 79 } 80 } 81 82 struct TestObserver 83 { 84 void put(int n) 85 { 86 } 87 88 void completed() 89 { 90 } 91 92 void failure(Exception e) 93 { 94 } 95 } 96 97 struct TestObservable 98 { 99 alias ElementType = int; 100 101 TestDisposable subscribe(TestObserver observer) 102 { 103 return TestDisposable(); 104 } 105 } 106 107 static assert(isSubscribable!(TestObservable, TestObserver)); 108 } 109 110 ///The helper for subscribe easier. 111 auto doSubscribe(TObservable, E)(auto ref TObservable observable, void delegate(E) doPut, 112 void delegate() doCompleted, void delegate(Exception) doFailure) 113 { 114 return doSubscribe(observable, makeObserver(doPut, doCompleted, doFailure)); 115 } 116 ///ditto 117 auto doSubscribe(TObservable, E)(auto ref TObservable observable, 118 void delegate(E) doPut, void delegate() doCompleted) 119 { 120 return doSubscribe(observable, makeObserver(doPut, doCompleted)); 121 } 122 ///ditto 123 auto doSubscribe(TObservable, E)(auto ref TObservable observable, 124 void delegate(E) doPut, void delegate(Exception) doFailure) 125 { 126 return doSubscribe(observable, makeObserver(doPut, doFailure)); 127 } 128 ///ditto 129 auto doSubscribe(alias f, TObservable)(auto ref TObservable observable) 130 { 131 alias fun = unaryFun!f; 132 return doSubscribe(observable, (TObservable.ElementType obj) { 133 static if (__traits(compiles, { fun(obj); })) 134 { 135 fun(obj); 136 } 137 else 138 { 139 struct DoSubscribeObserver 140 { 141 void put(T)(T obj) 142 { 143 fun(obj); 144 } 145 } 146 147 DoSubscribeObserver observer; 148 .put(observer, obj); 149 } 150 }); 151 } 152 ///ditto 153 auto doSubscribe(TObservable, TObserver)(auto ref TObservable observable, auto ref TObserver observer) 154 { 155 import std.format : format; 156 157 static assert(isObservable!(TObservable, TObservable.ElementType), 158 format!"%s is invalid as an Observable"(TObservable.stringof)); 159 160 alias ElementType = TObservable.ElementType; 161 static if (isSubscribable!(TObservable, TObserver)) 162 return observable.subscribe(observer); 163 else static if (isSubscribable!(TObservable, Observer!ElementType)) 164 return observable.subscribe(observerObject!ElementType(observer)); 165 else 166 { 167 static assert(false, format!"%s can not subscribe '%s', it published by %s"( 168 TObserver.stringof, ElementType.stringof, TObservable.stringof)); 169 } 170 } 171 /// 172 unittest 173 { 174 struct TestObservable 175 { 176 alias ElementType = int; 177 178 auto subscribe(TObserver)(TObserver observer) 179 { 180 .put(observer, [0, 1, 2]); 181 return NopDisposable.instance; 182 } 183 } 184 185 TestObservable observable; 186 int[] result; 187 observable.doSubscribe!(n => result ~= n); 188 assert(result.length == 3); 189 } 190 191 /// 192 unittest 193 { 194 struct TestObserver 195 { 196 void put(int n) 197 { 198 } 199 } 200 201 struct TestObservable1 202 { 203 alias ElementType = int; 204 Disposable subscribe(Observer!int observer) 205 { 206 return null; 207 } 208 } 209 210 struct TestObservable2 211 { 212 alias ElementType = int; 213 Disposable subscribe(T)(T observer) 214 { 215 return null; 216 } 217 } 218 219 TestObservable1 o1; 220 auto d0 = o1.doSubscribe((int n) {}, () {}, (Exception e) {}); 221 auto d1 = o1.doSubscribe((int n) {}, () {}); 222 auto d2 = o1.doSubscribe((int n) {}, (Exception e) {}); 223 auto d3 = o1.doSubscribe((int n) {}); 224 auto d4 = o1.doSubscribe(TestObserver()); 225 TestObservable2 o2; 226 auto d5 = o2.doSubscribe((int n) {}, () {}, (Exception e) {}); 227 auto d6 = o2.doSubscribe((int n) {}, () {}); 228 auto d7 = o2.doSubscribe((int n) {}, (Exception e) {}); 229 auto d8 = o2.doSubscribe((int n) {}); 230 auto d9 = o2.doSubscribe(TestObserver()); 231 } 232 233 unittest 234 { 235 static struct TestObservable 236 { 237 alias ElementType = int[]; 238 239 Disposable subscribe(TObserver)(TObserver observer) 240 { 241 .put(observer, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); 242 return null; 243 } 244 } 245 246 TestObservable observable; 247 248 int sum1 = 0; 249 250 void add(int n) 251 { 252 sum1 += n; 253 } 254 255 observable.doSubscribe!(add); // by value 256 assert(sum1 == 55); 257 258 int sum2 = 0; 259 observable.doSubscribe!(n => sum2 += n); // by value 260 assert(sum2 == 55); 261 262 int size = 0; 263 observable.doSubscribe!(arr => size += arr.length); // by array 264 assert(size == 10); 265 } 266 267 ///Wrapper for Observable objects. 268 interface Observable(E) 269 { 270 alias ElementType = E; 271 272 Disposable subscribe(Observer!E observer); 273 } 274 275 unittest 276 { 277 static assert(isObservable!(Observable!int, int)); 278 } 279 280 unittest 281 { 282 static struct TestNoObservable 283 { 284 Disposable subscribe(Observer!int observer) 285 { 286 return null; 287 } 288 } 289 290 static assert(!isObservable!(TestNoObservable, int)); 291 } 292 293 ///Class that implements Observable interface and wraps the subscribe method in virtual function. 294 class ObservableObject(R, E) : Observable!E 295 { 296 public: 297 this(R observable) 298 { 299 _observable = observable; 300 } 301 302 public: 303 Disposable subscribe(Observer!E observer) 304 { 305 return disposableObject(_observable.subscribe(observer)); 306 } 307 308 private: 309 R _observable; 310 } 311 312 ///Wraps subscribe method in virtual function. 313 template observableObject(E) 314 { 315 Observable!E observableObject(R)(auto ref R observable) 316 { 317 static if (is(R : Observable!E)) 318 { 319 return observable; 320 } 321 else 322 { 323 return new ObservableObject!(R, E)(observable); 324 } 325 } 326 } 327 /// 328 unittest 329 { 330 int subscribeCount = 0; 331 class TestObservable : Observable!int 332 { 333 Disposable subscribe(Observer!int observer) 334 { 335 subscribeCount++; 336 return NopDisposable.instance; 337 } 338 } 339 340 auto test = new TestObservable; 341 auto observable = observableObject!int(test); 342 assert(observable is test); 343 assert(subscribeCount == 0); 344 auto d = observable.subscribe(null); 345 assert(subscribeCount == 1); 346 } 347 348 unittest 349 { 350 int disposeCount = 0; 351 int subscribeCount = 0; 352 353 struct TestDisposable 354 { 355 void dispose() 356 { 357 disposeCount++; 358 } 359 } 360 361 struct TestObservable 362 { 363 TestDisposable subscribe(Observer!int observer) 364 { 365 subscribeCount++; 366 return TestDisposable(); 367 } 368 } 369 370 Observable!int observable = observableObject!int(TestObservable()); 371 assert(subscribeCount == 0); 372 Disposable disposable = observable.subscribe(null); 373 assert(subscribeCount == 1); 374 assert(disposeCount == 0); 375 disposable.dispose(); 376 assert(disposeCount == 1); 377 } 378 379 //######################### 380 // Defer 381 //######################### 382 private struct DeferObserver(TObserver, E) 383 { 384 public: 385 this(TObserver observer, EventSignal signal) 386 { 387 _observer = observer; 388 _signal = signal; 389 } 390 391 public: 392 void put(E obj) 393 { 394 if (_signal.signal) 395 return; 396 397 static if (hasFailure!TObserver) 398 { 399 try 400 { 401 .put(_observer, obj); 402 } 403 catch (Exception e) 404 { 405 _observer.failure(e); 406 } 407 } 408 else 409 { 410 .put(_observer, obj); 411 } 412 } 413 414 void completed() 415 { 416 if (_signal.signal) 417 return; 418 _signal.setSignal(); 419 420 static if (hasCompleted!TObserver) 421 { 422 _observer.completed(); 423 } 424 } 425 426 void failure(Exception e) 427 { 428 if (_signal.signal) 429 return; 430 _signal.setSignal(); 431 432 static if (hasFailure!TObserver) 433 { 434 _observer.failure(e); 435 } 436 } 437 438 private: 439 TObserver _observer; 440 EventSignal _signal; 441 } 442 443 ///Create observable by function that template parameter. 444 auto defer(E, alias f)() 445 { 446 static struct DeferObservable 447 { 448 alias ElementType = E; 449 450 public: 451 auto subscribe(TObserver)(TObserver observer) 452 { 453 alias ObserverType = DeferObserver!(TObserver, E); 454 alias fun = unaryFun!f; 455 auto d = new SignalDisposable; 456 static if (__traits(compiles, { 457 auto disposable = fun(ObserverType(observer, d.signal)); 458 static assert(isDisposable!(typeof(disposable))); 459 })) 460 { 461 auto subscription = fun(ObserverType(observer, d.signal)); 462 return new CompositeDisposable(subscription, d); 463 } 464 else 465 { 466 fun(ObserverType(observer, d.signal)); 467 return d; 468 } 469 } 470 } 471 472 return DeferObservable(); 473 } 474 /// 475 unittest 476 { 477 auto sub = defer!(int, (observer) { 478 observer.put(1); 479 observer.put(2); 480 observer.put(3); 481 observer.completed(); 482 }); 483 484 int countPut = 0; 485 int countCompleted = 0; 486 struct A 487 { 488 void put(int n) 489 { 490 countPut++; 491 } 492 493 void completed() 494 { 495 countCompleted++; 496 } 497 } 498 499 assert(countPut == 0); 500 assert(countCompleted == 0); 501 auto d = sub.doSubscribe(A()); 502 assert(countPut == 3); 503 assert(countCompleted == 1); 504 } 505 506 unittest 507 { 508 auto sub = defer!(int, (observer) { 509 observer.put(0); 510 observer.failure(new Exception("")); 511 observer.put(1); 512 }); 513 514 int countPut = 0; 515 int countFailure = 0; 516 struct A 517 { 518 void put(int n) 519 { 520 countPut++; 521 } 522 523 void failure(Exception e) 524 { 525 countFailure++; 526 } 527 } 528 529 assert(countPut == 0); 530 assert(countFailure == 0); 531 auto d = sub.doSubscribe(A()); 532 assert(countPut == 1); 533 assert(countFailure == 1); 534 } 535 536 unittest 537 { 538 auto sub = defer!(int, (observer) { 539 observer.put(0); 540 observer.failure(new Exception("")); 541 observer.put(1); 542 }); 543 544 int countPut = 0; 545 struct A 546 { 547 void put(int n) 548 { 549 countPut++; 550 } 551 } 552 553 assert(countPut == 0); 554 auto d = sub.doSubscribe(A()); 555 assert(countPut == 1); 556 } 557 558 unittest 559 { 560 Disposable subscribeImpl(Observer!int observer) 561 { 562 .put(observer, 1); 563 return null; 564 } 565 566 import std.array : appender; 567 568 auto buf = appender!(int[]); 569 570 auto put1 = defer!int(&subscribeImpl); 571 auto d = put1.subscribe(buf); 572 573 assert(buf.data.length == 1); 574 assert(buf.data[0] == 1); 575 assert(d !is null); 576 } 577 578 unittest 579 { 580 auto sub = defer!(int, (observer) { 581 observer.put(1); 582 return NopDisposable.instance; 583 }); 584 585 auto called = false; 586 sub.doSubscribe((int _) { called = true; }); 587 assert(called); 588 } 589 590 auto defer(E, TSubscribe)(auto ref TSubscribe subscribeImpl) 591 { 592 struct DeferObservable 593 { 594 alias ElementType = E; 595 596 TSubscribe _subscribeImpl; 597 598 this(ref TSubscribe subscribeImpl) 599 { 600 _subscribeImpl = subscribeImpl; 601 } 602 603 auto subscribe(TObserver)(auto ref TObserver observer) 604 { 605 alias ObserverType = DeferObserver!(TObserver, E); 606 607 auto cancel = new SignalDisposable; 608 static if (__traits(compiles, { 609 auto d = _subscribeImpl(ObserverType.init); 610 static assert(isDisposable!(typeof(d))); 611 })) 612 { 613 auto subscription = disposableObject(_subscribeImpl(ObserverType(observer, 614 cancel.signal))); 615 return new CompositeDisposable(cancel, subscription); 616 } 617 else static if (__traits(compiles, { 618 _subscribeImpl(ObserverType.init); 619 })) 620 { 621 _subscribeImpl(ObserverType(observer, cancel.signal)); 622 return cancel; 623 } 624 else static if (__traits(compiles, { 625 auto d = _subscribeImpl(Observer!E.init); 626 static assert(isDisposable!(typeof(d))); 627 })) 628 { 629 auto subscription = _subscribeImpl(observerObject!E(ObserverType(observer, 630 cancel.signal))); 631 return new CompositeDisposable(cancel, subscription); 632 } 633 else static if (__traits(compiles, { 634 _subscribeImpl(Observer!E.init); 635 })) 636 { 637 _subscribeImpl(observerObject!E(ObserverType(observer, cancel.signal))); 638 return cancel; 639 } 640 else 641 { 642 static assert(false); 643 } 644 } 645 } 646 647 return DeferObservable(subscribeImpl); 648 } 649 650 unittest 651 { 652 import std.array : appender; 653 654 auto buf = appender!(int[]); 655 656 auto put12 = defer!int((Observer!int observer) { 657 .put(observer, 1); 658 .put(observer, 2); 659 return NopDisposable.instance; 660 }); 661 auto d = put12.subscribe(buf); 662 663 assert(buf.data.length == 2); 664 assert(buf.data[0] == 1); 665 assert(buf.data[1] == 2); 666 } 667 668 unittest 669 { 670 int last = 0; 671 struct TestObserver 672 { 673 void put(int n) 674 { 675 last = n; 676 } 677 } 678 679 auto disposed1 = false; 680 auto sub1 = defer!int((Observer!int observer) { 681 observer.put(1); 682 return new AnonymousDisposable({ disposed1 = true; }); 683 }); 684 685 auto sub2 = defer!int((Observer!int observer) { observer.put(100); }); 686 687 TestObserver observer; 688 last = 0; 689 auto disposable1 = sub1.subscribe(observer); 690 assert(last == 1); 691 assert(!disposed1); 692 disposable1.dispose(); 693 assert(disposed1); 694 695 last = 0; 696 auto disposable2 = sub2.subscribe(observer); 697 assert(last == 100); 698 disposable2.dispose(); 699 } 700 701 unittest 702 { 703 size_t hasBeenCalled1 = 0; 704 struct SubscribeImpl1 705 { 706 Disposable opCall(TObserver)(TObserver observer) 707 { 708 hasBeenCalled1++; 709 return NopDisposable.instance; 710 } 711 } 712 713 size_t hasBeenCalled2 = 0; 714 struct SubscribeImpl2 715 { 716 void opCall(TObserver)(TObserver observer) 717 { 718 hasBeenCalled2++; 719 } 720 } 721 722 struct TestObserver 723 { 724 void put(int _) 725 { 726 } 727 } 728 729 SubscribeImpl1 impl1; 730 auto sub1 = defer!int(impl1); 731 732 assert(hasBeenCalled1 == 0); 733 auto disposable1 = sub1.subscribe(TestObserver()); 734 assert(hasBeenCalled1 == 1); 735 disposable1.dispose(); 736 737 SubscribeImpl2 impl2; 738 auto sub2 = defer!int(impl2); 739 740 assert(hasBeenCalled2 == 0); 741 auto disposable2 = sub2.subscribe(TestObserver()); 742 assert(hasBeenCalled2 == 1); 743 disposable2.dispose(); 744 } 745 746 unittest 747 { 748 class SubscribeImpl1 749 { 750 public: 751 void publish() 752 { 753 .put(_observer, 1); 754 } 755 756 public: 757 Disposable opCall(TObserver)(TObserver observer) 758 { 759 _observer = observerObject!int(observer); 760 return NopDisposable.instance; 761 } 762 763 private: 764 Observer!int _observer; 765 } 766 767 import rx.subject : CounterObserver; 768 769 auto observer1 = new CounterObserver!int; 770 auto impl1 = new SubscribeImpl1; 771 auto sub1 = defer!int(impl1); 772 auto disposable1 = sub1.subscribe(observer1); 773 disposable1.dispose(); 774 775 assert(observer1.putCount == 0); 776 impl1.publish(); 777 assert(observer1.putCount == 0); 778 779 auto observer2 = new CounterObserver!int; 780 auto impl2 = new SubscribeImpl1; 781 auto sub2 = defer!int(impl2); 782 auto disposable2 = sub2.subscribe(observer2); 783 assert(observer2.putCount == 0); 784 impl2.publish(); 785 assert(observer2.putCount == 1); 786 disposable2.dispose(); 787 impl2.publish(); 788 assert(observer2.putCount == 1); 789 } 790 791 /// 792 auto empty(E)() 793 { 794 static struct EmptyObservable 795 { 796 alias ElementType = E; 797 798 Disposable subscribe(TObserver)(auto ref TObserver observer) 799 { 800 static if (hasCompleted!TObserver) 801 { 802 observer.completed(); 803 } 804 return NopDisposable.instance; 805 } 806 } 807 808 return EmptyObservable(); 809 } 810 811 unittest 812 { 813 auto completed = false; 814 auto o = empty!int(); 815 816 assert(!completed); 817 auto d = o.doSubscribe((int n) {}, () { completed = true; }); 818 assert(completed); 819 } 820 821 /// 822 auto never(E)() 823 { 824 static struct NeverObservable 825 { 826 alias ElementType = E; 827 828 Disposable subscribe(TObserver)(auto ref TObserver observer) 829 { 830 return NopDisposable.instance; 831 } 832 } 833 834 return NeverObservable(); 835 } 836 837 unittest 838 { 839 auto o = never!int(); 840 auto d = o.doSubscribe((int) {}); 841 d.dispose(); 842 } 843 844 /// 845 auto error(E)(auto ref Exception e) 846 { 847 static struct ErrorObservable 848 { 849 alias ElementType = E; 850 851 Exception _e; 852 853 this(ref Exception e) 854 { 855 _e = e; 856 } 857 858 Disposable subscribe(TObserver)(auto ref TObserver observer) 859 { 860 static if (hasFailure!TObserver) 861 { 862 observer.failure(_e); 863 } 864 return NopDisposable.instance; 865 } 866 } 867 868 return ErrorObservable(e); 869 } 870 871 unittest 872 { 873 auto expected = new Exception("TEST"); 874 auto o = error!int(expected); 875 876 Exception actual = null; 877 o.doSubscribe((int n) {}, (Exception e) { actual = e; }); 878 assert(actual is expected); 879 } 880 881 /// 882 auto from(R)(auto ref R input) if (isInputRange!R) 883 { 884 alias E = ElementType!R; 885 886 static struct FromObservable 887 { 888 alias ElementType = E; 889 890 this(ref R input) 891 { 892 this.input = input; 893 } 894 895 Disposable subscribe(TObserver)(auto ref TObserver observer) 896 if (isOutputRange!(TObserver, ElementType)) 897 { 898 .put(observer, input); 899 static if (hasCompleted!TObserver) 900 observer.completed(); 901 902 return NopDisposable.instance; 903 } 904 905 R input; 906 } 907 908 return FromObservable(input); 909 } 910 /// 911 alias asObservable = from; 912 913 /// 914 unittest 915 { 916 import std.range : iota; 917 918 auto obs = from(iota(10)); 919 auto res = new int[10]; 920 auto d = obs.subscribe(res[]); 921 scope (exit) 922 d.dispose(); 923 924 assert(res.length == 10); 925 assert(res[0] == 0); 926 assert(res[9] == 9); 927 } 928 929 /// 930 unittest 931 { 932 import std.range : iota; 933 934 auto obs = iota(10).asObservable(); 935 auto res = new int[10]; 936 auto d = obs.subscribe(res[]); 937 scope (exit) 938 d.dispose(); 939 940 assert(res.length == 10); 941 assert(res[0] == 0); 942 assert(res[9] == 9); 943 } 944 945 /// 946 unittest 947 { 948 import rx; 949 import std.range : iota; 950 951 auto observable = iota(10).asObservable(); 952 auto observer = new CounterObserver!int; 953 954 auto disposable = observable.subscribe(observer); 955 scope (exit) 956 disposable.dispose(); 957 958 assert(observer.putCount == 10); 959 assert(observer.completedCount == 1); 960 }