1 /+++++++++++++++++++++++++++++ 2 + This module defines the concept of Observable. 3 +/ 4 module rx.observable; 5 6 import std.functional : unaryFun; 7 import std.range : put, isInputRange, isOutputRange, ElementType; 8 9 import rx.disposable; 10 import rx.observer; 11 import rx.util; 12 13 ///Tests if something is a Observable. 14 template isObservable(T, E) 15 { 16 enum bool isObservable = is(T.ElementType : E) && is(typeof({ 17 T observable = void; 18 Observer!E observer = void; 19 auto d = observable.subscribe(observer); 20 static assert(isDisposable!(typeof(d))); 21 }())); 22 } 23 /// 24 unittest 25 { 26 struct TestObservable 27 { 28 alias ElementType = int; 29 30 Disposable subscribe(T)(T observer) 31 { 32 static assert(isObserver!(T, int)); 33 return null; 34 } 35 } 36 37 static assert(isObservable!(TestObservable, int)); 38 static assert(!isObservable!(TestObservable, Object)); 39 } 40 41 ///Test if the observer can subscribe to the observable. 42 template isSubscribable(TObservable, TObserver) 43 { 44 enum bool isSubscribable = is(typeof({ 45 static assert(isOutputRange!(TObserver, TObservable.ElementType)); 46 47 TObservable observable = void; 48 TObserver observer = void; 49 auto d = observable.subscribe(observer); 50 static assert(isDisposable!(typeof(d))); 51 }())); 52 } 53 /// 54 unittest 55 { 56 struct TestDisposable 57 { 58 void dispose() 59 { 60 } 61 } 62 63 struct TestObserver 64 { 65 void put(int n) 66 { 67 } 68 69 void completed() 70 { 71 } 72 73 void failure(Exception e) 74 { 75 } 76 } 77 78 struct TestObservable 79 { 80 alias ElementType = int; 81 82 TestDisposable subscribe(TestObserver observer) 83 { 84 return TestDisposable(); 85 } 86 } 87 88 static assert(isSubscribable!(TestObservable, TestObserver)); 89 } 90 91 ///The helper for subscribe easier. 92 auto doSubscribe(TObservable, E)(auto ref TObservable observable, void delegate(E) doPut, 93 void delegate() doCompleted, void delegate(Exception) doFailure) 94 { 95 return doSubscribe(observable, makeObserver(doPut, doCompleted, doFailure)); 96 } 97 ///ditto 98 auto doSubscribe(TObservable, E)(auto ref TObservable observable, 99 void delegate(E) doPut, void delegate() doCompleted) 100 { 101 return doSubscribe(observable, makeObserver(doPut, doCompleted)); 102 } 103 ///ditto 104 auto doSubscribe(TObservable, E)(auto ref TObservable observable, 105 void delegate(E) doPut, void delegate(Exception) doFailure) 106 { 107 return doSubscribe(observable, makeObserver(doPut, doFailure)); 108 } 109 ///ditto 110 auto doSubscribe(alias f, TObservable)(auto ref TObservable observable) 111 { 112 alias fun = unaryFun!f; 113 return doSubscribe(observable, (TObservable.ElementType obj) { 114 static if (__traits(compiles, { fun(obj); })) 115 { 116 fun(obj); 117 } 118 else 119 { 120 struct DoSubscribeObserver 121 { 122 void put(T)(T obj) 123 { 124 fun(obj); 125 } 126 } 127 128 DoSubscribeObserver observer; 129 .put(observer, obj); 130 } 131 }); 132 } 133 ///ditto 134 auto doSubscribe(TObservable, TObserver)(auto ref TObservable observable, auto ref TObserver observer) 135 { 136 import std.format : format; 137 138 static assert(isObservable!(TObservable, TObservable.ElementType), 139 format!"%s is invalid as an Observable"(TObservable.stringof)); 140 141 alias ElementType = TObservable.ElementType; 142 static if (isSubscribable!(TObservable, TObserver)) 143 return observable.subscribe(observer); 144 else static if (isSubscribable!(TObservable, Observer!ElementType)) 145 return observable.subscribe(observerObject!ElementType(observer)); 146 else 147 { 148 static assert(false, format!"%s can not subscribe '%s', it published by %s"( 149 TObserver.stringof, ElementType.stringof, TObservable.stringof)); 150 } 151 } 152 /// 153 unittest 154 { 155 struct TestObservable 156 { 157 alias ElementType = int; 158 159 auto subscribe(TObserver)(TObserver observer) 160 { 161 .put(observer, [0, 1, 2]); 162 return NopDisposable.instance; 163 } 164 } 165 166 TestObservable observable; 167 int[] result; 168 observable.doSubscribe!(n => result ~= n); 169 assert(result.length == 3); 170 } 171 172 /// 173 unittest 174 { 175 struct TestObserver 176 { 177 void put(int n) 178 { 179 } 180 } 181 182 struct TestObservable1 183 { 184 alias ElementType = int; 185 Disposable subscribe(Observer!int observer) 186 { 187 return null; 188 } 189 } 190 191 struct TestObservable2 192 { 193 alias ElementType = int; 194 Disposable subscribe(T)(T observer) 195 { 196 return null; 197 } 198 } 199 200 TestObservable1 o1; 201 auto d0 = o1.doSubscribe((int n) { }, () { }, (Exception e) { }); 202 auto d1 = o1.doSubscribe((int n) { }, () { }); 203 auto d2 = o1.doSubscribe((int n) { }, (Exception e) { }); 204 auto d3 = o1.doSubscribe((int n) { }); 205 auto d4 = o1.doSubscribe(TestObserver()); 206 TestObservable2 o2; 207 auto d5 = o2.doSubscribe((int n) { }, () { }, (Exception e) { }); 208 auto d6 = o2.doSubscribe((int n) { }, () { }); 209 auto d7 = o2.doSubscribe((int n) { }, (Exception e) { }); 210 auto d8 = o2.doSubscribe((int n) { }); 211 auto d9 = o2.doSubscribe(TestObserver()); 212 } 213 214 unittest 215 { 216 static struct TestObservable 217 { 218 alias ElementType = int[]; 219 220 Disposable subscribe(TObserver)(TObserver observer) 221 { 222 .put(observer, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); 223 return null; 224 } 225 } 226 227 TestObservable observable; 228 229 int sum1 = 0; 230 231 void add(int n) 232 { 233 sum1 += n; 234 } 235 236 observable.doSubscribe!(add); // by value 237 assert(sum1 == 55); 238 239 int sum2 = 0; 240 observable.doSubscribe!(n => sum2 += n); // by value 241 assert(sum2 == 55); 242 243 int size = 0; 244 observable.doSubscribe!(arr => size += arr.length); // by array 245 assert(size == 10); 246 } 247 248 ///Wrapper for Observable objects. 249 interface Observable(E) 250 { 251 alias ElementType = E; 252 253 Disposable subscribe(Observer!E observer); 254 } 255 256 unittest 257 { 258 static assert(isObservable!(Observable!int, int)); 259 } 260 261 unittest 262 { 263 static struct TestNoObservable 264 { 265 Disposable subscribe(Observer!int observer) 266 { 267 return null; 268 } 269 } 270 271 static assert(!isObservable!(TestNoObservable, int)); 272 } 273 274 ///Class that implements Observable interface and wraps the subscribe method in virtual function. 275 class ObservableObject(R, E) : Observable!E 276 { 277 public: 278 this(R observable) 279 { 280 _observable = observable; 281 } 282 283 public: 284 Disposable subscribe(Observer!E observer) 285 { 286 return disposableObject(_observable.subscribe(observer)); 287 } 288 289 private: 290 R _observable; 291 } 292 293 ///Wraps subscribe method in virtual function. 294 template observableObject(E) 295 { 296 Observable!E observableObject(R)(auto ref R observable) 297 { 298 static if (is(R : Observable!E)) 299 { 300 return observable; 301 } 302 else 303 { 304 return new ObservableObject!(R, E)(observable); 305 } 306 } 307 } 308 /// 309 unittest 310 { 311 int subscribeCount = 0; 312 class TestObservable : Observable!int 313 { 314 Disposable subscribe(Observer!int observer) 315 { 316 subscribeCount++; 317 return NopDisposable.instance; 318 } 319 } 320 321 auto test = new TestObservable; 322 auto observable = observableObject!int(test); 323 assert(observable is test); 324 assert(subscribeCount == 0); 325 auto d = observable.subscribe(null); 326 assert(subscribeCount == 1); 327 } 328 329 unittest 330 { 331 int disposeCount = 0; 332 int subscribeCount = 0; 333 334 struct TestDisposable 335 { 336 void dispose() 337 { 338 disposeCount++; 339 } 340 } 341 342 struct TestObservable 343 { 344 TestDisposable subscribe(Observer!int observer) 345 { 346 subscribeCount++; 347 return TestDisposable(); 348 } 349 } 350 351 Observable!int observable = observableObject!int(TestObservable()); 352 assert(subscribeCount == 0); 353 Disposable disposable = observable.subscribe(null); 354 assert(subscribeCount == 1); 355 assert(disposeCount == 0); 356 disposable.dispose(); 357 assert(disposeCount == 1); 358 } 359 360 //######################### 361 // Defer 362 //######################### 363 ///Create observable by function that template parameter. 364 auto defer(E, alias f)() 365 { 366 static struct DeferObservable 367 { 368 alias ElementType = E; 369 370 public: 371 auto subscribe(TObserver)(TObserver observer) 372 { 373 static struct DeferObserver 374 { 375 public: 376 this(TObserver observer, EventSignal signal) 377 { 378 _observer = observer; 379 _signal = signal; 380 } 381 382 public: 383 void put(E obj) 384 { 385 if (_signal.signal) 386 return; 387 388 static if (hasFailure!TObserver) 389 { 390 try 391 { 392 .put(_observer, obj); 393 } 394 catch (Exception e) 395 { 396 _observer.failure(e); 397 } 398 } 399 else 400 { 401 .put(_observer, obj); 402 } 403 } 404 405 void completed() 406 { 407 if (_signal.signal) 408 return; 409 _signal.setSignal(); 410 411 static if (hasCompleted!TObserver) 412 { 413 _observer.completed(); 414 } 415 } 416 417 void failure(Exception e) 418 { 419 if (_signal.signal) 420 return; 421 _signal.setSignal(); 422 423 static if (hasFailure!TObserver) 424 { 425 _observer.failure(e); 426 } 427 } 428 429 private: 430 TObserver _observer; 431 EventSignal _signal; 432 } 433 434 alias fun = unaryFun!f; 435 auto d = new SignalDisposable; 436 fun(DeferObserver(observer, d.signal)); 437 return d; 438 } 439 } 440 441 return DeferObservable(); 442 } 443 /// 444 unittest 445 { 446 auto sub = defer!(int, (observer) { 447 observer.put(1); 448 observer.put(2); 449 observer.put(3); 450 observer.completed(); 451 }); 452 453 int countPut = 0; 454 int countCompleted = 0; 455 struct A 456 { 457 void put(int n) 458 { 459 countPut++; 460 } 461 462 void completed() 463 { 464 countCompleted++; 465 } 466 } 467 468 assert(countPut == 0); 469 assert(countCompleted == 0); 470 auto d = sub.doSubscribe(A()); 471 assert(countPut == 3); 472 assert(countCompleted == 1); 473 } 474 475 unittest 476 { 477 auto sub = defer!(int, (observer) { 478 observer.put(0); 479 observer.failure(new Exception("")); 480 observer.put(1); 481 }); 482 483 int countPut = 0; 484 int countFailure = 0; 485 struct A 486 { 487 void put(int n) 488 { 489 countPut++; 490 } 491 492 void failure(Exception e) 493 { 494 countFailure++; 495 } 496 } 497 498 assert(countPut == 0); 499 assert(countFailure == 0); 500 auto d = sub.doSubscribe(A()); 501 assert(countPut == 1); 502 assert(countFailure == 1); 503 } 504 505 unittest 506 { 507 auto sub = defer!(int, (observer) { 508 observer.put(0); 509 observer.failure(new Exception("")); 510 observer.put(1); 511 }); 512 513 int countPut = 0; 514 struct A 515 { 516 void put(int n) 517 { 518 countPut++; 519 } 520 } 521 522 assert(countPut == 0); 523 auto d = sub.doSubscribe(A()); 524 assert(countPut == 1); 525 } 526 527 unittest 528 { 529 Disposable subscribeImpl(Observer!int observer) 530 { 531 .put(observer, 1); 532 return null; 533 } 534 535 import std.array : appender; 536 537 auto buf = appender!(int[]); 538 539 auto put1 = defer!int(&subscribeImpl); 540 auto d = put1.doSubscribe(buf); 541 542 assert(buf.data.length == 1); 543 assert(buf.data[0] == 1); 544 assert(d is null); 545 } 546 547 auto defer(E, TSubscribe)(auto ref TSubscribe subscribeImpl) 548 { 549 struct DeferObservable 550 { 551 alias ElementType = E; 552 553 TSubscribe _subscribeImpl; 554 555 this(ref TSubscribe subscribeImpl) 556 { 557 _subscribeImpl = subscribeImpl; 558 } 559 560 auto subscribe(TObserver)(auto ref TObserver observer) 561 { 562 return _subscribeImpl(observer); 563 } 564 } 565 566 return DeferObservable(subscribeImpl); 567 } 568 569 unittest 570 { 571 import std.array : appender; 572 573 auto buf = appender!(int[]); 574 575 auto put12 = defer!int((Observer!int observer) { 576 .put(observer, 1); 577 .put(observer, 2); 578 return NopDisposable.instance; 579 }); 580 auto d = put12.doSubscribe(buf); 581 582 assert(buf.data.length == 2); 583 assert(buf.data[0] == 1); 584 assert(buf.data[1] == 2); 585 } 586 587 auto empty(E)() 588 { 589 static struct EmptyObservable 590 { 591 alias ElementType = E; 592 593 Disposable subscribe(TObserver)(auto ref TObserver observer) 594 { 595 static if (hasCompleted!TObserver) 596 { 597 observer.completed(); 598 } 599 return NopDisposable.instance; 600 } 601 } 602 603 return EmptyObservable(); 604 } 605 606 unittest 607 { 608 auto completed = false; 609 auto o = empty!int(); 610 611 assert(!completed); 612 auto d = o.doSubscribe((int n) { }, () { completed = true; }); 613 assert(completed); 614 } 615 616 auto never(E)() 617 { 618 static struct NeverObservable 619 { 620 alias ElementType = E; 621 622 Disposable subscribe(TObserver)(auto ref TObserver observer) 623 { 624 return NopDisposable.instance; 625 } 626 } 627 628 return NeverObservable(); 629 } 630 631 unittest 632 { 633 auto o = never!int(); 634 auto d = o.doSubscribe((int) { }); 635 d.dispose(); 636 } 637 638 auto error(E)(auto ref Exception e) 639 { 640 static struct ErrorObservable 641 { 642 alias ElementType = E; 643 644 Exception _e; 645 646 this(ref Exception e) 647 { 648 _e = e; 649 } 650 651 Disposable subscribe(TObserver)(auto ref TObserver observer) 652 { 653 static if (hasFailure!TObserver) 654 { 655 observer.failure(_e); 656 } 657 return NopDisposable.instance; 658 } 659 } 660 661 return ErrorObservable(e); 662 } 663 664 unittest 665 { 666 auto expected = new Exception("TEST"); 667 auto o = error!int(expected); 668 669 Exception actual = null; 670 o.doSubscribe((int n) { }, (Exception e) { actual = e; }); 671 assert(actual is expected); 672 } 673 674 /// 675 auto from(R)(auto ref R input) if (isInputRange!R) 676 { 677 alias E = ElementType!R; 678 679 static struct FromObservable 680 { 681 alias ElementType = E; 682 683 this(ref R input) 684 { 685 this.input = input; 686 } 687 688 Disposable subscribe(TObserver)(auto ref TObserver observer) 689 if (isOutputRange!(TObserver, ElementType)) 690 { 691 .put(observer, input); 692 return NopDisposable.instance; 693 } 694 695 R input; 696 } 697 698 return FromObservable(input); 699 } 700 /// 701 alias asObservable = from; 702 703 /// 704 unittest 705 { 706 import std.range : iota; 707 708 auto obs = from(iota(10)); 709 auto res = new int[10]; 710 auto d = obs.subscribe(res[]); 711 scope (exit) 712 d.dispose(); 713 714 assert(res.length == 10); 715 assert(res[0] == 0); 716 assert(res[9] == 9); 717 } 718 719 /// 720 unittest 721 { 722 import std.range : iota; 723 724 auto obs = iota(10).asObservable(); 725 auto res = new int[10]; 726 auto d = obs.subscribe(res[]); 727 scope (exit) 728 d.dispose(); 729 730 assert(res.length == 10); 731 assert(res[0] == 0); 732 assert(res[9] == 9); 733 }